# Climate change and Immigration

#### Project Summary
Today, 1% of the world is a barely livable hot zone.By 2070, that portion could go up to 19%.Billions of people call this land home.
Where will they go? we will help the countries and governments to understand the bisc relation between the climate change and the immigration issue.

The project follows the follow steps:
    
<li><a href="#s1">Step 1: Scope the Project and Gather Data</a>
<li><a href="#s2">Step 2: Explore and Assess the Data</a>
<li><a href="#s3">Step 3: Define the Data Model</a>
<li><a href="#s4">Step 4: Run ETL to Model the Data</a>
<li><a href="#s5">Step 5: Complete Project Write Up</a>

## </h3> Step 1: Scope the Project and Gather Data</h3><a id="s1"></a>

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>
In this project we will use  some important datasets related to immigration process and the temperature with the world. We will use some usful data enginnering tools like data werehouses,AWS  S3,  Spark, Apache Airflow and AWS Warehouse like Redshif
#### Describe and Gather Data 

- 1- I94 Immigration dataset: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.  [This](https://www.trade.gov/national-travel-and-tourism-office) is where the data comes from. 

- 2- World Temperature dataset: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

In [1]:
#all imports and installs
import pandas as pd
import os
import configparser
from pyspark.sql import SparkSession

In [2]:
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [4]:
#write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

### I94 Immigration dataset

In [5]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [6]:
# df_immigration.show(n=10)
df_immigration.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


### World Temperature dataset

In [7]:
file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = spark.read.csv(file_name, header=True, inferSchema=True)

In [8]:
df_temperature.show(n=10)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01 00:00:00|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01 00:00:00|            10.644|           1.28300000000

## <h3> Step 2: Explore and Assess the Data </h3><a id="s2"></a>

- I94 Immigration dataset

In [9]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [10]:
df_immigration.count()

3096313

In [11]:
# drop duplicate rows
df_immigration_new = df_immigration.dropDuplicates(['cicid'])

In [12]:
df_immigration_new.count()

3096313

it seems that no duplicates

In [13]:
immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration = pd.read_sas(immigration_fname, 'sas7bdat', encoding="ISO-8859-1")

In [14]:
immigration.isnull().sum()

cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
i94port           0
arrdate           0
i94mode         239
i94addr      152372
depdate      142457
i94bir          802
i94visa           0
count             0
dtadfile          1
visapost    1881250
occup       3088187
entdepa         238
entdepd      138429
entdepu     3095921
matflag      138429
biryear         802
dtaddto         477
gender       414269
insnum      2982605
airline       83627
admnum            0
fltno         19549
visatype          0
dtype: int64

we see that those columns (visapost ,insnum , occup, entdepu, ) have most of the missing values so we will remove them

In [15]:
cols = ['visapost', 'insnum','occup', 'entdepu',]
df_immigration_new = df_immigration_new.drop(*cols)

In [18]:
# Remove all missing values for the rest of the dataeset
df_immigration_new = df_immigration_new.dropna(how="any", subset=["i94port","airline" ,"i94addr", "gender","fltno"])

In [19]:
df_immigration_new.count()

2492563

In [20]:
immigration.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


we see that the data column arrdate is a float so we should convert it

In [21]:
# Convert date form SAS format to PySpark format
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def convert_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

df_immigration_new = df_immigration_new.withColumn("arrdate", convert_datetime(df_immigration_new.arrdate))

- World Temperature dataset

In [30]:
df_temperature.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [31]:
df_temperature.count()

8599212

In [32]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature = pd.read_csv(fname, sep=',')
# check the missing values
temperature.isnull().sum()

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [33]:
# check the duplicates
temperature.duplicated().sum()

0

In [34]:
# Remove all missing values for the rest of the dataeset
df_temperature_new = df_temperature.dropna(how="any", subset=["AverageTemperature"])

In [35]:
df_temperature_new.count()

8235082

## <h3> Step 3: Define the Data Model </h3><a id="s3"></a>
#### 3.1 Conceptual Data Model
We will use the star schema data model because we could easily work on it by create and join tables.
the following image indicates the star schema diagram:
![](images/star_schema.png)

#### 3.2 Mapping Out Data Pipelines
the steps necessary to pipeline the data.

- 1- Upload the data into the S3 Redshift.
- 2- use Apache Airflow to perform the data pipelines.
- 3- Load Fact Operator
- 4- Data Quality check
- 5- End dummy operator

which will be found here [Airflow](https://r766466c839826xjupyterlnnfq3jud.udacity-student-workspaces.com/lab/tree/Airflow) folder

 ## <h3> Step 4: Run Pipelines to Model the Data  </h3><a id="s4"></a>
#### 4.1 Create the data model
we will find that here [Capstone Project](https://r766466c839826xjupyterlnnfq3jud.udacity-student-workspaces.com/lab/tree/Capstone%20Project.ipynb)

#### 4.2 Data Quality Checks
we will find that here [data_quality](https://r766466c839826xjupyterlnnfq3jud.udacity-student-workspaces.com/lab/tree/Airflow/plugins/operators/data_quality.py)

#### 4.3 Data dictionary 

- 1- I94 Immigration dataset: 
##### immigration
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column Name</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">CICI </td><td class="tg-0pky">Primary Key Unique ID</td></tr>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky"> year</td></tr>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky"> month</td></tr>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td></tr>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date</td></tr>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation</td></tr>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td></tr>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date </td></tr>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of birth</td></tr>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes </td></tr>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">count</td></tr>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character string </td></tr>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State</td></tr>
 <tr><td class="tg-0pky">occup</td><td class="tg-0pky">Occupation </td></tr>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag </td></tr>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag </td></tr>
 <tr><td class="tg-0pky">entdepu</td><td class="tg-0pky">Update Flag </td></tr>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag </td></tr>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">year of birth</td></tr>
 <tr><td class="tg-0pky">visatype</td><td class="tg-0pky">visa type</td></tr>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">how long stay</td></tr>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">sex</td></tr>
 <tr><td class="tg-0pky">airline</td><td class="tg-0pky">airline</td></tr>
</table>
 


- 2- World Temperature dataset : Indicates Earth Surface Temperature Data, Exploring global temperatures since 1750.

##### temperature
<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column Name</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">Code </td><td class="tg-0pky">Country Code</td></tr>
 <tr><td class="tg-0pky">Country</td><td class="tg-0pky"> Country Name</td></tr>
 <tr><td class="tg-0pky">AverageTemperature</td><td class="tg-0pky"> Temperature of the country between 1743 and 2013</td></tr>
 <tr><td class="tg-0pky">Latitude</td><td class="tg-0pky">GPS coordinates</td></tr>
 <tr><td class="tg-0pky">Longitude</td><td class="tg-0pky">GPS coordinates</td></tr>
</table>



## <h3> Step 5: Complete Project Write Up  </h3><a id="s5"></a>
### Tools and technologies for the project.
     
     - Spark
     - AWS S3
     - Apache Airflow 
     - AWS Warehouse like Redshift 


### How often the data should be updated and why.
     
     - Should be updated Monthly Should be updated Monthly because the data is formed monthly.
     
### How we will deal with those differently under the following scenarios:
#### The data was increased by 100x.
      - Use partitioning  of the data or use AWS EMR
#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
      - We should use Apache Airflow  to updated dashboard daily.
#### The database needed to be accessed by 100+ people.
      - Use partitioning to give the ability to all users to access the data quickly