In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, lit, udf, split
from pyspark.sql.types import StringType
import os
import requests
from bs4 import BeautifulSoup
import pandas as pd

# Initialize Spark session
spark = SparkSession.builder \
    .appName("UnifyDatasets") \
    .getOrCreate()

from consts import QUESTIONS_PATH, open_csv_file

#### All code questions with their solutions

In [0]:
from pyspark.sql.functions import col, regexp_extract, size

def clean_html_content(html_content):
    """
    Remove HTML tags and normalize text from the given HTML content.
    Handles missing or invalid content gracefully.
    """
    if not isinstance(html_content, str):
        return None
    soup = BeautifulSoup(html_content, "html.parser")
    clean_text = soup.get_text(separator=" ")
    return " ".join(clean_text.split())

# Register the clean_html_content function as a UDF
clean_html_content_udf = udf(clean_html_content, StringType())

def get_solution_url(problem_number):
    base_url = "https://github.com/fishercoder1534/Leetcode/blob/master/src/main/java/com/fishercoder/solutions/"
    if problem_number < 1000:
        folder = "firstthousand"
    elif problem_number < 2000:
        folder = "secondthousand"
    elif problem_number < 3000:
        folder = "thirdthousand"
    else:
        folder = "fourththousand"
    return f"{base_url}{folder}/_{problem_number}.java"

def fetch_solution(url):
    """
    Fetch the raw content of the solution from the GitHub file URL.
    """
    try:
        # Convert the GitHub URL to the raw content URL
        raw_url = url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/")
        
        # Fetch the solution content
        response = requests.get(raw_url)
        if response.status_code == 200:
            return response.text
        else:
            # Failed to fetch solution - ignore
            return None
    except Exception as e:
        print(f"Error fetching solution from {url}: {e}")
        return None

def unify_leetcode_datasets():
    leetcode_problems_content = open_csv_file(spark, QUESTIONS_PATH, "leetcode_problems_data.csv") \
        .drop("title", "likes", "dislikes") \
        .withColumnRenamed("slug", "formatted_title") \
        .withColumn("formatted_title", lower(col("formatted_title"))) \
        .withColumn("question", clean_html_content_udf(col("content"))) \
        .drop("content") \
        .filter(col("question").isNotNull())
    # Original columns: question_id,title,content,difficulty,likes,dislikes,slug.
    # New columns: question_id,formatted_title,question,difficulty.
    
    leetcode_problems_meta = open_csv_file(spark, QUESTIONS_PATH, "leetcode_problems_metadata.csv") \
        .drop("page_number", "is_premium", "title", "accepted", "submission", "solution", "discussion_count", "likes", "dislikes") \
        .withColumn("formatted_title", split(col("problem_URL"), "/").getItem(4)) \
        .drop("problem_URL") \
        .withColumnRenamed("id", "question_id") \
        .withColumnRenamed("problem_description", "question") \
        .withColumnRenamed("topic_tags", "topics") \
        .filter(col("question").isNotNull())
    # Original columns: id,page_number,is_premium,title,problem_description,topic_tags,difficulty,similar_questions,no_similar_questions,acceptance,accepted,submission,solution,discussion_count,likes,dislikes,problem_URL,solution_URL.
    # New columns: question_id,question,topics,difficulty,similar_questions,no_similar_questions,acceptance, formatted_title,solution_URL.
    
    pattern = r"/_([0-9]+)\.java"
    leetcode_links = open_csv_file(spark, QUESTIONS_PATH, "leetcode_problems&solutions_links.csv") \
        .drop("name") \
        .withColumn("formatted_title", split(col("link"), "/").getItem(4)) \
        .withColumn(
            "solution_idx",
            regexp_extract(col("solution"), pattern, 1)  # Extract the first capturing group
        ) \
        .drop("link", "solution")
    # Original columns: name,link,difficulty,solution.
    # New columns: formatted_title,difficulty,solution_idx.

    # Fetch solutions
    for row in leetcode_links.collect():
        idx = row["solution_idx"]
        if idx and idx.isdigit():
            solution_url = get_solution_url(int(idx))
            solution_content = fetch_solution(solution_url)
            leetcode_links = leetcode_links.withColumn("solution", lit(solution_content))
    leetcode_links = leetcode_links.drop("solution_idx")
    # New columns: formatted_title,difficulty,solution.
    
    # Merge datasets
    merge1 = leetcode_problems_content.join(leetcode_problems_meta, ["question_id", "formatted_title", "question", "difficulty"], "outer")
        # .withColumn("solution", lit(None).cast(StringType()))
    merge2 = merge1.join(leetcode_links, ["formatted_title", "difficulty"], "outer") \
        .filter(col("question").isNotNull())

    return merge2

leetcode_with_solutions = unify_leetcode_datasets()
display(leetcode_with_solutions)

#### General & DS open questions

In [0]:
from pyspark.sql.functions import lit
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

def unify_open_questions_datasets():
    """ Load and preprocess the open-ended question datasets. """
    data_science_questions = open_csv_file(spark, QUESTIONS_PATH, "open_questions_data_science.csv") \
        .withColumnRenamed("DESCRIPTION", "question") \
        .withColumnRenamed("ID", "question_id") \
        .withColumn("category", lit("Data Science")) \
        .withColumn("topics", lit("Data Science"))
    # Original columns: ID,DESCRIPTION
    # New columns: question_id,question,category,topics
    
    num_ds_rows = data_science_questions.count()
    window_spec = Window.orderBy(lit(0))

    general_questions = open_csv_file(spark, QUESTIONS_PATH, "general_open_questions.csv") \
        .withColumn("index", row_number().over(window_spec) - 1) \
        .withColumn("question_id", lit(num_ds_rows) + col("index") + 1) \
        .withColumn("category", lit("General")) \
        .withColumn("topics", lit("Soft Skills")) \
        .drop("index")
    # Original columns: question
    # New columns: question_id,question,category,topics

    # Concatenate both into one database
    open_questions_df = data_science_questions.unionByName(general_questions)
    
    return open_questions_df

open_questions_df = unify_open_questions_datasets()
display(open_questions_df)

#### Job postings data from all our resources

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import os
from consts import JOBS_PATH

In [0]:
from pyspark.sql.functions import col, create_map, lit, when
from itertools import chain

seniority_mapping = {
    "Not Applicable": 0,
    "Non pertinent": 0,
    "Di-angkop": 0,
    "Stagiaire / Alternant": 0,
    "Internship": 0,
    "Entry level": 1,
    "Premier emploi": 1,
    "Associate": 1,
    "Mid-Senior level": 2,
    "Confirmé": 2,
    "Manager": 2,
    "Director": 2,
    "Executive": 2,
}
# Convert the dictionary into a format compatible with create_map
mapping_expr = create_map([lit(x) for x in chain(*seniority_mapping.items())])

# Replace all occurrences of '-1' in the dataset with None
def replace_minus_one_with_none(df):
    for column in df.columns:
        df = df.withColumn(column, when(col(column) == "-1", None).otherwise(col(column)))
    return df

def unify_jobpostings_datasets():
    job_descriptions_and_skills = open_csv_file(spark, JOBS_PATH, "job_descriptions_and_skills.csv") \
        .withColumnRenamed("category", "field") \
        .withColumnRenamed("job_description", "job_summary") \
        .withColumnRenamed("job_skill_set", "skills")
    # Original columns: job_id,category,job_title,job_description,job_skill_set
    # New columns: job_id,field,job_title,job_summary,skills

    linkedin_hightech_jobs = open_csv_file(spark, JOBS_PATH, "linkedin_hightech_jobs.csv") \
        .drop("url", "company_id", "job_location", "job_employment_type", "job_base_pay_range", "company_url", "job_posted_time", "job_num_applicants", "discovery_input") \
        .withColumnRenamed("job_posting_id", "job_id") \
        .withColumnRenamed("job_function", "field") \
        .withColumnRenamed("job_industries", "company_industry") \
        .withColumn("level", mapping_expr.getItem(col("job_seniority_level"))) \
        .drop("job_seniority_level")
    # Original columns: url,job_posting_id,job_title,company_name,company_id,job_location,job_summary,apply_link,job_seniority_level,job_function,job_employment_type,job_industries,job_base_pay_range,company_url,job_posted_time,job_num_applicants,discovery_input
    # New columns: job_id,job_title,company_name,job_summary,apply_link,level,field,company_industry

    indeed_jobs = open_csv_file(spark, JOBS_PATH, "indeed_jobs.csv") \
        .drop("JOB_URL", "DATE_OF_POSTING", "WEBSITE", "SALARY", "REMOTE", "CITIES", "STATE", "COUNTRY", "JOB_TYPE", "ZIPCODE", "WEBSITEPOSTING") \
        .withColumnRenamed("JOB_TITLE", "job_title") \
        .withColumnRenamed("COMPANY", "company_name") \
        .withColumnRenamed("INDUSTRY", "company_industry") \
        .withColumnRenamed("JOB_DESCRIPTION", "job_summary")
    # Original columns: JOB_URL,DATE_OF_POSTING,JOB_TITLE,COMPANY,WEBSITE,INDUSTRY,SALARY,REMOTE,CITIES,STATE,COUNTRY,JOB_TYPE,ZIPCODE,JOB_DESCRIPTION,WEBSITEPOSTING
    # New columns: job_title,company_name,company_industry,job_summary

    glassdoor_data_jobs = open_csv_file(spark, JOBS_PATH, "glassdoor_data_jobs_and_company_info.csv") \
        .drop("Salary Estimate", "Rating", "Location", "Size", "Founded", "Type of ownership", "Revenue") \
        .withColumnRenamed("Job Title", "job_title") \
        .withColumnRenamed("Job Description", "job_summary") \
        .withColumnRenamed("Company Name", "company_name") \
        .withColumnRenamed("Industry", "company_industry") \
        .withColumnRenamed("Sector", "field")
    glassdoor_data_jobs = replace_minus_one_with_none(glassdoor_data_jobs)
    glassdoor_data_jobs = glassdoor_data_jobs.withColumn("company_name", split(col("company_name"), "\n").getItem(0))
    # Original columns: Job Title,Salary Estimate,Job Description,Rating,Company Name,Location,Size,Founded,Type of ownership,Industry,Sector,Revenue
    # New columns: job_title,job_summary,company_name,company_industry,field

    linkedin_data_jobs = open_csv_file(spark, JOBS_PATH, "linkedin_data_jobs.csv") \
        .drop("Employment type", "company_id", "context", "date", "education", "location", "months_experience", "sal_high", "sal_low", "salary") \
        .withColumnRenamed("Industries", "company_industry") \
        .withColumnRenamed("Job function", "field") \
        .withColumnRenamed("company", "company_name") \
        .withColumnRenamed("description", "job_summary") \
        .withColumnRenamed("post_id", "job_id") \
        .withColumnRenamed("post_url", "post_link") \
        .withColumnRenamed("title", "job_title") \
        .withColumn("level", mapping_expr.getItem(col("Seniority level"))) \
        .drop("Seniority level")
    # Original columns: Employment type, Industries, Job function, Seniority level, company, company_id, context, date, description, education, location, months_experience, post_id, post_url, sal_high, sal_low, salary, title
    # New columns: company_industry,field,level,company_name,job_summary,job_id,post_link,job_title

    merge1 = job_descriptions_and_skills.join(linkedin_hightech_jobs, ["job_id", "field", "job_title", "job_summary"], "outer")
    merge2 = merge1.join(indeed_jobs, ["job_title", "company_name", "company_industry", "job_summary"], "outer")
    merge3 = merge2.join(glassdoor_data_jobs, ["job_title", "job_summary", "company_name", "company_industry", "field"], "outer")
    merge4 = merge3.join(linkedin_data_jobs, ["company_industry", "field", "level", "company_name", "job_summary", "job_id", "job_title"], "outer")
    
    return merge4

all_jobpostings = unify_jobpostings_datasets()
display(all_jobpostings)