# Spark ETL Demo Scala

This demo, written in Scala for Watson Data Studio, illustrates the use of a Spark cluster to perform ETL. It imports data from flat files into Spark DataFrames, manipulates the data, aggregates it, and then writes the result out to a relational database. The advantages of using Spark for this are scalability (with a larger cluster one can achieve close to linear scalability) and simplified error recovery (a failed run of this ETL job can be repeated at any stage and the final result will be the same).

### Step 1 Read in the source data
We read two CSV files. One has statistics about Social Security payments for the state of Texas by zipcode; the other maps US zipcodes to US counties, so we can aggregate the Social Security data by county rather than by zipcode.

Grab the input data files from GitHub and store them in GPFS using wget.

In [None]:
import scala.sys.process._

val socialSecurityDataFile = "oasdi-tx-clean.csv"
val zipcodeDataFile = "zip_codes_states.csv"

s"wget -O $socialSecurityDataFile https://raw.githubusercontent.com/djccarew/sparketldemo/master/data/oasdi-tx-clean.csv".!
s"wget -O $zipcodeDataFile https://raw.githubusercontent.com/djccarew/sparketldemo/master/data/zip_codes_states.csv".!
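
The `!` method returns the exit code of the command, and the cell above discards it, so a failed download would go unnoticed until the read step. A minimal sketch of a stricter variant follows; `runOrFail` and `download` are hypothetical helper names, not part of the original demo, and the sketch assumes `wget` is on the path.

```scala
import scala.sys.process._

// Hypothetical helper: run a command and fail loudly on a non-zero exit code.
def runOrFail(command: Seq[String]): Unit = {
  val exitCode = command.!
  if (exitCode != 0)
    throw new RuntimeException(
      s"Command failed with exit code $exitCode: ${command.mkString(" ")}")
}

// Hypothetical helper: download a URL to a local file with wget (-q keeps it quiet).
def download(url: String, destination: String): Unit =
  runOrFail(Seq("wget", "-q", "-O", destination, url))

// Example usage with the files from this demo:
// download("https://raw.githubusercontent.com/djccarew/sparketldemo/master/data/oasdi-tx-clean.csv", "oasdi-tx-clean.csv")
// download("https://raw.githubusercontent.com/djccarew/sparketldemo/master/data/zip_codes_states.csv", "zip_codes_states.csv")
```

Passing the command as a `Seq[String]` rather than a single string avoids surprises if a file name ever contains spaces.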

Read the Social Security data file into a DataFrame using an explicit schema. The schema can be inferred instead, but the result may not have the types you expect (numeric columns can end up as strings), so it's better to specify the schema and know exactly what you end up with.

In [None]:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._


val spark = SparkSession.
    builder().
    getOrCreate()

// Specify schema for resulting DataFrame
val socialSecurityDataSchema = StructType(Array(
        StructField("Zip", StringType, false),
        StructField("NumTotal", IntegerType, false),
        StructField("NumRetired", IntegerType, false),
        StructField("NumDisabled", IntegerType, false),
        StructField("NumWidowerOrParent", IntegerType,false),
        StructField("NumSpouses", IntegerType, false),
        StructField("NumChildren", IntegerType, false),
        StructField("BenTotal", IntegerType, false),
        StructField("BenRetired", IntegerType, false),
        StructField("BenWidowerOrParent", IntegerType, false),
        StructField("NumSeniors", IntegerType, false)))

// Read CSV file into DataFrame using schema 
val dfSocialSecurityDataRaw = spark.
    read.format("csv").
    option("header", "true").
    schema(socialSecurityDataSchema).
    load("oasdi-tx-clean.csv")

// Validate DataFrame was created correctly
dfSocialSecurityDataRaw.printSchema()
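
To make the schema choice concrete, here is a small sketch comparing three ways of reading the same CSV content: the default reader, the `inferSchema` option, and an explicit schema. The sample rows are made up for illustration, and `.master("local[*]")` is an assumption that only matters when no Spark session exists yet (in a notebook the existing session is reused).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Reuses the notebook's session if there is one; the local master is an assumption
// for running this sketch standalone.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Made-up sample lines standing in for a CSV file (header + two rows)
val sampleLines = Seq("Zip,NumTotal", "75001,120", "02134,45").toDS()

val sampleSchema = StructType(Array(
  StructField("Zip", StringType, false),
  StructField("NumTotal", IntegerType, false)))

// Default: no inference, every column is read as a string
val dfDefault = spark.read.option("header", "true").csv(sampleLines)

// inferSchema: types are guessed from the data, so Zip becomes an integer
val dfInferred = spark.read.option("header", "true").option("inferSchema", "true").csv(sampleLines)

// Explicit schema: Zip stays a string, NumTotal is an integer
val dfExplicit = spark.read.option("header", "true").schema(sampleSchema).csv(sampleLines)

dfDefault.printSchema()
dfInferred.printSchema()
dfExplicit.printSchema()
```

With inference, the zipcode column is treated as a number, which is rarely what you want for an identifier with possible leading zeros. The explicit schema avoids both that and the all-strings default.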


Repeat for the zipcode data file.

In [None]:
val zipDataSchema = StructType(Array(
      StructField("Zip", StringType, false),
      StructField("Latitude", DoubleType, false),
      StructField("Longitude", DoubleType, false),
      StructField("City", StringType, false),
      StructField("State", StringType, false),
      StructField("County", StringType, false)))

val dfZipDataRaw = spark.
    read.format("csv").
    option("header", "true").
    schema(zipDataSchema).
    load("zip_codes_states.csv")

dfZipDataRaw.printSchema()

### Step 2 Transform raw source data

We only need the County and Zip columns for this demo, so we drop the other columns from the zipcode data.

In [None]:
val dfCounties = dfZipDataRaw.select("Zip", "County")
dfCounties.printSchema()

Join the Social Security data with the zipcode data to add a County column to the Social Security data.

In [None]:
var dfSocialSecurityDataWithCounty = dfSocialSecurityDataRaw.join(dfCounties,"Zip")
dfSocialSecurityDataWithCounty.printSchema()
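
`join(df, "Zip")` is an inner join, so any Social Security row whose zipcode is missing from the zipcode data is silently dropped. A quick way to check for that is a `left_anti` join, which keeps only the unmatched rows. The sketch below uses made-up rows and hypothetical DataFrame names; `.master("local[*]")` is an assumption that only applies when no session exists yet.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Made-up sample data; zipcode 99999 has no county mapping
val dfBenefits = Seq(("75001", 120), ("75002", 80), ("99999", 5)).toDF("Zip", "NumTotal")
val dfZipToCounty = Seq(("75001", "Dallas"), ("75002", "Collin")).toDF("Zip", "County")

// Inner join (the default): the unmatched row disappears
val dfJoined = dfBenefits.join(dfZipToCounty, "Zip")

// Left anti join: only the rows of dfBenefits with no match in dfZipToCounty
val dfUnmatched = dfBenefits.join(dfZipToCounty, Seq("Zip"), "left_anti")

dfJoined.show()
dfUnmatched.show()
```

If `dfUnmatched` is not empty for the real data, the county totals will be lower than the zipcode totals, which is worth knowing before loading the target table.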

We no longer need the Zip column, since we'll be aggregating by County instead.

In [None]:
dfSocialSecurityDataWithCounty = dfSocialSecurityDataWithCounty.drop("Zip")
dfSocialSecurityDataWithCounty.printSchema()

Create a temp view so we can do the "by county" aggregation via SQL rather than through the Spark SQL DataFrame API. (Doing it via SQL is usually easier.)

In [None]:
dfSocialSecurityDataWithCounty.createOrReplaceTempView("aggregated_by_county")

Spark SQL query to aggregate Social Security data by county and sort by county name

In [None]:
val dfSocialSecurityDataByCounty = spark.sql("select County, sum(NumTotal) as NumTotal, sum(NumRetired) as NumRetired, sum(NumDisabled) as NumDisabled, sum(NumWidowerOrParent) as NumWidowerOrParent, sum(NumSpouses) as NumSpouses, sum(NumChildren) as NumChildren, sum(BenTotal) as BenTotal, sum(BenRetired) as BenRetired, sum(BenWidowerOrParent) as BenWidowerOrParent, sum(NumSeniors) as NumSeniors from aggregated_by_county group by County order by County")
dfSocialSecurityDataByCounty.take(5)
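
For comparison, the same kind of aggregation can be written with the DataFrame API instead of SQL. This is only a sketch on made-up rows with two of the numeric columns, not the demo's actual data; `.master("local[*]")` is an assumption that only applies when no session exists yet.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Made-up sample rows: (County, NumTotal, BenTotal)
val dfSample = Seq(
  ("Dallas", 120, 1000),
  ("Dallas", 30, 250),
  ("Collin", 80, 700)).toDF("County", "NumTotal", "BenTotal")

// Group by county, sum each numeric column, and sort by county name
val dfByCounty = dfSample.
    groupBy("County").
    agg(sum("NumTotal").as("NumTotal"), sum("BenTotal").as("BenTotal")).
    orderBy("County")

dfByCounty.show()
```

Note that `sum` over an integer column produces a long column, in both the SQL and the DataFrame version.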

### Step 3 Write modified data to target database

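We use the jdbc method of the DataFrameWriter to write the modified data to the target database. Appropriate credentials for the target database need to be set up first. The overwrite save mode replaces any existing contents of the target table, so rerunning the job produces the same final result.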

In [None]:

val jdbcURL = "jdbc:db2://dashdb-txn-sbox-yp-dal09-04.services.dal.bluemix.net:50000/BLUDB"
val destTable = "WTM57848.TXSSBYCOUNTY"

val jdbcProperties = new java.util.Properties
jdbcProperties.setProperty("driver", "com.ibm.db2.jcc.DB2Driver")
jdbcProperties.setProperty("user","wtm57848")
jdbcProperties.setProperty("password","2ncndbg7bpb7^4p6")

dfSocialSecurityDataByCounty.write.mode("overwrite").jdbc(jdbcURL, destTable, jdbcProperties)
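
Hard-coding the user and password in a notebook makes them easy to leak when the notebook is shared. One possible alternative, sketched below, is to read them from environment variables. The helper name `jdbcPropertiesFromEnv` and the variable names `DB2_USER` and `DB2_PASSWORD` are assumptions made for this example; use whatever mechanism your environment provides for secrets.

```scala
import java.util.Properties

// Hypothetical helper: build JDBC properties from environment variables.
// DB2_USER and DB2_PASSWORD are assumed names, not something the demo defines.
def jdbcPropertiesFromEnv(env: Map[String, String] = sys.env): Properties = {
  def required(name: String): String =
    env.getOrElse(name,
      throw new IllegalStateException(s"Environment variable $name is not set"))

  val props = new Properties()
  props.setProperty("driver", "com.ibm.db2.jcc.DB2Driver")
  props.setProperty("user", required("DB2_USER"))
  props.setProperty("password", required("DB2_PASSWORD"))
  props
}

// Example usage with the write from this step:
// dfSocialSecurityDataByCounty.write.mode("overwrite").jdbc(jdbcURL, destTable, jdbcPropertiesFromEnv())
```

Taking the environment as a parameter keeps the helper easy to try out with a plain `Map` before pointing it at real credentials.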
