In [1]:
# Imports and installs
import pandas as pd
import importlib
import os
import configparser
import datetime as dt

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear



ModuleNotFoundError: No module named 'ocs'

In [None]:
# Load the AWS credentials from the local config file and publish them as
# environment variables for the rest of the session.
config = configparser.ConfigParser()

# ConfigParser.read() silently ignores files it cannot open, which would only
# surface later as a confusing KeyError on 'AWS_CREDS'. Reading from an
# explicitly opened handle fails immediately if iam.cfg is missing.
with open('iam.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS_CREDS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS_CREDS']['AWS_SECRET_ACCESS_KEY']

# Create (or reuse) the Spark session with Hive support, requesting the
# spark-sas7bdat package through spark.jars.packages.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

#### World Happiness dataset
<hr style="background-color: #b7d0e2;"/> 

Import and inspect the World Happiness dataset for the year 2016.

In [3]:
# Load the happiness CSV (header row, inferred column types) and report its row count
file_name = "happiness_2016.csv"
happy_df = (
    spark.read
    .options(header=True, inferSchema=True, sep=',')
    .csv(file_name)
)
happy_df.count()

157

In [4]:
# Preview the first five rows, converted to a pandas DataFrame so the cell displays them as its output
happy_df.limit(5).toPandas()

Unnamed: 0,Country,Region,Happy_Rank,Happy_Score,lconf_level,Hconf_level,Economy,Family,Health,Freedom,Trust,Generosity,Dystopia_res
0,Denmark,Western Europe,1,7.526,7.46,7.592,1.44178,1.16374,0.79504,0.57941,0.44453,0.36171,2.73939
1,Switzerland,Western Europe,2,7.509,7.428,7.59,1.52733,1.14524,0.86303,0.58557,0.41203,0.28083,2.69463
2,Iceland,Western Europe,3,7.501,7.333,7.669,1.42666,1.18326,0.86733,0.56624,0.14975,0.47678,2.83137
3,Norway,Western Europe,4,7.498,7.421,7.575,1.57744,1.1269,0.79579,0.59609,0.35776,0.37895,2.66465
4,Finland,Western Europe,5,7.413,7.351,7.475,1.40598,1.13464,0.81091,0.57104,0.41004,0.25492,2.82596


### Step 3: Define the Data Model

### 3.1 Conceptual Data Model
![Database schema](Capstone_project_happy_data_model.jpg)

This project has 5 dimension tables and 1 fact table.
The airport dimension table comes from airport_codes_csv and links to the fact table through the i94port field.
The country dimension table is built from the immigration datasets and links to the fact table through the country_code field.
The happiness dimension table comes from the happiness dataset and links to the immigration fact table through the country_code field.
The visa type dimension table comes from the immigration datasets and links to the immigration fact table through the visa_type_key field.
The immigration fact table is the heart of the data model. Its data comes from the immigration datasets, and it contains the keys that link to the dimension tables.

### 3.2 Mapping Out Data Pipelines
The pipeline steps are as follows:
* Load the datasets and explore the data
* Clean the data: handle missing values and columns
* Create the visa_type, country, immigration calendar, happiness and airport dimension tables
* Create the immigration fact table
* Check the record counts of the resulting tables

In [23]:
def create_country_dim(df, output_data):
    """Build the country dimension from immigration events and write it as parquet.

    The distinct values of ``i94res`` become ``country_code``. Each code is
    translated to a title-cased ``country_name`` using the ``code`` and ``Name``
    columns of ``i94_state.csv``. Codes that are null or absent from the mapping
    get a null ``country_name`` instead of failing the Spark job.

    :param df: spark dataframe of immigration events
    :param output_data: path prefix; the table is written to ``output_data + "country"``
    :return: spark dataframe country dimension
    """
    # Load the code -> name mapping once and reduce it to a plain dict, so the
    # UDF does a constant-time lookup and only the dict is shipped with the
    # closure rather than the whole pandas frame. Keeping the first row per
    # code matches the previous "first match wins" behaviour.
    map_codes = pd.read_csv('i94_state.csv').drop_duplicates(subset='code', keep='first')
    code_to_name = dict(zip(map_codes['code'].tolist(), map_codes['Name'].tolist()))

    @udf()
    def get_country_name(code):
        name = code_to_name.get(code)
        # Unknown/null codes and empty or NaN names map to null
        if isinstance(name, str) and name:
            return name.title()
        return None

    # One row per distinct country of residence code
    country_df = (
        df.select(['i94res'])
        .distinct()
        .withColumnRenamed('i94res', 'country_code')
    )

    # Attach the human-readable country name
    country_df = country_df.withColumn('country_name', get_country_name(country_df.country_code))

    # Write the dimension to a parquet file, replacing any previous output
    country_df.write.parquet(output_data + "country", mode="overwrite")

    return country_df

In [19]:
def create_happiness_dim(df, output_data):
    """Write the happiness dimension table to parquet and return it.

    The dataframe is persisted exactly as received; no columns are added,
    renamed or dropped.

    :param df: spark dataframe of happiness data
    :param output_data: path prefix; the table is written to ``output_data + "happiness"``
    :return: spark dataframe happiness dimension (the same ``df`` that was passed in)
    """
    # NOTE(review): no country code column is derived here, so this table has
    # no key for joining to the other tables; the country-code lookup that was
    # previously sketched in this function was never applied to the dataframe.

    # Write dimension to parquet file, replacing any previous output
    df.write.parquet(output_data + "happiness", mode="overwrite")

    return df

In [20]:
# Persist the happiness dimension under the output folder and keep the returned frame
output_data = "data_tables/"
happiness_dim_df = create_happiness_dim(df=happy_df, output_data=output_data)

# Print the first five rows of the dimension
happiness_dim_df.show(n=5)

+-----------+--------------+----------+-----------+-----------+-----------+-------+-------+-------+-------+-------+----------+------------+
|    Country|        Region|Happy_Rank|Happy_Score|lconf_level|Hconf_level|Economy| Family| Health|Freedom|  Trust|Generosity|Dystopia_res|
+-----------+--------------+----------+-----------+-----------+-----------+-------+-------+-------+-------+-------+----------+------------+
|    Denmark|Western Europe|         1|      7.526|       7.46|      7.592|1.44178|1.16374|0.79504|0.57941|0.44453|   0.36171|     2.73939|
|Switzerland|Western Europe|         2|      7.509|      7.428|       7.59|1.52733|1.14524|0.86303|0.58557|0.41203|   0.28083|     2.69463|
|    Iceland|Western Europe|         3|      7.501|      7.333|      7.669|1.42666|1.18326|0.86733|0.56624|0.14975|   0.47678|     2.83137|
|     Norway|Western Europe|         4|      7.498|      7.421|      7.575|1.57744| 1.1269|0.79579|0.59609|0.35776|   0.37895|     2.66465|
|    Finland|Western