In [1]:
import pyspark
# Read in data from S3 Buckets
from pyspark import SparkFiles, SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

In [2]:
# Load in data
url = "https://finalproject-uoft.s3.amazonaws.com/IRCC_M_PRadmiss_0002_CSV.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("IRCC_M_PRadmiss_0002_CSV.csv"), sep="\t", header=True, inferSchema=True)
df.show()

+-------+----------+--------+--------+------------+-------+-------------------------+----------------------+-----+
|EN_YEAR|EN_QUARTER|EN_MONTH|FR_ANNEÉ|FR_TRIMESTRE|FR_MOIS|EN_COUNTRY_OF_CITIZENSHIP|FR_PAYS_DE_CITOYENNETÉ|TOTAL|
+-------+----------+--------+--------+------------+-------+-------------------------+----------------------+-----+
|   2018|        Q3|     Jul|    2018|          T3|  juil.|                  Germany|             Allemagne|  120|
|   2017|        Q4|     Oct|    2017|          T4|   oct.|                   Israel|                Israël|  125|
|   2016|        Q1|     Mar|    2016|          T1|   mars|                   Cyprus|                Chypre|    5|
|   2018|        Q4|     Nov|    2018|          T4|   nov.|                 Bulgaria|              Bulgarie|   15|
|   2016|        Q4|     Oct|    2016|          T4|   oct.|                  Denmark|              Danemark|    5|
|   2016|        Q3|     Sep|    2016|          T3|  sept.|        Chad, Republi

In [3]:
# Show schema to confirm date type
df.printSchema()

root
 |-- EN_YEAR: integer (nullable = true)
 |-- EN_QUARTER: string (nullable = true)
 |-- EN_MONTH: string (nullable = true)
 |-- FR_ANNEÉ: integer (nullable = true)
 |-- FR_TRIMESTRE: string (nullable = true)
 |-- FR_MOIS: string (nullable = true)
 |-- EN_COUNTRY_OF_CITIZENSHIP: string (nullable = true)
 |-- FR_PAYS_DE_CITOYENNETÉ: string (nullable = true)
 |-- TOTAL: string (nullable = true)



In [4]:
# Drop Columns not using
columns_to_drop = ['FR_ANNEÉ', 'FR_TRIMESTRE','FR_MOIS','FR_PAYS_DE_CITOYENNETÉ']
df = df.drop(*columns_to_drop)

In [5]:
df.show()

+-------+----------+--------+-------------------------+-----+
|EN_YEAR|EN_QUARTER|EN_MONTH|EN_COUNTRY_OF_CITIZENSHIP|TOTAL|
+-------+----------+--------+-------------------------+-----+
|   2018|        Q3|     Jul|                  Germany|  120|
|   2017|        Q4|     Oct|                   Israel|  125|
|   2016|        Q1|     Mar|                   Cyprus|    5|
|   2018|        Q4|     Nov|                 Bulgaria|   15|
|   2016|        Q4|     Oct|                  Denmark|    5|
|   2016|        Q3|     Sep|        Chad, Republic of|    5|
|   2018|        Q3|     Jul|       Korea, Republic of|  435|
|   2017|        Q4|     Oct|                  Ukraine|  215|
|   2015|        Q1|     Feb|       Bosnia-Herzegovina|   10|
|   2016|        Q4|     Oct|                  Uruguay|   --|
|   2018|        Q4|     Nov|                  Eritrea|  645|
|   2018|        Q4|     Nov|              New Zealand|   25|
|   2017|        Q4|     Dec|                   Turkey|   95|
|   2016

In [6]:
df.count()

8646

In [7]:
from pyspark.sql.types import IntegerType

newdf = df.withColumn("TOTAL", df["TOTAL"].cast(IntegerType()))

In [8]:
newdf.printSchema()

root
 |-- EN_YEAR: integer (nullable = true)
 |-- EN_QUARTER: string (nullable = true)
 |-- EN_MONTH: string (nullable = true)
 |-- EN_COUNTRY_OF_CITIZENSHIP: string (nullable = true)
 |-- TOTAL: integer (nullable = true)



In [9]:
newdf.show()

+-------+----------+--------+-------------------------+-----+
|EN_YEAR|EN_QUARTER|EN_MONTH|EN_COUNTRY_OF_CITIZENSHIP|TOTAL|
+-------+----------+--------+-------------------------+-----+
|   2018|        Q3|     Jul|                  Germany|  120|
|   2017|        Q4|     Oct|                   Israel|  125|
|   2016|        Q1|     Mar|                   Cyprus|    5|
|   2018|        Q4|     Nov|                 Bulgaria|   15|
|   2016|        Q4|     Oct|                  Denmark|    5|
|   2016|        Q3|     Sep|        Chad, Republic of|    5|
|   2018|        Q3|     Jul|       Korea, Republic of|  435|
|   2017|        Q4|     Oct|                  Ukraine|  215|
|   2015|        Q1|     Feb|       Bosnia-Herzegovina|   10|
|   2016|        Q4|     Oct|                  Uruguay| null|
|   2018|        Q4|     Nov|                  Eritrea|  645|
|   2018|        Q4|     Nov|              New Zealand|   25|
|   2017|        Q4|     Dec|                   Turkey|   95|
|   2016

In [10]:
#year_df = df.orderBy('EN_YEAR').select('EN_YEAR').distinct().toPandas()
#year_df.head()
#df.select('EN_YEAR').distinct().show()
yeardf = df.orderBy('EN_YEAR').select('EN_YEAR').distinct()

In [11]:
year_df = df.orderBy('EN_YEAR').select('EN_YEAR').distinct().toPandas()
year_df.head()

Unnamed: 0,EN_YEAR
0,2015
1,2016
2,2017
3,2018
4,2019


In [12]:
year_df['id'] = year_df.index+100

In [13]:
year_df.head()

Unnamed: 0,EN_YEAR,id
0,2015,100
1,2016,101
2,2017,102
3,2018,103
4,2019,104


In [14]:
yeardf.show()

+-------+
|EN_YEAR|
+-------+
|   2015|
|   2016|
|   2017|
|   2018|
|   2019|
+-------+



In [15]:
countrydf = df.orderBy('EN_COUNTRY_OF_CITIZENSHIP').select('EN_COUNTRY_OF_CITIZENSHIP').distinct()
countrydf.show()

+-------------------------+
|EN_COUNTRY_OF_CITIZENSHIP|
+-------------------------+
|              Afghanistan|
|                  Albania|
|                  Algeria|
|                  Andorra|
|                   Angola|
|      Antigua and Barbuda|
|                Argentina|
|                  Armenia|
|                Australia|
|                  Austria|
|               Azerbaijan|
|      Bahama Islands, The|
|                  Bahrain|
|               Bangladesh|
|                 Barbados|
|                  Belarus|
|                  Belgium|
|                   Belize|
|       Benin, Republic of|
|                   Bhutan|
+-------------------------+
only showing top 20 rows



In [16]:
countrydf.count()

196

In [17]:
datedf = df.orderBy('EN_QUARTER','EN_MONTH').select('EN_QUARTER','EN_MONTH').distinct()

In [18]:
date_df = df.orderBy('EN_QUARTER','EN_MONTH').select('EN_QUARTER','EN_MONTH').distinct().toPandas()

In [19]:
import datetime
#month_number = datetime.datetime.strptime(date_df['EN_MONTH'], '%b').month

#train['date1'] = train['ID'].apply(lambda x: datetime.strptime(x, '%Y%m%d%H'))

date_df['month_number'] = date_df['EN_MONTH'].apply(lambda x: datetime.datetime.strptime(x, '%b').month)

In [20]:
date_df.head()

Unnamed: 0,EN_QUARTER,EN_MONTH,month_number
0,Q1,Feb,2
1,Q1,Jan,1
2,Q1,Mar,3
3,Q2,Apr,4
4,Q2,Jun,6


In [21]:
datedf.show()

+----------+--------+
|EN_QUARTER|EN_MONTH|
+----------+--------+
|        Q1|     Feb|
|        Q1|     Jan|
|        Q1|     Mar|
|        Q2|     Apr|
|        Q2|     Jun|
|        Q2|     May|
|        Q3|     Aug|
|        Q3|     Jul|
|        Q3|     Sep|
|        Q4|     Dec|
|        Q4|     Nov|
|        Q4|     Oct|
+----------+--------+



In [37]:
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import udf

#maturity_udf = udf(lambda age: "adult" if age >=18 else "child", StringType())
month_udf = udf(lambda x: datetime.datetime.strptime(x, '%b').month)

#df = sqlContext.createDataFrame([{'name': 'Alice', 'age': 1}])
datedf = datedf.withColumn("month_num", month_udf(datedf.EN_MONTH))


In [39]:
datedf.printSchema()

root
 |-- EN_QUARTER: string (nullable = true)
 |-- EN_MONTH: string (nullable = true)
 |-- month_num: string (nullable = true)



In [40]:
datedf.show()

+----------+--------+---------+
|EN_QUARTER|EN_MONTH|month_num|
+----------+--------+---------+
|        Q1|     Feb|        2|
|        Q1|     Jan|        1|
|        Q1|     Mar|        3|
|        Q2|     Apr|        4|
|        Q2|     Jun|        6|
|        Q2|     May|        5|
|        Q3|     Aug|        8|
|        Q3|     Jul|        7|
|        Q3|     Sep|        9|
|        Q4|     Dec|       12|
|        Q4|     Nov|       11|
|        Q4|     Oct|       10|
+----------+--------+---------+



In [42]:
datedf_exp = datedf.selectExpr("month_num", "EN_MONTH as month", "EN_QUARTER as quarter" )
datedf_exp.show()
datedf_exp.printSchema()

+---------+-----+-------+
|month_num|month|quarter|
+---------+-----+-------+
|        2|  Feb|     Q1|
|        1|  Jan|     Q1|
|        3|  Mar|     Q1|
|        4|  Apr|     Q2|
|        6|  Jun|     Q2|
|        5|  May|     Q2|
|        8|  Aug|     Q3|
|        7|  Jul|     Q3|
|        9|  Sep|     Q3|
|       12|  Dec|     Q4|
|       11|  Nov|     Q4|
|       10|  Oct|     Q4|
+---------+-----+-------+

root
 |-- month_num: string (nullable = true)
 |-- month: string (nullable = true)
 |-- quarter: string (nullable = true)



In [43]:
# Change ID type
datedf_exp = datedf_exp.withColumn("month_num", datedf_exp["month_num"].cast(IntegerType()))

In [44]:
datedf_exp.show()
datedf_exp.printSchema()

+---------+-----+-------+
|month_num|month|quarter|
+---------+-----+-------+
|        2|  Feb|     Q1|
|        1|  Jan|     Q1|
|        3|  Mar|     Q1|
|        4|  Apr|     Q2|
|        6|  Jun|     Q2|
|        5|  May|     Q2|
|        8|  Aug|     Q3|
|        7|  Jul|     Q3|
|        9|  Sep|     Q3|
|       12|  Dec|     Q4|
|       11|  Nov|     Q4|
|       10|  Oct|     Q4|
+---------+-----+-------+

root
 |-- month_num: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- quarter: string (nullable = true)



In [23]:
# Import date time functions
from pyspark.sql.functions import year, month
from pyspark.sql.functions import udf

In [24]:
def month_string_to_number(string):
    m = {
        'jan': 1,
        'feb': 2,
        'mar': 3,
        'apr':4,
         'may':5,
         'jun':6,
         'jul':7,
         'aug':8,
         'sep':9,
         'oct':10,
         'nov':11,
         'dec':12
        }
    s = string.strip()[:3].lower()

    try:
        out = m[s]
        return out
    except:
        raise ValueError('Not a month')
monthWithPython = udf(lambda z: month_converter(z), IntegerType())

In [25]:
def month_converter(month):
    months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    return months.index(month) + 1
monthWithPython2 = udf(lambda z: month_converter(z), IntegerType())

In [41]:
type(datedf)

pyspark.sql.dataframe.DataFrame

In [46]:
year_df.rename(columns={'EN_YEAR':'year'}, inplace=True)
year_df = year_df[['id', 'year']]
year_df.head()

Unnamed: 0,id,year
0,100,2015
1,101,2016
2,102,2017
3,103,2018
4,104,2019


In [47]:
year_exp = spark.createDataFrame(year_df)
year_exp.show()
year_exp.printSchema()

+---+----+
| id|year|
+---+----+
|100|2015|
|101|2016|
|102|2017|
|103|2018|
|104|2019|
+---+----+

root
 |-- id: long (nullable = true)
 |-- year: long (nullable = true)



In [48]:
# Change ID type
year_exp = year_exp.withColumn("id", year_exp["id"].cast(IntegerType()))
year_exp = year_exp.withColumn("year", year_exp["year"].cast(IntegerType()))
year_exp.printSchema()


root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)



In [28]:
country_df = df.orderBy('EN_COUNTRY_OF_CITIZENSHIP').select('EN_COUNTRY_OF_CITIZENSHIP').distinct().toPandas()

In [29]:
country_df['country_id'] = country_df.index+1000
country_df.count()
country_df.head()

Unnamed: 0,EN_COUNTRY_OF_CITIZENSHIP,country_id
0,Afghanistan,1000
1,Albania,1001
2,Algeria,1002
3,Andorra,1003
4,Angola,1004


In [30]:
country_df.rename( columns={"EN_COUNTRY_OF_CITIZENSHIP": "country"}, inplace=True)
country_df = country_df[['country_id','country']]
country_df.head()

Unnamed: 0,country_id,country
0,1000,Afghanistan
1,1001,Albania
2,1002,Algeria
3,1003,Andorra
4,1004,Angola


In [31]:

country_exp = spark.createDataFrame(country_df)
country_exp.printSchema()

root
 |-- country_id: long (nullable = true)
 |-- country: string (nullable = true)



In [32]:
# Change ID type
country_exp = country_exp.withColumn("country_id", country_exp["country_id"].cast(IntegerType()))

In [33]:
# Connect and insert Data in the DB

# Configure settings for RDS
mode = "append"
# url = "jdbc:postgresql://198.123.43.24:5432/kockpit"
jdbc_url="jdbc:postgresql://classetlproject.ckrgp7m80r1y.us-east-1.rds.amazonaws.com:5432/migration_ca"
config = {"user":"root", 
          "password": "rootroot", 
          "driver":"org.postgresql.Driver"}

In [35]:
# Append DataFrame to active_user table in RDS

country_exp.write.jdbc(url=jdbc_url, table='countries', mode=mode, properties=config)

In [45]:
# Append DataFrame to active_user table in RDS

datedf_exp.write.jdbc(url=jdbc_url, table='date', mode=mode, properties=config)

In [49]:
# Append DataFrame to active_user table in RDS

year_exp.write.jdbc(url=jdbc_url, table='years', mode=mode, properties=config)

In [50]:
newdf.show()

+-------+----------+--------+-------------------------+-----+
|EN_YEAR|EN_QUARTER|EN_MONTH|EN_COUNTRY_OF_CITIZENSHIP|TOTAL|
+-------+----------+--------+-------------------------+-----+
|   2018|        Q3|     Jul|                  Germany|  120|
|   2017|        Q4|     Oct|                   Israel|  125|
|   2016|        Q1|     Mar|                   Cyprus|    5|
|   2018|        Q4|     Nov|                 Bulgaria|   15|
|   2016|        Q4|     Oct|                  Denmark|    5|
|   2016|        Q3|     Sep|        Chad, Republic of|    5|
|   2018|        Q3|     Jul|       Korea, Republic of|  435|
|   2017|        Q4|     Oct|                  Ukraine|  215|
|   2015|        Q1|     Feb|       Bosnia-Herzegovina|   10|
|   2016|        Q4|     Oct|                  Uruguay| null|
|   2018|        Q4|     Nov|                  Eritrea|  645|
|   2018|        Q4|     Nov|              New Zealand|   25|
|   2017|        Q4|     Dec|                   Turkey|   95|
|   2016

In [52]:
# Append DataFrame to active_user table in RDS

newdf.write.jdbc(url=jdbc_url, table='migration_stage', mode=mode, properties=config)