In [28]:
import os
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from itertools import chain
from pyspark.sql.functions import udf
spark

In [29]:
DATAFOLDER = '/Users/christian/Data/udacity_capstone/'
OUTPUTFOLDER = '/Users/christian/Data/udacity_capstone/output'

In [30]:
def get_immi_schema():
    """
    Map the column names to datatypes and return as a schema.
    """
    immi_schema = T.StructType([
        T.StructField('cicid', T.IntegerType()),
        T.StructField('i94yr', T.StringType()),
        T.StructField('i94mon', T.StringType()),
        T.StructField('i94cit', T.StringType()),
        T.StructField('i94res', T.IntegerType()),
        T.StructField('i94port', T.StringType()),
        T.StructField('arrdate', T.StringType()),
        T.StructField('i94mode', T.StringType()),
        T.StructField('i94addr', T.StringType()),
        T.StructField('depdate', T.StringType()),
        T.StructField('i94bir', T.StringType()),
        T.StructField('i94visa', T.StringType()),
        T.StructField('count',  T.IntegerType()),
        T.StructField('dtadfile', T.StringType()),
        T.StructField('visapost', T.IntegerType()),
        T.StructField('occup', T.StringType()),
        T.StructField('entdepa', T.StringType()),
        T.StructField('entdepd', T.IntegerType()),
        T.StructField('entdepu', T.StringType()),
        T.StructField('matflag', T.StringType()),
        T.StructField('biryear', T.StringType()),
        T.StructField('dtaddto', T.StringType()),
        T.StructField('gender', T.StringType()),
        T.StructField('insnum', T.StringType()),
        T.StructField('airline', T.StringType()),
        T.StructField('admnum', T.StringType()),
        T.StructField('fltno', T.StringType()),
        T.StructField('visatype', T.StringType()),
    ])
    return immi_schema


In [31]:
schema = get_immi_schema()


In [32]:

spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark = spark.read.option("mergeSchema", "true").parquet(os.path.join(DATAFOLDER, 'sas_data'))

# convert data types

In [33]:
df_spark = df_spark.withColumn('i94yr', df_spark['i94yr'].cast(T.IntegerType())).\
        withColumn('i94mon', df_spark['i94mon'].cast(T.IntegerType())).\
        withColumn('i94cit', df_spark['i94cit'].cast(T.IntegerType())).\
        withColumn('i94res', df_spark['i94res'].cast(T.IntegerType())).\
        withColumn('arrdate', df_spark['arrdate'].cast(T.IntegerType())).\
        withColumn('i94mode', df_spark['i94mode'].cast(T.IntegerType())).\
        withColumn('depdate', df_spark['depdate'].cast(T.IntegerType())).\
        withColumn('i94bir', df_spark['i94bir'].cast(T.IntegerType())).\
        withColumn('i94visa', df_spark['i94visa'].cast(T.IntegerType())).\
        withColumn('count', df_spark['count'].cast(T.IntegerType())).\
        withColumn('biryear', df_spark['biryear'].cast(T.IntegerType())).\
        withColumn('admnum', df_spark['admnum'].cast(T.IntegerType()))

In [34]:
df_sample = pd.read_csv(os.path.join(DATAFOLDER, 'immigration_data_sample.csv'))

In [35]:
df_spark.head()

Row(cicid=459651.0, i94yr=2016, i94mon=4, i94cit=135, i94res=135, i94port='ATL', arrdate=20547, i94mode=1, i94addr='FL', depdate=20559, i94bir=54, i94visa=2, count=1, dtadfile='20160403', visapost=None, occup=None, entdepa='O', entdepd='R', entdepu=None, matflag='M', biryear=1962, dtaddto='07012016', gender=None, insnum=None, airline='VS', admnum=2147483647, fltno='00115', visatype='WT')

In [36]:
df_time = df_spark
df_time = df_time.withColumn("arrival_date", F.expr("date_add(to_date('1960-01-01'), arrdate)"))
df_time = df_time.withColumn("depart_date", F.expr("date_add(to_date('1960-01-01'), depdate)"))


In [37]:
df_time.withColumn("diff_days", F.datediff("depart_date", "arrival_date")).show()

+--------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+----------+-----+--------+------------+-----------+---------+
|   cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|    admnum|fltno|visatype|arrival_date|depart_date|diff_days|
+--------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+----------+-----+--------+------------+-----------+---------+
|459651.0| 2016|     4|   135|   135|    ATL|  20547|      1|     FL|  20559|    54|      2|    1|20160403|    null| null|      O|      R|   null|      M|   1962|07012016|  null|  null|     VS|2147483647|00115|      WT|  2016-04-03| 20

## map i94addrl - state

In [43]:

def map_col(df, map_col_name, df_col_name, new_col_name):
    """
    
    Parameters
    ----------
    df : spark dataframe
        The file containing the df_col_name to be used for mapping.
    map_col_name : str
        The column name of the mapping file.
    df_col_name : str
        The column name in the Spark dataframe to be used.
    new_col_name : str
        New column name of the mapping results.
    
    
    """
    df_map = pd.read_csv(os.path.join(DATAFOLDER, f'{map_col_name}.csv'), quotechar="'")
    id_col = f'{map_col_name}_id'
    dic_map = dict(zip(df_map[id_col], df_map[map_col_name]))
    mapping_expr = F.create_map([F.lit(x) for x in chain(*dic_map.items())])
    return df.withColumn(new_col_name, mapping_expr[F.col(df_col_name)])



@udf
def udf_city_name(city_full):
    splt = str.split(city_full, ',')
    if len(splt) == 0:
        return ''
    return splt[0].capitalize().strip()

@udf
def udf_state_short(city_full):
    splt = str.split(city_full, ',')
    if len(splt) < 2:
        return ''
    return splt[1].strip()

@udf
def udf_state_format(port_state):
    return port_state.capitalize()

In [44]:
df_test.select("i94addr").distinct().show(10)

+-------+
|i94addr|
+-------+
|     CI|
|     FT|
|     SC|
|     AZ|
|     PU|
|     UA|
|     EA|
|     NS|
|     KI|
|     PI|
+-------+
only showing top 10 rows



# TODO replace mapping in i94addrl to have names in captialize!

In [47]:
#df, map_col_name, df_col_name, new_col_name
df_test = df_time
#df_test = map_col(df_test, 'i94prtl', 'i94port', 'port')

# city name of arrival port
#df_test = df_test.withColumn("port_city", udf_city_name("port"))
df_test = map_col(df_test, 'prtl_city', 'i94port', 'port_city')
#df_test = df_test.withColumn("port_state_short", udf_state_short("port"))
df_test = map_col(df_test, 'prtl_state', 'i94port', 'port_state_short')
df_test = map_col(df_test, 'addrl', 'port_state_short', 'port_state')
#df_test = df_test.withColumn('port_state_2', udf_state_format('port_state'))

# country of entry, country of citizenship
df_test = map_col(df_test, 'cntyl', 'i94port', 'state_cit')
df_test = map_col(df_test, 'cntyl', 'i94port', 'state_res')
#df_test = map_col(df_test, 'i94addrl', 'i94addr', 'port_state')
df_test = map_col(df_test, 'visa', 'i94visa', 'visa')
#df_test = df_test.withColumn("port_state", udf_state_format("port_state"))

Is arrdate ever null?

In [49]:
df_test.where(F.col("arrdate").isNull()).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+------------+-----------+---------+----------------+----------+---------+---------+----+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|arrival_date|depart_date|port_city|port_state_short|port_state|state_cit|state_res|visa|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+------------+-----------+---------+----------------+----------+---------+---------+----+
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-----

# merge with demographic data

In [50]:
#demographic = pd.read_csv(os.path.join(DATAFOLDER, 'us-cities-demographics.csv'), sep=';')

In [51]:
demographics = spark.read.format('csv').options(header='true', sep=';', inferSchema=True).\
    load(os.path.join(DATAFOLDER, 'us-cities-demographics.csv'))

match airport with demographic table

In [52]:
# look up unique combinations of city and state in fact table
# 

In [53]:
# subset with unique combinations of city, state and 
# port_city, port_state, i94port (as key )

In [54]:
df_test.show(5)

+--------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+----------+-----+--------+------------+-----------+---------+----------------+----------+---------+---------+--------+
|   cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|    admnum|fltno|visatype|arrival_date|depart_date|port_city|port_state_short|port_state|state_cit|state_res|    visa|
+--------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+----------+-----+--------+------------+-----------+---------+----------------+----------+---------+---------+--------+
|459651.0| 2016|     4|   135|   135|    ATL|  20547|      1|   

In [109]:
def spark_read_csv(spark, folder, filename, **kwargs):

    return spark.read.format('csv').options(header='true', inferSchema=True, **kwargs).\
        load(os.path.join(folder, filename))

def format_column_names(s):
    s = s.casefold()
    s = s.replace(' ', '_')
    s = s.replace('-', '_')
    return s

def rename_columns(df):
    old_names = df.schema.names
    new_names = [format_column_names(s) for s in old_names]
    df = reduce(lambda df, idx: 
                df.withColumnRenamed(old_names[idx], new_names[idx]), range(len(old_names)), df)
    return df

def create_demographic_table(spark, datafolder, outputfolder): 
    # select a subset of original table
    #df_demo = df.select(['i94port']).dropDuplicates()
    df_demo = spark_read_csv(spark, datafolder, 'prtl_city.csv')
    #pd.read_csv(os.path.join(datafolder, 'prtl.csv'), quotechar="'")
    demographics = spark_read_csv(spark, datafolder, 'us-cities-demographics.csv', sep=';')
    #pd.read_csv(os.path.join(datafolder, 'us-cities-demographics.csv'), quotechar="'")
    
    # do the preprocessing, append columns
    #df_demo = map_col(df_demo, 'prtl_city', 'prtl_city', 'port_city')
    df_demo = df_demo.withColumnRenamed('prtl_city', 'port_city')
    #df_test = df_test.withColumn("port_state_short", udf_state_short("port"))
    df_demo = map_col(df_demo, 'prtl_state', 'prtl_city_id', 'port_state_short')
    df_demo = map_col(df_demo, 'addrl', 'port_state_short', 'port_state')
    
    # we are deleting the ones, for which no states were found, since we only focus on the US here
    # later expand world wide
    df_demo = df_demo.dropna(subset=['port_state'])
    df_demo = df_demo.join(demographics, (df_demo.port_city == demographics.City) & (df_demo.port_state_short == demographics['State Code']))
    columns_to_drop = ['port_city', 'port_state', 'port_state_short']
    df_demo = df_demo.drop(*columns_to_drop)
    
    # reformat column names
    df_demo = rename_columns(df_demo)
    df_demo.write.parquet(os.path.join(outputfolder, 'city_demographics.parquet'), 'overwrite')
    return df_demo

In [110]:
df_demo = create_demographic_table(spark, DATAFOLDER, OUTPUTFOLDER)

In [112]:
df_demo.show()

+------------+---------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-------+
|prtl_city_id|           city|       state|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|                race|  count|
+------------+---------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-------+
|         TAC|         Tacoma|  Washington|      37.7|         100914|           107036|          207950|             19040|       31863|                  2.48|        WA|Black or African-...|  30914|
|         BIL|       Billings|     Montana|      36.3|          52705|            57565|          110270|              9121|        3206|                   2.4|        MT|               White| 102

# create single tables for everything

## country table

In [379]:
def spark_read_csv(folder, filename, **kwargs):

    return spark.read.format('csv').options(header='true', inferSchema=True, **kwargs).\
        load(os.path.join(folder, filename))

def csv_to_parquet(datafolder, outputfolder, csv_name, table_name): 
    df = spark_read_csv(datafolder, f'{csv_name}.csv', sep=',', quotechar="'")
    df = df.withColumnRenamed('value', 'id')
    df.write.parquet(os.path.join(outputfolder, f'{table_name}.parquet'), 'overwrite')
    return df

In [380]:
cntyl = csv_to_parquet(DATAFOLDER, OUTPUTFOLDER, 'i94cntyl', 'country')

In [120]:
map_col_name = 'prtl_state'
df_map = spark_read_csv(spark, datafolder, f'{map_col_name}.csv')
#pd.read_csv(os.path.join(DATAFOLDER, f'{map_col_name}.csv'), quotechar="'")
df_map = df_map.toPandas()
id_col = f'{map_col_name}_id'
dic_map = dict(zip(df_map[id_col], df_map[map_col_name]))
mapping_expr = F.create_map([F.lit(x) for x in chain(*dic_map.items())])

In [119]:
list(df_map[id_col])

TypeError: Column is not iterable

# model table i94model

In [381]:
transport = csv_to_parquet(DATAFOLDER, OUTPUTFOLDER, 'i94model', 'transport')

In [382]:
transport.show()

+---+--------------+
| id|      i94model|
+---+--------------+
|  1|         'Air'|
|  2|         'Sea'|
|  3|        'Land'|
|  9|'Not reported'|
+---+--------------+



# visa table i94visa

In [383]:
visa = csv_to_parquet(DATAFOLDER, OUTPUTFOLDER, 'i94visa', 'visa')

In [384]:
visa.show()

+---+--------+
| id| i94visa|
+---+--------+
|  1|Business|
|  2|Pleasure|
|  3| Student|
+---+--------+



## global annual

In [None]:
#TODO
# also aggregate by year
folder = os.path.join(DATAFOLDER, 'climate-change-earth-surface-temperature-data')
global_annual = csv_to_parquet(folder, OUTPUTFOLDER, 'GlobalTemperatures', 'visa')

# Time Table

In [17]:
def create_full_time_table(outputfolder, daysafter=36525):

    days_till_2060 = range(daysafter)
    all_dates = [(t,) for t in days_till_2060]
    
    t_schema = T.StructType([T.StructField('i94_date', T.IntegerType())])
    timeframe = spark.createDataFrame(all_dates, t_schema)
    timeframe = timeframe.withColumn("dt_date", F.expr("date_add(to_date('1960-01-01'), i94_date)"))
    timeframe = timeframe.select('i94_date', 'dt_date',
                    F.year('dt_date').alias('year'),
                    F.month('dt_date').alias('month'),
                    F.dayofmonth('dt_date').alias('day'),
                    F.dayofweek('dt_date').alias('weekday'))
    
    timeframe.write.partitionBy('year', 'month').parquet(os.path.join(outputfolder, 'dates.parquet'), 'overwrite')
    return timeframe

In [18]:
timeframe = create_full_time_table(OUTPUTFOLDER)

In [20]:
timeframe.dtypes

[('i94_date', 'int'),
 ('dt_date', 'date'),
 ('year', 'int'),
 ('month', 'int'),
 ('day', 'int'),
 ('weekday', 'int')]

In [21]:
timeframe.show()

+--------+----------+----+-----+---+-------+
|i94_date|   dt_date|year|month|day|weekday|
+--------+----------+----+-----+---+-------+
|       0|1960-01-01|1960|    1|  1|      6|
|       1|1960-01-02|1960|    1|  2|      7|
|       2|1960-01-03|1960|    1|  3|      1|
|       3|1960-01-04|1960|    1|  4|      2|
|       4|1960-01-05|1960|    1|  5|      3|
|       5|1960-01-06|1960|    1|  6|      4|
|       6|1960-01-07|1960|    1|  7|      5|
|       7|1960-01-08|1960|    1|  8|      6|
|       8|1960-01-09|1960|    1|  9|      7|
|       9|1960-01-10|1960|    1| 10|      1|
|      10|1960-01-11|1960|    1| 11|      2|
|      11|1960-01-12|1960|    1| 12|      3|
|      12|1960-01-13|1960|    1| 13|      4|
|      13|1960-01-14|1960|    1| 14|      5|
|      14|1960-01-15|1960|    1| 15|      6|
|      15|1960-01-16|1960|    1| 16|      7|
|      16|1960-01-17|1960|    1| 17|      1|
|      17|1960-01-18|1960|    1| 18|      2|
|      18|1960-01-19|1960|    1| 19|      3|
|      19|

In [22]:
def create_full_time_table(outputfolder, daysafter=36525):

    future_days = range(daysafter)
    all_dates = [(t,) for t in future_days]
    
    t_schema = T.StructType([T.StructField('i_date', T.IntegerType())])
    timeframe = spark.createDataFrame(all_dates, t_schema)
    timeframe = timeframe.withColumn("dt_date", F.expr("date_add(to_date('1960-01-01'), i_date)"))
    timeframe = timeframe.select('i_date', 'dt_date',
                    F.year('dt_date').alias('year'),
                    F.month('dt_date').alias('month'),
                    F.dayofmonth('dt_date').alias('day'),
                    F.dayofweek('dt_date').alias('weekday'))
    
    timeframe.write.partitionBy('year', 'month').parquet(os.path.join(outputfolder, 'dates.parquet'), 'overwrite')
    return timeframe

In [23]:
tf = create_full_time_table(OUTPUTFOLDER)

In [24]:
tf.head()

Row(i_date=0, dt_date=datetime.date(1960, 1, 1), year=1960, month=1, day=1, weekday=6)

# Climate data

In [None]:
# TODO apply rolling window function on average of last 10 years

In [359]:
folder = os.path.join(DATAFOLDER, 'climate-change-earth-surface-temperature-data')
filename = 'GlobalLandTemperaturesByCountry.csv'

In [428]:
climate = spark_read_csv(folder, filename, sep=',')

## convert timestamp to date
already done

In [419]:
climate.dtypes

[('dt', 'timestamp'),
 ('AverageTemperature', 'double'),
 ('AverageTemperatureUncertainty', 'double'),
 ('Country', 'string')]

In [429]:
climate = climate.withColumn('dt', F.to_date(F.col('dt')))

merge with country id from country table

In [440]:
climate.agg({"dt": "min"}).collect()[0][0]

datetime.date(1743, 11, 1)

In [439]:
climate.agg({"dt": "max"}).collect()[0][0]

datetime.date(2013, 9, 1)

In [430]:
country = spark.read.option("mergeSchema", "true").parquet(os.path.join(OUTPUTFOLDER, 'country.parquet'))

In [432]:
climate = climate.join(country, on=country['i94cntyl'] == climate['Country'], how='leftouter')

In [437]:
climate = climate.drop('i94cntyl')

In [475]:
def generate_climate_country(folder, filename, outputfolder):
    climate = spark_read_csv(folder, filename, sep=',')
    climate = climate.withColumn('dt', F.to_date(F.col('dt')))
    country = spark.read.option("mergeSchema", "true").parquet(os.path.join(outputfolder, 'country.parquet'))
    climate = climate.join(country, on=country['i94cntyl'] == climate['Country'], how='leftouter')
    climate = climate.drop('i94cntyl')
    climate = rename_columns(climate)
    climate = climate.withColumn('year', F.year('dt').alias('year'))
    climate = climate.withColumnRenamed('id', 'country_id').\
        withColumnRenamed('averagetemperatureuncertainty', 'avg_uncertainty').\
        withColumnRenamed('averagetemperature', 'avg_temperature')
    climate.write.partitionBy('year', 'country').parquet(os.path.join(outputfolder, 'climate_country.parquet'), 'overwrite')
    return climate

In [484]:
folder = os.path.join(DATAFOLDER, 'climate-change-earth-surface-temperature-data')
filename = 'GlobalLandTemperaturesByCountry.csv'
climate_country = generate_climate_country(folder, filename, OUTPUTFOLDER)

In [474]:
climate_country = climate_country.withColumnRenamed('id', 'country_id').\
    withColumnRenamed('averagetemperatureuncertainty', 'avg_uncertainty')

## aggregate annual average temperature

In [442]:
annual = climate.withColumn('year', F.year('dt').alias('year'))

In [480]:
climate_country.groupby([F.col('country'), F.col('country_id'), F.col('year')]).agg(F.avg('averagetemperature').alias('avg_temperature')).show()

+----------+----------+----+------------------+
|   country|country_id|year|   avg_temperature|
+----------+----------+----+------------------+
|   Albania|       101|1838|11.598333333333334|
|   Algeria|       316|1918|          22.41125|
|   Andorra|       102|1855|10.281666666666666|
|   Andorra|       102|1903|10.768500000000001|
|   Andorra|       102|1933|11.168999999999999|
|  Anguilla|       529|1862|          26.02125|
|  Anguilla|       529|1924|26.494000000000003|
|  Anguilla|       529|1969|27.049250000000004|
|   Armenia|       151|1858|7.9815000000000005|
|   Armenia|       151|1926| 8.654083333333332|
|   Armenia|       151|1995| 9.892166666666668|
|     Aruba|       532|1937|28.008666666666667|
|Azerbaijan|       152|1852|            10.991|
|Azerbaijan|       152|1856|10.825583333333334|
|   Bahrain|       298|1848|25.192666666666668|
|  Barbados|       513|1962|            26.849|
|   Belarus|       153|1998| 6.528333333333333|
|    Bhutan|       242|1871|11.630416666

In [465]:
annual = spark.read.option("mergeSchema", "true").parquet(os.path.join(OUTPUTFOLDER, 'climate_country.parquet'))

In [485]:
def create_annual_temp_table(outputfolder):
    annual = spark.read.option("mergeSchema", "true").parquet(os.path.join(OUTPUTFOLDER, 'climate_country.parquet'))
    annual.groupby([F.col('country'), F.col('country_id'), F.col('year')]).agg(F.avg('avg_temperature').alias('avg_temperature'))
    annual.write.partitionBy('country').parquet(os.path.join(outputfolder, 'annual_climate_country.parquet'), 'overwrite')
    return annual

In [463]:
annual = spark.read.option("mergeSchema", "true").parquet(os.path.join(OUTPUTFOLDER, 'climate_country.parquet'))
annual.groupby([F.col('country'), F.col('country_id'), F.col('year')]).agg(F.avg('averagetemperature').alias('avg_temperature'))

AnalysisException: "cannot resolve '`country_id`' given input columns: [Country, id, averagetemperatureuncertainty, year, dt, averagetemperature];;\n'Aggregate [country#15911, 'country_id, year#15910], [country#15911, 'country_id, year#15910, avg(AverageTemperature#15907) AS avg(AverageTemperature)#15925]\n+- Relation[dt#15906,averagetemperature#15907,averagetemperatureuncertainty#15908,id#15909,year#15910,Country#15911] parquet\n"

In [None]:
#todo when storing partition by country and year

# Global Temperatures

# Asylum Report

In [None]:
# https://www.kaggle.com/dhs/refugee-report

# Fact Table

- Calculate length of stay

In [304]:
df_test.head()

Row(cicid=459651.0, i94yr=2016, i94mon=4, i94cit=135, i94res=135, i94port='ATL', arrdate=20547, i94mode=1, i94addr='FL', depdate=20559, i94bir=54, i94visa=2, count=1, dtadfile='20160403', visapost=None, occup=None, entdepa='O', entdepd='R', entdepu=None, matflag='M', biryear=1962, dtaddto='07012016', gender=None, insnum=None, airline='VS', admnum=2147483647, fltno='00115', visatype='WT', arrival_date=datetime.date(2016, 4, 3), depart_date=datetime.date(2016, 4, 15), port_city='Atlanta', port_state_short='GA', port_state='Georgia', state_cit='United Kingdom', state_res='United Kingdom', visa='Pleasure')

In [311]:
timeframe.filter("i94_date = 20573").select('dt_date').show()

+----------+
|   dt_date|
+----------+
|2016-04-29|
|2016-04-29|
+----------+



In [405]:
df_fact = df_test.withColumn("arrival_dt", F.expr("date_add(to_date('1960-01-01'), arrdate)"))
df_fact = df_fact.withColumn("depart_dt", F.expr("date_add(to_date('1960-01-01'), depdate)"))

In [406]:
keep_columns = ['cicid', 'i94yr', 'i94mon', 'arrival_dt', 'arrdate', 'depdate', 'i94cit', 'i94res', 'i94port', 'i94mode', 'i94addr', 'i94bir',
               'i94visa', 'visatype', 'biryear', 'gender', 'airline', 'fltno', 'length_stay']

#maybe = ['entdepa', 'entdepd', 'matflag']

In [407]:
df_fact = df_fact.withColumn("length_stay", F.datediff("depart_date", "arrival_date"))


In [408]:
df_fact.select(keep_columns).show()

+--------+-----+------+----------+-------+-------+------+------+-------+-------+-------+------+-------+--------+-------+------+-------+-----+-----------+
|   cicid|i94yr|i94mon|arrival_dt|arrdate|depdate|i94cit|i94res|i94port|i94mode|i94addr|i94bir|i94visa|visatype|biryear|gender|airline|fltno|length_stay|
+--------+-----+------+----------+-------+-------+------+------+-------+-------+-------+------+-------+--------+-------+------+-------+-----+-----------+
|459651.0| 2016|     4|2016-04-03|  20547|  20559|   135|   135|    ATL|      1|     FL|    54|      2|      WT|   1962|  null|     VS|00115|         12|
|459652.0| 2016|     4|2016-04-03|  20547|  20555|   135|   135|    ATL|      1|     FL|    74|      2|      WT|   1942|     F|     VS|  103|          8|
|459653.0| 2016|     4|2016-04-03|  20547|  20557|   135|   135|    ATL|      1|     FL|    44|      2|      B2|   1972|     M|     VS|  109|         10|
|459654.0| 2016|     4|2016-04-03|  20547|  20555|   135|   135|    ATL|    

In [413]:
df_fact.dtypes

[('cicid', 'double'),
 ('i94yr', 'int'),
 ('i94mon', 'int'),
 ('i94cit', 'int'),
 ('i94res', 'int'),
 ('i94port', 'string'),
 ('arrdate', 'int'),
 ('i94mode', 'int'),
 ('i94addr', 'string'),
 ('depdate', 'int'),
 ('i94bir', 'int'),
 ('i94visa', 'int'),
 ('count', 'int'),
 ('dtadfile', 'string'),
 ('visapost', 'string'),
 ('occup', 'string'),
 ('entdepa', 'string'),
 ('entdepd', 'string'),
 ('entdepu', 'string'),
 ('matflag', 'string'),
 ('biryear', 'int'),
 ('dtaddto', 'string'),
 ('gender', 'string'),
 ('insnum', 'string'),
 ('airline', 'string'),
 ('admnum', 'int'),
 ('fltno', 'string'),
 ('visatype', 'string'),
 ('arrival_date', 'date'),
 ('depart_date', 'date'),
 ('port_city', 'string'),
 ('port_state_short', 'string'),
 ('port_state', 'string'),
 ('state_cit', 'string'),
 ('state_res', 'string'),
 ('visa', 'string'),
 ('arrival_dt', 'date'),
 ('depart_dt', 'date'),
 ('length_stay', 'int')]

In [336]:
def process_facts_table()

In [335]:
# df_test.join(timeframe, on=df_test['arrdate']==timeframe['i94_date'], how='leftouter').show() 
# too complicated
#df_test.join(timeframe, on=df_test['arrdate']==timeframe['i94_date'], how='left').dropDuplicates().select(timeframe['dt_date']).show()

In [323]:
df_test.count()

439428

In [None]:
df_test.withColumn("length_stay", F.datediff("depart_date", "arrival_date")).show()


# TODO

- copy my csv files to S3 manually
with a DAG
- write some csvs to redshift 
- copy the stuff to redshift
- preprocessed tables as parquet files?
- maybe split it up into two DAGs
-- one does the intialization of the dimension tables, which won't change that often
-- the other one does the updating of the fact table

- add IATA code to city ?

- or maybe the nodes can be confiured to only run once?


1. csv von S3 -> redshift
2. parquet von S3 -> Spark -> redshift, maybe like this

https://sonra.io/2018/01/01/using-apache-airflow-to-build-a-data-pipeline-on-aws/

In [None]:
COPY listing
FROM 's3://mybucket/data/listings/parquet/'
IAM_ROLE 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
FORMAT AS PARQUET;

If nothing works use Docker
https://towardsdatascience.com/getting-started-with-airflow-using-docker-cd8b44dbff98

Additional databases
- https://www.kaggle.com/open-flights/flight-route-database
- https://www.kaggle.com/dhs/refugee-report
- https://www.dhs.gov/immigration-statistics

In [121]:
OUTPUTFOLDER

'/Users/christian/Data/udacity_capstone/output'

In [124]:
df_country = spark.read.option("mergeSchema", "true").parquet(os.path.join(OUTPUTFOLDER, 'country.parquet')) 

In [125]:
df_country.columns

['cntyl_id', 'cntyl']

In [128]:
df_temp_country = spark.read.option("mergeSchema", "true").parquet(os.path.join(OUTPUTFOLDER, 'temperature_country.parquet')) 

In [129]:
df_temp_country.columns

['dt',
 'avg_temperature',
 'avg_uncertainty',
 'cntyl_id',
 'cntyl',
 'year',
 'country']