# Immigration Database & pipelines creation for Machine Learning studies
### Data Engineering Capstone Project

#### Project Summary
This project builds a series of ETL transformations whose ultimate goal is a database on which Machine Learning algorithms can be run to unveil hidden patterns in immigration data.

These studies aim to analyze how several environmental and socioeconomic factors relate to the arrival of immigrants in the US and to their distribution within the country.

We will start small: we gather and clean the data, but join only some economic data to the immigration tables, leaving the remaining tables available one join away for when more columns are needed.

At the end, we will store the heavily modified Immigration table in a parquet file, so that it can be loaded and used without running all these steps from scratch.

The project follows these steps:

* Step 1: Scope the Project and gather and present the Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# importing modules
import pandas as pd
import re
import datetime as dt
from pyspark.sql import SparkSession
from  pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql import SQLContext

### Step 1: Scope the Project and Gather Data

#### Scope 

As stated above, the ultimate goal is to run Machine Learning models to determine whether hidden relationships exist between the conditions in immigrants' home countries and their arrival and distribution within the US.

We will start small: for a first iteration, we will try to determine whether there is a relationship between the city our residents choose and the mean temperatures of their place of origin.

#### Data

For this purpose we will use a big dataset from the US National Tourism and Trade Office, another one from OpenDataSoft describing the demographics of US cities, a dataset of airport codes that might help us locate our residents' countries of birth, a big dataset of temperatures by city and a dataset of countries by GDP per capita.

- I94 Immigration Data
- World Temperature Data
- U.S. City Demographic Data
- Airport Code Table
- Countries GDP per capita

We will now import and describe these 5 datasets:

**I94 Immigration Data:**

This data comes from the US National Tourism and Trade Office. A data dictionary is also included in the workspace. 

The data comes from: https://travel.trade.gov/research/reports/i94/historical/2016.html.

We also have a sample file in csv format, which we will use to take a look at the data before reading it all in:

In [2]:
immigration_path = 'immigration_data_sample.csv'
df_immigration = pd.read_csv(immigration_path)
df_immigration.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


Along with the data we have a data dictionary, which we proceed to read:

In [3]:
SAS_label_descriptions = "./Auxiliary/I94_SAS_Labels_Descriptions.SAS"
label_descriptions = open(SAS_label_descriptions)
label_descriptions.readlines()[0:20]

["libname library 'Your file location' ;\n",
 'proc format library=library ;\n',
 '\n',
 '/* I94YR - 4 digit year */\n',
 '\n',
 '/* I94MON - Numeric month */\n',
 '\n',
 '/* I94CIT & I94RES - This format shows all the valid and invalid codes for processing */\n',
 '  value i94cntyl\n',
 "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n",
 "   236 =  'AFGHANISTAN'\n",
 "   101 =  'ALBANIA'\n",
 "   316 =  'ALGERIA'\n",
 "   102 =  'ANDORRA'\n",
 "   324 =  'ANGOLA'\n",
 "   529 =  'ANGUILLA'\n",
 "   518 =  'ANTIGUA-BARBUDA'\n",
 "   687 =  'ARGENTINA '\n",
 "   151 =  'ARMENIA'\n",
 "   532 =  'ARUBA'\n"]

Cleaning with a regex, we extract which columns we have and what information they contain:

In [4]:
re_obj = re.compile(r'\/\*(.*)')
I94_data_columns_expl = list()

# Keep every line that opens a SAS comment; for multi-line comments
# only the first line is captured
with open(SAS_label_descriptions) as label_descriptions:
    for data in label_descriptions:
        match = re_obj.search(data)
        if match is not None:
            I94_data_columns_expl.append(match[0])

I94_data_columns_expl

['/* I94YR - 4 digit year */',
 '/* I94MON - Numeric month */',
 '/* I94CIT & I94RES - This format shows all the valid and invalid codes for processing */',
 '/* I94PORT - This format shows all the valid and invalid codes for processing */',
 '/* ARRDATE is the Arrival Date in the USA. It is a SAS date numeric field that a ',
 '/* I94MODE - There are missing values as well as not reported (9) */',
 '/* I94ADDR - There is lots of invalid codes in this variable and the list below ',
 '/* DEPDATE is the Departure Date from the USA. It is a SAS date numeric field that ',
 '/* I94BIR - Age of Respondent in Years */',
 '/* I94VISA - Visa codes collapsed into three categories:',
 '/* COUNT - Used for summary statistics */',
 '/* DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use */',
 '/* VISAPOST - Department of State where where Visa was issued - CIC does not use */',
 '/* OCCUP - Occupation that will be performed in U.S. - CIC does not use */',
 '/* ENTDEPA - Arr
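
As a minimal sketch of how these comments could be turned into a column-to-description lookup, assuming single-line comments of the form `/* NAME - description */`. The sample lines and the names `comment_pattern` and `column_descriptions` are ours and are not part of the original notebook. Comments that span several lines or name two columns at once (such as `I94CIT & I94RES`) are not handled by this pattern.

```python
import re

# Sample lines in the style of the data dictionary shown above
sample_lines = [
    "/* I94YR - 4 digit year */\n",
    "\n",
    "/* I94MON - Numeric month */\n",
    "  value i94cntyl\n",
    "/* I94BIR - Age of Respondent in Years */\n",
]

# Group 1: column name, group 2: description (single-line comments only)
comment_pattern = re.compile(r"/\*\s*(\w+)\s*-\s*(.*?)\s*\*/")

column_descriptions = {}
for line in sample_lines:
    match = comment_pattern.search(line)
    if match is not None:
        column_descriptions[match.group(1)] = match.group(2)

print(column_descriptions)
```

A dictionary like this makes it easy to look up a column while deciding which ones to keep.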

We now import one of the original SAS files into a dataframe:

In [5]:
SAS_immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration_trial = pd.read_sas(SAS_immigration, format='sas7bdat', encoding="ISO-8859-1")
# format=None, index=None, encoding=None, chunksize=None, iterator=False)                                 
# format='sas7bdat', encoding="ISO-8859-1")
                                   
df_immigration_trial.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


**Airport Code Table:** 

This is a simple table of airport codes and corresponding cities. 

It comes from:
https://datahub.io/core/airport-codes#data

In [6]:
airports_path = 'airport-codes_csv.csv'
df_airports = pd.read_csv(airports_path, encoding="ISO-8859-1")
df_airports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [7]:
df_airports[(df_airports['iso_country'] == 'ES')&(df_airports['type']=='large_airport')]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
22500,GCLA,large_airport,La Palma Airport,107.0,EU,ES,ES-CN,"Sta Cruz de la Palma, La Palma Island",GCLA,SPC,,"-17.7556, 28.626499"
22502,GCLP,large_airport,Gran Canaria Airport,78.0,EU,ES,ES-CN,Gran Canaria Island,GCLP,LPA,,"-15.38659954071045, 27.931900024414062"
22504,GCTS,large_airport,Tenerife South Airport,209.0,EU,ES,ES-CN,Tenerife Island,GCTS,TFS,,"-16.5725002289, 28.044500351"
22506,GCXO,large_airport,Tenerife Norte Airport,2076.0,EU,ES,ES-CN,Tenerife Island,GCXO,TFN,,"-16.3414993286, 28.4827003479"
30787,LEAL,large_airport,Alicante International Airport,142.0,EU,ES,ES-V,Alicante,LEAL,ALC,,"-0.5581560134887695, 38.28219985961914"
30799,LEBL,large_airport,Barcelona International Airport,12.0,EU,ES,ES-CT,Barcelona,LEBL,BCN,,"2.07846, 41.2971"
30863,LEMD,large_airport,Adolfo Suárez Madrid–Barajas Airport,1998.0,EU,ES,ES-M,Madrid,LEMD,MAD,,"-3.56264, 40.471926"
30865,LEMG,large_airport,Málaga Airport,53.0,EU,ES,ES-AN,Málaga,LEMG,AGP,,"-4.499110221862793, 36.67490005493164"
30884,LEPA,large_airport,Palma De Mallorca Airport,27.0,EU,ES,ES-PM,Palma De Mallorca,LEPA,PMI,,"2.73881006241, 39.551700592"
30911,LEST,large_airport,Santiago de Compostela Airport,1213.0,EU,ES,ES-GA,Santiago de Compostela,LEST,SCQ,,"-8.415140151977539, 42.89630126953125"


**U.S. City Demographic Data:**
    
This data comes from OpenDataSoft.

You can read more about it here:
        
https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

In [8]:
demographics_path = 'us-cities-demographics.csv'
df_demographics = pd.read_csv(demographics_path, sep=";")
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


**World Temperature Data:** 

This dataset came from Kaggle. You can read more about it here:
https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

In [9]:
temperatures_path = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperatures = pd.read_csv(temperatures_path, nrows=100000)
df_temperatures.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


**Countries GDP per capita:**
    
Data from the World Bank datasets:
    
https://data.worldbank.org/indicator/NY.GDP.PCAP.CD?view=chart

In [10]:
countries_path = 'Countries_GDP_per_capita.csv'
df_countries = pd.read_csv(countries_path, sep = ";")
df_countries.head()

Unnamed: 0,Country Name,Country Code,Indicator Name,Indicator Code,1960,1961,1962,1963,1964,1965,...,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019
0,Aruba,ABW,GDP per capita (current US$),NY.GDP.PCAP.CD,,,,,,,...,23512.6026,24985.99328,24713.69805,26189.43551,26647.9381,27980.8807,28281.35048,29007.693,,
1,Afghanistan,AFG,GDP per capita (current US$),NY.GDP.PCAP.CD,59.773194,59.860874,58.458015,78.706388,82.095231,101.108305,...,543.303042,591.162759,641.871479,637.165523,613.856689,578.466353,547.22811,556.302002,524.162881,502.115487
2,Angola,AGO,GDP per capita (current US$),NY.GDP.PCAP.CD,,,,,,,...,3587.883798,4615.468028,5100.095808,5254.882338,5408.410496,4166.979684,3506.072885,4095.812942,3289.646664,2973.59116
3,Albania,ALB,GDP per capita (current US$),NY.GDP.PCAP.CD,,,,,,,...,4094.350334,4437.142885,4247.629984,4413.060861,4578.631994,3952.801215,4124.055726,4531.020806,5284.380184,5352.857411
4,Andorra,AND,GDP per capita (current US$),NY.GDP.PCAP.CD,,,,,,,...,40852.66678,43335.32886,38686.46126,39538.76672,41303.92937,35762.52307,37474.66541,38962.88035,41793.05526,40886.39116


### Step 2: Explore and Assess the Data
#### Explore the Data 

Now that we have taken a quick peek at our data, we will take a closer look at its format and particularities.

We will focus on missing and duplicated data, apart from the column formats.

With these purposes in mind, we will define a function to apply to all of the datasets:

In [11]:
def preliminary_checkings(df):
    """Print row, null and duplicate counts, followed by the dataframe info."""
    print("Total number of rows is {}".format(df.shape[0]))
    print("The number of null values is {}".format(df.isnull().values.sum()))
    # Vectorized count of rows holding at least one null (avoids iterrows)
    print("The number of rows with some null values is {}".format(df.isnull().any(axis=1).sum()))
    print("Total number of duplicated rows is {}\n".format(df.duplicated().sum()))
    print(df.info())

**Checks:**

We apply the checking function to our immigration sample dataset:

In [12]:
preliminary_checkings(df_immigration)

Total number of rows is 1000
The number of null values is 3961
The number of rows with some null values is 1000
Total number of duplicated rows is 0

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear   

We can see that no row is free of nulls, but that most of them come from a small bunch of columns.
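
To see which columns concentrate the nulls, one option is to compute the share of missing values per column. The following is a small sketch on a toy dataframe; the data and the names `toy`, `null_share` and `mostly_null` are illustrative assumptions, not part of the original analysis, and the 0.5 threshold is arbitrary.

```python
import numpy as np
import pandas as pd

# Toy data imitating a few immigration columns
toy = pd.DataFrame({
    "cicid": [1.0, 2.0, 3.0, 4.0],
    "occup": [np.nan, np.nan, np.nan, "STU"],
    "gender": ["F", np.nan, "M", "M"],
})

# Fraction of nulls per column, highest first
null_share = toy.isnull().mean().sort_values(ascending=False)
print(null_share)

# Columns where more than half of the values are missing
mostly_null = null_share[null_share > 0.5].index.tolist()
print(mostly_null)
```

Columns flagged this way are candidates for removal, subject to what the data dictionary says about them.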

After consulting the data dictionary, we decide to remove four columns that add little information for our goals and that also hold a considerable amount of null values:

In [13]:
df_immigration_columns_with_nulls = ['entdepu', 'occup', 'insnum', 'visapost']
df_immigration.drop(df_immigration_columns_with_nulls, axis = 1, inplace = True)

Now, the results are much better:

In [14]:
preliminary_checkings(df_immigration)

Total number of rows is 1000
The number of null values is 382
The number of rows with some null values is 243
Total number of duplicated rows is 0

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 25 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
entdepa       1000 non-null object
entdepd       954 non-null object
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
airlin

We also check the dataset to look for the meaning of the mysterious 'Unnamed: 0' column.

We suspect that this column holds the visitor's id.

We test this by checking for duplicates:

In [15]:
df_immigration[df_immigration['Unnamed: 0'].duplicated() == True]

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype


After looking it up, we see that CIC corresponds to Citizenship and Immigration Canada.

We check cicid for duplicates as well: if there are none, the one-to-one relationship between 'Unnamed: 0' and cicid is guaranteed in this sample, which supports our theory about that first column:

In [16]:
df_immigration[df_immigration['cicid'].duplicated() == True]

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
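
The two duplicate checks can also be folded into a single helper. This is only a sketch: the function name `is_one_to_one` and the toy data are hypothetical and do not appear in the original notebook.

```python
import pandas as pd

def is_one_to_one(df, left, right):
    """True when every value of `left` pairs with exactly one value of `right`, and vice versa."""
    pairs = df[[left, right]].drop_duplicates()
    return bool(pairs[left].is_unique and pairs[right].is_unique)

# Toy sample where both columns identify the row
toy = pd.DataFrame({
    "Unnamed: 0": [10, 11, 12, 13],
    "cicid": [100.0, 101.0, 102.0, 103.0],
})
print(is_one_to_one(toy, "Unnamed: 0", "cicid"))

# Same sample, but two different ids now share one cicid
broken = toy.copy()
broken.loc[3, "cicid"] = 100.0
print(is_one_to_one(broken, "Unnamed: 0", "cicid"))
```

Dropping duplicated pairs first means that a fully repeated row does not count as a violation.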


We now check the airports:

In [17]:
preliminary_checkings(df_airports)

Total number of rows is 55075
The number of null values is 126968
The number of rows with some null values is 54397
Total number of duplicated rows is 0

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB
None


Proceeding as with the immigration data, we remove two columns from our dataset that bring little information for us but are full of null values:

In [18]:
df_airports_columns_with_nulls = ['continent', 'elevation_ft']
df_airports.drop(df_airports_columns_with_nulls, axis = 1, inplace = True)

In [19]:
preliminary_checkings(df_airports)

Total number of rows is 55075
The number of null values is 92243
The number of rows with some null values is 52299
Total number of duplicated rows is 0

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 10 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: object(10)
memory usage: 4.2+ MB
None


Again, we check for duplicates in the 'ident' column (above we checked for duplicated rows):

In [20]:
df_airports[df_airports['ident'].duplicated() == True]

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates


Now it is the turn of the df_demographics dataset:

In [21]:
preliminary_checkings(df_demographics)

Total number of rows is 2891
The number of null values is 48
The number of rows with some null values is 16
Total number of duplicated rows is 0

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB
None


And the temperatures dataset:

In [22]:
preliminary_checkings(df_temperatures)

Total number of rows is 100000
The number of null values is 8548
The number of rows with some null values is 4274
Total number of duplicated rows is 0

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 7 columns):
dt                               100000 non-null object
AverageTemperature               95726 non-null float64
AverageTemperatureUncertainty    95726 non-null float64
City                             100000 non-null object
Country                          100000 non-null object
Latitude                         100000 non-null object
Longitude                        100000 non-null object
dtypes: float64(2), object(5)
memory usage: 5.3+ MB
None


We can see that those two datasets are fairly clean.

Now we go for the last dataset, df_countries. We saw above that it has a large number of columns. We will check on that:

In [23]:
df_countries.columns

Index(['Country Name', 'Country Code', 'Indicator Name', 'Indicator Code',
       '1960', '1961', '1962', '1963', '1964', '1965', '1966', '1967', '1968',
       '1969', '1970', '1971', '1972', '1973', '1974', '1975', '1976', '1977',
       '1978', '1979', '1980', '1981', '1982', '1983', '1984', '1985', '1986',
       '1987', '1988', '1989', '1990', '1991', '1992', '1993', '1994', '1995',
       '1996', '1997', '1998', '1999', '2000', '2001', '2002', '2003', '2004',
       '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012', '2013',
       '2014', '2015', '2016', '2017', '2018', '2019'],
      dtype='object')

We see that there are many columns. As we are interested only in the recent data (we are checking on present day immigration) we will throw away all data from 1960 to 2009:

In [24]:
list_columns_to_drop = [str(i) for i in range(1960,2010)]
list_columns_to_drop.append('Indicator Name')
list_columns_to_drop.append('Indicator Code')

In [25]:
df_countries.drop(list_columns_to_drop, axis=1, inplace=True)

This wide structure, with one column per year, is awkward to query and join, so we are going to reshape our dataframe into a long format, with a single column holding the values for all years:

In [26]:
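list_columns_indicators = ['Country Name', 'Country Code']
list_columns_to_melt = [str(i) for i in range(2010,2020)]

# Pass the year columns explicitly so that only they are unpivoted
df_countries = df_countries.melt(id_vars = list_columns_indicators, value_vars = list_columns_to_melt, var_name = 'year')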
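
To make the reshaping concrete, here is a small sketch of `melt` on a toy wide table. The figures are rounded and illustrative, and the names `wide` and `long_df` are ours.

```python
import pandas as pd

# Wide format: one column per year
wide = pd.DataFrame({
    "Country Name": ["Aruba", "Albania"],
    "Country Code": ["ABW", "ALB"],
    "2010": [23512.6, 4094.4],
    "2011": [24986.0, 4437.1],
})

# Long format: one row per (country, year) pair
long_df = wide.melt(id_vars=["Country Name", "Country Code"], var_name="year")
print(long_df)
```

Each country now contributes one row per year, with the year in its own column and the figure in the default `value` column.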

Now, we run our standard checkings procedure:

In [27]:
preliminary_checkings(df_countries)

Total number of rows is 2640
The number of null values is 169
The number of rows with some null values is 169
Total number of duplicated rows is 0

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2640 entries, 0 to 2639
Data columns (total 4 columns):
Country Name    2640 non-null object
Country Code    2640 non-null object
year            2640 non-null object
value           2471 non-null float64
dtypes: float64(1), object(3)
memory usage: 82.6+ KB
None


For our preliminary analysis, this looks good.

It is also worth noting that we have not found a single duplicated row (bearing in mind that we have only taken a small sample of the immigration and temperature data, of course).

#### Defining Cleaning Steps

In the first part of this section we looked at our data and only dropped some columns that held a fair share of nulls and were of no practical use.

Only in the case of the df_countries dataset did we do more serious work, as its structure required changes before assessing its content.

Now, in this second part, we will clean our data, addressing what we found in the first part and also bringing new considerations to the process.

We start again with the immigration dataset:

In [28]:
df_immigration.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,G,O,M,1955.0,7202016,F,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,G,R,M,1990.0,10222016,M,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,G,O,M,1940.0,7052016,M,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,G,O,M,1991.0,10272016,M,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,Z,K,M,1997.0,7042016,F,,42322570000.0,LAND,WT


Suspecting that the first column corresponds to an id, we will rename it to 'Imm_id'.

Also, we could see that the numeric columns are float64, although the values they hold are integers.

This is because pandas represents missing values as NaN, which is a float, so a column containing NaN is converted to float as a whole.

We will fill the NaN with 0 and then convert all float64 columns to integers:

In [29]:
df_immigration.rename({'Unnamed: 0':'Imm_id'}, axis=1, inplace=True)

We also have a column with an unfortunate name: 'count' clashes with existing method names (for example DataFrame.count), which can cause problems for our analysis team later on, so we rename it while trying to keep its meaning:

In [30]:
df_immigration.rename({'count':'counted'}, axis=1, inplace=True)

In [31]:
df_immigration.fillna(0, inplace=True)

In [32]:
integer_columns = list()
for column in df_immigration.columns:
    if df_immigration[column].dtypes == 'float64':
        integer_columns.append(column)

In [33]:
# Reuse the list of float64 columns collected in the previous cell
for column in integer_columns:
    df_immigration[column] = df_immigration[column].astype('int')
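
Filling with 0 makes the integer cast possible, but it also makes a missing value indistinguishable from a real 0. As an alternative sketch (not what this notebook does), recent pandas versions offer a nullable integer dtype, `Int64`, that keeps the missing marker. We assume pandas 0.24 or later; the toy data and variable names are illustrative.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({"depdate": [20691.0, np.nan, 20567.0]})

# Approach used in this notebook: fill with 0, then cast to a plain integer
filled = toy["depdate"].fillna(0).astype("int64")

# Alternative: nullable integer dtype, the missing value is preserved
nullable = toy["depdate"].astype("Int64")

print(filled.tolist())
print(nullable)
```

The trade-off is compatibility: downstream tools may not understand the nullable dtype, in which case the fill-with-0 approach is simpler.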

Now we can see that our dataframe looks much nicer:

In [34]:
df_immigration.head()

Unnamed: 0,Imm_id,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
0,2027561,4084316,2016,4,209,209,HHW,20566,1,HI,...,G,O,M,1955,7202016,F,JL,56582674633,00782,WT
1,2171295,4422636,2016,4,582,582,MCA,20567,1,TX,...,G,R,M,1990,10222016,M,*GA,94361995930,XBLNG,B2
2,589494,1195600,2016,4,148,112,OGG,20551,1,FL,...,G,O,M,1940,7052016,M,LH,55780468433,00464,WT
3,2631158,5291768,2016,4,297,297,LOS,20572,1,CA,...,G,O,M,1991,10272016,M,QR,94789696030,00739,B2
4,3032257,985523,2016,4,111,111,CHM,20550,3,NY,...,Z,K,M,1997,7042016,F,0,42322572633,LAND,WT


There is still the matter with the dates, so let's address that too:

In [35]:
format_base_date = '%d-%m-%Y'

def transforming_SAS_date(series):
    series.fillna(0, inplace = True)
    return dt.datetime.strptime('01-01-1960', format_base_date) + series.apply(lambda x: dt.timedelta(days = x))

In [36]:
df_immigration[['arrdate', 'depdate']] = df_immigration[['arrdate', 'depdate']].apply(transforming_SAS_date)
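
The same conversion can be sketched with `pd.to_datetime`, whose `unit` and `origin` parameters express "days since 1 January 1960" directly. This is an alternative under the assumption that missing dates should stay missing (`NaT`) instead of being filled with 0, which would map them to 1960-01-01. The series `sas_days` is toy data.

```python
import numpy as np
import pandas as pd

# SAS dates are day counts from 1960-01-01; one value is missing
sas_days = pd.Series([20566.0, np.nan, 20551.0])

dates = pd.to_datetime(sas_days, unit="D", origin="1960-01-01")
print(dates)
```

The first and last values correspond to 2016-04-22 and 2016-04-07, matching the arrival dates obtained above for the same day counts.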

In [37]:
df_immigration.head()

Unnamed: 0,Imm_id,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
0,2027561,4084316,2016,4,209,209,HHW,2016-04-22,1,HI,...,G,O,M,1955,7202016,F,JL,56582674633,00782,WT
1,2171295,4422636,2016,4,582,582,MCA,2016-04-23,1,TX,...,G,R,M,1990,10222016,M,*GA,94361995930,XBLNG,B2
2,589494,1195600,2016,4,148,112,OGG,2016-04-07,1,FL,...,G,O,M,1940,7052016,M,LH,55780468433,00464,WT
3,2631158,5291768,2016,4,297,297,LOS,2016-04-28,1,CA,...,G,O,M,1991,10272016,M,QR,94789696030,00739,B2
4,3032257,985523,2016,4,111,111,CHM,2016-04-06,3,NY,...,Z,K,M,1997,7042016,F,0,42322572633,LAND,WT


We now go for the demographics dataset:

In [38]:
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


We face the same problem with the floats as before, but only for some columns, so we are going to address this issue again:

In [39]:
df_demographics.dropna(inplace = True)

In [40]:
df_demographics_columns_to_int = ['Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born']
for column in df_demographics_columns_to_int:
    df_demographics[column] = df_demographics[column].astype('int')

Also, we can drop the column 'Number of Veterans', as it holds no interest for our purposes:

In [41]:
df_demographics.drop('Number of Veterans', axis = 1, inplace = True)

And change the name of the last column, as in the previous case:

In [42]:
df_demographics.rename({'Count':'counted'}, axis=1, inplace=True)

In [43]:
df_demographics.rename({'Foreign-born':'Foreignborn'}, axis=1, inplace=True)

In [44]:
# df_demographics['Aux Median Age'] = df_demographics['Median Age']*df_demographics['Total Population']
# df_demographics['AuxAverage Household Size'] = df_demographics['Average Household Size']*df_demographics['Total Population']

# df_demographics_2 = df_demographics.groupby('State Code').sum().reset_index()
# df_demographics_2['Median Age'] = df_demographics_2['Aux Median Age']/df_demographics_2['Total Population']
# df_demographics_2['Average Household Size'] = df_demographics_2['AuxAverage Household Size']/df_demographics_2['Total Population']
# df_demographics_2 = df_demographics_2['State Code', 'Median Age', 'Male Population', 'Female Population','Total Population', 'Foreignborn', 'Average Household Size']

Now, we go for the airports dataset:

In [45]:
df_airports.head()

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,US,US-AR,Newport,,,,"-91.254898, 35.6087"


Apparently there is not much to be done here.

Nevertheless, there is one thing we can do: not all of these airports will be of use to us, so we can reduce the length of this dataset by finding which airports appear in our immigration dataset.

To do so, we will create an airport dictionary by applying a regex to I94_SAS_Labels_Descriptions.SAS, and then check our dataframe against this dictionary:

In [46]:
# Dictionary of valid i94port codes

# Matches lines holding two quoted strings: 'CODE' ... 'DESCRIPTION'
regex_pattern = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94_airports = dict()
with open(SAS_label_descriptions) as label_descriptions:
    for line in label_descriptions:
        match = regex_pattern.search(line)
        if match is not None:
            i94_airports[match[1]] = [match[2]]

i94_airports.keys()

dict_keys(['ALC', 'ANC', 'BAR', 'DAC', 'PIZ', 'DTH', 'EGL', 'FRB', 'HOM', 'HYD', 'JUN', '5KE', 'KET', 'MOS', 'NIK', 'NOM', 'PKC', 'ORI', 'SKA', 'SNP', 'TKI', 'WRA', 'HSV', 'MOB', 'LIA', 'ROG', 'DOU', 'LUK', 'MAP', 'NAC', 'NOG', 'PHO', 'POR', 'SLU', 'SAS', 'TUC', 'YUI', 'AND', 'BUR', 'CAL', 'CAO', 'FRE', 'ICP', 'LNB', 'LOS', 'BFL', 'OAK', 'ONT', 'OTM', 'BLT', 'PSP', 'SAC', 'SLS', 'SDP', 'SFR', 'SNJ', 'SLO', 'SLI', 'SPC', 'SYS', 'SAA', 'STO', 'TEC', 'TRV', 'APA', 'ASE', 'COS', 'DEN', 'DRO', 'BDL', 'BGC', 'GRT', 'HAR', 'NWH', 'NWL', 'TST', 'WAS', 'DOV', 'DVD', 'WLL', 'BOC', 'SRQ', 'CAN', 'DAB', 'FRN', 'FTL', 'FMY', 'FPF', 'HUR', 'GNV', 'JAC', 'KEY', 'LEE', 'MLB', 'MIA', 'APF', 'OPF', 'ORL', 'PAN', 'PEN', 'PCF', 'PEV', 'PSJ', 'SFB', 'SGJ', 'SAU', 'FPR', 'SPE', 'TAM', 'WPB', 'ATL', 'BRU', 'AGS', 'SAV', 'AGA', 'HHW', 'OGG', 'KOA', 'LIH', 'CID', 'DSM', 'BOI', 'EPI', 'IDA', 'PTL', 'SPI', 'CHI', 'DPA', 'PIA', 'RFD', 'UGN', 'GAR', 'HMM', 'INP', 'MRL', 'SBN', 'ICT', 'LEX', 'LOU', 'BTN', 'LKC', 'L
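
To show what the pattern captures, here is a sketch on a few sample lines. The first two lines are hypothetical and only imitate the layout we assume for the port block of the data dictionary. The third is the Mexico line shown earlier, which has a single quoted string and is therefore skipped. Unlike the cell above, this variation strips the padding and stores plain strings.

```python
import re

# Hypothetical port lines (assumed layout) plus one country line from the dictionary
sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '\n",
    "   'ANC'\t=\t'ANCHORAGE, AK         '\n",
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n",
]

port_pattern = re.compile(r"'(.*)'.*'(.*)'")

ports = {}
for line in sample_lines:
    match = port_pattern.search(line)
    if match is not None:
        # Group 1 is the quoted code, group 2 the quoted description
        ports[match.group(1)] = match.group(2).strip()

print(ports)
```

Any line containing two quoted strings matches, so other value blocks in the file with the same layout may contribute keys as well.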

As a preliminary check, let's see how many of these airports are in our dataframe (we know the right column is 'iata_code' from some trials and comparisons that we decided not to keep, for relevance reasons):

In [47]:
print(len(i94_airports))
len([a for a in list(i94_airports.keys()) if any(df_airports['iata_code'] == a)])

715


549

And now we filter our dataset:

In [48]:
df_airports = df_airports[df_airports['iata_code'].isin(i94_airports.keys())]

Since, in the other direction, not all keys of our dictionary (and so, of our immigration dataset) are to be found in the df_airports dataframe, we create a shorter dictionary with the common airports:

In [49]:
i94_airports_in_df = {a:b for (a,b) in list(i94_airports.items()) if any(df_airports['iata_code'] == a)}
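
Both the count and the dictionary comprehension above compare the whole 'iata_code' column against each key, one key at a time. The same intersection can be sketched with a set that is built once. The codes below are made up for illustration and stand in for the dictionary and the dataframe column:

```python
# Made-up stand-ins for i94_airports and df_airports['iata_code'].
i94_codes = {"ALC": ["ALCAN, AK"], "ANC": ["ANCHORAGE, AK"], "XXX": ["NOT REPORTED"]}
iata_column = ["ANC", "ALC", "JFK", "ANC"]

# Build the set once; membership tests on a set do not rescan the column.
iata_in_df = set(iata_column)

# Keys present on both sides, and keys with no airport row.
common = {code: desc for code, desc in i94_codes.items() if code in iata_in_df}
missing = sorted(set(i94_codes) - iata_in_df)

print(len(i94_codes), len(common))
print(missing)
```

With the real objects, a set built from the 'iata_code' column (after dropping nulls) would presumably play the role of `iata_in_df`.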

We could also filter the immigration dataset to keep only these common airports, but that would not be a good idea: we would lose a fair share of data, and it is better to keep it even when we lack the data of the associated airport.

Now, as we are going to use the "iata_code" column for connection with the immigration table, we will look for duplicates in this column (there shouldn't be any):

In [50]:
df_airports['iata_code'].value_counts().head(10)

VQS    2
AUS    2
HIG    2
CLG    2
BCK    2
MNW    2
DLR    2
RIV    1
GRB    1
MIA    1
Name: iata_code, dtype: int64

As we can see, we do have some duplicates. Let's take a look:

In [51]:
val_counts = df_airports['iata_code'].value_counts()

list_iata_dup = list(val_counts[val_counts>1].index)

In [52]:
df_airports[df_airports['iata_code'].isin(list_iata_dup)].sort_values('iata_code')

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
11676,AUS,closed,Austin Robert Mueller Municipal,US,US-TX,,KAUS,AUS,,"-97.6997852325, 30.2987223546"
26137,KAUS,large_airport,Austin Bergstrom International Airport,US,US-TX,Austin,KAUS,AUS,AUS,"-97.6698989868164, 30.194499969482422"
11637,AU-BCK,closed,[Duplicate] Bolwarra Airport,AU,AU-QLD,Bolwarra,,BCK,,"144.169006348, -17.388299942"
52989,YBWR,small_airport,Bolwarra Airport,AU,AU-QLD,,YBWR,BCK,,"144.169006348, -17.388299942"
15011,CLG,closed,Coalinga Airport,US,US-CA,,,CLG,,"-120.360116959, 36.1580433385"
26327,KC80,small_airport,New Coalinga Municipal Airport,US,US-CA,Coalinga,,CLG,C80,"-120.29399871826172, 36.16310119628906"
17519,DLR,small_airport,Dalnerechensk Airport,RU,RU-PRI,Dalnerechensk,UHHD,DLR,,"133.7363, 45.8783"
40590,RU-0493,small_airport,Dalnerechensk Airport,RU,RU-PRI,Dalnerechensk,,DLR,,"133.7363, 45.8783"
11651,AU-HIG,closed,[Duplicate] Highbury Airport,AU,AU-QLD,Highbury,,HIG,,"143.145996094, -16.4244003296"
53404,YHHY,small_airport,Highbury Airport,AU,AU-QLD,,YHHY,HIG,,"143.145996094, -16.4244003296"


We can see that this phenomenon is due to several airports having been rebuilt, so there is an old and a new airport for the same IATA code.

There is one exception: Dalnerechensk Airport, which is marked neither as closed nor as a duplicate.

We will drop the old airports marked as "closed", and also this one airport, by its ident:

In [53]:
df_airports = df_airports[~((df_airports['iata_code'].isin(list_iata_dup)) & (df_airports['type'] == 'closed'))]
df_airports = df_airports[df_airports['ident'] != 'RU-0493']
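
The rule applied above (for a duplicated code, drop the row typed "closed") can be sketched in plain Python to show why Dalnerechensk needs separate handling. The first four rows are copied from the output above; the last one is a made-up non-duplicated row, and the helper name `drop_closed_duplicates` is hypothetical:

```python
from collections import Counter

# (ident, type, iata_code)
rows = [
    ("AUS", "closed", "AUS"),
    ("KAUS", "large_airport", "AUS"),
    ("DLR", "small_airport", "DLR"),
    ("RU-0493", "small_airport", "DLR"),
    ("KMIA", "large_airport", "MIA"),  # made-up row with a unique code
]


def drop_closed_duplicates(rows):
    """Drop rows typed 'closed' only when their IATA code appears more than once."""
    counts = Counter(iata for _, _, iata in rows)
    return [r for r in rows if not (counts[r[2]] > 1 and r[1] == "closed")]


kept = drop_closed_duplicates(rows)

# Codes that are still duplicated after the rule has been applied.
still_duplicated = [code for code, n in Counter(r[2] for r in kept).items() if n > 1]
print(still_duplicated)
```

Any code left in `still_duplicated` has to be resolved by hand, which is what the second line of the cell above does for 'RU-0493'.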

And, finally, our last transformation is to split the coordinates into separate longitude and latitude columns:

In [54]:
# The coordinates string is 'longitude, latitude'; strip() removes the space left after the comma
df_airports['longitude'] = df_airports['coordinates'].apply(lambda x: x.split(',')[0].strip())
df_airports['latitude'] = df_airports['coordinates'].apply(lambda x: x.split(',')[1].strip())

In [55]:
df_airports.drop('coordinates', axis = 1, inplace = True)
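
The two new columns hold text rather than numbers. A minimal sketch of the same split that also converts both pieces to floats, assuming (as the notebook does) that the string is ordered longitude first; the helper name `split_coordinates` is hypothetical:

```python
def split_coordinates(coordinates):
    """Split a 'longitude, latitude' string into two floats."""
    # float() ignores surrounding whitespace, so the space after the comma is harmless.
    longitude, latitude = (float(part) for part in coordinates.split(","))
    return longitude, latitude


# Value taken from the Lowell Field row shown earlier.
print(split_coordinates("-151.695999146, 59.94919968"))
```

Numeric columns would make later distance or range filters possible without another conversion step.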

In [56]:
df_airports.head()

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,gps_code,iata_code,local_code,longitude,latitude
6055,57A,seaplane_base,Tokeen Seaplane Base,US,US-AK,Tokeen,57A,TKI,57A,-133.32699585,55.9370994568
6731,5Z9,seaplane_base,Lake Brooks Seaplane Base,US,US-AK,Katmai National Park,5Z9,BKF,5Z9,-155.77699279785,58.554798126221
8976,89NY,small_airport,Maxson Airfield,US,US-NY,Alexandria Bay,89NY,AXB,89NY,-75.90034,44.312002
10441,AGGF,small_airport,Fera/Maringe Airport,SB,SB-IS,Fera Island,AGGF,FRE,,159.576996,-8.1075
10695,ANZ,small_airport,Angus Downs Airport,AU,AU-NT,Angus Downs Station,,ANZ,,132.2748,-25.0325


Now, as for temperatures:

In [57]:
df_temperatures.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


There weren't many nulls, so we drop them:

In [58]:
df_temperatures.dropna(inplace=True)

Now, our 'dt' column is a string, and we can see that our dates go back as far as the 18th century.

We don't need data that old; only recent years are of interest.

So we parse the dates and look at the most recent records:

In [59]:
date_format_df_temp = '%Y-%m-%d'

df_temperatures['date'] = df_temperatures['dt'].apply(lambda x: dt.datetime.strptime(x, date_format_df_temp))
df_temperatures['month'] = df_temperatures['date'].apply(lambda x: x.month)
df_temperatures['year'] = df_temperatures['date'].apply(lambda x: x.year)
df_temperatures['day'] = df_temperatures['date'].apply(lambda x: x.day)
df_temperatures[df_temperatures['year']>=2008].tail()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,date,month,year,day
98312,2013-04-01,28.242,0.571,Agartala,India,23.31N,91.75E,2013-04-01,4,2013,1
98313,2013-05-01,27.854,0.747,Agartala,India,23.31N,91.75E,2013-05-01,5,2013,1
98314,2013-06-01,29.664,0.575,Agartala,India,23.31N,91.75E,2013-06-01,6,2013,1
98315,2013-07-01,28.926,0.425,Agartala,India,23.31N,91.75E,2013-07-01,7,2013,1
98316,2013-08-01,28.381,0.653,Agartala,India,23.31N,91.75E,2013-08-01,8,2013,1


We can see that we have records only up to September of 2013. We will therefore build a dataset of temperatures from 2000 on: we now read the whole file in chunks and filter each chunk:

In [60]:
temp_reading_obj = pd.read_csv(temperatures_path, chunksize=100000)

# The filtered chunks are collected in a list and concatenated once at the end
# (DataFrame.append returns a new DataFrame, so discarding its result keeps only the first chunk)
filtered_chunks = []

for chunk in temp_reading_obj:
    chunk.dropna(inplace = True)
    chunk['date'] = chunk['dt'].apply(lambda x: dt.datetime.strptime(x, date_format_df_temp))
    chunk['year'] = chunk['date'].apply(lambda x: x.year)
    filtered_chunks.append(chunk[chunk['year']>=2000])

df_temperatures_total = pd.concat(filtered_chunks)
df_temperatures_total.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,date,year
3074,2000-01-01,3.065,0.372,Århus,Denmark,57.05N,10.33E,2000-01-01,2000
3075,2000-02-01,3.724,0.241,Århus,Denmark,57.05N,10.33E,2000-02-01,2000
3076,2000-03-01,3.976,0.296,Århus,Denmark,57.05N,10.33E,2000-03-01,2000
3077,2000-04-01,8.321,0.221,Århus,Denmark,57.05N,10.33E,2000-04-01,2000
3078,2000-05-01,13.567,0.253,Århus,Denmark,57.05N,10.33E,2000-05-01,2000


It doesn't seem to be the case, but in case there is more than one data point per month, we will group by month, year, city and country and take the mean, which also averages the uncertainty. We will then calculate each city's average temperature over the whole period:

In [61]:
df_temperatures_total['month'] = df_temperatures_total['date'].apply(lambda x: x.month)
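# numeric_only=True averages only the numeric columns (string and date columns are left out)
df_temperatures_total_gr = df_temperatures_total.groupby(['month', 'year', 'City', 'Country']).mean(numeric_only=True).reset_index().sort_values(['City', 'Country', 'year', 'month'])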

In [62]:
df_temperatures_total_gr_av = df_temperatures_total_gr.groupby(['City', 'Country']).mean().reset_index().sort_values(['City', 'Country', 'year'])[['City', 'Country', 'AverageTemperature']]

In [63]:
df_temperatures_total_gr_av.rename({'AverageTemperature':'AvgTotal'}, axis = 1, inplace = True)

In [64]:
df_temperatures_total_gr = pd.merge(df_temperatures_total_gr, df_temperatures_total_gr_av, on=['City', 'Country'])

In [65]:
df_temperatures_total_gr.head()

Unnamed: 0,month,year,City,Country,AverageTemperature,AverageTemperatureUncertainty,AvgTotal
0,1,2000,A Coruña,Spain,7.468,0.318,14.034128
1,2,2000,A Coruña,Spain,11.199,0.429,14.034128
2,3,2000,A Coruña,Spain,12.242,0.501,14.034128
3,4,2000,A Coruña,Spain,10.431,0.379,14.034128
4,5,2000,A Coruña,Spain,15.149,0.457,14.034128


Finally, we will treat the df_countries DataFrame.

We are interested in the mean GDP per capita of recent years, and in the average yearly percentage increase or decline of that value:

In [66]:
df_countries.head()

Unnamed: 0,Country Name,Country Code,year,value
0,Aruba,ABW,2010,23512.6026
1,Afghanistan,AFG,2010,543.303042
2,Angola,AGO,2010,3587.883798
3,Albania,ALB,2010,4094.350334
4,Andorra,AND,2010,40852.66678


In [67]:
aux_df1 = df_countries.dropna().groupby(['Country Name', 'Country Code']).max().reset_index()
aux_df2 = df_countries.dropna().groupby(['Country Name', 'Country Code']).min().reset_index()

aux_df1.rename({'year':'max_year', 'value':'GDP_capita_max_year'}, axis = 1, inplace = True)

aux_df1[['min_year', 'GDP_capita_min_year']] = aux_df2[['year', 'value']]
aux_df1['max_year'] = aux_df1['max_year'].astype('int')
aux_df1['min_year'] = aux_df1['min_year'].astype('int')

aux_df1['%_change'] = ((aux_df1['GDP_capita_max_year'] / aux_df1['GDP_capita_min_year'])**(1/(aux_df1['max_year'] - aux_df1['min_year'])) -1) * 100
aux_df1.head()

Unnamed: 0,Country Name,Country Code,max_year,GDP_capita_max_year,min_year,GDP_capita_min_year,%_change
0,Afghanistan,AFG,2019,641.871479,2010,502.115487,2.765984
1,Albania,ALB,2019,5352.857411,2010,3952.801215,3.426348
2,Algeria,DZA,2019,5592.257099,2010,3946.443977,3.948956
3,American Samoa,ASM,2018,11843.33118,2010,10271.22452,1.796174
4,Andorra,AND,2019,43335.32886,2010,35762.52307,2.15702
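
The '%_change' column is a compound annual growth rate: the ratio between the two GDP values, raised to one over the number of years between them, minus one, times 100. Note also that `max()` and `min()` on a grouped dataframe aggregate each column independently, so the two GDP columns hold the highest and lowest value of the period, which need not be the values recorded in max_year and min_year (for Afghanistan, df_countries.head() shows 543.303042 for 2010, while the table above lists 502.115487). The sketch below shows the formula and one possible way of picking the values of the first and last year instead. The function names and the sample series are hypothetical:

```python
def annual_change_percent(first_value, last_value, first_year, last_year):
    """Compound annual growth rate between two observations, in percent."""
    years = last_year - first_year
    return ((last_value / first_value) ** (1 / years) - 1) * 100


# The four inputs of the Afghanistan row above; the result should be close to its '%_change'.
print(round(annual_change_percent(502.115487, 641.871479, 2010, 2019), 6))


def first_and_last(records):
    """Pick the (year, value) pairs of the earliest and latest year available."""
    ordered = sorted(records)  # tuples sort by year first
    return ordered[0], ordered[-1]


# Hypothetical series for one country: (year, GDP per capita).
series = [(2012, 90.0), (2010, 100.0), (2011, 80.0), (2013, 121.0)]
(first_year, first_value), (last_year, last_value) = first_and_last(series)
print(round(annual_change_percent(first_value, last_value, first_year, last_year), 4))
```

In the hypothetical series the lowest value (80.0) falls in 2011, so a column-wise minimum would pair it with the year 2010; selecting by year avoids that mismatch.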


In [68]:
aux_df3 = df_countries.groupby(['Country Name', 'Country Code']).mean(numeric_only=True).round().dropna().astype('int').reset_index()
aux_df3.rename({'value':'avg_GDP_capita'}, axis = 1, inplace = True)
aux_df3.head()

Unnamed: 0,Country Name,Country Code,avg_GDP_capita
0,Afghanistan,AFG,574
1,Albania,ALB,4502
2,Algeria,DZA,4675
3,American Samoa,ASM,11220
4,Andorra,AND,39860


In [69]:
df_countries_final = pd.merge(aux_df3, aux_df1, on = ['Country Name', 'Country Code'])

We will save these results in csv files:

In [70]:
immigration_csv_path = './treated/immigration_sample_treated.csv'
df_immigration.to_csv(immigration_csv_path, sep = ";", encoding="ISO-8859-1", index = False)

In [71]:
airports_csv_path = './treated/airports_treated.csv'
df_airports.to_csv(airports_csv_path, sep = ";", encoding="ISO-8859-1", index = False)

In [72]:
demographics_csv_path = './treated/demographics_treated.csv'
df_demographics.to_csv(demographics_csv_path, sep = ";", encoding="ISO-8859-1", index = False)

In [73]:
temperatures_csv_path = './treated/temperatures_treated.csv'
df_temperatures_total_gr.to_csv(temperatures_csv_path, sep = ";", encoding="ISO-8859-1", index = False)

In [74]:
countries_csv_path = './treated/countries_treated.csv'
df_countries_final.to_csv(countries_csv_path, sep = ";", encoding="ISO-8859-1", index = False)

#### Cleaning Steps (Spark)

For the immigration files, we have carried out this process with pandas on small amounts of data using sample files, but for the bigger files this process should be done with Spark.

We should, then, gather all the cleaning steps we have done in pandas and translate them into Spark:

In [75]:
# Creating the Spark session and loading a file:

spark = SparkSession.builder.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()
spark_df_immigration_trial = spark.read.format('com.github.saurfang.sas.spark').load(SAS_immigration)
spark_df_immigration_trial.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

Firstly, we dropped 4 columns:

In [76]:
for col in df_immigration_columns_with_nulls:
    spark_df_immigration_trial = spark_df_immigration_trial.drop(col)

Then, we renamed the columns 'Unnamed: 0' (curiously enough, it is not present when loading the file in Spark) and 'count', and filled the NA with zeroes:

In [77]:
spark_df_immigration_trial = spark_df_immigration_trial.withColumnRenamed ('Unnamed: 0','Imm_id')
spark_df_immigration_trial = spark_df_immigration_trial.withColumnRenamed ('count', 'counted')
spark_df_immigration_trial = spark_df_immigration_trial.na.fill(0)

Earlier we did not filter by the values in the i94_airports dictionary, but we will perform that check now and see how many rows are left afterwards:

In [78]:
spark_df_immigration_trial.count()

3096313

In [79]:
spark_df_immigration_trial_filtered = spark_df_immigration_trial.filter(spark_df_immigration_trial['i94port'].isin(list(i94_airports.keys())))

In [80]:
spark_df_immigration_trial_filtered.count()

3096281

As we can see, we keep most of our data.

Then, we changed the data types, from floats to integers, in all columns (as no data was supposed to be float):

In [81]:
columns_dtypes = spark_df_immigration_trial_filtered.dtypes
columns_as_double = [i[0] for i in columns_dtypes if i[1] == 'double']

In [82]:
def cast_columns_as_type(df, column_list, var_type):
    # Casts every column in column_list to the Spark type class passed as var_type
    for column in df.columns:
        if column in column_list:
            df = df.withColumn(column, df[column].cast(var_type()))
            
    return df

In [83]:
spark_df_immigration_trial_filtered = cast_columns_as_type(spark_df_immigration_trial_filtered, columns_as_double, IntegerType)

In [84]:
spark_df_immigration_trial_filtered.dtypes

[('cicid', 'int'),
 ('i94yr', 'int'),
 ('i94mon', 'int'),
 ('i94cit', 'int'),
 ('i94res', 'int'),
 ('i94port', 'string'),
 ('arrdate', 'int'),
 ('i94mode', 'int'),
 ('i94addr', 'string'),
 ('depdate', 'int'),
 ('i94bir', 'int'),
 ('i94visa', 'int'),
 ('counted', 'int'),
 ('dtadfile', 'string'),
 ('entdepa', 'string'),
 ('entdepd', 'string'),
 ('matflag', 'string'),
 ('biryear', 'int'),
 ('dtaddto', 'string'),
 ('gender', 'string'),
 ('airline', 'string'),
 ('admnum', 'int'),
 ('fltno', 'string'),
 ('visatype', 'string')]

Finally, we took the 'arrdate' and 'depdate' columns and changed them from integers to datetime objects:

In [85]:
def spark_transforming_SAS_date(x):
    return dt.datetime.strptime('01-01-1960', '%d-%m-%Y') + dt.timedelta(days = x)

udf_spark_transforming_SAS_date = udf(spark_transforming_SAS_date)

In [86]:
spark_df_immigration_trial_filtered = spark_df_immigration_trial_filtered.withColumn('arrdate', udf_spark_transforming_SAS_date('arrdate'))

In [87]:
spark_df_immigration_trial_filtered = spark_df_immigration_trial_filtered.withColumn('depdate', udf_spark_transforming_SAS_date('depdate'))

In [88]:
spark_df_immigration_trial_filtered.show(5)

+-----+-----+------+------+------+-------+--------------------+-------+-------+--------------------+------+-------+-------+--------+-------+-------+-------+-------+--------+------+-------+----------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|             arrdate|i94mode|i94addr|             depdate|i94bir|i94visa|counted|dtadfile|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|    admnum|fltno|visatype|
+-----+-----+------+------+------+-------+--------------------+-------+-------+--------------------+------+-------+-------+--------+-------+-------+-------+-------+--------+------+-------+----------+-----+--------+
|    6| 2016|     4|   692|   692|    XXX|java.util.Gregori...|      0|   null|java.util.Gregori...|    37|      2|      1|    null|      T|   null|   null|   1979|10282016|  null|   null|1897628485| null|      B2|
|    7| 2016|     4|   254|   276|    ATL|java.util.Gregori...|      1|     AL|java.util.Gregori...|    25|      3|      1|20130811|      G|
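
The 'java.util.Gregori...' values in the output above suggest that the UDF, which was declared without a return type and therefore defaults to string, handed the dates back as text rather than as dates. Also, since nulls were filled with zeroes earlier, a missing 'depdate' becomes day 0, that is, 1 January 1960. The conversion itself can be sketched in plain Python as follows. The function name is hypothetical, and treating 0 as missing is an assumption tied to that earlier fill:

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from this day


def sas_days_to_date(days):
    """Convert a SAS day count to a date; None or 0 is treated as missing (assumption)."""
    if not days:
        return None
    return SAS_EPOCH + dt.timedelta(days=int(days))


# 20573 and 20551 are the arrdate values of the two rows shown earlier.
print(sas_days_to_date(20573), sas_days_to_date(20551), sas_days_to_date(0))
```

In Spark, a function like this would presumably be wrapped with an explicit return type (for instance `DateType()`) so that the column is stored as a date.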

With all these operations we would finish our data cleaning in Spark for the large immigration files.

The rest of the datasets can be treated in pandas, given their size.

The problem would start when bringing everything together, as these immigration files are too big to fit comfortably in a regular SQL database and be queried nicely.

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

At this moment, this is our situation (we will be calling it POINT1):

<img src="IMAGES/DATABASE_MODEL_INITIALLY.png" style="width: 800px;" />

We have cleaned our data, but we have not dealt with its structure yet.

That is what we are going to do now.

The main purpose of this pipeline is to perform machine learning analysis afterwards. So it will be useful to have a big table with all the information readily available. At the same time, adjacent tables can be useful for the Data Scientists to gather more data just by performing one "JOIN" operation. So, our model of choice will be a sort of star schema, with one immigration facts table in which we will present the most relevant information to the viewer, and the rest of the tables as they are, linked to our immigration facts table.

From all the information contained in our tables, we are going to perform several transformations to end with the following schema:


<img src="IMAGES/DATABASE_MODEL_END.png" style="width: 800px;" />


We will refer to this situation as POINT2.

#### 3.2 Mapping Out Data Pipelines

We have seen the steps needed to perform the cleaning of our data.

These are the steps required to get from POINT1 to POINT2:

1) Transform the airports_treated and immigration_treated tables to hold common country codes, as explained in **NOTE 1** below. We will name these tables airports_treated_code and immigration_treated_code (the latter appears as immigration_sample_treated_code in the code).

2) We want to relate some information from the immigration table directly to the demographics table. It is hard to tell which specific city the traveler is going to, but we do have the code of the state they are staying in. So, the second stage will be grouping the demographics table by State. This table will be called demographics_treated_grouped.

3) Create the immigration_facts_table from immigration_treated_code, performing the required JOINs to the other tables.

* **NOTE 1:** At this point we find a problem when defining the relationship between the countries table and the immigration and airports tables, as they use different codes for their country columns. To solve it, we have created a country_codes table, stored as a csv under the folder "AUXILIARY". The information comes originally from https://www.iban.com/country-codes, and to it we have manually added the i94 country codes.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

We first ran the process with the samples in a MySQL database. Due to the huge amount of data in the full immigration dataset, we decided to go with Spark here.
We will load Spark DataFrames from the csv files that we generated earlier, and create temporary views of our tables to run SQL code on them:

In [89]:
# Importing the rest of the data from our csv files:

aux_countries = './Auxiliary/country_codes.csv'

spark_df_airports = spark.read.option("header",True).csv(airports_csv_path, sep=";", encoding = 'ISO-8859-1')
spark_df_demographics = spark.read.option("header",True).csv(demographics_csv_path, sep=";", encoding = 'ISO-8859-1')
spark_df_temperatures = spark.read.option("header",True).csv(temperatures_csv_path, sep=";", encoding = 'ISO-8859-1')
spark_df_countries = spark.read.option("header",True).csv(countries_csv_path, sep=";", encoding = 'ISO-8859-1')
spark_df_aux = spark.read.option("header",True).csv(aux_countries, sep=";")

In [90]:
# Creating the required temporary views:

spark_df_immigration_trial_filtered.createOrReplaceTempView("immigration_treated")
spark_df_airports.createOrReplaceTempView("airports_treated")
spark_df_demographics.createOrReplaceTempView("demographics_treated")
spark_df_temperatures.createOrReplaceTempView("temperatures_treated")
spark_df_countries.createOrReplaceTempView("countries_treated")
spark_df_aux.createOrReplaceTempView("country_codes")

In [91]:
# This query is for creating the airports_treated_code described above, the table of airports with the countries codes added:

query_1 = """SELECT a.*, b.CountryCode, b.i94cntyl
    FROM airports_treated AS a
    JOIN country_codes AS b
    ON a.iso_country = b.iso_country"""

In [92]:
spark_df_airports_treated_code = spark.sql(query_1)
spark_df_airports_treated_code.createOrReplaceTempView("airports_treated_code")

In [93]:
# This query is for adding the country codes to the immigration table:

query_2 ="""SELECT a.*,
        b.CountryCode AS cit_CountryCode, 
        b.iso_country AS cit_iso_country,
        c.CountryCode AS res_CountryCode, 
        c.iso_country AS res_iso_country
    FROM immigration_treated AS a
    JOIN country_codes AS b
    ON a.i94cit = b.i94cntyl
    JOIN country_codes AS c
    ON a.i94res = c.i94cntyl"""

In [94]:
spark_df_immigration_sample_treated_code = spark.sql(query_2)
spark_df_immigration_sample_treated_code.createOrReplaceTempView("immigration_sample_treated_code")

In [95]:
# This query groups the demographics data by State, so that we can correctly link it to the immigration table to create our facts table:

# This is the old query coming from SQL:

query_3_old = """SELECT State, State_Code,
    SUM(Median_Age)/SUM(Total_Population) AS Median_Age,
    SUM(Male_Population) AS Male_Population,
    SUM(Female_Population) AS Female_Population,
    SUM(Total_Population) AS Total_Population,
    SUM(Foreignborn) AS Foreignborn,
    SUM(Average_Household_Size)/SUM(Total_Population) AS Average_Household_Size,
    SUM(Foreignborn)/SUM(Total_Population) AS percent_foreignborn
FROM demographics_treated
GROUP BY State_Code"""

In [125]:
# This query groups the demographics data by State, so that we can correctly link it to the immigration table to create our facts table:

# Valid query:

query_3_2 = """SELECT FIRST(State) AS State, 
        `State Code`,
        SUM(`Median Age`)/SUM(`Total Population`) AS Median_Age,
        SUM(`Male Population`) AS Male_Population,
        SUM(`Female Population`) AS Female_Population,
        SUM(`Total Population`) AS Total_Population,
        SUM(Foreignborn) AS Foreignborn,
        SUM(`Average Household Size`)/SUM(`Total Population`) AS Average_Household_Size,
        SUM(Foreignborn)/SUM(`Total Population`) AS Percent_Foreignborn
    FROM demographics_treated
    GROUP BY `State Code`"""

In [126]:
spark_df_demographics_treated_code = spark.sql(query_3_2)
spark_df_demographics_treated_code.createOrReplaceTempView("demographics_treated_grouped")
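
A note on the Median_Age and Average_Household_Size expressions: dividing a sum of city-level values by the summed population yields a state-level figure only if those columns already hold the value multiplied by the population at this point, which is not shown in this section. Otherwise, a population-weighted average is the usual way to aggregate such values. A minimal sketch with made-up numbers:

```python
# Made-up city rows for one state: (median_age, total_population).
cities = [(30.0, 1000), (40.0, 3000)]

total_population = sum(pop for _, pop in cities)

# Population-weighted average of the city medians.
weighted_age = sum(age * pop for age, pop in cities) / total_population

# Sum of the medians divided by the summed population, for comparison.
unweighted_ratio = sum(age for age, _ in cities) / total_population

print(weighted_age, unweighted_ratio)
```

The weighted figure stays within the range of the city values, while the plain ratio shrinks as the population grows. A weighted average of medians is still an approximation of the true state median.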

In [114]:
# This is the query with which we create our facts table:

query_4 = """SELECT a.cicid,
        a.biryear,
        a.gender,
        a.i94yr,
        a.i94mon,
        a.i94cit,
        a.cit_CountryCode,
        a.cit_iso_country,
        a.i94res,
        a.res_CountryCode,
        a.res_iso_country,
        a.i94port,
        d.CountryCode AS CountryAirport,
        a.arrdate,
        a.i94mode,
        a.i94addr,
        c.Percent_Foreignborn,
        a.depdate,
        a.i94bir,
        a.i94visa,
        a.visatype,
        b.avg_GDP_capita AS res_avg_GDP_capita,
        b.`%_change` AS res_GDPcapita_annual_change

    FROM immigration_sample_treated_code AS a
    LEFT JOIN countries_treated AS b
    ON a.res_CountryCode = b.`Country Code`
    LEFT JOIN demographics_treated_grouped AS c
    ON a.i94addr = c.`State Code`
    LEFT JOIN airports_treated_code AS d
    ON a.i94port = d.iata_code"""

In [115]:
spark_df_immigration_facts = spark.sql(query_4)
spark_df_immigration_facts.createOrReplaceTempView("immigration_facts_table")

Here, we can see the result:

In [116]:
spark_df_immigration_facts.show(5)

+------+-------+------+-----+------+------+---------------+---------------+------+---------------+---------------+-------+--------------+--------------------+-------+-------+-------------------+--------------------+------+-------+--------+------------------+---------------------------+
| cicid|biryear|gender|i94yr|i94mon|i94cit|cit_CountryCode|cit_iso_country|i94res|res_CountryCode|res_iso_country|i94port|CountryAirport|             arrdate|i94mode|i94addr|Percent_Foreignborn|             depdate|i94bir|i94visa|visatype|res_avg_GDP_capita|res_GDPcapita_annual_change|
+------+-------+------+-----+------+------+---------------+---------------+------+---------------+---------------+-------+--------------+--------------------+-------+-------+-------------------+--------------------+------+-------+--------+------------------+---------------------------+
| 13351|   1948|     M| 2016|     4|   116|            IRL|             IE|   116|            IRL|             IE|    BGM|           USA|ja

#### 4.2 Data Quality Checks

As we first store our pandas results in csv files and then import them into Spark, we want to verify that these files have been stored and read in full.

So we design a preliminary check that compares the number of rows in each temporary view with the number of rows of the corresponding DataFrame:

In [118]:
qual_check_query_01 = "SELECT COUNT(*) FROM airports_treated"
qual_check_query_02 = "SELECT COUNT(*) FROM demographics_treated"
qual_check_query_03 = "SELECT COUNT(*) FROM temperatures_treated"
qual_check_query_04 = "SELECT COUNT(*) FROM countries_treated"

In [119]:
open_qual_checks ={qual_check_query_01:df_airports.shape[0], 
                   qual_check_query_02:df_demographics.shape[0], 
                   qual_check_query_03:df_temperatures_total_gr.shape[0], 
                   qual_check_query_04:df_countries_final.shape[0]}

In [120]:
for query, value in open_qual_checks.items():
    result = spark.sql(query).first()[0]
    if result == value:
        print(query, ": PASSED")
    else:
        print(query, ": FAILED")

SELECT COUNT(*) FROM airports_treated : PASSED
SELECT COUNT(*) FROM demographics_treated : PASSED
SELECT COUNT(*) FROM temperatures_treated : PASSED
SELECT COUNT(*) FROM countries_treated : PASSED
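
Printing PASSED or FAILED leaves it to the reader to spot a problem. A sketch of a variant that collects the mismatches, with both numbers, so that a pipeline could stop when the result is not empty. The helper name `run_count_checks`, the stand-in counter and all the numbers are hypothetical:

```python
def run_count_checks(expected_counts, count_rows):
    """Compare expected row counts with the counts returned by count_rows(table)."""
    failures = {}
    for table, expected in expected_counts.items():
        actual = count_rows(table)
        if actual != expected:
            failures[table] = (expected, actual)
    return failures


# Stand-in for a function that runs SELECT COUNT(*) on a view; the numbers are made up.
fake_counts = {"airports_treated": 541, "countries_treated": 250}

failures = run_count_checks(
    {"airports_treated": 541, "countries_treated": 251},
    fake_counts.get,
)
print(failures)
```

Passing the counting function as an argument keeps the check independent of Spark, so it can be exercised with plain dictionaries as above.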


Finally, we want to check that our operations have gone as expected by comparing the number of rows in our immigration_facts_table with the number of rows in immigration_sample_treated_code:

In [121]:
qual_check_query_05 = "SELECT COUNT(*) FROM immigration_sample_treated_code"
qual_check_query_06 = "SELECT COUNT(*) FROM immigration_facts_table"

result_final_1 = spark.sql(qual_check_query_05).first()[0]
result_final_2 = spark.sql(qual_check_query_06).first()[0]

if result_final_1 == result_final_2:
    print("Check passed succesfully")
    print("immigration_treated has {} rows".format(result_final_1))
    print("immigration_facts_table has {} rows".format(result_final_2))
else:
    print("""Check failed.\n
            Table immigration_facts_table has {} records while table immigration_treated has {} records.\n
            Check whether this is correct."""
         .format(result_final_2, result_final_1))

Check passed succesfully
immigration_treated has 2699631 rows
immigration_facts_table has 2699631 rows
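
The figure above (2,699,631) is lower than the 3,096,281 rows counted after the port filter. Since immigration_sample_treated_code is built with inner JOINs against country_codes, rows whose 'i94cit' or 'i94res' has no match there would be dropped, which may account for the difference; this check compares two tables built after that step, so it would not notice it. The difference between both kinds of join can be sketched with made-up rows:

```python
# Illustrative lookup and rows; they do not come from the real tables.
country_codes = {116: "IRL", 582: "MEX"}
immigration_rows = [
    {"cicid": 1, "i94res": 116},
    {"cicid": 2, "i94res": 582},
    {"cicid": 3, "i94res": 999},  # no entry in the lookup
]

# Inner-join behaviour: unmatched rows disappear.
inner = [
    {**row, "res_CountryCode": country_codes[row["i94res"]]}
    for row in immigration_rows
    if row["i94res"] in country_codes
]

# Left-join behaviour: every row is kept, unmatched ones get None.
left = [
    {**row, "res_CountryCode": country_codes.get(row["i94res"])}
    for row in immigration_rows
]

print(len(immigration_rows), len(inner), len(left))
```

Comparing the facts table against the filtered immigration table as well would make such a loss visible.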


#### 4.3 Storing the results

And now, we store the resulting facts table in a Parquet file:

In [110]:
output_df_demographics_treated_grouped = './out/immigration_table_results'
spark_df_immigration_facts.write.parquet(output_df_demographics_treated_grouped)

In [128]:
spark_df_demographics_treated_code.columns

['State',
 'State Code',
 'Median_Age',
 'Male_Population',
 'Female_Population',
 'Total_Population',
 'Foreignborn',
 'Average_Household_Size',
 'Percent_Foreignborn']

#### 4.4 Data dictionary 

Here we leave a Data dictionary explaining the values stored in our file:

**immigration_facts_table:**

    cicid--> CIC's foreign traveler id

    biryear--> year of birth of the traveler

    gender--> gender of the foreigner (values as "0" and "X" exist for unknown data)

    i94yr--> year of the travel

    i94mon--> month of the travel

    i94cit--> USA-CIC's traveler's country of citizenship

    cit_CountryCode--> Country Code of traveler's country of citizenship

    cit_iso_country--> ISO Country Code of traveler's country of citizenship

    i94res--> USA-CIC's traveler's country of residence

    res_CountryCode--> Country Code of traveler's country of residence

    res_iso_country--> ISO Country Code of traveler's country of residence

    i94port--> i94 code destination airport

    CountryAirport--> Airport's country

    arrdate--> Date of arrival

    i94mode--> Way the traveler has entered the USA: values include 'Air', 'Sea', 'Land' and 'Not reported'

    i94addr--> State code of traveler's residence address

    Percent_Foreignborn--> State's share of foreign-born people, calculated by aggregating data from the demographics file (differences to real data may exist)

    depdate--> Date of departure for the foreigner

    i94bir--> Age of the respondent in years

    i94visa--> Visa category of the traveler (coded as business, pleasure or student)

    visatype--> Type of visa held by the traveler

    res_avg_GDP_capita--> Average GDP per capita (of last 10 years) of traveler's country of residence

    res_GDPcapita_annual_change--> Average annual percent increase or decrease of the GDP per capita of traveler's country of residence (calculated over the last 10 years)
    
 **airports_treated_code:**
 
    ident--> Airport's identity code
    
    type--> Kind of airport (small_airport, medium_airport, large_airport, etc.)
    
    name--> Airport's name
    
    iso_country--> ISO code for airport's country
    
    iso_region--> ISO code for airport's region
    
    municipality--> Airport's city
    
    gps_code--> Airport's GPS code
    
    iata_code--> Airport's code in IATA tables
    
    local_code--> Local designation
    
    longitude--> Airport's longitude
    
    latitude--> Airport's latitude
    
    CountryCode--> Airport's country code
    
    i94cntyl--> I94 code of the airport's country
    
    
 **demographics_treated_grouped:**  
    
    State--> USA's State Name

    State Code--> USA's 2-letter code

    Median_Age--> State's median age, as per the aggregation of the value from the demographics dataset

    Male_Population--> Number of males in state, as per the aggregation of the value from the demographics dataset

    Female_Population--> Number of females in state, as per the aggregation of the value from the demographics dataset

    Total_Population--> State's total population, as per the aggregation of the value from the demographics dataset

    Foreignborn--> Number of people born abroad in the state, as per the aggregation of the value from the demographics dataset

    Average_Household_Size--> State's average household size, as per the aggregation of the value from the demographics dataset

    Percent_Foreignborn--> Percentage of people born abroad, as per the aggregation of the value from the demographics dataset
    
    
**temperatures_treated:**   
    
    month--> month of record

    year--> year of record

    City--> City of record

    Country--> Country of the city of record

    AverageTemperature--> Month's average temperature (last 10 years: 2010-2019)

    AverageTemperatureUncertainty--> Month's average temperature uncertainty (last 10 years: 2010-2019)

    AvgTotal--> City's average temperature for the last 10 years (2010-2019)
    
**countries_treated:**    
    
    Country Name--> Name of the country

    Country Code--> Country 3-digit code

    avg_GDP per capita_capita--> Average GDP per capita over the last 10 years

    max_year--> Year of the last available measurement

    GDP per capita_capita_max_year--> GDP per capita in the last available year (max_year)

    min_year--> Year of the first available measurement since 2010

    GDP per capita_capita_min_year--> GDP per capita in that first year (min_year)

    %_change--> annual average GDP per capita increase, in percentage points
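
As an illustration of how a column such as `%_change` can be derived from `min_year`, `max_year` and the two GDP per capita values, here is a minimal sketch. The exact formula used in the pipeline is not stated in this section, so the simple (non-compounded) average below is an assumption, and the function name `annual_pct_change` is hypothetical.

```python
def annual_pct_change(gdp_min_year, gdp_max_year, min_year, max_year):
    """Simple average annual change, in percent, between two yearly values.

    Assumption: the total relative change is spread evenly over the
    elapsed years (no compounding).
    """
    years = max_year - min_year
    if years <= 0 or gdp_min_year == 0:
        return None  # not enough data to compute a rate
    total_change = (gdp_max_year - gdp_min_year) / gdp_min_year
    return total_change / years * 100


# Made-up figures: 20,000 in 2010 growing to 25,000 in 2018.
example = annual_pct_change(20_000, 25_000, 2010, 2018)
print(example)
```

A compounded rate would give a slightly different figure over the same period, so it is worth checking which convention the stored column follows before using it as a model feature.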
    

**country_codes:**

    Country--> Country's name

    Country_norm--> Country's name "normalized" (some minor misspellings corrected, plus other corrections)

    iso_country--> Country's ISO code

    CountryCode--> Country's 3-digit code

    CodeNum--> Country's international code number

    i94cntyl--> i94 Country's number

### Step 5: Complete Project Write Up

Throughout this project, we have used several technologies to handle our data. Python has been the language of choice because of its flexibility, the possibility of coding in Jupyter notebooks and the huge number of modules and tools available.
  
For the smaller datasets, we chose Pandas for the data cleaning. Even the world temperatures dataset is manageable in Pandas when the file is loaded in chunks.
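
Chunked loading can be sketched as follows. This is a minimal illustration, not the notebook's actual cleaning code: the in-memory CSV, its column names and its values are made up, and the chunk size is tiny only so that the example runs on a few rows.

```python
import io

import pandas as pd

# Hypothetical stand-in for the large temperatures CSV; the values are
# invented and the column names are assumptions for illustration.
raw_csv = io.StringIO(
    "dt,City,Country,AverageTemperature\n"
    "2010-01-01,Madrid,Spain,6.0\n"
    "2010-02-01,Madrid,Spain,8.0\n"
    "2010-01-01,Oslo,Norway,-4.0\n"
    "2010-02-01,Oslo,Norway,-2.0\n"
    "2010-03-01,Oslo,Norway,\n"
)

# Running totals, so that only one chunk is held in memory at a time.
sums = {}
counts = {}

# chunksize makes read_csv return an iterator of DataFrames.
for chunk in pd.read_csv(raw_csv, chunksize=2):
    # Drop rows with no temperature reading before aggregating.
    chunk = chunk.dropna(subset=["AverageTemperature"])
    grouped = chunk.groupby("City")["AverageTemperature"].agg(["sum", "count"])
    for city, row in grouped.iterrows():
        sums[city] = sums.get(city, 0.0) + row["sum"]
        counts[city] = counts.get(city, 0) + int(row["count"])

# Combine the partial results into one average per city.
avg_by_city = {city: sums[city] / counts[city] for city in sums}
print(avg_by_city)
```

Accumulating sums and counts, rather than averaging per-chunk averages, keeps the result correct when a city's rows are split across several chunks.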

Nevertheless, for the big immigration files we have had to resort to Spark.

For the creation of the data model, SQL was sufficient when working with a sample of the immigration dataset (we used MySQL Workbench; the queries are in the 'Auxiliary' folder). Spark was needed again for the full files, so all our data model pipelines were executed in Spark.

For this reason, we decided to store the smaller datasets (everything except the immigration data), once treated, in CSV files, and to create Spark DataFrames from them so that they are available for our queries in PySpark.

We then defined our data pipeline in Spark: creating the desired tables, running the required checks and storing the main table in a parquet file.

We propose running this pipeline once a month to update the data, since the immigration files appear to be published monthly and are the core of the information needed to run our Machine Learning algorithms.
    
#### Scenarios
* Write a description of how you would approach the problem differently under the following scenarios:
    1. The data was increased by 100x.
        - We should go for Amazon Redshift: it is an analytical database optimized for aggregation and read-heavy workloads.
    2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
        - If the data volume did not increase, we could keep using the same tools.
        - Airflow can be used here to ease the workload and automate the runs. It also allows us to use retries or send emails in the event of failure.
        - Run daily quality checks; if they fail, send emails to operators and freeze the dashboards.
    3. The database needed to be accessed by 100+ people.
        - Again, for high availability, AWS with S3 and Redshift would be the tools of choice, since their auto-scaling capabilities and good read performance would fit our needs.
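
The daily quality-check step from scenario 2 could look roughly like the sketch below. It is a plain-Python illustration under stated assumptions: the function names `run_quality_checks` and `daily_gate`, the table names and the row-count rule are all hypothetical, and the `notify` and `freeze` callbacks stand in for whatever email and dashboard mechanisms are actually used.

```python
def run_quality_checks(row_counts, min_rows=1):
    """Return a list of failure messages; an empty list means every check passed."""
    failures = []
    for table, count in row_counts.items():
        if count < min_rows:
            failures.append(
                f"{table}: expected at least {min_rows} rows, found {count}"
            )
    return failures


def daily_gate(row_counts, notify, freeze):
    """Run the checks; on failure, alert the operators and freeze the dashboard."""
    failures = run_quality_checks(row_counts)
    if failures:
        notify(failures)  # e.g. send an email to the operators
        freeze()          # e.g. keep serving yesterday's data
        return False
    return True


# Usage with in-memory stand-ins for the email and dashboard hooks.
sent = []
frozen = []
ok = daily_gate(
    {"immigration": 0, "countries_treated": 200},  # hypothetical row counts
    notify=sent.extend,
    freeze=lambda: frozen.append(True),
)
print(ok, sent, frozen)
```

In a scheduler such as Airflow, a gate like this would typically run as its own task after the load, so that a failed check stops the dashboard refresh instead of publishing incomplete data.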