# Project Title
### Data Engineering Capstone Project

#### Project Summary

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# imports
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
import cassandra
import os
import csv
from IPython.display import clear_output
import time
from cassandra.query import UNSET_VALUE
import datetime
import glob

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>
- This project extracts data from four csv files, explores and cleans them, then loads them to a Apache Cassandra cluster.
- The project is meant to define all the building blocks to fully automate this process by spinning up a cluster in the cloud and incorporating Airflow. As such, the full data was not loaded into the Cassandra cluster created.
- The end solution is a Cassandra keyspace optimised for particular queries. The script would need to be adjusted if other types of queries would be more relevant.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 
The project uses four datasets:
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. More information can be found [here](https://www.trade.gov/national-travel-and-tourism-office).
- World Temperature Data: This dataset came from Kaggle. More information can be found [here](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data).
- U.S. City Demographic Data: This data comes from OpenSoft. More information can be found [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- Airport Code Table: This is a simple table of airport codes and corresponding cities. More information can be found [here](https://datahub.io/core/airport-codes#data).


#### Immigration Data Dictionary

- I94YR - 4 digit year
- I94MON - Numeric month
- I94CIT & I94RES - This format shows all the valid and invalid codes for processing
- I94PORT - This format shows all the valid and invalid codes for processing value $i94prtl
- ARRDATE is the Arrival Date in the USA. It is a SAS date numeric field that a 
   permament format has not been applied.  Please apply whichever date format 
   works for you.
- I94MODE - There are missing values as well as not reported (9) value i94model
    - 1 = 'Air'
    - 2 = 'Sea'
    - 3 = 'Land'
    - 9 = 'Not reported'
- I94ADDR - There is lots of invalid codes in this variable and the list below 
   shows what we have found to be valid, everything else goes into 'other'
- DEPDATE is the Departure Date from the USA. It is a SAS date numeric field that 
   a permament format has not been applied.
- I94BIR - Age of Respondent in Years
- I94VISA - Visa codes collapsed into three categories:
   - Business
   - Pleasure
   - Student
- COUNT - Used for summary statistics
- DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use
- VISAPOST - Department of State where where Visa was issued - CIC does not use
- OCCUP - Occupation that will be performed in U.S. - CIC does not use
- ENTDEPA - Arrival Flag - admitted or paroled into the U.S. - CIC does not use
- ENTDEPD - Departure Flag - Departed, lost I-94 or is deceased - CIC does not use
- ENTDEPU - Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use
- MATFLAG - Match flag - Match of arrival and departure records
- BIRYEAR - 4 digit year of birth
- DTADDTO - Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use
- GENDER - Non-immigrant sex
- INSNUM - INS number
- AIRLINE - Airline used to arrive in U.S.
- ADMNUM - Admission Number
- FLTNO - Flight number of Airline used to arrive in U.S.
- VISATYPE - Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

Let's read and explore some sample data.

In [2]:
# Read in sample data
immigration_data = pd.read_csv('immigration_data_sample.csv', index_col=0)

In [3]:
immigration_data.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
immigration_data.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

_Initialise Spark and load datasets_

In [5]:
# initialise spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [13]:
# read data
immigration_data=spark.read.parquet("sas_data")
temperature_data = spark.read.option('header', True).csv('../../data2/GlobalLandTemperaturesByCity.csv')
us_demographic_data = pd.read_csv('us-cities-demographics.csv', delimiter=';')
airport_code_data = pd.read_csv('airport-codes_csv.csv')

### Step 2: Explore and Assess the Data
#### Explore and Clean the Data 

In [6]:
immigration_data.count()

3096313

In [7]:
def missing_cols(df, vertical=True):
    """ 
    This function counts and displays missing values for each column in the 
    passed dataframe.
    """
    
    df_missing = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c))
                     .alias(c) for c in df.columns]).toPandas()
    print(pd.melt(df_missing))

In [8]:
# check for missing columns
missing_cols(immigration_data)

    variable    value
0      cicid        0
1      i94yr        0
2     i94mon        0
3     i94cit        0
4     i94res        0
5    i94port        0
6    arrdate        0
7    i94mode      239
8    i94addr   152592
9    depdate   142457
10    i94bir      802
11   i94visa        0
12     count        0
13  dtadfile        1
14  visapost  1881250
15     occup  3088187
16   entdepa      238
17   entdepd   138429
18   entdepu  3095921
19   matflag   138429
20   biryear      802
21   dtaddto      477
22    gender   414269
23    insnum  2982605
24   airline    83627
25    admnum        0
26     fltno    19549
27  visatype        0


In [9]:
# check for duplicate values
immigration_data.count() - immigration_data.dropDuplicates().count()

0

In [10]:
immigration_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

Schema looks good, apart from the date columns.
Let's update these types.

In [11]:
from pyspark.sql.types import DateType

# define sas to spark date type udf
parse_date = F.udf(lambda x: pd.to_timedelta(x, unit='D') + pd.Timestamp('1960-1-1') if x is not None else x
                   , DateType())

# update type of date columns
immigration_data = immigration_data.withColumn('arrdate', parse_date('arrdate'))
immigration_data = immigration_data.withColumn('depdate', parse_date('depdate'))
immigration_data = immigration_data.withColumn('dtaddto', F.to_date('dtaddto', "MMddyyyy"))
immigration_data = immigration_data.withColumn('dtadfile', F.to_date('dtadfile', "yyyyMMdd"))

In [12]:
# check change
immigration_data.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,CA,2016-05-08,...,,M,1976.0,2016-10-29,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,NV,2016-05-17,...,,M,1984.0,2016-10-29,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,WA,2016-05-08,...,,M,1987.0,2016-10-29,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,WA,2016-05-14,...,,M,1987.0,2016-10-29,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,2016-04-30,1.0,WA,2016-05-14,...,,M,1988.0,2016-10-29,M,,DL,94956390000.0,40,B1


Looks good. Let's load and explore the other datasets.

In [14]:
temperature_data.limit(3).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


In [15]:
missing_cols(temperature_data)

                        variable   value
0                             dt       0
1             AverageTemperature  364130
2  AverageTemperatureUncertainty  364130
3                           City       0
4                        Country       0
5                       Latitude       0
6                      Longitude       0


In [16]:
temperature_data.count()

8599212

The data starts at 1743 and has 8m rows! Let's filter from 2010 onwards. We will need to convert to date format first to do this.

In [17]:
# convert to date type
temperature_data = temperature_data.withColumn('dt', F.to_date('dt', "yyyy-MM-dd"))

# filter data after year 2010
temperature_data = temperature_data.filter(temperature_data.dt > F.lit("2010-01-01"))

In [18]:
# check change
temperature_data.orderBy('dt').limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,2010-02-01,-22.308000000000003,0.517,Öskemen,Kazakhstan,50.63N,82.39E
1,2010-02-01,-2.6910000000000003,0.272,Aalborg,Denmark,57.05N,10.33E
2,2010-02-01,-13.356,2.379,Ürümqi,China,44.20N,87.20E
3,2010-02-01,5.252000000000001,0.417,Çorum,Turkey,40.99N,34.08E
4,2010-02-01,8.789,0.366,A Coruña,Spain,42.59N,8.73W


In [19]:
temperature_data.count() - temperature_data.dropDuplicates().count()

0

In [20]:
us_demographic_data.head(3)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759


In [21]:
us_demographic_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [22]:
us_demographic_data.duplicated().sum()

0

In [23]:
airport_code_data.head(3)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"


In [24]:
airport_code_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [25]:
airport_code_data.duplicated().sum()

0

### Step 3: Define the Data Model

### 3.1 Conceptual Data Model


The data does not seem very relatable. The only relations I can think of that might be relevant are:
1. Analysing immigration statistics by temperature on the date.
2. Relating states/cities that experience the most immigration to the demographic make-up
3. Relating immigration statistics to the location and type of airport.

Out of these relations, only the second one makes remote sense to me.

It therefore seems more likely that these tables are unrelated. In addition, immigration statistics data is already very large (i.e. 3m rows), yet it only contains data for April 2016. If this database takes in data from other months, it's size will increase quickly.

As a result, I believe an Apache Cassandra database format would be more appropriate than a SQL relational database structure. This gives the flexibility to allow the database to consistently scale and work with large amounts of data. For relational use cases (e.g. point 2 listed above), the data would need to be pulled to the client node before performing analysis.

Cassandra is optimised to store very large amounts of data. As such, I believe it should be okay to store rows with null values. This is why they have not been cleaned above.

### 3.2 Mapping Out Data Pipeline


The following steps will need to be undertaken:
- write files locally
- initialise and connect to a Cassandra cluster
- crate a keyspace
- create tables and insert data into them

A note on 'null' data insertion in Cassandra. Cassandra does not allow for null values to be inserted and instead replaces them with tombstone values. Down the line, this can cause performance issues and higher query latency. During a scan, Cassandra keeps tombstones in memory and returns them to the coordinator node, which uses them to ensure other replica nodes also know about the deleted rows. It is preferable to simply not insert data in such instances. So, the insertion script will need to check for null rows and adjust the query accordingly before insertion.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

_Check for duplicates in planned primary key columns_

In [26]:
immigration_data.count() - immigration_data.dropDuplicates(['i94port', 'i94cit', 'arrdate', 'cicid']).count()

0

In [27]:
temperature_data.count() - temperature_data.dropDuplicates(['dt', 'City', 'Country']).count()

880

In [28]:
# remove duplicates
temperature_data = temperature_data.dropDuplicates(['dt', 'City', 'Country'])

In [29]:
# check change
temperature_data.count() - temperature_data.dropDuplicates(['dt', 'City', 'Country']).count()

0

In [30]:
us_demographic_data.duplicated(subset=['Race', 'City', 'State']).shape[0] - us_demographic_data.shape[0]

0

In [31]:
airport_code_data.duplicated(subset=['iso_region', 'ident', 'type']).shape[0] - airport_code_data.shape[0]

0

_Reduce size of immigration file_

In [55]:
# filter data
immigration_data = immigration_data.filter(immigration_data.arrdate > F.lit("2016-04-29"))

_Save data locally_

In [58]:
# write files to local storage
path = 'clean_data/'


immigration_data\
    .coalesce(1)\
    .write.option("header",True)\
    .csv(path+"immigration_data")
temperature_data\
    .coalesce(1)\
    .write.option("header",True)\
    .csv(path+"temperature_data")
us_demographic_data.to_csv(path+'us_demographic_data.csv', index=False)
airport_code_data.to_csv(path+'airport_code_data.csv', index=False)

_Initialise Cassandra cluster and create keyspace_

In [59]:
from cassandra.cluster import Cluster

# specify port and make a connection to Cassandra instance
port = ['127.0.0.1']
cluster = Cluster(port)

# establish a session
session = cluster.connect()

We will use a SimpleStrategy replication. This approach places the first replica on the node selected by the client. After that remaining replicas are placed in clockwise direction in the data center (collection of Cassandra nodes).

A replication factor of 1 ensures that only one copy of each row will be stored in the Cassandra cluster.

Both of these parameters could be changed depending on the level of fault tolerance desired in the system.

In [60]:
# Create a Keyspace

session.execute("""
CREATE KEYSPACE IF NOT EXISTS us_immigration
WITH REPLICATION = 
{'class': 'SimpleStrategy', 'replication_factor': 1}
""")

<cassandra.cluster.ResultSet at 0x7f1f567aaba8>

In [61]:
# set keyspace
session.set_keyspace('us_immigration')

_Define column names and data types for each table_

In [195]:
immigration_columns = [
        ('cic_id', 'int', string_to_int),
        ('admission_year', 'int', string_to_int),
        ('admission_month', 'int', string_to_int),
        ('applicant_city', 'text', str),
        ('applicant_residence', 'text', str),
        ('application_port', 'text', str),
        ('arrival_date', 'date', string_to_date),
        ('applicant_arrival_mode', 'text', str),
        ('port_state', 'text', str),
        ('departure_date', 'date', string_to_date),
        ('age', 'int', string_to_int),
        ('visa_code', 'text', str),
        ('count', 'int', string_to_int),
        ('character_date', 'date', string_to_date),
        ('visa_post', 'text', str),
        ('occupation', 'text', str),
        ('arrival_flag', 'text', str),
        ('departure_flag', 'text', str),
        ('update_flag', 'text', str),
        ('match_flag', 'text', str),
        ('birth_year', 'int', string_to_int),
        ('date_admitted', 'date', string_to_date),
        ('gender', 'text', str),
        ('insurance_num', 'text', str),
        ('airline', 'text', str),
        ('admssion_number', 'text', str),
        ('flight_no', 'text', str),
        ('visa_type', 'text', str)
        ]
immigration_primary_keys = ['application_port', 'applicant_city', 'arrival_date', 'cic_id']


temperature_columns = [
    ('date', 'date', string_to_date), 
    ('average_temperature', 'float', float),
    ('average_temperature_uncertainty', 'float', float),
    ('city', 'text', str),
    ('country', 'text', str),
    ('latitude', 'text', str),
    ('longitude', 'text', str)]
temperature_primary_keys = ['country', 'date', 'city']

us_demographic_columns = [
    ('city', 'text', str), 
    ('state', 'text', str), 
    ('median_age', 'float', float), 
    ('male_population', 'int', string_to_int), 
    ('female_population', 'int', string_to_int),
    ('total_population', 'int', string_to_int),
    ('number_of_veterans', 'int', string_to_int),
    ('foreign_born', 'int', string_to_int),
    ('average_household_size', 'float', float), 
    ('state_code', 'text', str), 
    ('race', 'text', str), 
    ('count', 'int', string_to_int)]
us_demographic_primary_keys = ['race', 'city', 'state']

airport_code_columns = [
    ('ident', 'text', str), 
    ('type', 'text', str), 
    ('name', 'text', str), 
    ('elevation_ft', 'float', float), 
    ('continent', 'text', str),
    ('iso_country', 'text', str),
    ('iso_region', 'text', str),
    ('municipality', 'text', str),
    ('gps_code', 'text', str), 
    ('iata_code', 'text', str), 
    ('local_code', 'text', str), 
    ('coordinates', 'text', str)]
airport_code_primary_keys = ['iso_region', 'ident', 'type']

_Define table creation and insertion functions_

In [198]:
# table creation function
def create_table(table_name, columns, primary_keys):
    """ 
    This function creates the individual tables within the us_immigration keyspace
    and defines the columns and their types
    """
    
    # drop the table if it exists
    query = f"DROP TABLE IF EXISTS {table_name}"
    session.execute(query)
    
    # recreate the table using the defined schema
    query = f"CREATE TABLE IF NOT EXISTS {table_name}"
    
    # insert columns into query
    for col in columns:
        beginning = '(' if col == columns[0] else ''
        end = keys_query if col == columns[-1] else ','
        query += f" {beginning}{col[0]} {col[1]},"
        
     # create primary keys
    keys_query = 'PRIMARY KEY'
    for key in primary_keys:
        beginning = '(' if key == primary_keys[0] else ''
        end = '))' if key == primary_keys[-1] else ','
        keys_query += f"{beginning}{key}{end}"
        
    query += ' ' + keys_query
    
    session.execute(query)

In [206]:
def string_to_date(date):
    # convert string to date
    return datetime.datetime.strptime(date, '%Y-%m-%d')
    
def string_to_int(string):
    # convert string to integer
    return int(float(string))

def prepare_values(data_types, line):
    """ This function prepares and formats the values to be inserted into a table
    """
    
    values = []
    for i in range(len(line)):
        if line[i]:
            values.append(data_types[i][2](line[i]))
        else:
            values.append(UNSET_VALUE)
    values = tuple(values)
    
    return values

def prepare_insert_statement(table_name, columns):
    """ This function prepares a table insertion statement
    """
    
    query = f"INSERT INTO {table_name}"
    for col in columns:
        beginning = '(' if col == columns[0] else ''
        end = ')' if col == columns[-1] else ','
        query += f"{beginning} {col[0]}{end}"
    query += f" VALUES({'?,'*(len(columns)-1)}?)"
    
    return query

def insert_into_table(table_name, columns, filepath, interval=10000):
    """ 
    This function inserts data into the immigration table.
    It specifically does not insert null values into the table.
    """
    
    start_time = time.time()    

    # open file
    for file in glob.glob(filepath):
        with open(file, encoding='utf8') as f:

            csvreader = csv.reader(f)
            # skip header
            next(csvreader)

            line_num = 1
            for line in csvreader:
                # prepare and insert values
                insert_statement = prepare_insert_statement(table_name, columns)
                ps = session.prepare(insert_statement)
                values = prepare_values(columns, line)
                session.execute(ps, values)
                
                if line_num%interval == 0:
                    clear_output()
                    print(f'Loaded up to row {line_num}')
                line_num +=1
            clear_output()
            print(f"Finished loading data! {line_num-1} rows loaded. \nExecution took {time.time()-start_time} seconds")

_Create and insert into tables_

In [208]:
create_table('immigration', immigration_columns, immigration_primary_keys)
insert_into_table('immigration', immigration_columns, filepath='./clean_data/immigration_data/*.csv')

Finished loading data! 127155 rows loaded. 
Execution took 306.79215598106384 seconds


In [209]:
create_table('temperatures', temperature_columns, temperature_primary_keys)
insert_into_table('temperatures', temperature_columns, filepath='./clean_data/temperature_data/*.csv')

Finished loading data! 153560 rows loaded. 
Execution took 288.31449699401855 seconds


In [210]:
create_table('us_demographics', us_demographic_columns, us_demographic_primary_keys)
insert_into_table('us_demographics', us_demographic_columns, filepath='./clean_data/us_demographic_data.csv')

Finished loading data! 2891 rows loaded. 
Execution took 5.766994953155518 seconds


In [211]:
create_table('airport_codes', airport_code_columns, airport_code_primary_keys)
insert_into_table('airport_codes', airport_code_columns, filepath='./clean_data/airport_code_data.csv')

Finished loading data! 55075 rows loaded. 
Execution took 103.87967848777771 seconds


#### 4.2 Data Quality Checks

For each table, the checks will verify:
- the total number of table rows matches the number of rows from the saved dataframe
- the data types of each column matches the definition made before the table's creation
 
Run Quality Checks

In [245]:
# Perform quality checks here
def row_number_check(table, dataframe, spark_df=True):
    
    query = f"""
    SELECT count(*) FROM {table} limit 5000000
    """
    rows = session.execute(query)
    
    table_rows = rows[0].count
    if spark_df:
        df_length = dataframe.count()
    else:
        df_length = len(dataframe)
        
    assert table_rows == df_length, f"Rows don't match. Table rows: {table_rows}; DF rows: {df_length}"
        
    print(f"Row Number checks passed! Rows: {df_length}")

def convert(lst):
    res_dct = map(lambda col: (col[0], [col[1], col[2]]), lst)
    return dict(res_dct)

def type_checks(table_name, columns):
    
    column_dict = convert(immigration_data_types)
    
    query = f"""
    SELECT column_name, type from system_schema.columns
    WHERE keyspace_name = 'us_immigration'
    AND table_name = '{table_name}'
    """

    rows = session.execute(query)

    values = []
    for row in rows:
        assert column_dict[row.column_name][0] == row.type, \
        f"mismatch between data types for {row.column_name}"
        
    print("Data type checks passed!")

In [246]:
row_number_check('immigration',immigration_data)
type_checks('immigration', immigration_data_types)

Row Number checks passed! Rows: 127155
Data type checks passed!


In [247]:
row_number_check('temperatures',temperature_data)
type_checks('temp', immigration_data_types)

Row Number checks passed! Rows: 153560
Data type checks passed!


In [248]:
row_number_check('us_demographics',us_demographic_data, spark_df=False)
type_checks('immigration', immigration_data_types)

Row Number checks passed! Rows: 2891
Data type checks passed!


In [249]:
row_number_check('airport_codes',airport_code_data, spark_df=False)
type_checks('immigration', immigration_data_types)

Row Number checks passed! Rows: 55075
Data type checks passed!


In [252]:
## Drop the tables before closing out the sessions
def droptables(tables):
    for table in tables:
        session.execute(f'DROP TABLE IF EXISTS {table}')

_Close out the session_

In [254]:
session.shutdown()
cluster.shutdown()

### 4.3 Data dictionary 

#### Immigration Data
* cic_id: The CIC ID number
* admission_year: year of arrival
* applicant_month: month of arrival
* applicant_city: country from where the applicant travelled
* applicant_residence: applicant's residence location 
* application_port: port processing the application
* arrival_date: string_to_date
* applicant_arrival_mode: mode of arrival
* port_state: state of port
* departure_date: departure date from the US
* age: respondent age
* visa_code: visa code (business, pleasure, student)
* count: count
* character_date: string_to_date
* visa_post: Department of State where where Visa was issued
* occupation: Occupation that will be performed in the US
* arrival_flag: admitted or paroled into the US
* departure_flag: Departed, lost I-94 or is deceased
* update_flag: Either apprehended, overstayed, adjusted to perm residence
* match_flag: flag indicating arrival and departure dates match
* birth_year: year of birth
* date_admitted: date admitted to stay in the US
* gender: gender
* insurance_num: insurance number
* airline: airline used to arrive in the US
* admssion_number: admission number
* flight_no: flight number of airline used to arrive in the US
* visa_type: Class of admission legally admitting the non-immigrant to temporarily stay in the US


#### Temperature Data
* date: date
* average_temperature: average temperature on day
* average_temperature_uncertainty: measure of uncertainty about average temperature measure
* city: city
* country: country
* latitude: latitude of city location
* longitude: longitude of city location

#### US Demographic Data
* city: city
* state: state
* median_age: median age
* male_population: male population count
* female_population: female population count
* total_population: total population count
* number_of_veterans: number of veterans
* foreign_born: number of foreign born citizens
* average_household_size: average household size
* state_code: state code
* race: ethnic race
* count: count

#### Airport Code Data
* ident: identity code
* type: type of airport 
* name: airport name
* elevation_ft: elevation level in feet
* continent: continent
* iso_country: country
* iso_region: region
* municipality: municipality
* gps_code: gps code
* iata_code: iata code
* local_code: local code
* coordinates: airport location coordinates


#### Step 5: Project Write Up

The goal for this script was to take data from four different sources, explore it, clean it where relevant and load into an Apache Cassandra database.

The rationale for choosing a Apache Cassandra database is threefold:
- The loaded data tables do not appear strongly related.
- The data loaded relates to April 2016 only is very large already (~3m rows and ~400MB). If more months' data were to be added, the size would quickly increase. A relational database might start to struggle when the whole data fits in the RAM.
- This structure would allow the database to consistently scale when needed.

More information can be found in section 3.2.

I propose the data be updated once a month. The US National Tourism and Trade Office produce the immigration dataset monthly. Hence, a shorter timeframe does not make sense. In addition, it is unlikely that the newly updated data would be required on an urgent basis.

A note on incorporating other technologies:
- The script is currently set up to extract the data from local storage. This process could be automated by using airflow. Airflow could be scheduled to run the scripts at the desired intervals. In addition, the script could be updated the query the data producers' APIs (if they exist).
- Currently, the script uses Spark to process the larger datasets. However, this processing is done locally. If the datasets' size increased, a cluster could be spun up (e.g. Amazon EMR) to handle this processing task. This would start to become more of a concern if any of the dataset's size started to approach the size of the computer's memory.
- The script currently spins up the Cassandra cluster locally and does not save the processed tables. It would be best to use a cloud service to set up the cluster and store the data.

#### Scenario Planning
- The data was increased by 100x: Apache Cassandra would handle this size increase without issues.
- The data populates a dashboard that must be updated on a daily basis by 7am every day: The scripts should be run at the end of the previous day to refresh the tables.
- The database needed to be accessed by 100+ people: Apache Cassandra achieves high data availability without a guarantee that the data is the latest update. The risk of outdated data would increase with multiple people needing to access the database concurrently. In this case, communicating Cassandra's guarantee of eventual data consistency would be important for end users. On the other hand, Cassandra would allow multiple users to access the database unlike a RMDBS, given their ACID nature, which would lock data if is being acted on.