In [2]:
import pandas as pd
import os, re
import configparser
from datetime import timedelta, datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, when, lower, isnull, year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, to_date
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType, LongType

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [3]:
# The AWS key id and password are configured in a configuration file "dl.cfg"
config = configparser.ConfigParser()
config.read('dl.cfg')
# Reads and saves the AWS access key information and saves them in a environment variable
os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [4]:
spark = SparkSession.builder.getOrCreate()

### Read US Cities Demo dataset file

In [6]:
demographics=spark.read.csv("us-cities-demographics.csv", sep=';', header=True)

### Verifying Total Number of Records

In [7]:
demographics.count()

2891

### Print Schema to verify that all the columns are in "string" format

In [8]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [9]:
def cast_type(df, cols):
    """
    Convert the types of the columns according to the configuration supplied in the cols dictionary in the format {"column_name": type}
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        cols (:obj:`dict`): Dictionary in the format of {"column_name": type} indicating what columns and types they should be converted to
    """
    for k,v in cols.items():
        if k in df.columns:
            df = df.withColumn(k, df[k].cast(v))
    return df

### Convert numeric columns to the proper types: Integer and Double

In [10]:
int_cols = ['Count', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born']
float_cols = ['Median Age', 'Average Household Size']
demographics = cast_type(demographics, dict(zip(int_cols, len(int_cols)*[IntegerType()])))
demographics = cast_type(demographics, dict(zip(float_cols, len(float_cols)*[DoubleType()])))

### Get aggregation data

In [12]:
first_agg = {"Median Age": "first", "Male Population": "first", "Female Population": "first", 
            "Total Population": "first", "Number of Veterans": "first", "Foreign-born": "first", "Average Household Size": "first"}

# First aggregation - City
agg_df = demographics.groupby(["City", "State", "State Code"]).agg(first_agg)

# Pivot Table to transform values of the column Race to different columns
piv_df = demographics.groupBy(["City", "State", "State Code"]).pivot("Race").sum("Count")

In [13]:
agg_df.head()

Row(City='Rockville', State='Maryland', State Code='MD', first(Total Population)=66998, first(Female Population)=35793, first(Median Age)=38.1, first(Number of Veterans)=1990, first(Foreign-born)=25047, first(Male Population)=31205, first(Average Household Size)=2.6)

In [14]:
piv_df.head()

Row(City='Delray Beach', State='Florida', State Code='FL', American Indian and Alaska Native=None, Asian=1696, Black or African-American=21138, Hispanic or Latino=6397, White=40980)

### Rename column names removing the spaces to avoid problems when saving to disk (we got errors when trying to save column names with spaces)

In [15]:
demographics = agg_df.join(other=piv_df, on=["City", "State", "State Code"], how="inner")\
                    .withColumnRenamed('State Code', 'StateCode')\
                    .withColumnRenamed('first(Total Population)', 'TotalPopulation')\
                    .withColumnRenamed('first(Female Population)', 'FemalePopulation')\
                    .withColumnRenamed('first(Male Population)', 'MalePopulation')\
                    .withColumnRenamed('first(Median Age)', 'MedianAge')\
                    .withColumnRenamed('first(Number of Veterans)', 'NumberVeterans')\
                    .withColumnRenamed('first(Foreign-born)', 'ForeignBorn')\
                    .withColumnRenamed('first(Average Household Size)', 'AverageHouseholdSize')\
                    .withColumnRenamed('Hispanic or Latino', 'HispanicOrLatino')\
                    .withColumnRenamed('Black or African-American', 'BlackOrAfricanAmerican')\
                    .withColumnRenamed('American Indian and Alaska Native', 'AmericanIndianAndAlaskaNative')

In [16]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- TotalPopulation: integer (nullable = true)
 |-- FemalePopulation: integer (nullable = true)
 |-- MedianAge: double (nullable = true)
 |-- NumberVeterans: integer (nullable = true)
 |-- ForeignBorn: integer (nullable = true)
 |-- MalePopulation: integer (nullable = true)
 |-- AverageHouseholdSize: double (nullable = true)
 |-- AmericanIndianAndAlaskaNative: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- BlackOrAfricanAmerican: long (nullable = true)
 |-- HispanicOrLatino: long (nullable = true)
 |-- White: long (nullable = true)



In [18]:
numeric_cols = [
                    'TotalPopulation', 
                    'FemalePopulation', 
                    'MedianAge', 
                    'NumberVeterans', 
                    'ForeignBorn', 
                    'MalePopulation', 
                    'AverageHouseholdSize', 
                    'AmericanIndianAndAlaskaNative', 
                    'Asian', 
                    'BlackOrAfricanAmerican', 
                    'HispanicOrLatino', 
                    'White'
                ]

#### Fill the null values with 0

In [19]:
demographics = demographics.fillna(0, numeric_cols)

In [20]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- TotalPopulation: integer (nullable = true)
 |-- FemalePopulation: integer (nullable = true)
 |-- MedianAge: double (nullable = false)
 |-- NumberVeterans: integer (nullable = true)
 |-- ForeignBorn: integer (nullable = true)
 |-- MalePopulation: integer (nullable = true)
 |-- AverageHouseholdSize: double (nullable = false)
 |-- AmericanIndianAndAlaskaNative: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- BlackOrAfricanAmerican: long (nullable = true)
 |-- HispanicOrLatino: long (nullable = true)
 |-- White: long (nullable = true)



### Now write (and overwrite) transformed `demographics` dataset onto parquet file

In [21]:
demographics.write.mode('overwrite').parquet("us_cities_demographics.parquet")

In [None]:
# Read i94 immigration dataset

immigration=spark.read.parquet("sas_data")

In [None]:
# Fill the null values with 0
demographics = demographics.fillna(0, numeric_cols)