# ETL with Python Tutorial

# pETL package
pETL is a general purpose Python package for extracting, transforming and loading tables of data.
<br>
Extract & Load : http://petl.readthedocs.io/en/latest/io.html
<br>
transform: http://petl.readthedocs.io/en/latest/transform.html

special thanks for https://github.com/justmarkham and https://github.com/guipsamora for the dataset

install petl by running ```pip install petl``` in Command-Line

In [1]:
import petl as etl

# Extract

### Extract JSON file and display

In [2]:
filename= 'drinks.json'

In [3]:
t1 = etl.fromjson(filename)
t1.display(10)

country,total_pure_alcohol,continent,servings
Afghanistan,0.0 Litres,AS,"[beer:0, spirit:0, wine:0]"
Albania,4.9 Litres,EU,"[beer:89, spirit:132, wine:54]"
Algeria,0.7 Litres,AF,"[beer:25, spirit:0, wine:14]"
Andorra,12.4 Litres,EU,"[beer:245, spirit:138, wine:312]"
Angola,5.9 Litres,AF,"[beer:217, spirit:57, wine:45]"
Antigua & Barbuda,4.9 Litres,,"[beer:102, spirit:128, wine:45]"
Argentina,8.3 Litres,SA,"[beer:193, spirit:25, wine:221]"
Armenia,3.8 Litres,EU,"[beer:21, spirit:179, wine:11]"
Australia,10.4 Litres,OC,"[beer:261, spirit:72, wine:212]"
Austria,9.7 Litres,EU,"[beer:279, spirit:75, wine:191]"


one of the key benefits from using Petl is low memory consumption, since the code is not reading the whole file, until we execute this command directy.  

Moreover - petl can the the data directly from zip files:

In [4]:
source = etl.sources.ZipSource('drinks.zip','drinks.json') # where is the data
etl.fromjson(source).display()

country,total_pure_alcohol,continent,servings
Afghanistan,0.0 Litres,AS,"[beer:0, spirit:0, wine:0]"
Albania,4.9 Litres,EU,"[beer:89, spirit:132, wine:54]"
Algeria,0.7 Litres,AF,"[beer:25, spirit:0, wine:14]"
Andorra,12.4 Litres,EU,"[beer:245, spirit:138, wine:312]"
Angola,5.9 Litres,AF,"[beer:217, spirit:57, wine:45]"


#### See the number of records

In [5]:
t1.nrows()

193

### Retrieving fields

In [6]:
fields = t1.fieldnames()
print fields

(u'country', u'total_pure_alcohol', u'continent', u'servings')


### Accessing specific value
our table is an iterable object of rows of cells:

In [7]:
print t1[2][1]

4.9 Litres


but we can slice the tables by columns too:

In [8]:
print t1['total_pure_alcohol'][1]

4.9 Litres


### Learning types

In [9]:
for f in fields:
    print f,'\t', t1.typecounter(f)

country 	Counter({'unicode': 193})
total_pure_alcohol 	Counter({'unicode': 193})
continent 	Counter({'unicode': 170, 'NoneType': 23})
servings 	Counter({'unicode': 193})


We can see few problems in our data, that need to be solved before loading to DB:
- total alcohol is a string and not a number
- servings is a unicode string and not a list or dictionary, and it is not normalized 1NF
- continent can contain empty values

Lets convert our data into 2 tables:
<table bordercolor="white"><tr><td>
<table>
<tr><td colspan=2 bgcolor='bbbbbb'>Countries</td></tr>
<tr><td><u>Country</td><td>String</td></tr>
<tr><td>Continent</td><td>String</td></tr>
<tr><td>Total_alcohol</td><td>Float</td></tr>
<tr><td>Total_serving</td><td>Integer</td></tr>
</table>
</td><td>
<table>
<tr><td colspan=2 bgcolor='bbbbbb'>Servings</td></tr>
<tr><td><u>S_id</td><td>Integer</td></tr>
<tr><td>Country</td><td>String</td></tr>
<tr><td>Type</td><td>String</td></tr>
<tr><td>servings</td><td>Integer</td></tr>
</table>
</td></tr>
</table>

_______________________________

# Transform

#### Lets define a function to converttotal_pure_alcohol to float

In [10]:
def to_float(text):
    return float(text[:-7])

print to_float('4.9 Litres')

4.9


#### Now we will apply this function to each of the rows

In [11]:
t2 = t1.convert('total_pure_alcohol',to_float)

In [12]:
print t2.typecounter('total_pure_alcohol')
t2.display(10)

Counter({'float': 193})


country,total_pure_alcohol,continent,servings
Afghanistan,0.0,AS,"[beer:0, spirit:0, wine:0]"
Albania,4.9,EU,"[beer:89, spirit:132, wine:54]"
Algeria,0.7,AF,"[beer:25, spirit:0, wine:14]"
Andorra,12.4,EU,"[beer:245, spirit:138, wine:312]"
Angola,5.9,AF,"[beer:217, spirit:57, wine:45]"
Antigua & Barbuda,4.9,,"[beer:102, spirit:128, wine:45]"
Argentina,8.3,SA,"[beer:193, spirit:25, wine:221]"
Armenia,3.8,EU,"[beer:21, spirit:179, wine:11]"
Australia,10.4,OC,"[beer:261, spirit:72, wine:212]"
Austria,9.7,EU,"[beer:279, spirit:75, wine:191]"


## Lets edit `servings` field
#### first we will convert the string to dictionary
dictionary is a useful data structure for our following steps

In [13]:
# lets check servings content and type on  a sample value
sample = t2['servings'][5]
print sample
print type(sample)

[beer:102, spirit:128, wine:45]
<type 'unicode'>


In [14]:
def to_dict(text):
    if text[0]=='[' and text[-1]==']':
        items =  text[1:-1].split(', ') # turn values to list
    else:
        return None
    output = {}
    for i in items:
        key, value = i.split(':') # splitting to key and value
        output[key]=int(value)
    return output

    
print to_dict(sample)
print type(to_dict(sample))

{u'beer': 102, u'spirit': 128, u'wine': 45}
<type 'dict'>


In [15]:
t3 = t2.convert('servings',to_dict)
t3.typecounter('servings')

Counter({'dict': 193})

### Now we will unpack `servings` (convert iterable field to number of fields)

In [16]:
t4 = t3.unpackdict('servings')
t4.display(5)

country,total_pure_alcohol,continent,beer,spirit,wine
Afghanistan,0.0,AS,0,0,0
Albania,4.9,EU,89,132,54
Algeria,0.7,AF,25,0,14
Andorra,12.4,EU,245,138,312
Angola,5.9,AF,217,57,45


learn more about unpacking: http://petl.readthedocs.io/en/latest/transform.html#unpacking-compound-values

#### We can add a new field based on row calculation - for example total price

In [17]:
def total_serving(row):
    return row['beer']+row['spirit']+row['wine']

In [18]:
t5 = t4.addfield('total_serving',total_serving)
t5.display(10)

country,total_pure_alcohol,continent,beer,spirit,wine,total_serving
Afghanistan,0.0,AS,0,0,0,0
Albania,4.9,EU,89,132,54,275
Algeria,0.7,AF,25,0,14,39
Andorra,12.4,EU,245,138,312,695
Angola,5.9,AF,217,57,45,319
Antigua & Barbuda,4.9,,102,128,45,275
Argentina,8.3,SA,193,25,221,439
Armenia,3.8,EU,21,179,11,211
Australia,10.4,OC,261,72,212,545
Austria,9.7,EU,279,75,191,545


### Lets cut out the relevant table for `countries` table

In [19]:
out_countries = t5.cut(['country','continent','total_pure_alcohol','total_serving'])
out_countries.display()

country,continent,total_pure_alcohol,total_serving
Afghanistan,AS,0.0,0
Albania,EU,4.9,275
Algeria,AF,0.7,39
Andorra,EU,12.4,695
Angola,AF,5.9,319


### Our next step will be to convert each row into multiple rows (by serving type)

### Lets cut the needed columns: 

In [20]:
t6 = t5.cut(['country','beer','wine','spirit'])
t6.display()

country,beer,wine,spirit
Afghanistan,0,0,0
Albania,89,54,132
Algeria,25,14,0
Andorra,245,312,138
Angola,217,45,57


### And perform Melting (row -> rows)

In [21]:
t7 = t6.melt(key=['country'])
t7.display(6)

country,variable,value
Afghanistan,beer,0
Afghanistan,wine,0
Afghanistan,spirit,0
Albania,beer,89
Albania,wine,54
Albania,spirit,132


### Let's filter out zero values and add row_numbers

In [23]:
t8 = t7.selectisnot('value',0) # selecting rows where serving !=0
t9 = t8.addrownumbers()
t9.display()

row,country,variable,value
1,Albania,beer,89
2,Albania,wine,54
3,Albania,spirit,132
4,Algeria,beer,25
5,Algeria,wine,14


In [24]:
out_servings = t9.rename({'row':'s_id','variable':'type','value':'servings'}) # renaming columns
out_servings.display()

s_id,country,type,servings
1,Albania,beer,89
2,Albania,wine,54
3,Albania,spirit,132
4,Algeria,beer,25
5,Algeria,wine,14


#### Lets check if joining our two tables results in logical output

In [25]:
out_countries.join(out_servings, # right table
                   lkey='country',rkey='country', #join equality columns
                   lprefix = 'c_', rprefix='serv_') # prefixes of columns from each table (not mandatory)

c_country,c_continent,c_total_pure_alcohol,c_total_serving,serv_s_id,serv_type,serv_servings
Albania,EU,4.9,275,1,beer,89
Albania,EU,4.9,275,2,wine,54
Albania,EU,4.9,275,3,spirit,132
Algeria,AF,0.7,39,4,beer,25
Algeria,AF,0.7,39,5,wine,14


<b>Transform</b> is a very rich library that can be combined with more internal and external functions.  
It's recommended to check the documentation of the following actions as well:
- Cat / Stack  - Union like operations
- Select family - Picking a subset of rows by condition / conditions
- Sort - Sorting the values
- Join / Leftjoin / Rightjoin / Outerjoin / Crossjoin - various join functionality
- Unjoin - Reversing Inner Join
- Unique / Duplicates / Distinct - Working with duplicated values
- Aggregate - Makes Aggregation calculation based on key (Group by)

### Let's try the aggregate function to find the maximal total serving at each continent

In [127]:
t5.display(5)

country,total_pure_alcohol,continent,beer,spirit,wine,total_serving
Afghanistan,0.0,AS,0,0,0,0
Albania,4.9,EU,89,132,54,275
Algeria,0.7,AF,25,0,14,39
Andorra,12.4,EU,245,138,312,695
Angola,5.9,AF,217,57,45,319


In [128]:
t5.aggregate('continent',max,'total_serving').displayall()

continent,value
,665
AF,504
AS,646
EU,695
OC,545
SA,439


_________________________________


# Load

We can save the ouput in multiple ways. First - Let's try csv (that we already know how to load to MySQL)

In [129]:
out_countries.tocsv('out_countries.csv')
out_servings.tocsv('out_servings.csv')

Now we will work with MySQL cursor to load the data - First we'll create the schema and tables

note - SqlAlchemy package is required

In [130]:
import MySQLdb as mdb

In [None]:
user = 'YOURUSERNAME'
passwd = 'YOURPASSWORD'

In [131]:
import MySQLdb as mdb
con = mdb.connect(
                host = '127.0.0.1', user = user, passwd = passwd) #optional - db="schema_name"  
# setting a cursor
cur = con.cursor()     # get the cursor

Create and use the rellevant schema:

In [132]:
cur.execute('DROP SCHEMA IF EXISTS drinks')
cur.execute('CREATE SCHEMA IF NOT EXISTS drinks')
cur.execute('USE drinks')
cur.execute('SET SQL_MODE=ANSI_QUOTES') #important for petl operation - use this kind of quotes

0L

In [133]:
out_countries.todb(cur,'countries',schema='drinks', create='True')

In [134]:
out_servings.todb(cur,'serving',schema='drinks', create='True')

#### We can also append data to existing tables

In [135]:
addition = [{'country':'Neverland','continent':None,'total_pure_alcohol':0,'total_serving':0}]

In [138]:
add_table = etl.fromdicts(addition) # create table object from dictionary
add_table.display(10)

country,total_pure_alcohol,continent,total_serving
Neverland,0,,0


In [139]:
add_table.appenddb(cur,'countries',schema='drinks',commit=True)

#### Now we can find all our data in MySQL server:

In [140]:
cur.execute(""" SELECT * 
                FROM drinks.countries
                WHERE country like 'N%' ;""")

print cur.fetchall()

(('North Korea', 'AS', 0.0, 0L), ('Namibia', 'AF', 6.8, 380L), ('Nauru', 'OC', 1.0, 57L), ('Nepal', 'AS', 0.2, 11L), ('Netherlands', 'EU', 9.4, 529L), ('New Zealand', 'OC', 9.3, 457L), ('Nicaragua', None, 3.5, 197L), ('Niger', 'AF', 0.1, 6L), ('Nigeria', 'AF', 9.1, 49L), ('Niue', 'OC', 7.0, 395L), ('Norway', 'EU', 6.7, 369L), ('Neverland', None, 0.0, 0L), ('Neverland', None, 0.0, 0L))


## P.S. - Pandas

Another very commonly used package for data manipulation is Pandas (http://pandas.pydata.org/), used not only for ETL but for data analysis, visuialization and data mining. You're welcome to check out this package and use it in your projects.