In [1]:
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F
from pyspark.sql.functions import col, split, udf
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc


In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
#df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [3]:
df_spark=spark.read.parquet("sas_data")

In [4]:
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

In [5]:
df_spark.count()

3096313

In [6]:
mode = df_spark.groupby(['i94visa']).count()
mode.show(5)

+-------+-------+
|i94visa|  count|
+-------+-------+
|    1.0| 522079|
|    3.0|  43366|
|    2.0|2530868|
+-------+-------+



In [7]:
# interested only on air entry
immigrants=df_spark.where(F.col("i94mode")==1) 
immigrants.count()

2994505

In [8]:
immigrants.groupby(['i94visa']).count().show()

+-------+-------+
|i94visa|  count|
+-------+-------+
|    1.0| 512862|
|    3.0|  39495|
|    2.0|2442148|
+-------+-------+



In [9]:
immigrants.limit(3).toPandas().T

Unnamed: 0,0,1,2
cicid,5.74852e+06,5.74852e+06,5.74852e+06
i94yr,2016,2016,2016
i94mon,4,4,4
i94cit,245,245,245
i94res,438,438,438
i94port,LOS,LOS,LOS
arrdate,20574,20574,20574
i94mode,1,1,1
i94addr,CA,NV,WA
depdate,20582,20591,20582


In [10]:
immigrants=immigrants.select("cicid", "i94yr", "i94mon", "i94cit", "i94res", "i94port", "arrdate", "i94visa",  "biryear", "gender", "visatype", "airline")

In [11]:
immigrants.limit(3).toPandas().T

Unnamed: 0,0,1,2
cicid,5.74852e+06,5.74852e+06,5.74852e+06
i94yr,2016,2016,2016
i94mon,4,4,4
i94cit,245,245,245
i94res,438,438,438
i94port,LOS,LOS,LOS
arrdate,20574,20574,20574
i94visa,1,1,1
biryear,1976,1984,1987
gender,F,F,M


In [12]:
mode = immigrants.groupby(['i94cit']).count()
mode=mode.withColumn("i94cit", mode["i94cit"].cast(IntegerType())).orderBy(desc("count"))
mode.show(5)

+------+------+
|i94cit| count|
+------+------+
|   135|351396|
|   209|204462|
|   111|182316|
|   245|178077|
|   582|173774|
+------+------+
only showing top 5 rows



In [13]:
#immigrants.groupby(['i94cit']).count().orderBy(desc("count")).show()
#immigrants = immigrants.withColumn("i94cit", immigrants["i94cit"].cast(IntegerType()))
#immigrants = immigrants.withColumn("i94res", immigrants["i94res"].cast(IntegerType()))
#immigrants = immigrants.withColumn("i94visa", immigrants["i94visa"].cast(IntegerType()))
immigrants = immigrants.withColumn("biryear", immigrants["biryear"].cast(IntegerType()))
immigrants.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+------+--------+-------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94visa|biryear|gender|visatype|airline|
+---------+------+------+------+------+-------+-------+-------+-------+------+--------+-------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|   1976|     F|      B1|     QF|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|   1984|     F|      B1|     VA|
|5748519.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|   1987|     M|      B1|     DL|
|5748520.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|   1987|     F|      B1|     DL|
|5748521.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|   1988|     M|      B1|     DL|
+---------+------+------+------+------+-------+-------+-------+-------+------+--------+-------+
only showing top 5 rows



In [14]:
Schema_country = R([
    Fld("id",Str()),
    Fld("country",Str())
    ])


In [15]:
country_dict=spark.read.csv(
    "country_dict.csv", header=False, mode="DROPMALFORMED",sep="=", schema=Schema_country)

In [16]:
country_dict.take(5)

[Row(id=' 582 ', country="  'MEXICO'"),
 Row(id='   236 ', country="  'AFGHANISTAN'"),
 Row(id='   101 ', country="  'ALBANIA'"),
 Row(id='   316 ', country="  'ALGERIA'"),
 Row(id='   102 ', country="  'ANDORRA'")]

In [17]:
spaceDeleteUDF = udf(lambda s: s.replace(" ", ""), Str())
ampesandDeleteUDF = udf(lambda s: s.replace("'", ""), Str())

country_dict.withColumn("id", spaceDeleteUDF("id"))
#country_dict.withColumn("id", ampesandDeleteUDF("country"))  #???
#immigrants = immigrants.withColumn("i94cit", immigrants["i94cit"].cast(IntegerType()))

country_dict.show()

+-------+-------------------+
|     id|            country|
+-------+-------------------+
|   582 |           'MEXICO'|
|   236 |      'AFGHANISTAN'|
|   101 |          'ALBANIA'|
|   316 |          'ALGERIA'|
|   102 |          'ANDORRA'|
|   324 |           'ANGOLA'|
|   529 |         'ANGUILLA'|
|   518 |  'ANTIGUA-BARBUDA'|
|   687 |        'ARGENTINA'|
|   151 |          'ARMENIA'|
|   532 |            'ARUBA'|
|   438 |        'AUSTRALIA'|
|   103 |          'AUSTRIA'|
|   152 |       'AZERBAIJAN'|
|   512 |          'BAHAMAS'|
|   298 |          'BAHRAIN'|
|   274 |       'BANGLADESH'|
|   513 |         'BARBADOS'|
|   104 |          'BELGIUM'|
|   581 |           'BELIZE'|
+-------+-------------------+
only showing top 20 rows



In [18]:
immigrants.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- airline: string (nullable = true)



In [19]:
immigrants = immigrants.join(country_dict, immigrants.i94cit == country_dict.id,how='right') # Could also use 'left_outer'
immigrants=immigrants.withColumnRenamed("country", "cit_country")
immigrants=immigrants.drop('id','i94cit')
immigrants.show()

+---------+------+------+------+-------+-------+-------+-------+------+--------+-------+------------+
|    cicid| i94yr|i94mon|i94res|i94port|arrdate|i94visa|biryear|gender|visatype|airline| cit_country|
+---------+------+------+------+-------+-------+-------+-------+------+--------+-------+------------+
|5761447.0|2016.0|   4.0| 299.0|    LOS|20574.0|    2.0|   1972|     F|      B2|     KE|  'MONGOLIA'|
|5761448.0|2016.0|   4.0| 299.0|    CHI|20574.0|    2.0|   1974|     F|      B2|     SK|  'MONGOLIA'|
|5761451.0|2016.0|   4.0| 299.0|    WAS|20574.0|    3.0|   1982|     M|      F2|     UA|  'MONGOLIA'|
|5761452.0|2016.0|   4.0| 299.0|    WAS|20574.0|    2.0|   1990|     F|      B2|     UA|  'MONGOLIA'|
|5761453.0|2016.0|   4.0| 299.0|    WAS|20574.0|    1.0|   1966|     M|      B1|     KE|  'MONGOLIA'|
|5761454.0|2016.0|   4.0| 299.0|    WAS|20574.0|    2.0|   1950|     F|      B2|     KE|  'MONGOLIA'|
|5761455.0|2016.0|   4.0| 299.0|    WAS|20574.0|    2.0|   1970|     F|      B2|  

In [20]:
#immigrants=immigrants.drop('id')
immigrants.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- cit_country: string (nullable = true)



In [21]:
immigrants.take(3)

[Row(cicid=5761447.0, i94yr=2016.0, i94mon=4.0, i94res=299.0, i94port='LOS', arrdate=20574.0, i94visa=2.0, biryear=1972, gender='F', visatype='B2', airline='KE', cit_country="  'MONGOLIA'"),
 Row(cicid=5761448.0, i94yr=2016.0, i94mon=4.0, i94res=299.0, i94port='CHI', arrdate=20574.0, i94visa=2.0, biryear=1974, gender='F', visatype='B2', airline='SK', cit_country="  'MONGOLIA'"),
 Row(cicid=5761451.0, i94yr=2016.0, i94mon=4.0, i94res=299.0, i94port='WAS', arrdate=20574.0, i94visa=3.0, biryear=1982, gender='M', visatype='F2', airline='UA', cit_country="  'MONGOLIA'")]

In [22]:
immigrants = immigrants.join(country_dict, immigrants.i94res == country_dict.id,how='right') 
immigrants=immigrants.withColumnRenamed("country", "res_country")
immigrants=immigrants.drop('id','i94res')
immigrants.show()

+---------+------+------+-------+-------+-------+-------+------+--------+-------+------------+------------+
|    cicid| i94yr|i94mon|i94port|arrdate|i94visa|biryear|gender|visatype|airline| cit_country| res_country|
+---------+------+------+-------+-------+-------+-------+------+--------+-------+------------+------------+
|5761447.0|2016.0|   4.0|    LOS|20574.0|    2.0|   1972|     F|      B2|     KE|  'MONGOLIA'|  'MONGOLIA'|
|5761448.0|2016.0|   4.0|    CHI|20574.0|    2.0|   1974|     F|      B2|     SK|  'MONGOLIA'|  'MONGOLIA'|
|5761451.0|2016.0|   4.0|    WAS|20574.0|    3.0|   1982|     M|      F2|     UA|  'MONGOLIA'|  'MONGOLIA'|
|5761452.0|2016.0|   4.0|    WAS|20574.0|    2.0|   1990|     F|      B2|     UA|  'MONGOLIA'|  'MONGOLIA'|
|5761453.0|2016.0|   4.0|    WAS|20574.0|    1.0|   1966|     M|      B1|     KE|  'MONGOLIA'|  'MONGOLIA'|
|5761454.0|2016.0|   4.0|    WAS|20574.0|    2.0|   1950|     F|      B2|     KE|  'MONGOLIA'|  'MONGOLIA'|
|5761455.0|2016.0|   4.0|   

In [23]:
Schema_Visa = R([
    Fld("id",Str()),
    Fld("Visa_Type",Str())
    ])
visa_dict=spark.read.csv(
    "visa_dict.csv", header=False, mode="DROPMALFORMED",sep="=", schema=Schema_Visa)

In [24]:
visa_dict.show(3)

+---+---------+
| id|Visa_Type|
+---+---------+
| 1 | Business|
| 2 | Pleasure|
| 3 |  Student|
+---+---------+



In [25]:
immigrants = immigrants.join(visa_dict, immigrants.i94visa == visa_dict.id,how='right') 
immigrants=immigrants.drop('id','i94visa')
immigrants.show()

+---------+------+------+-------+-------+-------+------+--------+-------+-----------+-------------+---------+
|    cicid| i94yr|i94mon|i94port|arrdate|biryear|gender|visatype|airline|cit_country|  res_country|Visa_Type|
+---------+------+------+-------+-------+-------+------+--------+-------+-----------+-------------+---------+
|5748517.0|2016.0|   4.0|    LOS|20574.0|   1976|     F|      B1|     QF|    'CHINA'|  'AUSTRALIA'| Business|
|5748518.0|2016.0|   4.0|    LOS|20574.0|   1984|     F|      B1|     VA|    'CHINA'|  'AUSTRALIA'| Business|
|5748519.0|2016.0|   4.0|    LOS|20574.0|   1987|     M|      B1|     DL|    'CHINA'|  'AUSTRALIA'| Business|
|5748520.0|2016.0|   4.0|    LOS|20574.0|   1987|     F|      B1|     DL|    'CHINA'|  'AUSTRALIA'| Business|
|5748521.0|2016.0|   4.0|    LOS|20574.0|   1988|     M|      B1|     DL|    'CHINA'|  'AUSTRALIA'| Business|
|5748876.0|2016.0|   4.0|    HOU|20574.0|   1973|     M|      B1|     UA|    'CHINA'|     'MEXICO'| Business|
|5748877.0

In [28]:
immigrants.createOrReplaceTempView("immigrants")

In [29]:
immigrants.createOrReplaceTempView("immigrants")
immigrations = spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immigrants")
immigrants=immigrants.drop('arrdate')

In [33]:
immigrations.show()

+---------+------+------+-------+-------+-------+------+--------+-------+-----------+-------------+---------+------------+
|    cicid| i94yr|i94mon|i94port|arrdate|biryear|gender|visatype|airline|cit_country|  res_country|Visa_Type|arrival_date|
+---------+------+------+-------+-------+-------+------+--------+-------+-----------+-------------+---------+------------+
|5748517.0|2016.0|   4.0|    LOS|20574.0|   1976|     F|      B1|     QF|    'CHINA'|  'AUSTRALIA'| Business|  2016-04-30|
|5748518.0|2016.0|   4.0|    LOS|20574.0|   1984|     F|      B1|     VA|    'CHINA'|  'AUSTRALIA'| Business|  2016-04-30|
|5748519.0|2016.0|   4.0|    LOS|20574.0|   1987|     M|      B1|     DL|    'CHINA'|  'AUSTRALIA'| Business|  2016-04-30|
|5748520.0|2016.0|   4.0|    LOS|20574.0|   1987|     F|      B1|     DL|    'CHINA'|  'AUSTRALIA'| Business|  2016-04-30|
|5748521.0|2016.0|   4.0|    LOS|20574.0|   1988|     M|      B1|     DL|    'CHINA'|  'AUSTRALIA'| Business|  2016-04-30|
|5748876.0|2016.