# Project Title
## Data Engineering Capstone Project

### Project Summary
In this project we build an ETL pipeline that extracts data from different sources, cleans it, transforms it into a useful and easy-to-understand star schema, and saves it for future use.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
import pyspark
from pyspark.sql.types import MapType
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, TimestampType

## Step 1: Scope the Project and Gather Data

### Scope 
This project integrates I94 immigration data, airport codes data, and US demographic data to set up a data warehouse with fact and dimension tables.

### Describe and Gather Data 
**Data Sets**
- [I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office)
    Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).
- [Airport codes Data](https://datahub.io/core/airport-codes#data)
   This data contains the list of all airport codes, names, location, elevation etc.
- [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
    This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.

**Tools**
- AWS S3: data storage
- Python for data processing
    - Pandas - exploratory data analysis on small data set
    - PySpark - data processing on large data set
    



In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [4]:
#write to parquet
# df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")


In [5]:
df_spark.createOrReplaceTempView("df_spark")
spark.sql("""SELECT COUNT(*)
                FROM df_spark""").show()

+--------+
|count(1)|
+--------+
| 3096313|
+--------+



## Step 2: Explore and Assess the Data


### I94-immigration data:

#### Explore:

In [6]:
df=pd.read_csv("immigration_data_sample.csv")
df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


#### Cleaning:

- **Missing Values:** This data has missing values in many columns.
- **Useless Columns:** Many columns are either completely empty or mostly missing, so they are of little use.
- **Column Names:** Column names are abbreviated and confusing; it would be better to give them more descriptive names.
- **Column Types:** Many columns are stored as floats although they hold whole numbers; I cast them to integer types.
- **Coded Columns:** Many columns hold coded values, e.g. i94cit and i94res. Let's map them back to their original values where that is useful.
- **Dates:** Dates are in unusual formats (for example, arrdate is a SAS day count starting at 1960-01-01); it would be better to convert them to a standard timestamp format.

In [7]:
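# Performing cleaning tasks here
# SAS stores dates as a day count from 1960-01-01; missing values stay null
spark.udf.register("SAS_to_datetime",lambda date: None if date is None else pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1'),TimestampType())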
def visa_int_to_var(visa):
    if visa == 1:
        return 'Business'
    elif visa == 2:
        return 'Pleasure'
    elif visa == 3:
        return 'Student'
spark.udf.register("visa_transform",lambda visa:visa_int_to_var(visa))
df_spark.createOrReplaceTempView("df_spark")
cleaned_df_immi = spark.sql("""SELECT monotonically_increasing_id() as id,
                            cast(cicid as BIGINT),
                            cast(i94yr as INT) as year,
                            cast(i94mon as INT) as month,
                            cast(i94cit as INT) as citizen_country,
                            cast(i94res as INT) as residence_country,
                            cast(i94port as CHAR(3)) as port,
                            SAS_to_datetime(arrdate) as arrival_date,
                            SAS_to_datetime(depdate) as departure_date,
                            cast(i94addr as CHAR(2)) as state_code,
                            cast(i94bir as INT) as birth_year,
                            visa_transform(i94visa) as visa,
                            cast(gender as CHAR(1)) as gender,
                            cast(admnum as BIGINT) as adm_num,
                            fltno as flight_no,
                            airline,
                            visatype
                            FROM df_spark
                            WHERE i94mode = 1 AND depdate IS NOT NULL AND i94addr IS NOT NULL
                            AND fltno IS NOT NULL AND airline IS NOT NULL AND visatype IS NOT NULL
                            AND gender IS NOT NULL AND i94cit IS NOT NULL AND i94res IS NOT NULL
                            """)

cleaned_df_immi.createOrReplaceTempView("df_immi")
cleaned_df_immi.limit(10).toPandas()


Unnamed: 0,id,cicid,year,month,citizen_country,residence_country,port,arrival_date,departure_date,state_code,birth_year,visa,gender,adm_num,flight_no,airline,visatype
0,0,5748517,2016,4,245,438,LOS,2016-04-30,2016-05-08,CA,40,Business,F,94953870030,11,QF,B1
1,1,5748518,2016,4,245,438,LOS,2016-04-30,2016-05-17,NV,32,Business,F,94955622830,7,VA,B1
2,2,5748519,2016,4,245,438,LOS,2016-04-30,2016-05-08,WA,29,Business,M,94956406530,40,DL,B1
3,3,5748520,2016,4,245,438,LOS,2016-04-30,2016-05-14,WA,29,Business,F,94956451430,40,DL,B1
4,4,5748521,2016,4,245,438,LOS,2016-04-30,2016-05-14,WA,28,Business,M,94956388130,40,DL,B1
5,5,5748522,2016,4,245,464,HHW,2016-04-30,2016-05-05,HI,57,Pleasure,M,94981802830,10,NZ,B2
6,6,5748523,2016,4,245,464,HHW,2016-04-30,2016-05-12,HI,66,Pleasure,F,94979689930,10,NZ,B2
7,7,5748524,2016,4,245,464,HHW,2016-04-30,2016-05-12,HI,41,Pleasure,F,94979746730,10,NZ,B2
8,8,5748525,2016,4,245,464,HOU,2016-04-30,2016-05-07,FL,27,Pleasure,M,94973246630,28,NZ,B2
9,9,5748526,2016,4,245,464,LOS,2016-04-30,2016-05-07,CA,26,Pleasure,F,95013547930,2,NZ,B2
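
The `SAS_to_datetime` UDF above relies on SAS storing dates as a count of days since 1960-01-01. Below is a minimal sketch of the same conversion in plain Python, which may help when spot-checking values outside Spark. The function name `sas_days_to_date` is hypothetical, and the sample value 20566 is the `arrdate` of the first row in the sample data shown earlier.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates in days from this day


def sas_days_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count to a calendar date; pass missing values through."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20566.0))  # 2016-04-22
```

The result falls in April 2016, which agrees with the `i94mon` value of 4 in the same row.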


#### Extracting country codes and country names from I94_SAS_Labels_Descriptions:

In [8]:
def parseco(line):
    import re
    pattern = r"^\s*(\d+)\s*=\s*'(.*)'"  # raw string avoids invalid-escape warnings
    match = re.search(pattern,line)

    return {
        'code' :int(match.group(1)),
        'country':match.group(2)
        }
spark.udf.register('parseco',parseco,MapType(Str(),Str()))
df=spark.read.text("I94_SAS_Labels_Descriptions.SAS")
df_countries=spark.createDataFrame(df.collect()[10:298])
df_countries.createOrReplaceTempView("df_co")
df_countries=spark.sql("""SELECT parseco(value) as parsed
                            FROM df_co""")
df_countries.createOrReplaceTempView("df_co")
df_countries = spark.sql("""SELECT parsed['code'] as country_code,
                                    parsed['country'] as country_name
                                FROM df_co""")
df_countries.createOrReplaceTempView("df_countries")
df_countries.show()

+------------+---------------+
|country_code|   country_name|
+------------+---------------+
|         236|    AFGHANISTAN|
|         101|        ALBANIA|
|         316|        ALGERIA|
|         102|        ANDORRA|
|         324|         ANGOLA|
|         529|       ANGUILLA|
|         518|ANTIGUA-BARBUDA|
|         687|     ARGENTINA |
|         151|        ARMENIA|
|         532|          ARUBA|
|         438|      AUSTRALIA|
|         103|        AUSTRIA|
|         152|     AZERBAIJAN|
|         512|        BAHAMAS|
|         298|        BAHRAIN|
|         274|     BANGLADESH|
|         513|       BARBADOS|
|         104|        BELGIUM|
|         581|         BELIZE|
|         386|          BENIN|
+------------+---------------+
only showing top 20 rows
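
A UDF like `parseco` raises an `AttributeError` if it ever receives a line that does not match the pattern, because `re.search` returns `None` in that case. The following is a minimal sketch of a more forgiving parser in plain Python. The name `parse_country_line` and the sample lines are hypothetical; they only mimic the `code = 'NAME'` layout that the regex above expects. Unlike the UDF, this sketch also strips surrounding whitespace from the name.

```python
import re
from typing import Optional, Tuple

COUNTRY_LINE = re.compile(r"^\s*(\d+)\s*=\s*'(.*)'")


def parse_country_line(line: str) -> Optional[Tuple[int, str]]:
    """Return (code, name) for a matching line, or None otherwise."""
    match = COUNTRY_LINE.search(line)
    if match is None:
        return None
    return int(match.group(1)), match.group(2).strip()


# Hypothetical sample lines, including one that is not a mapping
sample = ["   236 =  'AFGHANISTAN'", "   687 =  'ARGENTINA '", "/* not a mapping line */"]
parsed = [p for p in map(parse_country_line, sample) if p is not None]
print(parsed)  # [(236, 'AFGHANISTAN'), (687, 'ARGENTINA')]
```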



#### Extracting city codes and city names from I94_SAS_Labels_Descriptions:

This one needs a bit of cleaning too, because city names and states are merged together in single strings and we need to parse them apart.

In [9]:
def parsecity(line):
    import re
    pattern = r"^\s*'(\w{3})'\s*=\s*'([\w\s\./-]*),{0,1}"  # raw string avoids invalid-escape warnings
    match = re.search(pattern,line)

    return {
        'city_code' :match.group(1),
        'city_name':match.group(2)
        }
spark.udf.register('parsecity',parsecity,MapType(Str(),Str()))
df_city=spark.createDataFrame(df.collect()[303:893])
df_city.createOrReplaceTempView("df_city")
df_city=spark.sql("""SELECT parsecity(value) as parsed
                FROM df_city""")
df_city.createOrReplaceTempView("df_city")
df_city=spark.sql("""SELECT parsed['city_code'] as city_code,parsed['city_name'] as city_name
                        FROM df_city""")
df_city.createOrReplaceTempView("df_city")
df_city.limit(5).toPandas()

Unnamed: 0,city_code,city_name
0,ANC,ANCHORAGE
1,BAR,BAKER AAF - BAKER ISLAND
2,DAC,DALTONS CACHE
3,PIZ,DEW STATION PT LAY DEW
4,DTH,DUTCH HARBOR
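
The city and the state can also be captured separately in one pass. The sketch below is plain Python and assumes lines of the form `'ANC' = 'ANCHORAGE, AK'`; that layout is inferred from the regex above rather than copied from the labels file, and `parse_port_line` is a hypothetical name.

```python
import re
from typing import Optional, Tuple

# Code in group 1, text before the first comma in group 2, optional remainder in group 3
PORT_LINE = re.compile(r"^\s*'(\w{3})'\s*=\s*'([^,']*)(?:,\s*([^']*))?'")


def parse_port_line(line: str) -> Optional[Tuple[str, str, Optional[str]]]:
    """Return (code, city, state); state is None when the label has no comma."""
    match = PORT_LINE.search(line)
    if match is None:
        return None
    code, city, state = match.groups()
    state = state.strip() if state else None
    return code, city.strip(), state


print(parse_port_line("   'ANC'\t=\t'ANCHORAGE, AK             '"))  # ('ANC', 'ANCHORAGE', 'AK')
```

Keeping the state as its own field would make it possible to join on both city name and state code later, instead of on the city name alone.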


#### Extracting state codes and state names from I94_SAS_Labels_Descriptions:

In [10]:
def parsestate(line):
    import re
    pattern = r"^\s*'(\w{2})'\s*=\s*'(.*)'"  # raw string avoids invalid-escape warnings
    match = re.search(pattern,line)

    return {
        'state_code' :match.group(1),
        'state_name':match.group(2)
        }
spark.udf.register('parsestate',parsestate,MapType(Str(),Str()))
df_state=spark.createDataFrame(df.collect()[982:1036])
df_state.createOrReplaceTempView("df_state")
df_state=spark.sql("""SELECT parsestate(value) as parsed
                            FROM df_state""")
df_state.createOrReplaceTempView("df_state")
df_state = spark.sql("""SELECT parsed['state_code'] as state_code,
                            parsed['state_name'] as state_name
                            FROM df_state""")
df_state.createOrReplaceTempView("df_state")
df_state.show()

+----------+-----------------+
|state_code|       state_name|
+----------+-----------------+
|        AK|           ALASKA|
|        AZ|          ARIZONA|
|        AR|         ARKANSAS|
|        CA|       CALIFORNIA|
|        CO|         COLORADO|
|        CT|      CONNECTICUT|
|        DE|         DELAWARE|
|        DC|DIST. OF COLUMBIA|
|        FL|          FLORIDA|
|        GA|          GEORGIA|
|        GU|             GUAM|
|        HI|           HAWAII|
|        ID|            IDAHO|
|        IL|         ILLINOIS|
|        IN|          INDIANA|
|        IA|             IOWA|
|        KS|           KANSAS|
|        KY|         KENTUCKY|
|        LA|        LOUISIANA|
|        ME|            MAINE|
+----------+-----------------+
only showing top 20 rows



### US-Cities_Demographics Data:
#### Exploring:

In [11]:
df = pd.read_csv("us-cities-demographics.csv",delimiter=';')
df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Cleaning us-cities-demographics data:

- **Column Names:** Column names contain spaces and mixed case; it would be better to give them consistent snake_case names.
- **Duplicate data:** The Count column gives the population of each race in a city, so every city appears once per race while all other columns simply repeat. I therefore keep only the row for the majority race of each city and drop the remaining rows.

In [12]:
df = spark.read.csv("us-cities-demographics.csv",sep=";", inferSchema=True, header=True)
df.createOrReplaceTempView("df")
df_us_cities=spark.sql("""
                    SELECT upper(a.city) as city,
                    upper(a.state) as state,
                    a.`Median Age` as median_age,
                    a.`Male Population` as male_population,
                    a.`Female Population` as female_population,
                    a.`Total Population` as total_population,
                    a.`Number of Veterans` as veteran_population,
                    a.`Foreign-born` as foreign_population,
                    a.`Average Household Size` as avg_household_size,
                    a.`State Code` as state_code,
                    a.Race as race
                    FROM df a INNER JOIN (SELECT city, state, MAX(count) as count
                                            FROM df
                                            GROUP BY city, state) b
                    ON a.city = b.city AND a.state = b.state AND a.count = b.count
                    """)
df_us_cities.createOrReplaceTempView("df_us_cities")
df_us_cities.limit(5).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,veteran_population,foreign_population,avg_household_size,state_code,race
0,QUINCY,MASSACHUSETTS,41.0,44129,49500,93629,4147,32935,2.39,MA,White
1,FORT MYERS,FLORIDA,37.3,36850,37165,74015,4312,15365,2.45,FL,White
2,PITTSBURGH,PENNSYLVANIA,32.9,149690,154695,304385,17728,28187,2.13,PA,White
3,HAMPTON,VIRGINIA,35.5,66214,70240,136454,19638,6204,2.48,VA,Black or African-American
4,FREDERICK,MARYLAND,36.1,33146,36336,69482,3870,14211,2.48,MD,White
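
The "keep only the largest race group per city" step can be illustrated on a few rows in plain Python. The rows below are toy values, not taken from the dataset, and the helper name `majority_race` is hypothetical. The sketch keys on both city and state, which is an assumption made here so that same-named cities in different states stay apart.

```python
from typing import Dict, Iterable, Tuple

Row = Tuple[str, str, str, int]  # (city, state, race, count)


def majority_race(rows: Iterable[Row]) -> Dict[Tuple[str, str], str]:
    """Map each (CITY, STATE) pair to the race with the highest count."""
    best: Dict[Tuple[str, str], Tuple[str, int]] = {}
    for city, state, race, count in rows:
        key = (city.upper(), state.upper())
        if key not in best or count > best[key][1]:
            best[key] = (race, count)
    return {key: race for key, (race, _) in best.items()}


# Toy rows: two cities share a name but sit in different states
toy_rows = [
    ("Springfield", "Illinois", "White", 90),
    ("Springfield", "Illinois", "Asian", 10),
    ("Springfield", "Missouri", "Hispanic or Latino", 20),
    ("Springfield", "Missouri", "Black or African-American", 150),
]
print(majority_race(toy_rows))
```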


### airport-codes_csv data:

#### Exploring:

In [13]:
df=pd.read_csv('airport-codes_csv.csv')
df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### Cleaning airport-codes_csv data and only extracting US airports data:

- **Missing Values:** This data has missing values in many columns.
- **Useless Columns:** Many columns are either completely empty or mostly missing, so they are of little use.
- **Column Names:** Column names are abbreviated and confusing; it would be better to give them more descriptive names.
- **Coded Columns:** Many columns hold coded values. One such column is iso_region.
- **Unnecessary Rows:** This is a global data set, but I only need US airports for the dimension table built from it, so I drop all airports outside the US.
- **iso_region:** Values in the iso_region column combine a country code (US for the United States) and a state code, separated by a "-". Since I am only interested in US data, I parse this column and extract only the state code using a regex.

In [14]:
df = spark.read.csv("airport-codes_csv.csv",sep=",", inferSchema=True, header=True)
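def parseregion(line):
    import re
    # iso_region looks like 'US-PA'; keep only the part after the country code
    pattern = r"\w{2}-(\w{2})"
    match = re.search(pattern,line) if line else None

    # Return null instead of failing when the value is missing or malformed
    return match.group(1) if match else None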
spark.udf.register('parseregion',parseregion,Str())
df.createOrReplaceTempView("df")
df_airports = spark.sql("""SELECT ident as airport_id,
                type,
                name,
                elevation_ft,
                parseregion(iso_region) as region,
                municipality,
                coordinates
                FROM df
                WHERE iso_country = 'US'
                """)
df_airports.createOrReplaceTempView("df_airports")
df_airports.show()

+----------+-------------+--------------------+------------+------+------------+--------------------+
|airport_id|         type|                name|elevation_ft|region|municipality|         coordinates|
+----------+-------------+--------------------+------------+------+------------+--------------------+
|       00A|     heliport|   Total Rf Heliport|          11|    PA|    Bensalem|-74.9336013793945...|
|      00AA|small_airport|Aero B Ranch Airport|        3435|    KS|       Leoti|-101.473911, 38.7...|
|      00AK|small_airport|        Lowell Field|         450|    AK|Anchor Point|-151.695999146, 5...|
|      00AL|small_airport|        Epps Airpark|         820|    AL|     Harvest|-86.7703018188476...|
|      00AR|       closed|Newport Hospital ...|         237|    AR|     Newport| -91.254898, 35.6087|
|      00AS|small_airport|      Fulton Airport|        1100|    OK|        Alex|-97.8180194, 34.9...|
|      00AZ|small_airport|      Cordes Airport|        3810|    AZ|      Cordes|-1
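
The region extraction can also be done without a regex. A small sketch in plain Python follows, assuming the `COUNTRY-REGION` layout seen in the sample rows (for example `US-PA`); the function name `parse_region` is hypothetical.

```python
from typing import Optional


def parse_region(iso_region: Optional[str]) -> Optional[str]:
    """Return the part after the first '-' ('PA' from 'US-PA'), or None."""
    if not iso_region or "-" not in iso_region:
        return None
    return iso_region.split("-", 1)[1]


print(parse_region("US-PA"))  # PA
```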

## Step 3: Define the Data Model
Now that the data is cleaned and ready, it's time to define the data model.
### 3.1 Conceptual Data Model
Since this data warehouse is intended for OLAP and BI applications, we model these data sets with a star schema.
![ER_Diagram](ER_Diagram.png)

### 3.2 Mapping Out Data Pipelines
1. Assume all data sets are stored in S3 buckets as listed below. (This applies to the etl.py file; in this notebook I use the paths available in the workspace instead.)
    - [Source_S3_Bucket]/immigration/18-83510-I94-Data-2016/*.sas7bdat
    - [Source_S3_Bucket]/I94_SAS_Labels_Descriptions.SAS
    - [Source_S3_Bucket]/us-cities-demographics.csv
    - [Source_S3_Bucket]/airport-codes_csv.csv
2. Follow Step 2 (cleaning) to clean up the data sets, or go directly to etl.py and run it after uploading the data to the S3 bucket as specified above.
3. Once the data is cleaned, follow the steps below to create the star schema shown above.
4. Note that the pipeline in this Jupyter notebook does not write the data to Parquet files in S3, whereas etl.py writes the tables to Parquet files.

## Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model
![ER](ER_Diagram.png)

### Fact_immigration table:

In [15]:
# Build the fact table from the cleaned immigration view
fact_immigration=spark.sql("""SELECT id,
                                cicid,
                                year,
                                month,
                                port,
                                state_code,
                                visa,
                                arrival_date,
                                departure_date,
                                adm_num
                                FROM df_immi""")
fact_immigration.createOrReplaceTempView('fact_immigration')

### dimm_immi_personal table:

In [16]:
dimm_immi_personal = spark.sql("""
                                SELECT DISTINCT i.cicid,
                                        cc.country_name as citizen_country,
                                        rc.country_name as residence_country,
                                        i.birth_year,
                                        i.gender
                                        FROM df_immi i
                                        LEFT JOIN df_countries cc ON i.citizen_country=cc.country_code
                                        LEFT JOIN df_countries rc ON i.residence_country=rc.country_code
                                        ORDER BY cicid
                                        
                                """)
dimm_immi_personal.createOrReplaceTempView('dimm_immi_personal')

### dimm_flight_detail table:

In [17]:
dimm_flight_detail = spark.sql("""
                                SELECT DISTINCT adm_num,
                                        flight_no,
                                        airline,
                                        visatype
                                FROM df_immi""")
dimm_flight_detail.createOrReplaceTempView('dimm_flight_detail')

### dimm_airports table:

In [18]:
dimm_airports = spark.sql("""SELECT * FROM df_airports""")
dimm_airports.createOrReplaceTempView('dimm_airports')

### dimm_city_population table:

In [19]:
dimm_city_population = spark.sql("""
                                    SELECT uc.city,
                                            uc.state,
                                            c.city_code,
                                            uc.state_code,
                                            uc.male_population,
                                            uc.female_population,
                                            uc.total_population,
                                            uc.veteran_population,
                                            uc.foreign_population,
                                            uc.race
                                            FROM df_us_cities uc
                                            LEFT JOIN df_city c ON uc.city=c.city_name
                                            WHERE city_code IS NOT NULL
                                            """)

# dimm_city_population = dimm_city_population.dropDuplicates(['city'])
dimm_city_population.createOrReplaceTempView('dimm_city_population')

### dimm_city_stats table:

In [20]:
dimm_city_stats = spark.sql("""SELECT uc.city,
                                uc.state,
                                c.city_code,
                                uc.state_code,
                                uc.median_age,
                                uc.avg_household_size
                                FROM  df_us_cities uc
                                LEFT JOIN df_city c ON uc.city=c.city_name
                                WHERE city_code IS NOT NULL""")
dimm_city_stats.createOrReplaceTempView('dimm_city_stats')

### 4.2 Data Quality Checks
Data quality checks include:
    
1. Data schema of every dimensional table matches data model.
2. No empty table after running ETL data pipeline.

#### Data schema of every dimensional table matches data model.

In [21]:
print("-------------------------------- fact_immigration_table ------------------------------------------")
fact_immigration.printSchema()
print("-------------------------------- dimm_imm_personal_table -----------------------------------------")
dimm_immi_personal.printSchema()
print("-------------------------------- dimm_flight_detail_table ----------------------------------------")
dimm_flight_detail.printSchema()
print("-------------------------------- dimm_airports_table ---------------------------------------------")
dimm_airports.printSchema()
print("-------------------------------- dimm_city_population_table --------------------------------------")
dimm_city_population.printSchema()
print("-------------------------------- dimm_city_stats_table -------------------------------------------")
dimm_city_stats.printSchema()

-------------------------------- fact_immigration_table ------------------------------------------
root
 |-- id: long (nullable = false)
 |-- cicid: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- port: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- visa: string (nullable = true)
 |-- arrival_date: timestamp (nullable = true)
 |-- departure_date: timestamp (nullable = true)
 |-- adm_num: long (nullable = true)

-------------------------------- dimm_imm_personal_table -----------------------------------------
root
 |-- cicid: long (nullable = true)
 |-- citizen_country: string (nullable = true)
 |-- residence_country: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)

-------------------------------- dimm_flight_detail_table ----------------------------------------
root
 |-- adm_num: long (nullable = true)
 |-- flight_no: string (nullable = true)
 |-- a
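
Printing the schemas relies on comparing them by eye. A sketch of an automated comparison follows. It assumes only that each table exposes a `columns` list, as Spark DataFrames do, and uses a hypothetical stand-in class `FakeTable` so the example runs without Spark. The helper name `check_columns` is also hypothetical; the expected column list is taken from the dimm_flight_detail definition above.

```python
from typing import Dict, List


def check_columns(tables: Dict[str, object], expected: Dict[str, List[str]]) -> None:
    """Raise ValueError if any table's column names differ from the data model."""
    for name, want in expected.items():
        got = list(tables[name].columns)
        if got != want:
            raise ValueError(f"{name}: expected columns {want}, got {got}")


class FakeTable:
    """Hypothetical stand-in for a Spark DataFrame (only `columns` is used)."""

    def __init__(self, columns: List[str]) -> None:
        self.columns = columns


expected = {"dimm_flight_detail": ["adm_num", "flight_no", "airline", "visatype"]}
tables = {"dimm_flight_detail": FakeTable(["adm_num", "flight_no", "airline", "visatype"])}
check_columns(tables, expected)  # passes silently when the columns match
```

In the notebook, the real DataFrames could be passed in place of `FakeTable`, with one expected list per table in the data model.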

#### No empty table after running ETL data pipeline:

In [None]:
print("-------------------------------- fact_immigration_table ------------------------------------------")
record_num = fact_immigration.count()
if record_num <= 0:
    raise ValueError("This table is empty!")
else:
    print(f"Table is not empty: total {record_num} records.")
print("-------------------------------- dimm_imm_personal_table -----------------------------------------")
record_num = dimm_immi_personal.count()
if record_num <= 0:
    raise ValueError("This table is empty!")
else:
    print(f"Table is not empty: total {record_num} records.")
print("-------------------------------- dimm_flight_detail_table ----------------------------------------")
record_num = dimm_flight_detail.count()
if record_num <= 0:
    raise ValueError("This table is empty!")
else:
    print(f"Table is not empty: total {record_num} records.")
print("-------------------------------- dimm_airports_table ---------------------------------------------")
record_num = dimm_airports.count()
if record_num <= 0:
    raise ValueError("This table is empty!")
else:
    print(f"Table is not empty: total {record_num} records.")
print("-------------------------------- dimm_city_population_table --------------------------------------")
record_num = dimm_city_population.count()
if record_num <= 0:
    raise ValueError("This table is empty!")
else:
    print(f"Table is not empty: total {record_num} records.")
print("-------------------------------- dimm_city_stats_table -------------------------------------------")
record_num = dimm_city_stats.count()
if record_num <= 0:
    raise ValueError("This table is empty!")
else:
    print(f"Table is not empty: total {record_num} records.")
print("-------------------------------- Test Complete ---------------------------------------------------")

-------------------------------- fact_immigration_table ------------------------------------------
Table is not empty: total 2377967 records.
-------------------------------- dimm_imm_personal_table -----------------------------------------


### 4.3 Data dictionary:
The data dictionary below gives a brief description of each field in the data model and where it came from.

#### fact_immigration_table:
immigrant log fact table:
- id ------------------------------ Autogenerated Unique id for fact table.
- cicid --------------------------- Unique id for fact table provided in original data.
- year ---------------------------- 4 digit year.
- month --------------------------- numeric month.
- port ---------------------------- city_code provided in i94 immigration data; the city name can be retrieved from the dimm_city_population table.
- state_code ---------------------- state code for US states; the state name can be retrieved from the dimm_city_population table.
- visa ---------------------------- visa category.
- arrival_date -------------------- arrival timestamp for immigrant.
- departure_date ------------------ departure date for immigrant.
- adm_num ------------------------- admission number of immigrant.

#### dimm_immi_personal
personal detail for each immigrant:
- cicid ---------------------------- Unique id for fact table provided in original data.
- citizen_country ------------------ citizenship country of immigrant.
- residence_country ---------------- residence country of immigrant.
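- birth_year ----------------------- taken from i94bir, which holds the immigrant's age in years (the cleaned sample above shows values such as 40), not the calendar year of birth.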
- gender --------------------------- gender of immigrant.

#### dimm_flight_detail
flight detail for given admission number:
- adm_num ------------------------- admission number of immigrant.
- flight_no ----------------------- Number of flight.
- airline ------------------------- Airline flight belongs to.
- visatype ------------------------ Visa type of the passenger.

#### dimm_city_stats
statistic for US cities:
- city_code ------------------------ City code for each city.
- city ----------------------------- Name of each city.
- state ---------------------------- state name of each city.
- state_code ----------------------- state code of each city.
- median_age ----------------------- median age for each city.
- avg_household_size --------------- Average household size of each city.

#### dimm_city_population:
population numbers for US cities:
- city_code ------------------------ City code for each city.
- city ----------------------------- Name of each city.
- state ---------------------------- state name of each city.
- state_code ----------------------- state code of each city.
- male_population ------------------ Male population of city.
- female_population ---------------- Female population of city.
- total_population ----------------- Total population of city.
- veteran_population --------------- Veteran population of city.
- foreign_population --------------- foreign citizens population of city.
- race ----------------------------- Majority ethnicity of city.

#### dimm_airports:
airports in the United States:
- airport_id ----------------------- Airport id of airport.
- type ----------------------------- type of airport.
- name ----------------------------- name of airport.
- elevation_ft --------------------- elevation of airport in feet.
- region --------------------------- US State in which Airport is present.
- municipality --------------------- Municipality of Airport.
- coordinates ---------------------- Coordinates of Airport.

## Step 5: Complete Project Write Up
### Tools and technologies:
- AWS S3 for data storage-------------- I chose AWS S3 for data storage because it is easier to maintain.
- Pandas ------------------------------ I used Pandas because pandas dataframes are easier to visualize and explore.
- PySpark ----------------------------- I used PySpark because of its schema-on-read features.

### Steps Taken:
- I imported the pyspark and pandas libraries, which are necessary for this project.
- I decided the scope of the project, i.e. I decided to use the I94 immigration data, the airport codes data and the US cities demographic data.
- I created a Spark session to work with my data.
- I loaded the datasets for my project.
- I cleaned those datasets by dropping irrelevant data, null values and duplicate values, using the schema-on-read features of PySpark.
- I used regular expressions on I94_SAS_Labels_Descriptions.SAS to extract country, city and state names for the codes used in the datasets.
- I replaced these codes with their corresponding names where necessary in the original datasets.
- I modeled a star schema for my ETL pipeline.
- I created the tables specified in the star schema from the original datasets.
- I finalized my ETL pipeline by running data quality checks on my data warehouse.



### The reason I chose this data model:
I chose this data model because I wanted to generate useful OLAP cubes that are easy to communicate to the relevant authorities. For example, an official from the department of home affairs might want to know how many female immigrants arrived through the port of the city with the highest foreign population.

In [24]:
spark.sql("""SELECT c.city, count(f.id) as porting_female_immigrants,c.foreign_population
            FROM fact_immigration f
            LEFT JOIN dimm_city_population c
            ON f.port = c.city_code
            LEFT JOIN dimm_immi_personal p
            ON f.cicid = p.cicid
            WHERE p.gender = 'F'
            GROUP BY c.city,c.foreign_population
            ORDER BY c.foreign_population DESC
            LIMIT 1""").show()

+--------+-------------------------+------------------+
|    city|porting_female_immigrants|foreign_population|
+--------+-------------------------+------------------+
|NEW YORK|                   192665|           3212500|
+--------+-------------------------+------------------+
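
To try the shape of this query without a Spark session, here is a sketch using Python's built-in `sqlite3` module on toy rows. The table and column names follow the notebook; all values are made up for illustration, and SQLite stands in for Spark SQL only on the assumption that the two agree on a query this simple.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Toy star schema: one fact table and two dimensions, trimmed to the columns used
conn.executescript("""
    CREATE TABLE fact_immigration (id INTEGER, cicid INTEGER, port TEXT);
    CREATE TABLE dimm_immi_personal (cicid INTEGER, gender TEXT);
    CREATE TABLE dimm_city_population (city TEXT, city_code TEXT, foreign_population INTEGER);
    INSERT INTO fact_immigration VALUES (1, 101, 'NYC'), (2, 102, 'NYC'), (3, 103, 'LOS'), (4, 104, 'NYC');
    INSERT INTO dimm_immi_personal VALUES (101, 'F'), (102, 'M'), (103, 'F'), (104, 'F');
    INSERT INTO dimm_city_population VALUES ('NEW YORK', 'NYC', 300), ('LOS ANGELES', 'LOS', 200);
""")

result = conn.execute("""
    SELECT c.city, COUNT(f.id) AS porting_female_immigrants, c.foreign_population
    FROM fact_immigration f
    LEFT JOIN dimm_city_population c ON f.port = c.city_code
    LEFT JOIN dimm_immi_personal p ON f.cicid = p.cicid
    WHERE p.gender = 'F'
    GROUP BY c.city, c.foreign_population
    ORDER BY c.foreign_population DESC
    LIMIT 1
""").fetchone()
conn.close()

print(result)  # ('NEW YORK', 2, 300)
```

The fact table carries only keys and measures, while the descriptive attributes (city name, gender) come from the dimension tables through the joins.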



### Data Update Frequency:
- Tables created from the immigration data set should be updated monthly, since the raw data set is built up monthly.
- Tables created from the demography data set could be updated annually, since collecting demographic data takes time, and more frequent collection would be costly and could lead to wrong conclusions.
- The table generated from the airport codes data should be updated annually as well, since airports are not built or shut down overnight and new airports are rarely added to a location.

### How to approach the problem differently under the following scenarios:
- **The data was increased by 100x.**
    If the data grew by 100x, we should consider processing it on AWS EMR, a managed cluster service for processing large data sets in the cloud.
 
- **The data populates a dashboard that must be updated on a daily basis by 7am every day.**
    Apache Airflow is a useful tool for building an ETL pipeline that updates the data on a schedule. Airflow integrates well with Python and AWS, and many applications can be combined to automate tasks.
- **The database needed to be accessed by 100+ people.**
    AWS Redshift is a good option in this case, as Redshift can easily handle up to 500 connections.