**DATA SAMPLING**

In [0]:
from pyspark.sql import SparkSession

def combine_and_sample_correctly_with_multiple_columns(job_desc_file, resumes_file, output_file, sample_size=5700):
    """Cross-join job descriptions with resumes and write a random sample as CSV.

    Args:
        job_desc_file: Path of the job-description CSV; read with a header row
            and inferred column types.
        resumes_file: Path of the resume CSV; read with a header row and
            multi-line, double-quote-escaped fields.
        output_file: Destination path of the CSV output; existing output is
            overwritten.
        sample_size: Target number of (job description, resume) pairs. The
            sample is drawn with ``DataFrame.sample(fraction=...)``, so the
            number of rows written is close to, but not necessarily equal to,
            this value.

    Raises:
        ValueError: If either input dataset contains no rows.
    """
    # Initialize Spark session
    # NOTE(review): the two spark.sql.csv.* keys do not look like documented
    # Spark settings; CSV parsing is driven by the reader options used below.
    # Confirm whether they are needed.
    spark = SparkSession.builder \
        .appName("Combine and Sample Datasets with Multiple Columns") \
        .config("spark.executor.memory", "4g") \
        .config("spark.driver.memory", "4g") \
        .config("spark.sql.csv.multiLine", "true") \
        .config("spark.sql.csv.escape", '"') \
        .getOrCreate()

    print("Loading datasets...")

    # Load the job descriptions dataset
    job_desc_df = spark.read.csv(job_desc_file, header=True, inferSchema=True)

    # Load the resumes dataset with multiline support
    resumes_df = spark.read.option("multiLine", "true") \
                           .option("quote", '"') \
                           .option("escape", '"') \
                           .option("header", "true") \
                           .csv(resumes_file)

    # Every count() is a Spark action that re-reads the input, so run each one
    # exactly once and reuse the result.
    job_desc_rows = job_desc_df.count()
    resume_rows = resumes_df.count()

    # Verify data
    if job_desc_rows == 0 or resume_rows == 0:
        raise ValueError("One of the input datasets is empty. Please check the input files.")

    print(f"Job Description Rows: {job_desc_rows}, Resume Rows: {resume_rows}")

    # Perform Cartesian product
    print("Performing Cartesian product...")
    combined_df = job_desc_df.crossJoin(resumes_df)

    # A cross join pairs every left row with every right row, so its size is
    # the product of the two counts; counting combined_df would evaluate the
    # whole Cartesian product only to obtain the same number.
    total_records = job_desc_rows * resume_rows
    print(f"Total records after Cartesian product: {total_records}")

    # Sample the specified number of records
    if sample_size > total_records:
        print(f"Warning: Sample size {sample_size} exceeds total records {total_records}. Taking all records.")
        sampled_df = combined_df
    else:
        # Fraction-based sampling with a fixed seed; the row count is approximate.
        sampled_df = combined_df.sample(withReplacement=False, fraction=sample_size / total_records, seed=1)

    print(f"Saving sampled dataset to: {output_file}")
    # Save the sampled dataset to a CSV file
    sampled_df.write.option("quote", '"') \
                    .option("escape", '"') \
                    .option("multiLine", "true") \
                    .csv(output_file, header=True, mode="overwrite")
    print(f"Sampled dataset saved successfully to: {output_file}")

# Input and output paths for the sampling step
job_desc_file = "/FileStore/tables/jd.csv"
resumes_file = "/FileStore/tables/UpdatedResumeDataSet.csv"
output_file = "/FileStore/tables/sampled_datasets.csv"

# Requested number of sampled job/resume pairs
sample_size = 5700

# Run the cross-join + sampling pipeline with the settings above
combine_and_sample_correctly_with_multiple_columns(
    job_desc_file=job_desc_file,
    resumes_file=resumes_file,
    output_file=output_file,
    sample_size=sample_size,
)

Loading datasets...
Job Description Rows: 5001, Resume Rows: 962
Performing Cartesian product...
Total records after Cartesian product: 4810962
Saving sampled dataset to: /FileStore/tables/sampled_datasets.csv
Sampled dataset saved successfully to: /FileStore/tables/sampled_datasets.csv


**DATA CLEANING**

In [0]:
from pyspark.sql import SparkSession

# Get (or create) the Spark session used by the cleaning stage
spark = (
    SparkSession.builder
    .appName("Dataset_Inspection_and_Cleaning")
    .getOrCreate()
)

# Read the sampled CSV: header row, inferred types, and quote/escape/multi-line
# handling for long free-text cells
file_path = "/FileStore/tables/sampled_datasets.csv"
data = (
    spark.read
    .options(
        header=True,
        inferSchema=True,
        multiLine=True,
        escape='"',
        quote='"',
        encoding="UTF-8",
    )
    .csv(file_path)
)

# Confirm the columns were parsed as expected
data.printSchema()

# Preview the first 20 rows without truncating cell contents
data.show(20, truncate=False)

root
 |-- JD_Experience: string (nullable = true)
 |-- JD_Qualifications: string (nullable = true)
 |-- JD_Preference: string (nullable = true)
 |-- JD_Job Title: string (nullable = true)
 |-- JD_Role: string (nullable = true)
 |-- JD_Job Description: string (nullable = true)
 |-- JD_skills: string (nullable = true)
 |-- JD_Responsibilities: string (nullable = true)
 |-- Resume_Category: string (nullable = true)
 |-- Resume_information: string (nullable = true)

+-------------+-----------------+-------------+---------------------+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Report the DataFrame dimensions as (rows, columns)
num_cols = len(data.columns)
num_rows = data.count()  # Spark action: scans the data

print(f"Shape of the DataFrame: ({num_rows}, {num_cols})")

Shape of the DataFrame: (5690, 10)


In [0]:
from pyspark.sql.functions import col, regexp_extract, when
from pyspark.sql.types import IntegerType

# Derive JD_Minimum_Experience from JD_Experience:
#   1. take the first run of digits in the text and cast it to an integer
#      (a row with no digits ends up null),
#   2. default null to 0,
#   3. drop the original text column.
data = (
    data
    .withColumn(
        "JD_Minimum_Experience",
        regexp_extract(col("JD_Experience"), r"(\d+)", 1).cast(IntegerType()),
    )
    .withColumn(
        "JD_Minimum_Experience",
        when(col("JD_Minimum_Experience").isNotNull(), col("JD_Minimum_Experience")).otherwise(0),
    )
    .drop("JD_Experience")
)

# Preview the derived column
data.select("JD_Minimum_Experience").show(5)

+---------------------+
|JD_Minimum_Experience|
+---------------------+
|                    4|
|                    1|
|                    0|
|                    2|
|                    5|
+---------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import col, sum

# Per-column null tally: isNull() becomes a 0/1 integer that is summed over all rows.
# `sum` here is pyspark.sql.functions.sum (imported above), not the Python builtin.
null_counts = data.agg(
    *[sum(col(name).isNull().cast("int")).alias(name) for name in data.columns]
)

# One output row holding the null count of every column
null_counts.show()

+-----------------+-------------+------------+-------+------------------+---------+-------------------+---------------+------------------+---------------------+
|JD_Qualifications|JD_Preference|JD_Job Title|JD_Role|JD_Job Description|JD_skills|JD_Responsibilities|Resume_Category|Resume_information|JD_Minimum_Experience|
+-----------------+-------------+------------+-------+------------------+---------+-------------------+---------------+------------------+---------------------+
|                0|            0|           0|      0|                 0|        0|                  0|              0|                 0|                    0|
+-----------------+-------------+------------+-------+------------------+---------+-------------------+---------------+------------------+---------------------+



In [0]:
from pyspark.sql.functions import col, sum, when

# Per-column tally of values equal to the empty string: each row contributes 1
# when the value is "" and 0 otherwise.
# `sum` here is pyspark.sql.functions.sum (imported above), not the Python builtin.
empty_string_counts = data.agg(
    *[sum(when(col(name) == "", 1).otherwise(0)).alias(name) for name in data.columns]
)

# One output row holding the empty-string count of every column
empty_string_counts.show()

+-----------------+-------------+------------+-------+------------------+---------+-------------------+---------------+------------------+---------------------+
|JD_Qualifications|JD_Preference|JD_Job Title|JD_Role|JD_Job Description|JD_skills|JD_Responsibilities|Resume_Category|Resume_information|JD_Minimum_Experience|
+-----------------+-------------+------------+-------+------------------+---------+-------------------+---------------+------------------+---------------------+
|                0|            0|           0|      0|                 0|        0|                  0|              0|                 0|                    0|
+-----------------+-------------+------------+-------+------------------+---------+-------------------+---------------+------------------+---------------------+



In [0]:
from pyspark.sql.functions import col, regexp_replace, trim, lower, when

# Text columns that receive the basic normalisation below
columns_to_clean = [
    "JD_Qualifications",
    "JD_Preference",
    "JD_Job Title",
    "JD_Role",
    "JD_Job Description",
    "JD_skills",
    "JD_Responsibilities",
    "Resume_Category",
    "Resume_information",
]

# For every column: treat null as "", remove URLs, collapse each whitespace run
# to a single space, trim both ends, and lowercase the result.
for column in columns_to_clean:
    non_null_text = when(col(column).isNull(), "").otherwise(col(column))
    without_urls = regexp_replace(non_null_text, r"https?://\S+|www\.\S+", "")
    single_spaced = regexp_replace(without_urls, r"\s+", " ")
    data = data.withColumn(column, lower(trim(single_spaced)))

# Preview the normalised text columns in full
data.select(columns_to_clean).show(truncate=False)

+-----------------+-------------+---------------------+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Install spaCy and download its small English model (en_core_web_sm).
# The install is unpinned, so the version may change between runs.
# NOTE(review): no visible cell in this notebook imports spaCy — confirm these
# installs are still needed.
!pip install spacy
!python -m spacy download en_core_web_sm

Collecting spacy
  Downloading spacy-3.8.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (29.4 MB)
[?25l[K     |                                | 10 kB 24.1 MB/s eta 0:00:02[K     |                                | 20 kB 8.7 MB/s eta 0:00:04[K     |                                | 30 kB 12.4 MB/s eta 0:00:03[K     |                                | 40 kB 5.8 MB/s eta 0:00:06[K     |                                | 51 kB 6.4 MB/s eta 0:00:05[K     |                                | 61 kB 7.6 MB/s eta 0:00:04[K     |                                | 71 kB 7.9 MB/s eta 0:00:04[K     |                                | 81 kB 9.0 MB/s eta 0:00:04[K     |                                | 92 kB 8.9 MB/s eta 0:00:04[K     |▏                               | 102 kB 7.0 MB/s eta 0:00:05[K     |▏                               | 112 kB 7.0 MB/s eta 0:00:05[K     |▏                               | 122 kB 7.0 MB/s eta 0:00:05[K     |▏                          

In [0]:
from pyspark.sql.functions import regexp_replace, col

# Only string columns can contain the typo. regexp_replace yields a string
# column, so applying it to every column would silently turn the integer
# JD_Minimum_Experience column into a string column.
columns_to_correct = [name for name, dtype in data.dtypes if dtype == "string"]

# Correct the spelling "exprience" to "experience" (whole-word matches only)
for column in columns_to_correct:
    data = data.withColumn(column, regexp_replace(col(column), r'\bexprience\b', 'experience'))

# Show the updated DataFrame
data.show(truncate=False)

+-----------------+-------------+---------------------+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Collect the distinct JD_Qualifications values to the driver; fall back to an
# empty list when the column is not present.
unique_qualifications = []
if "JD_Qualifications" in data.columns:
    unique_qualifications = [
        row["JD_Qualifications"]
        for row in data.select("JD_Qualifications").distinct().collect()
    ]

# Display the distinct values
print("Unique Qualifications in JD_Qualifications column:")
print(unique_qualifications)

Unique Qualifications in JD_Qualifications column:
['bca', 'phd', 'mca', 'ba', 'b.tech', 'm.tech', 'b.com', 'bba', 'm.com', 'mba']


In [0]:
import re
from pyspark.sql.functions import udf, col, regexp_replace, lower, trim
from pyspark.sql.types import StringType

# Technical terms and degree names that must survive cleaning exactly as written.
# Tokens come from whitespace splitting, so the multi-word entries ("ms excel",
# "ms office", "ms power point") can never equal a single token; their words
# contain no punctuation, so they pass through the generic branch unchanged.
preserve_keywords = {"angular.js", "aws", "azure", "c#", "c++", "css", "django", "docker", "flask", "html",
    "java", "javascript", "kubernetes", "ms excel", "ms office", "ms power point",
    "node.js", "oracle", "pytorch", "python", "r", "react.js", "ruby", "sql",
    "tensorflow", "ui", "ux", "ux/ui", "bca", "phd", "mca", "ba", "mcom", "bcom", "bba", "mba", "btech", "mtech"}

# Words that are dropped from the text entirely
exclude_keywords = {"organization", "location", "date", "time", "person", "event"}

# Punctuation that commonly wraps a token in running text ("c++," / "(node.js)").
# '+' and '#' are deliberately absent so "c++" and "c#" keep their suffix.
_EDGE_PUNCTUATION = ".,;:!?()[]{}<>\"'"


def clean_and_preserve_keywords(text):
    """Lowercase and de-punctuate ``text`` while keeping listed keywords intact.

    Processing steps:
      1. ``None`` becomes ``""``; any other value is converted with ``str``.
      2. The text is lowercased and each run of non-ASCII characters is
         replaced by a space.
      3. The text is split on whitespace. Each token has surrounding
         punctuation stripped, then:
           * a token in ``preserve_keywords`` is kept as-is,
           * a token in ``exclude_keywords`` is dropped,
           * any other token loses every character that is not a word
             character, whitespace, '+' or '#', and is kept if non-empty.

    Args:
        text: Value to clean; normally a string or ``None``.

    Returns:
        The surviving tokens joined by single spaces.
    """
    if text is None:
        return ""

    # Coerce first so non-string input (e.g. a number) does not fail on .lower()
    text = str(text).lower()

    # Remove non-ASCII characters
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)

    cleaned_words = []
    for word in text.split():
        # "node.js." or "(c#)" must match the keyword sets despite the
        # sentence punctuation attached to them.
        core = word.strip(_EDGE_PUNCTUATION)

        if core in preserve_keywords:  # Preserve keywords exactly
            cleaned_words.append(core)
        elif core in exclude_keywords:  # Exclude unwanted keywords
            continue
        else:
            # Drop punctuation inside the token, keeping '+' and '#'
            cleaned_word = re.sub(r'[^\w\s+#]', '', core)
            if cleaned_word:  # Skip tokens that were pure punctuation
                cleaned_words.append(cleaned_word)

    # Join cleaned words back into a single string
    return " ".join(cleaned_words).strip()

# Register the cleaning function as a UDF that returns a string
clean_text_udf = udf(clean_and_preserve_keywords, StringType())

# Step 1: strip HTML tags from every column.
# Punctuation is deliberately NOT removed here. Stripping it before the UDF runs
# turns "c++" / "c#" into "c" and "node.js" into "nodejs", so the
# preserve_keywords lookup inside the UDF could never match those terms. The UDF
# itself removes punctuation from every token it does not preserve.
# The explicit string cast keeps non-text columns (e.g. a numeric
# JD_Minimum_Experience) safe to pass through regexp_replace and the text UDF.
columns_to_clean = data.columns  # Apply to all columns

for column in columns_to_clean:
    data = data.withColumn(
        column,
        regexp_replace(col(column).cast("string"), r"<[^>]+>", "")
    )

# Step 2: run the Python UDF over each column and store the result in a new
# "Cleaned_<column>" column. Columns that already carry the prefix are skipped
# so re-running this cell does not create "Cleaned_Cleaned_*" columns.
columns_to_clean = [
    col_name for col_name in data.columns
    if col_name != 'JD_Experience' and not col_name.startswith("Cleaned_")
]

for column in columns_to_clean:
    data = data.withColumn(f"Cleaned_{column}", clean_text_udf(col(column)))

# Show the cleaned columns
data.select([col(f"Cleaned_{column}") for column in columns_to_clean]).show(truncate=False)


+-------------------------+---------------------+---------------------+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Standardize column names: each space becomes an underscore, while parentheses,
# newlines, tabs, '=', braces and ';' are removed outright.
data = data.select([
    col(column).alias(column.translate(str.maketrans(" ", "_", "()\n\t={};")))
    for column in data.columns
])

# Verify updated column names
print("Updated column names:")
data.printSchema()


Updated column names:
root
 |-- JD_Qualifications: string (nullable = true)
 |-- JD_Preference: string (nullable = true)
 |-- JD_Job_Title: string (nullable = true)
 |-- JD_Role: string (nullable = true)
 |-- JD_Job_Description: string (nullable = true)
 |-- JD_skills: string (nullable = true)
 |-- JD_Responsibilities: string (nullable = true)
 |-- Resume_Category: string (nullable = true)
 |-- Resume_information: string (nullable = true)
 |-- JD_Minimum_Experience: string (nullable = true)
 |-- Cleaned_JD_Qualifications: string (nullable = true)
 |-- Cleaned_JD_Preference: string (nullable = true)
 |-- Cleaned_JD_Job_Title: string (nullable = true)
 |-- Cleaned_JD_Role: string (nullable = true)
 |-- Cleaned_JD_Job_Description: string (nullable = true)
 |-- Cleaned_JD_skills: string (nullable = true)
 |-- Cleaned_JD_Responsibilities: string (nullable = true)
 |-- Cleaned_Resume_Category: string (nullable = true)
 |-- Cleaned_Resume_information: string (nullable = true)
 |-- Cleaned_JD_

In [0]:
# Persist the DataFrame as the Delta table 'cleaned_data_table', replacing any
# existing contents
data.write.saveAsTable("cleaned_data_table", format="delta", mode="overwrite")

# Confirmation message
print("Data saved successfully to Databricks catalog as 'cleaned_data_table'.")

Data saved successfully to Databricks catalog as 'cleaned_data_table'.


In [0]:
# Write the cleaned dataset as CSV with a header row, overwriting earlier output
data.write.csv("dbfs:/FileStore/cleaned_dataset_table", header="true", mode="overwrite")

print("Cleaned dataset saved successfully as CSV at 'dbfs:/FileStore/cleaned_dataset_table'.")

Cleaned dataset saved successfully as CSV at 'dbfs:/FileStore/cleaned_dataset_table'.


**FEATURE ENGINEERING**

In [0]:
# List the column names currently present in `data`
data.columns

Out[15]: ['JD_Qualifications',
 'JD_Preference',
 'JD_Job_Title',
 'JD_Role',
 'JD_Job_Description',
 'JD_skills',
 'JD_Responsibilities',
 'Resume_Category',
 'Resume_information',
 'JD_Minimum_Experience',
 'Cleaned_JD_Qualifications',
 'Cleaned_JD_Preference',
 'Cleaned_JD_Job_Title',
 'Cleaned_JD_Role',
 'Cleaned_JD_Job_Description',
 'Cleaned_JD_skills',
 'Cleaned_JD_Responsibilities',
 'Cleaned_Resume_Category',
 'Cleaned_Resume_information',
 'Cleaned_JD_Minimum_Experience']

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer

# Reload the persisted cleaned table as a DataFrame
data = spark.table("cleaned_data_table")

# Cleaned text columns that each get a "<column>_tokens" array column
columns_to_tokenize = [
    f"Cleaned_{suffix}"
    for suffix in (
        "JD_Qualifications",
        "JD_Preference",
        "JD_Job_Title",
        "JD_Role",
        "JD_Job_Description",
        "JD_skills",
        "JD_Responsibilities",
        "Resume_Category",
        "Resume_information",
    )
]

# Apply one Tokenizer per column, appending its token column to the DataFrame
for column in columns_to_tokenize:
    tokenizer = Tokenizer(outputCol=f"{column}_tokens", inputCol=column)
    data = tokenizer.transform(data)

# Preview only the token columns
token_columns = [name for name in data.columns if "tokens" in name]
data.select(token_columns).show(truncate=False)


+--------------------------------+----------------------------+---------------------------+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------+-----------------------------------------

In [0]:
# Columns without the Cleaned_ prefix, to be removed now that each has a
# Cleaned_ counterpart
columns_to_drop = [
    "JD_Qualifications", "JD_Preference", "JD_Job_Title", "JD_Role",
    "JD_Job_Description", "JD_skills", "JD_Responsibilities",
    "Resume_Category", "Resume_information", "JD_Minimum_Experience",
]

# Remove them in a single call
data = data.drop(*columns_to_drop)

# Report what is left
print("Remaining columns:", data.columns)


Remaining columns: ['Cleaned_JD_Qualifications', 'Cleaned_JD_Preference', 'Cleaned_JD_Job_Title', 'Cleaned_JD_Role', 'Cleaned_JD_Job_Description', 'Cleaned_JD_skills', 'Cleaned_JD_Responsibilities', 'Cleaned_Resume_Category', 'Cleaned_Resume_information', 'Cleaned_JD_Minimum_Experience', 'Cleaned_JD_Qualifications_tokens', 'Cleaned_JD_Preference_tokens', 'Cleaned_JD_Job_Title_tokens', 'Cleaned_JD_Role_tokens', 'Cleaned_JD_Job_Description_tokens', 'Cleaned_JD_skills_tokens', 'Cleaned_JD_Responsibilities_tokens', 'Cleaned_Resume_Category_tokens', 'Cleaned_Resume_information_tokens']


In [0]:
# Report the DataFrame dimensions as (rows, columns)
num_cols = len(data.columns)
num_rows = data.count()  # Spark action: scans the data

print(f"Shape of the DataFrame: ({num_rows}, {num_cols})")

Shape of the DataFrame: (5690, 19)


In [0]:
# Load the resume-score CSV (header row, inferred column types)
resume_scores = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/FileStore/tables/Resume_Score_dataset-3.csv")
)

In [0]:
# Preview rows and schema of the feature data, then of the resume scores
for frame in (data, resume_scores):
    frame.show()
    frame.printSchema()


+-------------------------+---------------------+--------------------+--------------------+--------------------------+--------------------+---------------------------+-----------------------+--------------------------+-----------------------------+--------------------------------+----------------------------+---------------------------+----------------------+---------------------------------+------------------------+----------------------------------+------------------------------+---------------------------------+
|Cleaned_JD_Qualifications|Cleaned_JD_Preference|Cleaned_JD_Job_Title|     Cleaned_JD_Role|Cleaned_JD_Job_Description|   Cleaned_JD_skills|Cleaned_JD_Responsibilities|Cleaned_Resume_Category|Cleaned_Resume_information|Cleaned_JD_Minimum_Experience|Cleaned_JD_Qualifications_tokens|Cleaned_JD_Preference_tokens|Cleaned_JD_Job_Title_tokens|Cleaned_JD_Role_tokens|Cleaned_JD_Job_Description_tokens|Cleaned_JD_skills_tokens|Cleaned_JD_Responsibilities_tokens|Cleaned_Resume_Category_

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# monotonically_increasing_id() only guarantees unique, increasing values; the
# values depend on how each DataFrame is partitioned and are not consecutive.
# Two DataFrames with different partition layouts therefore get different id
# sets, and a join on the raw ids silently drops or mispairs rows. Ranking rows
# by that id gives both sides consecutive positions 1..N in their current order.
# A window without partitionBy moves all rows into a single partition, which is
# acceptable for the few thousand rows handled here.
position_window = Window.orderBy(monotonically_increasing_id())

data = data.withColumn("unique_id", row_number().over(position_window))

resume_scores = resume_scores.withColumn("unique_id", row_number().over(position_window))

In [0]:
# Preview both DataFrames, now including their unique_id column
data.show()
resume_scores.show()

+-------------------------+---------------------+--------------------+--------------------+--------------------------+--------------------+---------------------------+-----------------------+--------------------------+-----------------------------+--------------------------------+----------------------------+---------------------------+----------------------+---------------------------------+------------------------+----------------------------------+------------------------------+---------------------------------+---------+
|Cleaned_JD_Qualifications|Cleaned_JD_Preference|Cleaned_JD_Job_Title|     Cleaned_JD_Role|Cleaned_JD_Job_Description|   Cleaned_JD_skills|Cleaned_JD_Responsibilities|Cleaned_Resume_Category|Cleaned_Resume_information|Cleaned_JD_Minimum_Experience|Cleaned_JD_Qualifications_tokens|Cleaned_JD_Preference_tokens|Cleaned_JD_Job_Title_tokens|Cleaned_JD_Role_tokens|Cleaned_JD_Job_Description_tokens|Cleaned_JD_skills_tokens|Cleaned_JD_Responsibilities_tokens|Cleaned_Resume

In [0]:
# Inner join on unique_id (rows whose id has no match on the other side are
# dropped), then discard the helper key
merged_data = (
    data
    .join(resume_scores, on="unique_id", how="inner")
    .drop("unique_id")
)

In [0]:
# Preview the merged rows and confirm the combined schema
merged_data.show()
merged_data.printSchema()

+-------------------------+---------------------+--------------------+--------------------+--------------------------+--------------------+---------------------------+-----------------------+--------------------------+-----------------------------+--------------------------------+----------------------------+---------------------------+----------------------+---------------------------------+------------------------+----------------------------------+------------------------------+---------------------------------+------------+
|Cleaned_JD_Qualifications|Cleaned_JD_Preference|Cleaned_JD_Job_Title|     Cleaned_JD_Role|Cleaned_JD_Job_Description|   Cleaned_JD_skills|Cleaned_JD_Responsibilities|Cleaned_Resume_Category|Cleaned_Resume_information|Cleaned_JD_Minimum_Experience|Cleaned_JD_Qualifications_tokens|Cleaned_JD_Preference_tokens|Cleaned_JD_Job_Title_tokens|Cleaned_JD_Role_tokens|Cleaned_JD_Job_Description_tokens|Cleaned_JD_skills_tokens|Cleaned_JD_Responsibilities_tokens|Cleaned_Res

In [0]:
# Show the first 5 merged rows
merged_data.show(5)

+-------------------------+---------------------+--------------------+--------------------+--------------------------+--------------------+---------------------------+-----------------------+--------------------------+-----------------------------+--------------------------------+----------------------------+---------------------------+----------------------+---------------------------------+------------------------+----------------------------------+------------------------------+---------------------------------+------------+
|Cleaned_JD_Qualifications|Cleaned_JD_Preference|Cleaned_JD_Job_Title|     Cleaned_JD_Role|Cleaned_JD_Job_Description|   Cleaned_JD_skills|Cleaned_JD_Responsibilities|Cleaned_Resume_Category|Cleaned_Resume_information|Cleaned_JD_Minimum_Experience|Cleaned_JD_Qualifications_tokens|Cleaned_JD_Preference_tokens|Cleaned_JD_Job_Title_tokens|Cleaned_JD_Role_tokens|Cleaned_JD_Job_Description_tokens|Cleaned_JD_skills_tokens|Cleaned_JD_Responsibilities_tokens|Cleaned_Res

In [0]:
from pyspark.sql.functions import col

# Store Cleaned_JD_Minimum_Experience as an integer column
experience_column = "Cleaned_JD_Minimum_Experience"
merged_data = merged_data.withColumn(experience_column, merged_data[experience_column].cast("int"))

# Check the new column type in the schema
merged_data.printSchema()

# Spot-check some converted values
merged_data.select(experience_column).show()

root
 |-- Cleaned_JD_Qualifications: string (nullable = true)
 |-- Cleaned_JD_Preference: string (nullable = true)
 |-- Cleaned_JD_Job_Title: string (nullable = true)
 |-- Cleaned_JD_Role: string (nullable = true)
 |-- Cleaned_JD_Job_Description: string (nullable = true)
 |-- Cleaned_JD_skills: string (nullable = true)
 |-- Cleaned_JD_Responsibilities: string (nullable = true)
 |-- Cleaned_Resume_Category: string (nullable = true)
 |-- Cleaned_Resume_information: string (nullable = true)
 |-- Cleaned_JD_Minimum_Experience: integer (nullable = true)
 |-- Cleaned_JD_Qualifications_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Cleaned_JD_Preference_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Cleaned_JD_Job_Title_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Cleaned_JD_Role_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Cleaned_JD_Job_

In [0]:
# Show the first 5 merged rows with untruncated cell contents
merged_data.show(5, truncate=False)

+-------------------------+---------------------+---------------------+---------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# List the column names of the merged DataFrame
merged_data.columns

Out[28]: ['Cleaned_JD_Qualifications',
 'Cleaned_JD_Preference',
 'Cleaned_JD_Job_Title',
 'Cleaned_JD_Role',
 'Cleaned_JD_Job_Description',
 'Cleaned_JD_skills',
 'Cleaned_JD_Responsibilities',
 'Cleaned_Resume_Category',
 'Cleaned_Resume_information',
 'Cleaned_JD_Minimum_Experience',
 'Cleaned_JD_Qualifications_tokens',
 'Cleaned_JD_Preference_tokens',
 'Cleaned_JD_Job_Title_tokens',
 'Cleaned_JD_Role_tokens',
 'Cleaned_JD_Job_Description_tokens',
 'Cleaned_JD_skills_tokens',
 'Cleaned_JD_Responsibilities_tokens',
 'Cleaned_Resume_Category_tokens',
 'Cleaned_Resume_information_tokens',
 'Resume_Score']

In [0]:
from pyspark.sql.functions import concat_ws, col

# Names of all array-typed columns (the token columns)
array_columns = [
    field.name
    for field in merged_data.schema.fields
    if field.dataType.simpleString().startswith('array')
]

# Replace each array column, in place, with its elements joined by single
# spaces; all other columns pass through untouched.
merged_data = merged_data.select(*[
    concat_ws(" ", col(name)).alias(name) if name in array_columns else col(name)
    for name in merged_data.columns
])

print("Converted array columns to strings.")

Converted array columns to strings.


In [0]:
# Write the merged dataset as CSV with a header row, overwriting earlier output
merged_data.write.csv("dbfs:/FileStore/merged_data", header="true", mode="overwrite")

print("DataFrame saved successfully as CSV at 'dbfs:/FileStore/merged_data'")

DataFrame saved successfully as CSV at 'dbfs:/FileStore/merged_data'
