# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
from datetime import datetime, timedelta
from functools import reduce
from itertools import chain

import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql import types as T
from pyspark.sql.functions import (
    asc, avg, col, create_map, date_add, desc, lit, to_date, udf, year,
)
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Show every column and row when displaying pandas DataFrames
pd.options.display.max_columns = None
pd.options.display.max_rows = None

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

#sas_filenames = [os.path.join('../../data/18-83510-I94-Data-2016', fn) for fn in os.listdir('../../data/18-83510-I94-Data-2016')]
sas_filenames = ["/tmp/i94_apr16_sub.sas7bdat"]
dfs = []

i94_headers = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype']


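def convert_datetime(x):
    """Convert a SAS date (days since 1960-01-01) to a date; return None if x is missing or invalid."""
    try:
        return (datetime(1960, 1, 1) + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        return None

udf_datetime_from_sas = udf(convert_datetime, T.DateType())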


# Use "path" as the loop variable so it does not shadow pyspark.sql.functions imported as f
for path in sas_filenames:
    print(path)
    # Load one SAS file, keep only the documented columns, and convert the SAS dates
    df = spark.read.format('com.github.saurfang.sas.spark') \
               .load(path) \
               .select(*i94_headers) \
               .withColumn("arrdate", udf_datetime_from_sas("arrdate")) \
               .withColumn("depdate", udf_datetime_from_sas("depdate"))
    dfs.append(df)
    print(f'the count is {df.count()}')

# Stack the per-file DataFrames into one (union matches columns by position)
df_complete = reduce(DataFrame.union, dfs)
print(f'df_complete.count() total is {df_complete.count()}')







/tmp/i94_apr16_sub.sas7bdat
the count is 3096313
df_complete.count() total is 3096313
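
The `arrdate` and `depdate` columns hold SAS dates, which count days from 1960-01-01 (the epoch used in `convert_datetime` above). Below is a minimal plain-Python sketch of that conversion, useful for checking a value by hand without a Spark session. `sas_days_to_date` is a hypothetical helper name, not part of the notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day

def sas_days_to_date(days):
    """Return the calendar date for a SAS day count, or None if the value is missing or invalid."""
    try:
        return SAS_EPOCH + timedelta(days=int(days))
    except (TypeError, ValueError, OverflowError):
        return None

# Round trip: a date converted to a SAS day count and back is unchanged
arrival = date(2016, 4, 29)
sas_days = (arrival - SAS_EPOCH).days
print(sas_days, sas_days_to_date(float(sas_days)))
```

The SAS reader returns these columns as floating-point numbers, which is why the sketch passes a float and truncates it with `int`.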


In [3]:
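# Enrich the I94 records: look up country names and derive an approximate age
countryNames = spark.read.option("header",True) \
       .csv("I94CIT_I94RES.csv")
# rough_age ignores the month and day of birth, so it can be off by one year
df_complete = df_complete.withColumn('rough_age', year(df_complete['arrdate']) - df_complete['biryear'])
# Left join so records with an unmapped i94res code are kept
df_complete = df_complete.join(countryNames, on=['i94res'], how='left')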




In [4]:
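# Enrich with the visa group and category of each visa type (inner join: visa types missing from visa_cats.csv are dropped)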
csvs = spark.read.option("header",True) \
       .csv("visa_cats.csv")
csvs.printSchema()
dffin = csvs.join(df_complete, on=['visatype'], how='inner')

dffin.head()

root
 |-- visatype: string (nullable = true)
 |-- visagroup: string (nullable = true)
 |-- visacategory: string (nullable = true)



Row(visatype='B2', visagroup='Temporary visitors for pleasure', visacategory='Temporary workers and trainees', i94res=692.0, cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94port='XXX', arrdate=datetime.date(2016, 4, 29), i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, rough_age=37.0, country_name='ECUADOR')

In [6]:
import pyspark.sql.functions as f

countryTemps = spark.read.option("header",True) \
       .csv("GlobalLandTemperaturesByCountry.csv") \
       .withColumn("dt", to_date(col('dt'), 'yyyy-MM-dd')) \
       .withColumn('years',f.year(col('dt')))

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [4]:
# Performing cleaning tasks here
# Note: judging by the schemas printed below, df_spark is the I94 DataFrame as loaded in Step 1, before the country-name join
df_spark.head()
df_spark.select(["visatype"]).distinct().show()
# For every visa type, I added the visagroup and visacategory it belongs to, according to
# https://www.dhs.gov/immigration-statistics/nonimmigrant/NonimmigrantCOA. The result is visa_cats.csv.
# CPL, CP, and SBP have no definitions on that page. I sent a message to DHS via https://www.dhs.gov/ asking what they
# stand for and whether they are amalgamations of existing categories. I never received a reply.



+--------+
|visatype|
+--------+
|      F2|
|     GMB|
|      B2|
|      F1|
|     CPL|
|      I1|
|      WB|
|      M1|
|      B1|
|      WT|
|      M2|
|      CP|
|     GMT|
|      E1|
|       I|
|      E2|
|     SBP|
+--------+



In [None]:
# Claudia C. Wolfe of the NTTO sent me a list of values for the I94 data; see 835-10_Variables_Included_in_I94_Data.csv.
# According to that spreadsheet, dtadfile is an 'Unknown variable', so I have not included it in i94_headers.


In [5]:
df_spark.select(["occup"]).distinct().show()
# I emailed TTO@trade.gov asking what these values mean. Claudia C. Wolfe replied: "I regret that we do not have access to a dictionary
# that provides an explanation of the codes used to describe the occupation performed in the United States". See NTTO_Email.txt.

+-----+
|occup|
+-----+
|  PHA|
|  ENT|
|  REL|
|  ACH|
|  101|
|  EMM|
|  ULS|
|  GEN|
|  DVM|
|  MTH|
|  SVC|
|  ECH|
|  EXA|
|  ENV|
|  ENP|
|  PRF|
|  VOC|
| null|
|  AST|
|  TCH|
+-----+
only showing top 20 rows



In [31]:
# There are 41 people who are 100 years old or older. How can people that old immigrate to the U.S.? Or is there an error somewhere?

from pyspark.sql.functions import year,desc,asc

age = df_spark.withColumn('rough_age', year(df_spark['arrdate']) - df_spark['biryear'])
# Null ages never satisfy the comparison, and ordering does not affect a count
age.where(age.rough_age >= 100).count()

41

In [32]:
#There is a 114 year old, 111 year old, and a 110 year old. How can people that old immigrate to the U.S.? Or is there an error somewhere?

from pyspark.sql.functions import year,desc,asc

age = df_spark.withColumn('rough_age', year(df_spark['arrdate']) - df_spark['biryear'])
age.filter(age.rough_age.isNotNull()).orderBy(asc("biryear")).show()

+---------+------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+---------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|rough_age|
+---------+------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+---------+
|5956548.0|2016.0|   4.0| 254.0| 276.0|    AGA|2016-04-08|    1.0|     GU|      null| 114.0|    2.0|  1.0|    null| null|      A|   null|   null|   null| 1902.0|05232016|     M|  3693|     TW|4.8501050733E10|00301|     GMT|    114.0|
|5981723.0|2016.0|   4.0| 254.0| 276.0|    AGA|2016-04-21|    1.

In [34]:
from pyspark.sql.functions import to_date,desc,asc

#age.filter(age.rough_age.isNotNull()).orderBy(asc("rough_age")).show(3000)
age.filter(age.rough_age.isNotNull()).orderBy(desc("biryear")).show(100)

+---------+------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+---------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|rough_age|
+---------+------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+---------+
|5952559.0|2016.0|   4.0| 252.0| 209.0|    AGA|2016-04-10|    1.0|   null|      null|  -3.0|    2.0|  1.0|    null| null|      A|   null|   null|   null| 2019.0|05242016|     M|  null|   null|5.7545310233E10|00001|     GMT|     -3.0|
|1051118.0|2016.0|   4.0| 438.0| 438.0|    HHW|2016-04-06|    1.
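
The listings above show two kinds of suspect rows: ages of 100 or more, and a birth year (2019) later than the arrival year (2016), which gives an age of -3. One option is to flag such rows rather than drop them, so the counts stay auditable. Below is a minimal plain-Python sketch, assuming a hypothetical helper `classify_age` and a cutoff of 100 years taken from the check above. The same rule could be written as a Spark column expression.

```python
def classify_age(arrival_year, birth_year, max_age=100):
    """Label a record's rough age as 'missing', 'negative', 'implausible' or 'ok'."""
    if arrival_year is None or birth_year is None:
        return "missing"
    rough_age = arrival_year - birth_year
    if rough_age < 0:
        return "negative"      # born after arriving: likely a data-entry error
    if rough_age >= max_age:
        return "implausible"   # 100 or older: possible, but worth a second look
    return "ok"

# Birth years taken from the rows shown above, plus one ordinary record and one missing value
for birth_year in (1902.0, 2019.0, 1979.0, None):
    print(birth_year, classify_age(2016, birth_year))
```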

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [7]:
# Each "generation" is a 30-year window of temperature readings
frst_gen = {"frst_gen": list(range(2010, 1980, -1))}    # 1981-2010
scnd_gen = {"scnd_gen": list(range(1980, 1950, -1))}    # 1951-1980
thrd_gen = {"third_gen": list(range(1950, 1920, -1))}   # 1921-1950
frth_gen = {"fourth_gen": list(range(1920, 1890, -1))}  # 1891-1920
generations = [frst_gen, scnd_gen, thrd_gen, frth_gen]

avg_generation_lst = []
for g in generations:
    gen_name, gen_years = list(g.items())[0]
    # Average per country and year first, then average those yearly means over the generation
    row_by_generation = countryTemps.filter(col('years').isin(gen_years)) \
        .groupBy("Country", "years").agg(f.avg('AverageTemperature').alias(gen_name))
    avg_generation_lst.append(row_by_generation.groupBy("Country").agg(f.avg(gen_name)))

frst, scnd, thrd, fourth = avg_generation_lst

# One row per country with the four generation averages side by side
three_cols = frst.join(scnd, on=['Country'], how='left')
four_cols = three_cols.join(thrd, on=['Country'], how='left')
five_cols = four_cols.join(fourth, on=['Country'], how='left')
five_cols.show()

# Temperature change between consecutive generations (newer minus older)
sub = five_cols.withColumn('DiffGen1Gen2', five_cols['avg(frst_gen)'] - five_cols['avg(scnd_gen)']) \
                .withColumn('DiffGen2Gen3', five_cols['avg(scnd_gen)'] - five_cols['avg(third_gen)']) \
                .withColumn('DiffGen3Gen4', five_cols['avg(third_gen)'] - five_cols['avg(fourth_gen)'])
changes_in_temp_by_gen = sub.select(col("Country").alias("country_name"), "DiffGen1Gen2", "DiffGen2Gen3", "DiffGen3Gen4")
# Lower-case the names so they can be joined to the I94 country names later
changes_in_temp_by_gen = changes_in_temp_by_gen.withColumn("country_name", f.lower(f.col("country_name")))
changes_in_temp_by_gen.show()

+-------------+------------------+------------------+------------------+------------------+
|      Country|     avg(frst_gen)|     avg(scnd_gen)|    avg(third_gen)|   avg(fourth_gen)|
+-------------+------------------+------------------+------------------+------------------+
|         Chad| 27.64079166666667|27.176663888888886| 27.25101666666666| 26.76893611111112|
|     Anguilla|27.280758333333335|26.827502777777777|26.597055555555553| 26.32713055555556|
| Kingman Reef|27.690716666666667| 27.18835555555555|          27.08275|26.688200000000002|
|     Paraguay|23.838030555555555|23.465005555555557|23.344572222222226|23.083288888888887|
|       Russia| -4.45961388888889|-5.434174999999999|-5.303036111111112| -5.83281111111111|
|Palmyra Atoll|27.721394444444435|27.218327777777784|27.113436111111113|26.718544444444447|
|        Yemen|26.772916666666667|26.275844444444445|26.134586111111116|25.906255555555553|
|      Senegal|28.617455555555555|28.034163888888887|27.976936111111115|27.74771
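
The generation windows in the cell above are 30-year ranges built with `range(2010, 1980, -1)` and so on, so the first generation covers 1981 through 2010 inclusive. Below is a small sketch that makes those boundaries explicit and maps a single year to its generation label. `GENERATION_BOUNDS` and `generation_for_year` are hypothetical names introduced for illustration.

```python
# (label, first year, last year), both ends inclusive; mirrors range(2010, 1980, -1) etc.
GENERATION_BOUNDS = [
    ("frst_gen", 1981, 2010),
    ("scnd_gen", 1951, 1980),
    ("third_gen", 1921, 1950),
    ("fourth_gen", 1891, 1920),
]

def generation_for_year(year):
    """Return the generation label for a year, or None if it falls outside every window."""
    for label, first, last in GENERATION_BOUNDS:
        if first <= year <= last:
            return label
    return None

# The descending range used in the notebook covers exactly 1981..2010
assert set(range(2010, 1980, -1)) == set(range(1981, 2011))
print(generation_for_year(1979))
```

Years outside 1891 to 2010 map to `None`, which matches the notebook's behavior of leaving such rows out of every generation.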

In [13]:
btw = df_complete.filter(df_complete.rough_age.isNotNull()).where((df_complete.biryear >= 1980)).groupby(df_complete.country_name).count().orderBy(desc("count"))

frst_gen = {"frst_gen": list(range(2010,1980,-1))}
scnd_gen = {"scnd_gen": list(range(1980,1950,-1))}
thrd_gen = {"third_gen": list(range(1950,1920,-1))}
frth_gen = {"fourth_gen": list(range(1920,1890,-1))}
#fith_gen = {"fifth_gen": list(range(1890,1860,-1))}
#sxth_gen = {"sixth_gen": list(range(1860,1830,-1))}
avg_generation_lst = []

generations = [frst_gen,scnd_gen,thrd_gen,frth_gen]#,fith_gen,sxth_gen]

countryGenSchema = StructType([
            StructField("country_name", StringType(), True)
])

avgCountryGens = spark.createDataFrame([], countryGenSchema)

c_name = df_complete.select('country_name').distinct()

avgCountryGens = c_name.join(avgCountryGens, on=['country_name'], how='left')
#avgCountryGens.join(c_name, on=['country_name'], how='left')
avgCountryGens.show(10)
print('$$$$$$$$$$$$$$$$$$$$$')
#avgCountryGens['country_name'] = df_spark['country_name']

for g in generations:
    print(f'{g}')
    #row_by_generation = countryTemps.filter(col('years').isin(list(g.values())[0])).groupBy("Country","years").agg(f.avg('AverageTemperature').alias(list(g.keys())[0]))
    print("df_complete.head(1) == ")
    print(df_complete.head(1))
    print("btw end ")
    print("btw.printSchema() is ")
    btw.printSchema()
    print("df_complete.printSchema() is ")
    df_complete.printSchema()
    
    #btw = df_complete.filter(df_complete.rough_age.isNotNull()).filter(col('biryear').isin(list(g.values())[0])).groupby(df_complete.country_name).agg(f.count('biryear').alias(list(g.keys())[0])).orderBy(desc(list(g.keys())[0]))
    #btw = df_complete.filter(df_complete.rough_age.isNotNull()).filter(col('biryear').isin(list(g.values())[0])).groupby(df_complete.country_name).agg(f.count('biryear'))
    
    btw_temp = df_complete.filter(df_complete.rough_age.isNotNull()).filter(col('biryear').isin(list(g.values())[0]))
    btw = btw_temp.groupby(btw_temp.country_name).agg(f.count('biryear').alias(list(g.keys())[0])).orderBy(desc(list(g.keys())[0]))
    
    
    print("btw show ")
    btw.show(10)
    print("btw end ")
    #avg_generation_lst.append(avg_generation.join(avgCountryGens, on=['Country'], how='left'))
    #avg_generation_lst.append(btw.join(avgCountryGens, on=['country_name'], how='inner'))
    avgCountryGens = btw.join(avgCountryGens, on=['country_name'], how='left')
    print("YYYYYYYYYYYYYYYYy")
    avgCountryGens.orderBy(asc("country_name")).show()

avgCountryGens = avgCountryGens.withColumn("country_name",f.lower(f.col("country_name")))
#dffin = countryNames.join(btw, on=['i94res'], how='left')
#between = age.filter(age.rough_age.isNotNull()).where((age.rough_age > 40) & (age.rough_age < 60)).orderBy(asc("biryear"))
#dffin.head(500)
print("THIS IS IT!!!!!")
avgCountryGens.show()

+-------------+
| country_name|
+-------------+
|      ARMENIA|
|      BAHAMAS|
| SOUTH AFRICA|
|  MONTENEGRO |
|        BURMA|
|   BANGLADESH|
|        JAPAN|
|       UGANDA|
|COCOS ISLANDS|
|   CAPE VERDE|
+-------------+
only showing top 10 rows

$$$$$$$$$$$$$$$$$$$$$
{'frst_gen': [2010, 2009, 2008, 2007, 2006, 2005, 2004, 2003, 2002, 2001, 2000, 1999, 1998, 1997, 1996, 1995, 1994, 1993, 1992, 1991, 1990, 1989, 1988, 1987, 1986, 1985, 1984, 1983, 1982, 1981]}
df_complete.head(1) == 
[Row(i94res=692.0, cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94port='XXX', arrdate=datetime.date(2016, 4, 29), i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2', rough_age=37.0, country_name='ECUADOR')]
btw end 
btw.printSchema() is 
root
 |-- country_name: string (

In [None]:
# Combine the temperature changes with the arrival counts, one row per country
final = changes_in_temp_by_gen.join(avgCountryGens, on=['country_name'], how='inner')
# Deduplicate first, then sort: distinct() does not preserve an earlier ordering
final.distinct().orderBy(desc("country_name")).show(500)

# List the country names on each side to spot names that failed to match in the join
changes_in_temp_by_gen.select('country_name').distinct().orderBy(desc("country_name")).show()
avgCountryGens.select('country_name').distinct().orderBy(desc("country_name")).show()
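
The inner join above only matches countries whose names are identical on both sides after lower-casing. The `country_name` listing from the I94 side shows at least one value with trailing whitespace (`MONTENEGRO `), which would not match a clean name from the temperature data. Below is a minimal sketch of a stricter normalization, assuming a hypothetical helper `normalize_country`. In Spark the equivalent would likely combine `lower` and `trim` from `pyspark.sql.functions`.

```python
def normalize_country(name):
    """Lower-case a country name and collapse stray whitespace so join keys line up."""
    if name is None:
        return None
    return " ".join(name.split()).lower()

i94_names = ["MONTENEGRO ", "SOUTH AFRICA", "JAPAN"]          # values seen in the I94 listing
temperature_names = ["Montenegro", "South Africa", "Japan"]   # illustrative spellings (assumed)

i94_keys = {normalize_country(n) for n in i94_names}
temp_keys = {normalize_country(n) for n in temperature_names}
print(sorted(i94_keys & temp_keys))
```

Names that are spelled differently in the two sources would still need a manual mapping. This sketch only removes differences in case and whitespace.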
    

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [37]:
# Perform quality checks here
# Check 1: enough countries survived the pipeline
if avgCountryGens.count() < 40:
    print("Quality check FAILED ... not enough countries")

# Check 2: spot-check the Bahamas counts against known values
bah = avgCountryGens.filter(avgCountryGens.country_name == "bahamas").select('frst_gen', 'scnd_gen').collect()

if not bah:
    print("Quality check FAILED ... bahamas row is missing")
elif bah[0]['frst_gen'] != 4821 or bah[0]['scnd_gen'] != 8507:
    print("Quality check FAILED ... bahamas values don't match up")
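
The checks above print a message but do not stop the pipeline. One possible refinement is to collect failures in a list and raise an error if any are found. Below is a minimal plain-Python sketch, assuming the generation counts have already been collected into a dict keyed by country (for example from `avgCountryGens.collect()`). `run_quality_checks` and its parameters are hypothetical.

```python
def run_quality_checks(counts_by_country, min_countries, expected):
    """Return a list of failure messages; an empty list means every check passed.

    counts_by_country: {country: {column: count}}
    expected: {country: {column: expected count}} used as spot checks
    """
    failures = []
    if len(counts_by_country) < min_countries:
        failures.append(f"only {len(counts_by_country)} countries, expected at least {min_countries}")
    for country, columns in expected.items():
        row = counts_by_country.get(country)
        if row is None:
            failures.append(f"{country}: row is missing")
            continue
        for column, want in columns.items():
            if row.get(column) != want:
                failures.append(f"{country}.{column}: got {row.get(column)}, expected {want}")
    return failures

# Toy input; 4821 and 8507 are the Bahamas values used in the check above
counts = {"bahamas": {"frst_gen": 4821, "scnd_gen": 8507}}
print(run_quality_checks(counts, min_countries=1, expected=counts))
```

A caller could then raise an exception when the returned list is non-empty, so that a scheduled run fails visibly instead of only printing.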

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
 
* country_name: Lower-case name of the country. The field is the key used to join the I94 Immigration Data and World Temperature Data DataFrames.
* DiffGen1Gen2: The average temperature of generation 1 (1981-2010) minus that of generation 2 (1951-1980). Comes from averaging the temperatures in the World Temperature Data DataFrame outlined above.
* DiffGen2Gen3: The average temperature of generation 2 (1951-1980) minus that of generation 3 (1921-1950). Comes from averaging the temperatures in the World Temperature Data DataFrame outlined above.
* DiffGen3Gen4: The average temperature of generation 3 (1921-1950) minus that of generation 4 (1891-1920). Comes from averaging the temperatures in the World Temperature Data DataFrame outlined above.
* frst_gen: The number of emigres whose birth year falls in the first generation. The field comes from the I94 Immigration Data DataFrame described above.
* scnd_gen: The number of emigres whose birth year falls in the second generation. The field comes from the I94 Immigration Data DataFrame described above.
* third_gen: The number of emigres whose birth year falls in the third generation. The field comes from the I94 Immigration Data DataFrame described above.
* fourth_gen: The number of emigres whose birth year falls in the fourth generation. The field comes from the I94 Immigration Data DataFrame described above.
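
As a worked example of the DiffGen fields, the Chad row of the generation averages printed in section 4.1 can be differenced by hand. Below is a minimal plain-Python sketch. `generation_diffs` is a hypothetical helper, and the input values are copied from that output.

```python
def generation_diffs(averages):
    """Subtract each older generation's average from the next newer one.

    averages: generation averages ordered newest first.
    """
    return [newer - older for newer, older in zip(averages, averages[1:])]

# Chad: avg(frst_gen), avg(scnd_gen), avg(third_gen), avg(fourth_gen)
chad = [27.64079166666667, 27.176663888888886, 27.25101666666666, 26.76893611111112]

for name, diff in zip(["DiffGen1Gen2", "DiffGen2Gen3", "DiffGen3Gen4"], generation_diffs(chad)):
    print(f"{name}: {diff:+.3f}")
```

A positive value means the newer generation was warmer on average than the older one. For Chad, the middle difference is slightly negative.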

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated by 7am every day.
    * The database needed to be accessed by 100+ people.