# Project Title
### Data Engineering Capstone Project

#### Project Summary

This project aims to provide immigration analyst a global view of all the data related to immigration. This is the combination of the data of immigration to the United States, airport codes, U.S. City demographics and global city temperature.

All this data will be used by the analysts to better understand the immigration, and be used mainly for analytics. 
For example, analysts will be able to determine which origin country has more immigrants in the US, how long do they stay, how temperature impacts their choice of cities, etc.


The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import pandas as pd
import os
import configparser
import datetime as dt
import time
from pyspark.sql.types import DoubleType, StringType, IntegerType, FloatType,StructType, StructField
from datetime import datetime, timedelta
from pyspark.sql import types as T
from pyspark.sql.functions import isnan, when, mean, round, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear, avg, monotonically_increasing_id
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, date_format
from pyspark.sql import SparkSession, SQLContext, GroupedData, HiveContext
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add as d_add
from pyspark.sql import functions 
from pyspark.sql.functions import lit
from pyspark.sql import Row


#### Defining helper functions as asked by the reviewer

In [4]:
def convert_datetime(x):
        try:
            start = datetime(1960, 1, 1)
            return start + timedelta(days=int(x))
        except:
            return None

In [5]:
def create_temperature(df, spark):
    """
        Create temperature table
        
        :param df: dataframe of input data.
        :param spark: spark session.
        :return: temperature dataframe
    """    

    temperature_schema = StructType([StructField("dt", StringType(), True)\
                              ,StructField("average_temperature", FloatType(), True)\
                              ,StructField("average_temperature_uncertainty", FloatType(), True)\
                              ,StructField("city", StringType(), True)\
                              ,StructField("country", StringType(), True)\
                              ,StructField("latitude", StringType(), True)\
                              ,StructField("longitude", StringType(), True)])

    df.rename(columns={'AverageTemperature':'average_temperature'}, inplace=True)
    df.rename(columns={'AverageTemperatureUncertainty':'average_temperature_uncertainty'}, inplace=True)
    df.rename(columns={'City':'city'}, inplace=True)
    df.rename(columns={'Country':'country'}, inplace=True)
    df.rename(columns={'Latitude':'latitude'}, inplace=True)
    df.rename(columns={'Longitude':'longitude'}, inplace=True)

    temperature_df = spark.createDataFrame(df, schema=temperature_schema)
    
    return temperature_df

In [6]:
def create_demographics(df):
    """
        create demographics table
        
        :param df: dataframe of input data.
        :return: dataframe of demographics
    """ 
    demographics_schema = StructType([StructField("City", StringType(), True)\
                        ,StructField("State", StringType(), True)\
                        ,StructField("Median Age", FloatType(), True)\
                        ,StructField("Male Population", FloatType(), True)\
                        ,StructField("Female Population", FloatType(), True)\
                        ,StructField("Total Population", IntegerType(), True)\
                        ,StructField("Number of Veterans", FloatType(), True)\
                        ,StructField("Foreign-born", FloatType(), True)\
                        ,StructField("Average Household Size", FloatType(), True)\
                        ,StructField("State Code", StringType(), True)\
                        ,StructField("Race", StringType(), True)\
                        ,StructField("Count", IntegerType(), True)])

    demo_df = spark.createDataFrame(df, schema=demographics_schema)
    
    return demo_df

In [7]:
def create_airport(df):
    """
            Create airport table
            
        :param df: dataframe of input data.
        :return: dataframe airport
    """
    airport_codes_schema =  StructType([StructField("ident", StringType(), True)\
                        ,StructField("type", StringType(), True)\
                        ,StructField("name", StringType(), True)\
                        ,StructField("elevation_ft", FloatType(), True)\
                        ,StructField("continent", StringType(), True)\
                        ,StructField("iso_country", StringType(), True)\
                        ,StructField("iso_region", StringType(), True)\
                        ,StructField("municipality", StringType(), True)\
                        ,StructField("gps_code", StringType(), True)\
                        ,StructField("iata_code", StringType(), True)\
                        ,StructField("local_code", StringType(), True)\
                        ,StructField("coordinates", StringType(), True)])
    airport_codes = spark.createDataFrame(df, schema=airport_codes_schema)
    return airport_codes

In [8]:
def create_time(df):
    """
        Create time table
        
        :param input_df: dataframe of input data.
        :return: dataframe time
    """
    udf_datetime = udf(lambda x: convert_datetime(x), T.DateType())


    time = df.select(["arrdate"])\
                .withColumn("arrival_date", udf_datetime("arrdate")) \
                .withColumn('day', functions.dayofmonth('arrival_date')) \
                .withColumn('month', functions.month('arrival_date')) \
                .withColumn('year', functions.year('arrival_date')) \
                .withColumn('week', functions.weekofyear('arrival_date')) \
                .withColumn('weekday', functions.dayofweek('arrival_date'))\
                .select(["arrdate", "arrival_date", "day", "month", "year", "week", "weekday"])
    return time

In [9]:
def create_status(df):
    """
        Create status table
        
        :param input_df: dataframe of input data.
        :return: dataframe status
    """


    status = df.withColumn("status_flag_id", monotonically_increasing_id()) \
                .select(["status_flag_id", "entdepa", "entdepd", "matflag"]) \
                .withColumnRenamed("entdepa", "arrival_flag")\
                .withColumnRenamed("entdepd", "departure_flag")\
                .withColumnRenamed("matflag", "match_flag")\
                .dropDuplicates(["arrival_flag", "departure_flag", "match_flag"])
    return status

In [10]:
def create_visas(df):
    """
        Create visa table
        
        :param df: dataframe of input data.
        :return: dataframe visa
    """
    
    
    visas = df.withColumn("visa_id", monotonically_increasing_id()) \
                .select(["visa_id","i94visa", "visatype", "visapost"]) \
                .dropDuplicates(["i94visa", "visatype", "visapost"])
    
    return visas

In [11]:
def create_state(df):
    """
        Create state table
        
        :param df: dataframe of input data
        :return: dataframe visa
    
    """
    state = df.select(["State Code", "State", "Median Age", "Male Population", "Female Population", "Total Population", "Average Household Size",\
                          "Foreign-born", "Race", "Count"])\
                .withColumnRenamed("State","state")\
                .withColumnRenamed("State Code", "state_code")\
                .withColumnRenamed("Median Age", "median_age")\
                .withColumnRenamed("Male Population", "male_population")\
                .withColumnRenamed("Female Population", "female_population")\
                .withColumnRenamed("Total Population", "total_population")\
                .withColumnRenamed("Average Household Size", "avg_household_size")\
                .withColumnRenamed("Foreign-born", "foreign_born")\
                .withColumnRenamed("Race", "race")\
                .withColumnRenamed("Count", "count")
    state = state.groupBy("state_code","state").agg(\
                round(mean('median_age'),0).alias("median_age"),\
                sum("total_population").alias("total_population"),\
                sum("male_population").alias("male_population"),\
                sum("female_population").alias("female_population"),\
                sum("foreign_born").alias("foreign_born"), \
                sum("avg_household_size").alias("average_household_size")
                ).dropna()
    
    return state

### Step 1: Scope the Project and Gather Data

#### Scope 
This project will be based on Spark and S3. I will use all the datasets provided by udacity.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

I used the data provided by Udacity to finalize this capstone project. 

We will see now how this looks like. 

In [12]:
# Read in the data here

#### Immigration data

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [14]:
#write to parquet

df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

In [15]:
# check the data is well loaded 
df_spark.head(5)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.

In [16]:
# are all the columns here?
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

#### Temperature data

In [17]:
# read temperature file 

file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = pd.read_csv(file_name)

In [18]:
# are all the columns here?

temperature_df.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

In [19]:
temperature_df.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### Airport data

In [20]:
airport_codes_csv = 'airport-codes_csv.csv'
airport_codes_df = pd.read_csv(airport_codes_csv)
airport_codes_df.shape

(55075, 12)

In [21]:
airport_codes_df.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### US Cities demographics data

In [22]:
us_cities_csv = "us-cities-demographics.csv"
demo_df = pd.read_csv(us_cities_csv, delimiter=';')
demo_df.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [23]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()
    country_code = {}
    for countries in contents[10:298]:
        pair = countries.split('=')
        code,country = pair[0].strip(), pair[1].strip().strip("'")
        country_code[code] = country
country_code_df = pd.DataFrame(list(country_code.items()),columns=['code','country'])
country_code_df.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In the immigration table, the column "insnum" and "occup" are almost always empty. 
In some rows, the arrival date and departure dates can be empty, so we should remove them as we can't do anything with this data. Also, there are some duplicates, so we should remove them.

In [24]:
df_spark = df_spark.drop("insnum","occup")
df_spark = df_spark.where(df_spark.arrdate.isNotNull())
df_spark = df_spark.where(df_spark.depdate.isNotNull())
df_spark = df_spark.drop_duplicates()
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable 

For the temperature data, we drop the emply rows, the duplicates and we just analyze twenty years (this is from the Knowledge hub, great idea)

In [25]:
temperature_df = temperature_df.dropna()
temperature_df = temperature_df.drop_duplicates()

begin = "2000-01-01"
end = "2019-01-01"
begin_time = temperature_df["dt"] >= begin
ending_time = temperature_df["dt"] < end


temperature_df = temperature_df.loc[begin_time & ending_time]
temperature_df.shape

temperature_df.head()


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
3074,2000-01-01,3.065,0.372,Århus,Denmark,57.05N,10.33E
3075,2000-02-01,3.724,0.241,Århus,Denmark,57.05N,10.33E
3076,2000-03-01,3.976,0.296,Århus,Denmark,57.05N,10.33E
3077,2000-04-01,8.321,0.221,Århus,Denmark,57.05N,10.33E
3078,2000-05-01,13.567,0.253,Århus,Denmark,57.05N,10.33E


For the airport codes, we drop all the raws where the iata code and the local code are not null, then remove the empty and the duplicates.

In [26]:
airport_codes_df = airport_codes_df[pd.notnull(airport_codes_df['iata_code'])]
airport_codes_df = airport_codes_df[pd.notnull(airport_codes_df['local_code'])]
airport_codes_df = airport_codes_df.dropna()
airport_codes_df = airport_codes_df.drop_duplicates()
airport_codes_df.shape

(678, 12)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model


We classify as a fact the process of admitting someone in the US. We call that the "immigration" table.

Then, the dimension tables are : 
- time : time and its component (day, month, year, weekday, ...)
- status : status of an immigration process
- visa : visa of an immigrant
- temperature 
- airport : airport and its attributes
- country 
- state


This data model is a snowflake model because of the lack of redundancy, making queries more reliable and easier to maintain, even though the queries will require more time to compute and hence be slower.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

To make it work, the data pipelines will look like this. 

- First, load all the data into the staging tables. 
- Then, create the tables matching the data model defined above.
- Then, write data into parquet files.
- Finally, run the checks about data quality.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Let's create the temperature schema, rename the columns and fill it according to the schema.

In [27]:
temperature_df_spark = create_temperature(temperature_df, spark)

In [28]:
temperature_df_spark.toPandas().head()

Unnamed: 0,dt,average_temperature,average_temperature_uncertainty,city,country,latitude,longitude
0,2000-01-01,3.065,0.372,Århus,Denmark,57.05N,10.33E
1,2000-02-01,3.724,0.241,Århus,Denmark,57.05N,10.33E
2,2000-03-01,3.976,0.296,Århus,Denmark,57.05N,10.33E
3,2000-04-01,8.321,0.221,Århus,Denmark,57.05N,10.33E
4,2000-05-01,13.567,0.253,Århus,Denmark,57.05N,10.33E


In [29]:
# checking data is indeed loaded
temperature_df_spark.toPandas().shape

(576080, 7)

The same process for the demographics and airport data.

In [30]:
demo_df_spark = create_demographics(demo_df)

demo_df_spark.toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.799999,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.599998,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [31]:
airport_codes_spark = create_airport(airport_codes_df)

airport_codes_spark.toPandas().head()


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"
1,ADC,small_airport,Andakombe Airport,3600.0,OC,PG,PG-EHG,Andekombe,AYAN,ADC,ADK,"145.744722222, -7.13722222222"
2,AEK,small_airport,Aseki Airport,4106.0,OC,PG,PG-MPL,Aseki,AYAX,AEK,ASE,"146.19386673, -7.35080485552"
3,AIE,small_airport,Aiome Airport,350.0,OC,PG,PG-MPM,Aiome,AYAO,AIE,AIO,"144.7307003, -5.145699978"
4,AIH,small_airport,Aiambak Airport,90.0,OC,PG,PG-WPD,Aiambak,AYAK,AIH,AMB,"141.2675, -7.342777777779999"


In [32]:
airport_codes_spark.toPandas().shape

(678, 12)

In [33]:
airport_codes_spark = airport_codes_spark.select(["ident", "type", "iata_code", "name", "iso_country", "iso_region", "municipality", "gps_code", "coordinates", "elevation_ft"])\
                .dropDuplicates(["ident"])

airport_codes_spark.write.mode("overwrite").parquet("second_part/airport")
    

For the time data, I used the convert_datetime() function that I found in the course and again on the knowledge hub.

In [34]:
time = create_time(df_spark)

time.write.mode("overwrite").parquet("second_part/time")

We will do the same for the status, visas, temperature, country, state, and the country/temperature tables.

In [35]:
status = create_status(df_spark)
    
status.write.mode("overwrite").parquet("second_part/status")

In [36]:
visas = create_visas(df_spark)
    
visas.write.mode("overwrite").parquet("second_part/visas")    

In [37]:
temperature = temperature_df_spark.groupBy("country").agg(
                round(mean('average_temperature'),1).alias("average_temperature"),\
                round(mean("average_temperature_uncertainty"),1).alias("average_temperature_uncertainty")
            ).dropna()\
            .withColumn("temperature_id", monotonically_increasing_id()) \
            .select(["temperature_id", "country", "average_temperature", "average_temperature_uncertainty"])
    
temperature.write.mode("overwrite").parquet("second_part/temperatures")    

In [38]:
country_spark = spark.createDataFrame(country_code_df)
country_spark.write.mode("overwrite").parquet("second_part/country")

In [39]:
state = create_state(demo_df_spark)
state.write.mode("overwrite").parquet("second_part/state")

In [40]:
country_temperature = country_spark.select(["*"])\
            .join(temperature, (country_spark.country == upper(temperature.country)), how='full')\
            .select([country_spark.code, country_spark.country, temperature.temperature_id, temperature.average_temperature, temperature.average_temperature_uncertainty])

country_temperature.write.mode("overwrite").parquet("second_part/country_temperature")

Now, we will create the immigration table, the fact table.

In [41]:
airport = spark.read.parquet("second_part/airport")
country = spark.read.parquet("second_part/country")
temperature = spark.read.parquet("second_part/temperatures")
country_temperature = spark.read.parquet("second_part/country_temperature")
state = spark.read.parquet("second_part/state")
status = spark.read.parquet("second_part/status")
time = spark.read.parquet("second_part/time")
visa = spark.read.parquet("second_part/visas")

In [42]:
immigration = df_spark.select(["*"])\
                .join(airport, (df_spark.i94port == airport.ident), how='full')\
                .join(country_temperature, (df_spark.i94res == country_temperature.code), how='full')\
                .join(status, (df_spark.entdepa == status.arrival_flag) & (df_spark.entdepd == status.departure_flag)\
                    & (df_spark.matflag == status.match_flag), how='full')\
                .join(visa, (df_spark.i94visa == visa.i94visa) & (df_spark.visatype == visa.visatype)\
                    & (df_spark.visapost == visa.visapost), how='full')\
                .join(state, (df_spark.i94addr == state.state_code), how='full')\
                .join(time, (df_spark.arrdate == time.arrdate), how='full')\
                .where(col('cicid').isNotNull())\
                .select(["cicid", "i94res", "depdate", "i94mode", "i94port", "i94cit", "i94addr", "airline", "fltno", "ident", "code",\
                         "temperature_id", "status_flag_id", "visa_id", "state_code", country_temperature.country,time.arrdate.alias("arrdate")])

In [43]:
immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- code: string (nullable = true)
 |-- temperature_id: long (nullable = true)
 |-- status_flag_id: long (nullable = true)
 |-- visa_id: long (nullable = true)
 |-- state_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- arrdate: double (nullable = true)



In [44]:
immigration.head(3)

[Row(cicid=474.0, i94res=103.0, depdate=20547.0, i94mode=2.0, i94port='NEW', i94cit=103.0, i94addr=None, airline='VES', fltno='91285', ident=None, code='103', temperature_id=1546188226561, status_flag_id=0, visa_id=None, state_code=None, country='AUSTRIA', arrdate=20545.0),
 Row(cicid=474.0, i94res=103.0, depdate=20547.0, i94mode=2.0, i94port='NEW', i94cit=103.0, i94addr=None, airline='VES', fltno='91285', ident=None, code='103', temperature_id=1546188226561, status_flag_id=0, visa_id=None, state_code=None, country='AUSTRIA', arrdate=20545.0),
 Row(cicid=474.0, i94res=103.0, depdate=20547.0, i94mode=2.0, i94port='NEW', i94cit=103.0, i94addr=None, airline='VES', fltno='91285', ident=None, code='103', temperature_id=1546188226561, status_flag_id=0, visa_id=None, state_code=None, country='AUSTRIA', arrdate=20545.0)]

### Query example

(As asked by the reviewer)

What state (represented by its state code) is the most chosen in the immigration process ? 
It would be also interesting to see that by yearly cohort, for example.

In [46]:
temperature.printSchema()

root
 |-- temperature_id: long (nullable = true)
 |-- country: string (nullable = true)
 |-- average_temperature: double (nullable = true)
 |-- average_temperature_uncertainty: double (nullable = true)



In [56]:
immigration_states = immigration.na.drop(subset=["state_code", "airline","code"]) \
   .join(temperature, (temperature.temperature_id == immigration.temperature_id),how="inner") \
   .groupBy("state_code").avg("average_temperature")


In [57]:
immigration_states.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- avg(average_temperature): double (nullable = true)



In [59]:
immigration_states.show(5)

Py4JJavaError: An error occurred while calling o552.showString.
: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:387)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:259)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:189)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.consume(HashAggregateExec.scala:40)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:526)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:662)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:166)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:40)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:654)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:166)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:40)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:544)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:598)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
	... 205 more


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
print("airport: test not passed") if airport.count() == 0  else print("airport: test passed")
print("time: test not passed") if time.count() == 0  else print("time: test passed")
print("status: test not passed") if status.count() == 0  else print("status: test passed")
print("visa: test not passed") if visa.count() == 0  else print("visa: test passed")
print("temperature: test not passed") if temperature.count() == 0  else print("temperature: test passed")
print("country: test not passed") if country.count() == 0  else print("country: test passed")
print("state: test not passed") if state.count() == 0  else print("state: test passed")

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

# airport table
 - ident: (string) Primary key
 - type: (string) Type of airport
 - iata_code: (string)  International airport code
 - name: (string) Name of the airport
 - iso_country: (string)  ISO Code of the countru
 - iso_region: (string) ISO code of the region
 - municipality: (string) City
 - gps_code: (string) GPS location
 - coordinates: (string) Airport coordinates
 - elevation_ft: (float) Elevation

# country table
 - code: (string) Primary key
 - country: (string) Country name

# temperature table
 - temperature_id: (long) Primary key
 - country: (string) Foreign key - Country code
 - average_temperature: (double) Average temperature of that country
 - average_temperature_uncertainty: (double) Average temperature with uncertainty level

# state table
 - state_code: (string) Primary key 
 - state: (string) State name
 - median_age: (double) Median age 
 - total_population: (long) Total population
 - male_population: (double) Total male population
 - female_population: (double) Total female population
 - foreign_born: (double) Number of foreign born residents
 - average_household_size: (double) Average size of a household

# status table
 - status_flag_id: (long) Primary Key
 - arrival_flag: (string) Flag of arrival
 - departure_flag: (string) Flag of departure 
 - match_flag: (string) Matching flag for arrival and departure

# time table
 - arrdate: (double) Primary key
 - arrival_date: (date) Date of arrival
 - day: (integer) Day of arrival
 - month: (integer) Month of arrival
 - year: (integer) Year of arrival
 - week: (integer) Week of arrival
 - weekday: (integer) Day of the week of arrival

# visa table
 - visa_id: (long) Primary key
 - i94visa: (double) Visa code
 - visatype: (string) Type of visa (class of admission)
 - visapost: (string) Department of state 

# immigration table

 - cicid: (double) Primary Key - Identifier
 - i94res: (double) State code
 - depdate: (double) Departement date from USA
 - i94mode: (double) Arrival mode. (1 for Air, 2 for Sea, 3 for land, 9 is unknown)
 - i94port: (string) Arrival port
 - i94cit: (double) Immigrant's birth country (3 digit code)
 - i94addr: (string) Arrival state in the USA
 - airline: (string) Airline used to trael
 - fltno: (string) Flight number
 - ident: (string) Foreign key - Airport unique identifier
 - code: (string) Foreign key - Country of residence unique identifier
 - temperature_id: (long) Foreign key - Temperature
 - status_flag_id: (long) Foreign key - Status
 - visa_id: (long) Foreign key - Visa
 - state_code: (string) Foreign key - State code
 - country: (string) Country
 - arrdate: (double) Foreign key - Time of arrival

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Clearly state the rationale for the choice of tools and technologies for the project.

- Apache Sparks because of the computing power.
Given the data size, we want to use Apache Sparks for its parallel processing and the cache memory.

#### Propose how often the data should be updated and why.


This data is not critical and is used solely for analysis purposes. 
It will also depend on how often is the data source updated, but I would say once a month.

#### Write a description of how you would approach the problem differently under the following scenarios:


##### The data was increased by 100x.

I would use Amazon EMR and leverage the power of Spark.

##### The data populates a dashboard that must be updated on a daily basis by 7am every day.


I would use Airflow to monitor and execute DAGs that would update this data.

##### The database needed to be accessed by 100+ people.

We need a high-traffic with an optimization on read requests database, such as Redshift or Snowflake.