# AWS Data Lake - Immigration
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create a data lake with data about immigration in EEUU, which facilitates the analysis and predictions to several types of company.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import pyreadstat

import configparser
import os

import pandas as pd
import numpy as np

from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id,row_number

from pyspark.sql.types import *

output_data = "s3a://bucket-test-udacity/"

In [2]:
def convert_to_datetime(date):
    """
    Convert to yyyy-mm-dd format
    
    :return: date in yyyy-mm-dd format
    """   
    if date is not None:
        return pd.Timestamp('1960-1-1')+pd.to_timedelta(date, unit='D')

In [3]:
def create_spark_session():
    spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()
    return spark
spark = create_spark_session()

In [4]:
spark

### Step 1: Scope the Project and Gather Data

#### SCOPE 

The purpose of this project is to create a data lake with data about immigration in the US and the circumstances in which it has occurred.

To carry it out, we are considering three datasets, which are going to be raw data for our data lake.

**The datasets used are:**
- **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.  **[This](https://www.trade.gov/i-94-arrivals-program)** is where the data comes from. The National Travel and Tourism Office (NTTO) manages the ADIS/I-94 visitor arrivals program in cooperation with the Department of Homeland Security (DHS)/U.S. Customs and Border Protection (CBP). The I-94 provides a count of visitor arrivals to the United States (with stays of 1-night or more and visiting under certain visa types) to calculate U.S. travel and tourism volume exports.
- **U.S. City Demographic Data:** This data comes from OpenSoft. You can read more about it **[here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)**.
- **Airport Code Table:** This is a simple table of airport codes and corresponding cities. It comes from **[here](https://datahub.io/core/airport-codes#data)**.
- **World Temperature Data**: 

The main tools which are going to be used are:
- **Python libraries** like pandas or numpy
- **Pyspark** to deal with the immigration dataset
- **Aparhe Airflow** to automate a pipeline to extract this information programatically and to mantain the database updated.
- **Amazon S3**: to store both the raw data and the final data lake in parquet.
- **Amazon EMR**: to process the data with PySpark.

#### DATA DESCRIPTION

- **I94 Immigration Data:** This is a dataset with information from the people which arrive to EEUU as immigrants. 

- **U.S. City Demographic Data:** The information included in this dataset is the following:
    - **City names**
    - **State**: 
    - **Median age**
    - **Male population**
    - **Female population**
    - **Total population**
    - **Number of veterans**
    - **Foreign born**
    - **Average household size**
    - **State code**
    - **Race**
    - **Statistic values**
    
    
- **Airport Code Table:** Dataset with information about different airports. This information includes:
    - **ident**: Identification code
    - **type**: type of airport
    - **name**: name of the airport
    - **elevation_ft**: elevation above the sea level
    - **iso_country**: iso code of each country
    - **iso_region**: iso code of each region
    - **municipality**: municipality where the airport is located
    - **gps_code**: gps code of the airport
    - **iata_code**: An IATA airport code, also known as an IATA location identifier, IATA station code, or simply a location identifier, is a three-character alphanumeric geocode designating many airports and metropolitan areas around the world, defined by the International Air Transport Association (IATA).
    - **local_code**: local code of the airport
    - **coordinates**: coordinates of the airport
    
    
- **Global land temperatures by city**: Dataset with informmation about the temperature in different cities at different dates.
    - **dt**: date of the data
    - **AverageTemperature**
    - **AverageTemperatureUncertainty**
    - **City**
    - **Country**
    - **Latitude**
    - **Longitude**

**I94 Immigration Data**

In [5]:
def read_immigration_data(url = 'immigration_data_sample.csv'):
    '''
    Function which loads the immigration dataset.
    
    INPUT:
    url (string): URL of the bucket where the information is stored.
    
    OUTPUT:
    df_sas (Spark DataFrame): dataframe created based on the data
    '''
#     df_sas = spark.read.csv('immigration_data_sample.csv', header = True)
    df_sas = spark.read.parquet(output_data + 'sas_data/*.parquet')
    return df_sas

In [6]:
df_sas = read_immigration_data()

**U.S. City Demographic Data**

In [7]:
def read_cities_data(url = 'us-cities-demographics.csv'):
    '''
    Function which loads the immigration dataset.
    
    INPUT:
    url (string): URL of the bucket where the information is stored.
    
    OUTPUT:
    df_sas (Spark DataFrame): dataframe created based on the data
    '''
    df_cities = pd.read_csv(output_data + 'us-cities-demographics.csv', sep=';')
    return df_cities

In [8]:
df_cities = read_cities_data()

**Airport Code Table**

In [9]:
def read_airport_data(url = 'airport-codes_csv.csv'):
    '''
    Function which loads the immigration dataset.
    
    INPUT:
    url (string): URL of the bucket where the information is stored.
    
    OUTPUT:
    df_sas (Spark DataFrame): dataframe created based on the data
    '''
    df_airport = pd.read_csv(output_data + url)
    return df_airport

In [10]:
df_airport = read_airport_data()

**Global land temperatures by city**

In [44]:
def read_temp_data(url = "C:/Users/gonza/Downloads/GlobalLandTemperaturesByCity.csv"):
    '''
    Function which loads the immigration dataset.
    
    INPUT:
    url (string): URL of the bucket where the information is stored.
    
    OUTPUT:
    df_sas (Spark DataFrame): dataframe created based on the data
    '''
    df_temp = spark.read.csv(output_data + url, header = True, inferSchema = True)
    return df_temp

In [45]:
df_temp = read_temp_data() 

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Although each file and dataset is different and have different problems to solve but the steps to be implemented are the following:

* Modify the name of the columns to more descriptive names
* Modify data types
* Fox the missing values
* Drop duplicates values
* Replace codes with more descriptive names
* Drop unnecesary columns

Not all steps will have to be applied to all datasets.


### 1. I94 Immigration Data

#### Explore the data

In [13]:
df_sas.limit(6).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,...,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2


In [14]:
df_sas.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [15]:
print((df_sas.count(), len(df_sas.columns)))

(3096313, 28)


In [16]:
df_sas.summary().toPandas()

Unnamed: 0,summary,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,count,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096313,3096313.0,3096074.0,2943721,...,392,2957884,3095511.0,3095836,2682044,113708,3012686,3096313.0,3076764,3096313
1,mean,3078651.879075533,2016.0,4.0,304.9069344733559,303.28381949757664,,20559.84854179794,1.0736897761487614,51.652482269503544,...,,,1974.2323855415148,8291120.333841449,,4131.050016327899,59.477601493233784,70828850110.90295,1360.2463696420555,
2,stddev,1763278.099749858,1.9909824761792666e-13,0.0,210.0268885306332,208.5832129278886,,8.777339474881993,0.5158963131657235,42.97906231370985,...,,,17.420260534588262,1656502.4244925014,,8821.743471773656,172.63339952061747,22154415947.557632,5852.676345633783,
3,min,6.0,2016.0,4.0,101.0,101.0,5KE,20545.0,1.0,..,...,U,M,1902.0,/ 183D,F,0,*FF,0.0,00000,B1
4,25%,1577601.0,2016.0,4.0,135.0,131.0,,20552.0,1.0,10.0,...,,,1962.0,7102016.0,,3680.0,2.0,56035184433.0,101.0,
5,50%,3103156.0,2016.0,4.0,213.0,213.0,,20560.0,1.0,40.0,...,,,1975.0,7252016.0,,3872.0,2.0,59360890533.0,408.0,
6,75%,4654299.0,2016.0,4.0,512.0,504.0,,20567.0,1.0,99.0,...,,,1986.0,1.0132016E7,,3945.0,2.0,93509739730.0,903.0,
7,max,6102785.0,2016.0,4.0,999.0,760.0,YSL,20574.0,9.0,ZU,...,Y,M,2019.0,D/S,X,YM0167,ZZ,99915565930.0,ZZZ,WT


In [17]:
# Missing values analysis
df_sas.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_sas.columns]).take(1)

[Row(cicid=0, i94yr=0, i94mon=0, i94cit=0, i94res=0, i94port=0, arrdate=0, i94mode=239, i94addr=152592, depdate=142457, i94bir=802, i94visa=0, count=0, dtadfile=1, visapost=1881250, occup=3088187, entdepa=238, entdepd=138429, entdepu=3095921, matflag=138429, biryear=802, dtaddto=477, gender=414269, insnum=2982605, airline=83627, admnum=0, fltno=19549, visatype=0)]

There are missing values in the variables `i94addr=59`, `depdate=49`, `visapost=618`, `occup=996`, `entdepd=46`, `entdepu=1000`, `matflag=46`, `gender=141`, `insnum=965`, `airline=33` and `fltno=8`. It is going to be necessary to analyze each of them to identify a strategy to avoid having missing values, if possible.

#### Cleaning steps

In [13]:
def cleaning_immigration_data(df_sas):
    '''
    Function which cleans the data implementing the following steps: 
    1. Change column names
    2. Change the data types
    3. Fix the missing values
    4. Drop duplicate values
    5. Replace codes with more descriptive values
    6. Create new features
    7. Create the final dataframes
    
    INPUT:
    df_sas (Spark DataFrame): DataFrame with
    
    OUTPUT:
    df_travel (Spark DataFrame): dataframe with the data related with the considered travels
    df_personal_info (Spark DataFrame): dataframe with the data related with the travelers
    '''
    
    # 1. Change column names
    # Keys to modify the codes for names easier to understand
    names = {'cicid':'immigrant_id','I94YR': 'year', 'I94MON':'month', 'I94CIT':'country_1','I94RES':'country_2','I94PORT':'city',
        'ARRDATE':'arrival_date','I94MODE':'transport_mode','I94ADDR':'state','DEPDATE':'departure_date',
         'I94BIR':'age_respondent','I94VISA':'visa_code','COUNT':'summary_statistics','DTADFILE':'character_date_field',
        'VISAPOST':'department_visa','OCCUP':'occupation','ENTDEPA':'arrival_flag','ENTDEPD':'departure_flag',
        'ENTDEPU':'update_flag','MATFLAG':'match_flag','BIRYEAR':'birth_year','DTADDTO':'character_date_field',
        'GENDER':'non_inmigrant_sex','INSNUM':'ins_number','AIRLINE':'airline','ADMNUM':'admission_number',
        'FLTNO':'flight_number','VISATYPE':'visa_type'}
    
    # Modification of the column names for others more intuitive
    for i in names:
        df_sas = df_sas.withColumnRenamed(i,names[i])

    # 2. Change the data types
    from pyspark.sql.functions import substring, length, col, expr
    df_sas = df_sas.withColumn("country_1",expr("substring(country_1, 1, length(country_1)-2)"))
    df_sas = df_sas.withColumn("immigrant_id",expr("substring(immigrant_id, 1, length(immigrant_id)-2)"))
    df_sas = df_sas.withColumn("year",expr("substring(year, 1, length(year)-2)"))
    df_sas = df_sas.withColumn("month",expr("substring(month, 1, length(month)-2)"))
    df_sas = df_sas.withColumn("country_2",expr("substring(country_2, 1, length(country_2)-2)"))
    df_sas = df_sas.withColumn("arrival_date",expr("substring(arrival_date, 1, length(arrival_date)-2)"))
    df_sas = df_sas.withColumn("transport_mode",expr("substring(transport_mode, 1, length(transport_mode)-2)"))
    df_sas = df_sas.withColumn("departure_date",expr("substring(departure_date, 1, length(departure_date)-2)"))
    df_sas = df_sas.withColumn("age_respondent",expr("substring(age_respondent, 1, length(age_respondent)-2)"))
    df_sas = df_sas.withColumn("visa_code",expr("substring(visa_code, 1, length(visa_code)-2)"))
    df_sas = df_sas.withColumn("summary_statistics",expr("substring(summary_statistics, 1, length(summary_statistics)-2)"))
    df_sas = df_sas.withColumn("birth_year",expr("substring(birth_year, 1, length(birth_year)-2)"))
    df_sas = df_sas.withColumn("admission_number",expr("substring(admission_number, 1, length(admission_number)-2)"))
    
    df_sas = df_sas.withColumn("country_1", col("country_1").cast('int'))
    df_sas = df_sas.withColumn("immigrant_id", col("immigrant_id").cast('int'))
    df_sas = df_sas.withColumn("year", col("year").cast('int'))
    df_sas = df_sas.withColumn("month", col("month").cast('int'))
    df_sas = df_sas.withColumn("country_2", col("country_2").cast('int'))
    df_sas = df_sas.withColumn("arrival_date", col("arrival_date").cast('int'))
    df_sas = df_sas.withColumn("transport_mode", col("transport_mode").cast('int'))
    df_sas = df_sas.withColumn("departure_date", col("departure_date").cast('int'))
    df_sas = df_sas.withColumn("age_respondent", col("age_respondent").cast('int'))
    df_sas = df_sas.withColumn("visa_code", col("visa_code").cast('int'))
    df_sas = df_sas.withColumn("summary_statistics", col("summary_statistics").cast('int'))
    df_sas = df_sas.withColumn("birth_year", col("birth_year").cast('int'))
    df_sas = df_sas.withColumn("admission_number", col("admission_number").cast('int'))
    
    convert_to_datetime_udf = udf(convert_to_datetime, DateType())
    df_sas = df_sas.withColumn('arrival_date', convert_to_datetime_udf(col('arrival_date'))) # Fila nueva
    df_sas = df_sas.withColumn('departure_date', convert_to_datetime_udf(col('departure_date'))) #Fila nueva
    
    # 3. Fix the missing values
    # 3.1. I94ADDR (state)
    df_sas = df_sas.filter(df_sas.state.isNotNull())
    
    # 3.2. DEPDATE (departure_date)
    df_sas.filter(df_sas.departure_date.isNotNull())
    
    # 3.3. VISAPOST (department_visa)
    df_sas = df_sas.drop('department_visa')
    
    # 3.4. OCCUP (occupation)
    df_sas = df_sas.drop('occupation')
    
    # 3.5. ENTDEPD (departure_flag)
    df_sas = df_sas.filter(df_sas.departure_flag.isNotNull())
    
    # 3.6. MATFLAG (match_flag)
    # There are no missing values anymore, because those ones has been already solved in a prior step.    
    
    # 4. Drop duplicate values
    df_sas = df_sas.dropDuplicates()    
    
    # 5. Replace codes with more descriptive values
    # Read the SAS file with the meaning of the codes of the columns I94CIT & I94RES,I94PORT, I94MODE, I94ADDR
    country_codes = 'I94_SAS_Labels_Descriptions.SAS'
    with open(country_codes) as f:
        lines = f.readlines()
    lines = [line.replace('"','').replace('\n','').replace("'",'') for line in lines]
    
    df_sas_codes = pd.DataFrame(lines)
    
    # 5.1. I94CIT & I94RES
    values_I94CIT_I94RES = df_sas_codes[9:298]
    values_I94CIT_I94RES = values_I94CIT_I94RES[0].str.split('=', expand = True)
    values_I94CIT_I94RES.rename(columns = {0:"code" , 1:"name"}, inplace = True)
    values_I94CIT_I94RES = spark.createDataFrame(values_I94CIT_I94RES)
    df_sas = df_sas.join(values_I94CIT_I94RES, df_sas.country_1 == values_I94CIT_I94RES.code)
    df_sas = df_sas.drop('code').withColumnRenamed('name','country_1_name')
    df_sas = df_sas.join(values_I94CIT_I94RES, df_sas.country_2 == values_I94CIT_I94RES.code)
    df_sas = df_sas.drop('code').withColumnRenamed('name','country_2_name')
    
    # 5.2. I94PORT
    values_I94PORT = df_sas_codes[302:962]
    values_I94PORT = values_I94PORT[0].str.split('=', expand = True)
    values_I94PORT.rename(columns = {0:"code" , 1:"name"}, inplace = True)
    values_I94PORT['code'] = values_I94PORT['code'].str.strip()
    values_I94PORT['name'] = values_I94PORT['name'].str.strip()
    values_I94PORT.rename(columns = {0:"code" , 1:"name"}, inplace = True)
    values_I94PORT = spark.createDataFrame(values_I94PORT)
    df_sas = df_sas.join(values_I94PORT, df_sas.city == values_I94PORT.code)
    df_sas = df_sas.drop('code').withColumnRenamed('name','city_name')    
    
    # 5.3. I94MODE
    values_I94MODE = df_sas_codes[972:976]
    values_I94MODE = values_I94MODE[0].str.split('=', expand = True)
    values_I94MODE.rename(columns = {0:"code" , 1:"name"}, inplace = True)
    values_I94MODE['code'] = values_I94MODE['code'].str.strip()
    values_I94MODE['name'] = values_I94MODE['name'].str.strip()
    values_I94MODE = spark.createDataFrame(values_I94MODE)
    df_sas = df_sas.join(values_I94MODE, df_sas.transport_mode == values_I94MODE.code)
    df_sas = df_sas.drop('code').withColumnRenamed('name','transport_mode_name')
    
    # 5.4. I94ADDR
    values_I94ADDR= df_sas_codes[982:1036]
    values_I94ADDR = values_I94ADDR[0].str.split('=', expand = True)
    values_I94ADDR.rename(columns = {0:"code" , 1:"name"}, inplace = True)
    values_I94ADDR['code'] = values_I94ADDR['code'].str.strip()
    values_I94ADDR['name'] = values_I94ADDR['name'].str.strip()
    values_I94ADDR = spark.createDataFrame(values_I94ADDR)
    df_sas = df_sas.join(values_I94ADDR, df_sas.state == values_I94ADDR.code)
    df_sas = df_sas.drop('code').withColumnRenamed('name', 'state_name')

    # 6. Create new features
    # 6.1. date_arrival_temp - Key to link this table with the temperature table
    # Creation of columns with year and month of arrival to be used as key to link this table with the temperature table
    df_sas = df_sas.withColumn("year_arrival", year(df_sas.arrival_date)) # Creation of a column with the year of arrival
    df_sas = df_sas.withColumn("month_arrival", month(df_sas.arrival_date)) # Creation of a column with the month of arrival
    df_sas = df_sas.withColumn("day_arrival", lit(1)) # Creation of a column with 1s, because only the year and the month are 
                                                        # going to be relevant for this feature
    #Creation of the feature
    df_sas = df_sas.withColumn("date_arrival_temp", make_date(df_sas.year_arrival, df_sas.month_arrival, df_sas.day_arrival))
    
    #Drop the columns which are not useful anymore
    df_sas = df_sas.drop('year_arrival')
    df_sas = df_sas.drop('month_arrival')
    df_sas = df_sas.drop('day_arrival')
    
    # 6.2. travel_id - Key to identify each of the travels
    df_sas = df_sas.withColumn("travel_id",row_number().over(Window.orderBy(monotonically_increasing_id())))
#     df_sas = df_sas.withColumn("travel_id", monotonically_increasing_id())
    
    # 6.3. city_state - Key to link this table with df_cities and df_airport
    df_sas = df_sas.withColumn('city_name', split(df_sas['city_name'], ',').getItem(0))
    df_sas = df_sas.withColumn("city_name", initcap(df_sas.city_name))
    df_sas = df_sas.withColumn("state_name", initcap(df_sas.state_name))
    df_sas = df_sas.withColumn('city_state', concat(df_sas.city_name, lit('_'), df_sas.state_name))
    
    # 7. Define the final dataframes
    df_travel = df_sas.select('travel_id','city_state','immigrant_id', 'year', 'month', 'city', 'arrival_date','transport_mode','state',
              'departure_date', 'airline', 'flight_number', 'city_name', 'transport_mode_name',
              'state_name','date_arrival_temp')
    
    df_personal_info = df_sas.select('immigrant_id','country_1', 'country_2', 'visa_code','birth_year','non_inmigrant_sex', 'ins_number', 'visa_type', 
              'country_1_name', 'country_2_name')
    
    return df_travel, df_personal_info
    

In [14]:
df_travel, df_personal_info = cleaning_immigration_data(df_sas) 

In [15]:
df_travel.limit(5).toPandas()

Unnamed: 0,travel_id,city_state,immigrant_id,year,month,city,arrival_date,transport_mode,state,departure_date,airline,flight_number,city_name,transport_mode_name,state_name,date_arrival_temp
0,1,Las Vegas_Arizona,5889208,2016,4,LVG,2016-04-30,1,AZ,2016-05-08,CM,252,Las Vegas,Air,Arizona,2016-04-01
1,2,Atlanta_S. Carolina,5770290,2016,4,ATL,2016-04-30,1,SC,2016-05-06,DL,392,Atlanta,Air,S. Carolina,2016-04-01
2,3,Phoenix_Arizona,4052101,2016,4,PHO,2016-04-22,1,AZ,2016-05-14,BA,289,Phoenix,Air,Arizona,2016-04-01
3,4,Phoenix_Arizona,4273097,2016,4,PHO,2016-04-23,1,AZ,2016-05-07,BA,289,Phoenix,Air,Arizona,2016-04-01
4,5,San Francisco_Arizona,5831892,2016,4,SFR,2016-04-30,1,AZ,2016-08-01,AA,505,San Francisco,Air,Arizona,2016-04-01


In [21]:
def quality_check(df):
    '''
    Function which checks if there is information in the dataset.
    
    INPUT:
    df - pandas DataFrame: dataframe to be checked
    
    OUTPUT:
    None
    '''
    
    if df.toPandas().size > 0:
        print('The dataset has information.')
    
    else:
        df.toPandas().size == 0
        print('There has been a failure by processing the file.')

In [22]:
quality_check(df_travel)

The dataset has information.


In [23]:
quality_check(df_personal_info)

The dataset has information.


In [24]:
df_personal_info.limit(5).toPandas()

Unnamed: 0,immigrant_id,country_1,country_2,visa_code,birth_year,non_inmigrant_sex,ins_number,visa_type,country_1_name,country_2_name
0,5889208,689,689,2,1977,M,,B2,BRAZIL,BRAZIL
1,5770290,504,504,2,1946,F,,B2,PANAMA,PANAMA
2,4052101,123,123,2,2006,M,,WT,NETHERLANDS,NETHERLANDS
3,4273097,123,123,2,1958,M,,WT,NETHERLANDS,NETHERLANDS
4,5831892,582,582,2,1955,F,,B2,"MEXICO Air Sea, and Not Reported (I-94, no l...","MEXICO Air Sea, and Not Reported (I-94, no l..."


#### Upload data to the data lake

In [25]:
def save_travel_data (df, output_data):
    '''
    Function which saves the dataframe already adapted to the data lake in Amazon S3.
    
    INPUT: 
    df - Spark Dataframe: data to be saved in Amazon S3
    output_data - string: url to the Amazon S3 bucket
    
    OUTPUT:
    None
    '''
    df.write.partitionBy('year').mode('overwrite').parquet(output_data + 'travel_data/' + datetime.datetime.strf("%Y%m%d"))
    return

    

In [26]:
def save_personal_data (df, output_data):
    '''
    Function which saves the dataframe already adapted to the data lake in Amazon S3.
    
    INPUT: 
    df - Spark Dataframe: data to be saved in Amazon S3
    output_data - string: url to the Amazon S3 bucket
    
    OUTPUT:
    None
    '''
    df.write.partitionBy('country_1_name').mode('overwrite').parquet(output_data + 'personal_data/' + datetime.datetime.strf("%Y%m%d"))
    return
    

In [27]:
save_travel_data(df_travel, output_data)

In [28]:
save_personal_data(df_personal_info, output_data)

---
---

### 2. Airport Code Tables

#### 2.1. Explore the data

First, let's take a look at the code structure of this data:

In [29]:
df_airport.shape

(55075, 12)

In [30]:
df_airport.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         55075 non-null  object 
 1   type          55075 non-null  object 
 2   name          55075 non-null  object 
 3   elevation_ft  48069 non-null  float64
 4   continent     27356 non-null  object 
 5   iso_country   54828 non-null  object 
 6   iso_region    55075 non-null  object 
 7   municipality  49399 non-null  object 
 8   gps_code      41030 non-null  object 
 9   iata_code     9189 non-null   object 
 10  local_code    28686 non-null  object 
 11  coordinates   55075 non-null  object 
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [31]:
df_airport.describe(include = 'all')

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
count,55075,55075,55075,48069.0,27356,54828,55075,49399,41030,9189.0,28686,55075
unique,55075,7,52144,,6,243,2810,27133,40850,9042.0,27436,54874
top,4OK4,small_airport,Centre Hospitalier Heliport,,EU,US,US-TX,Seoul,MBAC,0.0,AMA,"0, 0"
freq,1,33965,85,,7840,22757,2277,404,3,80.0,5,53
mean,,,,1240.789677,,,,,,,,
std,,,,1602.363459,,,,,,,,
min,,,,-1266.0,,,,,,,,
25%,,,,205.0,,,,,,,,
50%,,,,718.0,,,,,,,,
75%,,,,1497.0,,,,,,,,


There are a variables with a high amount of missing values, so it is going to be necessary to analyze, if they have to be removed or those values can be filled in.

In [32]:
df_airport.isna().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [33]:
df_airport.isna().sum()/df_airport.shape[0]*100

ident            0.000000
type             0.000000
name             0.000000
elevation_ft    12.720835
continent       50.329551
iso_country      0.448479
iso_region       0.000000
municipality    10.305946
gps_code        25.501589
iata_code       83.315479
local_code      47.914662
coordinates      0.000000
dtype: float64

In [34]:
df_airport[df_airport.iata_code.isna()]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
...,...,...,...,...,...,...,...,...,...,...,...,...
55065,ZYTH,small_airport,Tahe Airport,1240.0,AS,CN,CN-23,Tahe,ZYTH,,,"124.720222222, 52.2244444444"
55071,ZYYY,medium_airport,Shenyang Dongta Airport,,AS,CN,CN-21,Shenyang,ZYYY,,,"123.49600219726562, 41.784400939941406"
55072,ZZ-0001,heliport,Sealand Helipad,40.0,EU,GB,GB-ENG,Sealand,,,,"1.4825, 51.894444"
55073,ZZ-0002,small_airport,Glorioso Islands Airstrip,11.0,AF,TF,TF-U-A,Grande Glorieuse,,,,"47.296388888900005, -11.584277777799999"


There are no duplicated row.

In [35]:
df_airport[df_airport.duplicated()]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates


#### Cleaning steps

In [16]:
def cleaning_airport_data(df_airport):
    '''
    Function which cleans the data implementing the following steps: 
    1. Modify the names of the columns to more descriptive values
    2. Drop unnecessary columns
    3. Modify data types
    4. Replace codes with more descriptive values
    5. Drop duplicated values
    6. Create new features
    
    INPUT:
    df_airport (Spark DataFrame): DataFrame with
    
    OUTPUT:
    df_airport (Spark DataFrame): data already cleaned for being uploaded to the data lake
    '''
    
    # 1. Modify the names of the columns to more descriptive values
    airport_coordinates = df_airport['coordinates'].str.split(',', expand = True).rename(columns={0:'latitude', 1:'longitude'})
    df_airport['latitude'] = airport_coordinates['latitude']
    df_airport['longitude'] = airport_coordinates['longitude']
    df_airport.drop('coordinates', axis = 1, inplace = True)
    
    # 2. Drop unnecessary columns
    df_airport.drop(['iata_code','continent', 'gps_code', 'local_code', 'elevation_ft', 'municipality',
                 'iso_country'], axis = 1, inplace = True)
    
    # 3. Modify data types
    df_airport.latitude = df_airport.latitude.astype('float').round(2)
    df_airport.longitude = df_airport.longitude.astype('float').round(2)
    df_iso_states = pd.read_csv('ISO_code_US.csv', sep=";")

    # 4. Replace codes with more descriptive values
    df_iso_states = pd.read_csv('ISO_code_US.csv', sep=";")
    df_airport = pd.merge(df_airport, df_iso_states, how = 'inner', left_on=['iso_region'], right_on = ['iso_code'])
    df_airport.drop("iso_code", axis = 1, inplace = True)
    
    # 5. Drop duplicated values
    df_airport = df_airport.drop_duplicates()
    
    # 6. Create new features
    # 6.1. city_state - Key to link this table with df_sas
    df_faa_code = pd.read_csv('faa_code.csv', sep = ';') # Dataframe with the FAA codes to 
    df_faa_code['City'] = df_faa_code['City'].str.capitalize()
    df_faa_code['State'] = df_faa_code['State'].str.capitalize()
    df_faa_code['city_state'] = df_faa_code.City + '_' + df_faa_code.State
    df_faa_code.drop_duplicates(subset = ['Locator Id'], inplace = True)
    df_airport = pd.merge(df_airport, df_faa_code, how = 'inner', left_on=['ident'], right_on = ['Locator Id'])
    df_airport.drop(['state', 'Locator Id', 'Facilty'], axis = 1, inplace = True)

    return df_airport

In [17]:
df_airport = cleaning_airport_data(df_airport)
df_airport.head()

Unnamed: 0,ident,type,name,iso_region,latitude,longitude,City,State,city_state
0,00A,heliport,Total Rf Heliport,US-PA,-74.93,40.07,Bensalem,Pennsylvania,Bensalem_Pennsylvania
1,00PN,small_airport,Ferrell Field,US-PA,-80.21,41.3,Mercer,Pennsylvania,Mercer_Pennsylvania
2,01PA,heliport,Pine Heliport,US-PA,-80.05,40.66,Bolivar,Pennsylvania,Bolivar_Pennsylvania
3,01PS,small_airport,Nort's Resort Airport,US-PA,-76.03,41.6,Meshoppen,Pennsylvania,Meshoppen_Pennsylvania
4,02PA,heliport,Lag Iii Heliport,US-PA,-79.77,40.44,Monroeville,Pennsylvania,Monroeville_Pennsylvania


#### Upload data to the data lake

In [38]:
def save_csv_S3 (df, output_data = output_data):
    '''
    Function which saves the dataframe already adapted to the data lake in Amazon S3.
    
    INPUT: 
    df_airport - Spark Dataframe: data to be saved in Amazon S3
    output_data - string: url to the Amazon S3 bucket
    
    OUTPUT:
    None
    '''
    df.to_csv(output_data + 'immigration/')
    return
    

In [39]:
save_airport_data(df_airport, output_data)

---
---

### 3. U.S. Cities Demographic Data

#### Explore the data

In [40]:
df_cities.shape

(2891, 12)

In [41]:
df_cities.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   City                    2891 non-null   object 
 1   State                   2891 non-null   object 
 2   Median Age              2891 non-null   float64
 3   Male Population         2888 non-null   float64
 4   Female Population       2888 non-null   float64
 5   Total Population        2891 non-null   int64  
 6   Number of Veterans      2878 non-null   float64
 7   Foreign-born            2878 non-null   float64
 8   Average Household Size  2875 non-null   float64
 9   State Code              2891 non-null   object 
 10  Race                    2891 non-null   object 
 11  Count                   2891 non-null   int64  
dtypes: float64(6), int64(2), object(4)
memory usage: 271.2+ KB


In [42]:
df_cities.describe(include = 'all')

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
count,2891,2891,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891,2891,2891.0
unique,567,49,,,,,,,,49,5,
top,Columbia,California,,,,,,,,CA,Hispanic or Latino,
freq,15,676,,,,,,,,676,596,
mean,,,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,,,48963.77
std,,,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,,,144385.6
min,,,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,,,98.0
25%,,,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,,,3435.0
50%,,,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,,,13780.0
75%,,,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,,,54447.0


In [43]:
df_cities.isna().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [44]:
df_cities[df_cities['Number of Veterans'].isna()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
111,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Hispanic or Latino,335559
155,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,Hispanic or Latino,76349
258,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,American Indian and Alaska Native,12143
637,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,Hispanic or Latino,139967
1747,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,American Indian and Alaska Native,4031
1748,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Asian,235
1995,Ponce,Puerto Rico,40.5,56968.0,64615.0,121583,,,,PR,Hispanic or Latino,120705
2004,Bayamón,Puerto Rico,39.4,80128.0,90131.0,170259,,,,PR,Hispanic or Latino,169155
2441,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Asian,2452
2589,Guaynabo,Puerto Rico,42.2,33066.0,37426.0,70492,,,,PR,Hispanic or Latino,69936


In [45]:
df_cities[df_cities['Average Household Size'].isna()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
111,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Hispanic or Latino,335559
155,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,Hispanic or Latino,76349
258,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,American Indian and Alaska Native,12143
333,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Black or African-American,331
637,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,Hispanic or Latino,139967
1437,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,White,72211
1747,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,American Indian and Alaska Native,4031
1748,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Asian,235
1995,Ponce,Puerto Rico,40.5,56968.0,64615.0,121583,,,,PR,Hispanic or Latino,120705


In [46]:
df_cities[df_cities.duplicated()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


#### Cleaning steps

In [18]:
def cleaning_cities_data(df_cities):
    '''
    Function which cleans the data implementing the following steps: 
    1. Modify the names of the columns to more descriptive values
    2. Fix the missing values issues
    3. Modify data types
    4. Create new features
    
    INPUT:
    df_cities (Spark DataFrame): DataFrame directly extracted from the raw data without any cleaning process
    
    OUTPUT:
    df_cities (Spark DataFrame): data already cleaned for being uploaded to the data lake
    '''
    
    # 1. Modify the names of the columns to more descriptive values
    df_cities.columns = df_cities.columns.str.lower()
    df_cities.columns = df_cities.columns.str.replace(" ","_")
    df_cities.columns = df_cities.columns.str.replace("foreign-born","foreign_born")
    
    # 2. Fix the missing values issues
    # median age
    list_states = list(df_cities.state.unique())
    for i in list_states:
        if df_cities[df_cities.state==i]['median_age'].mean()>0:
            df_cities['median_age'] = df_cities.groupby('state')['median_age'].transform(lambda x: x.fillna(x.mean()))

        else:
            df_cities['median_age'] = df_cities.groupby('state')['median_age'].transform(lambda x: x.fillna(df_cities.median_age.mean()))
            
    # male population
    list_states = list(df_cities.state.unique())
    for i in list_states:
        if df_cities[df_cities.state==i]['male_population'].mean()>0:
            df_cities['male_population'] = df_cities.groupby('state')['male_population'].transform(lambda x: x.fillna(x.mean()))

        else:
            df_cities.male_population = df_cities.groupby('state')['male_population'].transform(lambda x: x.fillna(df_cities.male_population.mean()))
            
    # female population
    list_states = list(df_cities.state.unique())
    for i in list_states:
        if df_cities[df_cities.state==i]['female_population'].mean()>0:
            df_cities['female_population'] = df_cities.groupby('state')['female_population'].transform(lambda x: x.fillna(x.mean()))

        else:
            df_cities.male_population = df_cities.groupby('state')['female_population'].transform(lambda x: x.fillna(df_cities.female_population.mean()))
    
    # total population
    list_states = list(df_cities.state.unique())
    for i in list_states:
        if df_cities[df_cities.state==i]['total_population'].mean()>0:
            df_cities['total_population'] = df_cities.groupby('state')['total_population'].transform(lambda x: x.fillna(x.mean()))

        else:
            df_cities['total_population'] = df_cities.groupby('state')['total_population'].transform(lambda x: x.fillna(df_cities.total_population.mean()))    
            
    # number of veterans
    list_states = list(df_cities.state.unique())
    for i in list_states:
        if df_cities[df_cities.state==i]['number_of_veterans'].mean()>0:
            df_cities['number_of_veterans'] = df_cities.groupby('state')['number_of_veterans'].transform(lambda x: x.fillna(x.mean()))

        else:
            df_cities['number_of_veterans'] = df_cities.groupby('state')['number_of_veterans'].transform(lambda x: x.fillna(df_cities.number_of_veterans.mean()))
    
    # foreign born
    list_states = list(df_cities.state.unique())
    for i in list_states:
        if df_cities[df_cities.state==i]['foreign_born'].mean()>0:
            df_cities['foreign_born'] = df_cities.groupby('state')['foreign_born'].transform(lambda x: x.fillna(x.mean()))

        else:
            df_cities['foreign_born'] = df_cities.groupby('state')['foreign_born'].transform(lambda x: x.fillna(df_cities.foreign_born.mean()))
    
    # average household size
    list_states = list(df_cities.state.unique())
    for i in list_states:
        if df_cities[df_cities.state==i]['average_household_size'].mean()>0:
            df_cities['average_household_size'] = df_cities.groupby('state')['average_household_size'].transform(lambda x: x.fillna(x.mean()))

        else:
            df_cities.average_household_size = df_cities.groupby('state')['average_household_size'].transform(lambda x: x.fillna(df_cities.average_household_size.mean()))
    
    # count
    list_states = list(df_cities.state.unique())
    for i in list_states:
        if df_cities[df_cities.state==i]['count'].mean()>0:
            df_cities['count'] = df_cities.groupby('state')['count'].transform(lambda x: x.fillna(x.mean()))

        else:
            df_cities['count'] = df_cities.groupby('state')['count'].transform(lambda x: x.fillna(df_cities.count.mean()))
    
    # Drop the lines of the columns which have key values that can not be infered
    df_cities.dropna(subset = ['city','state','state_code','race'], inplace = True)
    
    # 3. Modify data types
    list_float_to_int = ['male_population','female_population','total_population','number_of_veterans',
                    'foreign_born','count']

    for col in list_float_to_int:
        df_cities[col] = df_cities[col].astype('int')
    
    # 4. Create new features
    # 4.1. city_state - Variable to link df_cities with df_travel
    var_names_list = ['median_age', 'male_population', 'female_population',
           'total_population', 'number_of_veterans', 'foreign_born',
           'average_household_size', 'count']

    race_list = ['Hispanic or Latino','White','Black or African-American','Asian','American Indian and Alaska Native']

    for var in var_names_list:
        for race_var in race_list:
            name_var = var + '_' + race_var
            df_cities[name_var] = df_cities[df_cities['race'] == race_var][var]
    df_cities.head()

    df_cities.drop(['median_age', 'male_population', 'female_population','total_population', 'number_of_veterans', 'foreign_born',
           'average_household_size', 'count','race'], axis = 1, inplace = True)

    df_cities['city_state'] = df_cities['city'] + '_' + df_cities['state']
    df_cities = df_cities.groupby('city_state').sum().reset_index()

    df_cities[['city','state']] = df_cities.city_state.str.split('_', expand = True).rename(columns = {0:'city', 1:'state'})
    
    return df_cities

In [19]:
df_cities = cleaning_cities_data(df_cities)
df_cities.head()

Unnamed: 0,city_state,median_age_Hispanic or Latino,median_age_White,median_age_Black or African-American,median_age_Asian,median_age_American Indian and Alaska Native,male_population_Hispanic or Latino,male_population_White,male_population_Black or African-American,male_population_Asian,...,average_household_size_Black or African-American,average_household_size_Asian,average_household_size_American Indian and Alaska Native,count_Hispanic or Latino,count_White,count_Black or African-American,count_Asian,count_American Indian and Alaska Native,city,state
0,Abilene_Texas,31.3,31.3,31.3,31.3,31.3,65212.0,65212.0,65212.0,65212.0,...,2.64,2.64,2.64,33222.0,95487.0,14449.0,2929.0,1813.0,Abilene,Texas
1,Akron_Ohio,38.1,38.1,38.1,38.1,38.1,96886.0,96886.0,96886.0,96886.0,...,2.24,2.24,2.24,3684.0,129192.0,66551.0,9033.0,1845.0,Akron,Ohio
2,Alafaya_Florida,33.5,33.5,33.5,33.5,0.0,39504.0,39504.0,39504.0,39504.0,...,2.94,2.94,0.0,34897.0,63666.0,6577.0,10336.0,0.0,Alafaya,Florida
3,Alameda_California,41.4,41.4,41.4,41.4,41.4,37747.0,37747.0,37747.0,37747.0,...,2.52,2.52,2.52,8265.0,44232.0,7364.0,27984.0,1329.0,Alameda,California
4,Albany_Georgia,33.3,33.3,33.3,33.3,33.3,31695.0,31695.0,31695.0,31695.0,...,2.38,2.38,2.38,1783.0,17160.0,53440.0,650.0,445.0,Albany,Georgia


#### Upload data to the data lake

In [49]:
def save_csv_S3 (df, output_data = output_data):
    '''
    Function which saves the dataframe already adapted to the data lake in Amazon S3.
    
    INPUT: 
    df_airport - Spark Dataframe: data to be saved in Amazon S3
    output_data - string: url to the Amazon S3 bucket
    
    OUTPUT:
    None
    '''
    df.to_csv(output_data + 'immigration/')
    return
    

In [50]:
save_csv_S3(df_cities, output_data)

---
---

### 4. World Temperature Data

#### Explore the data

In [51]:
df_temp_usa = df_temp.where(df_temp.Country == "United States")

In [52]:
df_temp_usa.show(5)

+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|1820-01-01 00:00:00|2.1010000000000004|                        3.217|Abilene|United States|  32.95N|  100.53W|
|1820-02-01 00:00:00|             6.926|                        2.853|Abilene|United States|  32.95N|  100.53W|
|1820-03-01 00:00:00|            10.767|                        2.395|Abilene|United States|  32.95N|  100.53W|
|1820-04-01 00:00:00|17.988999999999994|                        2.202|Abilene|United States|  32.95N|  100.53W|
|1820-05-01 00:00:00|            21.809|                        2.036|Abilene|United States|  32.95N|  100.53W|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---

In [53]:
df_temp_usa.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [54]:
df_temp_usa.describe().show()

+-------+------------------+-----------------------------+-------+-------------+--------+---------+
|summary|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+-------+------------------+-----------------------------+-------+-------------+--------+---------+
|  count|            661524|                       661524| 687289|       687289|  687289|   687289|
|   mean|13.949334923600677|           1.0895499452778745|   null|         null|    null|     null|
| stddev| 9.173337261791232|           1.1506804949928662|   null|         null|    null|     null|
|    min|           -25.163|                         0.04|Abilene|United States|  26.52N|  100.53W|
|    max|            34.379|                       10.519|Yonkers|United States|  61.88N|   99.24W|
+-------+------------------+-----------------------------+-------+-------------+--------+---------+



#### Cleaning steps

In [46]:
def cleaning_temperature_data(df_temp):
    '''
    Function which cleans the data implementing the following steps: 
    1. Modify the data types
    2. Fix the missing values issues
    3. Drop unnecessary columns
    
    INPUT:
    df_temp (Spark DataFrame): DataFrame directly extracted from the raw data without any cleaning process
    
    OUTPUT:
    df_temp_usa (Spark DataFrame): data already cleaned for being uploaded to the data lake
    '''
    df_temp_usa = df_temp.where(df_temp.Country == "United States")
    
    # 1. Modify the data types
    # Change the datastamp format, since the hour, minutes and seconds are not necessary and do not contribute to the final result
    df_temp_usa = df_temp_usa.withColumn('dt', to_date('dt'))
    
    # 2. Fix the missing values issues
    df_pandas = df_temp_usa.toPandas()
    df_pandas['AverageTemperature'] = df_pandas.groupby('City')['AverageTemperature'].transform(lambda x: x.fillna(x.mean()))
    df_pandas['AverageTemperatureUncertainty'] = df_pandas.groupby('City')['AverageTemperatureUncertainty'].transform(lambda x: x.fillna(x.mean()))
    
    # 3. Drop unnecessary columns
    df_pandas.drop("Country", axis = 1, inplace = True)
    
    return df_temp_usa

In [47]:
df_temp_usa = cleaning_temperature_data(df_temp)

In [22]:
df_temp_usa.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
1,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
2,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
3,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
4,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


#### Upload data to the data lake

In [58]:
def save_df_temp_usa_data (df_temp_usa = df_temp_usa, output_data = output_data):
    '''
    Function which saves the dataframe already adapted to the data lake in Amazon S3.
    
    INPUT: 
    df_airport - Spark Dataframe: data to be saved in Amazon S3
    output_data - string: url to the Amazon S3 bucket
    
    OUTPUT:
    None
    '''
    df_temp_usa.write.partitionBy('dt').mode('overwrite').parquet(output_data + 'temp_eeuu/' + datetime.datetime.strf("%Y%m%d"))
    return
    

In [59]:
save_df_temp_usa_data(df_temp_eeuu, output_data)

---
---
---

### Step 3: Examples of analysis
In this part of the code, I am going to show some analysis examples that can be carried out with this data schema.

In [92]:
# To speed up the data processing in this particular example, I created a new pandas dataframe based on df_travel.
df_travel_pandas = df_travel.toPandas()

#### Example 1: Which type of airport have used the immigrants considered in the dataset?

In [66]:
df_travel_airport = df_travel_pandas.merge(df_airport, left_on='city_state', right_on='city_state')

In [70]:
df_airport.type.value_counts()

small_airport     8506
heliport          5101
closed             762
seaplane_base      421
balloonport         12
medium_airport       1
Name: type, dtype: int64

In [67]:
df_travel_airport.groupby('type').count()['travel_id']

type
closed           1137459
heliport         9875553
seaplane_base     616327
small_airport    1920789
Name: travel_id, dtype: int64

#### Example 2: TOP10 most traveled places and which is the airline that travels the most to those places (in the data considered in the data set)

In [72]:
df_travel_cities = df_travel_pandas.merge(df_cities, left_on='city_state', right_on='city_state')

In [110]:
df_travel_cities[['airline','city_state']].value_counts()[:10]

airline  city_state       
AA       Miami_Florida        71380
DL       New York_New York    33952
VS       Orlando_Florida      33664
BA       New York_New York    32311
AA       New York_New York    31687
AF       New York_New York    21462
JJ       Miami_Florida        18069
LA       Miami_Florida        16281
AV       Miami_Florida        15857
UA       Houston_Texas        15548
dtype: int64