Tutorial: Fix file path, typos, reformat Python (#133)
sglavoie committed Apr 22, 2020
1 parent 1fa2c56 commit 9171565
Showing 2 changed files with 181 additions and 152 deletions.
147 changes: 79 additions & 68 deletions TUTORIAL.ipynb
@@ -138,7 +138,7 @@
" row['name'] = row['name'].title()\n",
"\n",
"Flow(\n",
" load('data/beatles.csv'),\n",
" load('beatles.csv'),\n",
" titleName\n",
").results()[0]"
]
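This hunk only shows the `load()` path changing from `data/beatles.csv` to `beatles.csv`. For orientation, a minimal self-contained version of the whole cell looks roughly like this (assuming the CSV sits next to the notebook and has a `name` column, as the surrounding cell implies):

    from dataflows import Flow, load

    # A row processor: receives one row (a dict) and mutates it in place
    def titleName(row):
        row['name'] = row['name'].title()

    # .results()[0] holds the rows of each resource in the flow
    Flow(
        load('beatles.csv'),
        titleName
    ).results()[0]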
@@ -163,10 +163,13 @@
"from xml.etree import ElementTree\n",
"from urllib.request import urlopen\n",
"\n",
"# Get from Wikipedia the population count for each country\n",
"def country_population():\n",
" \"\"\"Get from Wikipedia the population count for each country.\"\"\"\n",
" # Read the Wikipedia page and parse it using etree\n",
" page = urlopen('https://en.wikipedia.org/wiki/List_of_countries_and_dependencies_by_population').read()\n",
" page = urlopen(\n",
" \"https://en.wikipedia.org/wiki/List_of_countries_and_dependencies_by_population\"\n",
" ).read()\n",
"\n",
" tree = ElementTree.fromstring(page)\n",
" # Iterate on all tables, rows and cells\n",
" for table in tree.findall('.//table'):\n",
@@ -221,14 +224,14 @@
"<table>\n",
"<thead>\n",
"<tr><th># </th><th>name\n",
"(string) </th><th style=\"text-align: right;\"> population\n",
"(string) </th><th style=\"text-align: right;\"> population\n",
"(number)</th></tr>\n",
"</thead>\n",
"<tbody>\n",
"<tr><td>1 </td><td>China </td><td style=\"text-align: right;\">1394.72 </td></tr>\n",
"<tr><td>2 </td><td>India </td><td style=\"text-align: right;\">1338.48 </td></tr>\n",
"<tr><td>...</td><td> </td><td style=\"text-align: right;\"> </td></tr>\n",
"<tr><td>240</td><td>Pitcairn Islands</td><td style=\"text-align: right;\"> 5e-05</td></tr>\n",
"<tr><td>1 </td><td>Demographics of China </td><td style=\"text-align: right;\">1402.32 </td></tr>\n",
"<tr><td>2 </td><td>Demographics of India </td><td style=\"text-align: right;\">1361.34 </td></tr>\n",
"<tr><td>...</td><td> </td><td style=\"text-align: right;\"> </td></tr>\n",
"<tr><td>241</td><td>Demographics of Pitcairn Islands</td><td style=\"text-align: right;\"> 5e-05</td></tr>\n",
"</tbody>\n",
"</table>"
],
@@ -269,7 +272,7 @@
"\n",
"Apart from data-types, it's also possible to set other constraints to the data. If the data fails validation (or does not fit the assigned data-type) an exception will be thrown - making this method highly effective for validating data and ensuring data quality. \n",
"\n",
"What about large data files? In the above examples, the results are loaded into memory, which is not always preferrable or acceptable. In many cases, we'd like to store the results directly onto a hard drive - without having the machine's RAM limit in any way the amount of data we can process.\n",
"What about large data files? In the above examples, the results are loaded into memory, which is not always preferable or acceptable. In many cases, we'd like to store the results directly onto a hard drive - without having the machine's RAM limit in any way the amount of data we can process.\n",
"\n",
"We do it by using _dump_ processors:"
]
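The code cell that produces the stats in the next hunk is not itself part of this diff, but its shape follows from the text: pipe `country_population()` through `set_type` (which assigns a data type and can also attach validation constraints) and finish with a dump processor. A minimal sketch, assuming the `groupChar` option is what strips the thousands separators from the scraped figures:

    from dataflows import Flow, set_type, dump_to_path

    Flow(
        country_population(),
        # parse as 'number'; groupChar handles values like '1,402,320,560'
        set_type('population', type='number', groupChar=','),
        dump_to_path('country_population')
    ).process()[1]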
@@ -282,9 +285,9 @@
{
"data": {
"text/plain": [
"{'count_of_rows': 240,\n",
" 'bytes': 5277,\n",
" 'hash': 'b293685b58a33bd7b02cc275d19d3a95',\n",
"{'count_of_rows': 241,\n",
" 'bytes': 9568,\n",
" 'hash': '91728bdcca916272c69747ea89eea3d6',\n",
" 'dataset_name': None}"
]
},
@@ -307,7 +310,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Running this code will create a local directory called `county_population`, containing two files:"
"Running this code will create a local directory called `country_population`, containing two files:"
]
},
{
@@ -319,8 +322,8 @@
"name": "stdout",
"output_type": "stream",
"text": [
"country_population/res_1.csv\n",
"country_population/datapackage.json\n"
"country_population/datapackage.json\n",
"country_population/res_1.csv\n"
]
}
],
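`datapackage.json` is the machine-readable descriptor of the dump: resource paths, row counts, hashes and the inferred schema. A quick way to peek at it, using only the standard library:

    import json

    with open('country_population/datapackage.json') as f:
        descriptor = json.load(f)

    # The schema that dump_to_path inferred for the dumped resource
    print(descriptor['resources'][0]['schema']['fields'])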
@@ -347,7 +350,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"{'name': 'China', 'population': Decimal('1394720000')}\n"
"{'name': 'Demographics of China', 'population': Decimal('1402320560')}\n"
]
}
],
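The cell printing this row is not shown in the diff either. Reading a dumped datapackage back is typically done with the `datapackage` library; a sketch under that assumption:

    from datapackage import Package

    pkg = Package('country_population/datapackage.json')
    rows = pkg.resources[0].iter(keyed=True)
    print(next(rows))
    # {'name': 'Demographics of China', 'population': Decimal('1402320560')}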
@@ -415,8 +418,8 @@
"data": {
"text/plain": [
"{'count_of_rows': 6,\n",
" 'bytes': 744,\n",
" 'hash': '1f0df7ed401ccff9f6c1674e98c62467',\n",
" 'bytes': 1103,\n",
" 'hash': '81ce126294657c5f0a722803e86eb6f8',\n",
" 'dataset_name': None}"
]
},
@@ -432,15 +435,17 @@
"def all_triplets():\n",
" for a in range(1, 20):\n",
" for b in range(a, 20):\n",
" for c in range(b+1, 21):\n",
" for c in range(b + 1, 21):\n",
" yield dict(a=a, b=b, c=c)\n",
"\n",
"\n",
"# Yield row only if a^2 + b^2 == c^1\n",
"def filter_pythagorean_triplets(rows):\n",
" for row in rows:\n",
" if row['a']**2 + row['b']**2 == row['c']**2:\n",
" if row[\"a\"] ** 2 + row[\"b\"] ** 2 == row[\"c\"] ** 2:\n",
" yield row\n",
"\n",
"\n",
"Flow(\n",
" all_triplets(),\n",
" set_type('a', type='integer'),\n",
@@ -458,13 +463,13 @@
"source": [
"The `filter_pythagorean_triplets` function takes an iterator of rows, and yields only the ones that pass its condition. \n",
"\n",
"The flow framework knows whether a function is meant to hande a single row or a row iterator based on its parameters: \n",
"The flow framework knows whether a function is meant to handle a single row or a row iterator based on its parameters: \n",
"\n",
"- if it accepts a single `row` parameter, then it's a row processor.\n",
"- if it accepts a single `rows` parameter, then it's a rows processor.\n",
"- if it accepts a single `package` parameter, then it's a package processor.\n",
"- If it accepts a single `row` parameter, then it's a row processor.\n",
"- If it accepts a single `rows` parameter, then it's a rows processor.\n",
"- If it accepts a single `package` parameter, then it's a package processor.\n",
"\n",
"Let's see a few examples of what we can do with a package processors.\n",
"Let's see a few examples of what we can do with a package processor.\n",
"\n",
"First, let's add a field to the data:"
]
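Before the package-processor examples, here is a minimal sketch contrasting the first two kinds, built only from constructs this tutorial already uses (an in-place row mutator and a filtering generator):

    from dataflows import Flow, printer

    # Row processor: receives a single row (a dict) and mutates it in place
    def square(row):
        row['value'] = row['value'] ** 2

    # Rows processor: receives an iterator of rows and yields the ones to keep
    def keep_even(rows):
        for row in rows:
            if row['value'] % 2 == 0:
                yield row

    Flow(
        ({'value': i} for i in range(1, 7)),
        square,
        keep_even,
        printer()
    ).process()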
@@ -515,8 +520,8 @@
"data": {
"text/plain": [
"{'count_of_rows': 4,\n",
" 'bytes': 896,\n",
" 'hash': 'ae319bad0ad1e345a2a86d8dc9de8375',\n",
" 'bytes': 1175,\n",
" 'hash': '27b1a87d3fee5fc5d8dd667cec129107',\n",
" 'dataset_name': None}"
]
},
@@ -531,15 +536,15 @@
"\n",
"def add_is_guitarist_column_to_schema(package):\n",
" # Add a new field to the first resource\n",
" package.pkg.descriptor['resources'][0]['schema']['fields'].append(dict(\n",
" name='is_guitarist',\n",
" type='boolean'\n",
" ))\n",
" package.pkg.descriptor[\"resources\"][0][\"schema\"][\"fields\"].append(\n",
" dict(name=\"is_guitarist\", type=\"boolean\")\n",
" )\n",
" # Must yield the modified datapackage\n",
" yield package.pkg\n",
" # And its resources\n",
" yield from package\n",
"\n",
"\n",
"def add_is_guitarist_column(row):\n",
" row['is_guitarist'] = row['instrument'] == 'guitar'\n",
"\n",
@@ -557,7 +562,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example we create two steps - one for adding the new field (`is_guitarist`) to the schema and another step to modify the actual data.\n",
"In this example we create two steps: one for adding the new field (`is_guitarist`) to the schema and another step to modify the actual data.\n",
"\n",
"We can combine the two into one step:"
]
@@ -608,8 +613,8 @@
"data": {
"text/plain": [
"{'count_of_rows': 4,\n",
" 'bytes': 896,\n",
" 'hash': 'ae319bad0ad1e345a2a86d8dc9de8375',\n",
" 'bytes': 1175,\n",
" 'hash': '27b1a87d3fee5fc5d8dd667cec129107',\n",
" 'dataset_name': None}"
]
},
@@ -625,10 +630,9 @@
"def add_is_guitarist_column(package):\n",
"\n",
" # Add a new field to the first resource\n",
" package.pkg.descriptor['resources'][0]['schema']['fields'].append(dict(\n",
" name='is_guitarist',\n",
" type='boolean'\n",
" ))\n",
" package.pkg.descriptor[\"resources\"][0][\"schema\"][\"fields\"].append(\n",
" dict(name=\"is_guitarist\", type=\"boolean\")\n",
" )\n",
" # Must yield the modified datapackage\n",
" yield package.pkg\n",
"\n",
@@ -659,7 +663,7 @@
"source": [
"The contract for the `package` processing function is simple:\n",
"\n",
"First modify `package.pkg` (which is a `Package` instance) and yield it.\n",
"First, modify `package.pkg` (which is a `Package` instance) and yield it.\n",
"\n",
"Then, yield any resources that should exist on the output, with or without modifications.\n",
"\n",
@@ -707,7 +711,7 @@
"<tr><td>1 </td><td>1931/1932</td><td style=\"text-align: right;\"> 5</td><td>Actress </td><td style=\"text-align: right;\">1</td><td>Helen Hayes </td><td>The Sin of Madelon Claudet</td></tr>\n",
"<tr><td>2 </td><td>1932/1933</td><td style=\"text-align: right;\"> 6</td><td>Actress </td><td style=\"text-align: right;\">1</td><td>Katharine Hepburn</td><td>Morning Glory </td></tr>\n",
"<tr><td>...</td><td> </td><td style=\"text-align: right;\"> </td><td> </td><td style=\"text-align: right;\"> </td><td> </td><td> </td></tr>\n",
"<tr><td>98 </td><td>2015 </td><td style=\"text-align: right;\">88</td><td>Honorary Award</td><td style=\"text-align: right;\">1</td><td>Gena Rowlands </td><td> </td></tr>\n",
"<tr><td>269</td><td>2015 </td><td style=\"text-align: right;\">88</td><td>Honorary Award</td><td style=\"text-align: right;\">1</td><td>Gena Rowlands </td><td>None </td></tr>\n",
"</tbody>\n",
"</table>"
],
@@ -728,9 +732,9 @@
{
"data": {
"text/plain": [
"{'count_of_rows': 98,\n",
" 'bytes': 6921,\n",
" 'hash': '902088336aa4aa4fbab33446a241b5de',\n",
"{'count_of_rows': 269,\n",
" 'bytes': 19027,\n",
" 'hash': '9aeffeec1baa861cac7316445b796b62',\n",
" 'dataset_name': None}"
]
},
@@ -742,6 +746,7 @@
"source": [
"from dataflows import Flow, load, dump_to_path, checkpoint, printer\n",
"\n",
"\n",
"def find_double_winners(package):\n",
"\n",
" # Remove the emmies resource - \n",
@@ -757,39 +762,44 @@
" # read all its data and create a set of winner names\n",
" emmy = next(resources)\n",
" emmy_winners = set(\n",
" map(lambda x: x['nominee'], \n",
" filter(lambda x: x['winner'],\n",
" emmy))\n",
" map(lambda x: x[\"nominee\"], filter(lambda x: x[\"winner\"], emmy))\n",
" )\n",
"\n",
" # Oscars are next - \n",
" # filter rows based on the emmy winner set\n",
" academy = next(resources)\n",
" yield filter(lambda row: (row['Winner'] and \n",
" row['Name'] in emmy_winners),\n",
" academy)\n",
" yield filter(\n",
" lambda row: (row[\"Winner\"] and row[\"Name\"] in emmy_winners), academy\n",
" )\n",
" \n",
" # important to deque generators to ensure finalization steps of previous processors are executed\n",
" yield from resources\n",
"\n",
"Flow(\n",
" # Emmy award nominees and winners\n",
" load('https://raw.githubusercontent.com/datahq/dataflows/master/data/emmy.csv', name='emmies'),\n",
" load(\n",
" \"https://raw.githubusercontent.com/datahq/dataflows/master/data/emmy.csv\",\n",
" name=\"emmies\",\n",
" ),\n",
" # Academy award nominees and winners\n",
" load('https://raw.githubusercontent.com/datahq/dataflows/master/data/academy.csv', encoding='utf8', name='oscars'),\n",
" load(\n",
" \"https://raw.githubusercontent.com/datahq/dataflows/master/data/academy.csv\",\n",
" encoding=\"utf8\",\n",
" name=\"oscars\",\n",
" ),\n",
" # save a checkpoint so we won't have to re-download the source data each time\n",
" checkpoint('emmy-academy-nominees-winners'),\n",
" checkpoint(\"emmy-academy-nominees-winners\"),\n",
" find_double_winners,\n",
" dump_to_path('double_winners'),\n",
" printer(num_rows=1, tablefmt='html')\n",
" dump_to_path(\"double_winners\"),\n",
" printer(num_rows=1, tablefmt=\"html\"),\n",
").process()[1]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Previous flow was a bit complicated, but luckily we have the `join`, `concatenate` and `filter_rows` processors which make such combinations a snap"
"The previous flow was a bit complicated, but luckily we have the `join`, `concatenate` and `filter_rows` processors which make such combinations a snap:"
]
},
{
@@ -859,7 +869,16 @@
}
],
"source": [
"from dataflows import Flow, load, dump_to_path, join, concatenate, filter_rows, printer, checkpoint\n",
"from dataflows import (\n",
" Flow,\n",
" load,\n",
" dump_to_path,\n",
" join,\n",
" concatenate,\n",
" filter_rows,\n",
" printer,\n",
" checkpoint,\n",
")\n",
"\n",
"Flow(\n",
" # load from the checkpoint we saved in the previous flow\n",
@@ -939,17 +958,16 @@
" def star_letter(row):\n",
" for k in row:\n",
" s = list(row[k])\n",
" s[star_letter_idx] = '*'\n",
" row[k] = ''.join(s)\n",
" s[star_letter_idx] = \"*\"\n",
" row[k] = \"\".join(s)\n",
" \n",
" def print_foo(row):\n",
" print(' '.join(list(row['foo'])))\n",
" print(\" \".join(list(row[\"foo\"])))\n",
"\n",
" return Flow(upper, star_letter, print_foo)\n",
"\n",
"Flow(\n",
" [{'foo': 'bar'},\n",
" {'foo': 'bax'}],\n",
" [{\"foo\": \"bar\"}, {\"foo\": \"bax\"}],\n",
" text_processing_flow(0),\n",
" text_processing_flow(1),\n",
" text_processing_flow(2),\n",
@@ -965,13 +983,6 @@
"* [DataFlows Processors Reference](https://github.com/datahq/dataflows/blob/master/PROCESSORS.md)\n",
"* [Datapackage Pipelines Tutorial](https://github.com/frictionlessdata/datapackage-pipelines/blob/master/TUTORIAL.ipynb) - Use the flows as building blocks for more complex pipelines processing systems."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {