# Project Title
### Data Engineering Capstone Project

#### Project Summary
The project explores and assesses data from several data sets and writes the results to file storage for later processing, depending on business demands.


#### Scope 
The project covers cleansing and processing data from multiple data sources (mainly file systems) and storing the cleansed data in file storage (CSV, Parquet, JSON, etc.). Finally, we perform minimal data quality checks on the dataframes.

#### Data Sets used:
The following data sets were used to complete this project:

* I94 Immigration Data: Comes from the U.S. National Tourism and Trade Office and contains various statistics on international visitor arrivals in the USA.
* World Temperature Data: Comes from Kaggle and contains average temperatures by city.
* U.S. City Demographic Data: Comes from OpenSoft and contains demographic information about US cities, such as median age and male and female population.
* Airport codes and related cities: Comes from https://datahub.io/core/airport-codes#data and contains information about airports around the world.
* Climate Change: Earth Surface Temperature Data: Comes from Kaggle and contains earth surface temperature records.

In [1]:
# Do all imports and installs here
import pandas as pd
import os, re
import configparser
from datetime import timedelta, datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, when, lower, isnull, year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, to_date
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType, LongType

In [2]:
# Create the spark context.
spark = SparkSession.builder.config("spark.jars.packages",
                                        "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")\
    .enableHiveSupport().getOrCreate()

#### Data Exploration & Modeling for U.S. City Demographic Data 

#### Steps Involved:

* Read the data from the CSV file
* Cast the numeric columns from string to integer or double type, then aggregate
* Rename the columns and fill all null values with 0
* Finally, write to a parquet folder

In [3]:
# Read US Cities Demo dataset file.
demographics_data=spark.read.csv("./us-cities-demographics.csv", sep=';', header=True)
# Total number of records. (O/P: 2891)
demographics_data.count()

2891

In [4]:
demographics_data.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [5]:
def cast_column_type(df, cols):
    """
    Convert the types of the columns according to the configuration supplied in the cols dictionary in the format {"column_name": type}
    Args:
    df : Spark dataframe to be processed. 
    cols (:obj:`dict`): Dictionary in the format of {"column_name": type} indicating what columns and types they should be converted to
    """
    for k, v in cols.items():
        if k in df.columns:
            df = df.withColumn(k, df[k].cast(v))
    return df

In [6]:
# Convert numeric columns to the proper types: Integer and Double (needed for aggregation in next step)
int_cols = ['Count', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born']
float_cols = ['Median Age', 'Average Household Size']
# dict(zip(int_cols, len(int_cols)*[IntegerType()])) ==> {Count=IntegerType(), Male Population=IntegerType()}
demographics_data = cast_column_type(demographics_data, dict(zip(int_cols, len(int_cols)*[IntegerType()])))
# dict(zip(float_cols, len(float_cols)*[DoubleType()])) ==> {Median Age = DoubleType()}
demographics_data = cast_column_type(demographics_data, dict(zip(float_cols, len(float_cols)*[DoubleType()])))
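The two `dict(zip(...))` expressions above build a column-to-type mapping. A minimal sketch of an equivalent construction with `dict.fromkeys` follows. It uses the type-name strings `"int"` and `"double"` (which Spark's `Column.cast` also accepts) so that it runs without a Spark session, and the merged `cast_map` name is only illustrative.

```python
int_cols = ['Count', 'Male Population', 'Female Population',
            'Total Population', 'Number of Veterans', 'Foreign-born']
float_cols = ['Median Age', 'Average Household Size']

# dict.fromkeys gives every key the same value,
# just like dict(zip(cols, len(cols) * [value]))
cast_map = {**dict.fromkeys(int_cols, "int"),
            **dict.fromkeys(float_cols, "double")}

# Both constructions produce the same mapping
same = dict.fromkeys(int_cols, "int") == dict(zip(int_cols, len(int_cols) * ["int"]))
print(same, cast_map["Median Age"])
```

A single merged mapping would let `cast_column_type` be called once instead of twice.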

In [7]:
demographics_data.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [8]:
first_agg = {"Median Age": "first", "Male Population": "first", "Female Population": "first", 
            "Total Population": "first", "Number of Veterans": "first", "Foreign-born": "first", "Average Household Size": "first"}
# First aggregation - City, then State followed by State Code
agg_df = demographics_data.groupby(["City", "State", "State Code"]).agg(first_agg)
agg_df.head()

Row(City='Rockville', State='Maryland', State Code='MD', first(Total Population)=66998, first(Female Population)=35793, first(Median Age)=38.1, first(Number of Veterans)=1990, first(Foreign-born)=25047, first(Male Population)=31205, first(Average Household Size)=2.6)
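The raw file appears to hold several rows per city (one per `Race` value, with the city-level figures repeated), which is why taking the `first` value per group is enough to collapse 2891 rows into one row per city. The sketch below mimics that behaviour in plain Python. The `Race` and `Count` values are made up for illustration; only the population totals come from the outputs in this notebook.

```python
rows = [
    # (City, State Code, Total Population, Race, Count)
    # Race and Count values are hypothetical.
    ("Rockville", "MD", 66998, "White", 100),
    ("Rockville", "MD", 66998, "Asian", 200),
    ("Delray Beach", "FL", 66261, "White", 300),
]

first_per_city = {}
for city, state_code, total_population, _race, _count in rows:
    # setdefault keeps the first value seen for each (city, state) key
    first_per_city.setdefault((city, state_code), total_population)

print(first_per_city)
```

Note that `first` is only safe here because the grouped columns are assumed to be identical within each city; if they differed, the result would depend on row order.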

In [9]:
# Rename all columns that contain spaces or carry the first(...) aggregate prefix
demographics_column_renamed = agg_df.withColumnRenamed('State Code', 'StateCode')\
    .withColumnRenamed('first(Total Population)', 'TotalPopulation')\
    .withColumnRenamed('first(Female Population)', 'FemalePopulation')\
    .withColumnRenamed('first(Male Population)', 'MalePopulation')\
    .withColumnRenamed('first(Median Age)', 'MedianAge')\
    .withColumnRenamed('first(Number of Veterans)', 'NumberVeterans')\
    .withColumnRenamed('first(Foreign-born)', 'ForeignBorn')\
    .withColumnRenamed('first(Average Household Size)', 'AverageHouseholdSize')
# Print out the schema.
demographics_column_renamed.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- TotalPopulation: integer (nullable = true)
 |-- FemalePopulation: integer (nullable = true)
 |-- MedianAge: double (nullable = true)
 |-- NumberVeterans: integer (nullable = true)
 |-- ForeignBorn: integer (nullable = true)
 |-- MalePopulation: integer (nullable = true)
 |-- AverageHouseholdSize: double (nullable = true)



In [10]:
# Numeric columns whose null values will be replaced with 0.
numeric_cols = ['TotalPopulation', 'FemalePopulation', 'MedianAge', 'NumberVeterans', 'ForeignBorn', 'MalePopulation',  'AverageHouseholdSize']

# Fill the null values with 0
demographics_column_renamed = demographics_column_renamed.fillna(0, numeric_cols)
demographics_column_renamed.show(3)

+------------+----------+---------+---------------+----------------+---------+--------------+-----------+--------------+--------------------+
|        City|     State|StateCode|TotalPopulation|FemalePopulation|MedianAge|NumberVeterans|ForeignBorn|MalePopulation|AverageHouseholdSize|
+------------+----------+---------+---------------+----------------+---------+--------------+-----------+--------------+--------------------+
|   Rockville|  Maryland|       MD|          66998|           35793|     38.1|          1990|      25047|         31205|                 2.6|
|Delray Beach|   Florida|       FL|          66261|           34042|     47.9|          4232|      16639|         32219|                2.35|
| Jersey City|New Jersey|       NJ|         264277|          132512|     34.3|          4374|     109186|        131765|                2.57|
+------------+----------+---------+---------------+----------------+---------+--------------+-----------+--------------+--------------------+
only showing top 3 rows

In [11]:
# Now write (and overwrite) transformed 'demographics' dataset onto parquet file
demographics_column_renamed.write.mode('overwrite').parquet("us_cities_demographics")

#### Data Exploration & Modeling for I94 Immigration Data 

#### Steps Involved:

* Read the data from the sas_data folder (I94 immigration parquet data in chunks)
* Cast the necessary columns to integer type and convert the SAS dates to date strings, as needed downstream for date calculations
* Remove unwanted columns
* Finally, write to a parquet folder named immigration

In [12]:
# Read i94 immigration dataset
immigration=spark.read.parquet("sas_data")
immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

In [13]:
int_cols = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 
        'arrdate', 'i94mode', 'i94bir', 'i94visa', 'count', 'biryear', 'dtadfile', 'depdate']
date_cols = ['arrdate', 'depdate']
unused_cols = ["entdepa", "entdepd", "matflag", "dtaddto", "admnum"]

In [14]:
# Convert columns read as string/double to integer, as we did above
immigration = cast_column_type(immigration, dict(zip(int_cols, len(int_cols)*[IntegerType()])))

In [15]:
# Preferred date format string for our work here: YYYY-MM-DD
# SAS stores dates as the number of days since 1960-01-01 (the SAS epoch), not since the Unix epoch of 1970-01-01
sas_date_format = "%Y-%m-%d"
sas_epoch = datetime(1960, 1, 1)
convert_date_udf = udf(lambda x: x if x is None else (sas_epoch + timedelta(days=x)).strftime(sas_date_format))

In [16]:
def convert_date(df, cols):
    """
    Convert dates in the SAS datatype to a date in a string format YYYY-MM-DD
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        cols (:obj:`list`): List of columns in the SAS date format to be converted
    """
    for c in cols :
        if c in df.columns :
            df = df.withColumn(c, convert_date_udf(df[c]))
    return df

In [17]:
# Convert SAS date to a meaningful string date in the format of YYYY-MM-DD
immigration = convert_date(immigration, date_cols)
immigration = immigration.drop(*unused_cols)
immigration.head()

Row(cicid=5748517, i94yr=2016, i94mon=4, i94cit=245, i94res=438, i94port='LOS', arrdate='2016-04-30', i94mode=1, i94addr='CA', depdate='2016-05-08', i94bir=40, i94visa=1, count=1, dtadfile=20160430, visapost='SYD', occup=None, entdepu=None, biryear=1976, gender='F', insnum=None, airline='QF', fltno='00011', visatype='B1')
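SAS numeric dates count days from 1 January 1960. As a quick sanity check outside Spark, the sketch below converts the `arrdate` value 20574 of the first record; the result lines up with that record's `dtadfile` value of 20160430. The helper name `sas_to_iso` is hypothetical and not part of the notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS calendar

def sas_to_iso(days):
    """Return the ISO date string for a SAS day count, or None for missing input."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()

print(sas_to_iso(20574))  # 2016-04-30
```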

In [18]:
immigration.write.mode("overwrite").parquet('immigration')

#### Data Exploration & Modeling for I94 Immigration Data (I94_SAS_Labels_Descriptions.SAS file)

#### Steps Involved:

* Data Read from I94_SAS_Labels_Descriptions file
* Process the data and prepare the data (below cell)
* Parse based on the country, port, mode, address and visa-type

In [19]:
def sas_file_value_parser(sas_source_file, value, column):
    """Parses SAS Program file to return value as pandas dataframe
    Args:
        sas_source_file (str): SAS source code file.
        value (str): sas value to extract.
        column (list): the two column names for the resulting dataframe.
    Return:
        pandas.DataFrame: one row per code/label pair found in the value block.
    """
    file_string = ''
    with open(sas_source_file) as f:
        #read every line from the file and store in file_string
        file_string = f.read()
        
    # value can be: i94cntyl, i94prtl, etc.  
    # Read from the sas - "value" pattern till ";" (Considering a block of data)
    file_string = file_string[ file_string.index(value) : ]
    file_string = file_string[ : file_string.index(';') ]

    # [1:] skips the first line, which holds the rest of the "value ..." declaration itself.
    lines = file_string.split('\n')[1:]
    codes = []
    values = []
    
    for line in lines :
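        # Example line: 582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'
        if '=' in line :
            # Split on the first '=' only, so a label containing '=' stays intact
            code, val = line.split('=', 1)
            code = code.strip()
            val = val.strip()
            # Strip the surrounding single quotes, if present
            if code.startswith("'"):
                code = code[1:-1]
            if val.startswith("'"):
                val = val[1:-1]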
            codes.append(code)
            values.append(val)
            
    return pd.DataFrame(list(zip(codes, values)), columns=column)
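To see the block-slicing idea in isolation, here is a minimal sketch that applies the same steps to an inline string instead of a file and returns plain tuples instead of a pandas dataframe. The function name `parse_value_block` and the sample text are assumptions for illustration; the sample is modelled on the `i94model` block, not copied from the SAS file.

```python
def parse_value_block(text, value):
    """Return (code, label) pairs from the block that starts at `value` and ends at ';'."""
    block = text[text.index(value):]
    block = block[:block.index(';')]
    pairs = []
    # Skip the first line: it only holds the block name
    for line in block.split('\n')[1:]:
        if '=' in line:
            code, label = line.split('=', 1)
            pairs.append((code.strip().strip("'"), label.strip().strip("'")))
    return pairs

# Hypothetical sample text in the same shape as a SAS "value" block
sample = """value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""

pairs = parse_value_block(sample, 'i94model')
print(pairs)
```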


In [20]:
# Parse based on city/country codes.
i94cit_res_df = sas_file_value_parser('I94_SAS_Labels_Descriptions.SAS', 'i94cntyl', ['code', 'country'])
i94cit_res_df.head()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [21]:
# Parse based on port codes.
i94port_df = sas_file_value_parser('I94_SAS_Labels_Descriptions.SAS', 'i94prtl', ['code', 'port'])
i94port_df.head()

Unnamed: 0,code,port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [22]:
# Parse based on modes.
i94mode_df = sas_file_value_parser('I94_SAS_Labels_Descriptions.SAS', 'i94model', ['code', 'mode'])
i94mode_df.head()

Unnamed: 0,code,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [23]:
# Parse based on addresses.
i94addr_df = sas_file_value_parser('I94_SAS_Labels_Descriptions.SAS', 'i94addrl', ['code', 'address'])
i94addr_df.head()

Unnamed: 0,code,address
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [24]:
# Parse based on visas.
i94visa_df = sas_file_value_parser('I94_SAS_Labels_Descriptions.SAS', 'I94VISA', ['code', 'visa-type'])
i94visa_df.head()

Unnamed: 0,code,visa-type
0,1,Business
1,2,Pleasure
2,3,Student


#### Data Exploration & Modeling for I94 Immigration Data 

#### Steps Involved:

* Read the data from the sas_data folder (I94 immigration parquet data in chunks)
* Select only the columns needed for the calculation
* Remove duplicates
* Extract month, year, day of month, day of week and week of year, and store them in a temporary view
* Query the view and create a new column "Quarter", populated from the arrival month
* Finally, write to a parquet file partitioned by arrival_year and arrival_month

In [25]:
# Read the i94 immigration dataset to create a DataFrame
i94_spark = spark.read.parquet("sas_data")
i94_spark.head()

Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')

In [26]:
i94_spark=i94_spark.select(col("i94res").cast(IntegerType()),col("i94port"),
                           col("arrdate").cast(IntegerType()), \
                           col("i94mode").cast(IntegerType()),col("depdate").cast(IntegerType()),
                           col("i94bir").cast(IntegerType()),col("i94visa").cast(IntegerType()), 
                           col("count").cast(IntegerType()), \
                              "gender",col("admnum").cast(LongType()))
i94_spark.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|   438|    LOS|  20574|      1|  20582|    40|      1|    1|     F|94953870030|
|   438|    LOS|  20574|      1|  20591|    32|      1|    1|     F|94955622830|
|   438|    LOS|  20574|      1|  20582|    29|      1|    1|     M|94956406530|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



In [27]:
# We will drop duplicate rows and save it as final dataset for i94
i94_spark_unique=i94_spark.dropDuplicates()
i94_spark_unique.show(1)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|   582|    XXX|  20557|   null|  20558|    34|      2|    1|  null|91904214530|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 1 row



In [28]:
import datetime as dt
# Convert the SAS arrival date (days since the SAS epoch, 1960-01-01) to an ISO date string
convert_to_date = udf(lambda x: (dt.date(1960, 1, 1) + dt.timedelta(days=x)).isoformat() if x is not None else None)
i94_with_arrival_date = i94_spark_unique.withColumn("arrival_date", convert_to_date(i94_spark_unique.arrdate))
i94_with_arrival_date.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|arrival_date|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+
|   582|    XXX|  20557|   null|  20558|    34|      2|    1|  null|91904214530|  2016-04-13|
|   209|    AGA|  20552|      1|   null|  null|      2|    1|     M|47842155333|  2016-04-08|
|   209|    ATL|  20571|      1|   null|  null|      2|    1|     M|44537883633|  2016-04-27|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+
only showing top 3 rows



In [29]:
i94_with_arrival_date.printSchema()

root
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admnum: long (nullable = true)
 |-- arrival_date: string (nullable = true)



In [30]:
from pyspark.sql import functions as F
i94date= i94_with_arrival_date.withColumn('arrival_date_in_Date',F.to_date(i94_with_arrival_date.arrival_date))

In [31]:
i94date.printSchema()

root
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admnum: long (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- arrival_date_in_Date: date (nullable = true)



In [32]:
# Extract all month, year, day, week related information from the arrival_date.
i94date = i94date.withColumn('arrival_month',month(i94date.arrival_date_in_Date))
i94date = i94date.withColumn('arrival_year',year(i94date.arrival_date_in_Date))
i94date = i94date.withColumn('arrival_day',dayofmonth(i94date.arrival_date_in_Date))
i94date = i94date.withColumn('day_of_week',dayofweek(i94date.arrival_date_in_Date))
i94date = i94date.withColumn('arrival_weekofyear',weekofyear(i94date.arrival_date_in_Date))
i94date.printSchema()

root
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admnum: long (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- arrival_date_in_Date: date (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- arrival_weekofyear: integer (nullable = true)
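The numbering conventions behind these columns are easy to mix up: Spark's `dayofweek` runs from 1 (Sunday) to 7 (Saturday), and `weekofyear` follows ISO 8601 weeks, which start on Monday. The sketch below reproduces both conventions with the standard library for one example date; the helper name `spark_style_dayofweek` is hypothetical.

```python
from datetime import date

def spark_style_dayofweek(d):
    """Map Python's isoweekday (Mon=1 .. Sun=7) to Spark's numbering (Sun=1 .. Sat=7)."""
    return d.isoweekday() % 7 + 1

d = date(2016, 4, 23)            # a Saturday
iso_week = d.isocalendar()[1]    # ISO 8601 week number
day_of_week = spark_style_dayofweek(d)
print(d.month, d.year, d.day, day_of_week, iso_week)
```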



In [33]:
# Drop duplicates.
i94date_filtered=i94date.select(col('arrdate').alias('arrival_sasdate'),col('arrival_date_in_Date').alias('arrival_iso_date'),'arrival_month','day_of_week','arrival_year','arrival_day','arrival_weekofyear').dropDuplicates()
i94date_filtered.printSchema()

root
 |-- arrival_sasdate: integer (nullable = true)
 |-- arrival_iso_date: date (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_day: integer (nullable = true)
 |-- arrival_weekofyear: integer (nullable = true)



In [69]:
# Create temporary sql table
i94date_filtered.createOrReplaceTempView("i94date_table")

In [70]:
# Add quarters to i94 date dimension table
i94date_quarter=spark.sql('''select arrival_sasdate,
                         arrival_iso_date,
                         arrival_month,
                         day_of_week,
                         arrival_year,
                         arrival_day,
                         arrival_weekofyear,
                         CASE WHEN arrival_month IN (1, 2, 3) THEN '1st Quarter' 
                                WHEN arrival_month IN (4, 5, 6) THEN '2nd Quarter' 
                                WHEN arrival_month IN (7, 8, 9) THEN '3rd Quarter' 
                                ELSE '4th Quarter' 
                         END AS Quarter from i94date_table''')
i94date_quarter.show(3)

+---------------+----------------+-------------+-----------+------------+-----------+------------------+-----------+
|arrival_sasdate|arrival_iso_date|arrival_month|day_of_week|arrival_year|arrival_day|arrival_weekofyear|    Quarter|
+---------------+----------------+-------------+-----------+------------+-----------+------------------+-----------+
|          20567|      2016-04-23|            4|          7|        2016|         23|                16|2nd Quarter|
|          20551|      2016-04-07|            4|          5|        2016|          7|                14|2nd Quarter|
|          20563|      2016-04-19|            4|          3|        2016|         19|                16|2nd Quarter|
+---------------+----------------+-------------+-----------+------------+-----------+------------------+-----------+
only showing top 3 rows
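The `CASE` expression maps each block of three months to a quarter label. The same mapping can be written arithmetically, as in this small sketch (the function name `quarter_label` is hypothetical). Spark also ships a built-in `quarter` function in `pyspark.sql.functions` that returns the quarter number directly.

```python
def quarter_label(month):
    """Return the quarter label for a month number in 1..12."""
    if not 1 <= month <= 12:
        raise ValueError(f"month out of range: {month}")
    suffixes = {1: "1st", 2: "2nd", 3: "3rd", 4: "4th"}
    quarter = (month - 1) // 3 + 1   # 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4
    return f"{suffixes[quarter]} Quarter"

labels = [quarter_label(m) for m in (1, 4, 9, 12)]
print(labels)
```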



In [36]:
# Save i94date dimension to parquet file partitioned by year and month:
i94date_quarter.write.mode("overwrite").partitionBy("arrival_year", "arrival_month").parquet('i94date_quarter')
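With `partitionBy`, Spark writes one sub-directory per distinct value of each partition column, named in the Hive style `column=value`. The sketch below only builds such a path as a string to show the resulting layout; the helper `partition_dir` and the example values are illustrative assumptions.

```python
def partition_dir(base, **partitions):
    """Build a Hive-style partition path such as base/col1=a/col2=b."""
    parts = [f"{name}={value}" for name, value in partitions.items()]
    return "/".join([base] + parts)

path = partition_dir("i94date_quarter", arrival_year=2016, arrival_month=4)
print(path)  # i94date_quarter/arrival_year=2016/arrival_month=4
```

Queries that filter on `arrival_year` or `arrival_month` can then skip the directories that do not match.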

#### Data Exploration & Modeling for Global Temperature by City (GlobalLandTemperaturesByCity.csv file)

#### Steps Involved:

* Read the data from the GlobalLandTemperaturesByCity file
* Keep only the data related to the United States
* Finally, store the result in a CSV file, GlobalLandTempOfUS.csv

In [37]:
# Read another dataset, GlobalLandTemperaturesByCity.csv
global_temp_csv = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(global_temp_csv)
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [38]:
# Show temperature details for country = United states.
df_temp_us = df_temp[df_temp["Country"] == "United States"]
df_temp_us = df_temp_us.rename(columns={'dt': 'Date'})
print(df_temp_us.info())

<class 'pandas.core.frame.DataFrame'>
Int64Index: 687289 entries, 47555 to 8439246
Data columns (total 7 columns):
Date                             687289 non-null object
AverageTemperature               661524 non-null float64
AverageTemperatureUncertainty    661524 non-null float64
City                             687289 non-null object
Country                          687289 non-null object
Latitude                         687289 non-null object
Longitude                        687289 non-null object
dtypes: float64(2), object(5)
memory usage: 41.9+ MB
None


In [39]:
df_temp_us.head(5)

Unnamed: 0,Date,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [40]:
#Write the filtered data to a csv file for future use
df_temp_us.to_csv('GlobalLandTempOfUS.csv')
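The filter-and-rename step can be sketched without pandas as well. The example below uses the standard `csv` module on a few inline rows taken from the outputs above, reduced to four columns for brevity; it is an illustration of the logic rather than a replacement for the pandas code.

```python
import csv
import io

# A few rows from the outputs above, with fewer columns
sample_csv = """dt,AverageTemperature,City,Country
1743-11-01,6.068,Århus,Denmark
1820-01-01,2.101,Abilene,United States
1820-02-01,6.926,Abilene,United States
"""

reader = csv.DictReader(io.StringIO(sample_csv))
us_rows = [
    # Rename 'dt' to 'Date' while keeping only United States rows
    {"Date": row["dt"],
     "AverageTemperature": float(row["AverageTemperature"]),
     "City": row["City"]}
    for row in reader
    if row["Country"] == "United States"
]
print(len(us_rows), us_rows[0]["Date"])
```

In the pandas version, note that `to_csv` also writes the dataframe index as an extra first column unless `index=False` is passed.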

#### Data Exploration & Modeling for Airport Codes

#### Steps Involved:

* Read the data from the airport codes CSV file
* Split the coordinates column into two separate columns, latitude and longitude
* Finally, store the necessary columns as CSV files in the filtered_location_data folder

In [41]:
airport_code_data_spark =spark.read.format('csv').options(header='true').load('airport-codes_csv.csv')
airport_code_data_spark.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [42]:
# Split the coordinates column on the "," delimiter.
# The column is ordered "longitude, latitude": for many airports the first value lies outside [-90, 90], so it cannot be a latitude.
from pyspark.sql import functions as F
split_coordinates_column = F.split(airport_code_data_spark['coordinates'], ',')
airport_code_data_spark = airport_code_data_spark.withColumn('latitude', F.trim(split_coordinates_column[1]))
airport_code_data_spark = airport_code_data_spark.withColumn('longitude', F.trim(split_coordinates_column[0]))
airport_code_data_spark.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
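When splitting a coordinate pair it helps to validate the result, because latitude must lie within [-90, 90]. The sketch below assumes the `coordinates` string holds longitude first and latitude second (an assumption inferred from the values for US airports in this file) and uses a hypothetical helper, `split_coordinates`.

```python
def split_coordinates(coordinates):
    """Split a 'longitude, latitude' string into (latitude, longitude) floats."""
    lon_text, lat_text = coordinates.split(',')
    latitude, longitude = float(lat_text), float(lon_text)
    # A latitude outside [-90, 90] means the order assumption is wrong
    if not -90.0 <= latitude <= 90.0:
        raise ValueError(f"latitude out of range: {latitude}")
    return latitude, longitude

# Coordinates of the first airport row in this dataset (a heliport in US-PA)
latitude, longitude = split_coordinates("-74.93360137939453, 40.07080078125")
print(latitude, longitude)
```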



In [43]:
airport_code_data_spark.createOrReplaceTempView("airport_code_data_table")

In [44]:
# Selective column querying related to location.
filtered_location_data = spark.sql('''select type,
                         name,
                         iso_country,
                         iso_region,
                         latitude,
                         longitude,
                         gps_code from airport_code_data_table''')
filtered_location_data.show(3)

+-------------+--------------------+-----------+----------+--------------+------------------+--------+
|         type|                name|iso_country|iso_region|      latitude|         longitude|gps_code|
+-------------+--------------------+-----------+----------+--------------+------------------+--------+
|     heliport|   Total Rf Heliport|         US|     US-PA|40.07080078125|-74.93360137939453|     00A|
|small_airport|Aero B Ranch Airport|         US|     US-KS|     38.704022|       -101.473911|    00AA|
|small_airport|        Lowell Field|         US|     US-AK|   59.94919968|    -151.695999146|    00AK|
+-------------+--------------------+-----------+----------+--------------+------------------+--------+
only showing top 3 rows



In [45]:
# Partition by type and country.
filtered_location_data.write.mode("overwrite").partitionBy("type", "iso_country").csv('filtered_location_data')

#### Data Quality Checks for all pre-processed data.

#### Steps Involved:

* Create a spark temp view based on the individual data frame object
* Query the view to check that the expected row count is returned.
* Query the view to check whether any null value is present in a calculated column.
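The checks listed above can be thought of as one reusable routine: verify the row count, then look for null or empty values in selected columns. The following is a minimal sketch of that idea on plain Python dictionaries, so it runs without Spark; the function `run_quality_checks` and the sample rows are hypothetical.

```python
def run_quality_checks(rows, required_columns, expected_count=None):
    """Return a list of failure messages; an empty list means every check passed."""
    failures = []
    if not rows:
        failures.append("no rows found")
    if expected_count is not None and len(rows) != expected_count:
        failures.append(f"expected {expected_count} rows, found {len(rows)}")
    for column in required_columns:
        # Count rows where the column is missing, null or an empty string
        missing = sum(1 for row in rows if row.get(column) in (None, ""))
        if missing:
            failures.append(f"{missing} null/empty value(s) in {column}")
    return failures

# Hypothetical sample: the second row has a null MalePopulation
rows = [
    {"City": "Rockville", "MalePopulation": 31205},
    {"City": "Delray Beach", "MalePopulation": None},
]
failures = run_quality_checks(rows, ["MalePopulation"], expected_count=2)
print(failures)
```

Returning the failures as a list, instead of stopping at the first one, makes it possible to report every problem in a single run.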

In [46]:
demographics_column_renamed.createOrReplaceTempView("demographics_column_renamed_view")
demographics_column_renamed_check1 = spark.sql("""
    SELECT  COUNT(*)
    FROM demographics_column_renamed_view
""")
demographics_column_renamed_check1.show(1)

+--------+
|count(1)|
+--------+
|     596|
+--------+



In [47]:
# Both parquet file and the in-memory view shall return equal counts = 596
spark.read.parquet("us_cities_demographics").count()

596

In [50]:
# Check if any null or empty string value is present in MalePopulation column
demographics_column_renamed_check2 = spark.sql("""
    SELECT  COUNT(*)
    FROM demographics_column_renamed_view
    WHERE   MalePopulation IS NULL OR MalePopulation == ""
""")
demographics_column_renamed_check2.show(1)

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [51]:
# Check count of immigration view
immigration.createOrReplaceTempView("immigration_view")
immigration_check1 = spark.sql("""
    SELECT  COUNT(*)
    FROM immigration_view
""")
immigration_check1.show(1)

+--------+
|count(1)|
+--------+
| 3096313|
+--------+



In [53]:
# Both parquet file and the in-memory view shall return equal counts = 3096313
spark.read.parquet("immigration").count()

3096313

In [57]:
# Check if any null or empty string value is present in arrdate column
immigration_date_column_check2 = spark.sql("""
    SELECT  COUNT(*)
    FROM immigration_view
    WHERE arrdate IS NULL OR arrdate == "" 
""")
immigration_date_column_check2.show(1)

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [73]:
# Check if there is any null value in the calculated column 'arrival_month'
i94date_quarter_check1=spark.sql('''
    SELECT  COUNT(*) from i94date_table
    WHERE arrival_month IS NULL or arrival_month == ""
''')
i94date_quarter_check1.show(1)

+--------+
|count(1)|
+--------+
|       0|
+--------+

