### Step 1: Scope the Project and Gather Data

#### Scope 

The goal of the project is to provide insights and analytics on immigration, airport, and demographics data. The end solution is a star-schema data model with one fact table and six dimension tables that connect immigration data, demographics data, airport data, visa type data, and the arrival dates of the immigrants.

With this model we can compute, for example, the total number of immigrants per visa type, total arrivals in different US cities and states, and the number of airports in each US state.

We use Spark to load and process the data into dataframes. We first explore the data and then clean it by removing null or missing values.

We create staging tables and use them to build the final fact and dimension tables. Each dimension table and the fact table is then checked for data quality by verifying that it exists and contains data. The final tables can then be written out in Parquet format for further analysis.

#### Describe and Gather Data 
1. *I94 Immigration Data:* This data comes from the US National Tourism and Trade Office.
2. *U.S. City Demographic Data:* This data comes from OpenSoft. The dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The underlying figures come from the US Census Bureau's 2015 American Community Survey.
3. *Airport Code Table:* This is a simple table of airport codes and corresponding cities.


In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType as Str, IntegerType as Int
from datetime import datetime, timedelta
import pyspark.sql.functions as F
import re

In [2]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [3]:
immig_fpath = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immig_subset = ["cicid", "i94port", "i94addr", "gender", "i94mode"]

In [4]:
dem_file = "us-cities-demographics.csv"
dem_subset = ['State Code', 'city', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans']

In [5]:
airp_file = "airport-codes_csv.csv"
air_subset = ['ident', 'type', 'name', 'iso_country', 'iso_region', 'elevation_ft']

In [6]:
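# Build a mapping from I94 port (city) codes to city names using the SAS label descriptions
immig_desc = "I94_SAS_Labels_Descriptions.SAS"
with open(immig_desc) as f:
    parsed_val = f.readlines()

exp_code = re.compile(r"\'(.*)\'.*\'(.*)\'")
city_codes = {}
for line in parsed_val[302:961]:
    x = exp_code.search(line)
    if x:  # skip lines that do not contain a 'code' = 'name' pair
        city_codes[x.group(1)] = x.group(2)

# UDF that returns the first code whose description contains the city name, or None if there is no match
@udf(Str())
def processed_codes(city):
    for a in city_codes:
        if city.lower() in city_codes[a].lower():
            return a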
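
The label parsing and city lookup can be tried out without Spark. The sketch below is a minimal standalone version under a few assumptions: the sample label lines are made up for illustration, the helper names `parse_label_line` and `lookup_city_code` are hypothetical, and each label line is assumed to have the form `'CODE' = 'CITY, STATE'`.

```python
import re

# Assumed line format: 'CODE' = 'DESCRIPTION' (the sample lines below are illustrative)
LABEL_PATTERN = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")

def parse_label_line(line):
    """Return (code, description) for a label line, or None if it does not match."""
    match = LABEL_PATTERN.search(line)
    if match is None:
        return None
    return match.group(1).strip(), match.group(2).strip()

def lookup_city_code(city, codes):
    """Return the first code whose description contains the city name (case-insensitive)."""
    if city is None:
        return None
    needle = city.lower()
    for code, description in codes.items():
        if needle in description.lower():
            return code
    return None

sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'NYC'\t=\t'NEW YORK, NY          '",
    "/* a comment line without a code */",
]
sample_codes = dict(p for p in map(parse_label_line, sample_lines) if p is not None)
print(sample_codes)
print(lookup_city_code("New York", sample_codes))
```

Because the lookup is a substring match, a short city name can match an unrelated description, so comparing against only the city part of the description may be a safer variant.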

### Working with Demographics data
____________________________________________________________________________________________________________________________

#### The function *process_demographic* does the following:
* read the input demographics data
* drop rows with missing values in the selected columns
* create a unique, monotonically increasing id for each record
* convert city names to city codes using the *processed_codes* function
* create the demographics staging table **staging_demographic**

In [7]:
def process_demographic(spark, input_data, subset):
    dem_df = spark.read.csv(input_data, inferSchema=True, header=True, sep=';')
    clean_demodf = dem_df.dropna(how='any', subset=subset)
    clean_dm = clean_demodf.withColumn('id', monotonically_increasing_id())
    clean_dm = clean_dm.withColumn('city_code', processed_codes(clean_dm['City']))
    cleansed_dm = clean_dm.dropna(how = 'any', subset = ['city_code'])
    staging_demographic = cleansed_dm.select(col('id').alias('id'),
                                     col('State Code').alias('state_code'),
                                     col('city_code'),
                                     col('Median Age').alias('median_age'),
                                     col('Male Population').alias('male_population'),
                                     col('Female Population').alias('female_population'),
                                     col('Total Population').alias('total_population'),
                                     col('Number of Veterans').alias('number_of_veterans'),
                                     col('Foreign-born').alias('foreign_born'),
                                     col('Average Household Size').alias('average_household_size'),
                                     col('race'),
                                     col('count')).dropDuplicates()
    return clean_demodf, staging_demographic
    

In [8]:
clean_demodf, staging_demographic = process_demographic(spark, dem_file, dem_subset)

In [9]:
staging_demographic.limit(10).toPandas()

Unnamed: 0,id,state_code,city_code,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,race,count
0,463,AR,LIA,36.6,96997,100989,197986,12343,16640,2.36,White,102312
1,591,CA,LNB,34.6,238159,236013,474172,17463,127764,2.78,White,277962
2,1019,CA,SAC,33.7,237724,252991,490715,19698,112579,2.73,White,268151
3,1542,IL,PIA,33.1,56229,62432,118661,6634,7517,2.4,White,77074
4,2277,AZ,TUC,33.6,264893,266781,531674,38182,82220,2.45,Asian,24689
5,2642,TN,MEM,34.1,312237,343523,655760,31189,43318,2.55,Black or African-American,420983
6,2259,FL,WPB,39.6,49262,57520,106782,4917,30675,2.53,Asian,3191
7,2347,CA,IND,35.9,43803,43723,87526,3647,22538,3.08,Asian,2866
8,2837,OR,SBN,37.3,42294,44723,87017,6199,3032,2.39,American Indian and Alaska Native,1818
9,305,PA,PHI,34.1,741270,826172,1567442,61995,205339,2.61,Black or African-American,691186


In [10]:
# UDF to convert a SAS date (number of days since 1960-01-01) into an ISO date string (yyyy-mm-dd)
@udf(Str())
def process_dates(date):
    if date is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=date)).isoformat()
    return None
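
The date conversion can be checked in plain Python. This is a minimal sketch, assuming the arrival date is stored as a SAS date value, that is, a number of days counted from 1960-01-01; the function name `sas_to_iso` is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date

def sas_to_iso(sas_days):
    """Convert a SAS date value (days since 1960-01-01) to a yyyy-mm-dd string."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=sas_days)).isoformat()

print(sas_to_iso(20545.0))  # 2016-04-01
```

The value 20545 corresponds to 1 April 2016, which matches the arrival dates shown in the staging table output.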

In [11]:
# Get the distinct US state codes from the demographics data; they are used to keep only US states in the immigration data
us_states = clean_demodf.toPandas()['State Code'].unique()
a = us_states.tolist()
# UDF that returns the state code if it is a US state, otherwise the placeholder 'noUS'
@udf(Str())
def filter_statecode(state):
    if state in us_states:
        return state
    return 'noUS'

In [12]:
# Helper that drops rows with a missing value in any of the given columns
def clean_data(df, subset):
    clean_df = df.dropna(how='any', subset=subset)
    return clean_df
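
To make the `how='any'` plus `subset` semantics concrete, here is a small pure-Python sketch of the same rule applied to a list of dictionaries. The sample rows and the name `drop_rows_with_missing` are hypothetical; this illustrates the filtering logic only and is not how Spark implements it.

```python
def drop_rows_with_missing(rows, subset):
    """Keep only rows where every column in `subset` has a non-None value."""
    return [row for row in rows if all(row.get(col) is not None for col in subset)]

rows = [
    {"cicid": 1.0, "i94port": "NYC", "gender": "F"},
    {"cicid": 2.0, "i94port": None, "gender": "M"},
    {"cicid": 3.0, "i94port": "ATL", "gender": None},
]
kept = drop_rows_with_missing(rows, subset=["cicid", "i94port"])
print([row["cicid"] for row in kept])  # [1.0, 3.0]
```

The third row survives because its only missing value is in `gender`, which is not part of the subset passed in this example.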

### Working with Immigration data

____________________________________________________________________________________________________________________________

#### The function *process_immigration* does the following:
* read the input immigration data
* clean the dataframe from missing values with *clean_data* function
* convert the arrival date from the SAS date format to a readable ISO date (yyyy-mm-dd) using the *process_dates* function
* keep only the records whose state code is a US state
* create immigration staging table **staging_immigration**

In [13]:
def process_immigration(spark, input_data, subset, a):
    immig_df = spark.read.format('com.github.saurfang.sas.spark').load(input_data)
    clean_immig_df = clean_data(immig_df, subset)
    clean_immig_df = clean_immig_df.withColumn("arrdate", process_dates(clean_immig_df.arrdate))
    clean_immig_df = clean_immig_df.withColumn("i94addr", filter_statecode(clean_immig_df.i94addr))
    clean_immig_df = clean_immig_df.filter((clean_immig_df.i94addr).isin(a))
    staging_immigration = clean_immig_df.select(col('cicid').alias('Id'),
                                           col('gender').alias('Gender'),
                                           col('i94bir').alias('Age'), 
                                           col('i94mode').alias('Mode_Of_Arrival'),
                                           col('arrdate').alias('Arrival_Date'),
                                           col('i94port').alias('City_Code'),
                                           col('i94addr').alias('State_Code'),
                                           col('visatype').alias('Visa_Type'),
                                           'count').dropDuplicates()
    return immig_df, staging_immigration
    

In [14]:
immig_df, staging_immigration = process_immigration(spark, immig_fpath, immig_subset, a)

In [15]:
staging_immigration.limit(10).toPandas()

Unnamed: 0,Id,Gender,Age,Mode_Of_Arrival,Arrival_Date,City_Code,State_Code,Visa_Type,count
0,939.0,F,42.0,1.0,2016-04-01,NEW,NY,WT,1.0
1,1151.0,F,60.0,1.0,2016-04-01,ATL,DC,WT,1.0
2,1513.0,M,12.0,1.0,2016-04-01,NYC,NY,WT,1.0
3,1910.0,M,20.0,1.0,2016-04-01,SFR,CA,F1,1.0
4,2550.0,M,42.0,1.0,2016-04-01,NEW,NY,B2,1.0
5,3797.0,F,61.0,1.0,2016-04-01,LVG,NV,WT,1.0
6,3886.0,F,24.0,1.0,2016-04-01,ATL,NY,WT,1.0
7,4032.0,F,6.0,1.0,2016-04-01,NYC,NJ,WT,1.0
8,4056.0,M,70.0,1.0,2016-04-01,NYC,NY,WT,1.0
9,4201.0,F,63.0,1.0,2016-04-01,SPM,MN,WT,1.0


____________________________________________________________________________________________________________________________

### Working with Airports data

____________________________________________________________________________________________________________________________

#### The function *process_airports* does the following:
* read the input airports data
* clean the dataframe from missing values with *clean_data* function
* keep only the airports located in the US
* remove the 'US-' prefix from the region to get a well-formatted state code
* create the airports staging table **staging_airports**

In [16]:
def process_airports(spark, input_data, subset):
    airp_df = spark.read.csv(input_data, inferSchema=True, header=True, sep=',')
    clean_adf = clean_data(airp_df, subset)
    clean_adf_us = clean_adf.filter(clean_adf.iso_country == 'US')
    clean_airdf = clean_adf_us.withColumn('iso_region', F.regexp_replace('iso_region', 'US-', ''))
    staging_airports = clean_airdf.select(col('ident').alias('Id'),
                                         col('name').alias('Name'),
                                         col('type').alias('Type'),
                                         col('elevation_ft').alias('Elevation_ft'),
                                         col('iso_region').alias('State_Code'),
                                         col('iso_country').alias('Country_Code')).dropDuplicates()
    return staging_airports
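
The region clean-up step can be illustrated in plain Python. This sketch assumes region codes look like `US-TX`; unlike the unanchored `'US-'` pattern used in the notebook, it anchors the pattern at the start of the string, which is a design choice of this example. The name `region_to_state_code` is hypothetical.

```python
import re

def region_to_state_code(iso_region):
    """Strip a leading 'US-' from an ISO region code such as 'US-TX'."""
    return re.sub(r"^US-", "", iso_region)

print(region_to_state_code("US-TX"))  # TX
print(region_to_state_code("CA-ON"))  # CA-ON (unchanged, no US- prefix)
```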

In [17]:
staging_airports = process_airports(spark, airp_file, air_subset)

In [18]:
staging_airports.limit(10).toPandas()

Unnamed: 0,Id,Name,Type,Elevation_ft,State_Code,Country_Code
0,01PS,Nort's Resort Airport,small_airport,1040,PA,US
1,04FA,Richards Field,small_airport,9,FL,US
2,04OK,Stillwater Medical Center Heliport,heliport,984,OK,US
3,09XS,Baptist Medical Center Heliport,heliport,710,TX,US
4,0CL4,Glen Fed Heliport,heliport,770,CA,US
5,0IL6,Kishwaukee Community Hospital Heliport,heliport,852,IL,US
6,0N5,Deldot Helistop,heliport,30,DE,US
7,0NK3,Seven Gullies Airport,small_airport,600,NY,US
8,12XS,Mc Croskey Field,small_airport,253,AR,US
9,19FL,The Villages Heliport,heliport,103,FL,US


In [19]:
staging_airports.count()

22518

____________________________________________________________________________________________________________________________


### Creating visa staging table

____________________________________________________________________________________________________________________________

#### The function *process_visatype* does the following:
* get the distinct visa types from the immigration dataframe
* create a unique, monotonically increasing id for each record
* create the visa types staging table **staging_visatype**

In [19]:
def process_visatype(df):
    visa_df = df.select(['visatype']).distinct()
    v_df = visa_df.withColumn('visa_type_id', monotonically_increasing_id())
    staging_visatype = v_df.select(col('visa_type_id'),
                              col('visatype').alias('visa_type'))
    return staging_visatype

In [20]:
staging_visatype = process_visatype(immig_df)

In [22]:
staging_visatype.count()

17

____________________________________________________________________________________________________________________________

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
#### Staging Tables:
    1. staging_immigration
    2. staging_airports
    3. staging_demographic
    4. staging_visatype

### ERD
![erd](updated_erd.png)

#### Dimension Tables:
    1. dim_immigrant
    2. dim_demographic
    3. dim_state
    4. dim_visatype
    5. dim_airport
    6. dim_arrivaldate

#### Fact Table:
    1. immig_fact
    

#### The proposed data model is a star schema
Star schemas are easy for end users and applications to understand and navigate. With a well-designed schema, users can quickly analyze large, multidimensional data sets. The main advantages of star schemas in a decision-support environment are:

    * Query performance
        Because a star schema database has a small number of tables and clear join paths, queries run faster than they do against an OLTP system. Small single-table queries, usually of dimension tables, are almost instantaneous. Large join queries that involve multiple tables take only seconds or minutes to run. In a star schema database design, the dimensions are linked only through the central fact table. When two dimension tables are used in a query, only one join path, intersecting the fact table, exists between those two tables. This design feature enforces accurate and consistent query results.

    * Load performance and administration
        Structural simplicity also reduces the time required to load large batches of data into a star schema database. By defining facts and dimensions and separating them into different tables, the impact of a load operation is reduced. Dimension tables can be populated once and occasionally refreshed. You can add new facts regularly and selectively by appending records to a fact table.

    * Built-in referential integrity
        A star schema has referential integrity built in when data is loaded. Referential integrity is enforced because each record in a dimension table has a unique primary key, and all keys in the fact tables are legitimate foreign keys drawn from the dimension tables. A record in the fact table that is not related correctly to a dimension cannot be given the correct key value to be retrieved.

    * Easily understood
        A star schema is easy to understand and navigate, with dimensions joined only through the fact table. These joins are more significant to the end user, because they represent the fundamental relationship between parts of the underlying business. Users can also browse dimension table attributes before constructing a query.
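
The single join path through the fact table can be shown with a tiny in-memory example. The sketch below uses SQLite from the Python standard library instead of Spark; the table names follow this project, but the rows, the population figures, and the reduced column sets are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_visatype (visa_type_id INTEGER PRIMARY KEY, visa_type TEXT UNIQUE);
CREATE TABLE dim_state (state_code TEXT PRIMARY KEY, total_population INTEGER);
CREATE TABLE immig_fact (
    id INTEGER PRIMARY KEY,
    state_code TEXT REFERENCES dim_state(state_code),
    visa_type TEXT REFERENCES dim_visatype(visa_type)
);
-- Illustrative rows only; these are not values from the real datasets
INSERT INTO dim_visatype VALUES (1, 'WT'), (2, 'B2');
INSERT INTO dim_state VALUES ('NY', 100), ('CA', 200);
INSERT INTO immig_fact VALUES (1, 'NY', 'WT'), (2, 'NY', 'B2'), (3, 'CA', 'WT');
""")

# The two dimensions are related only through the fact table
rows = conn.execute("""
SELECT s.state_code, v.visa_type, COUNT(*) AS total_arrivals
FROM immig_fact AS f
JOIN dim_state AS s ON s.state_code = f.state_code
JOIN dim_visatype AS v ON v.visa_type = f.visa_type
GROUP BY s.state_code, v.visa_type
ORDER BY s.state_code, v.visa_type
""").fetchall()
print(rows)
```

Each dimension is joined to the fact table on its own key, so adding another dimension to a query only adds one more join of the same shape.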

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. **Data cleansing:** remove nulls and duplicates from the data
2. **Process data:** filter out unwanted columns and process the data, for example by changing data type formats, getting distinct values, and narrowing down to a single country
3. **Create Staging tables**: Create immigration, demographic, airports and visatype staging tables
4. **Create Dimension tables:** Create immigrant, demographic, state, airports, visatype and arrivaldate dimension tables
5. **Create Fact table:** Create immigration fact table
6. **Save the final result in parquet**

____________________________________________________________________________________________________________________________

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Functions to create Dimension tables
____________________________________________________________________________________________________________________________

In [21]:
def create_immigrant_dimension(staging_table):
    dim_immigrant = staging_table.select(col('Id').alias('id'),
                                          col('Arrival_Date').alias('arrival_date'),
                                          col('Age').alias('age'),
                                          col('Gender').alias('gender'),
                                          col('Visa_Type').alias('visa_type'), 
                                          col('Mode_of_Arrival').alias('mode_of_arrival')).dropDuplicates()
    #dim_immigrant.write.mode('overwrite').partitionBy('gender', 'age').parquet('immigrants_us')
    return dim_immigrant

In [32]:
def create_demographics_dimension(staging_table):
    dim_demographic = staging_table.select('id', 'city_code', 'male_population',
                                     'female_population', 'total_population', 'number_of_veterans',
                                     'foreign_born').dropDuplicates()
    dim_demographic = dim_demographic.groupby('city_code').agg(sum('total_population').alias('total_population'), sum('male_population').alias('male_population'), sum('female_population').alias('female_population'), sum('foreign_born').alias('foreign_born'), sum('number_of_veterans').alias('number_of_veterans'))
    #dim_demographic.write.mode("overwrite").partitionBy("city_code").parquet("city_us")
    return dim_demographic

In [60]:
def create_state_dimension(staging_table):
    dim_state = staging_table.select('id', 'state_code', 'male_population',
                                     'female_population', 'total_population', 'number_of_veterans',
                                     'foreign_born').dropDuplicates()
    dim_state = dim_state.groupby('state_code').agg(sum('total_population').alias('total_population'), sum('male_population').alias('male_population'), sum('female_population').alias('female_population'), sum('foreign_born').alias('foreign_born'), sum('number_of_veterans').alias('number_of_veterans'))
    #dim_state.write.mode("overwrite").partitionBy("state_code").parquet("state_us")
    return dim_state

In [25]:
def create_visa_dimension(staging_table):
    dim_visatype = staging_table.select('visa_type_id', 'visa_type').dropDuplicates()
    #dim_visatype.write.mode("overwrite").parquet("visatype_us")
    return dim_visatype

In [26]:
def create_airports_dimension(staging_table):
    dim_airport = staging_table.select(col('Id').alias('id'),
                                     col('Name').alias('name'),
                                     col('Type').alias('type'),
                                     col('Elevation_ft').alias('elevation_ft'),
                                     col('State_Code').alias('state_code'),
                                     col('Country_Code').alias('country_code')).dropDuplicates()
    #dim_airport.write.mode("overwrite").partitionBy("state_code").parquet("airport_us")
    return dim_airport

In [27]:
def create_arrivaldate_dimension(staging_table):
    dim_arrivaldate = staging_table.select(col('Arrival_Date').alias('date'))
    dim_arrivaldate = dim_arrivaldate.withColumn('year', year('date')).withColumn('month', month('date')).withColumn('day', dayofmonth('date')).withColumn('week', weekofyear('date')).dropDuplicates()
    #dim_arrivaldate.write.mode('overwrite').partitionBy('month').parquet('arrival_date')
    return dim_arrivaldate
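
The columns derived for the arrival date dimension can be reproduced in plain Python. This is a minimal sketch with a hypothetical helper `date_parts`; it assumes the week number is the ISO 8601 week, which is how Spark documents `weekofyear`.

```python
from datetime import date

def date_parts(iso_date):
    """Split a yyyy-mm-dd string into year, month, day and ISO week number."""
    d = date.fromisoformat(iso_date)
    return {
        "date": iso_date,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],  # second element is the ISO week number
    }

print(date_parts("2016-04-01"))
```

Under ISO week numbering, 1 April 2016 falls in week 13 and 4 April 2016 starts week 14, so a single calendar month can span more than four week values.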


____________________________________________________________________________________________________________________________

### Functions to create Fact table
____________________________________________________________________________________________________________________________

In [28]:
def create_immig_fact(staging_table):
    immig_fact = staging_table.select(col('Id').alias('id'),
                                        col('State_Code').alias('state_code'),
                                        col('City_Code').alias('city_code'),
                                        col('Arrival_Date').alias('arrival_Date'),
                                        col('Visa_Type').alias('visa_type'),
                                           'count').dropDuplicates()
    #immig_fact.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration_us")
    return immig_fact

____________________________________________________________________________________________________________________________

### Creating the data model
____________________________________________________________________________________________________________________________

In [61]:
dim_immigrant = create_immigrant_dimension(staging_immigration)
dim_demographic = create_demographics_dimension(staging_demographic)
dim_state = create_state_dimension(staging_demographic)
dim_visatype = create_visa_dimension(staging_visatype)
dim_airport = create_airports_dimension(staging_airports)
dim_arrivaldate = create_arrivaldate_dimension(staging_immigration)

immig_fact = create_immig_fact(staging_immigration)

In [62]:
dim_state.count()

45

In [63]:
dim_state.limit(10).toPandas()

Unnamed: 0,state_code,total_population,male_population,female_population,foreign_born,number_of_veterans
0,AZ,14159475,7075805,7083670,2370000,812895
1,SC,1344585,658210,686375,59205,75380
2,LA,3471395,1664530,1806865,185810,162490
3,MN,991710,481025,510685,100105,59515
4,NJ,3300686,1625593,1675093,1110059,65615
5,DC,3361140,1598525,1762615,475585,129815
6,OR,4506795,2231710,2275085,573945,246980
7,VA,4022775,1988235,2034540,405585,364965
8,RI,1222812,602606,620206,288932,47861
9,NH,551115,274225,276890,72530,27365


____________________________________________________________________________________________________________________________

#### 4.2 Data Quality Checks

Check that the ETL ran as expected by verifying that the resulting dimension and fact tables exist, contain data, and have key columns of the expected data types.

Run Quality Checks

In [39]:
# Perform quality checks here

# Check if tables are created here

def table_check(table):
    return table is not None

# Check if data is present or not here
    
def data_check(table):
    return table.count() != 0

# Check whether the key column of a table has one of the expected data types

def quality_check(table, column):
    dtype = dict(table.dtypes)[column]
    if dtype in ['bigint', 'double', 'string']:
        print('Data Quality check passed !')
    else:
        print('Data Quality check failed')
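
One possible variation on these checks is to collect failures in a list instead of printing, so that a pipeline run can stop when something is wrong. The sketch below is hypothetical: `run_quality_checks` is not part of the notebook, and `FakeTable` is a stand-in that only mimics the `count()` method and `dtypes` attribute of a Spark DataFrame so the example runs without Spark.

```python
class FakeTable:
    """Hypothetical stand-in that mimics the two DataFrame members used below."""

    def __init__(self, rows, dtypes):
        self._rows = rows
        self.dtypes = dtypes  # list of (column, type) pairs, like DataFrame.dtypes

    def count(self):
        return len(self._rows)

def run_quality_checks(tables, key_columns, allowed=("bigint", "double", "string")):
    """Return a list of readable failures; an empty list means every check passed."""
    failures = []
    for name, table in tables.items():
        if table is None:
            failures.append(f"{name}: table is missing")
            continue
        if table.count() == 0:
            failures.append(f"{name}: table is empty")
        dtype = dict(table.dtypes).get(key_columns[name])
        if dtype not in allowed:
            failures.append(f"{name}: key column has unexpected type {dtype}")
    return failures

tables = {
    "dim_visatype": FakeTable([(0, "WT")], [("visa_type_id", "bigint"), ("visa_type", "string")]),
    "dim_airport": FakeTable([], [("id", "string")]),
    "immig_fact": None,
}
keys = {"dim_visatype": "visa_type_id", "dim_airport": "id", "immig_fact": "id"}
failures = run_quality_checks(tables, keys)
print(failures)
```

With real DataFrames the same function could be called on the dimension and fact tables directly, and a non-empty result could be turned into an exception.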

In [64]:
if all(table_check(t) for t in (dim_immigrant, dim_demographic, dim_state, dim_visatype, dim_airport, dim_arrivaldate, immig_fact)):
    print("All Dimensions and one Fact table exists")
else:
    print("one or more table is missing")


All Dimensions and one Fact table exists


In [41]:
if all(data_check(t) for t in (dim_immigrant, dim_demographic, dim_state, dim_visatype, dim_airport, dim_arrivaldate, immig_fact)):
    print('Data Quality check passed !')
else:
    print('Data Quality check failed')

Data Quality check passed !


In [65]:
quality_check(dim_immigrant, 'id')
quality_check(dim_demographic, 'city_code')
quality_check(dim_state, 'state_code')
quality_check(dim_visatype, 'visa_type_id')
quality_check(dim_airport, 'id')
quality_check(immig_fact, 'id')
quality_check(dim_arrivaldate, 'date')

Data Quality check passed !
Data Quality check passed !
Data Quality check passed !
Data Quality check passed !
Data Quality check passed !
Data Quality check passed !
Data Quality check passed !


### Run SQL Queries to check the expected analysis

In [44]:
dim_immigrant.createOrReplaceTempView('dim_immigrant')
df = spark.sql('SELECT * FROM dim_immigrant')

In [45]:
dim_demographic.createOrReplaceTempView('dim_demographic')
df1 = spark.sql('SELECT * FROM dim_demographic')

In [66]:
dim_state.createOrReplaceTempView('dim_state')
ds = spark.sql('SELECT * FROM dim_state')

In [75]:
dim_visatype.createOrReplaceTempView('dim_visatype')
df2 = spark.sql('SELECT * FROM dim_visatype')

In [76]:
dim_airport.createOrReplaceTempView('dim_airport')
df3 = spark.sql('SELECT * FROM dim_airport')

In [77]:
dim_arrivaldate.createOrReplaceTempView('dim_arrivaldate')
df4 = spark.sql('SELECT * FROM dim_arrivaldate')

In [78]:
immig_fact.createOrReplaceTempView('immig_fact')
df5 = spark.sql('SELECT * FROM immig_fact')

In [51]:
#query to get total immigrants based on their visa type
query = spark.sql('SELECT COUNT(i.id) as total_immigrants, v.visa_type \
                  FROM dim_immigrant as i JOIN dim_visatype as v \
                  ON i.visa_type = v.visa_type \
                  GROUP BY v.visa_type \
                  ORDER BY total_immigrants DESC')

In [52]:
query.toPandas()

Unnamed: 0,total_immigrants,visa_type
0,1026625,WT
1,970383,B2
2,183798,WB
3,180794,B1
4,35640,F1
5,17795,E2
6,3481,E1
7,3065,I
8,2793,F2
9,1279,M1


In [52]:
#query to get the total number of immigrants who arrived in each week of the year 2016
immigrants_datewise = spark.sql('SELECT COUNT(i.id) as total_immigrants, a.week \
                  FROM dim_immigrant as i JOIN dim_arrivaldate as a \
                  ON i.arrival_date = a.date \
                  GROUP BY a.week \
                  ORDER BY total_immigrants DESC')

In [50]:
immigrants_datewise.toPandas()

Unnamed: 0,total_immigrants,week
0,559190,15
1,548274,16
2,542119,14
3,531724,17
4,245340,13


In [58]:
#query to get total immigrant arrivals to different US cities
dim2 = spark.sql('SELECT d.city_code, COUNT(immig.arrival_date) as total_arrivals \
                    FROM immig_fact AS immig \
                    JOIN dim_demographic AS d \
                    ON d.city_code = immig.city_code \
                    GROUP BY d.city_code \
                    ORDER BY total_arrivals DESC')

In [59]:
dim2.limit(10).toPandas()

Unnamed: 0,city_code,total_arrivals
0,NYC,384849
1,MIA,285810
2,LOS,246235
3,SFR,136657
4,NEW,130661
5,ORL,120282
6,CHI,103691
7,HOU,87456
8,FTL,80190
9,LVG,78376


In [85]:
#query to get total airports in different US states
statewise_airport_data = spark.sql('SELECT COUNT(air.name) as number_of_airports, ds.state_code \
                  FROM dim_airport as air \
                  JOIN dim_state as ds \
                  ON air.state_code = ds.state_code \
                  GROUP BY ds.state_code \
                  ORDER BY number_of_airports DESC')

In [86]:
statewise_airport_data.limit(10).toPandas()

Unnamed: 0,number_of_airports,state_code
0,2268,TX
1,1071,CA
2,954,FL
3,913,PA
4,901,IL
5,799,OH
6,784,AK
7,696,IN
8,645,NY
9,624,WI


#### 4.3 Data dictionary 

### Dimension table:

1. **dim_immigrant:**
    * **id:** unique id for the immigrants
    * **arrival_date:** date of the immigrant's arrival
    * **gender:** gender of the immigrant
    * **age:** age of the immigrant
    * **visa_type:** visa type given to the immigrant
    * **mode_of_arrival:** mode of arrival by the immigrant into the US
    
2. **dim_demographic:**
    * **city_code:** code of the city
    * **male_population:** total male population in the city
    * **female_population:** total female population in the city
    * **total_population:** total population in the city
    * **number_of_veterans:** total number of veterans in the city
    * **foreign_born:** number of people born outside US in the city
    
3. **dim_state:**
    * **state_code:** code of the state
    * **male_population:** total male population in the state
    * **female_population:** total female population in the state
    * **total_population:** total population in the state
    * **number_of_veterans:** total number of veterans in the state
    * **foreign_born:** number of people in the state who were born outside the US
    
4. **dim_airport:**
    * **id:** code for the airport
    * **name:** name of the airport
    * **type:** type of the airport
    * **elevation_ft:** elevation of the airport in feet
    * **state_code:** code of the state where the airport is located
    * **country_code:** code of the country where the airport is located

5. **dim_visatype:**
    * **visa_type_id:** id for the visa type
    * **visa_type:** type of visa

6. **dim_arrivaldate:**
    * **date:** arrival date of the immigrant
    * **year:** year of arrival
    * **month:** month of arrival
    * **day:** day of arrival
    * **week:** week number of a year of arrival


### Fact Table:
1. **immig_fact:**
    * **id:** unique record id of the immigration record
    * **state_code:** code of the state to which the city belongs
    * **city_code:** code of the city
    * **arrival_Date:** arrival date of the immigrant
    * **visa_type:** type of visa
    * **count:** count of immigrants

### Step 5: Complete Project Write Up
* **Clearly state the rationale for the choice of tools and technologies for the project.**
    - We use Spark because it is well suited to processing large amounts of data: it is fast, scales easily when needed, and integrates well with other tools.

* **Propose how often the data should be updated and why.**
    - The data should be updated whenever newer source data becomes available, for example when the immigration data is refreshed or new airports are built. We therefore suggest checking for updates and refreshing the data on a monthly basis.

* **Write a description of how you would approach the problem differently under the following scenarios:**
 * **The data was increased by 100x.:** A 100x increase in data can be handled by adding processing power: larger EC2 instances together with more Spark worker nodes (or additional clusters) can process the larger volume.
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.:** We can achieve this with Apache Airflow by creating DAGs that are scheduled to run the data pipelines daily, so that the dashboard is refreshed before 7am.
 * **The database needed to be accessed by 100+ people.:** We can move the data to a data warehouse with more processing power (for example, Amazon Redshift) so that a large number of users can access it concurrently.