In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Build the session step by step: name the application, then obtain the session.
_session_builder = SparkSession.builder.appName('Tutorial')
spark = _session_builder.getOrCreate()

In [2]:
from pyspark.sql.functions import *
from pyspark.sql import DataFrame

The following JVM class handles are required to use the Hadoop FileSystem API to scan a directory.

In [3]:
# Handles to the JVM classes used further down to list files through the
# Hadoop FileSystem API. All of them are resolved through the Spark context's
# gateway, so the common prefixes are looked up once.
_jvm = spark.sparkContext._gateway.jvm
_hadoop = _jvm.org.apache.hadoop

URI = _jvm.java.net.URI
Path = _hadoop.fs.Path
FileSystem = _hadoop.fs.FileSystem
Configuration = _hadoop.conf.Configuration
FileStatus = _hadoop.fs.FileStatus

Properties for comma delimited files

In [4]:
# CSV reader options for comma-delimited files. Values are kept as strings
# because they are passed straight through to the reader's option() calls.
properties = dict(
    header="true",
    delimiter=",",
    inferSchema="false",
    ignoreLeadingWhiteSpace="true",
    ignoreTrailingWhiteSpace="true",
    quote="\"",
    escape="\"",
)

Function: readFile
3 input variables: path, properties, and a boolean indicating whether to add a source_path column
Output: DataFrame

In [5]:
def readFile(dataFilePath, fileProperties, source_path):
    """Read a delimited text file into a DataFrame with Spark's CSV reader.

    Args:
        dataFilePath: Path handed to the reader's ``load`` call.
        fileProperties: Dict of CSV reader options. Only the keys listed in
            ``option_keys`` below are read from it; a key that is missing or
            set to ``None`` is skipped so the reader keeps its own default.
        source_path: If truthy, add a ``source_path`` column filled with
            ``input_file_name()``.

    Returns:
        The loaded DataFrame, with the extra ``source_path`` column when
        ``source_path`` is truthy.
    """
    # The reader options this function forwards. "quote" appears once: the
    # earlier hard-coded empty quote was always overwritten by this value.
    option_keys = (
        "header",
        "delimiter",
        "inferSchema",
        "ignoreLeadingWhiteSpace",
        "ignoreTrailingWhiteSpace",
        "quote",
        "escape",
    )

    reader = spark.read.format("csv")
    for key in option_keys:
        value = fileProperties.get(key)
        # Do not forward None: leave the reader's default in place instead.
        if value is not None:
            reader = reader.option(key, value)

    inputDF = reader.load(dataFilePath)

    if source_path:
        inputDF = inputDF.withColumn("source_path", input_file_name())
    return inputDF

Function: readMultipleCSV
4 input variables: path, table name, properties, and a boolean indicating whether to add a source_path column
Output: DataFrame

In [9]:
def readMultipleCSV(dataPath, table, fileProperties, source_path):
    """Read every file under ``dataPath`` whose path contains ``table`` and
    stack the results into one DataFrame.

    Args:
        dataPath: Directory to list (direct children only, via ``listStatus``).
        table: Substring that a listed path must contain to be read.
        fileProperties: CSV reader options, passed through to ``readFile``.
        source_path: Passed through to ``readFile``; if truthy each file's
            rows carry a ``source_path`` column.

    Returns:
        The union of the DataFrames read from all matching files. Rows are
        combined by column position, so the files are expected to share the
        same column layout.

    Raises:
        ValueError: If no path under ``dataPath`` contains ``table``.
    """
    fs = FileSystem.get(URI("localhost"), Configuration())
    listed_paths = [
        status.getPath().toString() for status in fs.listStatus(Path(dataPath))
    ]

    # NOTE(review): the substring test runs against the full path string, not
    # just the file name, so a directory name containing `table` would match
    # every file - confirm this is intended.
    matching_paths = [p for p in listed_paths if table in p]
    if not matching_paths:
        raise ValueError(
            "No files containing {!r} found under {!r}".format(table, dataPath)
        )

    outputDF = readFile(matching_paths[0], fileProperties, source_path)
    for file_path in matching_paths[1:]:
        outputDF = outputDF.union(readFile(file_path, fileProperties, source_path))

    return outputDF

Read nonEmp data: source_path = True because we want to pull the year from the file name; rename ST to STATE and CTY to COUNTY

In [11]:
# Build a four-digit year by prefixing "20" to the first two-digit run found
# in source_path.
# NOTE(review): the pattern is not anchored to the file name, so two digits
# appearing earlier in the path would be picked up instead - confirm the
# directory layout never contains digits.
_nonemp_year = concat(lit(20), regexp_extract(col("source_path"), "([0-9]{2})", 1))

nonEmp = (
    readMultipleCSV("de-tutorial/src/main/resources/data/", "nonemp", properties, True)
    .withColumn("YEAR", _nonemp_year)
    .withColumnRenamed("ST", "STATE")
    .withColumnRenamed("CTY", "COUNTY")
)

Add new properties for the population table, as it is '|'-delimited as opposed to ','-delimited

In [12]:
# CSV reader options for the pipe-delimited population file.
popProperties = dict(
    header="true",
    delimiter="|",
    inferSchema="false",
    ignoreLeadingWhiteSpace="true",
    ignoreTrailingWhiteSpace="true",
    quote="\"",
    escape="\"",
)

Create a function to unpivot/melt population DataFrame

      Current:
       STATE|POP_2018|POP_2017
           x|      20|      22
      
      We want:
       STATE|YEAR|POP
           x|2018| 20
           x|2017| 22
           
Function: unpivot
4 input variables: DataFrame, by fields, key column name, val column name
Output: DataFrame

In [13]:
def unpivot(df, by, key, val):
    """Melt a wide DataFrame into long form: one output row per
    (identifier columns, melted column) pair.

    Args:
        df: DataFrame to unpivot.
        by: Identifier column name(s) kept as-is on every output row. A single
            string or any iterable of strings is accepted.
        key: Name of the output column that holds the melted column's name.
        val: Name of the output column that holds the melted column's value.

    Returns:
        A DataFrame with the ``by`` columns followed by ``key`` and ``val``.

    Raises:
        ValueError: If every column of ``df`` is listed in ``by``, so there is
            nothing to unpivot.
        AssertionError: If the columns to unpivot do not all share one dtype.
    """
    # Normalise `by` to a list so tuples (or a single name) work with the
    # list concatenations in the final select.
    by = [by] if isinstance(by, str) else list(by)

    # Everything that is not an identifier column gets melted.
    melt_columns = [(c, t) for (c, t) in df.dtypes if c not in by]
    if not melt_columns:
        raise ValueError(
            "No columns left to unpivot: every column of the DataFrame is listed in `by`"
        )
    cols, dtypes = zip(*melt_columns)

    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias(key), col(c).alias(val)) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs." + key, "kvs." + val])

Read population statistics data using the new properties and source_path = False

In [14]:
# Load the population file with the pipe-delimited options; no source_path column.
pop = readFile(
    dataFilePath="de-tutorial/src/main/resources/data/sub-est2018_all.csv",
    fileProperties=popProperties,
    source_path=False,
)

Unpivot population statistics using the unpivot function. Drop unwanted fields before unpivoting

In [15]:
# Columns removed before melting, and the identifier columns kept on every row.
_pop_dropped_cols = [
    'SUMLEV',
    'PLACE',
    'COUSUB',
    'CONCIT',
    'PRIMGEO_FLAG',
    'FUNCSTAT',
    'CENSUS2010POP',
    'ESTIMATESBASE2010',
]
_pop_id_cols = ['STATE', 'COUNTY', 'NAME']

# Every remaining non-identifier column becomes a (YEAR, POPULATION) row.
newPop = unpivot(pop.drop(*_pop_dropped_cols), _pop_id_cols, 'YEAR', 'POPULATION')

Read states mapping table

In [16]:
# Load the states mapping table with the comma-delimited options; no source_path column.
states = readFile(
    dataFilePath="de-tutorial/src/main/resources/data/states.csv",
    fileProperties=properties,
    source_path=False,
)

Join states mapping table onto unpivoted population statistics. Extract year from 'YEAR' column using regex

In [17]:
# Attach the states mapping (broadcast to the join) to each population row,
# keeping population rows that have no matching state.
_pop_with_states = newPop.join(broadcast(states), on=['STATE'], how='left_outer')

# Replace YEAR with the first four-digit run found in its current value.
enrichPop = _pop_with_states.withColumn(
    "YEAR", regexp_extract(col('YEAR'), "([0-9]{4})", 1)
)

Join enriched Population DataFrame onto the NonEmployer statistics

In [18]:
# Keep every nonEmp row; population columns are null where no key matches.
_join_keys = ['STATE', 'COUNTY', 'YEAR']
outputDF = nonEmp.join(enrichPop, on=_join_keys, how="left_outer")

Save output to parquet, partitioned by "STNAME"

In [None]:
# Write the joined result as parquet, replacing any existing output and
# creating one partition directory per STNAME value.
# Parentheses replace the backslash continuations: the original last line
# ended in a dangling "\" which is a syntax error at the end of the cell.
(
    outputDF.write
    .mode("Overwrite")
    .partitionBy("STNAME")
    .parquet("data.parq")
)