In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used throughout this notebook
spark = (
    SparkSession.builder
    .appName("recruitment_transform")
    .getOrCreate()
)

# Load every JSON file found under the extract directory as one DataFrame,
# then display the total number of rows read
df = spark.read.json("../recruitment_extract")
df.count()

In [None]:
# Filter out the corrupt records.
# Spark (2.3+) rejects queries on raw JSON whose referenced columns are only the internal
# _corrupt_record column; caching the parsed DataFrame first is the documented workaround.
df.cache()

if "_corrupt_record" in df.columns:
    # Keep well-formed rows only, then drop the helper column (it is all NULL afterwards)
    clean_corrupt_df = df.filter(df["_corrupt_record"].isNull())
    record_count = clean_corrupt_df.count()
    clean_corrupt_df = clean_corrupt_df.drop("_corrupt_record")
else:
    # Spark only adds _corrupt_record to the inferred schema when malformed JSON was seen;
    # without this guard, df._corrupt_record would raise on a fully clean extract.
    clean_corrupt_df = df
    record_count = clean_corrupt_df.count()

# Show the schema and verify the column is removed
clean_corrupt_df.printSchema()

# Print the result
print(f"Number of records in clean_df: {record_count}")

In [None]:
# Regex character class for Korean text: the Hangul syllables block U+AC00..U+D7A3
korean_pattern = "[\uAC00-\uD7A3]"  # This range includes Hangul syllables

# Remove records with Korean characters in job_description.
# rlike() yields NULL for a NULL description and filter() drops rows whose predicate is
# NULL, so rows without a description (which contain no Korean text) are kept explicitly.
description_col = clean_corrupt_df.job_description
clean_korean = clean_corrupt_df.filter(
    description_col.isNull() | ~description_col.rlike(korean_pattern)
)

# Count remaining records
remaining_count = clean_korean.count()
print(f"Number of records after removing Korean entries: {remaining_count}")

# Drop job_schedule: extracting it from the raw data is too complex for now
clean_df = clean_korean.drop("job_schedule")

# Show the schema and verify the column is removed
clean_df.printSchema()
clean_df.show(2, truncate=True)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, when, concat, lit

In [None]:
from pyspark.sql import functions as F

def split_job_description(clean_df):
    """Split ``job_description`` into the description proper and the job requirements.

    The requirements section starts at a site-specific heading: vietnamworks pages use
    "Yêu cầu công việc" and careerlink pages use "Kinh nghiệm / Kỹ năng chi tiết". The
    vietnamworks heading is checked first. The text is split at the FIRST occurrence of
    the heading, and everything from that heading onwards becomes the requirements.

    Args:
        clean_df: DataFrame with a string ``job_description`` column.

    Returns:
        ``clean_df`` with two added columns (``job_description`` itself is kept):
        - ``job_description_s``: text before the heading, or the whole description
          when no heading is present.
        - ``job_requirements``: the heading and everything after it, or NULL when no
          heading is present.
    """
    description = col("job_description")
    vietnamworks_heading = "Yêu cầu công việc"
    careerlink_heading = "Kinh nghiệm / Kỹ năng chi tiết"

    # Positional substrings are used instead of split(...).getItem(1): that form drops
    # any text after a second occurrence of the heading, and split() treats the heading
    # as a regex even though it is meant literally.
    def text_before(heading):
        # Characters up to, but not including, the first occurrence (instr is 1-based)
        return description.substr(F.lit(1), F.instr(description, heading) - 1)

    def text_from(heading):
        # The first occurrence of the heading and everything after it
        return description.substr(F.instr(description, heading), F.length(description))

    return (
        clean_df
        .withColumn(
            "job_description_s",
            when(description.contains(vietnamworks_heading), text_before(vietnamworks_heading))
            .when(description.contains(careerlink_heading), text_before(careerlink_heading))
            .otherwise(description),
        )
        .withColumn(
            "job_requirements",
            when(description.contains(vietnamworks_heading), text_from(vietnamworks_heading))
            .when(description.contains(careerlink_heading), text_from(careerlink_heading))
            .otherwise(lit(None)),
        )
    )

# Apply the split to the cleaned records
split_df = split_job_description(clean_df)

# Show the result
split_df.show(truncate=False)


In [None]:
# The raw description is no longer needed: its content now lives in
# job_description_s and job_requirements.
raw_description_name = "job_description"
split_df = split_df.drop(raw_description_name)

In [None]:
# Collapse runs of consecutive "\n" into a single "\n". One "\n" is kept at each line
# break because later steps may need the line structure.
from pyspark.sql.functions import regexp_replace
from pyspark.sql import DataFrame

def reduce_newlines(df: DataFrame, columns: list) -> DataFrame:
    """Collapse each run of consecutive newline characters into a single newline.

    Args:
        df: input DataFrame.
        columns: names of the string columns to rewrite.

    Returns:
        A new DataFrame with the listed columns rewritten; other columns are untouched.
    """
    rewrites = [(name, regexp_replace(col(name), r"\n+", "\n")) for name in columns]
    result = df
    for name, expression in rewrites:
        result = result.withColumn(name, expression)
    return result

df_cleaned_n_1 = reduce_newlines(split_df, ["job_description_s", "job_requirements"])
df_cleaned_n_1.show(truncate=False)

In [None]:
# Spot-check the careerlink rows: keep those whose job_link contains "careerlink"
is_careerlink = col("job_link").contains("careerlink")
filtered_df = df_cleaned_n_1.where(is_careerlink)

filtered_df.show(truncate=False)


In [None]:
# A few rows still contain HTML <br> tags that have to be removed
def remove_br_tags(df: DataFrame, columns: list, replacement: str = "") -> DataFrame:
    """Strip HTML line-break tags (<br>, <br/>, <br />, <BR>, ...) from string columns.

    Args:
        df: input DataFrame.
        columns: names of the string columns to clean.
        replacement: text substituted for each tag. Defaults to "" (tag deleted), which
            keeps the notebook's existing output; pass a newline instead to preserve the
            line break the tag stood for. Note that "$" and backslash are special in
            Java regex replacement strings.

    Returns:
        A new DataFrame with the listed columns rewritten; other columns are untouched.
    """
    # (?i): HTML tag names are case-insensitive, so <BR> and <Br/> must be matched too
    br_pattern = r"(?i)<br\s*/?>"
    for column_name in columns:
        df = df.withColumn(column_name, regexp_replace(col(column_name), br_pattern, replacement))
    return df

df_cleaned_br = remove_br_tags(df_cleaned_n_1, ["job_description_s", "job_requirements"])
df_cleaned_br.show(truncate=False)

In [None]:
def clean_newlines(df: DataFrame, columns: list) -> DataFrame:
    """Flatten multi-line values into single-line, trimmed text.

    Some careerlink fields contain redundant newlines, which must be removed before the
    expiry date can be computed. Each run of one or more newlines is replaced with a
    single space, and leading/trailing whitespace is then trimmed.

    Args:
        df: input DataFrame.
        columns: names of the string columns to flatten.

    Returns:
        A new DataFrame with the listed columns rewritten; other columns are untouched.
    """
    flattened = df
    for name in columns:
        single_line = F.regexp_replace(F.col(name), r'\n+', ' ')
        flattened = flattened.withColumn(name, F.trim(single_line))
    return flattened


# Single-line fields that arrive from the crawl with stray newlines
columns_to_clean = ["job_expire_date", "job_location", "job_salary", "job_title", "job_yoe"]
df_cleaned_n_2 = clean_newlines(df_cleaned_br, columns_to_clean)


In [None]:
# Spot-check the careerlink rows again, now that single-line fields are flattened
filtered_df = df_cleaned_n_2.filter("job_link LIKE '%careerlink%'")
filtered_df.show(truncate=False)


In [None]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
# Turn relative expiry text in job_expire_date (e.g. "Het han trong x ngay", i.e.
# "expires in x days") into an absolute dd/MM/yyyy date counted from the crawl date.
def convert_expiry_date(df: DataFrame, crawl_date: str, date_col: str) -> DataFrame:
    """Replace relative "expires in N days" text with an absolute ``dd/MM/yyyy`` date.

    The first run of digits in ``date_col`` is taken as the number of days and added to
    ``crawl_date``. Values that already look like a ``dd/MM/yyyy`` date, and values that
    contain no digits, are left unchanged instead of being rewritten.

    Args:
        df: input DataFrame.
        crawl_date: the day the data was crawled, formatted ``dd/MM/yyyy``.
        date_col: name of the string column holding the expiry text; it is overwritten.

    Returns:
        A new DataFrame with ``date_col`` rewritten.
    """
    crawl_date_col = F.to_date(F.lit(crawl_date), "dd/MM/yyyy")
    raw_value = F.col(date_col)

    # An absolute date would otherwise have its day-of-month (the first digit run)
    # misread as a day offset, e.g. "20/11/2024" -> crawl date + 20 days.
    is_absolute_date = raw_value.rlike(r"^\s*\d{1,2}/\d{1,2}/\d{4}\s*$")

    # regexp_extract returns "" when there is no digit; such values are kept as-is
    # rather than being replaced by NULL.
    days_text = F.regexp_extract(raw_value, r"(\d+)", 1)
    expiry_date = F.date_format(
        F.date_add(crawl_date_col, days_text.cast("int")),
        "dd/MM/yyyy",
    )

    return df.withColumn(
        date_col,
        F.when(is_absolute_date, raw_value)
        .when(days_text != "", expiry_date)
        .otherwise(raw_value),
    )


# Date the extracted data was crawled (dd/MM/yyyy) - update for each new crawl
CRAWL_DATE = "13/10/2024"
df_update_expire_date = convert_expiry_date(df_cleaned_n_2, CRAWL_DATE, "job_expire_date")
df_update_expire_date.show(truncate=True)


In [None]:
# Re-inspect the careerlink rows AFTER the expiry-date conversion, to check that their
# relative "Het han trong x ngay" values became dd/MM/yyyy dates. (Filtering
# df_cleaned_n_2 here would only repeat the earlier spot-check, before the conversion.)
filtered_df = df_update_expire_date.filter(col("job_link").like("%careerlink%"))

# Show only the columns relevant to this check
filtered_df.select("job_link", "job_expire_date").show(truncate=False)


In [None]:
# Persist the transformed data for the load step, replacing any previous run's output.
# No explicit format is given, so Spark's default data source is used.
load_dir = "../recruitment_load"
writer = df_update_expire_date.write.mode("overwrite")
writer.save(load_dir)

In [None]:
# Number of partitions of the final DataFrame (displayed as the cell output)
partition_count = df_update_expire_date.rdd.getNumPartitions()
partition_count