## ETL Notebook for Testing Data Pipelines

In [1]:
import configparser
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import dayofweek
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_date, upper
import logging
from pyspark.sql.types import DateType
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
# CONFIG -- read AWS credentials and the destination bucket from dl.cfg
config = configparser.ConfigParser()
config.read('dl.cfg')

KEY, SECRET = (
    config.get('AWS', 'AWS_ACCESS_KEY_ID'),
    config.get('AWS', 'AWS_SECRET_ACCESS_KEY'),
)
output_data = config.get('S3', 'DEST_S3_BUCKET')

# Export the credentials into the process environment (presumably so the
# Hadoop S3A connector's environment-variable credential lookup finds them).
os.environ.update({'AWS_ACCESS_KEY_ID': KEY, 'AWS_SECRET_ACCESS_KEY': SECRET})

In [3]:
def create_spark_session():
    """Return a Hive-enabled SparkSession, creating it if none exists yet.

    The session is configured to resolve extra jars from the spark-packages
    repository and to pull in the ``hadoop-aws`` and ``spark-sas7bdat``
    packages.

        Returns:
            SparkSession object
    """
    builder = SparkSession.builder
    builder = builder.config("spark.jars.repositories", "https://repos.spark-packages.org/")
    builder = builder.config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11",
    )
    return builder.enableHiveSupport().getOrCreate()

In [5]:
spark = create_spark_session()
input_data = './'
# NOTE: overrides the DEST_S3_BUCKET value read from dl.cfg, presumably so
# test runs write to a scratch "testing" prefix instead of the real bucket.
# The trailing slash matters: output paths are built by appending table
# names to this prefix, and without it they become ".../testingd_demog_...".
output_data = 's3a://gfp-udacity/testing/'

In [6]:
def rename_columns(table, new_columns):
    """Rename the leading columns of a Spark DataFrame by position.

    The i-th column of ``table`` is renamed to the i-th entry of
    ``new_columns``. Columns beyond ``len(new_columns)`` keep their current
    names, and surplus entries in ``new_columns`` are ignored.

        Arguments:
            table {DataFrame}: DataFrame whose columns are renamed
            new_columns {iterable of str}: new names, in column order
        Returns:
            DataFrame with the renamed columns
    """
    current = table.columns
    new_names = list(new_columns)[:len(current)]
    # One positional toDF() instead of chained withColumnRenamed() calls:
    # the chained form renames *every* column matching a name, so a new name
    # equal to a not-yet-renamed original (e.g. [a, b] -> [b, c]) would end
    # up as [c, c]. toDF() assigns all names at once and cannot collide.
    return table.toDF(*(new_names + current[len(new_names):]))

In [7]:
def _write_demog_population(df, output_root):
    """Build d_demog_population from the raw demography frame and write it."""
    logging.info("Start processing d_demog_population")
    d_demog_population = (
        df.select(['City', 'State', 'Male Population', 'Female Population',
                   'Number of Veterans', 'Foreign-born', 'Race'])
        .distinct()
        .withColumn("demog_pop_id", monotonically_increasing_id())
    )

    # rename_columns renames positionally; the trailing demog_pop_id column
    # has no entry here and keeps its name.
    new_columns = ['city', 'state', 'male_population', 'female_population',
                   'num_veterans', 'foreign_born', 'race']
    d_demog_population = rename_columns(d_demog_population, new_columns)
    # NOTE(review): unlike d_demog_statistics, city/state are not upper-cased
    # here; confirm whether joins on these columns expect matching case.

    d_demog_population.write.mode("overwrite") \
        .parquet(path=output_root + 'd_demog_population')


def _write_demog_statistics(df, output_root):
    """Build d_demog_statistics from the raw demography frame and write it."""
    logging.info("Start processing d_demog_statistics")
    d_demog_statistics = (
        df.select(['City', 'State', 'Median Age', 'Average Household Size'])
        .distinct()
        .withColumn("demog_stat_id", monotonically_increasing_id())
    )

    new_columns = ['city', 'state', 'median_age', 'avg_household_size']
    d_demog_statistics = rename_columns(d_demog_statistics, new_columns)
    d_demog_statistics = d_demog_statistics \
        .withColumn('city', upper(col('city'))) \
        .withColumn('state', upper(col('state')))

    d_demog_statistics.write.mode("overwrite") \
        .parquet(path=output_root + 'd_demog_statistics')


def process_demography_data(spark, input_data, output_data):
    """Build the demography dimension tables and write them as parquet.

    Reads ``us-cities-demographics.csv`` (header row, ``;`` delimiter) from
    ``input_data`` and produces two tables, each written with mode
    ``overwrite`` to its own sub-directory of ``output_data``:

    * ``d_demog_population``: city, state, male/female population, number of
      veterans, foreign-born count and race, plus a ``demog_pop_id`` key.
    * ``d_demog_statistics``: city, state (both upper-cased), median age and
      average household size, plus a ``demog_stat_id`` key.

        Arguments:
            spark {object}: SparkSession object
            input_data {str}: Source directory / S3 prefix holding the CSV
            output_data {str}: Target directory / S3 prefix; a trailing
                slash is optional
        Returns:
            None
    """
    demog_path = os.path.join(input_data, 'us-cities-demographics.csv')
    df = spark.read.format('csv').options(header=True, delimiter=';').load(demog_path)

    # Normalise the prefix so "prefix" and "prefix/" both yield
    # "prefix/<table>". Previously the statistics table was written to the
    # bare prefix in overwrite mode, which wiped every table under it.
    output_root = output_data.rstrip('/') + '/'

    _write_demog_population(df, output_root)
    _write_demog_statistics(df, output_root)

In [None]:
# Run the demography ETL with the session and paths configured above.
process_demography_data(spark=spark, input_data=input_data, output_data=output_data)