# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--<br>
**I94 Immigration data and city temperature data will be used to create a database that is optimized to query and analyze immigration events. An ETL pipeline is to be build with these to data sources to create the database. Finally, the database will be used to access immigration behaviour to location temperatures.**

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Importing Step
Installing **koalas pyarrow** to run **koalas** <br> and Importing libraries those I need.


In [1]:
!pip install koalas pyarrow



In [2]:
# Do all imports and installs here
from pyspark.sql import SparkSession

import datetime

import numpy as np
import pandas as pd
import databricks.koalas as ks
%matplotlib inline
import matplotlib.pyplot as plt
import pyspark.sql.functions as F



### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>
<br>We would be creating 3 dimension tables and 1 fact table. I94 immigration data is to be aggregated by destination city, then temperature data is to be aggregated by city. These would be the 2 fact tables. Both these tables will be joined on destination city to form fact table. A final database will be created to query on immigration events to determine if temperature affects the selection of destination cities for immigration.
#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?<br> 
I used 3 different engine:<br>
* Koalas
* Pandas
* Pyspark
<br>The reason behind these engins is to show the optimization on reading and gathering Data.<br>
But almost the whole project has been done by **Koalas**.

In [7]:
%%time 

from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#df_spark = spark.createDataFrame(df)

CPU times: user 3.76 ms, sys: 3.94 ms, total: 7.7 ms
Wall time: 133 ms


%%time
# Read in the data here

path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(path, 'sas7bdat', encoding="ISO-8859-1")

In [8]:
%%time 

kdf = ks.DataFrame(df_spark)

CPU times: user 33.9 ms, sys: 19.2 ms, total: 53.1 ms
Wall time: 521 ms


### Reading The second dataset `GlobalLandTemperaturesByCity.csv`

%%time

temp = '../../data2/GlobalLandTemperaturesByCity.csv'
df_tempratures = pd.read_csv(temp)

In [9]:
%%time 

df_spark_csv = spark.read.option("header",True).csv("../../data2/GlobalLandTemperaturesByCity.csv")
df_spark_csv.show()

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|        

In [10]:
df_spark_csv.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [11]:
%%time

kdf_tempratures = ks.DataFrame(df_spark_csv)

CPU times: user 22.3 ms, sys: 5.1 ms, total: 27.4 ms
Wall time: 271 ms


%%time 

temp = '../../data2/GlobalLandTemperaturesByCity.csv'
kdf_tempratures = ks.read_csv(temp)

Reading CSV files in `Pandas` more faster than `Koalas`.

### Step 2: Explore and Assess the Data
#### 1. Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### 2. Cleaning Steps
Document steps necessary to clean the data

### 1. Explore

In [12]:
kdf.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [13]:
kdf.info()



databricks.koalas.frame.DataFrame
Index: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       3096313 non-null float64
i94yr       3096313 non-null float64
i94mon      3096313 non-null float64
i94cit      3096313 non-null float64
i94res      3096313 non-null float64
i94port     3096313 non-null object
arrdate     3096313 non-null float64
i94mode     3096074 non-null float64
i94addr     2943721 non-null object
depdate     2953856 non-null float64
i94bir      3095511 non-null float64
i94visa     3096313 non-null float64
count       3096313 non-null float64
dtadfile    3096312 non-null object
visapost    1215063 non-null object
occup       8126 non-null object
entdepa     3096075 non-null object
entdepd     2957884 non-null object
entdepu     392 non-null object
matflag     2957884 non-null object
biryear     3095511 non-null float64
dtaddto     3095836 non-null object
gender      2682044 non-null object
insnum      113708 non-null object
airline     3012686 non-null

%%time

df.head()

%%time

df.info()

In [14]:
%%time

kdf_tempratures.head()

CPU times: user 18 ms, sys: 8.27 ms, total: 26.3 ms
Wall time: 85.5 ms


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


%%time

df_tempratures.head()

%time

df_tempratures.info()

In [15]:
%%time

kdf_tempratures.shape

CPU times: user 3.4 ms, sys: 0 ns, total: 3.4 ms
Wall time: 6.09 s


(8599212, 7)

%%time

df_tempratures.shape

In [16]:
kdf_tempratures.AverageTemperature.isnull().value_counts()

False    8235082
True      364130
Name: AverageTemperature, dtype: int64

In [17]:
#False : means no null values.
kdf.i94port.isnull().values.any()



False

In [18]:
#The result : 0. that means no null values.
kdf.i94port.isnull().sum()

0

In [19]:
kdf.i94port.count()

3096313

In [20]:
kdf['count'].value_counts()

1.0    3096313
Name: count, dtype: int64

In [21]:
kdf.depdate.head()

0        NaN
1        NaN
2    20691.0
3    20567.0
4    20567.0
Name: depdate, dtype: float64

In [22]:
kdf.depdate.isnull().value_counts()

False    2953856
True      142457
Name: depdate, dtype: int64

In [23]:
#fill missing values with 0.
kdf.depdate.fillna(0)
kdf.depdate.head()

0        NaN
1        NaN
2    20691.0
3    20567.0
4    20567.0
Name: depdate, dtype: float64

### 2. Cleaning

Defining quality issues in data:
1. **i94yr** : The year should be a datetime or integer.
2. **i94port** : has XXX which means nothing and if there any missing value.
3. **count** : all the values of this column has '1' which is not useful, so should be removed.
4. **AverageTempratures** : has 364130 missing values which are should be removed.
5. **depdate** : will be converted to YYYY-MM-DD format.
6. **dt** : should be converted to datetime.

Converting `dt` to datetime.

In [24]:
kdf_tempratures.info()

databricks.koalas.frame.DataFrame
Index: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               8599212 non-null object
AverageTemperature               8235082 non-null object
AverageTemperatureUncertainty    8235082 non-null object
City                             8599212 non-null object
Country                          8599212 non-null object
Latitude                         8599212 non-null object
Longitude                        8599212 non-null object
dtypes: object(7)



In [25]:
kdf_tempratures.dt = ks.to_datetime(kdf_tempratures.dt)
kdf_tempratures.dt.head()

0   1743-11-01
1   1743-12-01
2   1744-01-01
3   1744-02-01
4   1744-03-01
Name: dt, dtype: datetime64[ns]

Data quality check

In [26]:
kdf_tempratures.info()

databricks.koalas.frame.DataFrame
Index: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               8599212 non-null object
AverageTemperature               8235082 non-null object
AverageTemperatureUncertainty    8235082 non-null object
City                             8599212 non-null object
Country                          8599212 non-null object
Latitude                         8599212 non-null object
Longitude                        8599212 non-null object
dtypes: object(7)



**AverageTemperature**: Drop all missing values in 

In [27]:
kdf_tempratures.AverageTemperature.isnull().value_counts()

False    8235082
True      364130
Name: AverageTemperature, dtype: int64

Data quality check:

In [28]:
kdf_tempratures.dropna(subset=['AverageTemperature'], inplace= True)
kdf_tempratures.AverageTemperature.isnull().value_counts()

False    8235082
Name: AverageTemperature, dtype: int64

**i94yr** : The year should be an integer.

In [29]:
kdf.i94yr.dtype

dtype('float64')

In [30]:
kdf.i94yr.astype('int64').dtype

dtype('int64')

Data quality check:

In [31]:
kdf.i94yr = kdf.i94yr.astype('int64')
kdf.i94yr.head()

0    2016
1    2016
2    2016
3    2016
4    2016
Name: i94yr, dtype: int64

In [32]:
kdf.info()



databricks.koalas.frame.DataFrame
Index: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       3096313 non-null float64
i94yr       3096313 non-null float64
i94mon      3096313 non-null float64
i94cit      3096313 non-null float64
i94res      3096313 non-null float64
i94port     3096313 non-null object
arrdate     3096313 non-null float64
i94mode     3096074 non-null float64
i94addr     2943721 non-null object
depdate     2953856 non-null float64
i94bir      3095511 non-null float64
i94visa     3096313 non-null float64
count       3096313 non-null float64
dtadfile    3096312 non-null object
visapost    1215063 non-null object
occup       8126 non-null object
entdepa     3096075 non-null object
entdepd     2957884 non-null object
entdepu     392 non-null object
matflag     2957884 non-null object
biryear     3095511 non-null float64
dtaddto     3095836 non-null object
gender      2682044 non-null object
insnum      113708 non-null object
airline     3012686 non-null

**i94port** : has XXX which means nothing and if there any missing value.

In [33]:
kdf.i94port.value_counts()

NYC    485916
MIA    343941
LOS    310163
SFR    152586
ORL    149195
HHW    142720
NEW    136122
CHI    130564
HOU    101481
FTL     95977
ATL     92579
LVG     89280
AGA     80919
WAS     74835
DAL     71809
BOS     57354
SEA     47719
PHO     38890
DET     37832
TAM     25632
PHI     24973
DUB     24371
SAI     23628
TOR     20886
DEN     18260
MAA     18151
PSP     18117
FMY     17514
SPM     16973
CLT     16228
OGG     13259
NAS     13032
VCV     12706
BLA     11087
SDP     10944
SFB     10159
SAJ      9144
WPB      9093
SNA      7066
NIA      6102
MON      6006
HAM      5207
CHM      5197
NCA      5197
SLC      5118
POO      5108
SNJ      4490
NOL      4409
KOA      4042
PBB      4035
SRQ      3922
XXX      3522
OAK      3501
BAL      3476
CLG      3191
AUS      3034
SHA      3007
SYS      2874
LEW      2425
X96      2378
LIH      2292
SAC      2201
RDU      2112
OTM      1857
YGF      1763
STT      1741
CIN      1722
PHU      1653
SAA      1625
HIG      1594
PEV      1083
BUF   

There are **3522** records have `XXX` value.

In [34]:
kdf = kdf[kdf.i94port != 'XXX']
kdf.i94port.value_counts()

NYC    485916
MIA    343941
LOS    310163
SFR    152586
ORL    149195
HHW    142720
NEW    136122
CHI    130564
HOU    101481
FTL     95977
ATL     92579
LVG     89280
AGA     80919
WAS     74835
DAL     71809
BOS     57354
SEA     47719
PHO     38890
DET     37832
TAM     25632
PHI     24973
DUB     24371
SAI     23628
TOR     20886
DEN     18260
MAA     18151
PSP     18117
FMY     17514
SPM     16973
CLT     16228
OGG     13259
NAS     13032
VCV     12706
BLA     11087
SDP     10944
SFB     10159
SAJ      9144
WPB      9093
SNA      7066
NIA      6102
MON      6006
HAM      5207
CHM      5197
NCA      5197
SLC      5118
POO      5108
SNJ      4490
NOL      4409
KOA      4042
PBB      4035
SRQ      3922
OAK      3501
BAL      3476
CLG      3191
AUS      3034
SHA      3007
SYS      2874
LEW      2425
X96      2378
LIH      2292
SAC      2201
RDU      2112
OTM      1857
YGF      1763
STT      1741
CIN      1722
PHU      1653
SAA      1625
HIG      1594
PEV      1083
BUF      1040
NOR   

**count** : all the values of this column has '1' which is not useful, so should be removed.

In [35]:
kdf['count'].value_counts()

1.0    3092791
Name: count, dtype: int64

In [36]:
kdf = kdf.drop('count', axis=1)

Data quality check:

In [37]:
kdf.info()



databricks.koalas.frame.DataFrame
Index: 3092791 entries, 1 to 3096312
Data columns (total 27 columns):
cicid       3092791 non-null float64
i94yr       3092791 non-null float64
i94mon      3092791 non-null float64
i94cit      3092791 non-null float64
i94res      3092791 non-null float64
i94port     3092791 non-null object
arrdate     3092791 non-null float64
i94mode     3092769 non-null float64
i94addr     2941769 non-null object
depdate     2952202 non-null float64
i94bir      3092016 non-null float64
i94visa     3092791 non-null float64
dtadfile    3092791 non-null object
visapost    1214449 non-null object
occup       8122 non-null object
entdepa     3092769 non-null object
entdepd     2955643 non-null object
entdepu     388 non-null object
matflag     2955643 non-null object
biryear     3092016 non-null float64
dtaddto     3092532 non-null object
gender      2678739 non-null object
insnum      111671 non-null object
airline     3011660 non-null object
admnum      3092791 non-null 

 **depdate** and **arrdate** : will be converted to YYYY-MM-DD format.

Dealing with `depdate` column with **pandas** by this way:<br>
df_copy = df.depdate<br>
df_copy = pd.to_timedelta(df_copy, unit='D')  + pd.datetime(1960, 1, 1)<br>
df_copy.head()

In [38]:
kdf_copy = kdf
kdf_copy.arrdate.apply(lambda sas_date: pd.to_timedelta(sas_date,unit='D') + pd.datetime(1960, 1, 1))

1      2016-04-07
2      2016-04-01
3      2016-04-01
4      2016-04-01
5      2016-04-01
6      2016-04-01
7      2016-04-01
8      2016-04-01
9      2016-04-01
10     2016-04-01
11     2016-04-01
12     2016-04-01
13     2016-04-01
14     2016-04-01
15     2016-04-01
16     2016-04-01
17     2016-04-01
18     2016-04-01
19     2016-04-01
20     2016-04-01
21     2016-04-01
22     2016-04-01
23     2016-04-01
24     2016-04-01
25     2016-04-01
26     2016-04-01
27     2016-04-01
28     2016-04-01
29     2016-04-01
30     2016-04-01
31     2016-04-01
32     2016-04-01
33     2016-04-01
34     2016-04-01
35     2016-04-01
36     2016-04-01
37     2016-04-01
38     2016-04-01
39     2016-04-01
40     2016-04-01
41     2016-04-01
42     2016-04-01
43     2016-04-01
44     2016-04-01
45     2016-04-01
46     2016-04-01
47     2016-04-01
48     2016-04-01
49     2016-04-01
50     2016-04-01
51     2016-04-01
52     2016-04-01
53     2016-04-01
54     2016-04-01
55     2016-04-01
56     201

In [39]:
kdf.depdate.head()

1        NaN
2    20691.0
3    20567.0
4    20567.0
5    20555.0
Name: depdate, dtype: float64

df_copy = df.depdate
df_copy = pd.to_timedelta(df_copy, unit='D') + pd.datetime(1960, 1, 1)
df_copy.head()

kdf.depdate = df_copy
kdf.depdate.head()

In [40]:
kdf.depdate = kdf.depdate.apply(lambda sas_date: pd.to_timedelta(sas_date,unit='D') + pd.datetime(1960, 1, 1))
kdf.depdate.head()

1          NaT
2   2016-08-25
3   2016-04-23
4   2016-04-23
5   2016-04-11
Name: depdate, dtype: datetime64[ns]

arrdate

In [41]:
kdf.arrdate.head()

1    20551.0
2    20545.0
3    20545.0
4    20545.0
5    20545.0
Name: arrdate, dtype: float64

df_copy = df.arrdate
df_copy = pd.to_timedelta(df_copy, unit='D') + pd.datetime(1960, 1, 1)
kdf.arrdate = df_copy
kdf.arrdate.head()

In [42]:
kdf.arrdate = kdf.arrdate.apply(lambda sas_date: pd.to_timedelta(sas_date,unit='D') + pd.datetime(1960, 1, 1))
kdf.arrdate.head()

1   2016-04-07
2   2016-04-01
3   2016-04-01
4   2016-04-01
5   2016-04-01
Name: arrdate, dtype: datetime64[ns]

Data quality check:

In [43]:
kdf.info()



databricks.koalas.frame.DataFrame
Index: 3092791 entries, 1 to 3096312
Data columns (total 27 columns):
cicid       3092791 non-null float64
i94yr       3092791 non-null float64
i94mon      3092791 non-null float64
i94cit      3092791 non-null float64
i94res      3092791 non-null float64
i94port     3092791 non-null object
arrdate     3092791 non-null float64
i94mode     3092769 non-null float64
i94addr     2941769 non-null object
depdate     2952202 non-null float64
i94bir      3092016 non-null float64
i94visa     3092791 non-null float64
dtadfile    3092791 non-null object
visapost    1214449 non-null object
occup       8122 non-null object
entdepa     3092769 non-null object
entdepd     2955643 non-null object
entdepu     388 non-null object
matflag     2955643 non-null object
biryear     3092016 non-null float64
dtaddto     3092532 non-null object
gender      2678739 non-null object
insnum      111671 non-null object
airline     3011660 non-null object
admnum      3092791 non-null 

In [44]:
# Performing cleaning tasks here





### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

Fact Table `fact_table`
Columns:

i94yr = 4 digit year.<br>
i94mon = numeric month.<br>
i94cit = 3 digit code of origin city.<br>
i94port = 3 character code of destination USA city.<br>
i94mode = 1 digit travel code.<br>
i94visa = reason for immigration.<br>
AverageTemperature = average temperature of destination city.<br>



Dimension Table `i94_table` - I94 immigration data columns:

i94yr = 4 digit year<br>
i94mon = numeric month<br>
i94cit = 3 digit code of origin city<br>
i94port = 3 character code of destination USA city<br>
arrdate = arrival date in the USA<br>
i94mode = 1 digit travel code<br>
depdate = departure date from the USA<br>
i94visa = reason for immigration<br>


Dimension Table `temp_table` - temperature data columns:

AverageTemperature = average temperature<br>
City = city name<br>
Country = country name<br>
Latitude= latitude<br>
Longitude = longitude<br>

Dimension Table `time_table` -  any columns related to time.
<br>columns:

i94yr = 4 digit year<br>
i94mon = numeric month<br>
i94port = 3 character code of destination USA city<br>
arrdate = arrival date in the USA<br>
depdate = departure date from the USA<br>

#### The reason of choosing this data model:
* Star schema
<br>I choose this model because it is popular and familiar in the market.
<br> It is one of the simplest schema to be implemented.
<br> It is a simple to read and to understand the schema.
<br> Easy to add some tables.
<br> Denormalization, fast aggregation and simplify queries.

# Trying more efficient reading way.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Creating tables `i94_table`, `temp_table`, `time_table` `fact_table`

In [45]:
# Write code here

In [46]:
kdf_spark = kdf.to_spark()

In [47]:
kdf_tempratures_spark = kdf_tempratures.to_spark()

ETL process...

In [48]:
kdf_spark.createOrReplaceTempView('i94_table_view')

In [49]:
kdf_tempratures_spark.createOrReplaceTempView('temp_table_view')

In [50]:
#Dimension table.
temp_table = kdf_tempratures_spark.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude"])

In [51]:
#Dimension table.
i94_table = kdf_spark.select(["i94yr","i94mon","i94cit","i94port","arrdate","i94mode","depdate","i94visa"])

In [52]:
#Dimension table.
time_table = kdf_spark.select(["i94yr","i94mon","i94port","arrdate","depdate"])

In [53]:
#Create the fact table.
fact_table = (spark.sql('''
                    SELECT 
                    DISTINCT
                    i94_table_view.i94yr as year,
                    i94_table_view.i94mon as month,
                    i94_table_view.i94cit as city,
                    i94_table_view.i94port as port,
                    i94_table_view.i94mode as mode,
                    i94_table_view.i94visa as visa,
                    temp_table_view.AverageTemperature as temperature
                    FROM i94_table_view,
                    temp_table_view
                     '''))

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

Regarding to cleaning step I did many Data Quality Checks after every cleaning step.

Checking tables `i94_table`, `temp_table`, `time_table` `fact_table`

In [54]:
i94_table.show()

+------+------+------+-------+-------+-------+-------+-------+
| i94yr|i94mon|i94cit|i94port|arrdate|i94mode|depdate|i94visa|
+------+------+------+-------+-------+-------+-------+-------+
|2016.0|   4.0| 254.0|    ATL|20551.0|    1.0|   null|    3.0|
|2016.0|   4.0| 101.0|    WAS|20545.0|    1.0|20691.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|20567.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|20567.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|20555.0|    1.0|
|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|20558.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|20558.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|20553.0|    2.0|
|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|20562.0|    1.0|
|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|20671.0|    2.0|
|2016.0|   4.0| 101.0|    TOR|20545.0|    1.0|20554.0|    2.0|
|2016.0|   4.0| 101.0|    BOS|20545.0|    1.0|20549.0|    1.0|
|2016.0|   4.0| 101.0|    ATL|20545.0|    1.0|20549.0| 

Checking are row counts same as the source. 

 `i94_table`

In [55]:
checking_tables = lambda x: True if x == kdf.shape[0] else False

In [56]:
num = i94_table.count()
checking_tables(num)

True

In [57]:
i94_table.count()

3092791

In [58]:
kdf.shape[0]

3092791

 `time_table`

In [59]:
checking_tables(time_table.count())

True

 `temp_table`

In [60]:
checking_temp_table = lambda x: True if x == kdf_tempratures.shape[0] else False

In [61]:
checking_temp_table(temp_table.count())

True

In [62]:
temp_table.show

<bound method DataFrame.show of DataFrame[AverageTemperature: string, City: string, Country: string, Latitude: string, Longitude: string]>

In [63]:
fact_table.show

<bound method DataFrame.show of DataFrame[year: double, month: double, city: double, port: string, mode: double, visa: double, temperature: string]>

In [64]:
i94_table.show

<bound method DataFrame.show of DataFrame[i94yr: double, i94mon: double, i94cit: double, i94port: string, arrdate: double, i94mode: double, depdate: double, i94visa: double]>

In [65]:
time_table.show

<bound method DataFrame.show of DataFrame[i94yr: double, i94mon: double, i94port: string, arrdate: double, depdate: double]>

# I executed all the cells before this cell and they were executed successfully.
Some cells throw **WARNING**, it is from the engine itself so that is not an error. If the execution of all cells doesn't work well.. I could provide something to prove that all cells have been executed well without any error.

This is the warning when I run `kdf.info()`:
<br>/opt/conda/lib/python3.6/site-packages/databricks/koalas/generic.py:406: FutureWarning: `get_dtype_counts` has been deprecated and will be removed in a future version. For DataFrames use `.dtypes.value_counts()
  FutureWarning,

If `kdf.info()` is not executed well, kindly make the cell `markdown` or do not run it.
<br> I use this `kdf.info()` to show how many nulls in the data, names of the columns and the datatype of each column.

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

* Has been provided in seperate file called `DataDictionary`

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

* The data was increased by 100x.<br>
I used Apache Spark to do all the processing data and create the model. The reason for this is because Spark can scale a lot of data and the library spark.sql has many tools to transform data. So, it is not problem if the data is increased while using Spark.

* The data populates a dashboard that must be updated on a daily basis by 7am every day.<br>
Use Airflow which is atuomated, has a dashboard feature and supports alerts (sending emails if a falure happens).

* The database needed to be accessed by 100+ people.<br>
The more people accessing the database the more CPU resources you need to get a fast experience. By using a distributed database you can improve your replications and partitioning to get faster query results for each user.

#### Tools of the projects:
As I talked in the first part of the capstone that I used 3 tools:
* Koalas
* Pandas
* Pyspark
these are my favourite tools that I'm familiar to deal Data with.
Otherwise, after many execution and data wrangling in this project I dicovered to cancel **pandas** due to execution time.
<br>For example, let's see time execution of gathering SAS Data (has `3096313` records) .
* `Koalas` : CPU times: user 464 ms, sys: 632 ms, total: 1.1 s, Wall time: 3min 3s.
* `Pandas` : CPU times: user 2min 21s, sys: 40.7 s, total: 3min 2s, Wall time: 21min 11s.
* `Spark` : CPU times: user 58.2 ms, sys: 3.19 ms, total: 61.4 ms, Wall time: 16.5 s.

Additionally, In Exploratory and Wrangling.
<br> I did these steps by `Koalas` because I prefer using Pandas and I'm used to deal with Data by Pandas.
<br> So, almost the whole project has been done by **Koalas** because has optimization in gathering and simplicity.