# Transformation Pipeline - Gold [![259302-pipeline-management-aws-deployment-copy-icon.png](https://i.postimg.cc/3w6CMc6p/259302-pipeline-management-aws-deployment-copy-icon.png)](https://postimg.cc/zLCRK0qX)

##### Load Data

In [88]:
# Define file paths for the ingested data
placements_path = "Tables/Silver/tblPlacements" 
interviews_path = "Tables/Silver/tblInterviews"

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 90, Finished, Available, Finished)

In [89]:
# Load the data into DataFrames
placements_df = spark.read.format("delta").option("header", "true").load(placements_path)
interviews_df = spark.read.format("delta").option("header", "true").load(interviews_path)

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 91, Finished, Available, Finished)

##### EDA

In [90]:
# Inspect placement and interview data
print("Placements Schema:")
placements_df.printSchema()
print("Sample Placements Data:")
placements_df.show()

print("Interviews Schema:")
interviews_df.printSchema()
print("Sample Interviews Data:")
interviews_df.show()

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 92, Finished, Available, Finished)

Placements Schema:
root
 |-- PlacementId: integer (nullable = true)
 |-- Candidate_email: string (nullable = true)
 |-- Start_Date: date (nullable = true)
 |-- Status: string (nullable = true)
 |-- Marketing_Opt_Out: string (nullable = true)
 |-- First_Name: string (nullable = true)
 |-- Last_Name: string (nullable = true)
 |-- Full_Name: string (nullable = true)

Sample Placements Data:
+-----------+-----------------+----------+--------+-----------------+----------+---------+---------+
|PlacementId|  Candidate_email|Start_Date|  Status|Marketing_Opt_Out|First_Name|Last_Name|Full_Name|
+-----------+-----------------+----------+--------+-----------------+----------+---------+---------+
|          1|Francis@gmail.com|2024-04-11|  Active|            FALSE|   Francis|         | Francis |
|          2|Jessica@gmail.com|2024-05-10|  Active|             TRUE|   Jessica|         | Jessica |
|          3|Michael@gmail.com|2024-06-12|  Active|            FALSE|   Michael|         | Michael |
|  

##### Gold Layer Transformations

In [91]:
# Import dependencies
from pyspark.sql.functions import col, lit, when, year, row_number, coalesce
from pyspark.sql.window import Window
from datetime import datetime

# Calculate "last year" dynamically
current_year = datetime.now().year
last_year = current_year - 1

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 93, Finished, Available, Finished)

In [92]:
# Filter placements and interviews for last year
filtered_placements = placements_df.filter(year(col("Start_Date")) == last_year).select(
    col("Candidate_Email").alias("Email"),
    col("PlacementId").alias("Placement_Id"),
    col("Start_Date").alias("Placement_Date")
)

filtered_interviews = interviews_df.filter(year(col("Interview_Date")) == last_year).select(
    col("Candidate_Email").alias("Email"),
    col("Interview_Date").alias("Interviewed_Date")
)

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 94, Finished, Available, Finished)

In [93]:
# Perform an outer join to merge placements and interviews
merged_df = filtered_placements.join(
    filtered_interviews,
    on="Email",
    how="outer"
)

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 95, Finished, Available, Finished)

In [94]:
# Create final PlacementId and Interviewed_Date columns
merged_df = merged_df.withColumn(
    "PlacementId",
    coalesce(col("Placement_Id"), lit(None).cast("int"))
).withColumn(
    "Interviewed_Date",
    coalesce(col("Interviewed_Date"), lit(None).cast("date"))
)

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 96, Finished, Available, Finished)

In [95]:
# Add a Rank column to prioritize PlacementId over Interviewed_Date
merged_df = merged_df.withColumn(
    "Rank", when(col("PlacementId").isNotNull(), 1).otherwise(2)
)

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 97, Finished, Available, Finished)

In [96]:
# Use a window function to deduplicate by Candidate_Email
window_spec = Window.partitionBy("Email").orderBy("Rank")
deduplicated_df = merged_df.withColumn(
    "row_number", row_number().over(window_spec)
).filter(col("row_number") == 1).drop("row_number", "Rank")

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 98, Finished, Available, Finished)

In [97]:
# Select only the required columns
final_df = deduplicated_df.select(
    col("Email").alias("Candidate_Email"),
    "PlacementId",
    "Interviewed_Date"
)

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 99, Finished, Available, Finished)

##### Save Data to Gold Layer

In [98]:
# Save transformed data into gold layer
final_df.write.format("delta").mode("overwrite").save("Tables/Gold/placementsPrevYear")

print("Tables loaded into Gold Layer Successfully!")

StatementMeta(, c5b3ff5e-5432-4399-9557-b03dde529ae7, 100, Finished, Available, Finished)

Tables loaded into Gold Layer Successfully!
