# Spark Local Test

In [59]:
import pandas as pd
import numpy as np
from zipfile import ZipFile

import configparser
from pyspark.sql import SparkSession, Window
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import udf, col, monotonically_increasing_id, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType, DateType, StringType
from pyspark.sql import functions as F

from datetime import datetime, timedelta

import os

import config

## Configure Connection

In [2]:
# sc.stop()

In [3]:
output_data = "test_output/"

In [4]:
configure = SparkConf().setAppName('udac_config').setMaster('local')
sc = SparkContext(conf = configure)

In [5]:
# getOrCreate modifies the parameters of existing Spark Session
spark = SparkSession.builder.appName('udac_cap').config('config option', 'config value').getOrCreate()

In [6]:
spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.sql.warehouse.dir',
  'file:/Users/morgan/Documents/10_Udacity/data_eng_nano/usa-tourism-etl/spark-warehouse'),
 ('spark.app.id', 'local-1617224224461'),
 ('spark.app.startTime', '1617224223552'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '10.0.0.223'),
 ('spark.app.name', 'udac_config'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '50825')]

### Airports

In [7]:
airports = spark.read.option("header", True).csv("data/airport_codes.csv")

In [8]:
airports.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [9]:
lat_long = F.split(airports.coordinates, ",")
airports = airports.withColumn('longitude', lat_long.getItem(0))
airports = airports.withColumn('latitude', lat_long.getItem(1))

region_split = F.split(airports.iso_region, "-")
airports = airports.withColumn('state', region_split.getItem(1))

In [10]:
airports = airports.select(['ident',
                 'iata_code',
                 'name','type',
                 'municipality',
                 'state',
                 'local_code',
                 'latitude',
                 'longitude',
                 'elevation_ft']).where(airports.iso_country=="US")

In [11]:
airports = airports.sort('iata_code', ascending=True)

In [12]:
airports = airports.na.drop(subset='iata_code')

In [13]:
airports = airports.withColumn("latitude", airports.latitude.cast('float')) \
                    .withColumn("longitude", airports.longitude.cast('float')) \
                    .withColumn("elevation_fit", airports.elevation_ft.cast('integer'))

In [14]:
airports.show()

+-----+---------+--------------------+--------------+-------------+-----+----------+--------+---------+------------+-------------+
|ident|iata_code|                name|          type| municipality|state|local_code|latitude|longitude|elevation_ft|elevation_fit|
+-----+---------+--------------------+--------------+-------------+-----+----------+--------+---------+------------+-------------+
| KAAF|      AAF|Apalachicola Regi...| small_airport| Apalachicola|   FL|       AAF| 29.7275| -85.0275|          20|           20|
| KAAP|      AAP|      Andrau Airpark|        closed|      Houston|   TX|       AAP| 29.7225| -95.5883|          79|           79|
| KABE|      ABE|Lehigh Valley Int...|medium_airport|    Allentown|   PA|       ABE| 40.6521| -75.4408|         393|          393|
| KABI|      ABI|Abilene Regional ...|medium_airport|      Abilene|   TX|       ABI| 32.4113| -99.6819|        1791|         1791|
| PAFM|      ABL|      Ambler Airport|medium_airport|       Ambler|   AK|       AFM

#### Successfully created parquets

In [15]:
#airports.write.mode('overwrite').parquet(os.path.join(output_data, "airports"))

---
## USA Cities Demographics

In [78]:
cities = spark.read.option('header', True) \
        .option('delimiter', ";") \
        .csv("data/us_cities_demographics.csv")

In [79]:

for original, revised in config.USA_CITIES_RENAME_COLS.items():
    cities = cities.withColumnRenamed(original, revised)

cities = cities.withColumn("state_city", F.concat_ws("_", cities.state_code, cities.city))

In [80]:
for i_var in config.USA_CITIES_INTEGER_VARS:
    cities = cities.withColumn(i_var, cities[i_var].cast('integer'))
    
for f_var in config.USA_CITIES_FLOAT_VARS:
    cities = cities.withColumn(f_var, cities[f_var].cast('float'))

In [73]:
cities2 = cities.dropDuplicates(["state_city"])

In [74]:
race_count = cities.select("state_city", "race", "race_pop")
race_count = race_count.withColumn("race_pop", race_count.race_pop.cast('float'))
race_count = race_count.groupBy("state_city").pivot("race").agg(F.first("race_pop"))

In [75]:
cities_final = cities2.join(race_count, cities2.state_city == race_count.state_city)
cities_final = cities_final.drop("race", "race_pop", "state_city", "state_city")

In [76]:
for original, revised in config.RACE_RENAME_COLS.items():
    cities_final = cities_final.withColumnRenamed(original, revised)

In [77]:
cities_final.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: float (nullable = true)
 |-- male_pop: integer (nullable = true)
 |-- female_pop: integer (nullable = true)
 |-- total_pop: integer (nullable = true)
 |-- num_veterans: integer (nullable = true)
 |-- num_foreigners: integer (nullable = true)
 |-- avg_household_size: float (nullable = true)
 |-- state_code: string (nullable = true)
 |-- native_american_pop: float (nullable = true)
 |-- asian_pop: float (nullable = true)
 |-- black_american_pop: float (nullable = true)
 |-- hispanic_pop: float (nullable = true)
 |-- white_pop: float (nullable = true)



#### Successfully created parquets

In [24]:

# cities_final.write.mode('overwrite').parquet(os.path.join(output_data, "cities"))

---
## USA Temperatures

In [25]:
temperatures = spark.read.option('header', True) \
                .csv("data/GlobalLandTemperaturesByCity.csv")

In [26]:
temperatures.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [27]:
temperatures.show(3)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 3 rows



In [28]:
temperatures = temperatures.select("*").where((temperatures.Country == "United States") & (temperatures.dt > "1969-12-31"))

In [29]:
temperatures = temperatures.withColumnRenamed("dt", "date_time") \
                            .withColumnRenamed("AverageTemperature", "avg_daily_temp") \
                            .withColumnRenamed("AverageTemperatureUncertainty", "avg_temp_temp_uncertainty") \
                            .withColumnRenamed("City", "city") \
                            .withColumnRenamed("Latitude", "latitude") \
                            .withColumnRenamed("Longitude", "longitude")

In [30]:
temperatures = temperatures.withColumn("lat_length", F.length("latitude")) \
                            .withColumn("long_length", F.length("longitude")) \
                            .withColumn("latitude_2", F.expr("""substr(latitude, 1, lat_length-1)""")) \
                            .withColumn("longitude_2", F.expr("""substr(longitude, 1, long_length-1)""")) 

In [31]:
temperatures = temperatures.withColumn("latitude", temperatures.latitude_2.cast('float')) \
                            .withColumn("longitude", temperatures.longitude_2.cast('float'))

temperatures = temperatures.withColumn("longitude", -1 * col("longitude"))

temperatures = temperatures.drop("Country", "lat_length", "long_length", "latitude_2", "longitude_2")

In [32]:
temperatures.show(2)

+----------+--------------+-------------------------+-------+--------+---------+
| date_time|avg_daily_temp|avg_temp_temp_uncertainty|   city|latitude|longitude|
+----------+--------------+-------------------------+-------+--------+---------+
|1970-01-01|         3.969|                    0.289|Abilene|   32.95|  -100.53|
|1970-02-01|         8.463|                    0.177|Abilene|   32.95|  -100.53|
+----------+--------------+-------------------------+-------+--------+---------+
only showing top 2 rows



In [33]:

#temperatures.write.mode('overwrite').parquet(os.path.join(output_data, "temperatures"))

---
## Visits

In [113]:
# UDF
def convert_datetime(num_days):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=(num_days))
    except:
        return None
    
udf_datetime_from_sas = udf(lambda x: convert_datetime(x), DateType())

In [124]:
visits = spark.read.option('header', True) \
            .option('delimiter', ",") \
            .csv("data/immigration_data_sample.csv")

df2 = spark.read.option('header', True).csv("data/airport_dict.csv")

df3 = spark.read.option('header', True).csv("data/country_codes.csv")

In [125]:
cities_dict = df2.withColumn("city", F.split(col("airport"), ",").getItem(0))

cities_dict = cities_dict.withColumn("city", F.initcap("city")) \
                        .drop("airport", "state")

In [126]:
countries_dict = df3.withColumn("country", F.initcap("country")) \
                    .withColumn("country_code", df3["country_code"].cast('integer')) \
                    

In [127]:
# KEEP FOR LATER REFERENCE

# deleteWhitespaceUDF = udf(lambda s: s.replace(" ", "") if type(s) is str else s, StringType())
# deleteApostropheUDF = udf(lambda s: s.replace("''", "") if type(s) is str else s, StringType())
# airports2_dict = airports2_dict.withColumn("state_cleaned", F.expr("substring(state, 1, length(state)-1)"))
# airports2_dict = airports2_dict.withColumn("state", deleteApostropheUDF("state"))

In [128]:
visits2 = visits.withColumn("arrival_date", udf_datetime_from_sas("arrdate")) \
                .withColumn("departure_date", udf_datetime_from_sas("depdate")) \
                .drop("insnum", "dtadfile", "fltno", 'i94bir', "occup", "admnum", "visapost", "entdepu" "arrdate", "depdate")

In [129]:
for original, renamed in config.TOURISM_RENAME_COLS.items():
    visits2 = visits2.withColumnRenamed(original, renamed)

In [130]:
cast_to_integer = ["citizen_id", "arrival_yr", "arrival_month", "citizen_cntry_code", "residency_cntry_code", "travel_mode",
                "reason_for_travel", "num_people", "birth_year"]

for feature in config.TOURISM_INTEGER_VARS:
    visits2 = visits2.withColumn(feature, visits2[feature].cast('integer'))

In [131]:
visits.show(5)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|    _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|56582674633.0|00782|      WT|
|2171295|4422636.0|2016.0|   4.0| 582.0| 582.0|    MCA|20567.0|    1

In [133]:
visits2.select("matflag").distinct.show()

AttributeError: 'function' object has no attribute 'show'

#### Join all 3 dataframes

In [134]:
tourism_final = visits2.join(countries_dict, visits2.citizen_cntry_code == countries_dict.country_code, how ='left')
tourism_final = tourism_final.withColumnRenamed("country", "citizen_country") \
                            .drop("country_code")

tourism_final = tourism_final.join(countries_dict, tourism_final.residency_cntry_code == countries_dict.country_code, how='left')
tourism_final = tourism_final.withColumnRenamed("country", "residency_country") \
                            .drop("country_code")

In [135]:
tourism_final = tourism_final.join(cities_dict, tourism_final.airport == cities_dict.airport_code, how='left')
tourism_final = tourism_final.withColumnRenamed("city", "airport_city") \
                            .drop("airport_code")

In [136]:
tourism_final.show(5)

+--------+----------+----------+-------------+------------------+--------------------+-------+-----------+-------------+-----------------+----------+-------+-------+-------+-------+----------+--------+------+-------+---------+------------+--------------+---------------+-----------------+--------------+
|visit_id|citizen_id|arrival_yr|arrival_month|citizen_cntry_code|residency_cntry_code|airport|travel_mode|airport_state|reason_for_travel|num_people|entdepa|entdepd|entdepu|matflag|birth_year| dtaddto|gender|airline|visa_type|arrival_date|departure_date|citizen_country|residency_country|  airport_city|
+--------+----------+----------+-------------+------------------+--------------------+-------+-----------+-------------+-----------------+----------+-------+-------+-------+-------+----------+--------+------+-------+---------+------------+--------------+---------------+-----------------+--------------+
| 2027561|   4084316|      2016|            4|               209|                 209|  

In [143]:
tourism_final.select("matflag").distinct().show(5)

+-------+
|matflag|
+-------+
|   null|
|      M|
+-------+

