###### Data Engineering Capstone Project

# US Student Immigration
> The purpose of this project is to study the foreign students. The goal is to offer Data teams Analysts a selection of data concerning immigration to the United States.

#### Project Summary

The project follows the follow steps:
* [Step 1: Scope the Project and Gather Data](#Step-1:-Scope-the-Project-and-Gather-Data)

* [Step 2: Explore and Assess the Data](#Step-2:-Explore-Assess-the-Data) 

* [Step 3: Define the Data Model](#Step-3:-Define-the-Data-Model)
* [Step 4: Run ETL to Model the Data](#Step-4:-Run-ETL-to-Model-the-Data)
* [Step 5: Complete Project Write Up](#Step-5:-Complete-Project-Write-Up)

In [199]:
import os
import io
import re
import sys
import datetime
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, StringType, DecimalType
import pyspark.sql.functions as func
import datetime as dt
pd.set_option('display.max_colwidth', 200)
pd.set_option("display.precision", 2)

In [2]:
#os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"

# Step 1: Scope the Project and Gather Data

Data warehouse allow us to collect, transform and manage data from varied sources. Then, Data Team Business connect to it and analyse data. 
Apache Spark has been used to gather data
Amazon S3 buckets store the data in parquet files for the Data teams.
The main dataset includes data on immigration to the United State.
The questions about foreign students and their choice to come to US may be useful to propose services.   
How many students arrived in US in April?    
Which Airline bring the most student in April?    
What are the top city to arrive in the USA?   
Where are from?   
what are the student profils (age, country born, country indicators)? 

#### Data Source

[Datactionnary](2_data_dictionnary.ipynb) provides informations about dataset and tables used. [This notebook](1_Exploration_python.ipynb) performs a first exploration with Python and explain the datasets, which variables I kept. 
Edit: decide not used Education-statistics, i found indicators from Indicators developpment

Dataset |File |Data Source|Dataframe Name
-|-|-|-|
I94 Immigration | immigration_data_sample.csv| [US National Tourism and Trade Office](https://travel.trade.gov/research/programs/i94/description.asp)| df_immigration
I94 Description Labels  Description|I94_SAS_Labels_Descriptions.SAS |US National Tourism and Trade Office|
Global Land Temperature|GlobalLandTemperaturesByCity.csv| [Berkeley Earth](http://berkeleyearth.org/)|df_temperature
Global Airports|airports-extended.csv| [OpenFlights.org and user contributions](https://www.kaggle.com/open-flights/airports-train-stations-and-ferry-terminals)|df_global_airports
Airports codes |airport-codes_csv.csv| provide by Udacity|df_airport_code
Iso country | wikipedia-iso-country-codes.csv|[Wikipedia](https://gist.github.com/radcliff/f09c0f88344a7fcef373)|df_iso_country
US Cities Demographic| us-cities-demographics.csv|provide by Udacity|df_demograph
Indicators developpment| WDIData.csv| [World Bank](https://www.kaggle.com/xavier14/wdidata)|df_indicator_dev
Education-statistics| EdStatsData.csv|provide by Kaggle [World Bank](https://www.kaggle.com/kostya23/worldbankedstatsunarchived)|df_Educ_data

##### I94 Immigration Data
* Source: https://travel.trade.gov/research/reports/historical/2016.html
    * data 'data/18-83510-I94-Data-2016', provide one file per month
        * These records are described according to 28 variables and 3M  rows per file
        *  It's provide information about Arrival/Departure to foreign visitors        
    * I94_SAS_Labels_Description.SAS for variable descriptions
    
##### Global Land Temperature Data
* Source: http://berkeleyearth.org/
    * data 'GlobalLandTemperaturesByCity.csv' provide climate information
        * Each line correspond to a record of temperature per day from city around the world.
        * The GlobalLandTemperaturesByCity.csv has 7 variables and 8599213 rows.
        
##### Global Airports Data
* Source: https://www.kaggle.com/open-flights/airports-train-stations-and-ferry-terminals
    * data 'airports-extended.csv'. Some of the data come from public sources and some of it comes from OpenFlights.org user contributions.
        * It's provide informatioms about of airports, train stations, and ferry terminals around the world.
        * There are 4 variables in 'airports-extended.csv'and 10668 rows
        
##### Airports Data Description Data
* Source: https://datahub.io/core/airport-codes#data
    * airport-codes_csv.csv. The airport code refers to the IATA airport code, 3 letters code unique for all airports in the world
        * The airport-codes_csv.csv provides informations about aiports.
        * There are 55075 rows and 12 columns in airport-codes_csv.csv.
        
##### Iso country Data
* Source: https://gist.github.com/radcliff/f09c0f88344a7fcef373
    * data 'wikipedia-iso-country-codes.csv'. This is a database about the different code useful to identify country.
        * This table gives us informations about Country codes used to identify each country
        * There are 4 variables and 247 rows.
        
##### US cities Demographics Data
* Source: https://data.census.gov/cedsci/. 
    * data 'us-cities-demographics.csv'. This dataset contains information about the demographics of all US cities and come from the US Census Bureau.
        * Provides simple informations about US State population
        * Contains 12 variables and 2892 rows
        
##### World Development Indicators Data
* Source: https://www.kaggle.com/xavier14/wdidata
    * data 'WDIData.csv'. The primary World Bank collection of development indicators, compiled from officially-recognized international sources. 
        * It presents the most current and accurate global development data available, and includes national, regional and global estimates.
        * Contains 64 variables, most of which are variables per year(1960 to 2018), with economics context and 422137 rows.
               
##### i94addr Data
* Source: I94_SAS_Labels_Description.SAS
    * US States code defined in I94_SAS_Labels_Description.SAS
        * data 'i94addr.csv' provides State Id and State name  
        
##### i94city_i94res Data
* Source: I94_SAS_Labels_Description.SAS
    * data 'i94cit_i94res.csv' defined Code Country by 3 digits
        * data 'i94cit_i94res.csv' provides Country Id and Country name
        
##### i94mode Data
* Source: I94_SAS_Labels_Description.SAS
    * data 'i94mode.csv' defined arrival US
        * data 'i94mode.csv' provides code Mode and name Code.
        
##### i94port Data
* Source: I94_SAS_Labels_Description.SAS
    * data 'i94port.csv'
        * data 'i94port.csv' provides Port Id, Port city and State Id.
        
##### i94visa Data
* Source: I94_SAS_Labels_Description.SAS
    * data 'i94visa.csv'
        * data 'i94visa.csv' povides code Visa ans Visa

### SETUP SPARK AND ENVIRONMENT

In [3]:
output_parquet = '../../output/'
path = '../../data/'
date_time = datetime.today().strftime('%Y%m%d')

In [4]:
def create_spark_session():
    spark = SparkSession.builder \
                    .appName("Us_student_immigation") \
                    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
                    .enableHiveSupport() \
                    .getOrCreate()
    return spark
spark = create_spark_session()

%autosave 60

Autosaving every 60 seconds


In [5]:
#spark.sparkContext.getConf().getAll()

# enhaut

# [DOWN](#enbas)

### I94 Immigration Data
#### Exploration

* Path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
* There are 3096313 rows and 29 columns in *i94_apr16_sub.sas7bdat*.
* Name of the dataFrame: df_immigration
* As we see in [data exploration file](./0_dataset_information.ipynb), some variables are either not present or not very present (visapost, occup, entdepu, insnum)
* Variables droped: depdate, count, occup, entdepa, entdepd, entdepu, matflag, biryear, insnum, dtadfile, visapost, fltno, admnum, insnum, dtaddto. 	
* Variables used:

Column Name | Description |
-|-|
**cicid**|     ID uniq per record in the dataset 
**i94yr**|     4 digit year  
**i94mon**|    Numeric month 
**i94cit**|     3 digit code of source city for immigration (Born country) 
**i94res**|    3 digit code of source country for immigration
**i94port**|   Port addmitted through 
**arrdate**|   Arrival date in the USA
**i94mode**|   Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) 
**i94addr**|   State of arrival 
**i94bir**|    Age in years 
**i94visa**|   Visa Code - 1 = Business / 2 = Pleasure / 3 = Student
**gender**|    Gender
**airline**|   Airline used to arrive in U.S.
**admnum**|    Admission number, should be unique and not nullable 
**visatype**|  Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### Read I94 data

In [7]:
def load_immigration(path, file):
    df = spark.read \
        .format('com.github.saurfang.sas.spark') \
        .option('header', 'true') \
        .load(path+file)
    nb_rows = df.count()
    print(f'*****         Loading {nb_rows} rows')
    print(f'*****         Display the Schema')
    df.printSchema()
    print(f'*****         Display few rows')
    df.show(3, truncate = False)
    return df, nb_rows

In [8]:
file = '18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

In [9]:
file = '18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# refaire avec S3 et tous les fichiers (get_path_sas_folder parquet file)
df_immigration, rows_immig = load_immigration(path, file)

*****         Loading 3096313 rows
*****         Display the Schema
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-

#### Write to parquet files

In [10]:
pattern = re.search(r'((i94_[a-z]{3}[1-9]{2}))', file).group(0)
df = df_immigration

name_file = pattern + '_staging.parquet_' + date_time
print(f' Path to file Parquet is:   "{output_parquet}{name_file}"')
df.write.mode("overwrite").parquet(f'{output_parquet}{name_file}')
print(f' Done for "{name_file}" !')

path_i94_immigration = output_parquet + name_file

 Path to file Parquet is:   "../../output/i94_apr16_staging.parquet_20200527"
 Done for "i94_apr16_staging.parquet_20200527" !


### Global Land Temperature Data
#### Exploration
* Path = '../../data/GlobalLandTemperaturesByCity.csv
* There are 8599212 rows and 7 columns in *GlobalLandTemperaturesByCity.csv*.
* Name of the dataFrame: df_temperature

* As we see in [data exploration file](./0_dataset_information.ipynb), the first date is in 1743, and we find a row per day per town. So we will make aggregation for this data set and drop 'AverageTemperature' , 'Latitude' and 'Longitude' columns
* Variables used:

Column Name | Description 
-|-|
**dt**|Date format YYYY-MM-DD| 
**AverageTemperature**|Average Temperature for the city to th date dt|
**City**| City name| 
**Country**| Country name |


#### Read GlobalLandTemperaturesByCity

In [11]:
def load_temperature(path, file):
    df = spark.read \
        .format("csv") \
        .option('header', 'true') \
        .option('inferSchema', 'true') \
        .load(path+file)
    nb_rows = df.count()
    print(f'*****         Loading {nb_rows} rows')
    print(f'*****         Display the Schema')
    df.printSchema()
    print(f'*****         Display few rows')
    df.show(3, truncate = False)
    return df, nb_rows

In [12]:
file = 'GlobalLandTemperaturesByCity.csv'
df_temperature, rows_temp = load_temperature(path, file)

*****         Loading 8599212 rows
*****         Display the Schema
root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

*****         Display few rows
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|dt                 |AverageTemperature|AverageTemperatureUncertainty|City |Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|6.068             |1.7369999999999999           |Århus|Denmark|57.05N  |10.33E   |
|1743-12-01 00:00:00|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
|1744-01-01 00:00:00|null              |null                         |Å

#### Write to parquet files

In [13]:
pattern = re.search(r'(.*)\.csv$', file).group(1)
df = df_temperature

name_file = pattern + '_staging.parquet_' + date_time
print(f' Path to file Parquet is:   "{output_parquet}{name_file}"')
df.write.mode("overwrite").parquet(f'{output_parquet}{name_file}')
print(f' Done for "{name_file}" !')

path_temperature = output_parquet + name_file

 Path to file Parquet is:   "../../output/GlobalLandTemperaturesByCity_staging.parquet_20200527"
 Done for "GlobalLandTemperaturesByCity_staging.parquet_20200527" !


### Airports Code Data
#### Exploration
* Path = '../../data/airport-codes_csv.csv'
* There are 55075 rows and 12 column in *airport-codes_csv.csv*
* Name of the DataFrame : df_airport_code
* Some variables left more 50% of data (continent, iata_code and local_code) so I kept:

Column Name | Description 
-|-|
**ident**| Unique identifier Airport code|
**type**| Type of airport | 
**name**| Name of the airport | 
**continent**| Continent | | 
**iso_country**| ISO code of airport country |
**iso_region**| ISO code of the region airport | 
**municipality**| City name where the airport is located | 
**iata_code**| IATA code of the airport|

#### Read Airports Code

In [14]:
def load_airport_code(path, file):
    df = spark.read \
        .format("csv") \
        .option('header', 'true') \
        .option('inferSchema', 'true') \
        .load(path+file)
    nb_rows = df.count()
    print(f'*****         Loading {nb_rows} rows')
    print(f'*****         Display the Schema')
    df.printSchema()
    print(f'*****         Display few rows')
    df.show(3, truncate = False)
    return df, nb_rows

In [15]:
file = 'airport-codes_csv.csv'    
df_airport_code, rows_code = load_airport_code(path, file)

*****         Loading 55075 rows
*****         Display the Schema
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

*****         Display few rows
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+----------------------------------+
|ident|type         |name                |elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates                       |
+-----+-------------+--------------------+------------+---------+--------

#### Write to parquet files

In [16]:
pattern = re.search(r'(.*)\.csv$', file).group(1)
df = df_airport_code

name_file = pattern + '_staging.parquet_' + date_time
print(f' Path to file Parquet is:   "{output_parquet}{name_file}"')
df.write.mode("overwrite").parquet(f'{output_parquet}{name_file}')
print(f' Done for "{name_file}" !!!')

path_aiport_code = output_parquet + name_file

 Path to file Parquet is:   "../../output/airport-codes_csv_staging.parquet_20200527"
 Done for "airport-codes_csv_staging.parquet_20200527" !!!


### Global Airports Data
#### Exploration
* Path = '../../data/airports-extended.csv'
* There are 10668 rows and 13 columns in *airports-extended.csv*
* Name of the dataframe : df_global_airports
* No missing value, and I kept:

Column Name | Description | Example | Type
-|-|-|-|
**airport_name**|Name of airport|Nadzab Airport|Object
**airport_city**|Main city served by airport|Nadzab|Object
**airport_country**|Country or territory where airport is located|Papua New Guinea|Object
**airport_iata**|3-letter IATA code|LAE|Object

#### Read airports-extended

In [17]:
global_airports_schema = T.StructType([
    T.StructField('airport_ID', T.IntegerType(), False),
    T.StructField('name', T.StringType(), False),
    T.StructField('city', T.StringType(), False),
    T.StructField('country', T.StringType(), False),
    T.StructField('iata', T.StringType(), False),
    T.StructField('icao', T.StringType(), False),
    T.StructField('latitude', T.StringType(), False),
    T.StructField('longitude', T.StringType(), False),
    T.StructField('altitude', T.IntegerType(), False),
    T.StructField('timezone', T.StringType(), False),
    T.StructField('dst', T.StringType(), False),
    T.StructField('tz_timezone', T.StringType(), False),
    T.StructField('type', T.StringType(), False),
    T.StructField('data_source', T.StringType(), False)
])

In [18]:
def load_global_airports(path, file):
    df = spark.read \
        .format("csv") \
        .option('header', 'True') \
        .option('inferSchema', 'true') \
        .schema(global_airports_schema) \
        .load(path+file)
    nb_rows = df.count()
    print(f'*****         Loading {nb_rows} rows')
    print(f'*****         Display the Schema')
    df.printSchema()
    print(f'*****         Display few rows')
    df.show(3, truncate = False)
    return df, nb_rows

In [19]:
file = 'airports-extended.csv'
df_global_airports, rows_global = load_global_airports(path, file)

*****         Loading 10667 rows
*****         Display the Schema
root
 |-- airport_ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- altitude: integer (nullable = true)
 |-- timezone: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- tz_timezone: string (nullable = true)
 |-- type: string (nullable = true)
 |-- data_source: string (nullable = true)

*****         Display few rows
+----------+----------------------------+-----------+----------------+----+----+------------------+------------------+--------+--------+---+--------------------+-------+-----------+
|airport_ID|name                        |city       |country         |iata|icao|latitude          |longitude         |altitude|timezone|dst|tz_timezone         |type   |d

#### Write to parquet files

In [20]:
pattern = re.search(r'(.*)\.csv$', file).group(1)
df = df_global_airports

name_file = pattern + '_staging.parquet_' + date_time
print(f' Path to file Parquet is:   "{output_parquet}{name_file}"')
df.write.mode("overwrite").parquet(f'{output_parquet}{name_file}')
print(f' Done for "{name_file}" !!!')

path_global_airports = output_parquet + name_file

 Path to file Parquet is:   "../../output/airports-extended_staging.parquet_20200527"
 Done for "airports-extended_staging.parquet_20200527" !!!


### Iso Country Data
#### Exploration
* Path = '../../data/wikipedia-iso-country-codes.csv
* There are 246 rows and 5 columns in *wikipedia-iso-country-codes.csv*
* Name of the dataframe: df_iso_country
* I remove 'ISO 3166-2' column, only one missing value. I choose to replace manually. 

Column Name | Description 
-|-|
**Country_name**|Country Name in English|
**Alpha2_code**|code 2 letter code for the country|
**Alpha3_code**|code 3 letter code for the country|
**Numeric_code**|ISO 3166-2 code|

#### Read wikipedia-iso-country-codes

In [21]:
iso_country_schema = T.StructType([
    T.StructField('Country', T.StringType(), False),
    T.StructField('Alpha_2', T.StringType(), False),
    T.StructField('Alpha_3', T.StringType(), False),
    T.StructField('Num_code', T.StringType(), False),
    T.StructField('ISO_3166-2', T.StringType(), True),    
]) 

In [22]:
def load_iso_country(path, file):
    df = spark.read \
        .format("csv") \
        .option('header', 'true') \
        .option('inferSchema', 'true') \
        .schema(iso_country_schema) \
        .load(path+file)
    nb_rows = df.count()
    print(f'*****         Loading {nb_rows} rows')
    print(f'*****         Display the Schema')
    df.printSchema()
    print(f'*****         Display few rows')
    df.show(3, truncate = False)
    return df, nb_rows

In [23]:
file = 'wikipedia-iso-country-codes.csv'
df_iso_country, rows_iso = load_iso_country(path, file)

*****         Loading 246 rows
*****         Display the Schema
root
 |-- Country: string (nullable = true)
 |-- Alpha_2: string (nullable = true)
 |-- Alpha_3: string (nullable = true)
 |-- Num_code: string (nullable = true)
 |-- ISO_3166-2: string (nullable = true)

*****         Display few rows
+--------+-------+-------+--------+-------------+
|Country |Alpha_2|Alpha_3|Num_code|ISO_3166-2   |
+--------+-------+-------+--------+-------------+
|Zimbabwe|ZW     |ZWE    |716     |ISO 3166-2:ZW|
|Zambia  |ZM     |ZMB    |894     |ISO 3166-2:ZM|
|Yemen   |YE     |YEM    |887     |ISO 3166-2:YE|
+--------+-------+-------+--------+-------------+
only showing top 3 rows



#### Write to parquet files

In [24]:
pattern = re.search(r'(.*)\.csv$', file).group(1)
df = df_iso_country

name_file = pattern + '_staging.parquet_' + date_time
print(f' Path to file Parquet is:   "{output_parquet}{name_file}"')
df.write.mode("overwrite").parquet(f'{output_parquet}{name_file}')
print(f' Done for "{name_file}" !!!')

path_iso_country = output_parquet + name_file

 Path to file Parquet is:   "../../output/wikipedia-iso-country-codes_staging.parquet_20200527"
 Done for "wikipedia-iso-country-codes_staging.parquet_20200527" !!!


### US cities Demographics
#### Exploration
* Path = '../../data/us-cities-demographics.csv
* There are 2891 rows and 12 columns in us-cities-demographics.csv
* Dataframe name : df_demograph
* Missing less than 1% in some variables so I drop 'Number of Veterans', 'Average Household Size' and kept: 

Column Name | Description | 
-|-|
**City**|Name of the city|
**State**|US state of the city|
**Median Age**|The median of the age of the population|
**Male Population**|Number of the male population|
**Female Population**|Number of the female population|
**Total Population**|Number of the total population|
**Foreign-born**|Number of residents of the city that were not born in the city|
**State Code**|Code of the state of the city|
**Race**|Race class|
**Count**|Number of individual of each race|

#### Read us-cities-demographics

In [25]:
demograph_schema = T.StructType([
    T.StructField('City', T.StringType(), False),
    T.StructField('State', T.StringType(), False),
    T.StructField('Median_Age', T.FloatType(), False),
    T.StructField('Male_Population', T.IntegerType(), False),
    T.StructField('Female_Population', T.IntegerType(), False),
    T.StructField('Total_Population', T.IntegerType(), False),
    T.StructField('Number_of_Veterans', T.IntegerType(), False),
    T.StructField('Foreign-born', T.IntegerType(), False),
    T.StructField('Average_Household_Size', T.FloatType(), False),
    T.StructField('State_Code', T.StringType(), False),
    T.StructField('Race', T.StringType(), False),
    T.StructField('Count', T.IntegerType(), False)
]) 

In [26]:
def load_demograph(path, file):
    df = spark.read \
        .format("csv") \
        .option('header', 'true') \
        .option('delimiter', ';') \
        .option('inferSchema', 'true') \
        .schema(demograph_schema) \
        .load(path+file)
    nb_rows = df.count()
    print(f'*****         Loading {nb_rows} rows')
    print(f'*****         Display the Schema')
    df.printSchema()
    print(f'*****         Display few rows')
    df.show(3, truncate = False)
    return df, nb_rows

In [27]:
file = 'us-cities-demographics.csv'
df_demograph, rows_demo = load_demograph(path, file)

*****         Loading 2891 rows
*****         Display the Schema
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median_Age: float (nullable = true)
 |-- Male_Population: integer (nullable = true)
 |-- Female_Population: integer (nullable = true)
 |-- Total_Population: integer (nullable = true)
 |-- Number_of_Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average_Household_Size: float (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)

*****         Display few rows
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|City         |State        |Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foreign-born|Average_Household_Size|State_Code|Race              |Count|
+-----

#### Write to parquet files

In [28]:
pattern = re.search(r'(.*)\.csv$', file).group(1)
df = df_demograph

name_file = pattern + '_staging.parquet_' + date_time
print(f' Path to file Parquet is:   "{output_parquet}{name_file}"')
df.write.mode("overwrite").parquet(f'{output_parquet}{name_file}')
print(f' Done for "{name_file}" !!!')

path_demograph = output_parquet + name_file

 Path to file Parquet is:   "../../output/us-cities-demographics_staging.parquet_20200527"
 Done for "us-cities-demographics_staging.parquet_20200527" !!!


### World Development Indicators Data
#### Exploration 
* Path = '../../data/WDIData.csv
* There are 422136 rows and 64 columns in *WDIData.csv*
* Dataframe name : df_indicator_dev
* This dataset contains 64 variables with economics context , most of which are variables per year(1960 to 2018). Data is missing a lot, between 40% and 91%. I just need the year 2015 to explain the Economic context in the country and make aggregation per country. I kept:

Column Name | Description | 
-|-|
**Country Name**|Name of the country|
**Country Code**|3 letters code of country|
**Indicator Name**|indicators of economic development|conversion factor, GDP (LCU per inter...|
**Indicator Code**|letters indicator code|
**2016**|one column per year since 1960|

#### Read WDIData

In [29]:
def load_indicator_dev(path, file):
    df = spark.read \
        .format("csv") \
        .option('header', 'true') \
        .option('inferSchema', 'true') \
        .load(path+file) \
        .select("Country Name","Country Code", "Indicator Name", "Indicator Code", "2015" ) \
        .toDF("Country_Name","Country_Code", "Indicator_Name", "Indicator_Code", "2015")
    nb_rows = df.count()
    print(f'*****         Loading {nb_rows} rows')
    print(f'*****         Display the Schema')
    df.printSchema()
    print(f'*****         Display few rows')
    df.show(3, truncate = False)
    return df, nb_rows


In [30]:
file = 'WDIData.csv'
df_indicator_dev, rows_dev = load_indicator_dev(path, file)

*****         Loading 422136 rows
*****         Display the Schema
root
 |-- Country_Name: string (nullable = true)
 |-- Country_Code: string (nullable = true)
 |-- Indicator_Name: string (nullable = true)
 |-- Indicator_Code: string (nullable = true)
 |-- 2015: double (nullable = true)

*****         Display few rows
+------------+------------+-------------------------------------------------------------------------+-----------------+----------------+
|Country_Name|Country_Code|Indicator_Name                                                           |Indicator_Code   |2015            |
+------------+------------+-------------------------------------------------------------------------+-----------------+----------------+
|Arab World  |ARB         |2005 PPP conversion factor, GDP (LCU per international $)                |PA.NUS.PPP.05    |null            |
|Arab World  |ARB         |2005 PPP conversion factor, private consumption (LCU per international $)|PA.NUS.PRVT.PP.05|null         

#### Write to parquet files

In [31]:
pattern = re.search(r'(.*)\.csv$', file).group(1)
df = df_indicator_dev

name_file = pattern + '_staging.parquet_' + date_time
print(f' Path to file Parquet is:   "{output_parquet}{name_file}"')
df.write.mode("overwrite").parquet(f'{output_parquet}{name_file}')
print(f' Done for "{name_file}" !!!')

path_indicator_dev = output_parquet + name_file

 Path to file Parquet is:   "../../output/WDIData_staging.parquet_20200527"
 Done for "WDIData_staging.parquet_20200527" !!!


In [35]:
### Create Parquet Files 
# Parse I94_SAS_Labels_Description.SAS and save in parquet format in '../../data/'
file = 'I94_SAS_Labels_Descriptions.SAS'
!python parse_file.py $path $file
#### Read Parquet files create from 'I94_SAS_Labels_Descriptions.SAS'

Running "../../data/I94_SAS_Labels_Descriptions.SAS"
 
There are 583 rows in i94port.parquet
There are 3 rows in i94visa.parquet
There are 55 rows in i94addr.parquet
There are 289 rows in i94cit_i94res.parquet
There are 4 rows in i94mode.parquet
 
***** Make i94 labels files is done!


### I94 Description Labels  Description

Here some data extract from '../../data/I94_SAS_Labels_Description.SAS'.    
To explain code in I94-immigration, I create 5 files and read here. The file was cleaned and parsed with the scrip *parse_file.py*. see below:

In [36]:
path = '../../data/'
file = 'I94_SAS_Labels_Descriptions.SAS'

In [37]:
print(path+file)

../../data/I94_SAS_Labels_Descriptions.SAS


In [38]:
%pycat parse_file.py

In [39]:
%run -i parse_file.py $path $file

Running "../../data/I94_SAS_Labels_Descriptions.SAS"
 
There are 583 rows in i94port.parquet
There are 3 rows in i94visa.parquet
There are 55 rows in i94addr.parquet
There are 289 rows in i94cit_i94res.parquet
There are 4 rows in i94mode.parquet
 
***** Make i94 labels files is done!


----

# Step 2: Explore Assess the Data

#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [6]:
!ls $output_parquet

airport-codes_csv_staging.parquet_20200526
airport-codes_csv_staging.parquet_20200527
airports-extended_staging.parquet_20200526
airports-extended_staging.parquet_20200527
education-statistics
GlobalLandTemperaturesByCity_staging.parquet_20200526
GlobalLandTemperaturesByCity_staging.parquet_20200527
i94addr.parquet
i94_apr16_staging.parquet_20200526
i94_apr16_staging.parquet_20200527
i94cit_i94res.parquet
i94mode.parquet
i94port.parquet
i94visa.parquet
us-cities-demographics_staging.parquet_20200526
us-cities-demographics_staging.parquet_20200527
WDIData_staging.parquet_20200526
WDIData_staging.parquet_20200527
wikipedia-iso-country-codes_staging.parquet_20200526
wikipedia-iso-country-codes_staging.parquet_20200527


In [12]:
airport_code = spark.read.parquet(output_parquet+'airport-codes_csv_staging.parquet_20200526')
demograph = spark.read.parquet(output_parquet+'us-cities-demographics_staging.parquet_20200526')
immigration = spark.read.parquet(output_parquet+'i94_apr16_staging.parquet_20200526')
temperature = spark.read.parquet(output_parquet+'GlobalLandTemperaturesByCity_staging.parquet_20200526')
global_airports = spark.read.parquet(output_parquet+'airports-extended_staging.parquet_20200526')
iso_country = spark.read.parquet(output_parquet+'wikipedia-iso-country-codes_staging.parquet_20200526')                                    
df_indicator_dev = spark.read.parquet(output_parquet+'WDIData_staging.parquet_20200526')    

In [None]:
# Read Parquet files
path = '../data/'
df_immigration = spark.read.parquet(path_i94_immigration)
df_temperature = spark.read.parquet(path_temperature)
df_airport_code = spark.read.parquet(path_aiport_code)
df_global_airports = spark.read.parquet(path_global_airports)
df_iso_country = spark.read.parquet(path_iso_country)
df_demograph = spark.read.parquet(path_demograph)
df_indicator_dev = spark.read.parquet(path_indicator_dev)

In [9]:
display(f'df_airport_code: There are {df_airport_code.count()} rows from parquet file, {rows_code} before staging')
display(f'df_demograph : There are {df_demograph.count()} rows from parquet file, {rows_demo} before staging')
display(f'df_global_airports : There are {df_global_airports.count()} rows from parquet file, {rows_global} before staging')
display(f'df_immigration : There are {df_immigration.count()} rows from parquet file, {rows_immig} before staging')
display(f'df_indicator_dev : There are {df_indicator_dev.count()} rows from parquet file, {rows_dev} before staging')
display(f'df_iso_country : There are {df_iso_country.count()} rows from parquet file, {rows_iso} before staging')
display(f'df_temperature : There are {df_temperature.count()} rows from parquet file, {rows_temp} before staging')

NameError: name 'rows_educ' is not defined

In [19]:
i94_mode = pd.read_parquet(output_parquet+'i94mode.parquet')
print(f'***** Dataframe i94_mode *****')
print("There are {} rows.".format(len(i94_mode)))
print(' ')

i94_ctry = pd.read_parquet(output_parquet+'i94cit_i94res.parquet')
print(f'***** Dataframe i94_ctry *****')
print("There are {} rows.".format(len(i94_ctry)))
print(' ')

i94_addr = pd.read_parquet(output_parquet+'i94addr.parquet')
print(f'***** Dataframe i94_addr *****')
print("There are {} rows.".format(len(i94_addr)))
print(' ')

i94_visa = pd.read_parquet(output_parquet+'i94visa.parquet')
print(f'***** Dataframe i94_visa *****')
print("There are {} rows.".format(len(i94_visa)))
print(' ')

i94_port = pd.read_parquet(output_parquet+'i94port.parquet')
print(f'***** Dataframe i94_port *****')
print("There are {} rows.".format(len(i94_port)))
print(' ')

***** Dataframe i94_mode *****
There are 4 rows.
 
***** Dataframe i94_ctry *****
There are 289 rows.
 
***** Dataframe i94_addr *****
There are 55 rows.
 
***** Dataframe i94_visa *****
There are 3 rows.
 
***** Dataframe i94_port *****
There are 583 rows.
 


In [13]:
print(output_parquet)
print(path)

../../output/
../data/


In [14]:
%xdel df
%who_ls DataFrame

NameError: name 'df' is not defined


['airport_code',
 'demograph',
 'df_educ_data',
 'df_indicator_dev',
 'global_airports',
 'immigration',
 'iso_country',
 'temperature']

### I94 Immigration Data
* i94addr, missing 152592 values (code State US, 2 letters)
    * fill by Port_id from the dataframe 'i94port' 
    * join on 'df_immigration.i94port == port_state_dic.Port_id', with no missing values
    * nul value replace by State_id
* int_col = ['cicid', 'i94yr', 'i94mon','i94cit', 'i94res', 'i94mode', 'i94bir', 'i94visa']
    * fill null by default value from dictionnary and cast the int_col in Integer
* str_cols = ['i94addr', 'i94port', 'gender', 'airline', 'visatype']
    * fill null by default value from dictionnary
* date_col = ['arrdate'(double sas format),'dtadfile'(string YYYYMMDD)]
    * 'arrdate' in SAS date format, a value represents the number of days between January 1, 1960, and a other date.
    * cast the date and fill the null value

In [None]:
print(path_i94_immigration)

In [15]:
#%xdel immigration 
#immigration = spark.read.parquet(path_i94_immigration)
immigration = spark.read.parquet(output_parquet+'i94_apr16_staging.parquet_20200526')
immigration.createOrReplaceTempView("temp_immig")

In [16]:
#Count of null values of dataframe in pyspark 
immigration.select([count(when(col(c).isNull(), c)).alias(c) for c in immigration.columns]).toPandas()
#Count of Missing values of dataframe in pyspark 
immigration.select([count(when(isnan(c), c)).alias(c) for c in immigration.columns]).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [17]:
# get number of null value in df_immigration.i94port
immigration.filter(immigration.i94port.rlike('[A-Z]{3}')) \
              .filter(immigration.i94addr.isNull()) \
              .select(immigration.i94port, immigration.i94addr).count()

152079

In [20]:
#drop columns with a lot of null values and not useful
drop_col = ['depdate', 'count', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', \
            'insnum','visapost', 'fltno', 'admnum', 'insnum', 'dtaddto']
# create dictionnary from i94_port
port_state_dic = dict([(i,a) for i, a in zip(i94_port.Port_id,i94_port.State_id)])
# lamda function to get State_id
user_func =  udf(lambda x: port_state_dic.get(x))

newdf = immigration.drop(*drop_col) \
                   .withColumn('i94addr', F.when((F.col('i94addr').isNull()), \
                                                 user_func(immigration.i94port)) \
                                           .otherwise(F.col('i94addr')))

In [21]:
newdf.filter(newdf.i94port.rlike('[A-Z]{3}')) \
     .filter(newdf.i94addr.isNull()) \
     .select(newdf.i94port, newdf.i94addr).count()


3700

In [22]:
# replace the null value and cast the columns in integer
# int_col = ['cicid', 'i94yr', 'i94mon','i94cit', 'i94res', 'i94mode', 'i94bir', 'i94visa']
null_int = {'cicid': -1, 'i94yr': -1, 'i94mon': -1,'i94cit': 239, 'i94res': 239, 'i94mode': 9, 'i94bir': -1, 'i94visa': -1}
for k in null_int:
        newdf = newdf.withColumn(k, F.when((F.col(k).isNull()), null_int[k])
                 .otherwise(F.col(k).cast("int")))
        
# replace the null value for the string
# str_cols = ['i94addr', 'i94port', 'gender', 'airline', 'visatype']
null_str = {'i94addr': '99', 'i94port': '999', 'gender': 'U', 'airline': 'unknown', 'visatype': '99' }
for k in null_str:
        newdf = newdf.withColumn(k, F.when((F.col(k).isNull()), null_str[k])
                                 .otherwise(F.col(k)))
        
# date_col = ['arrdate'(double sas format),
#             'dtadfile'(string YYYYMMDD), 
null_date = {'arrdate': 'NA', 'dtadfile': 'NA'}
setup_date = udf(lambda x: (datetime(1960, 1, 1). date() + dt.timedelta(x)).isoformat() if x else None)
newdf = newdf.withColumn("arrdate", setup_date(newdf.arrdate)) \
             .withColumn("dtadfile",to_date(unix_timestamp(col("dtadfile"),"yyyyMMdd").cast("timestamp")))
for k in null_date:
        newdf = newdf.withColumn(k, F.when((F.col(k).isNull()), null_date[k])
                                 .otherwise(F.col(k)))

In [23]:
# display distinct value
newdf.agg(*(countDistinct(col(c)).alias(c) for c in newdf.columns)).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,i94bir,i94visa,dtadfile,gender,airline,visatype
0,3096313,1,1,243,229,299,30,4,466,113,3,118,4,535,17


In [24]:
newdf.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- visatype: string (nullable = true)



In [25]:
df_immigration = newdf.withColumn('dtadfile', to_date(newdf.dtadfile, 'yyyyMMdd').cast('date')) \
                               .withColumn('arrdate', to_date(newdf.arrdate, 'yyyyMMdd').cast('date')) \
                               .dropDuplicates()


In [26]:
df_immigration = (df_immigration.withColumnRenamed("cicid", "id_i94") \
             .withColumnRenamed("i94yr", "year") \
             .withColumnRenamed("i94mon", "month") \
             .withColumnRenamed("i94cit", "country_born_num") \
             .withColumnRenamed("i94res", "country_res_num") \
             .withColumnRenamed("i94port", "iata_code") \
             .withColumnRenamed("arrdate", "arr_date") \
             .withColumnRenamed("i94mode", "arri_mode") \
             .withColumnRenamed("i94addr", "state_id_arrival") \
             .withColumnRenamed("i94bir", "age") \
             .withColumnRenamed("i94visa", "arr_reason") \
             .withColumnRenamed("dtadfile", "dt_add_i94") \
             .withColumnRenamed("dtaddto", "depar_max") \
             .withColumnRenamed("gender", "gender") \
             .withColumnRenamed("airline","airline") \
             .withColumnRenamed("visatype:", "visatype"))

In [27]:
df_immigration.printSchema()
df_immigration.show(5)

root
 |-- id_i94: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- country_born_num: integer (nullable = true)
 |-- country_res_num: integer (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- arr_date: date (nullable = true)
 |-- arri_mode: integer (nullable = true)
 |-- state_id_arrival: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- arr_reason: integer (nullable = true)
 |-- dt_add_i94: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- visatype: string (nullable = true)

+------+----+-----+----------------+---------------+---------+--------+---------+----------------+---+----------+----------+------+-------+--------+
|id_i94|year|month|country_born_num|country_res_num|iata_code|arr_date|arri_mode|state_id_arrival|age|arr_reason|dt_add_i94|gender|airline|visatype|
+------+----+-----+----------------+---------------+---------+--------+---------

### Global Land Temperature Data

* As we see in [data exploration file](./0_dataset_information.ipynb), the first date is in 1743, and we find a row per day per town. 
    * Make aggregation 
* drop "dt", "AverageTemperatureUncertainty" , "Latitude" and "Longitude" columns

In [28]:
print(output_parquet+'GlobalLandTemperaturesByCity_staging.parquet_20200526')

../../output/GlobalLandTemperaturesByCity_staging.parquet_20200526


In [29]:
temperature = spark.read.parquet(output_parquet+'GlobalLandTemperaturesByCity_staging.parquet_20200526')

In [30]:
temperature.show(2)

+-------------------+------------------+-----------------------------+--------+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|    City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+--------+-------+--------+---------+
|1907-07-01 00:00:00|            14.739|                        0.624|Edmonton| Canada|  53.84N|  113.18W|
|1907-08-01 00:00:00|            12.001|                        0.603|Edmonton| Canada|  53.84N|  113.18W|
+-------------------+------------------+-----------------------------+--------+-------+--------+---------+
only showing top 2 rows



In [31]:
# drop column "AverageTemperatureUncertainty"
drop_cols = ["dt", "AverageTemperatureUncertainty", "Latitude", "Longitude"]
newdf = temperature.drop(*drop_cols)


In [32]:
newdf = newdf.groupBy('Country', 'City') \
    .agg(avg("AverageTemperature")) \
    .orderBy('Country') \
    .dropDuplicates()


In [33]:
newdf = (newdf.withColumnRenamed("Country", "Country") \
           .withColumnRenamed("City", "City") \
           .withColumnRenamed("avg(AverageTemperature)", "AverageTemperature"))
newdf = newdf.withColumn("AverageTemperature", newdf.AverageTemperature.cast('float'))

In [34]:
df_temperature = newdf.orderBy('Country')
df_temperature.printSchema()
df_temperature.show(5)

root
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- AverageTemperature: float (nullable = true)

+-----------+------+------------------+
|    Country|  City|AverageTemperature|
+-----------+------+------------------+
|Afghanistan|Gardez|          17.27424|
|Afghanistan| Kabul|         14.342919|
|Afghanistan| Gazni|         10.311996|
|Afghanistan|Qunduz|         10.790278|
|Afghanistan| Herat|         14.213004|
+-----------+------+------------------+
only showing top 5 rows



### Airports Code Data
* MIssing value in Iata_code, Municipality in the whole table
    * I drop '["elevation_ft","continent", "gps_code", "coordinates"]'
    * I keep : ident, airport_type, airport_name, country_iso, city_name, iata_code, state_id
    The missing value in iata_code left with the drop. 
* I extract the State_id from the split of the local_code and rename columns.

In [35]:
airport_code = spark.read.parquet(output_parquet+'airport-codes_csv_staging.parquet_20200526')

In [36]:
airport_code.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [37]:
# drop columns
# filter closed , heliport and seaplace base airport, small_airport
# keep us airport
drop_cols = ["elevation_ft","continent", "gps_code", "coordinates"]
drop_airport = ['closed', 'heliport', 'seaplane_base', 'small_airport']
keep_us = ['US']
newdf = airport_code.drop(*drop_cols) \
                    .filter(~airport_code.type.isin(drop_airport)) \
                    .filter(airport_code.iso_country.isin(keep_us))


In [38]:
display(newdf.select([count(when(isnan(c), c)).alias(c) for c in newdf.columns]).toPandas())
display(newdf.select([count(when(col(c).isNull(), c)).alias(c) for c in newdf.columns]).toPandas())

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,iata_code,local_code
0,0,0,0,0,0,0,0,0


Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,iata_code,local_code
0,0,0,0,0,0,5,60,9


In [39]:
#airport_code.groupBy('iso_country', 'iso_region').agg(count("*")).show()
#l = ['US']
newdf = newdf.withColumn("myisocountry", split(col("iso_region"), "-").getItem(0)) \
            .withColumn("myisoregion", split(col("iso_region"), "-").getItem(1))
newdf = newdf.withColumn("myisocountry",coalesce(newdf.myisocountry,newdf.iso_country))
drop_cols = ['myisocountry', 'iso_region', 'local_code']
newdf = newdf.drop(*drop_cols)


In [40]:
df_airport_code = newdf.filter(~newdf.iata_code.isNull()).dropDuplicates()
df_airport_code = (df_airport_code.withColumnRenamed("ident", "ident") \
                       .withColumnRenamed("type", "airport_type") \
                       .withColumnRenamed("name", "airport_name") \
                       .withColumnRenamed("iso_country", "country_iso2") \
                       .withColumnRenamed("municipality", "city_name" ) \
                       .withColumnRenamed("iata_code", "iata_code") \
                       .withColumnRenamed("myisoregion", "state_id"))
df_airport_code.count()

820

In [41]:
df_airport_code = df_airport_code.filter(~df_airport_code.airport_type.isin('small_airport'))

In [42]:
df_airport_code.show(2)

+-----+--------------+--------------------+------------+----------+---------+--------+
|ident|  airport_type|        airport_name|country_iso2| city_name|iata_code|state_id|
+-----+--------------+--------------------+------------+----------+---------+--------+
| KCWA|medium_airport|Central Wisconsin...|          US|   Mosinee|      CWA|      WI|
| KAFW| large_airport|Fort Worth Allian...|          US|Fort Worth|      AFW|      TX|
+-----+--------------+--------------------+------------+----------+---------+--------+
only showing top 2 rows



In [43]:
display(df_airport_code.select([count(when(isnan(c), c)).alias(c) for c in df_airport_code.columns]).toPandas())
display(df_airport_code.select([count(when(col(c).isNull(), c)).alias(c) for c in df_airport_code.columns]).toPandas())

Unnamed: 0,ident,airport_type,airport_name,country_iso2,city_name,iata_code,state_id
0,0,0,0,0,0,0,0


Unnamed: 0,ident,airport_type,airport_name,country_iso2,city_name,iata_code,state_id
0,0,0,0,0,0,0,0


In [44]:
# missing Value in iata code.
# TODO: make join with 2 others tables 
df_airport_code.printSchema()

root
 |-- ident: string (nullable = true)
 |-- airport_type: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- country_iso2: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- state_id: string (nullable = true)



### Global Airports Data
* There are some missing values.
* Data clean: I drop ["icao", "latitude", "longitude", "altitude", , "timezone", "dst", "tz_timezone", "data_source"] and keep only the airport in 'type'

In [45]:
global_airports = spark.read.parquet(output_parquet+'airports-extended_staging.parquet_20200526')
newdf = global_airports
global_airports.count()

10667

In [46]:
global_airports.show(2)

+----------+--------------------+-----------+----------------+----+----+------------------+------------------+--------+--------+---+--------------------+-------+-----------+
|airport_ID|                name|       city|         country|iata|icao|          latitude|         longitude|altitude|timezone|dst|         tz_timezone|   type|data_source|
+----------+--------------------+-----------+----------------+----+----+------------------+------------------+--------+--------+---+--------------------+-------+-----------+
|         2|      Madang Airport|     Madang|Papua New Guinea| MAG|AYMD|    -5.20707988739|     145.789001465|      20|      10|  U|Pacific/Port_Moresby|airport|OurAirports|
|         3|Mount Hagen Kagam...|Mount Hagen|Papua New Guinea| HGU|AYMH|-5.826789855957031|144.29600524902344|    5388|      10|  U|Pacific/Port_Moresby|airport|OurAirports|
+----------+--------------------+-----------+----------------+----+----+------------------+------------------+--------+--------+--

In [47]:
drop_cols = ["icao","type", "latitude", "longitude", "altitude", "timezone", "dst", "tz_timezone", "data_source"]
newdf = global_airports.filter(global_airports.type.isin('airport', 'unknown')) \
                    .drop(*drop_cols)

In [48]:
display(newdf.select([count(when(isnan(c), c)).alias(c) for c in newdf.columns]).toPandas())
display(newdf.select([count(when(col(c).isNull(), c)).alias(c) for c in newdf.columns]).toPandas())

Unnamed: 0,airport_ID,name,city,country,iata
0,0,0,0,0,0


Unnamed: 0,airport_ID,name,city,country,iata
0,0,0,44,0,1


In [49]:
df_global_airports = newdf.select(col("airport_ID").alias("airport_id").cast("int"), \
                                  col("name").alias("airport_name"), \
                                  col("city").alias("city_name"), \
                                  col("country").alias("country_name"), \
                                  col("iata").alias("iata_code"))


In [50]:
df_global_airports.printSchema()

root
 |-- airport_id: integer (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- iata_code: string (nullable = true)



In [51]:
df_global_airports.show(5)

+----------+--------------------+------------+----------------+---------+
|airport_id|        airport_name|   city_name|    country_name|iata_code|
+----------+--------------------+------------+----------------+---------+
|         2|      Madang Airport|      Madang|Papua New Guinea|      MAG|
|         3|Mount Hagen Kagam...| Mount Hagen|Papua New Guinea|      HGU|
|         4|      Nadzab Airport|      Nadzab|Papua New Guinea|      LAE|
|         5|Port Moresby Jack...|Port Moresby|Papua New Guinea|      POM|
|         6|Wewak Internation...|       Wewak|Papua New Guinea|      WWK|
+----------+--------------------+------------+----------------+---------+
only showing top 5 rows



### Iso Country Data
* No missing values
* I drop 'ISO_3166-2' and rename columns


In [52]:
iso_country = spark.read.parquet(output_parquet+'wikipedia-iso-country-codes_staging.parquet_20200526')

In [53]:
newdf = iso_country
iso_country.count()

246

In [54]:
display(newdf.select([count(when(isnan(c), c)).alias(c) for c in newdf.columns]).toPandas())
display(newdf.select([count(when(col(c).isNull(), c)).alias(c) for c in newdf.columns]).toPandas())

Unnamed: 0,Country,Alpha_2,Alpha_3,Num_code,ISO_3166-2
0,0,0,0,0,0


Unnamed: 0,Country,Alpha_2,Alpha_3,Num_code,ISO_3166-2
0,0,0,0,0,0


In [55]:
iso_country.show(2)

+--------+-------+-------+--------+-------------+
| Country|Alpha_2|Alpha_3|Num_code|   ISO_3166-2|
+--------+-------+-------+--------+-------------+
|Zimbabwe|     ZW|    ZWE|     716|ISO 3166-2:ZW|
|  Zambia|     ZM|    ZMB|     894|ISO 3166-2:ZM|
+--------+-------+-------+--------+-------------+
only showing top 2 rows



In [98]:
df_iso_country =  newdf.drop("ISO_3166-2") \
                    .select(col("Country").alias("country_name"), \
                            col("Alpha_2").alias("country_iso2"), \
                            col("Alpha_3").alias("country_iso3"),
                            col("Num_code").alias("country_num") \
                    .cast("int"))

AnalysisException: "cannot resolve '`Country`' given input columns: [country_name.Country_Name, country_name.indicator_group, country_name.avg(2015)];;\n'Project ['Country AS country_name#2540, 'Alpha_2 AS country_iso2#2541, 'Alpha_3 AS country_iso3#2542, unresolvedalias(cast('Num_code as int), None)]\n+- SubqueryAlias `country_name`\n   +- Sort [Country_Name#2377 ASC NULLS FIRST], true\n      +- Aggregate [Country_Name#2377, indicator_group#2490], [Country_Name#2377, indicator_group#2490, avg(2015#2381) AS avg(2015)#2519]\n         +- Project [Country_Name#2377, Country_Code#2378, Indicator_Name#2379, Indicator_Code#2380, 2015#2381, CASE WHEN lower(Indicator_Name#2379) RLIKE population|birth|death|fertility|mortality|expectancy THEN cast(demography as string) WHEN lower(Indicator_Name#2379) RLIKE food|grain|nutrition|calories THEN cast(food as string) WHEN lower(Indicator_Name#2379) RLIKE trade|import|export|good|shipping|shipment THEN cast(trade as string) WHEN lower(Indicator_Name#2379) RLIKE health|desease|hospital|mortality|doctor THEN cast(health as string) WHEN lower(Indicator_Name#2379) RLIKE income|gdp|gni|deficit|budget|market|stock|bond|infrastructure THEN cast(economy as string) WHEN lower(Indicator_Name#2379) RLIKE fuel|energy|power|emission|electric|electricity THEN cast(energy as string) WHEN lower(Indicator_Name#2379) RLIKE education|literacy THEN cast(education as string) WHEN lower(Indicator_Name#2379) RLIKE employed|employment|umemployed|unemployment THEN cast(employment as string) WHEN lower(Indicator_Name#2379) RLIKE rural|village THEN cast(rural as string) WHEN lower(Indicator_Name#2379) RLIKE urban|city THEN cast(urban as string) END AS indicator_group#2490]\n            +- Relation[Country_Name#2377,Country_Code#2378,Indicator_Name#2379,Indicator_Code#2380,2015#2381] parquet\n"

In [57]:
df_iso_country.show(5)

+-----------------+------------+------------+-----------+
|     country_name|country_iso2|country_iso2|country_num|
+-----------------+------------+------------+-----------+
|         Zimbabwe|          ZW|         ZWE|        716|
|           Zambia|          ZM|         ZMB|        894|
|            Yemen|          YE|         YEM|        887|
|   Western Sahara|          EH|         ESH|        732|
|Wallis and Futuna|          WF|         WLF|        876|
+-----------------+------------+------------+-----------+
only showing top 5 rows



### US cities Demographics
* missing values
* dataclean
* df_demograph

In [58]:
demograph = spark.read.parquet(output_parquet+'us-cities-demographics_staging.parquet_20200526')

In [59]:
demograph.count()

2891

In [60]:
drop_cols = ["Number_of_Veterans"]
newdf = demograph.drop(*drop_cols) \
                 .select(col("City").alias("city_name"), \
                         col("State").alias("state_name"), \
                         col("Median_age").alias("median_age"), \
                         col("Male_population").alias("male_population"), \
                         col("Female_population").alias("female_population"), \
                         col("Total_population").alias("totale_population"), \
                         col("Foreign-born").alias("foreign_born"), \
                         col("State_Code").alias("state_id"), \
                         col("Race").alias("ethnic"), \
                         col("Count").alias("ethic_count").cast("int"))
                         

In [61]:
newdf = newdf.groupBy("state_name", "state_id", "city_name", "median_age", "male_population", "female_population", "ethnic") \
        .agg(sum("ethic_count")) \
        .orderBy("state_name", "city_name", "ethnic") \


In [62]:
newdf.show(5)

+----------+--------+----------+----------+---------------+-----------------+--------------------+----------------+
|state_name|state_id| city_name|median_age|male_population|female_population|              ethnic|sum(ethic_count)|
+----------+--------+----------+----------+---------------+-----------------+--------------------+----------------+
|   Alabama|      AL|Birmingham|      35.6|         102122|           112789|American Indian a...|            1319|
|   Alabama|      AL|Birmingham|      35.6|         102122|           112789|               Asian|            1500|
|   Alabama|      AL|Birmingham|      35.6|         102122|           112789|Black or African-...|          157985|
|   Alabama|      AL|Birmingham|      35.6|         102122|           112789|  Hispanic or Latino|            8940|
|   Alabama|      AL|Birmingham|      35.6|         102122|           112789|               White|           51728|
+----------+--------+----------+----------+---------------+-------------

In [63]:
display(newdf.select([count(when(isnan(c), c)).alias(c) for c in newdf.columns]).toPandas())
display(newdf.select([count(when(col(c).isNull(), c)).alias(c) for c in newdf.columns]).toPandas())

Unnamed: 0,state_name,state_id,city_name,median_age,male_population,female_population,ethnic,sum(ethic_count)
0,0,0,0,0,0,0,0,0


Unnamed: 0,state_name,state_id,city_name,median_age,male_population,female_population,ethnic,sum(ethic_count)
0,0,0,0,0,3,3,0,0


In [64]:
newdf.printSchema()

root
 |-- state_name: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: float (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- ethnic: string (nullable = true)
 |-- sum(ethic_count): long (nullable = true)



In [65]:
demograph.show(2)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foreign-born|Average_Household_Size|State_Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
|       Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|             White|58723|
+-------------+-------------+----------+---------------+-----------------+-----------

### World Development Indicators Data
* df_indicator_dev

In [66]:
indicator_dev = spark.read.parquet(output_parquet+'WDIData_staging.parquet_20200526')

In [67]:
keep_cols = ['Country Name', 'Country Code', 'Indicator Name', 'Indicator Code', '2015']

In [169]:
indicator_dev.show()

+------------+------------+--------------------+--------------------+-------------------+
|Country_Name|Country_Code|      Indicator_Name|      Indicator_Code|               2015|
+------------+------------+--------------------+--------------------+-------------------+
|    Honduras|         HND|GDP per capita (c...|      NY.GDP.PCAP.KD|   2052.97245412901|
|    Honduras|         HND|GDP per capita (c...|      NY.GDP.PCAP.KN|   20860.1066881336|
|    Honduras|         HND|GDP per capita (c...|      NY.GDP.PCAP.CN|   50522.2697103759|
|    Honduras|         HND|GDP per capita (c...|      NY.GDP.PCAP.CD|   2286.20003874608|
|    Honduras|         HND|GDP per capita gr...|   NY.GDP.PCAP.KD.ZG|   2.04736639148943|
|    Honduras|         HND|GDP per capita, P...|   NY.GDP.PCAP.PP.KD|   4247.38174923163|
|    Honduras|         HND|GDP per capita, P...|   NY.GDP.PCAP.PP.CD|   4536.13535649536|
|    Honduras|         HND|GDP per person em...|   SL.GDP.PCAP.EM.KD|      10284.4609375|
|    Hondu

In [69]:
indicator_dev.count()

422136

In [185]:
indicator_dev.printSchema()

root
 |-- Country_Name: string (nullable = true)
 |-- Country_Code: string (nullable = true)
 |-- Indicator_Name: string (nullable = true)
 |-- Indicator_Code: string (nullable = true)
 |-- 2015: double (nullable = true)



In [186]:
# count distinc in column
#df_indicator_dev.select(F.countDistinct('Country_Name')).show()
[i.Country_Name for i in indicator_dev.select('Country_Name').distinct().collect()]

['South Asia',
 'Chad',
 'Paraguay',
 'Lower middle income',
 'Low & middle income',
 'Heavily indebted poor countries (HIPC)',
 'World',
 'Congo, Dem. Rep.',
 'Senegal',
 'Cabo Verde',
 'East Asia & Pacific (IDA & IBRD countries)',
 'Sweden',
 'Kiribati',
 'Least developed countries: UN classification',
 'Guyana',
 'Eritrea',
 'Pacific island small states',
 'Philippines',
 'Djibouti',
 'Tonga',
 'Malaysia',
 'Singapore',
 'Fiji',
 'Turkey',
 'Malawi',
 'Iraq',
 'Sint Maarten (Dutch part)',
 'Northern Mariana Islands',
 'Germany',
 'Comoros',
 'Cambodia',
 'Afghanistan',
 'Jordan',
 'Maldives',
 'Rwanda',
 'Sudan',
 'Palau',
 'France',
 'Turks and Caicos Islands',
 'Greece',
 'Kosovo',
 'Middle income',
 'Late-demographic dividend',
 'Caribbean small states',
 'Sri Lanka',
 'Macao SAR, China',
 'British Virgin Islands',
 'Dominica',
 'Equatorial Guinea',
 'Algeria',
 'Togo',
 'Not classified',
 'Argentina',
 'Iran, Islamic Rep.',
 'Belgium',
 'Angola',
 'San Marino',
 'Ecuador',
 'Qat

In [187]:
demography = ['population','birth','death','fertility','mortality','expectancy']
food = ['food','grain','nutrition','calories']
trade = ['trade','import','export','good','shipping','shipment']
health = ['health','desease','hospital','mortality','doctor']
economy = ['income','gdp','gni','deficit','budget','market','stock','bond','infrastructure']
energy = ['fuel','energy','power','emission','electric','electricity']
education = ['education','literacy']
employment =['employed','employment','umemployed','unemployment']
rural = ['rural','village']
urban = ['urban','city']

In [200]:
#indicator_dev.show(2)
#newdf = indicator_dev.where(F.col('2015').isnotNull())
newdf = indicator_dev.where(F.col("2015").isNotNull())
newdf = newdf.withColumnRenamed('2015', 'indic_2015')
newdf = newdf.withColumn('indic_2015', newdf.indic_2015.cast(DecimalType(18, 2)))
newdf.show()
#df.withColumn('total_sale_volume', df.total_sale_volume.cast(DecimalType(18, 2)))

+------------+------------+--------------------+-----------------+---------------+
|Country_Name|Country_Code|      Indicator_Name|   Indicator_Code|     indic_2015|
+------------+------------+--------------------+-----------------+---------------+
|    Honduras|         HND|GDP per capita (c...|   NY.GDP.PCAP.KD|        2052.97|
|    Honduras|         HND|GDP per capita (c...|   NY.GDP.PCAP.KN|       20860.11|
|    Honduras|         HND|GDP per capita (c...|   NY.GDP.PCAP.CN|       50522.27|
|    Honduras|         HND|GDP per capita (c...|   NY.GDP.PCAP.CD|        2286.20|
|    Honduras|         HND|GDP per capita gr...|NY.GDP.PCAP.KD.ZG|           2.05|
|    Honduras|         HND|GDP per capita, P...|NY.GDP.PCAP.PP.KD|        4247.38|
|    Honduras|         HND|GDP per capita, P...|NY.GDP.PCAP.PP.CD|        4536.14|
|    Honduras|         HND|GDP per person em...|SL.GDP.PCAP.EM.KD|       10284.46|
|    Honduras|         HND|GDP, PPP (constan...|NY.GDP.MKTP.PP.KD| 38706033100.68|
|   

In [201]:
# Make a new column with a key word link to the indicator_name
newdf = newdf.withColumn(
    "indicator_group", 
    F.when( F.lower(col('Indicator_Name')).rlike('|'.join(demography)), F.lit('demography').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(food)), F.lit('food').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(trade)), F.lit('trade').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(health)), F.lit('health').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(economy)), F.lit('economy').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(energy)), F.lit('energy').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(education)), F.lit('education').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(employment)), F.lit('employment').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(rural)), F.lit('rural').cast('string')) \
    .when( F.lower(col('Indicator_Name')).rlike('|'.join(urban)), F.lit('urban').cast('string')))         


In [202]:
#newdf.filter(newdf.indicator_group.isNull()).select('Indicator_Name').distinct().show(truncate=False)

In [204]:
newdf = newdf.groupBy('Country_Name', 'Country_code', 'indicator_group') \
             .agg(avg('indic_2015')).alias('avg_2015') \
             .orderBy('Country_Name', 'indicator_group') \
             .where(col('indicator_group').isNotNull())

In [205]:
df_indicator_dev = newdf \
    .select(col('Country_name').alias('countr_name'), \
            col('Country_code').alias('country_code'), \
            'indicator_group', \
            F.round(col('avg(indic_2015)'), 2).alias('avg_2015'))
df_indicator_dev.printSchema()
df_indicator_dev.show()



root
 |-- countr_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- indicator_group: string (nullable = true)
 |-- avg_2015: decimal(22,2) (nullable = true)

+-----------+------------+---------------+---------------+
|countr_name|country_code|indicator_group|       avg_2015|
+-----------+------------+---------------+---------------+
|Afghanistan|         AFG|     demography|     1163499.16|
|Afghanistan|         AFG|        economy| 34890745342.78|
|Afghanistan|         AFG|      education|    18307359.06|
|Afghanistan|         AFG|     employment|          33.46|
|Afghanistan|         AFG|         energy|   135179888.48|
|Afghanistan|         AFG|           food|          51.22|
|Afghanistan|         AFG|         health|          42.98|
|Afghanistan|         AFG|          trade|  5911911338.74|
|Afghanistan|         AFG|          urban|          51.11|
|    Albania|         ALB|     demography|       87476.38|
|    Albania|         ALB|        economy| 571

### [UP](#enhaut)

In [None]:

def get_key(my): 
    for key, value in port_state_dic.items():
        if my == key: 
            return(key, value)
        if my == value:
                return(key, value)
print(get_key('XT'))


### enbas

In [None]:
# i94_mode
i94_mode = pd.read_parquet(path+'i94mode.parquet')
#i94_mode.head(5)
# dictionnary
mode_dic = i94_mode.set_index("Mode_id")["Mode"].to_dict()

#display(i94_mode.head())
#i94_mode = pd.read_csv(path+'i94mode.csv')
#display(mode_dic)
#{'1': 'air', '2': 'sea', '3': 'land', '9': 'not reported'}

# i94cit_i94res
path = '../../data/'
i94_ctry = pd.read_parquet(path+'i94cit_i94res.parquet')
#i94_ctry.head(5)
# dictionnary
ctry_dic = i94_ctry.set_index("Country_id")['Country'].to_dict()

#display(ctry_dic)
#{582: 'mexico',
# 236: 'afghanistan',
# 101: 'albania',...}
#display(i94_ctry.head())

# i94_addr
path = '../../data/'
i94_addr = pd.read_parquet(path+'i94addr.parquet')
i94_addr.head(5)
# dictionnary
addr_dic = i94_addr.set_index("State_id")['State'].to_dict()
#display(addr_dic)
#{'al': 'alabama',
# 'ak': 'alaska',
# 'az': 'arizona',...}
#display(i94_addr.head())

# i94_visa
path = '../../data/'
i94_visa = pd.read_parquet(path+'i94visa.parquet')
#i94_visa.head(5)
# dictionnary
visa_dic = i94_visa.set_index('Code_visa')['Visa'].to_dict()

#display(visa_dic)
# {1: 'business', 2: 'pleasure', 3: 'student'}
#display(i94_visa.head())

# i94_port
path = '../../data/'
i94_port = pd.read_parquet(path+'i94port.parquet')
#i94_port.head(5)
# dictionnary
port_dic= dict([(i,[a,b]) for i, a,b in zip(i94_port.Port_id, i94_port.Port_city,i94_port.State_id)])
port_state_dic = dict([(i,a) for i, a in zip(i94_port.Port_id,i94_port.State_id)])


# each row becomes a dictionary where key is column name and value is the data in the cell
# [{'Port_id': 'alc', 'Port_city': 'alcan', 'State_id': 'ak'},
# {'Port_id': 'anc', 'Port_city': 'anchorage', 'State_id': 'ak'},...]
#port_dic = i94_port.to_dict('records')
#display(port_dic)
#display(i94_port.head())

#display(port_state_dic)
#display(port_dic)
#{'alc': ['alcan', 'ak'],
# 'anc': ['anchorage', 'ak'],
# 'bar': ['baker aaf - baker island', 'ak'],

In [None]:
def to_null(c):
    return when(~(col(c).isNull() | isnan(col(c)) | (trim(col(c)) == "")), col(c))


df.select([to_null(c).alias(c) for c in df.columns]).na.drop().show()

In [None]:
# if not duplicates, the twice values are the same
display(f"There are {df_immigration.distinct().count()} distinc values")
display(f"There are {df_immigration.count()} values") 

In [None]:
df.printSchema()

In [None]:
#Count of null values of dataframe in pyspark 
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()

In [None]:
#drop_columns = ['cicid', 'i94yr', 'i94mon', 'i94visa', 'gender', 'airline', 'visatype']

In [None]:
newdf.where(col("i94addr").isNull()).filter(newdf.i94port =='MIA').show()


In [None]:
#Count of null values of dataframe in pyspark 
newdf.select([count(when(col(c).isNull(), c)).alias(c) for c in newdf.columns]).toPandas()

In [None]:
valuesA = [('',1, 'lun'),('Monkey',2, 'mar'),('',3, 'mer'),('Darth Vader',4, 'jeu')]
TableA = spark.createDataFrame(valuesA,['name','myid', 'day'])
# Put null in empty cell
TableA = TableA.withColumn('name', when(col('name') == '', None).otherwise(col('name')))


valuesB = [('Rutabaga',1),('Pirate',2),('Ninja',3),('Darth Vader',4)]
TableB = spark.createDataFrame(valuesB,['name','myid'])
 
TableA.show()
TableB.show()
TableA.createOrReplaceTempView("TableA")
TableB.createOrReplaceTempView("TableB")

TableA.alias('A').join(TableB.alias('B'), on='myid', how='left') \
                 .select(
                        'myid',
                        'day',
                        F.when(
                            F.isnull(F.col('A.name')),
                            F.col('B.name')
                        ).otherwise(F.col('A.name')).alias('name')
                    ) \
                 .show()

In [None]:
Basically below line of code check all 5 SAL fields and if it is null, replace it with 0. 
If not keep the original value.

df1 = df.withColumn("SAL1", when(df.SAL1.isNull(), lit(0)).otherwise(df.SAL1))\
.withColumn("SAL2", when(df.SAL2.isNull(), lit(0)).otherwise(df.SAL2))\
.withColumn("SAL3", when(df.SAL3.isNull(), lit(0)).otherwise(df.SAL3))\
.withColumn("SAL4", when(df.SAL4.isNull(), lit(0)).otherwise(df.SAL4))\
.withColumn("SAL5", when(df.SAL5.isNull(), lit(0)).otherwise(df.SAL5))\





In [None]:
ta.join(tb, ta.myid == tb.myid, 'left').select(tb.myid, coalesce(ta.name, tb.name)).show(truncate=False)