In [34]:
import pandas as pd
import json

pd.set_option('display.max_columns', 500)

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

i94_df = spark.read.format('com.github.saurfang.sas.spark').load('/home/mabloq/immigration_data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
i94_df.head()

Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')

## 1. Read i_94 form and write to parquet staging

In [29]:
i94_df = spark.read.format('com.github.saurfang.sas.spark').load('/home/mabloq/immigration_data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [4]:
i94 = i94_df.selectExpr("cicid as cic_id", "visatype as visa_id", "i94port as port_id","airline as airline_id", "i94cit as cit_id",\
                        "i94res as res_id", "i94yr as year", "i94mon as month","i94bir as age", "gender", \
                        "entdepa as arrival_date", "entdepd as depart_date","dtadfile as date_begin", \
                        "dtaddto as date_end")


# join94_df = i94.join(demographics_df, i94['port_id'] == demographics_df['port_id']) 

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

@udf(DateType())
    def date_add_(start, add):
        if type(start) is not datetime:
            date = datetime.strptime(start, "%Y-%m-%d")
            if add is None:
                return date

            return date + timedelta(add)

        if add is None:
            return start

        return start + timedelta(add)

In [None]:
from pyspark.sql.functions import lit, col, to_date
from pyspark.sql.types import IntegerType as Int

i94 = i94.withColumn('cic_id', col('cic_id').cast(Int()))
i94 = i94.withColumn('cit_id', col('cit_id').cast(Int()))
i94 = i94.withColumn('res_id', col('res_id').cast(Int()))
i94 = i94.withColumn('year', col('year').cast(Int()))
i94 = i94.withColumn('month', col('month').cast(Int()))
i94 = i94.withColumn('age', col('age').cast(Int()))

i94 = i94.withColumn('date_begin', to_date('date_begin', 'yyyyMMdd'))
i94 = i94.withColumn('date_end', to_date('date_end', 'MMddyyyy'))

# SAS arrival_date and depart_date are integers couting days since 1960-01-01 so I will create use custom
# date_add_ shown above function to convert these columns into date datatype
i94 = i94.withColumn('sas_date', lit("1960-01-01"))

i94 = i94.withColumn('depart_date', col('depart_date').cast(Int()))
i94 = i94.withColumn('depart_date', date_add_('sas_date', 'depart_date'))


i94 = i94.withColumn('arrival_date', col('arrival_date').cast(Int()))
i94 = i94.withColumn('arrival_date', date_add_('sas_date', 'arrival_date'))
i94.drop('sas_date')


In [30]:
i94.write.mode('overwrite').partitionBy('month').parquet('/home/mabloq/airflow/data/parquet/i94/')

## 2. Read i94 port codes and save to parquet

In [53]:
iport_df = spark.read.csv('/home/mabloq/airflow/data/i_port.txt', sep="=",header=True,ignoreLeadingWhiteSpace=True, \
                         ignoreTrailingWhiteSpace=True,quote="'")

In [54]:
from pyspark.sql.functions import split as spark_split
split_col = spark_split(iport_df['name'], ',')
iport_df = iport_df.withColumn('city', split_col.getItem(0))
iport_df = iport_df.withColumn('state', split_col.getItem(1))    

In [55]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def state_null(col, state):
    if state is None:
        if col.startswith("No PORT"):
            return "NPRT"
        elif col.startswith("Collapsed"):
            return "CPRT"
        elif "UNKOWN" in col or "UNIDENTIFIED" in col:
            return "UNKWN"
        elif "WASHINGTON DC" in col:
            return "DC"
        elif "MARIPOSA AZ" in col:
            return "AZ"
    else:
        return state

In [56]:
iport_df = iport_df.withColumn('state', state_null('name', 'state'))
iport_df.printSchema()
iport_df.toPandas()

root
 |-- port_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)



Unnamed: 0,port_id,name,city,state
0,ALC,"ALCAN, AK",ALCAN,AK
1,ANC,"ANCHORAGE, AK",ANCHORAGE,AK
2,BAR,"BAKER AAF - BAKER ISLAND, AK",BAKER AAF - BAKER ISLAND,AK
3,DAC,"DALTONS CACHE, AK",DALTONS CACHE,AK
4,PIZ,"DEW STATION PT LAY DEW, AK",DEW STATION PT LAY DEW,AK
5,DTH,"DUTCH HARBOR, AK",DUTCH HARBOR,AK
6,EGL,"EAGLE, AK",EAGLE,AK
7,FRB,"FAIRBANKS, AK",FAIRBANKS,AK
8,HOM,"HOMER, AK",HOMER,AK
9,HYD,"HYDER, AK",HYDER,AK


In [109]:
iport_df.write.mode('overwrite').parquet('/home/mabloq/airflow/data/parquet/i94ports/')

## 3. Read and write i94 countries to parquet 

In [130]:
countries_df = spark.read.csv("/home/mabloq/airflow/data/i_country.txt",sep="=",header=True,ignoreLeadingWhiteSpace=True, \
                         ignoreTrailingWhiteSpace=True,quote="'")


In [None]:
from pyspark.sql.types import ShortType as Shrt
countries_df = countries_df.withColumn('country_id',col('country_id').cast(Shrt()))

In [35]:
countries_df.write.mode('overwrite').parquet('/home/mabloq/airflow/data/parquet/i94countries/')

In [129]:
countries_df.printSchema()

root
 |-- country_id: string (nullable = true)
 |-- name: string (nullable = true)



## 4. Read and write i94 Visa Codes to Parquet

In [3]:
visa_df = spark.read.csv("/home/mabloq/airflow/data/visa_types.csv",header=True)

In [4]:
visa_df.head(5)

[Row(code='B1', name='Visa Holders-Business'),
 Row(code='B2', name='Visa Holders-Pleasure'),
 Row(code='E1', name='Visa Holders-Treaty Trader'),
 Row(code='E2', name='Visa Holders-Treaty Investor'),
 Row(code='F1', name='Visa Holders-Students')]

In [6]:
visa_df.write.mode('overwrite').parquet('/home/mabloq/airlfow/data/parquet/visa_codes/')

In [40]:
#visa_df.toPandas()

## 5. Read and write demographics to parquet

In [3]:
demographics_df = spark.read.csv("/home/mabloq/airflow/data/us-cities-demographics.csv",sep=";",header=True)

In [41]:
#demographics_df.toPandas()

In [40]:
port_df= spark.read.parquet("/home/mabloq/airflow/data/parquet/i94ports/")
#port_df.toPandas()

In [65]:
demographics_df.select('Race').distinct().toPandas()

Unnamed: 0,Race
0,Black or African-American
1,Hispanic or Latino
2,White
3,Asian
4,American Indian and Alaska Native


In [6]:
from pyspark.sql.functions import udf
@udf('string')
def gen_demo_id(race):
    if race == "Black or African-American":
        return "BAA"
    elif race == "Hispanic or Latino":
        return "HL"
    elif race == "White":
        return "W"
    elif race == "Asian":
        return "A"
    elif race == "American Indian and Alaska Native":
        return "AI"
    else:
        return "O"

In [7]:
demographics_df = demographics_df.withColumn('race_code',gen_demo_id("Race"))

In [69]:
#demographics_df.toPandas()

In [36]:
from pyspark.sql.functions import upper,trim
joined_df = demographics_df.join(port_df, [upper(demographics_df['City']) == upper(port_df['city']), \
                                 trim(demographics_df['State Code']) == trim(port_df['state'])])

        
joined_df = joined_df.select(demographics_df["*"], port_df['port_id'])
#joined_df.toPandas()

In [37]:
from pyspark.sql.functions import concat, col, lit
joined_df = joined_df.withColumn('demographics_id', concat(col("port_id"),lit("-"),col("race_code")))
# joined_df.toPandas()

In [38]:
joined_df = joined_df.selectExpr("demographics_id", "port_id", "city", "state", \
                                             "`Median Age` as median_age", "`Male Population` as male_population", \
                                              "`Female Population` as female_population", \
                                             "`Total Population` as total_population", \
                                             "`Average Household Size` as avg_household_size", \
                                             "`Foreign-born` as foreign_born", "`Race` as race", \
                                             "race_code")

#joined_df.toPandas()

In [None]:
joined_df = joined_df.withColumn("demographics_id", col("demographics_id").cast(Str()))
joined_df = joined_df.withColumn("port_id", col("port_id").cast(Str()))
joined_df = joined_df.withColumn("city", col("city").cast(Str()))
joined_df = joined_df.withColumn("state", col("state").cast(Str()))
joined_df = joined_df.withColumn("median_age", col("median_age").cast(Dec(4,1)))
joined_df = joined_df.withColumn("male_population", col("male_population").cast(Int()))
joined_df = joined_df.withColumn("female_population",col("female_population").cast(Int()))
joined_df = joined_df.withColumn("total_population", col("total_population").cast(Int()))
joined_df = joined_df.withColumn("avg_household_size", col("avg_household_size").cast(Dec(3,2)))
joined_df = joined_df.withColumn("foreign_born", col("foreign_born").cast(Int()))
joined_df = joined_df.withColumn("race", col("race").cast(Str()))
joined_df = joined_df.withColumn("race_code", col("race_code").cast(Str()))


In [None]:
joined_df = joined_df.fillna(0)

In [39]:
joined_df.write.mode('overwrite').parquet('/home/mabloq/airflow/data/parquet/demographics/')

## 6. Read visa codes and write to parquet

In [47]:
visa_codes_df = spark.read.csv("/home/mabloq/airflow/data/visa_types.csv",header=True)
#visa_codes_df.toPandas()

In [11]:
visa_codes_df.write.mode('overwrite').parquet("/home/mabloq/airflow/data/parquet/visa_codes/")

NameError: name 'visa_codes_df' is not defined

# PART 2: Insert to Database

In [57]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
spark = SparkSession.builder \
            .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.access.key", "AKIA35IS6VTE57XE2BN6") \
            .config("spark.hadoop.fs.s3a.secret.key", "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") \
            .config("spark.hadoop.fs.s3a.multiobjectdelete.enable","false") \
            .config("spark.hadoop.fs.s3a.fast.upload","true") \
            .config("spark.sql.parquet.filterPushdown", "true") \
            .config("spark.sql.parquet.mergeSchema", "false") \
            .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
            .config("spark.speculation", "false") \
            .enableHiveSupport().getOrCreate()



In [2]:
sf = spark.read.parquet("s3a://mabloq-udacity/parquet/demographics/")

In [66]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType, StructField as Fld, FloatType as Flt,DecimalType as Dec, StringType as Str, IntegerType as Int, LongType as Long
sf = sf.withColumn("avg_household_size", col("avg_household_size").cast(Dec(3,2)))


In [67]:

sf.printSchema()

root
 |-- demographics_id: string (nullable = true)
 |-- port_id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- avg_household_size: decimal(3,2) (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- race: string (nullable = true)
 |-- race_code: string (nullable = true)



In [57]:
df3.toPandas()

Py4JJavaError: An error occurred while calling o633.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): java.io.FileNotFoundException: No such file or directory: s3a://mabloq-udacity/parquet/demographics/part-00000-ccc08972-2908-4157-bace-c619d363fd3f-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://mabloq-udacity/parquet/demographics/part-00000-ccc08972-2908-4157-bace-c619d363fd3f-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)


In [42]:
from pyspark.sql.types import StructType, ArrayType, StructField as Fld, FloatType as Flt,DecimalType as Dec, StringType as Str, IntegerType as Int, LongType as Long
cSchema =  StructType([
  Fld("demographics_id", Str(), False),
  Fld("port_id", Str(), False),
  Fld("city", Str(), True),
  Fld("state", Str(), True),
  Fld("median_age", Int(), True),
  Fld("male_population", Int(), True),
  Fld("female_population", Int(), True),
  Fld("total_population", Int(), True),
  Fld("avg_household_size", Dec(3,2), True),
  Fld("foreign_born", Int(), True),
  Fld("race", Str(), True),
  Fld("race_code", Str(), True)])                     

In [49]:
df3.write.mode("overwrite").parquet("s3a://mabloq-udacity/parquet/demographics")

Py4JJavaError: An error occurred while calling o431.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://mabloq-udacity/parquet/demographics/part-00000-48c71fb2-60fa-4213-a663-c1237ff83d72-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://mabloq-udacity/parquet/demographics/part-00000-48c71fb2-60fa-4213-a663-c1237ff83d72-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)


In [27]:
df.printSchema()

root
 |-- demographics_id: string (nullable = true)
 |-- port_id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: integer (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- avg_household_size: decimal(3,2) (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- race: string (nullable = true)
 |-- race_code: string (nullable = true)



In [3]:
spark.stop()

In [24]:
i94 = spark.read.parquet("s3a://mabloq-udacity/i94")

In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from datetime import datetime
from datetime import timedelta

@udf(DateType())
def date_add_(start, add):
    if type(start) is not datetime:
        date = datetime.strptime(start, "%Y-%m-%d")
        if add is None:
            return date

        return date + timedelta(add)

    if add is None:
        return start

    return start + timedelta(add)


In [25]:
from pyspark.sql.functions import date_add, lit, col, to_date
from pyspark.sql.types import IntegerType as Int
i94 = i94.selectExpr("cicid as cic_id", "visatype as visa_id", "i94port as port_id", "airline as airline_id",
                                "i94cit as cit_id",
                                "i94res as res_id", "i94yr as year", "i94mon as month", "i94bir as age", "gender",
                                "arrdate as arrival_date", "depdate as depart_date", "dtadfile as date_begin",
                                "dtaddto as date_end")



i94 = i94.withColumn('cic_id', col('cic_id').cast(Int()))
i94 = i94.withColumn('cit_id', col('cit_id').cast(Int()))
i94 = i94.withColumn('res_id', col('res_id').cast(Int()))
i94 = i94.withColumn('year', col('year').cast(Int()))
i94 = i94.withColumn('month', col('month').cast(Int()))
i94 = i94.withColumn('age', col('age').cast(Int()))

i94 = i94.withColumn('date_begin', to_date('date_begin', 'yyyyMMdd'))
i94 = i94.withColumn('date_end', to_date('date_end', 'MMddyyyy'))

i94 = i94.withColumn('depart_date', col('depart_date').cast(Int()))
i94 = i94.withColumn('sas_date', lit("1960-01-01"))
i94 = i94.withColumn('depart_date', date_add_('sas_date', 'depart_date'))


i94 = i94.withColumn('arrival_date', col('arrival_date').cast(Int()))
i94 = i94.withColumn('arrival_date', date_add_('sas_date', 'arrival_date'))
i94.drop('sas_date')
i94.printSchema()
i94.limit(10).toPandas()

# i94.head(5)

root
 |-- cic_id: integer (nullable = true)
 |-- visa_id: string (nullable = true)
 |-- port_id: string (nullable = true)
 |-- airline_id: string (nullable = true)
 |-- cit_id: integer (nullable = true)
 |-- res_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- depart_date: date (nullable = true)
 |-- date_begin: date (nullable = true)
 |-- date_end: date (nullable = true)
 |-- sas_date: string (nullable = false)



Unnamed: 0,cic_id,visa_id,port_id,airline_id,cit_id,res_id,year,month,age,gender,arrival_date,depart_date,date_begin,date_end,sas_date
0,5748517,B1,LOS,QF,245,438,2016,4,40,F,2016-04-30,2016-05-08,2016-04-30,2016-10-29,1960-01-01
1,5748518,B1,LOS,VA,245,438,2016,4,32,F,2016-04-30,2016-05-17,2016-04-30,2016-10-29,1960-01-01
2,5748519,B1,LOS,DL,245,438,2016,4,29,M,2016-04-30,2016-05-08,2016-04-30,2016-10-29,1960-01-01
3,5748520,B1,LOS,DL,245,438,2016,4,29,F,2016-04-30,2016-05-14,2016-04-30,2016-10-29,1960-01-01
4,5748521,B1,LOS,DL,245,438,2016,4,28,M,2016-04-30,2016-05-14,2016-04-30,2016-10-29,1960-01-01
5,5748522,B2,HHW,NZ,245,464,2016,4,57,M,2016-04-30,2016-05-05,2016-04-30,2016-10-29,1960-01-01
6,5748523,B2,HHW,NZ,245,464,2016,4,66,F,2016-04-30,2016-05-12,2016-04-30,2016-10-29,1960-01-01
7,5748524,B2,HHW,NZ,245,464,2016,4,41,F,2016-04-30,2016-05-12,2016-04-30,2016-10-29,1960-01-01
8,5748525,B2,HOU,NZ,245,464,2016,4,27,M,2016-04-30,2016-05-07,2016-04-30,2016-10-29,1960-01-01
9,5748526,B2,LOS,NZ,245,464,2016,4,26,F,2016-04-30,2016-05-07,2016-04-30,2016-10-29,1960-01-01


In [119]:
i94 = i94.drop('sas_date')
i94.limit(10).toPandas()

Unnamed: 0,cic_id,visa_id,port_id,airline_id,cit_id,res_id,year,month,age,gender,arrival_Date,depart_date,date_begin,date_end
0,5748517.0,B1,LOS,QF,245.0,438.0,2016.0,4.0,40.0,F,2016-04-30,20582.0,20160430,10292016
1,5748518.0,B1,LOS,VA,245.0,438.0,2016.0,4.0,32.0,F,2016-04-30,20591.0,20160430,10292016
2,5748519.0,B1,LOS,DL,245.0,438.0,2016.0,4.0,29.0,M,2016-04-30,20582.0,20160430,10292016
3,5748520.0,B1,LOS,DL,245.0,438.0,2016.0,4.0,29.0,F,2016-04-30,20588.0,20160430,10292016
4,5748521.0,B1,LOS,DL,245.0,438.0,2016.0,4.0,28.0,M,2016-04-30,20588.0,20160430,10292016
5,5748522.0,B2,HHW,NZ,245.0,464.0,2016.0,4.0,57.0,M,2016-04-30,20579.0,20160430,10292016
6,5748523.0,B2,HHW,NZ,245.0,464.0,2016.0,4.0,66.0,F,2016-04-30,20586.0,20160430,10292016
7,5748524.0,B2,HHW,NZ,245.0,464.0,2016.0,4.0,41.0,F,2016-04-30,20586.0,20160430,10292016
8,5748525.0,B2,HOU,NZ,245.0,464.0,2016.0,4.0,27.0,M,2016-04-30,20581.0,20160430,10292016
9,5748526.0,B2,LOS,NZ,245.0,464.0,2016.0,4.0,26.0,F,2016-04-30,20581.0,20160430,10292016


In [112]:
i94 = i94.withColumn('new_date', date_add(col('sas_date'), col('arrival_date')))

TypeError: Column is not iterable

In [65]:
i94_df = spark.read.format('com.github.saurfang.sas.spark').load('/home/mabloq/immigration_data/18-83510-I94-Data-2016/*')

Py4JJavaError: An error occurred while calling o3540.load.
: java.io.FileNotFoundException: File /home/mabloq/immigration_data/18-83510-I94-Data-2016/* does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at com.github.saurfang.sas.spark.SasRelation.inferSchema(SasRelation.scala:115)
	at com.github.saurfang.sas.spark.SasRelation.<init>(SasRelation.scala:50)
	at com.github.saurfang.sas.spark.SasRelation$.apply(SasRelation.scala:42)
	at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:50)
	at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:39)
	at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:27)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [63]:
df.printSchema()

root
 |-- cic_id: integer (nullable = true)
 |-- port_id: string (nullable = true)
 |-- visa_id: string (nullable = true)
 |-- cit_id: short (nullable = true)
 |-- res_id: short (nullable = true)
 |-- year: short (nullable = true)
 |-- month: short (nullable = true)
 |-- age: short (nullable = true)
 |-- gender: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- depart_date: integer (nullable = true)
 |-- date_begin: date (nullable = true)
 |-- date_end: date (nullable = true)

