In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit
import pyspark.sql.functions as F
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark.sql.functions import col
from pyspark.sql.functions import *

import pandas as pd
pd.set_option('max_columns', None)

In [2]:
def create_spark_session():
    """
    spark configuration.
    """
    print("Start the application")
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.8.5") \
        .getOrCreate()
    return spark

In [3]:
# local paths in S3
root_path = "/home/alison/curso/ED-Udacity/capstone-athena/projeto-udacity/data/"

In [4]:
def load_temp_state(spark):
    
    global_land_temperatures_by_state_path = root_path + "temperatures/GlobalLandTemperaturesByState.csv"

    global_temperatures_by_state_schema = StructType([\
                                                  StructField("t_state_dt", DateType(), False),
                                                  StructField('t_state_average_temperature', DoubleType(), False),
                                                  StructField('t_state_average_temperature_uncertainty', DoubleType(), False),
                                                  StructField('t_state_state', StringType(), False),
                                                  StructField('t_state_country', StringType(), False)
                                          ]) 
    
    return spark\
    .read\
    .format('com.databricks.spark.csv')\
    .option("sep",",")\
    .option("header", "true")\
    .option("encoding", "UTF-8")\
    .schema(global_temperatures_by_state_schema)\
    .load(global_land_temperatures_by_state_path)

In [5]:
def load_temp_glob(spark):
    
    global_temperatures_path = root_path + "temperatures/GlobalTemperatures.csv"

    global_temperatures_schema = StructType([ \
                                  StructField("t_glob_dt", DateType(), False),           
                                  StructField("t_glob_land_average_temperature", DoubleType(), False),
                                  StructField("t_glob_land_average_temperature_uncertainty", DoubleType(), False),
                                  StructField("t_glob_land_max_temperature", DoubleType(), False), 
                                  StructField("t_glob_land_max_temperature_uncertainty", DoubleType(), False),
                                  StructField("t_glob_land_min_temperature", DoubleType(), False), 
                                  StructField("t_glob_land_min_temperature_uncertainty", DoubleType(), False),
                                  StructField("t_glob_land_and_ocean_average_temperature", DoubleType(), False),
                                  StructField("t_glob_land_and_ocean_average_temperature_uncertainty", DoubleType(), False)           
                            ])
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep',',')\
            .option("header", "true")\
            .option("encoding", "UTF-8")\
            .schema(global_temperatures_schema)\
            .load(global_temperatures_path)

In [6]:
def load_temp_major_city(spark):

    global_land_temperatures_by_major_city_path = root_path + "temperatures/GlobalLandTemperaturesByMajorCity.csv"

    global_temp_major_city_schema = StructType([\
                                                StructField("t_m_city_dt", DateType(), False),
                                                StructField("t_m_city_average_temperature", DoubleType(), False),
                                                StructField("t_m_city_average_temperature_uncertainty", DoubleType(), False),
                                                StructField("t_m_city_city", StringType(), False),
                                                StructField("t_m_city_country", StringType(), False),
                                                StructField("t_m_city_latitude", StringType(), False),
                                                StructField("t_m_city_longitude", StringType(), False)
                                ])
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep',',')\
            .option('header', 'true')\
            .option("encoding", "UTF-8")\
            .schema(global_temp_major_city_schema)\
            .load(global_land_temperatures_by_major_city_path)

In [7]:
def load_temp_by_country(spark):
    
    global_land_temperatures_by_country_path = root_path + "temperatures/GlobalLandTemperaturesByCountry.csv"

    global_land_temp_by_country_schema = StructType([\
                                                      StructField("t_country_dt", DateType(), False),
                                                      StructField("t_country_average_temperature", DoubleType(), False),
                                                      StructField("t_country_average_temperature_uncertainty", DoubleType(), False),
                                                      StructField("t_country_country", StringType(), False)
                                      ])
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep',',')\
            .option('header', 'true')\
            .option("encoding", "UTF-8")\
            .schema(global_land_temp_by_country_schema)\
            .load(global_land_temperatures_by_country_path)

In [8]:
def load_temp_by_city(spark):
    
    global_land_temperatures_by_city_path = root_path + "temperatures/GlobalLandTemperaturesByCity.csv"

    global_land_temp_by_city_schema = StructType([\
                                                  StructField("t_city_dt", DateType(), False),
                                                  StructField("t_city_average_temperature", DoubleType(), False),
                                                  StructField("t_city_average_temperature_uncertainty", DoubleType(), False),
                                                  StructField("t_city_city", StringType(), False),
                                                  StructField("t_city_country", StringType(), False),
                                                  StructField("t_city_latitude", StringType(), False),
                                                  StructField("t_city_longitude", StringType(), False)
                                      ])
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep',',')\
            .option('header', 'true')\
            .option("encoding", "utf-8")\
            .schema(global_land_temp_by_city_schema)\
            .load(global_land_temperatures_by_city_path)

In [9]:
def load_cities_demographics(spark):
    
    us_cities_demographics_path = root_path + "us_cities_demographics.csv"

    us_cities_demog_schema = StructType([\
                                        StructField("cit_demog_city", StringType(), False),
                                        StructField("cit_demog_state", StringType(), False),
                                        StructField("cit_demog_median_age", DoubleType(), False),
                                        StructField("cit_demog_male_population", IntegerType(), False),
                                        StructField("cit_demog_female_population", IntegerType(), False),
                                        StructField("cit_demog_total_polulation", IntegerType(), False),
                                        StructField("cit_demog_number_veterans", IntegerType(), False),
                                        StructField("cit_demog_foreign_born", IntegerType(), False),
                                        StructField("cit_demog_average_household_size", DoubleType(), False),
                                        StructField("cit_demog_state_code", StringType(), False),
                                        StructField("cit_demog_race", StringType(), False),
                                        StructField("cit_demog_quant", IntegerType(), False)
                                        ])
    
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep',';')\
            .option('header', 'true')\
            .option("encoding", "utf-8")\
            .schema(us_cities_demog_schema)\
            .load(us_cities_demographics_path)

In [10]:
def load_airport_codes(spark):
    
    airport_codes_csv_path = root_path + "airport_codes_csv.csv"
    
    airport_codes_schema = StructType([\
                                       StructField("airp_ident", StringType(), False),
                                       StructField("airp_type", StringType(), False),
                                       StructField("airp_name", StringType(), False),
                                       StructField("airp_elevation_ft", IntegerType(), False),
                                       StructField("airp_continent", StringType(), False),
                                       StructField("airp_iso_country", StringType(), False),
                                       StructField("airp_iso_region", StringType(), False),
                                       StructField("airp_municipality", StringType(), False),
                                       StructField("airp_gps_code", StringType(), False),
                                       StructField("airp_iata_code", StringType(), False),
                                       StructField("airp_local_code", StringType(), False),
                                       StructField("airp_coordinates", StringType(), False)
                                      ])
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep',',')\
            .option('header', 'true')\
            .option("encoding", "utf-8")\
            .schema(airport_codes_schema)\
            .load(airport_codes_csv_path)

In [11]:
def load_country(spark):
    
    country_path = root_path + "country.csv"

    country_schema = StructType([\
                                  StructField("country_code", IntegerType(), False),
                                  StructField("country_name", StringType(), False)
                                 ])
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep',';')\
            .option('header', 'true')\
            .option("encoding", "utf-8")\
            .schema(country_schema)\
            .load(country_path)

In [12]:
def load_transport_vehicle(spark):
    
    transport_vehicle_path = root_path + "transport_vehicle.csv"

    transport_vehicle_schema = StructType([\
                                           StructField("vehi_code", IntegerType(), False),
                                           StructField("vehi_name", StringType(), False)
                                          ])
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep',';')\
            .option('header', 'true')\
            .option("encoding", "utf-8")\
            .schema(transport_vehicle_schema)\
            .load(transport_vehicle_path)

In [13]:
def load_state_usa(spark):
    
    state_usa_path = root_path + 'state_usa.csv'

    state_usa_schema = StructType([\
                                   StructField("state_usa_code", StringType(), False),
                                   StructField("state_usa_name", StringType(), False)
                                  ])
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep', ';')\
            .option('header', 'true')\
            .option('encoding', 'utf-8')\
            .schema(state_usa_schema)\
            .load(state_usa_path)

In [14]:
def load_motivation(spark):
    
    motivation_path = root_path + 'motivation.csv'

    motivation_schema = StructType([\
                                    StructField("motiv_code", IntegerType(), False),
                                    StructField("motiv_name", StringType(), False)
                                   ])
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep', ';')\
            .option('header', 'true')\
            .option('encoding', 'utf-8')\
            .schema(motivation_schema)\
            .load(motivation_path)

In [15]:
def load_immigration(spark):
    
    immigration_path = root_path + 'immigration_data_sample.csv'

    immigration_schema = StructType([\
                                   StructField("immig_passender_id", IntegerType(), False),
                                   StructField("immig_cicid", DoubleType(), False),
                                   StructField("immig_i94yr", DoubleType(), False),
                                   StructField("immig_i94mon", DoubleType(), False),
                                   StructField("immig_i94cit", DoubleType(), False),
                                   StructField("immig_i94res", DoubleType(), False),
                                   StructField("immig_i94port", StringType(), False),
                                   StructField("immig_arrdate", DoubleType(), False),
                                   StructField("immig_i94mode", DoubleType(), False),
                                   StructField("immig_i94addr", StringType(), False),
                                   StructField("immig_depdate", DoubleType(), False),
                                   StructField("immig_i94bir", DoubleType(), False),
                                   StructField("immig_i94visa", DoubleType(), False),
                                   StructField("immig_count", DoubleType(), False),
                                   StructField("immig_dtadfile", StringType(), False),
                                   StructField("immig_visapost", StringType(), False),
                                   StructField("immig_occup", StringType(), False),
                                   StructField("immig_entdepa", StringType(), False),
                                   StructField("immig_entdepd", StringType(), False),
                                   StructField("immig_entdepu", StringType(), False),
                                   StructField("immig_matflag", StringType(), False),
                                   StructField("immig_biryear", DoubleType(), False),
                                   StructField("immig_dtaddto", StringType(), False),
                                   StructField("immig_gender", StringType(), False),
                                   StructField("immig_insnum", StringType(), False),
                                   StructField("immig_airline", StringType(), False),
                                   StructField("immig_admnum", DoubleType(), False),
                                   StructField("immig_fltno", StringType(), False),
                                   StructField("immig_visatype", StringType(), False)
                                  ])
    
    return spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep', ',')\
            .option('header', 'true')\
            .option('encoding', 'utf-8')\
            .schema(immigration_schema)\
            .load(immigration_path)

In [16]:
def load_port(spark):
    
    port_path = root_path + 'port.csv'
    
    port_schema = StructType([\
                               StructField("port_code", StringType(), False),
                               StructField("port_name", StringType(), False)
                             ])
    
    df_port = spark\
            .read\
            .format('com.databricks.spark.csv')\
            .option('sep', ';')\
            .option('header', 'false')\
            .option('encoding', 'utf-8')\
            .schema(port_schema)\
            .load(port_path)

    return df_port\
            .withColumn('column_drop', F.split(df_port['port_name'], ','))\
            .withColumn('port_portal', trim(F.col('column_drop')[0]))\
            .withColumn('port_country_acronym', trim(F.col('column_drop')[1]))\
            .drop('column_drop')

In [17]:
def big_table(spark):
    
    df_temp_glob = load_temp_glob(spark).distinct()
    
    df_temp_state = load_temp_state(spark).select("t_state_state", "t_state_country").distinct()
    
    df_temp_major_city = load_temp_major_city(spark).distinct()
    
    df_temp_country = load_temp_by_country(spark).distinct()
    
    df_temp_city = load_temp_by_city(spark).select("t_city_country", "t_city_city").distinct()
    
    df_cities_demog = load_cities_demographics(spark).select("cit_demog_city").distinct()
    
    df_airport_codes = load_airport_codes(spark).distinct()
    
    df_country = load_country(spark).select("country_name", "country_code").distinct()
    
    df_transport_vehicle = load_transport_vehicle(spark).distinct()
    
    df_state_usa = load_state_usa(spark).distinct()
    
    df_motivation = load_motivation(spark).distinct()

    df_immigration = load_immigration(spark).distinct()
    
    df_port = load_port(spark).distinct()
    
    df_join_state_temp_glob = df_state_usa\
    .join(df_temp_state, upper(df_state_usa.state_usa_name) == upper(df_temp_state.t_state_state))
    
    df_join = df_join_state_temp_glob\
    .join(df_temp_city, upper(df_join_state_temp_glob.t_state_country) == upper(df_temp_city.t_city_country))\
    .join(df_cities_demog, upper(df_temp_city.t_city_city) == upper(df_cities_demog.cit_demog_city))\
    .join(df_airport_codes, upper(df_airport_codes.airp_municipality) == upper(df_cities_demog.cit_demog_city))\
    .join(df_country, upper(df_country.country_name) == upper(df_temp_state.t_state_country))\
    .join(df_immigration, upper(df_immigration.immig_i94addr) == upper(df_join_state_temp_glob.state_usa_code))\
    .join(df_transport_vehicle, df_immigration.immig_i94mode == df_transport_vehicle.vehi_code)\
    .join(df_motivation, df_motivation.motiv_code == df_immigration.immig_i94visa)\
    .drop('cit_demog_city', 'state_usa_name', 't_state_country', 'state_usa_code', 'airp_gps_code', 'airp_local_code',\
         't_city_country', 'country_code', 'vehi_code', 'motiv_code', 'country_code', 'airp_continent', \
         'airp_iata_code')
    
    return df_join

In [18]:
def main():
    
    save_path = "/home/alison/curso/ED-Udacity/capstone-pyspark/results/graph"
    
    spark = create_spark_session()
    
    df_join = big_table(spark)
    
#     df_join.write.format('com.databricks.spark.csv') \
#      .mode('overwrite').option("header", "true").save(save_path)
    
    return df_join
    
#     até aqui tudo esta funcionando, agora falta conectar as tabelas satelites 

In [19]:
result = main()

result.limit(2000).toPandas()

Start the application


Unnamed: 0,t_state_state,t_city_city,airp_ident,airp_type,airp_name,airp_elevation_ft,airp_iso_country,airp_iso_region,airp_municipality,airp_coordinates,country_name,immig_passender_id,immig_cicid,immig_i94yr,immig_i94mon,immig_i94cit,immig_i94res,immig_i94port,immig_arrdate,immig_i94mode,immig_i94addr,immig_depdate,immig_i94bir,immig_i94visa,immig_count,immig_dtadfile,immig_visapost,immig_occup,immig_entdepa,immig_entdepd,immig_entdepu,immig_matflag,immig_biryear,immig_dtaddto,immig_gender,immig_insnum,immig_airline,immig_admnum,immig_fltno,immig_visatype,vehi_name,motiv_name
0,Idaho,Huntington Beach,21CA,heliport,CRC-Huntington Beach Heliport,40.0,US,US-CA,Huntington Beach,"-118.031778, 33.683833",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure
1,Idaho,Huntington Beach,CL65,heliport,H.B.P.D. Heliport,56.0,US,US-CA,Huntington Beach,"-118.0009994506836, 33.69499969482422",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure
2,Idaho,Huntington Beach,9CL8,heliport,Union Eva Heliport,79.0,US,US-CA,Huntington Beach,"-118.06199645996094, 33.66170120239258",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure
3,Idaho,Huntington Beach,9CA8,heliport,Boeing Huntington Beach Heliport,65.0,US,US-CA,Huntington Beach,"-118.03399658203125, 33.74720001220703",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure
4,Idaho,Huntington Beach,2CA5,heliport,Platform Emmy Heliport,87.0,US,US-CA,Huntington Beach,"-118.04499816894531, 33.66230010986328",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1995,Idaho,Stockton,S74,seaplane_base,Lost Isle Seaplane Base,,US,US-CA,Stockton,"-121.45700073242188, 38.00410079956055",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure
1996,Idaho,Stockton,99IL,small_airport,Providence Place Field,959.0,US,US-IL,Stockton,"-89.9574966430664, 42.28670120239258",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure
1997,Idaho,Stockton,0S2,small_airport,Stockton Municipal Airport,1973.0,US,US-KS,Stockton,"-99.29509735107422, 39.377201080322266",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure
1998,Idaho,Stockton,2M5,seaplane_base,Stockton Lake Seaplane Base,867.0,US,US-MO,Stockton,"-93.75849914550781, 37.650001525878906",UNITED STATES,2388038,4865466.0,2016.0,4.0,123.0,123.0,SLC,20570.0,1.0,ID,20581.0,28.0,2.0,1.0,20160426,,,G,O,,M,1988.0,07242016,M,,DL,5.929245e+10,00057,WT,Air,Pleasure


In [20]:
# quais são os veiculos mais utilizados para a imigração?

immi_vehi = result.groupBy('vehi_name', 'motiv_name')\
    .agg(count('immig_passender_id')\
    .alias('total_count_immi_by_vehicle'))\
    .sort(desc('total_count_immi_by_vehicle'))
    

immi_vehi.toPandas()

Unnamed: 0,vehi_name,motiv_name,total_count_immi_by_vehicle
0,Air,Pleasure,2133054
1,Air,Business,409374
2,Land,Pleasure,55404
3,Air,Student,40014
4,Land,Business,6156
5,Not reported,Pleasure,3078
6,Sea,Pleasure,3078


In [21]:
# quais são os principais motivos que levam a imigrar? 

airport = result.groupBy('airp_name', 'motiv_name')\
        .agg(count('immig_passender_id').alias('total_count_immit'))\
        .sort(desc("total_count_immit"))

airport.toPandas()

Unnamed: 0,airp_name,motiv_name,total_count_immit
0,Children's Hospital Heliport,Pleasure,3565
1,Good Samaritan Hospital Heliport,Pleasure,3565
2,Arlington Municipal Airport,Pleasure,3565
3,Medical Center Heliport,Pleasure,3565
4,Memorial Hospital Heliport,Pleasure,3565
...,...,...,...
8920,Flying Crown Airport,Student,13
8921,Burgess Field,Student,13
8922,Northridge Hospital Heliport,Student,13
8923,Striplin Airfield,Student,13


In [22]:
# quais foram os anos que mais tiveram imigração e quais o motivos deles?

year_immi = result.groupBy('immig_i94yr', 'motiv_name')\
        .agg(count('immig_passender_id').alias('total_count_motiv_by_year'))\
        .sort(desc('immig_i94yr'))

year_immi.toPandas()

Unnamed: 0,immig_i94yr,motiv_name,total_count_motiv_by_year
0,2016.0,Pleasure,2194614
1,2016.0,Business,415530
2,2016.0,Student,40014


In [None]:
# 