In [1]:
#Import libraries
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, concat_ws, from_unixtime, dense_rank, monotonically_increasing_id
from pyspark.sql.functions import hour, minute,  dayofmonth, month, year
from pyspark.sql.window import Window

In [2]:
#Initialing Spark Session
spark = SparkSession.builder.getOrCreate()

In [3]:
#Global Variables
Access_Key = ''
Secret_Key = ''

#Bucket
Bucket = ''
Prefix = ''
S3_Uri = f's3://{Bucket}/{Prefix}'

#AWS Redshift vars
host=''
port=''
user=''
password=''
database=''

In [4]:
session = boto3.Session(
    aws_access_key_id=Access_Key,
    aws_secret_access_key=Secret_Key
)
credentials = session.get_credentials()
spark._jsc.hadoopConfiguration().set(
    'fs.s3a.access.key', credentials.access_key)
spark._jsc.hadoopConfiguration().set(
    'fs.s3a.secret.key', credentials.secret_key)

In [5]:
#Review about the items in the bucket with the prefix
try:
    s3_client = session.client('s3')

    response = s3_client.list_objects_v2(
        Bucket=Bucket,
        Prefix=Prefix
    )
except Exception as e:
    print(f"Error: {e}")

Error: An error occurred (SignatureDoesNotMatch) when calling the ListObjectsV2 operation: The request signature we calculated does not match the signature you provided. Check your key and signing method.


# Reading data

In [6]:
ds = spark.read.json("data-sample.json")

#The credentials doesn't work, so I decided to leave evidence about how to read data from a Bucket in json or parquet format
#json_df = spark.read.json(S3_Uri)
#parquet_df = spark.read.parquet(S3_Uri)

In [7]:
#Data view
ds.show(1)

+--------------------+-------+----------+--------+----+-------+----+------+-----+
|     _corrupt_record|adverts|applicants|benefits|city|company|  id|sector|title|
+--------------------+-------+----------+--------+----+-------+----+------+-----+
|[{"id":"806a2843-...|   null|      null|    null|null|   null|null|  null| null|
+--------------------+-------+----------+--------+----+-------+----+------+-----+
only showing top 1 row



# ETL Process

In [8]:
ds = ds.drop('_corrupt_record')

In [9]:
ds.count()

300

In [10]:
ds.printSchema()

root
 |-- adverts: struct (nullable = true)
 |    |-- activeDays: long (nullable = true)
 |    |-- applyUrl: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- publicationDateTime: string (nullable = true)
 |    |-- status: string (nullable = true)
 |-- applicants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- applicationDate: string (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)
 |    |    |-- skills: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- benefits: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- company: string (nullable = true)
 |-- id: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- title: string (nullable = true)



In [11]:
ds = ds.withColumnRenamed('id', 'id_principal')

In [12]:
# Splittear las columnas
ds = ds.select("adverts.*", 
              explode("applicants").alias("applicant"), 
              "benefits", 
              "city", 
              "company", 
              "id_principal", 
              "sector", 
              "title")


In [13]:
ds.printSchema()

root
 |-- activeDays: long (nullable = true)
 |-- applyUrl: string (nullable = true)
 |-- id: string (nullable = true)
 |-- publicationDateTime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- applicant: struct (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- applicationDate: string (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |    |-- skills: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- benefits: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- company: string (nullable = true)
 |-- id_principal: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- title: string (nullable = true)



In [14]:
# Splittear las columnas dentro de "applicant"
ds = ds.select("activeDays", 
               "applyUrl", 
               "id", 
               "publicationDateTime", 
               "status",
               col("applicant.age").alias("age"),
               col("applicant.applicationDate").alias("applicationDate"),
               col("applicant.firstName").alias("firstName"),
               col("applicant.lastName").alias("lastName"),
               col("applicant.skills").alias("skills"),
               "benefits", 
               "city", 
               "company", 
               "id_principal", 
               "sector", 
               "title")

In [15]:
ds.show(1)

+----------+--------------------+--------------------+-------------------+------+---+---------------+---------+--------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+------------------+
|activeDays|            applyUrl|                  id|publicationDateTime|status|age|applicationDate|firstName|lastName|              skills|            benefits|    city|             company|        id_principal|              sector|             title|
+----------+--------------------+--------------------+-------------------+------+---+---------------+---------+--------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+------------------+
|         7|https://sourcefor...|4bfccc55-c9f9-423...|         1565329261|Active| 62|     1565588461|   Giraud|   Fakes|[Kettlebells, EoM...|[Medical Insuranc...|Dongtuan|Waters, Ferry and...|d137f114-b0b8-4cb...|Research and Deve...|Staf

In [16]:
#Splitting array columns
ds = ds.withColumn("benefits_split", concat_ws(", ", "benefits"))
ds = ds.withColumn("skills_split", concat_ws(", ", "skills"))

#Dropping array columns
Columns_to_drop = ["skills", "benefits"]
ds = ds.drop(*Columns_to_drop)

In [17]:
ds = ds.withColumnRenamed('id', 'id_adverts')

In [18]:
ds.printSchema()

root
 |-- activeDays: long (nullable = true)
 |-- applyUrl: string (nullable = true)
 |-- id_adverts: string (nullable = true)
 |-- publicationDateTime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- age: long (nullable = true)
 |-- applicationDate: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- city: string (nullable = true)
 |-- company: string (nullable = true)
 |-- id_principal: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- title: string (nullable = true)
 |-- benefits_split: string (nullable = false)
 |-- skills_split: string (nullable = false)



# Ordering ds

In [19]:
ds = ds.select("id_principal",
               "id_adverts",
                "company",
               "city",
               "sector",
               "title",
               "status",
               "publicationDateTime",
               "activeDays",
               "applyUrl",
               "benefits_split",
               "firstName",
               "lastName",
               "age",
               "skills_split" ,
               "applicationDate"
              )

# Casting date colums

In [20]:
ds = ds.withColumn("publicationDateTime", from_unixtime(col("publicationDateTime")).cast("timestamp"))
ds = ds.withColumn("applicationDate", from_unixtime(col("applicationDate")).cast("timestamp"))


In [21]:
ds.show(1)

+--------------------+--------------------+--------------------+--------+--------------------+------------------+------+-------------------+----------+--------------------+--------------------+---------+--------+---+--------------------+-------------------+
|        id_principal|          id_adverts|             company|    city|              sector|             title|status|publicationDateTime|activeDays|            applyUrl|      benefits_split|firstName|lastName|age|        skills_split|    applicationDate|
+--------------------+--------------------+--------------------+--------+--------------------+------------------+------+-------------------+----------+--------------------+--------------------+---------+--------+---+--------------------+-------------------+
|d137f114-b0b8-4cb...|4bfccc55-c9f9-423...|Waters, Ferry and...|Dongtuan|Research and Deve...|Staff Accountant I|Active|2019-08-08 23:41:01|         7|https://sourcefor...|Medical Insurance...|   Giraud|   Fakes| 62|Kettlebell

In [22]:
df = ds.dropna(subset=["firstName"])

# Analysis
#In order to have the table facts and dimensions, I recommend:
#Facts: Application
#Dimensions: company, date, work, candidate

# Dimension

In [23]:
#Creating ID per dimensions 
df = df.withColumn("ID_Company", dense_rank().over(Window.orderBy("company")))
df = df.withColumn("ID_Date", dense_rank().over(Window.orderBy("publicationDateTime")))

#For candidate first of all I'm going to create the full name column to create and id based on that column
df = df.withColumn("Full_Name", concat_ws(" ",df.firstName, df.lastName))
df = df.withColumn("ID_Candidate", dense_rank().over(Window.orderBy("Full_Name")))

In [24]:
#Company
D_Company = df.select('ID_Company', 'company','city','sector').distinct()

In [25]:
D_Company.show(5,False)

+----------+-------------------------------+---------+--------+
|ID_Company|company                        |city     |sector  |
+----------+-------------------------------+---------+--------+
|1         |Abernathy, Haag and Wisozk     |Svetlyy  |Services|
|2         |Adams LLC                      |Alimono  |Training|
|3         |Altenwerth, Volkman and Runte  |Altavista|null    |
|4         |Altenwerth-Steuber             |Jianghu  |Support |
|5         |Ankunding, Jaskolski and Crooks|Górki    |Services|
+----------+-------------------------------+---------+--------+
only showing top 5 rows



In [26]:
#Date
D_Date = df.select('ID_Date', 'publicationDateTime', 'activeDays').dropDuplicates()

#Extracting day, month and year trnasformations
D_Date = D_Date.withColumn("_day", dayofmonth(D_Date["publicationDateTime"]))\
        .withColumn("_month", month(D_Date["publicationDateTime"]))\
        .withColumn("_year", year(D_Date["publicationDateTime"]))\
        .withColumn("_hour", hour(D_Date["publicationDateTime"]))\
        .withColumn("_minute", minute(D_Date["publicationDateTime"]))

In [27]:
D_Date.show(5,False)

+-------+-------------------+----------+----+------+-----+-----+-------+
|ID_Date|publicationDateTime|activeDays|_day|_month|_year|_hour|_minute|
+-------+-------------------+----------+----+------+-----+-----+-------+
|1      |2010-09-04 16:38:53|25        |4   |9     |2010 |16   |38     |
|2      |2010-09-16 14:10:58|7         |16  |9     |2010 |14   |10     |
|3      |2010-09-16 21:23:20|16        |16  |9     |2010 |21   |23     |
|4      |2010-09-25 01:49:18|28        |25  |9     |2010 |1    |49     |
|5      |2010-09-29 11:57:08|17        |29  |9     |2010 |11   |57     |
+-------+-------------------+----------+----+------+-----+-----+-------+
only showing top 5 rows



In [28]:
#Candidate
D_Candidate = df.select('ID_Candidate', 'Full_Name', 'firstName','lastName','age','skills_split','applicationDate').dropDuplicates()

In [33]:
D_Candidate.show(5)

+------------+-------------+---------+--------+---+--------------------+-------------------+
|ID_Candidate|    Full_Name|firstName|lastName|age|        skills_split|    applicationDate|
+------------+-------------+---------+--------+---+--------------------+-------------------+
|           1|  Aaren Legen|    Aaren|   Legen| 59|Ektron, Online Re...|2012-12-03 01:46:39|
|           2|  Abbie Ilett|    Abbie|   Ilett| 40|Hydrology, Profes...|2012-11-30 09:58:19|
|           3|Abbye Freathy|    Abbye| Freathy| 37|Igneous Petrology...|2018-04-09 10:00:01|
|           4| Abe Alenshev|      Abe|Alenshev| 25|Ultimate Frisbee,...|2014-09-11 15:42:49|
|           5|  Abe Gleeson|      Abe| Gleeson| 63|BDC, PWM, News Wr...|2015-02-21 06:18:04|
+------------+-------------+---------+--------+---+--------------------+-------------------+
only showing top 5 rows



In [30]:
#Work
D_Work = df.select('id_adverts', 'title', 'status', 'applyUrl', 'benefits_split').dropDuplicates()

In [32]:
D_Work.show(5)

+--------------------+--------------------+--------+--------------------+--------------------+
|          id_adverts|               title|  status|            applyUrl|      benefits_split|
+--------------------+--------------------+--------+--------------------+--------------------+
|1c9c2375-57aa-494...|       VP Accounting|  Active|https://army.mil/...|Medical Insurance...|
|47037f4f-2b75-41c...|Mechanical System...|Inactive|https://dailymoti...|Dental Plan, Home...|
|30b51262-b7f8-407...|Accounting Assist...|  Active|https://jugem.jp/...|Car, Dental Plan,...|
|a7ed7dfb-d4f2-472...|      Civil Engineer|  Active|http://angelfire....|Car, Home Office,...|
|7e1ace18-af5a-4d0...| Electrical Engineer| Deleted|https://usnews.co...|Phone, Home Offic...|
+--------------------+--------------------+--------+--------------------+--------------------+
only showing top 5 rows



# Facts Table

In [35]:
Fact_Table = df.select("id_principal",
                       "ID_Company",
                       "ID_Date",
                       "id_adverts",
                       "ID_Candidate"
                      )

In [None]:
import psycopg2
def InsertIntoRedshift(df, Table_Name):
    #Creating connection
    conn = psycopg2.connect(
        host=host,
        port=port,
        dbname=database,
        user=user,
        password=password
    )
    cursor = conn.cursor()
    #Creating Temp df
    table_name_temp = f'{Table_Name}_temp'
    df.createOrReplaceTempView(table_name_temp)

    #Inserting dataframe/table. In this part I assumed the tables already exists
    insert_query = f'''
        INSERT INTO Table_Name{}
        SELECT *
        FROM {table_name_temp}
    '''
    cursor.execute(insert_query)
    conn.commit()

    #Closing connection
    cursor.close()
    conn.close()