# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project collects data about immigration to the USA: the cities that immigrants travel to most frequently, as well as the immigrants themselves.
I collect data from different sources and build an ETL pipeline for a data lake hosted on S3.
The data is then copied from S3 into staging tables and afterwards into analytics tables in a Redshift database.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [10]:
# Do all imports and installs here
# System Imports
import configparser
import os
import logging
import boto3 
from botocore.exceptions import NoCredentialsError

#Pandas Import
import pandas as pd

#PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import lit, col, year, month, upper, to_date, udf
from pyspark.sql.functions import monotonically_increasing_id

from datetime import datetime, timedelta, date

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.
My plan is to gather details about the cities that immigrants travel to and about the immigrants themselves.
It should be possible to identify the most attractive cities and to learn more about the immigrants.
I will therefore use the following data:
- I94 Immigration Data
- World Temperature Data
- U.S. City Demographic Data

I will use the following tools to complete this project:
- this Jupyter notebook
- Spark for processing the data
- S3 to store the data
- a Redshift database to create and hold staging and analytics tables

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 
I used the datasets provided by Udacity:
- I94 Immigration Data 
    -> "../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat"
    For performance reasons, I used only one dataset
    -> "I94_SAS_Labels_Descriptions.SAS"
- World Temperature Data -> "GlobalLandTemperaturesByCity.csv"
- U.S. City Demographic Data -> "us-cities-demographics.csv"

In [18]:
# Read in the data here
# Reading in the AWS configuration
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
#SOURCE_S3_BUCKET = config['S3']['SOURCE_S3_BUCKET']
S3_BUCKET = config['S3']['S3_BUCKET']

# data sources
source_temp = '../../data2/GlobalLandTemperaturesByCity.csv'
source_demo = 'us-cities-demographics.csv'
source_air = 'airport-codes_csv.csv'
#source_img = '../../immigration/18-83510-I94-Data-2016/*.sas7bdat'
source_img = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
source_label = 'I94_SAS_Labels_Descriptions.SAS'

#target
output_data = 's3://fb-capstone'

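#udf
# I94 arrival/departure dates are SAS numeric dates: days since 1960-01-01
get_date = udf(lambda x: date(1960, 1, 1) + timedelta(days=int(x)) if x is not None else None, DateType())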

# create spark session
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

# get temperature data
df_temp = spark.read.csv(source_temp,header=True)

# get demo data
df_demo = spark.read.format('csv').options(header=True, delimiter=';').load(source_demo)

# get Label Descriptions
with open(source_label) as f:
    contents = f.readlines()

# get immigration data
df_img_gen = spark.read.format('com.github.saurfang.sas.spark').load(source_img)


In [19]:

# Retrieve the list of existing buckets
s3 = boto3.client('s3')
response = s3.list_buckets()

# Output the bucket names

print('Existing buckets:')
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')


Existing buckets:
  fb-capstone


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
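
One way to make the issues above concrete is to count missing values per column and fully duplicated rows before any cleaning. The sketch below does this in plain Python on a few made-up rows shaped like `us-cities-demographics.csv` (semicolon-delimited, as in the read above). The sample values and the `profile_rows` helper are illustrative assumptions, not part of the project code.

```python
import csv
import io
from collections import Counter

# Made-up sample in the layout of us-cities-demographics.csv (illustrative only).
SAMPLE = """City;State;Median Age;Total Population
Alpha City;State A;38.1;100000
Alpha City;State A;38.1;100000
Beta Town;State B;;43000
"""

def profile_rows(text, delimiter=";"):
    """Return (missing values per column, number of duplicated rows)."""
    reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
    missing = Counter()
    seen = Counter()
    for row in reader:
        for column, value in row.items():
            if value is None or value.strip() == "":
                missing[column] += 1
        seen[tuple(row.values())] += 1
    # Every occurrence beyond the first counts as a duplicate.
    duplicates = sum(count - 1 for count in seen.values() if count > 1)
    return dict(missing), duplicates

missing, duplicates = profile_rows(SAMPLE)
print(missing, duplicates)
```

The same two counts could be computed on the Spark DataFrames themselves; plain Python is used here only to keep the sketch self-contained.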

#### Cleaning Steps
Document steps necessary to clean the data

In [21]:
# Performing cleaning tasks here
# Clean temperature data
df_temp = df_temp.where(df_temp['Country'] == 'United States')
dim_temp = df_temp.select(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty',\
                         'City', 'Country']).distinct()

dim_temp = dim_temp.withColumnRenamed("AverageTemperature","avg_temp")\
                .withColumnRenamed("AverageTemperatureUncertainty","avg_temp_unc")\
                .withColumnRenamed("City","city")\
                .withColumnRenamed("Country","country")                
dim_temp = dim_temp.withColumn('dt', to_date(col('dt')))
dim_temp.printSchema()

#dim_temp.createOrReplaceTempView("Dim_Temperature")
dim_temp.write.mode("overwrite").parquet(path=S3_BUCKET + 'dim_temperature')
#dim_temp.write.mode("overwrite").parquet(path="./output_new/temp/")

#uploaded = upload_to_aws("output_new/temp/part-00000-622f3535-5ea1-40a8-8b6f-3708f33edab4-c000.snappy.parquet", "fb-capstone", "dim_temp")

#for i in folder_file:
#    i = data_file
#    upload(data_file)


root
 |-- dt: date (nullable = true)
 |-- avg_temp: string (nullable = true)
 |-- avg_temp_unc: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)



Py4JJavaError: An error occurred while calling o690.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
	... 24 more
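
The `ClassNotFoundException` for `org.apache.hadoop.fs.s3a.S3AFileSystem` above typically means that the `hadoop-aws` module is not on Spark's classpath. A possible fix, sketched below, is to add it to `spark.jars.packages` next to the SAS reader. The version `2.7.0` is an assumption and should match the Hadoop version bundled with the Spark installation in use; `spark_packages` and `create_session` are hypothetical helper names.

```python
def spark_packages(hadoop_version: str) -> str:
    """Comma-separated Maven coordinates for spark.jars.packages."""
    return ",".join([
        "saurfang:spark-sas7bdat:2.0.0-s_2.11",            # SAS reader used in this notebook
        f"org.apache.hadoop:hadoop-aws:{hadoop_version}",  # provides S3AFileSystem
    ])

def create_session(hadoop_version: str):
    """Create a Spark session that can resolve s3a:// paths (assumed setup)."""
    from pyspark.sql import SparkSession  # imported lazily; requires PySpark
    return (
        SparkSession.builder
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")
        .config("spark.jars.packages", spark_packages(hadoop_version))
        .enableHiveSupport()
        .getOrCreate()
    )

print(spark_packages("2.7.0"))
```

Because `getOrCreate()` reuses a running session, the kernel would likely need a restart before the extra package takes effect.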


In [23]:
# clean demography data
dim_demo = df_demo.select(['City','State','Total Population','Median Age','Average Household Size']).distinct()
dim_demo = dim_demo.withColumnRenamed("City","city")\
                .withColumnRenamed("State","state")\
                .withColumnRenamed("Median Age","med_age")\
                .withColumnRenamed("Total Population","tot_pop")\
                .withColumnRenamed("Average Household Size","avg_household_size")                                             
#dim_demo.printSchema()
#dim_demo.write.mode("overwrite").parquet(path=S3_BUCKET + 'dim_demographics')

In [24]:
# clean immigration data
df_img = df_img_gen.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr',\
                                 'arrdate', 'depdate', 'i94mode', 'i94visa').distinct()\
                         .withColumn("immigration_id", monotonically_increasing_id())

df_img = df_img.withColumnRenamed("cicid","cic_id")\
            .withColumnRenamed("i94yr","year")\
            .withColumnRenamed("i94mon","month")\
            .withColumnRenamed("i94port","city_code")\
            .withColumnRenamed("i94addr","state_code")\
            .withColumnRenamed("arrdate","arrival_date")\
            .withColumnRenamed("depdate","departure_date")\
            .withColumnRenamed("i94mode","mode")\
            .withColumnRenamed("i94visa","visa")
            

df_img = df_img.withColumn('arrival_date', get_date(col('arrival_date')))
df_img = df_img.withColumn('departure_date', get_date(col('departure_date')))
    
#df_img.printSchema()

df_img.write.mode("overwrite").partitionBy('state_code')\
                    .parquet(S3_BUCKET + 'fact_immigration')
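
`arrdate` and `depdate` are SAS numeric dates, i.e. day counts from 1960-01-01, so `get_date` has to add that offset rather than treat the value as a Unix timestamp. A minimal plain-Python sketch of the conversion follows. `sas_to_date` is a hypothetical name, and wrapping it as `udf(sas_to_date, DateType())` is one way it could serve as `get_date`.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day

def sas_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date to a Python date; pass missing values through."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20545.0))  # 2016-04-01
```

Passing `None` through matters here because `depdate` can be empty for travellers without a recorded departure.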



Py4JJavaError: An error occurred while calling o813.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found


In [25]:
# clean label data
country_code = {}
for countries in contents[10:298]:
        pair = countries.split('=')
        code, country = pair[0].strip(), pair[1].strip().strip("'")
        country_code[code] = country        

#spark.createDataFrame(country_code.items(), ['code', 'country'])\
#         .write.mode("overwrite")\
#         .parquet(path=output_data + 'country_code')
city_code = {}
for cities in contents[303:962]:
        pair = cities.split('=')
        code, city = pair[0].strip("\t").strip().strip("'"),\
                     pair[1].strip('\t').strip().strip("''")
        city_code[code] = city
#spark.createDataFrame(city_code.items(), ['code', 'city'])\
#         .write.mode("overwrite")\
#         .parquet(path=output_data + 'city_code')
state_code = {}
for states in contents[982:1036]:
        pair = states.split('=')
        code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
        state_code[code] = state

#spark.createDataFrame(state_code.items(), ['code', 'state'])\
#         .write.mode("overwrite")\
#         .parquet(path=output_data + 'state_code')

visa_code = {}
for codes in contents[1047:1049]:
    pair = codes.split('=')
    code, visa = pair[0].strip("\t").strip().strip("'"),\
                     pair[1].strip('\t').strip().strip("''")
    visa_code[code] = visa
#spark.createDataFrame(visa_code.items(), ['code', 'visa'])\
#         .write.mode("overwrite")\
#         .parquet(path=output_data + 'visa_code')
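
The slices above depend on fixed line numbers in `I94_SAS_Labels_Descriptions.SAS`. As an alternative sketch, the `code = 'label'` pairs can be extracted with a regular expression, which is less sensitive to the file's exact layout. The sample lines and the `parse_label_lines` helper below are illustrative assumptions about the file's format, not a copy of it.

```python
import re

# Matches lines such as  'AL'='ALABAMA'  or  582 =  'MEXICO'
PAIR = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")

def parse_label_lines(lines):
    """Map each code to its label for lines shaped like  code = 'label'."""
    mapping = {}
    for line in lines:
        match = PAIR.match(line)
        if match:  # comment and header lines simply do not match
            code, label = match.group(1).strip(), match.group(2).strip()
            mapping[code] = label
    return mapping

# Illustrative lines only; the real file is much longer.
sample = [
    "/* I94ADDR - illustrative excerpt */\n",
    "\t'AL'='ALABAMA'\n",
    "   582 =  'MEXICO'\n",
    "\t'ANC'\t=\t'ANCHORAGE, AK         '\n",
]
print(parse_label_lines(sample))
```

In practice the lines would first be grouped by section (country, port, state, visa) so that codes from different sections do not collide in one dictionary.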


In [26]:
# clean personal data
df_pers = df_img_gen.select('cicid', 'i94cit', 'i94res',\
                                  'biryear', 'gender', 'insnum').distinct()
df_pers = df_pers.withColumn("personal_id", monotonically_increasing_id())

df_pers = df_pers.withColumnRenamed("cicid","cic_id")\
            .withColumnRenamed("i94cit","citizen_country")\
            .withColumnRenamed("i94res","residence_country")\
            .withColumnRenamed("biryear","birth_year")\
            .withColumnRenamed("insnum","ins_num")

df_pers.write.mode("overwrite")\
                     .parquet(path=S3_BUCKET + 'dim_person')

Py4JJavaError: An error occurred while calling o859.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# DROP TABLES

staging_temp_table_drop = "DROP table IF EXISTS staging_temperature"
staging_demo_table_drop = "DROP table IF EXISTS staging_demographics"
staging_img_table_drop = "DROP table IF EXISTS staging_immigration"
staging_country_code_drop = "DROP table IF EXISTS staging_country_code"
staging_city_code_drop = "DROP table IF EXISTS staging_city_code"
staging_visa_code_drop = "DROP table IF EXISTS staging_visa_code"
staging_person_drop = "DROP table IF EXISTS staging_person"
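
After the drops, each staging table would be loaded from the Parquet output on S3. The following sketch builds Redshift `COPY` statements for that step. The IAM role ARN and the `copy_from_parquet` helper are placeholders assumed for illustration, and the table-to-folder mapping follows the names used in the cleaning cells.

```python
def copy_from_parquet(table: str, s3_path: str, iam_role: str) -> str:
    """Build a Redshift COPY statement that loads Parquet files from S3."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "FORMAT AS PARQUET;"
    )

# Placeholder values; replace with the real bucket prefix and role.
BUCKET = "s3://fb-capstone/"
ROLE = "arn:aws:iam::123456789012:role/example-redshift-role"

# Staging table -> folder written by the cleaning cells.
staging_sources = {
    "staging_temperature": "dim_temperature",
    "staging_demographics": "dim_demographics",
    "staging_immigration": "fact_immigration",
    "staging_person": "dim_person",
}

copy_queries = [
    copy_from_parquet(table, BUCKET + folder, ROLE)
    for table, folder in staging_sources.items()
]
print(copy_queries[0])
```

Note that `fact_immigration` is written with `partitionBy('state_code')`, so that column lives in the folder names rather than in the files; loading it with `COPY` may need extra handling.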


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
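
The count and uniqueness checks listed above could look like the sketch below. It uses an in-memory SQLite database purely as a stand-in for Redshift so the example runs anywhere; the table contents and the helper names `check_not_empty` and `check_unique` are assumptions for illustration.

```python
import sqlite3

def check_not_empty(conn, table):
    """Completeness check: the table must contain at least one row."""
    (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    return count > 0

def check_unique(conn, table, column):
    """Integrity check: no value of `column` may occur more than once."""
    (dupes,) = conn.execute(
        f"SELECT COUNT(*) FROM (SELECT {column} FROM {table} "
        f"GROUP BY {column} HAVING COUNT(*) > 1) AS d"
    ).fetchone()
    return dupes == 0

# Stand-in data; in the project these checks would run against Redshift.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging_person (personal_id INTEGER, gender TEXT)")
conn.executemany(
    "INSERT INTO staging_person VALUES (?, ?)",
    [(1, "F"), (2, "M"), (3, "F")],
)

print(check_not_empty(conn, "staging_person"))
print(check_unique(conn, "staging_person", "personal_id"))
print(check_unique(conn, "staging_person", "gender"))
```

Each helper returns a boolean, so a pipeline run could raise an error as soon as one check fails.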

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.