 # Creating a data warehouse for the immigration data

#### Project Summary
In this project I have the data in 2 different sources `sas` and `csv` formats.
Creating a data warehouse using `postgreSQL` it's capable of doing the job and there is more details about this in the data model
section.
performing cleaning process on the data befor inserting it on the data warehouse.
this data warehouse will be the final product of the project and the main purpos of it will be to answer the analytical queries 
like one to answer a question like if immigrants flock to states with generally more immigrants
i included a query to answer this in the end of the project to make sure that the data warehouse served its purpos
The project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore the Data
* Step 3: Defininge Data Model
* Step 4: Runing the ETL
* Step 5: Project Write Up

In [1]:
# Doing all imports 
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, col, split, udf, count, when, isnan
import datetime as dt
from datetime import datetime
from pyspark.sql.types import StringType

In [2]:
#setting up spark session
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
the data comes in 2 different sources `sas` and `csv` formats.
we will know more about the data in the exploring section.
the goal of the project is to insert the data in a database resides in postgreSQL.

#### Describe and Gather Data 
we have 3 data sets
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office.
- U.S. City Demographic Data: This data comes from OpenSoft.
- Airport Code Table: This is a simple table of airport codes and corresponding cities.

In [3]:
# reading the immigration data
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# reading airports data
airport_codes = pd.read_csv('airport-codes_csv.csv', encoding='utf-8')
# reading demografhics data 
us_cities_demographics = pd.read_csv('us-cities-demographics.csv',sep=';')

### Step 2: Explore the Data
in this section we will explore the data and define the issues on it to cleane them in the `ETL` process.

In [4]:
# Exploring airports data
airport_codes.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [5]:
us_cities_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [6]:
df_spark.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [7]:
#finding the number of countries in the data 
print(len(airport_codes["iso_country"].unique()))

244


In [8]:
#checking if we coulde use the column 'cicid' as a primary key for the dataframe. 
print(df_spark.count())
df2 = df_spark.select(countDistinct("cicid"))
df2.show()

3096313
+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



In [9]:
#checking for the null values
df_spark.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_spark.columns]).show()  

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---

In [10]:
#checking the datatypes
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

#### reporting the issues with the data to cleane it in the etl process 
- about the airport dataset we will fillter by the US country and fix the string issues to insert them to the database.
- about the demographics dataset we will groub by the state to be able to connect to the facts table
- about the immigration dataset we will fix some missing values and drop some columns most of them missing values and not important then split the dataset to 2 dataframes
- the columns names in the demographics dataframe having white spaces and i'll fix this in the loading

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
the purpos of our porject is to build an analytical house and the best model for that is the `dimensional model`
our data model consists of 4 tables and for more details about the tables itselfs check the data dictionary

#### 3.2 Mapping Out Data Pipelines
- creating the database and the tables. 
- load the data into dataframes. 
- clean the dataframes.
- insert the data into the database

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Createing the database

In [11]:
#creating the database in postegreSQL
!PGPASSWORD=student createdb -h 127.0.0.1 -U student immigrationdb
#loading sql magic
%load_ext sql 

createdb: database creation failed: ERROR:  database "immigrationdb" already exists


In [12]:
# connecting to the database
%sql postgresql://student:student@127.0.0.1/immigrationdb

'Connected: student@immigrationdb'

In [13]:
%%sql
CREATE TABLE immigrant
(
  cicid smallint PRIMARY KEY,
  i94bir smallint,
  i94visa varchar(50),
  count decimal(5,2),
  dtadfile varchar(50),
  visapost varchar(50),
  entdepa varchar(50),
  entdepd varchar(50),
  matflag varchar(50),
  biryear smallint,
  dtaddto varchar(50),
  gender varchar(50),
  admnum decimal(5,2),
  fltno varchar(50),
  visatype varchar(50)
);


CREATE TABLE demographics
(
    State_Code                varchar(10) PRIMARY KEY,
    Male_Population           INT,
    Female_Population         INT,
    Total_Population          INT,
    Number_of_Veterans        INT,
    Foreign_born              INT,
    Median_Age                decimal(12,10),
    Average_Household_Size    decimal(12,10)
);

CREATE TABLE airports
(
    ident            varchar(10) PRIMARY KEY,
    type             varchar(50),
    elevation_ft    decimal(5,2),
    continent        varchar(50),
    iso_country      varchar(10),
    iso_region       varchar(10),
    gps_code         varchar(10),
    iata_code        varchar(10),
    local_code       varchar(10),
    coordinates      varchar,
    State            varchar(10)
);

CREATE TABLE immigration_facts
(
  cicid smallint PRIMARY KEY,
  i94yr smallint,
  i94mon smallint,
  i94cit smallint,
  i94res smallint,
  i94port varchar(50),
  arrdate varchar(50),
  i94addr varchar(50),
  depdate varchar(50),
  arrival_mode varchar(50)
);

 * postgresql://student:***@127.0.0.1/immigrationdb
(psycopg2.ProgrammingError) relation "immigrant" already exists
 [SQL: 'CREATE TABLE immigrant\n(\n  cicid smallint PRIMARY KEY,\n  i94bir smallint,\n  i94visa varchar(50),\n  count decimal(5,2),\n  dtadfile varchar(50),\n  visapost varchar(50),\n  entdepa varchar(50),\n  entdepd varchar(50),\n  matflag varchar(50),\n  biryear smallint,\n  dtaddto varchar(50),\n  gender varchar(50),\n  admnum decimal(5,2),\n  fltno varchar(50),\n  visatype varchar(50)\n);']


In [14]:
def loading_data():
    """
    loading the data and returning back dataframes
    """
    df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
    airport_codes = pd.read_csv('airport-codes_csv.csv', encoding='utf-8') 
    colnames = ['City', 'State', 'Median_Age', 'Male_Population', 'Female_Population', 'Total_Population',\
                'Number_of_Veterans', 'Foreign_born', 'Average_Household_Size', 'State_Code', 'Race', 'Count']
    us_cities_demographics = pd.read_csv('us-cities-demographics.csv',sep=';', names=colnames, header=None, skiprows=1)
    print('data loaded successfully')
    return df_spark, airport_codes, us_cities_demographics

In [15]:
def cleaning_data(df_spark, airport_codes, us_cities_demographics):
    """
    cleaning 3 datafarmes and return back 4 dataframes ready to be inserted into the database 
    """
    print('cleaning airports data')
    #filtering for US
    airport_codes = airport_codes[airport_codes['iso_country'] == 'US' ].copy()
    
    #creating a new column for the state 
    airport_codes['State'] = airport_codes['iso_region'].apply(lambda x: x[x.find('-')+1:])
    
    #droping the columns that have special charactar in thier string 
    airport_codes.drop(['name','municipality'], axis=1, inplace = True)
    
    #replacing '-' by '_' form columns so we can insert it in postegreSQL
    def replace(x):
        p = x.find('-')
        x = x[:p] + '_' + x[p+1:]
        return x
    airport_codes['iso_region'] = airport_codes['iso_region'].apply(replace)
    airport_codes['coordinates'] = airport_codes['coordinates'].apply(replace)
        
    
    print('cleaning demographics data')
    # preparing demographics data 
    us_cities_demographics = us_cities_demographics.groupby('State_Code', as_index=False).agg({'Male_Population':'sum',\
                                                                                               'Female_Population':'sum',\
                                                                           'Total_Population':'sum','Number_of_Veterans':'sum',\
                                                                           'Foreign_born':'sum','Median_Age':'mean'\
                                                                           ,'Average_Household_Size':'mean'})
    
    # casting int for the columns that contain int numbers
    us_cities_demographics_columns = ['Male_Population', 'Female_Population',
           'Total_Population', 'Number_of_Veterans', 'Foreign_born']
    for c in us_cities_demographics_columns:
        us_cities_demographics = us_cities_demographics.astype({c:'int'})
        
    print('cleaning immigration data')
    # preparing immigration data
    
    # selecting the number of rows we need
    df_spark = df_spark.limit(1010000)
    
    # casting int for the columns that contain int numbers
    columns = ['cicid','i94yr','i94mon','i94cit','i94res', 'i94bir', 'biryear']
    from pyspark.sql.functions import col
    for c in columns:
        try:
            df_spark = df_spark.withColumn(c ,col(c).cast('int'))
        except:
            pass
    
    # declaring udf funcs
    get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
    
    @udf(returnType=StringType()) 
    def mode(x):
        try:    
            x = int(x)
        except:
            x = x
        if x == 1:
            x = 'Air'
        elif x == 2:
            x = 'Sea'
        elif x == 3:
            x = 'Land'
        else:
            x = 'Not reported'
        return x
    
    @udf(returnType=StringType()) 
    def visa(x):
        try:    
            x = int(x)
        except:
            x = x
        if x == 1:
            x = 'Business'
        elif x == 2:
            x = 'Pleasure'
        elif x == 3:
            x = 'Student'
        else:
            x = 'unknown'
        return x
    
    # applying the udf funcs    
    df_spark = df_spark.withColumn("i94visa", visa(df_spark.i94visa))
    df_spark = df_spark.withColumn("arrival_mode", mode(df_spark.i94mode))
    
    # getting the date for 2 columns
    df_spark = df_spark.withColumn("arrdate", get_date(df_spark.arrdate))
    df_spark = df_spark.withColumn("depdate", get_date(df_spark.depdate))
    
    #splitting the dataframe into 2 dataframes and converting them to pandas dataframes
    
    df_facts = df_spark.select(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94addr', 'depdate', 'arrival_mode'])
    df_facts = df_facts.toPandas()

    df_immigrant = df_spark.select(['cicid', 'i94yr', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'entdepa',\
                                    'entdepd', 'matflag', 'biryear', 'dtaddto', 'gender', 'admnum', 'fltno', 'visatype',])
    df_immigrant = df_immigrant.toPandas()
    
    return airport_codes, us_cities_demographics, df_facts, df_immigrant

In [16]:
def inserting_data(dataframe, table):
    """
    inserting a dataframe into a spacific table in the databse
    """
    print(f'inserting data into {table} table')
    conn_string = 'postgresql://student:student@127.0.0.1/immigrationdb'
    db = create_engine(conn_string)
    conn = db.connect()
    dataframe.to_sql(table , con=conn, if_exists='replace',
                      index=False)

In [17]:
df_spark, airport_codes, us_cities_demographics = loading_data()
airport_codes, us_cities_demographics, df_facts, df_immigrant = cleaning_data(df_spark, airport_codes, us_cities_demographics)
inserting_data(airport_codes, 'airports')
inserting_data(us_cities_demographics, 'demographics')
inserting_data(df_facts, 'immigration_facts')
inserting_data(df_immigrant, 'immigrant')

data loaded successfully
cleaning airports data
cleaning demographics data
cleaning immigration data
inserting data into airports table
inserting data into demographics table
inserting data into immigration_facts table
inserting data into immigrant table


#### 4.2 Data Quality Checks
running a quality check the number of the rows to be suer that the data is inserted into the tables
 
Run Quality Checks

In [18]:
tables = ['airports', 'demographics', 'immigration_facts', 'immigrant']
conn_string = 'postgresql://student:student@127.0.0.1/immigrationdb'
conn = psycopg2.connect(conn_string
                        )
conn.autocommit = True
cursor = conn.cursor()
def is_the_data_inserted(tables):
    for table in tables:

        sql1 = f'''select count(*) from {table} ;'''
        cursor.execute(sql1)
        for i in cursor.fetchall():
            if i[0] > 0:
                print(f'{table} table passes the "is the data inserted?" quality check')
            else :
                print(f'{table} table fails the "is the data inserted?" quality check')

def constrants(table, pk):
    """
    checking if the primary key of the table have null valuse
    """
    sql1 = f'''SELECT count(*) FROM {table} WHERE "{pk}" IS NULL ;'''
    cursor.execute(sql1)
    for i in cursor.fetchall():
        if i[0] == 0:
            print(f'{table} table passes the "constrants" quality check')
        else :
            print(f'{table} table fails the "constrants" quality check')               
                
is_the_data_inserted(tables)
# checking if the primary key of the table have null valuse
constrants("airports", "ident")
constrants("demographics", "State_Code")
constrants("immigration_facts", "cicid")
constrants("immigrant", "cicid")

airports table passes the "is the data inserted" quality check
demographics table passes the "is the data inserted" quality check
immigration_facts table passes the "is the data inserted" quality check
immigrant table passes the "is the data inserted" quality check
airports table passes the "constrants" quality check
demographics table passes the "constrants" quality check
immigration_facts table passes the "constrants" quality check
immigrant table passes the "constrants" quality check


### testing the logic of the database 
in the below cell we want to find if the immigrants flock to states with generally more immigrants?

In [19]:
%%sql
SELECT i94addr, "Foreign_born" , count(cicid)
FROM immigration_facts i
JOIN demographics d
ON i.i94addr = d."State_Code"
GROUP BY 1 , 2
ORDER BY "Foreign_born" DESC
LIMIT 20;

 * postgresql://student:***@127.0.0.1/immigrationdb
20 rows affected.


i94addr,Foreign_born,count
CA,37059662,161465
NY,17186873,186761
TX,14498054,46414
FL,7845566,219828
IL,4632600,28132
AZ,3411565,7474
MA,2573815,23609
NV,2406685,37233
NJ,2327750,23289
WA,2204810,14078


we found from the query abouve that there a relation between the new immagrants the state that have big number of immagrants ecept in one state `FL` 
and here we used our data warehouse to answer a question we couldn't do that by a single dataset

### 4.3 Data dictionary 
### we have 4 tables
#### immigrate table
-  cicid    :  uniqe identfire
-  i94bir   :  Age of Respondent in Years
-  i94visa  :  Visa codes collapsed into three categories: Business, Pleasure, Student
-  count    :  Used for summary statistics 
-  dtadfile :  Character Date Field - Date added to I-94 Files
-  visapost :  Department of State where where Visa was issued
-  entdepa  :  Arrival Flag - admitted or paroled into the U.S
-  entdepd  :  Departure Flag - Departed, lost I-94 or is deceased
-  matflag  :  Match flag - Match of arrival and departure records
-  biryear  :  4 digit year of birth 
-  dtaddto  :  Character Date Field - Date to which admitted to U.S. (allowed to stay until)
-  gender   :  Non-immigrant sex
-  admnum   :  Admission Number
-  fltno    :  Flight number of Airline used to arrive in U.S.
-  visatype :  Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### demographics table 
- State_Code                : reprsnt the state
- Male_Population           : number of men
- Female_Population         : number of females
- Total_Population          : total number
- Number_of_Veterans        : vereranes number 
- Foreign_born              : number of foreign born
- Median_Age                : the median age
- Average_Household_Size    : the mean of the size of the household

#### airports table 
-    ident            : identefire
-    type             : airport type
-    elevation_ft     : elevation in foot
-    continent        : 
-    iso_country      : country
-    iso_region       : region(country + state)
-    gps_code         : gps code
-    iata_code        : 
-    local_code       : 
-    coordinates      : coordinates
-    State            : State

#### immigration_facts table 

-  cicid :uniqe identfire
-  i94yr : 4 digit year 
-  i94mon :Numeric month
-  i94cit : city code
-  i94res : resedent city code
-  i94port : port code
-  arrdate : the Arrival Date in the USA.
-  i94addr : state code
-  depdate : the Departure Date from the USA.
-  arrival_mode : 'Air' 'Sea' 'Land' 'Not reported'


#### Step 5: Project Write Up

- in this project i used `pandas` to handle the small datasets and `spark` for the big ones and there is a reason to choose the 2 techs, in gerneral i use pandas dataframe to insert the data into postgresql and with the small datasets using pandas is a good choice and no need for spark cause the process is fast, but in the immigration dataset with this larg number of rows it's hard to process the data with pandas and if we run spark in distributed system it will be 100x faster.
- using `postgreSQL` for the database case it can handle about 9.7 millions rows and our data is much smaller, in general we need a releational database fo our dimintional model and according to the size of our data it's not nessessary to go to a cloud data warehouse, to avoid the unnessesary cost we could use `postgreSQL` and also `postgreSQL` has a much better query optimizer, vastly better join handling, and much more flexibility in querying in general than MySQL, which is a big advantage in an analytics environment such as a data warehouse.


the sorce of the immigration data is partisiond monthlly so that shoud be the update

- if The data was increased by 100x then we should us a map reduce tech to process the data in multiple nodes and I go for `redshift`
- if The data populates a dashboard that must be updated on a daily basis by 7am every day we should use air flow to schedule the task as we need
- if The database needed to be accessed by 100+ people movig to `redshift` can handle this problem.