In [1]:
import pandas as pd
import itertools
import warnings
import numpy as np
import matplotlib.pyplot as plt
import json
# import statsmodels.api as sm
# from fbprophet import Prophet

In [2]:
from pyspark.sql import SparkSession, SQLContext, GroupedData

# Spark session with the spark-sas7bdat package so the I94 .sas7bdat files can be read.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

# SQLContext expects the SparkContext first; the session is passed explicitly.
sqlContext = SQLContext(spark.sparkContext, spark)
# Disable automatic broadcast joins. The key was previously misspelled
# ("Threashold"), so Spark silently ignored the setting; -1 is the documented
# value that turns broadcasting off.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

In [3]:
# Lookup dictionaries used by the UDFs and filters in this notebook.
# A context manager closes the file instead of leaking the handle.
with open('utils.json', 'r') as utils_file:
    utils_data = json.load(utils_file)

# us_state_code is looked up by full state name elsewhere in this notebook;
# us_code_state is its inverse (state code -> state name).
us_state_code = utils_data['us_state_code']
us_code_state = {code: state for state, code in us_state_code.items()}
city_code = utils_data['city_codes']
immigration_code = utils_data['immigration_codes']

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Spark UDFs wrapping the lookup dictionaries loaded from utils.json.
# A key missing from its dictionary raises KeyError inside the UDF, which fails the task.

# state name -> state code (previously looked up an undefined `state` instead of its argument)
state_code_udf = udf(lambda state: us_state_code[state], StringType())
# state code -> state name
code_state_udf = udf(lambda code: us_code_state[code], StringType())
# I94 port code -> city name
city_code_udf = udf(lambda code: city_code[code], StringType())
# I94 residence-country code -> country name
country_code_udf = udf(lambda code: immigration_code[code], StringType())

In [None]:
# Raw inputs: airport codes, US city demographics (semicolon-separated) and state temperatures.
airport_df = pd.read_csv(filepath_or_buffer="airport-codes_csv.csv")
demographics_df = pd.read_csv(filepath_or_buffer="us-cities-demographics.csv", sep=";")
temperature_df = pd.read_csv(filepath_or_buffer="GlobalLandTemperaturesByState.csv")

In [None]:
# Preview the first rows of the raw temperature table.
temperature_df.head(n=5)

In [None]:
import datetime
# Parse the date column once and derive the calendar fields from the parsed values.
parsed_dates = pd.to_datetime(temperature_df['dt'])
temperature_df['dt'] = parsed_dates
temperature_df['year'] = parsed_dates.dt.year
temperature_df['month'] = parsed_dates.dt.month

In [None]:
# Latest year present in the temperature data.
temperature_df.loc[:, 'year'].max()

In [None]:
# Add Prophet-style columns ('y' = target, 'ds' = timestamp) for the temperature
# forecasting experiment; the Prophet cells below are currently commented out.
temperature_df = temperature_df.assign(
    y=temperature_df['AverageTemperature'],
    ds=temperature_df['dt'],
)

In [None]:
# US rows after 1900. .copy() makes this an independent frame, so the column
# assignment in the next cell does not trigger SettingWithCopyWarning.
us_temperature_df = temperature_df[
    (temperature_df['Country'] == "United States") & (temperature_df['year'] > 1900)
].copy()

In [None]:
# Map full state names to state codes. A column-wise map avoids the slow row-wise
# apply; the lambda keeps the original KeyError for names missing from the lookup.
# NOTE(review): any State value absent from utils.json raises here — confirm coverage.
us_temperature_df['state_code'] = us_temperature_df['State'].map(lambda state: us_state_code[state])
us_temperature_df = us_temperature_df[['dt', 'AverageTemperature', 'State', 'Country', 'year', 'month', 'state_code', 'y', 'ds']]

In [None]:
# Preview the first 20 US temperature rows.
us_temperature_df.head(n=20)

In [None]:
# Small sample (Alabama, years after 2000) used to try out the forecasting model.
alabama_temperature_df = us_temperature_df.query("state_code == 'AL' and year > 2000")
alabama_temperature_df.head(n=20)

In [None]:
# temperature_model = Prophet(interval_width=0.95)
# temperature_model.fit(alabama_temperature_df)

In [None]:
# temperature_forecast = temperature_model.make_future_dataframe(periods=50, freq='MS')
# temperature_forecast = temperature_model.predict(temperature_forecast)

In [None]:
# plt.figure(figsize=(18, 6))
# temperature_model.plot(temperature_forecast, xlabel = 'Date', ylabel = 'AverageTemperature')
# plt.title('Date AverageTemperature');

In [None]:
# temperature_forecast

In [None]:
# def get_temperature_prediction(df):
#     temperature_model = Prophet(interval_width=0.95)
#     temperature_model.fit(df)
#     temperature_forecast = temperature_model.make_future_dataframe(periods=50, freq='MS')
#     temperature_forecast = temperature_model.predict(temperature_forecast)
#     temperature_forecast['year'] = temperature_forecast['ds'].dt.year
#     temperature_forecast['month'] = temperature_forecast['ds'].dt.month
#     temperature_forecast['AverageTemperature'] = temperature_forecast['yhat']
#     return temperature_forecast[['year', 'month', 'AverageTemperature']]

In [None]:
# us_temperature_prediction_df = pd.DataFrame(columns=['year', 'month', 'AverageTemperature', 'state_code', 'state', 'country'])

In [None]:
# for code, state in us_code_state.items():
#     print(code, state)
#     try:
#         # Take a copy of each state's rows
#         df = us_temperature_df[us_temperature_df['state_code']==code].copy()
#         df = get_temperature_prediction(df)
#         df = df[df['year'] > 2010]
#         df['state_code'] = code
#         df['state'] = state
#         df['country'] = 'United States'
#         us_temperature_prediction_df = us_temperature_prediction_df.append(df, ignore_index=True)
#     except Exception as e:
#         print(e)
#         continue

In [None]:
# Inspect a few rows from 2013.
us_temperature_df.loc[us_temperature_df['year'] == 2013].head(n=10)

In [None]:
# Persist the cleaned temperature dimension for the Spark stage.
us_temperature_df.to_csv(path_or_buf="us_temperature.csv", index=False)

In [None]:
# Three-letter lowercase month abbreviations, as embedded in the I94 SAS file names.
months = 'jan feb mar apr may jun jul aug sep oct nov dec'.split()
# Path template; '{}' is filled with one of the month abbreviations above.
fileName = '../../data/18-83510-I94-Data-2016/i94_{}16_sub.sas7bdat'

In [None]:
import pyspark.sql.functions as f

# Clean each monthly I94 SAS file and write it to the 'immigration_data' parquet dataset.
for month_index, month in enumerate(months):
    # Format into a separate name: re-assigning `fileName` here overwrote the
    # '{}' template on the first pass, so every iteration re-read January.
    monthFileName = fileName.format(month)
    print(monthFileName)
    immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(monthFileName)

    # Keep rows whose destination state and port are in the lookups, then derive
    # readable columns through the lookup UDFs.
    # NOTE(review): i94res is only null-filtered, not checked against
    # immigration_code, so an unmapped code would raise inside country_code_udf.
    immigrationMonthDF = (
        immigration_df
        .filter(immigration_df.i94addr.isNotNull())
        .filter(immigration_df.i94res.isNotNull())
        .filter(f.col("i94addr").isin(list(us_code_state.keys())))
        .filter(f.col("i94port").isin(list(city_code.keys())))
        .withColumn("i94res", f.col("i94res").cast("integer").cast("string"))
        .withColumn("origin_country", country_code_udf(f.col("i94res")))
        .withColumn("State", code_state_udf(f.col("i94addr")))
        .withColumn("id", f.col("cicid").cast("integer"))
        .withColumn("state_code", f.col("i94addr"))
        .withColumn("city_code", f.col("i94port"))
        .withColumn("year", f.col("i94yr").cast("integer"))
        .withColumn("month", f.col("i94mon").cast("integer"))
        .withColumn("city", city_code_udf(f.col("i94port")))
    )

    # Overwrite on the first month and append afterwards, so re-running this cell
    # rebuilds the dataset instead of duplicating every row.
    write_mode = 'overwrite' if month_index == 0 else 'append'
    (
        immigrationMonthDF
        .select('id', 'year', 'month', 'origin_country', 'city_code', 'city', 'state_code', 'State')
        .write.mode(write_mode)
        .parquet('immigration_data')
    )

In [None]:
# Preview the raw city demographics table.
demographics_df.head(n=5)

In [None]:
# Shorten multi-word race labels to single words; after the pivot further down they
# become column names (Native, Afroamerican, Latino).
demographics_df["Race"] = demographics_df["Race"].replace({
    "American Indian and Alaska Native": "Native",
    "Black or African-American": "Afroamerican",
    "Hispanic or Latino": "Latino",
})

In [None]:
# Aggregate the city-level rows per (State, State Code, Race): the mean of Median Age
# and the sum of the population counts. The count columns are selected with a list;
# tuple-style selection (`groupby(...)['a', 'b']`) was deprecated and raises in pandas >= 2.0.
us_demographics_avg_df = demographics_df.groupby(['State', 'State Code', 'Race'])['Median Age'].mean()
us_demographics_sum_df = demographics_df.groupby(['State', 'State Code', 'Race'])[
    ['Total Population', 'Count', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born']
].sum()

In [None]:
# Put the summed counts and the mean median age side by side, then turn the
# group keys back into ordinary columns.
us_demographics_df = pd.concat([us_demographics_sum_df, us_demographics_avg_df], axis=1).reset_index()
# snake_case aliases used by the later steps
us_demographics_df["state_code"] = us_demographics_df["State Code"]
us_demographics_df["median_age"] = us_demographics_df["Median Age"]
us_demographics_df.head(n=5)

In [None]:
# Within each known state, overwrite these columns on every race row with the
# state's maximum value for that column.
state_level_columns = [
    'Total Population',
    'Male Population',
    'Female Population',
    'Number of Veterans',
    'Foreign-born',
    'median_age',
]
for state_code in us_code_state:
    print(state_code)
    in_state = us_demographics_df['state_code'] == state_code
    for column in state_level_columns:
        us_demographics_df.loc[in_state, column] = us_demographics_df.loc[in_state, column].max()

In [None]:
# Express each count column as a percentage of the row's Total Population.
# Column arithmetic replaces the row-wise apply() calls: same values, far faster,
# and it also works on an empty frame.
us_demographics_df["percentage_male"] = us_demographics_df["Male Population"] / us_demographics_df["Total Population"] * 100.0
us_demographics_df["percentage_female"] = us_demographics_df["Female Population"] / us_demographics_df["Total Population"] * 100.0
us_demographics_df["percentage_veterans"] = us_demographics_df["Number of Veterans"] / us_demographics_df["Total Population"] * 100.0
us_demographics_df["percentage_foreign_born"] = us_demographics_df["Foreign-born"] / us_demographics_df["Total Population"] * 100.0
us_demographics_df["percentage_race"] = us_demographics_df["Count"] / us_demographics_df["Total Population"] * 100.0

In [None]:
# Pivot the race percentages into one column per Race value, indexed by the
# state-level attributes. aggfunc="mean" is used instead of np.mean because
# recent pandas deprecates passing NumPy reductions to aggfunc.
us_df_demographics = pd.pivot_table(
    us_demographics_df,
    values='percentage_race',
    index=["State", "state_code", "median_age", "percentage_male", "percentage_female",
           "percentage_veterans", "percentage_foreign_born"],
    columns=["Race"],
    aggfunc="mean",
    fill_value=0,
)
# Flatten the multi-level index into ordinary columns.
us_df_demographics = pd.DataFrame(us_df_demographics.to_records())

In [None]:
# Display the pivoted demographics table (presumably one row per state, since the
# index columns were made state-constant above — confirm before relying on it).
us_df_demographics

In [None]:
# Persist the demographics dimension for the Spark stage.
us_df_demographics.to_csv(path_or_buf="us_demographics.csv", index=False)

In [None]:
# Preview the raw airport codes table.
airport_df.head(n=5)

In [None]:
# US airports of the small/medium/large airport types only; other types are dropped.
# .copy() makes this an independent frame, so the column assignments in the
# following cells do not trigger SettingWithCopyWarning.
us_airport_df = airport_df[airport_df["iso_country"] == "US"]
us_airport_df = us_airport_df[
    us_airport_df["type"].isin(["small_airport", "medium_airport", "large_airport"])
].copy()

In [None]:
# Derive numeric and state columns with vectorised string operations instead of
# row-wise apply(); missing values now become NaN instead of raising.
us_airport_df["elevation_ft"] = us_airport_df["elevation_ft"].astype(float)
# State code = the text after the last "-" in iso_region.
us_airport_df["state_code"] = us_airport_df["iso_region"].str.split("-").str[-1]
# "coordinates" is a comma-separated pair: first value -> x, last value -> y.
coordinate_parts = us_airport_df["coordinates"].str.split(",")
us_airport_df["x_coordinate"] = coordinate_parts.str[0].str.strip().astype(float)
us_airport_df["y_coordinate"] = coordinate_parts.str[-1].str.strip().astype(float)

In [None]:
# Rename to the shared column names and keep only the exported columns.
us_airport_df = us_airport_df.assign(
    country=us_airport_df["iso_country"],
    city_code=us_airport_df["local_code"],
)[["ident", "type", "name", "elevation_ft", "country", "state_code", "city_code",
   "municipality", "x_coordinate", "y_coordinate"]]

In [None]:
# Preview the cleaned airport dimension.
us_airport_df.head(n=5)

In [None]:
# Persist the airport dimension for the Spark stage.
us_airport_df.to_csv(path_or_buf="us_airports.csv", index=False)

In [5]:
import pyspark.sql.functions as f

# Load the dimension CSVs written above plus the immigration parquet dataset.
# No schema inference is requested, so every CSV column is read as a string.
temperatureDF = spark.read.option("header", True).csv("us_temperature.csv")
demographicsDF = spark.read.option("header", True).csv("us_demographics.csv")
airportsDF = spark.read.option("header", True).csv("us_airports.csv")
immigrationDF = spark.read.parquet("immigration_data")

In [6]:
# To keep processing time manageable, use only the first two months (January and February).
# `month < 2` kept January only; `<= 2` matches the stated intent.
immigrationDF = immigrationDF.filter(f.col("month") <= f.lit(2))

In [7]:
# Register each DataFrame as a temporary SQL view so the fact-table query can refer to it by name.
immigrationDF.createOrReplaceTempView(name="immigration")
temperatureDF.createOrReplaceTempView(name="temperature")
demographicsDF.createOrReplaceTempView(name="demographics")
airportsDF.createOrReplaceTempView(name="airports")

In [8]:
# Build the fact table: immigrant counts per (year, month, origin country, state),
# enriched with the state's average temperature for that calendar month.
#
# Earlier draft, kept for reference: it also joined demographics and airports and
# matched temperature on year as well, which needs temperature rows for the
# immigration year (presumably why the year join was dropped — TODO confirm coverage).
# fact_table = spark.sql("""
#     SELECT 
#         immig.year,
#         immig.month,
#         immig.origin_country,
#         immig.State,
#         immig.state_code,
#         COUNT(immig.state_code) as number_immigrants,
#         temp.AverageTemperature as avg_temperature,
#         demo.median_age,
#         demo.percentage_male,
#         demo.percentage_female,
#         demo.percentage_veterans,
#         demo.percentage_foreign_born,
#         demo.Afroamerican as percentage_afroamerican,
#         demo.Asian as percentage_asian,
#         demo.Latino as percentage_latino,
#         demo.Native as percentage_native,
#         demo.White as percentage_white,
#         air.name as airport_name,
#         air.x_coordinate,
#         air.y_coordinate
#     FROM immigration as immig
#     JOIN temperature as temp ON immig.state_code=temp.state_code AND immig.year=temp.year AND immig.month=temp.month
#     JOIN demographics as demo ON demo.state_code=immig.state_code
#     JOIN airports as air ON air.state_code=immig.state_code
#     WHERE air.type='large_airport'
#     GROUP BY 1,2,3,4,5,7,8,9,10,11,12,13,14,15,16,17,18,19,20
#     ORDER BY 1,2,3,4
# """).cache()

# The previous active query joined temperature on state_code only. Every immigration
# row therefore matched every historical temperature row for its state, which
# inflated number_immigrants and emitted one output row per distinct temperature.
# Reducing temperature to one value per (state_code, month) first makes the join
# one-to-one. Casts are explicit because the temperature CSV is read as strings.
# NOTE(review): avg_temperature is now a multi-year mean for that calendar month,
# over all years in us_temperature.csv — confirm this is the intended measure.
fact_table = spark.sql("""
    WITH monthly_temperature AS (
        SELECT
            state_code,
            CAST(month AS INT) AS month,
            AVG(CAST(AverageTemperature AS DOUBLE)) AS avg_temperature
        FROM temperature
        GROUP BY state_code, CAST(month AS INT)
    )
    SELECT
        immigration.year,
        immigration.month,
        immigration.origin_country,
        immigration.State,
        immigration.state_code,
        COUNT(immigration.state_code) as number_immigrants,
        monthly_temperature.avg_temperature
    FROM immigration
    JOIN monthly_temperature
      ON immigration.state_code = monthly_temperature.state_code
     AND immigration.month = monthly_temperature.month
    GROUP BY 1,2,3,4,5,7
    ORDER BY 1,2,3,4
""").cache()

In [None]:
# Count the rows in the fact table. count() is a Spark action, so this also
# computes and fills the cache requested with .cache() when fact_table was built.
fact_table.count()

In [None]:
# Print the first 20 rows of the fact table.
fact_table.show(n=20)