## ETL Example with DataFrames

#### Extraction: create a DataFrame from a given JSON document

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session named "ETL" that runs against a local master.
spark = (
    SparkSession.builder
    .master("local")
    .appName("ETL")
    .getOrCreate()
)

In [2]:
# Load the census JSON document into a DataFrame via the generic reader API.
input_path = "census_2010.json"
census_df = spark.read.format("json").load(input_path)

In [3]:
# Count the records loaded from the JSON file; the value is shown as the cell output.
census_df.count()

101

In [10]:
# Preview the loaded data; show() prints only the top 20 rows by default.
census_df.show()

+---+-------+-------+----+
|age|females|  males|year|
+---+-------+-------+----+
|  0|1994141|2085528|2010|
|  1|1997991|2087350|2010|
|  2|2000746|2088549|2010|
|  3|2002756|2089465|2010|
|  4|2004366|2090436|2010|
|  5|2005925|2091803|2010|
|  6|2007781|2093905|2010|
|  7|2010281|2097080|2010|
|  8|2013771|2101670|2010|
|  9|2018603|2108014|2010|
| 10|2023289|2114217|2010|
| 11|2026352|2118390|2010|
| 12|2037286|2132030|2010|
| 13|2060100|2159943|2010|
| 14|2089651|2195773|2010|
| 15|2117689|2229339|2010|
| 16|2146942|2263862|2010|
| 17|2165852|2285295|2010|
| 18|2168175|2285990|2010|
| 19|2159571|2272689|2010|
+---+-------+-------+----+
only showing top 20 rows



#### Transformation: filter the seniors and add a derived total column

In [11]:
# Keep only the rows whose age is greater than 54 (the "seniors"),
# then report how many rows remain.
seniors = census_df.filter(census_df.age > 54)
seniors.count()

46

In [12]:
# Preview the filtered rows; show() prints only the top 20 rows by default.
seniors.show()

+---+-------+-------+----+
|age|females|  males|year|
+---+-------+-------+----+
| 55|2167706|2059204|2010|
| 56|2106460|1989505|2010|
| 57|2048896|1924113|2010|
| 58|2001049|1869486|2010|
| 59|1957350|1819943|2010|
| 60|1908602|1765522|2010|
| 61|1859542|1710843|2010|
| 62|1794846|1642438|2010|
| 63|1706899|1553454|2010|
| 64|1604801|1452106|2010|
| 65|1505088|1353125|2010|
| 66|1404227|1253164|2010|
| 67|1314295|1164006|2010|
| 68|1242906|1092883|2010|
| 69|1184673|1034415|2010|
| 70|1126180| 975512|2010|
| 71|1069608| 918217|2010|
| 72|1018530| 865438|2010|
| 73| 973223| 817131|2010|
| 74| 932810| 772524|2010|
+---+-------+-------+----+
only showing top 20 rows



In [13]:
# Add a derived column called total: the row-wise sum of the males and
# females counts (a per-row computation, not an aggregation across rows).
from pyspark.sql.functions import lit  # not needed for the sum below; retained in case other cells rely on it

# Adding two Columns already yields a Column expression, so it is passed to
# withColumn directly; lit() is only for wrapping plain Python literals.
seniors_final = seniors.withColumn('total', seniors.males + seniors.females)

In [14]:
# Preview the result including the new total column; show() prints only the top 20 rows by default.
seniors_final.show()

+---+-------+-------+----+-------+
|age|females|  males|year|  total|
+---+-------+-------+----+-------+
| 55|2167706|2059204|2010|4226910|
| 56|2106460|1989505|2010|4095965|
| 57|2048896|1924113|2010|3973009|
| 58|2001049|1869486|2010|3870535|
| 59|1957350|1819943|2010|3777293|
| 60|1908602|1765522|2010|3674124|
| 61|1859542|1710843|2010|3570385|
| 62|1794846|1642438|2010|3437284|
| 63|1706899|1553454|2010|3260353|
| 64|1604801|1452106|2010|3056907|
| 65|1505088|1353125|2010|2858213|
| 66|1404227|1253164|2010|2657391|
| 67|1314295|1164006|2010|2478301|
| 68|1242906|1092883|2010|2335789|
| 69|1184673|1034415|2010|2219088|
| 70|1126180| 975512|2010|2101692|
| 71|1069608| 918217|2010|1987825|
| 72|1018530| 865438|2010|1883968|
| 73| 973223| 817131|2010|1790354|
| 74| 932810| 772524|2010|1705334|
+---+-------+-------+----+-------+
only showing top 20 rows



#### Loading: write the revised DataFrame into a database

In [None]:
# Write seniors_final DF into a MySQL table named "seniors".
# The write below is commented out, so running this cell does nothing.
# NOTE(review): presumably requires a reachable MySQL server and the MySQL
# JDBC driver on Spark's classpath before it can be enabled — TODO confirm.
# NOTE(review): the user and password below are hardcoded; read them from the
# environment or a secrets store instead of committing real credentials.
# mode("overwrite") replaces any existing data in the target table.

# seniors_final\
#     .write\
#     .format("jdbc")\
#     .option("driver", "com.mysql.jdbc.Driver")\
#     .mode("overwrite")\
#     .option("url", "jdbc:mysql://localhost/testdb")\
#     .option("dbtable", "seniors")\
#     .option("user", "root")\
#     .option("password", "root_password")\
#     .save()