In [1]:
print('hello world')

hello world


In [18]:
# Starting Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Education Gold Processing") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Loading silver layer data 
silver_df = spark.read.parquet("datamart/silver/combined_resume_jd/*.parquet")

# Structure Silver Data
from utils.spark_utils import pyspark_df_info
pyspark_df_info(silver_df)
silver_df.show(5, truncate=False)


Total entries: 6241
Data columns (total 36 columns):
#   Column                    Non-Null Count     Dtype          
--- ------------------------- ------------------ ---------------
0   resume_id                 6241               string         
1   job_id                    6241               string         
2   snapshot_date             6241               date           
3   fit                       6241               string         
4   company_name              3908               string         
5   role_title                6046               string         
6   about_the_company         3664               string         
7   job_responsibilities      6241               array<string>  
8   jd_soft_skills            6241               array<string>  
9   required_language_proficiencies 6241               array<string>  
10  job_snapshot              6241               date           
11  jd_hard_skills_general    6240               array<string>  
12  jd_hard_skills_specific   

In [3]:
from pyspark.sql.functions import col, sum

# Assuming silver_df is your PySpark DataFrame
null_counts = silver_df.select(
    sum(col("required_edu_level").isNull().cast("int")).alias("null_count_required_edu_level"), # Alias is 'null_count_required_edu_level'
    sum(col("edu_highest_level").isNull().cast("int")).alias("null_count_highest_edu_level")
).collect()[0]

# --- This is the line that needs to be corrected ---
print(f"Null values in 'required_edu_level': {null_counts['null_count_required_edu_level']}")
# --- Make sure the key matches the alias exactly: ^ here

print(f"Null values in 'highest_edu_level': {null_counts['null_count_highest_edu_level']}")

Null values in 'required_edu_level': 1863
Null values in 'highest_edu_level': 462


In [4]:
print("Distinct required_edu_level values:")
print(silver_df.select('required_edu_level').distinct().count())



for edu_level in silver_df.select('required_edu_level').distinct().collect():
    c = silver_df.filter(silver_df.required_edu_level == edu_level.required_edu_level).count()
    print(f"{edu_level.required_edu_level} : {c}")

print("Distinct highest_edu_level values:")
print(silver_df.select('edu_highest_level').distinct().count())

for edu_level in silver_df.select('edu_highest_level').distinct().collect():
    c = silver_df.filter(silver_df.edu_highest_level == edu_level.edu_highest_level).count()
    print(f"{edu_level.edu_highest_level} : {c}")

Distinct required_edu_level values:
7
High School : 170
Master's Degree : 1069
Bachelor's Degree : 2984
Associate's Degree : 23
Others : 81
Doctorate : 51
None : 0
Distinct highest_edu_level values:
7
High School : 395
Master's Degree : 2365
Bachelor's Degree : 2147
Associate's Degree : 426
Others : 296
Doctorate : 150
None : 0


In [5]:
# Assuming you have a SparkSession named 'spark' and your DataFrame 'silver_df' is loaded.
# For example:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()
# silver_df = spark.read.load(...) # Load your silver data

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

# --- Step 1: Define the Ordinal Mapping ---
# This Python dictionary holds the ordinal mapping we've defined.
edu_level_mapping = {
    'Others': 0,
    'High School': 1,
    'Associate\'s Degree': 2,
    'Bachelor\'s Degree': 3,
    'Master\'s Degree': 4,
    'Doctorate': 5
}
# Note: 'None' is intentionally left out. It will become null, which we'll handle.

# --- Step 2: Create a Spark Mapping Expression ---
# We convert the Python dictionary into a Spark mapping expression by first flattening it
# into a list of [key1, value1, key2, value2, ...]. This is a more robust approach.
flat_map_list = [item for sublist in edu_level_mapping.items() for item in sublist]
mapping_expr = F.create_map([F.lit(x) for x in flat_map_list])

# --- Step 3: Add Ordinal Rank Columns to the DataFrame ---
# We create two new temporary columns that hold the integer rank for the candidate's
# education and the job's required education.
# The `mapping_expr[F.col(...)]` syntax performs the lookup.
gold_df = silver_df.withColumn(
    "highest_edu_rank",
    mapping_expr[F.col("edu_highest_level")].cast(IntegerType())
).withColumn(
    "required_edu_rank",
    mapping_expr[F.col("required_edu_level")].cast(IntegerType())
)

# --- Step 4: Create the 'edu_match_flag' Feature (Updated Null Logic) ---
# This feature indicates if the education level is a match, with special handling for nulls.
# - A candidate with a Doctorate (rank 5) is always considered a match (1).
# - If both levels are present, it's a match (1) only if they are exactly equal.
# - If both levels are present and not equal, it's not a match (0).
# - Otherwise (one is null and candidate is not a Doctorate), the outcome is null.
gold_df = gold_df.withColumn(
    "edu_match_flag",
    F.when(F.col("highest_edu_rank") == 5, 1)
     .when(F.col("highest_edu_rank") == F.col("required_edu_rank"), 1)
     .when(F.col("highest_edu_rank").isNotNull() & F.col("required_edu_rank").isNotNull(), 0)
     .otherwise(None)
)


# --- Step 5: Create the 'edu_score' Feature (Updated Null Logic) ---
# This implements the symmetric scoring logic with new exceptions for null values.
# - If candidate's education is null, score is null.
# - A Doctorate (rank 5) vs a null requirement is a perfect match (0.0).
# - A non-Doctorate vs a null requirement is an unknown match (null).
# - Otherwise, the score is calculated based on the rank difference.
max_rank_diff = float(max(edu_level_mapping.values()) - min(edu_level_mapping.values())) # This is 5.0

gold_df = gold_df.withColumn(
    "edu_score",
    F.when(
        F.col("highest_edu_rank").isNull(),
        None
    ).when(
        (F.col("highest_edu_rank") == 5) & F.col("required_edu_rank").isNull(),
        0.0
    ).when(
        F.col("required_edu_rank").isNull(),
        None
    ).otherwise(
        (F.col("highest_edu_rank") - F.col("required_edu_rank")) / max_rank_diff
    ).cast(FloatType())
)

# --- Step 6: Handle Null Values (Optional but Recommended) ---
# Our logic correctly produces nulls where education info was missing.
# For many models, you need to fill these. A common strategy is to fill with 0,
# assuming no information means no match.
# gold_df = gold_df.na.fill(value=0, subset=["edu_match_flag", "edu_score"])

# --- Step 7: Verify the Results and Clean Up ---
# Select the relevant columns to see the result of our transformations.
print("Verification of new education features:")
gold_df.select(
    "resume_id",
    "job_id",
    "edu_highest_level",
    "required_edu_level",
    "highest_edu_rank",
    "required_edu_rank",
    "edu_match_flag",
    "edu_score"
).show(20, truncate=False)

# Finally, you can drop the intermediate rank columns to keep the gold table clean.
final_gold_df = gold_df.drop("highest_edu_rank", "required_edu_rank")

print("\nFinal Gold DataFrame schema:")
final_gold_df.printSchema()


Verification of new education features:
+------------+-----------+-----------------+------------------+----------------+-----------------+--------------+---------+
|resume_id   |job_id     |edu_highest_level|required_edu_level|highest_edu_rank|required_edu_rank|edu_match_flag|edu_score|
+------------+-----------+-----------------+------------------+----------------+-----------------+--------------+---------+
|RES_s93wTCLp|JD_s93wTCLp|Master's Degree  |NULL              |4               |NULL             |NULL          |NULL     |
|RES_Z7yf1tu6|JD_Z7yf1tu6|Master's Degree  |Bachelor's Degree |4               |3                |0             |0.2      |
|RES_hAp1XnJZ|JD_hAp1XnJZ|Master's Degree  |NULL              |4               |NULL             |NULL          |NULL     |
|RES_zsgGxd2s|JD_zsgGxd2s|Master's Degree  |NULL              |4               |NULL             |NULL          |NULL     |
|RES_RrDNNvMz|JD_RrDNNvMz|Others           |Bachelor's Degree |0               |3           

In [6]:
# """
# This script contains functions to generate gold-level education features for the 
# resume-to-job-description matching project.
# """

# from pyspark.sql import DataFrame, SparkSession
# from pyspark.sql import functions as F
# from pyspark.sql.types import IntegerType, FloatType

# # --- Configuration: Define the Ordinal Mapping ---
# # This dictionary holds the ordinal mapping for education levels.
# # It's defined globally as it's a fixed business rule.
# EDU_LEVEL_MAPPING = {
#     'Others': 0,
#     'High School': 1,
#     'Associate\'s Degree': 2,
#     'Bachelor\'s Degree': 3,
#     'Master\'s Degree': 4,
#     'Doctorate': 5
# }

# def create_education_features(df: DataFrame) -> DataFrame:
#     """
#     Generates gold-level education features from a silver-level DataFrame.

#     This function takes a DataFrame and adds two new columns:
#     1.  'edu_match_flag': A binary flag indicating if education levels are an exact match,
#         with special handling for nulls and Doctorates.
#     2.  'edu_score': A normalized score from -1.0 to 1.0 representing the alignment
#         between the candidate's education and the job requirement.

#     It then drops the original and intermediate columns used for the calculation.

#     Args:
#         df (DataFrame): The input silver-level Spark DataFrame.

#     Returns:
#         DataFrame: A new DataFrame with the added gold features and dropped columns.
#     """
#     # --- Step 1: Create a Spark Mapping Expression ---
#     flat_map_list = [item for sublist in EDU_LEVEL_MAPPING.items() for item in sublist]
#     mapping_expr = F.create_map([F.lit(x) for x in flat_map_list])

#     # --- Step 2: Add Intermediate Ordinal Rank Columns ---
#     # These temporary columns hold the integer rank for education levels.
#     df_with_ranks = df.withColumn(
#         "highest_edu_rank",
#         mapping_expr[F.col("edu_highest_level")].cast(IntegerType())
#     ).withColumn(
#         "required_edu_rank",
#         mapping_expr[F.col("required_edu_level")].cast(IntegerType())
#     )

#     # --- Step 3: Create the 'edu_match_flag' Feature ---
#     df_with_flag = df_with_ranks.withColumn(
#         "edu_match_flag",
#         F.when(F.col("highest_edu_rank") == 5, 1)
#          .when(F.col("highest_edu_rank") == F.col("required_edu_rank"), 1)
#          .when(F.col("highest_edu_rank").isNotNull() & F.col("required_edu_rank").isNotNull(), 0)
#          .otherwise(None)
#     )

#     # --- Step 4: Create the 'edu_score' Feature ---
#     max_rank = max(EDU_LEVEL_MAPPING.values())
#     min_rank = min(EDU_LEVEL_MAPPING.values())
#     max_rank_diff = float(max_rank - min_rank)

#     df_with_score = df_with_flag.withColumn(
#         "edu_score",
#         F.when(F.col("highest_edu_rank").isNull(), None)
#          .when((F.col("highest_edu_rank") == 5) & F.col("required_edu_rank").isNull(), 0.0)
#          .when(F.col("required_edu_rank").isNull(), None)
#          .otherwise((F.col("highest_edu_rank") - F.col("required_edu_rank")) / max_rank_diff)
#          .cast(FloatType())
#     )

#     # --- Step 5: Handle Null Values and Clean Up ---
#     # Fill any remaining nulls in our new feature columns with 0.
#     # Then, drop the columns as requested.
#     final_df = df_with_score.na.fill(value=0, subset=["edu_match_flag", "edu_score"])

#     columns_to_drop = [
#         "edu_highest_level", "required_edu_level",
#         "highest_edu_rank", "required_edu_rank"
#     ]
    
#     return final_df.drop(*columns_to_drop)

# # --- Example Usage Block ---
# if __name__ == '__main__':
#     # This block demonstrates how to use the function.
#     # It will only run when the script is executed directly.
    
#     spark = SparkSession.builder \
#         .appName("EducationFeatureEngineering") \
#         .getOrCreate()

#     # Create a sample DataFrame that mimics your silver_df structure
#     sample_data = [
#         ("resume1", "job1", "Doctorate", "Master's Degree"),   # Overqualified
#         ("resume2", "job2", "Bachelor's Degree", "Bachelor's Degree"), # Exact match
#         ("resume3", "job3", "High School", "Master's Degree"),   # Underqualified
#         ("resume4", "job4", "Doctorate", None),                 # Null required, should be 1 / 0.0
#         ("resume5", "job5", "Master's Degree", None),          # Null required, should be null/null -> 0/0
#         ("resume6", "job6", None, "Bachelor's Degree"),        # Null highest, should be null/null -> 0/0
#     ]
#     columns = ["resume_id", "job_id", "edu_highest_level", "required_edu_level"]
#     silver_df_sample = spark.createDataFrame(sample_data, columns)

#     print("--- Sample Silver DataFrame ---")
#     silver_df_sample.show()

#     # Apply the feature engineering function
#     gold_df_sample = create_education_features(silver_df_sample)

#     print("\n--- Resulting Gold DataFrame ---")
#     gold_df_sample.show()
    
#     print("\n--- Gold DataFrame Schema ---")
#     gold_df_sample.printSchema()

#     spark.stop()


In [7]:
from pyspark.sql.functions import col, sum

# Assuming gold_df is your PySpark DataFrame
total_rows = gold_df.count()

null_counts = gold_df.select(
    sum(col("edu_match_flag").isNull().cast("int")).alias("null_count_edu_match_flag"),
    sum(col("edu_score").isNull().cast("int")).alias("null_count_edu_score")
).collect()[0]

print(f"Total rows in DataFrame: {total_rows}")
print(f"Null values in 'edu_match_flag': {null_counts['null_count_edu_match_flag']}")
print(f"Null values in 'edu_score': {null_counts['null_count_edu_score']}")

Total rows in DataFrame: 6241
Null values in 'edu_match_flag': 2137
Null values in 'edu_score': 2137


# GPA

In [9]:
from pyspark.sql import functions as F

# --- 1. Get Min, Max, and Mean using describe() ---
# The describe() function is a quick way to get several key stats.
# It computes the count, mean, standard deviation, min, and max.
print("--- Basic Statistics for edu_gpa ---")
silver_df.select("edu_gpa").describe().show()


# --- 2. Calculate the Median ---
# The median is the 50th percentile. In Spark, this is calculated using approxQuantile.
# The first argument is the column, the second is a list of quantiles (0.5 for median),
# and the third is the relative error (0.0 means perfect accuracy).
print("\n--- Median Calculation ---")
# approxQuantile returns a list, so we select the first (and only) element.
median_gpa = silver_df.approxQuantile("edu_gpa", [0.5], 0.0)[0]
print(f"Median GPA: {median_gpa}")


# --- 3. Calculate the Mode ---
# The mode is the most frequently occurring value. To find it, we group by the
# GPA values, count the occurrences of each, and find the one with the highest count.
print("\n--- Mode Calculation ---")
mode_df = silver_df.filter(F.col("edu_gpa").isNotNull()) \
                   .groupBy("edu_gpa") \
                   .count() \
                   .orderBy(F.col("count").desc())

# The mode is the value in the first row of the resulting DataFrame.
# .first() returns a Row object, so we access the value by its column name.
mode_gpa = mode_df.first()['edu_gpa']
print(f"Mode GPA: {mode_gpa}")

# You can also show the full frequency distribution to check for multiple modes
print("\n--- Top 5 Most Frequent GPA Values ---")
mode_df.show(5)

--- Basic Statistics for edu_gpa ---
+-------+-----------------+
|summary|          edu_gpa|
+-------+-----------------+
|  count|             1324|
|   mean|4.866694829615222|
| stddev|7.974494952646127|
|    min|             2.49|
|    max|            63.28|
+-------+-----------------+


--- Median Calculation ---
Median GPA: 3.759999990463257

--- Mode Calculation ---
Mode GPA: 4.0

--- Top 5 Most Frequent GPA Values ---
+-------+-----+
|edu_gpa|count|
+-------+-----+
|    4.0|  220|
|    3.6|  137|
|    3.5|  134|
|    3.9|   84|
|   3.86|   58|
+-------+-----+
only showing top 5 rows



In [10]:
from pyspark.sql import functions as F

# Filter the DataFrame to keep only rows where edu_gpa > 4, then count them.
gpa_outliers_count = silver_df.filter(F.col("edu_gpa") > 4).count()

print(f"Number of entries with GPA > 4: {gpa_outliers_count}")

Number of entries with GPA > 4: 66


In [12]:
from pyspark.sql import functions as F

# Chain two conditions inside the filter using the '&' (AND) operator
gpa_10_scale_count = silver_df.filter(
    (F.col("edu_gpa") > 4) & (F.col("edu_gpa") <= 10)
).count()

print(f"Number of entries with GPA > 4 and <= 10: {gpa_10_scale_count}")

Number of entries with GPA > 4 and <= 10: 42


In [13]:
from pyspark.sql import functions as F

# 1. Filter the DataFrame for rows where edu_gpa > 10
# 2. Select only the edu_gpa column
# 3. Collect the results to the driver as a list of Row objects
gpa_outliers_rows = silver_df.filter(F.col("edu_gpa") > 10).select("edu_gpa").collect()

# 4. Use a list comprehension to extract the float value from each Row object
gpa_outliers_list = [row.edu_gpa for row in gpa_outliers_rows]

# Print the final list
print("List of GPA values greater than 10:")
print(gpa_outliers_list)

# You can also print the count for context
print(f"\nTotal count of such values: {len(gpa_outliers_list)}")

List of GPA values greater than 10:
[63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875, 63.279998779296875]

Total count of such values: 24


# gpa standardization function

In [15]:
"""
This script contains a function to standardize GPA values from different scales
into a single 0-4 scale.
"""

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

def standardize_gpa(df: DataFrame) -> DataFrame:
    """
    Standardizes the 'edu_gpa' column in a DataFrame to a 0-4 scale.

    The function applies the following rules:
    - Values <= 4 are kept as is.
    - Values > 4 and <= 10 are scaled down from a 10-point scale.
    - Values > 10 and <= 100 are scaled down from a 100-point scale.
    - Null values remain null.
    - Values > 100 are considered invalid and are set to null.

    Args:
        df (DataFrame): The input silver-level Spark DataFrame with an 'edu_gpa' column.

    Returns:
        DataFrame: A new DataFrame with the 'edu_gpa' column standardized.
    """
    print("Standardizing 'edu_gpa' column...")

    # Use a series of 'when' conditions to apply the standardization logic
    standardized_df = df.withColumn(
        "edu_gpa",
        F.when(F.col("edu_gpa").isNull(), F.lit(None))  # Keep nulls as null
         .when(F.col("edu_gpa") <= 4, F.col("edu_gpa"))  # Already on a 0-4 scale
         .when((F.col("edu_gpa") > 4) & (F.col("edu_gpa") <= 10), (F.col("edu_gpa") / 10.0) * 4.0)  # Convert from 10-point scale
         .when((F.col("edu_gpa") > 10) & (F.col("edu_gpa") <= 100), (F.col("edu_gpa") / 100.0) * 4.0) # Convert from 100-point scale
         .otherwise(F.lit(None))  # Set values > 100 or other edge cases to null
         .cast(FloatType()) # Ensure the final column is of FloatType
    )

    print("Standardization complete.")
    return standardized_df



In [16]:
gold_df = standardize_gpa(silver_df)
# Select the "jobid" and "edu_gpa" columns and show the top 10 rows
gold_df.select("job_id", "resume_id", "edu_gpa").show(10)

Standardizing 'edu_gpa' column...
Standardization complete.
+-----------+------------+-------+
|     job_id|   resume_id|edu_gpa|
+-----------+------------+-------+
|JD_s93wTCLp|RES_s93wTCLp|   NULL|
|JD_Z7yf1tu6|RES_Z7yf1tu6|   3.33|
|JD_hAp1XnJZ|RES_hAp1XnJZ|   NULL|
|JD_zsgGxd2s|RES_zsgGxd2s|   NULL|
|JD_RrDNNvMz|RES_RrDNNvMz|    3.7|
|JD_7rYoKTcb|RES_7rYoKTcb|    3.6|
|JD_dI74kKQw|RES_dI74kKQw|   NULL|
|JD_AAwaK8NY|RES_AAwaK8NY|   NULL|
|JD_jxAmUMgn|RES_jxAmUMgn|   NULL|
|JD_fTYeywv1|RES_fTYeywv1|    3.5|
+-----------+------------+-------+
only showing top 10 rows



In [17]:
from pyspark.sql import functions as F

# --- 1. Get Min, Max, and Mean using describe() ---
# The describe() function is a quick way to get several key stats.
# It computes the count, mean, standard deviation, min, and max.
print("--- Basic Statistics for edu_gpa ---")
gold_df.select("edu_gpa").describe().show()


# --- 2. Calculate the Median ---
# The median is the 50th percentile. In Spark, this is calculated using approxQuantile.
# The first argument is the column, the second is a list of quantiles (0.5 for median),
# and the third is the relative error (0.0 means perfect accuracy).
print("\n--- Median Calculation ---")
# approxQuantile returns a list, so we select the first (and only) element.
median_gpa = gold_df.approxQuantile("edu_gpa", [0.5], 0.0)[0]
print(f"Median GPA: {median_gpa}")


# --- 3. Calculate the Mode ---
# The mode is the most frequently occurring value. To find it, we group by the
# GPA values, count the occurrences of each, and find the one with the highest count.
print("\n--- Mode Calculation ---")
mode_df = gold_df.filter(F.col("edu_gpa").isNotNull()) \
                   .groupBy("edu_gpa") \
                   .count() \
                   .orderBy(F.col("count").desc())

# The mode is the value in the first row of the resulting DataFrame.
# .first() returns a Row object, so we access the value by its column name.
mode_gpa = mode_df.first()['edu_gpa']
print(f"Mode GPA: {mode_gpa}")

# You can also show the full frequency distribution to check for multiple modes
print("\n--- Top 5 Most Frequent GPA Values ---")
mode_df.show(5)

--- Basic Statistics for edu_gpa ---
+-------+-------------------+
|summary|            edu_gpa|
+-------+-------------------+
|  count|               1324|
|   mean| 3.6317906200525623|
| stddev|0.37492991501618805|
|    min|                1.9|
|    max|                4.0|
+-------+-------------------+


--- Median Calculation ---
Median GPA: 3.700000047683716

--- Mode Calculation ---
Mode GPA: 4.0

--- Top 5 Most Frequent GPA Values ---
+-------+-----+
|edu_gpa|count|
+-------+-----+
|    4.0|  220|
|    3.6|  137|
|    3.5|  134|
|    3.9|   84|
|   3.86|   58|
+-------+-----+
only showing top 5 rows



# Institution fanciness

In [21]:
from pyspark.sql import functions as F

# Filter out nulls, get distinct values, and then count them
distinct_institution_count = silver_df.filter(F.col("edu_institution").isNotNull()) \
                                      .distinct() \
                                      .count()

print(f"Number of distinct non-null edu_institution values: {distinct_institution_count}")

Number of distinct non-null edu_institution values: 5657


In [36]:
"""
This script takes a raw university rankings CSV file that contains multiple
junk header lines, cleans it, enforces a strict schema, and saves the result
as a schema-aware Parquet file.
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

def clean_and_save_as_parquet(spark: SparkSession, input_path: str, output_path: str):
    """
    Reads a raw CSV with junk headers, cleans it, and saves it as a Parquet file.

    Args:
        spark (SparkSession): The active SparkSession.
        input_path (str): File path for the raw/dirty CSV.
        output_path (str): Folder path to save the clean Parquet file.
    """
    print(f"Reading raw text file from: {input_path}")
    
    # --- Step 1: Read the file as raw text to handle unpredictable headers ---
    # This is the most robust way to deal with junk lines.
    raw_text_df = spark.read.text(input_path)

    # --- Step 2: Filter out all junk/header lines ---
    # We keep only the lines that start with a number, which represent the actual data.
    # The regex '^\s*[0-9]' checks if the line starts with optional whitespace followed by a digit.
    data_lines_df = raw_text_df.filter(F.col("value").rlike(r"^\s*[0-9]"))

    # --- Step 3: Define and Enforce the Schema ---
    # Manually define the schema for our data.
    final_schema = StructType([
        StructField("Rank", IntegerType(), True),
        StructField("Institution", StringType(), True),
        StructField("Country", StringType(), True)
    ])
    
    # Split the text line by commas, trim whitespace, and cast to the correct types.
    split_col = F.split(data_lines_df['value'], ',')
    clean_df = data_lines_df.select(
        F.trim(split_col.getItem(0)).cast(IntegerType()).alias('Rank'),
        F.trim(split_col.getItem(1)).alias('Institution'),
        F.trim(split_col.getItem(2)).alias('Country')
    )
    
    # Final check to ensure the DataFrame schema matches our defined schema
    # (This is implicitly handled by the casts above, but is good practice to be aware of)
    
    print(f"Saving clean Parquet file to: {output_path}")

    # --- Step 4: Write the cleaned DataFrame to a Parquet file ---
    # Parquet is a columnar, schema-aware format, perfect for reference data.
    # .mode("overwrite") will replace the folder if it already exists.
    clean_df.write.mode("overwrite").parquet(output_path)

    print("Cleaning complete. A new Parquet file has been saved.")


# --- Example Usage Block ---
if __name__ == '__main__':
    spark = SparkSession.builder.appName("RankingsCleaner").getOrCreate()
    
    # --- Input and Output Paths ---
    raw_csv_path = "./qs_ranking_2021_clean.csv"
    # The output will be a folder containing the Parquet data.
    clean_parquet_path = "datamart/references/qs_rankings"
    
    try:
        clean_and_save_as_parquet(spark, raw_csv_path, clean_parquet_path)
        print(f"\nSuccessfully created clean Parquet data in the folder: '{clean_parquet_path}'")
        
        # You can read it back to verify the schema and data
        print("\n--- Verifying the first 5 rows and schema of the clean Parquet data ---")
        verified_df = spark.read.parquet(clean_parquet_path)
        verified_df.printSchema()
        verified_df.show(5)

    except Exception as e:
        print(f"An error occurred. Please ensure '{raw_csv_path}' exists and is accessible.")
        print(f"Error details: {e}")
        
    spark.stop()


Reading raw text file from: ./qs_ranking_2021_clean.csv
Saving clean Parquet file to: datamart/references/qs_rankings
Cleaning complete. A new Parquet file has been saved.

Successfully created clean Parquet data in the folder: 'datamart/references/qs_rankings'

--- Verifying the first 5 rows and schema of the clean Parquet data ---
root
 |-- Rank: integer (nullable = true)
 |-- Institution: string (nullable = true)
 |-- Country: string (nullable = true)

+----+--------------------+-------+
|Rank|         Institution|Country|
+----+--------------------+-------+
|   1|Massachusetts Ins...|    USA|
|   2| Stanford University|    USA|
|   3|  Harvard University|    USA|
|   4|California Instit...|    USA|
|   5|University of Oxford|     UK|
+----+--------------------+-------+
only showing top 5 rows



In [28]:
spark = SparkSession.builder.appName("CsvCleaner").getOrCreate()
    
raw_csv_path = "./qs_ranking_2021.csv"
    # Note: Spark's CSV writer saves to a folder. The actual .csv file will be inside.
clean_csv_folder_path = "./qs_ranking_2021_clean_csv"


clean_rankings_csv(spark, raw_csv_path, clean_csv_folder_path)


Reading raw CSV from: ./qs_ranking_2021.csv
Saving clean CSV to: ./qs_ranking_2021_clean_csv
Cleaning complete. A new CSV file has been saved.


In [41]:
"""
This script creates an 'institution_tier' feature by fuzzy matching against
a pre-cleaned, schema-enforced Parquet file of university rankings.
"""
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, FloatType, StructType, StructField
from rapidfuzz import fuzz # Using rapidfuzz for performance
import re

def clean_institution_name(name: str) -> str:
    """
    Cleans a university name to a standardized format for matching.
    """
    if name is None:
        return None
    name = name.lower()
    name = re.sub(r'[^\w\s]', '', name) # Remove punctuation
    
    # Remove common words and suffixes
    common_words = ['the', 'of', 'and', 'university', 'college', 'institute']
    query = name.split()
    result_words = [word for word in query if word.lower() not in common_words]
    name = ' '.join(result_words)
    
    name = name.strip() # Remove leading/trailing whitespace
    return name

def create_institution_features(spark: SparkSession, silver_df: DataFrame, rankings_parquet_path: str) -> DataFrame:
    """
    Adds an 'institution_tier' column to the DataFrame by matching against
    a clean Parquet reference file.

    Args:
        spark (SparkSession): The active SparkSession object.
        silver_df (DataFrame): The input silver DataFrame.
        rankings_parquet_path (str): The file path to the clean QS rankings Parquet folder.

    Returns:
        DataFrame: A new DataFrame with the new feature added.
    """
    # --- Step 1: Load and Prepare Rankings Data from Parquet (Simplified) ---
    # Reading from Parquet is much simpler as it's schema-aware.
    rankings_df = spark.read.parquet(rankings_parquet_path)
    
    clean_name_udf = F.udf(clean_institution_name, StringType())
    # We no longer need the 'Country' column for this feature.
    rankings_df = rankings_df.select(
        F.col("Institution").alias("qs_institution"),
        F.col("Rank").alias("qs_rank")
    ).withColumn("cleaned_qs_name", clean_name_udf(F.col("qs_institution")))

    # --- Step 2: Prepare Resume Data ---
    resume_institutions_df = silver_df.select("edu_institution").filter(F.col("edu_institution").isNotNull()).distinct()
    resume_institutions_df = resume_institutions_df.withColumn("cleaned_resume_name", clean_name_udf(F.col("edu_institution")))

    # --- Step 3: Fuzzy Match to Find the Best Ranked Match ---
    
    # Define a function for the UDF to return only rank and score
    def get_best_match(resume_name, rankings_broadcast):
        best_score = 0
        best_rank = None
        
        for row in rankings_broadcast.value:
            qs_name = row['cleaned_qs_name']
            score = fuzz.token_sort_ratio(resume_name, qs_name)
            if score > best_score:
                best_score = score
                best_rank = row['qs_rank']
        
        # Only accept matches with a high confidence score
        if best_score > 85:
            return (best_rank, best_score)
            
        # If no good match is found, return None/zero values.
        return (None, 0)
    
    rankings_list_broadcast = spark.sparkContext.broadcast(rankings_df.collect())
    
    # Define schema for the UDF's return type (no country)
    match_schema = StructType([
        StructField("matched_rank", IntegerType(), True),
        StructField("match_score", IntegerType(), True)
    ])
    get_best_match_udf = F.udf(lambda name: get_best_match(name, rankings_list_broadcast), match_schema)
    
    matched_institutions_df = resume_institutions_df.withColumn(
        "match_result", get_best_match_udf(F.col("cleaned_resume_name"))
    )
    
    # Unpack the struct result
    matched_institutions_df = matched_institutions_df.select(
        "edu_institution",
        F.col("match_result.matched_rank").alias("matched_rank"),
        F.col("match_result.match_score").alias("match_score")
    )
    
    # --- Step 4: Join Match Results back to the Main DataFrame ---
    df_with_rank = silver_df.join(matched_institutions_df, "edu_institution", "left")

    # --- Step 5: Create the Final Feature Column ---
    # Create institution_tier
    df_with_tier = df_with_rank.withColumn(
        "institution_tier",
        F.when(F.col("matched_rank").isNotNull() & (F.col("matched_rank") <= 100), "Tier 1")
         .when(F.col("matched_rank").isNotNull() & (F.col("matched_rank") <= 500), "Tier 2")
         .otherwise("Tier 3") # Tier 3 for ranks > 500 OR for unranked/unmatched
    )
    
    # Drop intermediate columns
    final_df = df_with_tier.drop("matched_rank", "match_score")

    return final_df

# --- Example Usage Block (Updated to reflect user's notebook flow) ---
if __name__ == '__main__':
    # You will need to install rapidfuzz: pip install rapidfuzz
    
    # 1. Start Spark Session
    spark = SparkSession.builder \
        .appName("Education Gold Processing") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # 2. Loading silver layer data 
    print("Loading silver layer data...")
    silver_df = spark.read.parquet("datamart/silver/combined_resume_jd/*.parquet")
    
    # 3. Define path to clean reference data
    clean_rankings_path = "datamart/references/qs_rankings"
    
    # 4. Run the feature engineering function
    print("Running institution feature engineering...")
    try:
        # Pass the active spark session and the silver_df to the function
        gold_df = create_institution_features(spark, silver_df, clean_rankings_path)
    
        # 5. Show results
        print("\n--- Final DataFrame with Institution Features (Top 10) ---")
        gold_df.select("edu_institution", "institution_tier").show(10, truncate=False)

    except Exception as e:
        print(f"\nAn error occurred. Please ensure the folder '{clean_rankings_path}' exists and is accessible.")
        print(f"Error details: {e}")
    
    # Stop the session only at the very end of your entire notebook or script.
    # spark.stop()


Loading silver layer data...
Running institution feature engineering...

--- Final DataFrame with Institution Features (Top 10) ---
+------------------------------------------+----------------+
|edu_institution                           |institution_tier|
+------------------------------------------+----------------+
|Rochester Institute of Technology         |Tier 3          |
|Syracuse University                       |Tier 3          |
|New York Institute of Technology          |Tier 3          |
|Pace University, Seidenberg School of CSIS|Tier 3          |
|Northeastern University-Boston, MA        |Tier 3          |
|Northwestern Polytechnic University       |Tier 3          |
|Columbia University                       |Tier 1          |
|University of Greenwich-Nairobi           |Tier 3          |
|Florida International University          |Tier 3          |
|Friends University-Wichita                |Tier 3          |
+------------------------------------------+----------------+


In [42]:
from pyspark.sql import functions as F

# Group by the 'institution_tier' column and count the occurrences
tier_counts_df = gold_df.groupBy("institution_tier").count()

# For better readability, order the results by tier
tier_counts_df = tier_counts_df.orderBy("institution_tier")

# Show the final counts
print("--- Row Counts per Institution Tier ---")
tier_counts_df.show()

--- Row Counts per Institution Tier ---
+----------------+-----+
|institution_tier|count|
+----------------+-----+
|          Tier 1|  139|
|          Tier 2|  228|
|          Tier 3| 5874|
+----------------+-----+



In [3]:
from pyspark.sql import DataFrame

def show_certification_columns(df: DataFrame, num_rows: int = 10):
  """
  Displays a specified number of rows for the 'required_cert_categories'
  and 'cert_categories' columns of a PySpark DataFrame.

  Args:
    df: The input PySpark DataFrame.
    num_rows: The number of rows to display. Defaults to 10.
  """
  if "required_cert_categories" in df.columns and "cert_categories" in df.columns:
    df.select("required_cert_categories", "cert_categories").show(num_rows, truncate=False)
  else:
    print("One or both of the specified columns do not exist in the DataFrame.")
    print("Available columns are:", df.columns)

# Example usage with your DataFrame named silver_df:

In [4]:
show_certification_columns(silver_df , 10)

+-----------------------------------------------+---------------+
|required_cert_categories                       |cert_categories|
+-----------------------------------------------+---------------+
|[]                                             |[]             |
|[]                                             |[]             |
|[]                                             |[]             |
|[]                                             |[]             |
|[]                                             |[]             |
|[Business Analysis, Agile & Project Management]|[]             |
|[ERP & Business Software]                      |[]             |
|[]                                             |[]             |
|[ERP & Business Software]                      |[]             |
|[]                                             |[]             |
+-----------------------------------------------+---------------+
only showing top 10 rows



In [9]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, FloatType, BooleanType, StructType, StructField
from rapidfuzz import fuzz
import re

In [24]:
def create_cert_match(df: DataFrame) -> DataFrame:
    """
    Creates the 'cert_match' binary flag based on the intersection of
    required certifications and the candidate's certifications.
    (Internal helper function)
    """
    # Coalesce null arrays to empty arrays to prevent errors
    df_with_arrays = df.withColumn("req_certs", F.coalesce(F.col("required_cert_categories"), F.array())) \
                       .withColumn("cand_certs", F.coalesce(F.col("cert_categories"), F.array()))

    # Find the intersection of the two certification lists
    df_with_intersection = df_with_arrays.withColumn(
        "cert_intersection", 
        F.array_intersect(F.col("req_certs"), F.col("cand_certs"))
    )

    # Apply the logic:
    # Match = 1 if (there is at least one common cert) OR (both lists are empty)
    # Match = 0 otherwise.
    df_with_flag = df_with_intersection.withColumn(
        "cert_match",
        F.when(
            (F.size(F.col("cert_intersection")) > 0) | 
            ((F.size(F.col("req_certs")) == 0) & (F.size(F.col("cand_certs")) == 0)),
            1
        ).otherwise(0)
    )
    
    # Drop the intermediate helper columns
    return df_with_flag.drop("req_certs", "cand_certs", "cert_intersection")

In [25]:
g_df = create_cert_score(silver_df)

In [27]:
# from pyspark.sql import SparkSession
# import random

# # Create a SparkSession (standard boilerplate for a local script)
# # In environments like Databricks, the 'spark' session is usually pre-configured.
# spark = SparkSession.builder.appName("ShowCertMatch").getOrCreate()

# # --- Create a Sample DataFrame `g_df` for Demonstration ---
# # In your real use case, your g_df DataFrame would already be loaded.
# # We'll create a 'cert_match' column with some example boolean/string values.
# match_options = [True, False, True, True, False]
# data = [(i, random.choice(match_options)) for i in range(20)]
# columns = ["id", "cert_match"]
# g_df = spark.createDataFrame(data, columns)


# --- Show the top 10 values for the "cert_match" column ---
# 1. .select("cert_match") isolates the column you want.
# 2. .show(10) displays the first 10 rows of the selection.

print("Displaying the top 10 rows for the 'cert_match' column:")
g_df.select("cert_score").show(10)

Displaying the top 10 rows for the 'cert_match' column:
+----------+
|cert_score|
+----------+
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       0.0|
|       0.0|
|       1.0|
|       0.0|
|       1.0|
+----------+
only showing top 10 rows



In [28]:
# --- 1. Calculate the Mean ---
# We use the built-in `mean` aggregate function on the g_df DataFrame.
mean_value = g_df.agg(F.mean("cert_score")).first()[0]
print(f"Mean of 'cert_score': {mean_value:.2f}")

# --- 2. Calculate the Median ---
# The median is the 50th percentile. We use `approxQuantile` on g_df.
# The parameters are: (column, [percentile], relative_error)
# A relative error of 0 gives the exact median.
# The result is a list, so we take the first element.
median_value = g_df.approxQuantile("cert_score", [0.5], 0.0)[0]
print(f"Median of 'cert_score': {median_value}")


# --- 3. Calculate the Mode ---
# The mode is the most frequently occurring value.
# We find it by grouping, counting, and ordering on the g_df DataFrame.
mode_row = g_df.groupBy("cert_score").count().orderBy(F.col("count").desc()).first()
mode_value = mode_row["cert_score"]
mode_count = mode_row["count"]

print(f"Mode of 'cert_score': {mode_value} (appeared {mode_count} times)")

Mean of 'cert_score': 0.72
Median of 'cert_score': 1.0
Mode of 'cert_score': 1.0 (appeared 4498 times)


In [19]:
print('hello')

hello
