# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd

# pyreadstat could be used to read multiple .sas7bdat files
# import os
# import pyreadstat as pyd


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?
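The I94 immigration data is stored as SAS datasets (`.sas7bdat`). SAS represents a date as the number of days since 1960-01-01, so any date field kept in that form needs converting before it can be lined up with the temperature data's `dt` column. The sketch below assumes, without having checked it against this data set, that fields such as `arrdate` hold SAS day counts; the helper name `sas_to_date` is hypothetical.

```python
import math
from datetime import date, timedelta

# SAS stores dates as a count of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS date number to a datetime.date (hypothetical helper).

    Missing values (None or NaN) are returned as None.
    """
    if sas_days is None or (isinstance(sas_days, float) and math.isnan(sas_days)):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# Assumption: a field such as `arrdate` holds SAS day counts like these
print(sas_to_date(20545.0))       # 2016-04-01
print(sas_to_date(float("nan")))  # None
```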

In [None]:
# Read in the data here

# Test reading one immigration data file with pandas
# Read the immigration data here
# [!Note]: be careful with large files; this only tries to read a single file

#_________V_________V_________V_________V_________V_________V_________V_________V
# Bypassed to save time on test runs; only PySpark is used later on


# Read only one file: i94_apr16_sub
# fname_immigra = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# df_AprImmig = pd.read_sas(fname_immigra, 'sas7bdat', encoding="ISO-8859-1")

# Print the first rows of the immigration DataFrame
# df_AprImmig.head(7)

#_________V_________V_________V_________V_________V_________V_________V_________V


In [None]:
# Read the temperature data here
# (with pandas)

#_________V_________V_________V_________V_________V_________V_________V_________V
# This works, but the pandas read is bypassed here and only PySpark is used below,
# because checking for NaN values relies on pyspark.sql.functions

# fname_temper = '../../data2/GlobalLandTemperaturesByCity.csv'
# df_Temper = pd.read_csv(fname_temper)

# # Print the first rows of the temperature DataFrame
# df_Temper.head(10)
#_________V_________V_________V_________V_________V_________V_________V_________V

In [None]:
# Test reading a single sas7bdat file (i94_apr16_sub)
# with a PySpark DataFrame
from pyspark.sql import SparkSession

# Spark session with the saurfang spark-sas7bdat package for reading SAS files
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

# Read only one file into df_spark

# df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# print("______v______v______v______v to see df when read one file")   
# df_spark.count()

# Read the temperature data here

fname_temper = '../../data2/GlobalLandTemperaturesByCity.csv'
# df_Temper = pd.read_csv(fname_temper)
# header=True takes the column names (dt, AverageTemperature, City, Country, ...) from
# the first line instead of the default _c0, _c1, ...; all columns are read as strings
df_Temper = spark.read.csv(fname_temper, header=True)
print("Row count of df_Temper: {}".format(df_Temper.count()))

# Write df_Temper to parquet (overwrite so the cell can be re-run) and read it back
df_Temper.write.mode("overwrite").parquet("Temp_data_0")
df_Temper = spark.read.parquet("Temp_data_0")

In [None]:
# Read all the SAS files of the immigration data and union them into one DataFrame
from functools import reduce
from pyspark.sql import DataFrame
import os

# Build the sorted list of full paths of the files in the Immigration data folder
Immig_datapath = '../../data/18-83510-I94-Data-2016'
files = sorted(
    os.path.join(Immig_datapath, f)
    for f in os.listdir(Immig_datapath)
    if os.path.isfile(os.path.join(Immig_datapath, f))
)

print("List of file paths:")
print(files)


def unionAll(*dfs):
    # DataFrame.union replaces the deprecated unionAll; both match columns by position
    return reduce(DataFrame.union, dfs)


# Accumulate the immigration data of all 12 files into df_ImmigAll
print("______v______v______v______v______v______v______v______v \n")
print("Accumulated row count after each of the 12 data files ... \n")
for i, file in enumerate(files):
    df = spark.read.format('com.github.saurfang.sas.spark').load(file)
    if i == 0:
        df_ImmigAll = df
        cols = df_ImmigAll.columns  # column order of the first file
    else:
        # Select the first file's columns, in the same order, in case a file has extra columns
        df_ImmigAll = unionAll(df_ImmigAll, df.select(cols))
    print(df_ImmigAll.count())

print("______v______v______v______v______v______v______v______v \n")

In [None]:
# Write to parquet when only one .sas7bdat file was read
# df_spark.write.parquet("sas_data_viet")
# df_spark=spark.read.parquet("sas_data_viet")

# Write to parquet when all 12 files were read (overwrite so the cell can be re-run)
df_ImmigAll.write.mode("overwrite").parquet("sas_data_all")
df_ImmigAll = spark.read.parquet("sas_data_all")


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
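Before running the checks on the full Spark DataFrames, they can be prototyped on a small pandas sample. This sketch builds a hypothetical toy frame with the temperature columns named in this notebook (`dt`, `AverageTemperature`, `City`, `Country`) and counts missing values per column and duplicated rows, both over whole rows and over an assumed business key of (`dt`, `City`, `Country`):

```python
import pandas as pd

nan = float("nan")

# Hypothetical toy sample with the temperature columns used in this notebook
df = pd.DataFrame({
    "dt": ["2016-01-01", "2016-02-01", "2016-02-01", "2016-03-01"],
    "AverageTemperature": [1.2, nan, nan, 4.2],
    "City": ["Aarhus", "Aarhus", "Aarhus", "Aarhus"],
    "Country": ["Denmark", "Denmark", "Denmark", "Denmark"],
})

# Task1: missing values per column (isna() flags both NaN and None)
missing_per_column = df.isna().sum()
print(missing_per_column)

# Task2a: rows involved in exact duplicates; keep=False marks every copy
# (pandas treats NaN values as equal when comparing rows)
exact_dupes = df[df.duplicated(keep=False)]
print(f"{len(exact_dupes)} rows are exact duplicates")

# Task2b: repeated business keys (assumed: one reading per city per month)
key_dupes = int(df.duplicated(subset=["dt", "City", "Country"]).sum())
print(f"{key_dupes} repeated (dt, City, Country) keys")
```

Counting on a business key as well as on whole rows catches records that differ only in the measured value, which an exact-duplicate check misses.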

#### Cleaning Steps
Document steps necessary to clean the data
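A minimal sketch of the three cleaning tasks, on a hypothetical pandas sample: coerce text that leaked into a numeric column to NaN, drop rows without a measurement, then drop exact duplicates. The order matters: coercing first lets the leaked text be dropped together with genuine missing values. On the Spark DataFrames the same steps roughly correspond to casting the column to a numeric type, `DataFrame.dropna(subset=...)`, and `DataFrame.dropDuplicates()`.

```python
import pandas as pd

# Hypothetical raw sample: text leaked into a numeric column ("n/a"),
# a missing temperature (None), and an exact duplicate row (2000-02-01)
raw = pd.DataFrame({
    "dt": ["2000-01-01", "2000-02-01", "2000-02-01", "2000-03-01", "2000-04-01"],
    "AverageTemperature": ["1.5", "2.0", "2.0", "n/a", None],
    "City": ["Paris"] * 5,
})


def clean_temperature(df):
    out = df.copy()
    # Task3: unexpected text in a numeric column becomes NaN instead of failing
    out["AverageTemperature"] = pd.to_numeric(out["AverageTemperature"], errors="coerce")
    # Task1: drop rows where the measurement itself is missing
    out = out.dropna(subset=["AverageTemperature"])
    # Task2: remove exact duplicate rows and renumber the index
    return out.drop_duplicates().reset_index(drop=True)


clean = clean_temperature(raw)
print(clean)
```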

In [None]:
# Perform cleaning tasks here
# Identify the tasks for exploring the data and the cleaning steps

# Task1: Handle NaN values (missing values)
#  -> Identify where the NaN values come from (which column in each df)
#  -> Count the NaN values
#  -> Select a method to handle the NaN values (this may depend on which column of the df they come from)

# [airport_code]:
#     -> the column `name` does not have NaN values
#     -> but there are two duplicated names

# [cities_demography]:
#     -> Some columns do not have NaN: City / State / State Code / Race

# [Immigration data: i94_xxx16_sub.sas7bdat]
#     -> NaN values need to be identified with DataFrame methods

# [Temperature data: GlobalLandTemperaturesByCity.csv]
#     -> columns 'City' and 'Country' do not have NaN values

# Task2: Duplicate data:
#  -> How to identify duplicates?
#  -> How to handle duplicated data?

# Task3: Unexpected values in a df column (e.g. a text string in a numeric column)
#  -> remove them, or
#  -> replace them with a proper default value


In [None]:
from pyspark.sql.functions import col,isnan,when,count

# Task1: Handle NaN values (missing values)
# TODO: Identify which table and which column contain NaN values

# _________v_________v_________v_________v_________v_________v_________v
#  pandas version (DataFrame read with pd.read_csv)
#  Check NaN in the temperature df

# print("to see NaN value in interest columns ... \n")
# NaN_dt=df_Temper['dt'].isna().sum()
# NaN_AverageTemperature=df_Temper['AverageTemperature'].isna().sum()
# NaN_City=df_Temper['City'].isna().sum()
# NaN_Country=df_Temper['Country'].isna().sum()
# print("NaN value number of column dt is: {} \n".format(NaN_dt))
# print("NaN value number of column AverageTemperature is: {} \n".format(NaN_AverageTemperature))
# print("NaN value number of column City is: {} \n".format(NaN_City))
# print("NaN value number of column Country is: {} \n".format(NaN_Country))
# print("\n")
# _________v_________v_________v_________v_________v_________v_________v
print("Columns of df_Temper:\n {}".format(df_Temper.columns))

print("Type of df_Temper: {}".format(type(df_Temper)))

print("Count of NaN/null values in each column of the temperature df ... \n")
df_Temper.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_Temper.columns]).show()

#  Check NaN in df of Immigration data
# Get a list of all columns in immigration df
# Immig_col_list = list(df_ImmigAll)
# print("To see all the columns list of df_ImmigAll is \n {}".format(Immig_col_list))


print("Count of NaN/null values in each column of df_ImmigAll \n")
print(type(df_ImmigAll))
# df_ImmigAll.isnull().sum(axis = 0)  (pandas syntax; not available on a Spark DataFrame)


# Print every column of the immigration DataFrame with its NaN/null count
# df_ImmigAll.select([count(when(isnan(c), c)).alias(c) for c in df_ImmigAll.columns]).show()
df_ImmigAll.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_ImmigAll.columns]).show()


In [None]:
# Task2: Duplicate data:


In [None]:
# Task3: Unexpected values in a df column (e.g. a text string in a numeric column)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
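One common option for this kind of analytical data is a star schema: a fact table of arrivals linked to dimension tables for city, date and temperature. The DDL below sketches one possible layout in SQLite (used only because it ships with Python); all table and column names are hypothetical, and `cicid` is assumed to be the record identifier in the I94 files.

```python
import sqlite3

# Hypothetical star schema: one fact table plus three dimensions
SCHEMA = """
CREATE TABLE dim_city (
    city_id      INTEGER PRIMARY KEY,
    city         TEXT NOT NULL,
    country      TEXT NOT NULL,
    UNIQUE (city, country)
);
CREATE TABLE dim_date (
    date_id      TEXT PRIMARY KEY,   -- ISO date, e.g. '2016-04-01'
    year         INTEGER NOT NULL,
    month        INTEGER NOT NULL
);
CREATE TABLE dim_temperature (
    city_id      INTEGER NOT NULL REFERENCES dim_city(city_id),
    month        INTEGER NOT NULL,
    avg_temp     REAL,
    PRIMARY KEY (city_id, month)
);
CREATE TABLE fact_immigration (
    cicid        INTEGER PRIMARY KEY,  -- assumed I94 record id
    arrival_date TEXT REFERENCES dim_date(date_id),
    city_id      INTEGER REFERENCES dim_city(city_id),
    visa_type    INTEGER
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]
print(tables)
```

Keeping temperature as its own dimension keyed by city and month lets arrivals be joined to the climate of their destination without repeating temperature values on every fact row.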

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
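The pipeline can be organized as an ordered list of small steps, each taking a dataset and returning a new one, so each stage can be tested on its own and row counts can be logged between stages. This is a plain-Python sketch on lists of dicts; the step names and fields are hypothetical, and in the notebook each step would wrap the equivalent Spark transformation.

```python
from functools import reduce

# Hypothetical steps; each takes a list of records and returns a new list


def drop_missing_temperature(rows):
    return [r for r in rows if r.get("avg_temp") is not None]


def drop_duplicate_rows(rows):
    seen, out = set(), []
    for r in rows:
        key = tuple(sorted(r.items()))  # hashable fingerprint of the record
        if key not in seen:
            seen.add(key)
            out.append(r)
    return out


def add_month(rows):
    # dt is assumed to be an ISO date string 'YYYY-MM-DD'
    return [{**r, "month": int(r["dt"][5:7])} for r in rows]


PIPELINE = [drop_missing_temperature, drop_duplicate_rows, add_month]


def run_pipeline(rows, steps=PIPELINE):
    """Apply each step in order, printing the row count after each stage."""
    def apply(acc, step):
        result = step(acc)
        print(f"{step.__name__}: {len(acc)} -> {len(result)} rows")
        return result
    return reduce(apply, steps, rows)


sample = [
    {"dt": "2016-04-01", "city": "Boston", "avg_temp": 8.1},
    {"dt": "2016-04-01", "city": "Boston", "avg_temp": 8.1},
    {"dt": "2016-05-01", "city": "Boston", "avg_temp": None},
]
result = run_pipeline(sample)
```

Printing the row count after each step makes it easy to spot a stage that drops more rows than expected.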

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
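When loading a star schema, the dimensions are usually loaded first so that fact rows can be resolved to their surrogate keys. The sketch below shows that order in SQLite with hypothetical table names and toy records; with Spark, the same idea would be a join between the fact DataFrame and the dimension DataFrame before writing.

```python
import sqlite3

# Hypothetical cleaned immigration records: (cicid, port city, arrival date, visa code)
records = [
    (1, "BOSTON", "2016-04-01", 2),
    (2, "NEW YORK", "2016-04-01", 1),
    (3, "BOSTON", "2016-04-02", 2),
]

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_city (
    city_id INTEGER PRIMARY KEY,
    city    TEXT NOT NULL UNIQUE
);
CREATE TABLE fact_immigration (
    cicid        INTEGER PRIMARY KEY,
    city_id      INTEGER NOT NULL REFERENCES dim_city(city_id),
    arrival_date TEXT,
    visa_type    INTEGER
);
""")

# Load the dimension first; INSERT OR IGNORE skips cities already present
conn.executemany("INSERT OR IGNORE INTO dim_city (city) VALUES (?)",
                 [(city,) for _, city, _, _ in records])

# Then load the fact table, resolving each city to its surrogate key
conn.executemany(
    """INSERT INTO fact_immigration (cicid, city_id, arrival_date, visa_type)
       VALUES (?, (SELECT city_id FROM dim_city WHERE city = ?), ?, ?)""",
    records,
)
conn.commit()

per_city = conn.execute("""
    SELECT c.city, COUNT(*) FROM fact_immigration f
    JOIN dim_city c USING (city_id)
    GROUP BY c.city ORDER BY c.city
""").fetchall()
print(per_city)
```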

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
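The checks listed above can be written as small functions that raise an error when a check fails, so a pipeline run stops loudly instead of producing bad tables. This sketch runs against SQLite with a hypothetical `dim_city` table; table and column names are interpolated into the SQL, so they must come from trusted code, never from user input.

```python
import sqlite3


def check_not_empty(conn, table):
    """Completeness: the table must hold at least one row; returns the count."""
    (n,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    if n == 0:
        raise ValueError(f"{table} is empty")
    return n


def check_count_matches(conn, table, source_count):
    """Source/count check: the loaded table has as many rows as the source."""
    n = check_not_empty(conn, table)
    if n != source_count:
        raise ValueError(f"{table} has {n} rows, source had {source_count}")


def check_unique(conn, table, column):
    """Integrity: `column` must not contain duplicated values."""
    (dupes,) = conn.execute(
        f"SELECT COUNT(*) FROM (SELECT {column} FROM {table} "
        f"GROUP BY {column} HAVING COUNT(*) > 1)"
    ).fetchone()
    if dupes:
        raise ValueError(f"{table}.{column} has {dupes} duplicated value(s)")


# Hypothetical table with a deliberately duplicated key
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_city (city_id INTEGER, city TEXT)")
conn.executemany("INSERT INTO dim_city VALUES (?, ?)",
                 [(1, "BOSTON"), (2, "NEW YORK"), (2, "CHICAGO")])

check_count_matches(conn, "dim_city", source_count=3)  # passes
try:
    check_unique(conn, "dim_city", "city_id")
except ValueError as err:
    print(f"Quality check failed: {err}")
```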
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
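Keeping the data dictionary as data makes it easy to render in the notebook and to keep in sync with the model. The entries below are hypothetical placeholders (table names, types, and descriptions are not final); only the temperature column names and the source file name come from this notebook.

```python
# Hypothetical entries: (table, field, type, description, source)
DATA_DICTIONARY = [
    ("dim_temperature", "dt", "DATE", "Date of the monthly reading",
     "GlobalLandTemperaturesByCity.csv"),
    ("dim_temperature", "AverageTemperature", "DOUBLE", "Average land temperature for the month",
     "GlobalLandTemperaturesByCity.csv"),
    ("dim_city", "City", "STRING", "City name", "GlobalLandTemperaturesByCity.csv"),
    ("dim_city", "Country", "STRING", "Country name", "GlobalLandTemperaturesByCity.csv"),
]


def to_markdown(entries):
    """Render dictionary entries as a Markdown table for a notebook cell."""
    header = "| Table | Field | Type | Description | Source |"
    separator = "|---|---|---|---|---|"
    rows = ["| " + " | ".join(entry) + " |" for entry in entries]
    return "\n".join([header, separator, *rows])


print(to_markdown(DATA_DICTIONARY))
```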

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.