# Glide Technical Exercise  

## 1) PySpark set up

In [1]:
# required classes
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
# set up configuration
conf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("glide") \
    .set("spark.jars", "/home/canovasjm/spark/spark-3.0.3-bin-hadoop3.2/jars/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "path/to/google_credentials.json")

sc = SparkContext(conf=conf)

sc._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.json.keyfile", "/home/canovasjm/.google/credentials/google_credentials.json")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")

22/03/08 05:03:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# build Spark session
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

## 2) Import CSV files to pyspark data frame

### 2.1) Read files

In [4]:
# read CSV files located in GCS bucket
df = spark.read \
    .option("header", "true") \
    .csv("gs://dtc_data_lake_deng-338919/glide/*")

                                                                                

In [None]:
# print first 5 rows to check if df looks fine
df.show(5)

In [None]:
# print the shape of df
print((df.count(), len(df.columns)))

In [None]:
# check the df schema. As we see, all columns are read as string
df.schema

So far our all columns in our data frame are of type string. This is not optimal and won't allow us to do further manipulations of this data frame, so we need to fix it before moving on

### 2.2) Read files with a proper schema

In [5]:
from pyspark.sql import types

In [6]:
# define a proper schema
schema = types.StructType([
    types.StructField('snapshot_date', types.DateType(), True),
    types.StructField('employee_number', types.IntegerType(), True),
    types.StructField('status', types.StringType(), True),
    types.StructField('first_name', types.StringType(), True),
    types.StructField('last_name', types.StringType(), True),
    types.StructField('gender', types.StringType(), True),
    types.StructField('email', types.StringType(), True),
    types.StructField('phone_number', types.StringType(), True),
    types.StructField('salary', types.IntegerType(), True),
    types.StructField('termination_date', types.DateType(), True)
])

In [7]:
# read CSV files located in GCS bucket again, this time specifying the schema we defined above
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("gs://dtc_data_lake_deng-338919/glide/*")

In [8]:
# check the schema
df.schema

StructType(List(StructField(snapshot_date,DateType,true),StructField(employee_number,IntegerType,true),StructField(status,StringType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(gender,StringType,true),StructField(email,StringType,true),StructField(phone_number,StringType,true),StructField(salary,IntegerType,true),StructField(termination_date,DateType,true)))

## 3) Some data exploration

In [45]:
# Is there any record with employee_number NULL ?
df.filter("employee_number IS NULL").count()

                                                                                

0

In [46]:
# How many employees have a termination date different than NULL ?
df.filter("termination_date IS NOT NULL").show()

                                                                                

+-------------+---------------+--------+----------+---------+------+------------------+------------+------+----------------+
|snapshot_date|employee_number|  status|first_name|last_name|gender|             email|phone_number|salary|termination_date|
+-------------+---------------+--------+----------+---------+------+------------------+------------+------+----------------+
|   2020-01-04|             25|Inactive|    Stella|     Hunt|Female|s.hunt@newmail.com| 194-7397-62|122746|      2020-01-04|
+-------------+---------------+--------+----------+---------+------+------------------+------------+------+----------------+



In [9]:
from pyspark.sql import functions as F

In [48]:
# Examine records for employee_number == 25
df \
    .filter("employee_number == 25") \
    .orderBy(F.col("snapshot_date")) \
    .show()



+-------------+---------------+--------+----------+---------+------+------------------+------------+------+----------------+
|snapshot_date|employee_number|  status|first_name|last_name|gender|             email|phone_number|salary|termination_date|
+-------------+---------------+--------+----------+---------+------+------------------+------------+------+----------------+
|   2020-01-01|             25|  Active|    Stella|     Hunt|Female|s.hunt@newmail.com| 194-7397-62|122746|            null|
|   2020-01-02|             25|  Active|    Stella|     Hunt|Female|s.hunt@newmail.com| 194-7397-62|122746|            null|
|   2020-01-03|             25|  Active|    Stella|     Hunt|Female|s.hunt@newmail.com| 194-7397-62|122746|            null|
|   2020-01-04|             25|Inactive|    Stella|     Hunt|Female|s.hunt@newmail.com| 194-7397-62|122746|      2020-01-04|
+-------------+---------------+--------+----------+---------+------+------------------+------------+------+----------------+



                                                                                

In [49]:
# Examine records for employee_number == 3 , which is duplicated on January 5th
df \
    .filter("employee_number == 3") \
    .orderBy(F.col("snapshot_date")) \
    .show()



+-------------+---------------+------+----------+---------+------+--------------------+------------+------+----------------+
|snapshot_date|employee_number|status|first_name|last_name|gender|               email|phone_number|salary|termination_date|
+-------------+---------------+------+----------+---------+------+--------------------+------------+------+----------------+
|   2020-01-01|              3|Active|    Sydney|  Farrell|Female|s.farrell@newmail...| 187-8343-84|151217|            null|
|   2020-01-02|              3|Active|    Sydney|  Farrell|Female|s.farrell@newmail...| 187-8343-84|151217|            null|
|   2020-01-03|              3|Active|    Sydney|  Farrell|Female|s.farrell@newmail...| 187-8343-84|151217|            null|
|   2020-01-04|              3|Active|    Sydney|  Farrell|Female|s.farrell@newmail...| 187-8343-84|151217|            null|
|   2020-01-05|              3|Active|    Sydney|  Farrell|Female|s.farrell@newmail...| 187-8343-84|151217|            null|



                                                                                

## 4) Remove duplicates

In [50]:
# drop duplicated records
df = df.dropDuplicates()

In [51]:
# print the shape of df, after romoving the duplicate
print((df.count(), len(df.columns)))



(494, 10)




                                                                                

## 5) Staging table: `employee_snapshot`

In [10]:
# save the df as staging table
df.coalesce(1).write.csv(path='../data/processed/employee_snapshot/', mode='overwrite')

                                                                                

## 6) Date aggregation: `valid_from` and `valid_to`

In this step, my goal is to create a data frame containing `employee_number` and `status`, as well as two new columns with aggregated dates: `valid_from` and `valid_to`. 

My plan is to have, for each combination of employee and status, the date when the employee started and the date when the employee left the company. If the employee hasn't left the company yet, then I want to display the most recent snapshot date available.

In the next step, I will join the data frame created here to the data frame I read in step 2.2)

In [11]:
# create a temp table from the data frame
df.registerTempTable('table_df')

In [12]:
# create the new columns with Spark SQL
df_agg_dates = spark.sql("""
SELECT
    employee_number
    , status
    , MIN(snapshot_date) AS valid_from
    , MAX(snapshot_date) AS valid_to
FROM table_df
GROUP BY employee_number, status
ORDER BY employee_number, status
;
""")

In [55]:
# print some records of df_agg_dates to check how it looks
df_agg_dates.show(10)



+---------------+------+----------+----------+
|employee_number|status|valid_from|  valid_to|
+---------------+------+----------+----------+
|              1|Active|2020-01-01|2020-01-10|
|              2|Active|2020-01-01|2020-01-10|
|              3|Active|2020-01-01|2020-01-10|
|              4|Active|2020-01-01|2020-01-10|
|              5|Active|2020-01-01|2020-01-10|
|              6|Active|2020-01-01|2020-01-10|
|              7|Active|2020-01-01|2020-01-10|
|              8|Active|2020-01-01|2020-01-10|
|              9|Active|2020-01-01|2020-01-10|
|             10|Active|2020-01-01|2020-01-10|
+---------------+------+----------+----------+
only showing top 10 rows




                                                                                

## 7) Transformations

### 7.1) Join previously created to data frames

Now I will left join **df** and **df_agg_dates** on the composite key `['employee_number', 'status']` and save this to another data freme, named **df_result**. This way, **df_result** will have not only the original columns provided but also `valid_from` and `valid_to` created in 6)

In [13]:
# perform the join
df_result = df.join(df_agg_dates, on=['employee_number', 'status'], how='left')

In [14]:
# take a look at df_result selecting only some columns
df_result \
    .select("employee_number","status", "snapshot_date", "first_name", "last_name", "gender", "termination_date", "valid_from", "valid_to") \
    .orderBy(F.col("employee_number"), F.col("snapshot_date")) \
    .show()



+---------------+------+-------------+----------+---------+------+----------------+----------+----------+
|employee_number|status|snapshot_date|first_name|last_name|gender|termination_date|valid_from|  valid_to|
+---------------+------+-------------+----------+---------+------+----------------+----------+----------+
|              1|Active|   2020-01-01| Frederick|   Barnes|  Male|            null|2020-01-01|2020-01-10|
|              1|Active|   2020-01-02| Frederick|   Barnes|  Male|            null|2020-01-01|2020-01-10|
|              1|Active|   2020-01-03| Frederick|   Barnes|  Male|            null|2020-01-01|2020-01-10|
|              1|Active|   2020-01-04| Frederick|   Barnes|  Male|            null|2020-01-01|2020-01-10|
|              1|Active|   2020-01-05| Frederick|   Barnes|  Male|            null|2020-01-01|2020-01-10|
|              1|Active|   2020-01-06| Frederick|   Barnes|  Male|            null|2020-01-01|2020-01-10|
|              1|Active|   2020-01-07| Frederi


                                                                                

In [58]:
# create a temp table from df_result to use Spark SQL
df_result.registerTempTable('table_df_result')

### 7.2) Number rows ordered by `snapshot_date` descending

In [59]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [60]:
# define window specification
windowSpec = Window \
    .partitionBy("employee_number", "status") \
    .orderBy(F.col("snapshot_date").desc())

In [61]:
# number the rows in df_result partitioned by "employee_number" and "status" order by "snapshot_date" descending
df_final = df_result \
    .withColumn("rn",row_number().over(windowSpec)) \
    .filter("rn == 1") \
    .orderBy("employee_number", "status")

In [62]:
# print some records of df_final to check how it looks
df_final.show(30)



+---------------+--------+-------------+----------+----------+------+--------------------+------------+------+----------------+----------+----------+---+
|employee_number|  status|snapshot_date|first_name| last_name|gender|               email|phone_number|salary|termination_date|valid_from|  valid_to| rn|
+---------------+--------+-------------+----------+----------+------+--------------------+------------+------+----------------+----------+----------+---+
|              1|  Active|   2020-01-10| Frederick|    Barnes|  Male|f.barnes@newmail.com| 094-8926-78| 38582|            null|2020-01-01|2020-01-10|  1|
|              2|  Active|   2020-01-10|    Alford|     Grant|  Male| a.grant@newmail.com| 389-8947-85| 53126|            null|2020-01-01|2020-01-10|  1|
|              3|  Active|   2020-01-10|    Sydney|   Farrell|Female|s.farrell@newmail...| 187-8343-84|151217|            null|2020-01-01|2020-01-10|  1|
|              4|  Active|   2020-01-10|     Rosie|  Richards|Female|r.richa


                                                                                

In [63]:
# print the shape of df_final
print((df_final.count(), len(df_final.columns)))



(51, 13)



                                                                                

In [69]:
# save the df_final as employee table
df_final \
    .select("employee_number", "status", "snapshot_date", "first_name", "last_name", "gender", "email","phone_number", "salary", "termination_date", "valid_from", "valid_to") \
    .coalesce(1) \
    .write.csv(path='../data/processed/employee/', mode='overwrite')

                                                                                

### 7.2.bis) Number rows ordered by `snapshot_date` descending using SparkSQL

I'm including the following code chunks because I first tried the solution implemented in 7.1) using Spark SQL. But I got the following error: `cannot resolve '``rn``' given input columns`. 

It seems Spark SQL cannot resolve the alias `rn` in the `WHERE` clause. If I try to run `ROW_NUMBER() ...` as a sub-query in the `WHERE` clause I got another error stating this sub-query in not allowed here.

For illustration purpose, I commented out the `WHERE` clause below:

In [65]:
df_intermediate = spark.sql("""
SELECT
    employee_number
    , status
    , snapshot_date
    , ROW_NUMBER() OVER(PARTITION BY employee_number, status ORDER BY snapshot_date) AS rn
FROM table_df_result
-- WHERE rn = 1
GROUP BY employee_number, status, snapshot_date
ORDER BY employee_number, status, snapshot_date
;
""")

In [66]:
df_intermediate \
    .filter("rn == 1") \
    .show(30)



+---------------+--------+-------------+---+
|employee_number|  status|snapshot_date| rn|
+---------------+--------+-------------+---+
|              1|  Active|   2020-01-01|  1|
|              2|  Active|   2020-01-01|  1|
|              3|  Active|   2020-01-01|  1|
|              4|  Active|   2020-01-01|  1|
|              5|  Active|   2020-01-01|  1|
|              6|  Active|   2020-01-01|  1|
|              7|  Active|   2020-01-01|  1|
|              8|  Active|   2020-01-01|  1|
|              9|  Active|   2020-01-01|  1|
|             10|  Active|   2020-01-01|  1|
|             11|  Active|   2020-01-01|  1|
|             12|  Active|   2020-01-01|  1|
|             13|  Active|   2020-01-01|  1|
|             14|  Active|   2020-01-01|  1|
|             15|  Active|   2020-01-01|  1|
|             16|  Active|   2020-01-01|  1|
|             17|  Active|   2020-01-01|  1|
|             18|  Active|   2020-01-01|  1|
|             19|  Active|   2020-01-01|  1|
|         


                                                                                