In [11]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from delta import *

In [12]:
# Session builder with the Delta Lake SQL extension and Delta catalog registered.
builder = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# The builder is passed through delta's pip helper, then the session is
# created (or an already-running one is reused).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [13]:
warehouse = "/home/phuc/Practice/DataScience/DSProject/data/warehouse"

# Load the Delta table at the warehouse path and collect it to the driver
# as a pandas DataFrame in a single chain.
warehouse_df = (
    spark.read
    .format("delta")
    .load(warehouse)
    .toPandas()
)

# to_csv is called with pandas defaults, so the DataFrame index is written
# as the first column of the file.
warehouse_df.to_csv("/home/phuc/Practice/DataScience/DSProject/data/job.csv")

### Read raw data

In [3]:
job_path = "/home/phuc/Practice/DataScience/DSProject/data/raw/careerlink/2023-12-17T03-36-11+00-00.csv"

# Read the CSV with its first line treated as the header row.
job_df = spark.read.options(header=True).csv(job_path)

# Number of rows read from the file.
job_df.count()

11277

In [4]:
# Print job_df's column names, types and nullability.
job_df.printSchema()

root
 |-- post_id: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- job_listed: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- job_deadline: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- job_address: string (nullable = true)
 |-- job_experience_requied: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_function: string (nullable = true)
 |-- education_level: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- Industries: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- skill: string (nullable = true)



### Transform and ingest into Delta Lake

In [5]:
import typer
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta.tables import DeltaTable
from delta import *
from job_transformation.careerlink.convert_job_listed_to_datetime import convert_to_job_listed_datetime
from job_transformation.careerlink.extract_job_address import extract_job_address
from job_transformation.careerlink.extract_job_deadline_date import extract_job_deadline_date
from job_transformation.careerlink.modify_company_name import modify_company_name
from job_transformation.careerlink.modify_job_title import modify_job_title
from job_transformation.careerlink.normalize_employment_type import normalize_employment_type
from job_transformation.careerlink.normalize_industries import normalize_industries
from job_transformation.careerlink.normalize_job_function import normalize_job_function
from job_transformation.careerlink.extract_min_max_salary import extract_min_max_salary
from job_transformation.careerlink.extract_min_max_yoe import extract_min_max_yoe
from utils.merge_schema import merge_schema

In [6]:
# Only the first 100 rows of the CSV are kept for this run.
job_df = spark.read.csv(job_path, header=True).limit(100)

# (output column, input column, column-level transform), applied in this order.
# When the output name differs from the input name, the input column is
# dropped right after the new column is added.
column_steps = [
    ("company_name", "company_name", modify_company_name),
    ("job_title", "job_title", modify_job_title),
    ("job_listed", "job_listed", convert_to_job_listed_datetime),
    ("job_address", "job_address", extract_job_address),
    ("job_deadline", "job_deadline", extract_job_deadline_date),
    ("employment_type", "employment_type", normalize_employment_type),
    ("job_level", "job_function", normalize_job_function),
    ('industry', "Industries", normalize_industries),
]
for target, source, transform in column_steps:
    job_df = job_df.withColumn(target, transform(job_df[source]))
    if target != source:
        job_df = job_df.drop(source)

# Frame-level helpers: each takes the frame, an input column name and the
# names of the min/max output columns.
job_df = extract_min_max_yoe(job_df, "job_experience_requied", "job_yoe_min", "job_yoe_max")
job_df = extract_min_max_salary(job_df, "salary", "salary_min", "salary_max")

# Both bookkeeping columns are stamped with the current date.
job_df = (
    job_df
    .withColumn("ingested_at", F.current_date())
    .withColumn("updated_at", F.current_date())
)

In [7]:
# Number of rows in job_df.
job_df.count()

100

In [8]:
# Preview the first five rows of job_df.
job_df.show(5)

+-------+--------------------+----------+-------------------+------------+--------------------+-----------------+----------------------+---------------+-------------------+--------+--------------------+--------------------+---------+--------------------+-----------+-----------+----------+----------+-----------+----------+
|post_id|           job_title|job_listed|             salary|job_deadline|        company_name|      job_address|job_experience_requied|employment_type|    education_level|  gender|     job_description|               skill|job_level|            industry|job_yoe_min|job_yoe_max|salary_min|salary_max|ingested_at|updated_at|
+-------+--------------------+----------+-------------------+------------+--------------------+-----------------+----------------------+---------------+-------------------+--------+--------------------+--------------------+---------+--------------------+-----------+-----------+----------+----------+-----------+----------+
|2733613|Sales Executive -..

In [8]:
ingest_table = "/home/phuc/Practice/DataScience/DSProject/data/ingestion/careerlink"

# Upsert job_df into the Delta table at ingest_table, keyed on post_id.
# If no Delta table exists at that path yet, job_df is written out as the
# initial table instead.
if DeltaTable.isDeltaTable(spark, ingest_table):
    deltaTable = DeltaTable.forPath(spark, ingest_table)
    # merge_schema is a project helper that returns the table and frame to merge.
    # NOTE(review): presumably it aligns the two schemas — confirm in utils.merge_schema.
    deltaTable, job_df = merge_schema(spark, deltaTable, job_df)
    print(job_df.count())
    delta_df = deltaTable.toDF()
    print(delta_df.count())

    # For matched rows, take the incoming value unless it is NULL, in which
    # case the stored value is kept. Columns are addressed through the merge
    # aliases rather than through DataFrame-bound column objects, so each side
    # of the expression is unambiguous. ingested_at is excluded so the stored
    # value is left untouched on update.
    cols = {
        name: F.coalesce(
            F.col(f"daily_table.{name}"),
            F.col(f"ingestion_table.{name}"),
        )
        for name in job_df.columns
        if name != "ingested_at"
    }

    (
        deltaTable.alias('ingestion_table')
        .merge(
            job_df.alias('daily_table'),
            'ingestion_table.post_id = daily_table.post_id',
        )
        .whenNotMatchedInsertAll()
        .whenMatchedUpdate(set=cols)
        .execute()
    )
else:
    job_df.write.format("delta").save(ingest_table)

23/12/17 11:28:10 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [14]:
from job_transformation.careerlink.transformation_and_ingestion import transform_and_ingest

# Raw CSV file to ingest, and the path passed as the ingestion table.
daily_table = "/home/phuc/Practice/DataScience/DSProject/data/raw/careerlink/2023-12-17T03-36-11+00-00.csv"
ingest_table = "/home/phuc/Practice/DataScience/DSProject/data/ingestion/careerlink"

# Run the packaged pipeline entry point with both paths passed by keyword.
transform_and_ingest(daily_table=daily_table, ingest_table=ingest_table)

23/12/17 11:26:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/12/17 11:26:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/12/17 11:26:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/12/17 11:26:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/12/17 11:26:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
23/12/17 11:26:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/12/17 11:26:08 WARN MemoryManager: Total allocation exceeds 95.

In [9]:
# Read the ingestion path back as a Delta table.
ingest_df = spark.read.load(
    "/home/phuc/Practice/DataScience/DSProject/data/ingestion/careerlink",
    format="delta",
)

# Number of rows currently in the ingestion table.
ingest_df.count()

100

In [10]:
# Print ingest_df's column names, types and nullability.
ingest_df.printSchema()

root
 |-- post_id: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- job_listed: date (nullable = true)
 |-- salary: string (nullable = true)
 |-- job_deadline: date (nullable = true)
 |-- company_name: string (nullable = true)
 |-- job_address: string (nullable = true)
 |-- job_experience_requied: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- education_level: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- skill: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- job_yoe_min: integer (nullable = true)
 |-- job_yoe_max: integer (nullable = true)
 |-- salary_min: integer (nullable = true)
 |-- salary_max: integer (nullable = true)
 |-- ingested_at: date (nullable = true)
 |-- updated_at: date (nullable = true)



In [33]:
# Preview the first five rows of ingest_df.
ingest_df.show(5)

+-------+--------------------+----------+-------------------+------------+--------------------+-----------------+----------------------+---------------+-------------------+--------+--------------------+--------------------+---------+--------------------+-----------+-----------+----------+----------+-----------+----------+
|post_id|           job_title|job_listed|             salary|job_deadline|        company_name|      job_address|job_experience_requied|employment_type|    education_level|  gender|     job_description|               skill|job_level|            industry|job_yoe_min|job_yoe_max|salary_min|salary_max|ingested_at|updated_at|
+-------+--------------------+----------+-------------------+------------+--------------------+-----------------+----------------------+---------------+-------------------+--------+--------------------+--------------------+---------+--------------------+-----------+-----------+----------+----------+-----------+----------+
|2733613|Sales Executive -..