## [**Extract, Transform, Load (ETL) with Python**](#Extract,-Transform,-Load-(ETL)-with-Python)

#### Practice with Pandas, Spark, SQLite and seaborn.

Extract, Transform, Load (ETL) is a three-phase process.
Data is extracted from its sources, transformed (for example cleaned and scrubbed), and finally loaded into a target.

ETL and ELT are just concepts. In practice, the 3 phases are often combined flexibly.

The following demos practice these 3 phases in a flexible manner, using several popular tools such as Pandas, numpy, Spark/pySpark and SQLite.
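As a minimal sketch of the three phases using only the Python standard library: the CSV text, the column names and the table name `flights` below are made up for illustration and are not the flight data used in the demos.

```python
import csv
import io
import sqlite3

# Hypothetical raw input: note the stray whitespace, mixed case and empty distance.
RAW_CSV = """origin,dest,distance
 ATL ,SFO,2139
atl,MSP,907
SFO,ATL,
"""

def extract(text):
    """Extract: parse CSV text into a list of dicts."""
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows):
    """Transform: normalize airport codes and drop rows without a distance."""
    cleaned = []
    for row in rows:
        if not row["distance"].strip():
            continue  # skip incomplete records
        cleaned.append((row["origin"].strip().upper(),
                        row["dest"].strip().upper(),
                        float(row["distance"])))
    return cleaned

def load(rows, conn):
    """Load: write the cleaned rows into a SQLite table."""
    conn.execute("CREATE TABLE IF NOT EXISTS flights (origin TEXT, dest TEXT, distance REAL)")
    conn.executemany("INSERT INTO flights VALUES (?, ?, ?)", rows)
    conn.commit()

conn = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), conn)
loaded = conn.execute(
    "SELECT origin, dest, distance FROM flights ORDER BY distance"
).fetchall()
print(loaded)
```

Keeping each phase in its own function makes it easy to swap one of them, for example replacing `extract` with a Pandas or Spark reader.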

<img src=grafana.png width="50%"/>



In [None]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
import os
import sqlite3
from collections import  Counter
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme()

#### **Extract, Transform, Load or ETL** Demo 1

Extract source data from a local file, transform the raw data into a new structure, then write the cleaned data into a local DB. For convenience, our source data is quite small.

* [ ] Data extraction from sources.
* [ ] Data transformation, such as cleaning and scrubbing.
* [ ] Load data into the target.

##### **Extract Data**

* [x] Data extraction from sources.
* [ ] Data transformation, such as cleaning and scrubbing.
* [ ] Load data into the target.

In [None]:
#Source data and basic path
dataset = 'flight_ontime.csv'
data_path = os.path.join(os.path.curdir, 'data', dataset)

In [None]:
#List contents in data directory
os.listdir(os.path.join(os.path.curdir, 'data'))

In [None]:
#Read the first 20 rows from source data, a csv file, by Pandas.
df = pd.read_csv(data_path, nrows=20)

In [None]:
#Confirm Pandas DataFrame data type.
type(df)

We extract the first batch of data from a local csv file into a Pandas DataFrame.
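When a file is too large to read at once, the same `read_csv` call can also return the data batch by batch. A small sketch, assuming an in-memory CSV with made-up columns in place of `flight_ontime.csv`:

```python
import io
import pandas as pd

# Hypothetical stand-in for a large CSV file: a header plus 10 data rows.
csv_text = "ORIGIN,DEST,DISTANCE\n" + "\n".join(
    f"ATL,SFO,{100 + i}" for i in range(10)
)

# nrows reads only the first rows, as in the extraction cell above.
first_batch = pd.read_csv(io.StringIO(csv_text), nrows=4)

# chunksize returns an iterator of DataFrames, one per batch.
batch_sizes = [len(chunk) for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=4)]

print(first_batch.shape)
print(batch_sizes)
```

With `chunksize`, each batch can be transformed and loaded before the next one is read, so the whole file never has to fit in memory.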

##### **Transform Data**

* [x] Data extraction from sources.
* [x] Data transformation, such as cleaning and scrubbing.
* [ ] Load data into the target.

Look at the high-level information about the data first, using 2 Pandas methods, info() and describe().

In [None]:
#Check the data type of each column, the non-null counts, memory usage, etc.
#There is no missing data in this DataFrame.
df.info()

There are 20 entries in total, each with 12 columns. There is no missing data, and the columns use 3 data types.

In [None]:
#By default, describe() summarizes only numeric columns, same as df.describe(include=[np.number]).
#Use df.describe(include='all') for both numeric and non-numeric columns.
df.describe()

In [None]:
#To gather statistical information from non-numerical columns.
df.describe(exclude=[np.number])

So far we have a general idea about the data. For example, airport 'ATL' ranks first in frequency, the airplane 'N956DL' flew 2 flights, and 'DL' is the only value in the 'OP_CARRIER' column. Which airport ranks second in frequency? How many short-haul or long-haul flights are operated? Which columns are we interested in? To find the answers, we can use Pandas or the Python standard library.

In [None]:
# Solution 1: use the Python standard library (from collections import Counter).
# Counter(df.ORIGIN).most_common(2)[1]
Counter(df.ORIGIN).most_common()[1]

In [None]:
Counter(df.DEST).most_common( )[1]

In [None]:
# Solution 2 call Pandas.
pd.DataFrame.value_counts(df[['ORIGIN']])[:3]

In [None]:
pd.DataFrame.value_counts(df[['DEST']])[:3]

In [None]:
# Solution 3 call Pandas.
# df.ORIGIN.agg(lambda x: x.value_counts())
df['ORIGIN'].agg(lambda x: x.value_counts())[:3]

In [None]:
df['DEST'].agg(lambda x: x.value_counts())[:3]

In [None]:
# Solution 4 call Pandas.
df.groupby(['ORIGIN'])['ORIGIN'].agg('count').sort_values(ascending=False, inplace=False)[:5]

In [None]:
df.groupby(['DEST'])['DEST'].agg('count').sort_values(ascending=False, inplace=False)[:5]

In [None]:
##Or even plot.

sns.displot(df, x="ORIGIN", hue="ORIGIN", )

In [None]:
sns.catplot(data=df, x="ORIGIN", kind="count", palette="ch:.25")

In [None]:
sns.displot(df, x="DEST", hue="DEST", )

In [None]:
sns.catplot(data=df, x="DEST", kind="count", palette="ch:.25")

In [None]:
# SFO ranks second as an origin airport.

df[df['ORIGIN'] == 'SFO']

In [None]:
# MSP ranks second as a destination.
df[df['DEST'] == 'MSP']

According to the basic aggregation, flight DL1859 is quite close to long-haul.

In [None]:
df[['DISTANCE']].agg(['max','min','mean', 'std'])

In [None]:
df[['DISTANCE']].apply(['max','min','mean', 'std'])

In [None]:
df['DISTANCE'].sort_values(ascending=False, inplace=False)

Plot the distribution of flight distances as follows.

In [None]:
df['DISTANCE'].sort_values().plot(kind='hist')

In [None]:
sns.displot(df, x="DISTANCE", binwidth=200)

In [None]:
sns.displot(df, x="DISTANCE", hue="DEST",  element="step")

In [None]:
sns.displot(df, x="DISTANCE", hue="DEST",   multiple="stack")

In [None]:
df.groupby(['OP_CARRIER'])['OP_CARRIER'].agg('count')

In [None]:
df.groupby(['DEST'])['DEST'].agg('count')

In [None]:
df

So far, the first 10 columns look fine, so we drop the last 2. There are several ways to select the desired columns or to drop the unneeded ones. The first solution below is the most direct. The second requires familiarity with the 'axis' parameter.
The last one is the most tedious here, since we keep most of the columns. The first 2 solutions also accept inplace=True.
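The three approaches and the effect of `inplace=True` can be compared side by side. This is a sketch on a hypothetical 2 x 4 frame; only the column names mirror the flight data, the values are invented.

```python
import pandas as pd

# Hypothetical sample frame with two columns we want to remove.
sample = pd.DataFrame({
    "ORIGIN": ["ATL", "SFO"],
    "DISTANCE": [2139.0, 907.0],
    "ARR_DELAY": [5.0, -3.0],
    "ARR_DEL15": [0.0, 0.0],
})
unwanted = ["ARR_DELAY", "ARR_DEL15"]

by_name = sample.drop(columns=unwanted)      # Solution 1: name the columns to drop
by_axis = sample.drop(unwanted, axis=1)      # Solution 2: axis=1 means "columns"
by_select = sample[["ORIGIN", "DISTANCE"]]   # Solution 3: list the columns to keep

# inplace=True modifies the frame itself and returns None.
in_place = sample.copy()
result = in_place.drop(columns=unwanted, inplace=True)

print(list(by_name.columns), result)
```

Without `inplace=True`, `drop` returns a new DataFrame and leaves `sample` unchanged, which is why the notebook assigns the result back to a variable.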

In [None]:
# Solution 1: straightforward approach
# 
df = df.drop(columns=['ARR_DELAY', 'ARR_DEL15'])

In [None]:
# Solution 2: same drop, selecting the column axis with axis=1
# df = df.drop(['ARR_DELAY', 'ARR_DEL15'], axis=1)

In [None]:
# Solution 3: DataFrame column selection. Create a  20 x 10 DataFrame. 
# df = df[['FL_DATE', 'OP_CARRIER', 'TAIL_NUM', 'OP_CARRIER_FL_NUM', 'ORIGIN', 
#          'ORIGIN_CITY_NAME', 'DEST', 'DEST_CITY_NAME', 'DISTANCE', 'AIR_TIME',]]

In [None]:
df.info()

In [None]:
! ls -R  

##### **Load**

* [x] Data extraction from sources.
* [x] Data transformation, such as cleaning and scrubbing.
* [x] Load data into the target.

Load our new DataFrame df into a SQLite file.
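`sqlite3` accepts both positional (`?`) and named (`:name`) placeholders. A minimal sketch of the named style, assuming a throwaway in-memory table `demo` and invented rows:

```python
import sqlite3

# Hypothetical rows as dicts; the keys must match the :name placeholders.
rows = [
    {"ORIGIN": "ATL", "DEST": "SFO", "DISTANCE": 2139.0},
    {"ORIGIN": "ATL", "DEST": "MSP", "DISTANCE": 907.0},
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (ORIGIN text, DEST text, DISTANCE real)")

# Naming the target columns makes the insert independent of column order.
conn.executemany(
    "INSERT INTO demo (DISTANCE, ORIGIN, DEST) VALUES (:DISTANCE, :ORIGIN, :DEST)",
    rows,
)
conn.commit()

stored = conn.execute(
    "SELECT ORIGIN, DEST, DISTANCE FROM demo ORDER BY DISTANCE"
).fetchall()
print(stored)
conn.close()
```

For a Pandas DataFrame, `df.to_dict("records")` produces rows in this shape.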

In [None]:
# sqlite3 create db and table

db_filename='flight10.db'

SQL = '''CREATE TABLE domestic20 (FL_DATE text, OP_CARRIER text, TAIL_NUM text, OP_CARRIER_FL_NUM int, ORIGIN text, ORIGIN_CITY_NAME text, DEST text, DEST_CITY_NAME text, AIR_TIME real, DISTANCE real)'''

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()
    # Create table 'domestic20'
    cursor.execute(SQL)
    conn.commit()
    

In [None]:
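# Work with sqlite3
'''
SQL = """
insert into domestic20 (FL_DATE, OP_CARRIER, TAIL_NUM, OP_CARRIER_FL_NUM, ORIGIN, ORIGIN_CITY_NAME, DEST, DEST_CITY_NAME, AIR_TIME, DISTANCE)
values (:FL_DATE, :OP_CARRIER, :TAIL_NUM, :OP_CARRIER_FL_NUM, :ORIGIN, :ORIGIN_CITY_NAME, :DEST, :DEST_CITY_NAME, :AIR_TIME, :DISTANCE)
"""

For the demo we use the following instead.
'''

SQL = "INSERT INTO domestic20 VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"

# Select the columns in the same order as the table definition,
# so that each value lands in the matching column.
table_columns = ['FL_DATE', 'OP_CARRIER', 'TAIL_NUM', 'OP_CARRIER_FL_NUM', 'ORIGIN',
                 'ORIGIN_CITY_NAME', 'DEST', 'DEST_CITY_NAME', 'AIR_TIME', 'DISTANCE']

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()
    # tolist() converts numpy values into plain Python types for sqlite3
    cursor.executemany(SQL, df[table_columns].values.tolist())
    conn.commit()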


In [None]:
! ls -R

In [None]:
# Work with sqlite3

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select * from domestic20
    """)
    print('\n20 flights:')
    for row in cursor.fetchmany(20):
        FL_DATE, OP_CARRIER , TAIL_NUM, OP_CARRIER_FL_NUM, \
        ORIGIN, ORIGIN_CITY_NAME, DEST, DEST_CITY_NAME, \
        AIR_TIME, DISTANCE = row
        
        print('{:<10} {:<4} {:<8} {:<4} {:<5} {:<20} {:<5} {:<20} {:<10} {:<10}'.format(
            FL_DATE, OP_CARRIER , TAIL_NUM, OP_CARRIER_FL_NUM, \
            ORIGIN, ORIGIN_CITY_NAME, DEST, DEST_CITY_NAME, \
            AIR_TIME, DISTANCE))

The first round of the ETL process is finished. We have practiced some basic functions of Pandas and SQLite. Next we will practice pyspark.

#### **Extract, Transform, Load or ETL** Demo 2

* [ ] Data extraction from sources.
* [ ] Data transformation, such as cleaning and scrubbing.
* [ ] Load data into the target.

##### **Extract Data**

* [x] Data extraction from sources.
* [ ] Data transformation, such as cleaning and scrubbing.
* [ ] Load data into the target.

In [None]:
# Create a SparkSession for further operation 

spark = SparkSession.builder.getOrCreate()

Access our SQLite db by pyspark.

In [None]:
# spark read db
driver = "org.sqlite.JDBC"
url = "jdbc:sqlite:" + db_filename
tablename = "domestic20"

spark.read.format('jdbc').options(driver=driver, 
                                  dbtable=tablename,
                                  url=url).load().show(5, truncate=False)

In [None]:
# spark read db
# Note: this variant chains option() calls instead of a single options() call.
# It is an alternative to the method above.

dbDataFrame = spark.read.format("jdbc").option("url", url)\
                                       .option("dbtable", tablename)\
                                       .option("driver",  driver)\
                                       .load()

Above operation created a 'pyspark.sql.dataframe.DataFrame' from our SQLite DB data.

In [None]:
type(dbDataFrame)

List all entries of our SQLite DB. Since it's so tiny, we could do it without concern.

In [None]:
dbDataFrame.show(truncate=False)

##### **Transform Data**

* [x] Data extraction from sources.
* [x] Data transformation, such as cleaning and scrubbing.
* [ ] Load data into the target.

This time we create our own function to transform the DataFrame: a function that converts nautical miles ('nmi') into kilometres ('km').
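The conversion logic can be checked as plain Python before it is wrapped in a Spark UDF. The sketch below assumes the international definition of 1 nautical mile = 1.852 km and passes missing values through, since Spark hands SQL NULLs to a Python UDF as `None`. The name `nmi_to_km` is hypothetical.

```python
NMI_TO_KM = 1.852  # kilometres per international nautical mile

def nmi_to_km(nmi):
    """Convert nautical miles to kilometres; None stays None."""
    if nmi is None:
        return None
    return float(nmi) * NMI_TO_KM

# Try an int, a float and a missing value.
converted = [nmi_to_km(value) for value in (1, 100.0, None)]
print(converted)
```

For a simple multiplication like this, built-in column arithmetic such as `col('DISTANCE') * 1.852` is usually preferable to a Python UDF, because it avoids sending every row through the Python interpreter.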

In [None]:
# Function to convert DISTANCE from nautical miles (nmi) to km; 1 nmi = 1.852 km
def nmi_km(nmi):
    return nmi * 1.852

In [None]:
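#Define our own function
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Declare the return type; without it udf() defaults to StringType.
convert_distance = udf(nmi_km, DoubleType())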

In [None]:
dbDataFrame.select(convert_distance(col('DISTANCE'))).show(5)

In [None]:
#udf convert distance from nmi to km
dbDataFrame.withColumn('DISTANCE_km', convert_distance(dbDataFrame.DISTANCE)).show(5)

Function nmi_km() works.

In [None]:
dbDataFrame.show(5)

pyspark also includes many useful tools for string and number operations. Let's try some of them.

In [None]:
#String operation
from pyspark.sql.functions import upper, lower

dbDataFrame.withColumn('lower_ORIGIN', lower(dbDataFrame.ORIGIN)).show(5)

In [None]:
#Aggregation 
from pyspark.sql.functions import count
dbDataFrame.select(count("DEST")).show() # 20

In [None]:
# in Python
from pyspark.sql.functions import countDistinct
dbDataFrame.select(countDistinct("DEST_CITY_NAME")).show() # 11

# -- in SQL
# SELECT COUNT(DISTINCT DEST_CITY_NAME) FROM TABLE

In [None]:
# in Python
from pyspark.sql.functions import approx_count_distinct
dbDataFrame.select(approx_count_distinct("DEST_CITY_NAME", 0.2)).show() # 

# -- in SQL
# SELECT approx_count_distinct(DEST_CITY_NAME, 0.2) FROM TABLE

In [None]:
# in Python
from pyspark.sql.functions import first, last
dbDataFrame.select(first("DEST_CITY_NAME"), last("DEST_CITY_NAME")).show()

# -- in SQL
# SELECT first(DEST_CITY_NAME), last(DEST_CITY_NAME) FROM TABLE

In [None]:
# # in Python
from pyspark.sql.functions import min, max
dbDataFrame.select(min("DISTANCE"), max("DISTANCE")).show()

# -- in SQL
# SELECT min(DISTANCE), max(DISTANCE) FROM TABLE

In [None]:
# # in Python
from pyspark.sql.functions import sum
dbDataFrame.select(sum("DISTANCE")).show() # 2711

# -- in SQL
# SELECT sum(DISTANCE) FROM TABLE

In [None]:
# # in Python
from pyspark.sql.functions import sum_distinct  # sumDistinct is the deprecated older name
dbDataFrame.select(sum_distinct("AIR_TIME")).show() # 

# 
# -- in SQL
# SELECT SUM(DISTINCT AIR_TIME) FROM TABLE 

In [None]:
# # in Python
from pyspark.sql.functions import sum, count, avg, expr, max, min


dbDataFrame.select(
    count("DISTANCE").alias("dest_route"),
    sum("DISTANCE").alias("total_distance"),
    avg("DISTANCE").alias("avg_distance"),
    expr("mean(DISTANCE)").alias("mean_distance"),
    max("DISTANCE").alias("long_route"),
    min("DISTANCE").alias("short_route"))\
  .selectExpr(
    "total_distance/dest_route",
    "avg_distance",
    "mean_distance", "short_route" , "long_route").show()
# 

This is similar to the Pandas operations we did a moment ago.
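The SQL equivalents quoted in the comments above can be tried directly against SQLite. A minimal sketch, assuming a throwaway in-memory table `flights` with invented values:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (DEST text, AIR_TIME real, DISTANCE real)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?)",
    [("SFO", 270.0, 2139.0), ("MSP", 130.0, 907.0), ("SFO", 270.0, 2139.0)],
)

# One query returning plain and DISTINCT variants of the aggregates.
count_all, count_distinct, sum_all, sum_distinct, avg_distance = conn.execute(
    """
    SELECT COUNT(DEST),
           COUNT(DISTINCT DEST),
           SUM(AIR_TIME),
           SUM(DISTINCT AIR_TIME),
           AVG(DISTANCE)
    FROM flights
    """
).fetchone()
conn.close()

print(count_all, count_distinct, sum_all, sum_distinct, avg_distance)
```

The `DISTINCT` keyword is what separates `count` from `countDistinct` and `sum` from `sum_distinct`: duplicates are removed before the aggregate is computed.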

In [None]:
# Grouping
dbDataFrame.groupBy("ORIGIN", "DEST").count().show()

In [None]:
# Grouping
dbDataFrame.groupBy("ORIGIN",).count().show()

In [None]:
dbDataFrame.groupBy("DEST").count().show()

In [None]:
# in Python
from pyspark.sql.functions import count

dbDataFrame.groupBy("DEST").agg(
    count("ORIGIN").alias("original"),
    expr("count(ORIGIN)")).show()

#### **Extract, Transform, Load or ETL** Demo 3

This section mainly focuses on reading and writing a SQLite DB with pyspark.

* [ ] Try pyspark.pandas.
* [ ] Read/Write data with pyspark.

##### **Try pyspark.pandas**

* [x] Try pyspark.pandas.
* [ ] Read/Write data with pyspark.

In [None]:
#import pyspark.pandas as ps

df = ps.read_sql("domestic20", con="jdbc:sqlite:{}/{}".format(os.getcwd(), db_filename  ))
df

This is not **pandas.core.frame.DataFrame** but a **pyspark.pandas.frame.DataFrame**.

In [None]:
type(df)

Select a subset of the table by passing a SQL query to ps.read_sql.

In [None]:
df2 = ps.read_sql("SELECT FL_DATE, OP_CARRIER, TAIL_NUM, OP_CARRIER_FL_NUM, ORIGIN, DEST, AIR_TIME, DISTANCE FROM  domestic20 LIMIT 5", \
                  con="jdbc:sqlite:{}/{}".format(os.getcwd(), db_filename ))
df2

Another approach to select data.

In [None]:
df3 = ps.read_sql("domestic20", 
                  con="jdbc:sqlite:{}/{}".format(os.getcwd(), db_filename),
                  columns=['ORIGIN', 'ORIGIN_CITY_NAME', 'DEST', 'DEST_CITY_NAME', 'DISTANCE']
                 )
df3

**Create a PySpark DataFrame from a pandas DataFrame**

In [None]:
# Pandas DataFrame
df = pd.read_csv(data_path, nrows=20)
# DataFrame column selection. Create a new  20 x 10 DataFrame
col10_20 = df[['FL_DATE', 'OP_CARRIER', 'TAIL_NUM', 'OP_CARRIER_FL_NUM', 
               'ORIGIN', 'ORIGIN_CITY_NAME','DEST', 'DEST_CITY_NAME', 
               'AIR_TIME', 'DISTANCE']]

In [None]:
type(col10_20)

In [None]:
df = spark.createDataFrame(col10_20)
df

**pyspark.sql.dataframe.DataFrame**

In [None]:
type(df)

In [None]:
df.show(3)
df.printSchema()

In [None]:
df.columns

In [None]:
df.show(1, vertical=True)

In [None]:
df.show(2, vertical=True)

**convert pyspark.sql.dataframe.DataFrame to pandas.core.frame.DataFrame**

In [None]:
# 
type(df.toPandas())

In [None]:
# 
df.toPandas()[:3]

In [None]:
type(df)

Transform data with new type.

In [None]:
df.select(df.ORIGIN_CITY_NAME).show()

In [None]:
df.withColumn('lower_ORIGIN', lower(df.ORIGIN)).show(5)

In [None]:
df.filter(df.DISTANCE >= 1000).show(5)

In [None]:
df['ORIGIN', 'DEST', 'AIR_TIME','DISTANCE' ].groupby('ORIGIN', 'DEST').avg('AIR_TIME','DISTANCE').show()

In [None]:
df

##### **Read/Write data with pyspark.**

* [x] Try pyspark.pandas.
* [x] Read/Write data with pyspark.

In [None]:
# Write a Spark DataFrame into the DB. Pay attention to the mode:
# mode = "append"
# mode = "overwrite"
# mode = "ignore"

# mode : str, optional
#     specifies the behavior of the save operation when data already exists.

#     * ``append``: Append contents of this :class:`DataFrame` to existing data.
#     * ``overwrite``: Overwrite existing data.
#     * ``ignore``: Silently ignore this operation if data already exists.
#     * ``error`` or ``errorifexists`` (default case): Throw an exception if data already exists.

In [None]:
# driver = "org.sqlite.JDBC"
# url = ''.join(("jdbc:sqlite:", db_filename))
# tablename = "domestic20"
# 
mode = "ignore"
# 
df.write.jdbc(url, tablename, mode)

In [None]:
spark.read.format('jdbc').options(driver='org.sqlite.JDBC', 
                                  dbtable=tablename,
                                  url=url).load().show(20, truncate=False)

In [None]:
#
df.summary()

In [None]:
# Select subset
df1000 = df.filter(df.DISTANCE >= 1000)

In [None]:
#
df1000.show()

In [None]:
# 
type(df1000)

In [None]:
# 
df1000.describe()

In [None]:
#"AIR_TIME" and "DISTANCE" are highly correlated.
df1000.stat.corr("AIR_TIME", "DISTANCE")

In [None]:
# The table domestic20 in the database will now be overwritten by the following operation.
# The new table will contain only the entries with DISTANCE >= 1000.
# 
mode = "overwrite"
# 
df1000.write.jdbc(url, tablename, mode)

In [None]:
spark.read.format('jdbc').options(driver='org.sqlite.JDBC', 
                                  dbtable=tablename,
                                  url=url).load().show(30, truncate=False)

The overwrite mode replaces the contents of the SQLite table with the new data.

In [None]:
# Further data selection from a pyspark.sql.dataframe.DataFrame
df.filter(df.DISTANCE <= 500).collect()

In [None]:
# 
df500 = df.filter(df.DISTANCE <= 500)

In [None]:
# The entries with DISTANCE <= 500 will be appended to the table.
# 
mode = "append"
# 
df500.write.jdbc(url, tablename, mode)

New data is appended into SQLite DB.
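The effect of the save modes can be pictured with plain `sqlite3`. The helper below is an analogy written for this note, not how Spark implements `DataFrameWriter.jdbc`; the function names `save_rows`, `table_exists` and `row_count` and the table `t` are hypothetical.

```python
import sqlite3

def table_exists(conn, table):
    """Return True if a table with this name exists in the database."""
    found = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
    ).fetchone()
    return found is not None

def save_rows(conn, table, rows, mode="error"):
    """Write (origin, distance) rows, imitating the semantics of the save modes."""
    if table_exists(conn, table):
        if mode in ("error", "errorifexists"):
            raise ValueError(f"table {table!r} already exists")
        if mode == "ignore":
            return  # leave the existing data untouched
        if mode == "overwrite":
            conn.execute(f"DROP TABLE {table}")
    # "append" falls through: the table is kept and the rows are added.
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (origin TEXT, distance REAL)")
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
    conn.commit()

def row_count(conn, table):
    """Count the rows currently stored in the table."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]

conn = sqlite3.connect(":memory:")
save_rows(conn, "t", [("ATL", 2139.0), ("SFO", 907.0)])  # table is created
save_rows(conn, "t", [("MSP", 400.0)], mode="ignore")
after_ignore = row_count(conn, "t")
save_rows(conn, "t", [("ATL", 2139.0)], mode="overwrite")
after_overwrite = row_count(conn, "t")
save_rows(conn, "t", [("MSP", 400.0)], mode="append")
after_append = row_count(conn, "t")
print(after_ignore, after_overwrite, after_append)
```

In this sketch `ignore` leaves the 2 existing rows alone, `overwrite` replaces them with 1 row, and `append` adds 1 more.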

In [None]:
# Confirm that the short-distance flights (DISTANCE <= 500) have been added successfully.
# mode = "append"
spark.read.format('jdbc').options(driver='org.sqlite.JDBC', 
                                  dbtable=tablename,
                                  url=url).load().show(60, truncate=False)

### **Summary**

Through the 3 demos we became familiar with ETL and related tools such as Pandas and pyspark. They cover only a small part of real-world cases. By working through the steps above, readers can build their own ETL/ELT demo and handle basic projects. This notebook is not the final version; I will improve it gradually.

The picture at the very top of this note is a Grafana dashboard. I will add Grafana to this note later.

I hope this note helps someone. Thanks!