In [0]:
df_raw = spark.read.parquet(
    "/Volumes/workspace/default/raw_job_postings/0000.parquet"
)

# print(df_raw.count())
df_raw.printSchema()

In [0]:
df_raw.count()

In [0]:
from pyspark.sql.functions import col

# Keep only the identifier and the free-text columns used by the later cells.
df_core = df_raw.select("job_id", "title", "description", "skills_desc")

# df_core.display()

In [0]:
from pyspark.sql.functions import sum as spark_sum

# Count NULLs per column in a single pass over df_core.
# isNull() yields a boolean, cast to 0/1 so that summing gives the NULL count.
# Every alias follows the "<column>_null_count" pattern so the output header
# states what is being counted (these are NULL counts, not row counts).
df_core.select(
    *[
        spark_sum(col(column_name).isNull().cast("int")).alias(f"{column_name}_null_count")
        for column_name in ("job_id", "title", "description", "skills_desc")
    ]
).show()

In [0]:
df_non_null = df_core.filter(
    col("job_id").isNotNull() &
    col("title").isNotNull() &
    col("description").isNotNull()
)

In [0]:
print("Before:", df_core.count())
print("After:", df_non_null.count())

In [0]:
duplicate_groups = (
    df_non_null
    .groupBy("title", "description")
    .count()
    .filter(col("count")>1)
    .count()
)
duplicate_groups

In [0]:
df_dedup = df_non_null.dropDuplicates(["title", "description"])

In [0]:
print("After Dedup count:", df_dedup.count())

In [0]:
df_dedup.show(5)

In [0]:
# Role Filter for Data Engineer
from pyspark.sql.functions import lower

df_de = df_dedup.filter(
    lower(col("title")).rlike(
        "data engineer|analytics engineer|big data engineer|data platform engineer"
    )
)

In [0]:
df_de.count()

In [0]:
df_de.select("title").distinct().show(20)

In [0]:
display(df_de.select("description").limit(5))

In [0]:
df_de = df_de.select(
    "job_id",
    "title",
    "description",
    "skills_desc"
)


In [0]:
from pyspark.sql.functions import regexp_replace

# Build `clean_text`, the normalized description used for skill matching.
# Steps, applied in order to the same column:
#   1. lower-case the description;
#   2. blank out URLs (tokens starting with "http" or "www");
#   3. replace every character that is not a-z, 0-9, '+', '.', '#', '/' or a
#      space with a space. '/' is kept on purpose: the skill vocabulary
#      contains "ci/cd", which could never match if the slash were replaced
#      by a space. URLs are already gone at this point, so the remaining
#      slashes come from ordinary text such as "ci/cd" or "etl/elt";
#   4. collapse runs of whitespace into a single space.
df_text = (
    df_de
    .withColumn("clean_text", lower(col("description")))
    .withColumn("clean_text", regexp_replace("clean_text", r"http\S+|www\S+", " "))
    .withColumn("clean_text", regexp_replace("clean_text", r"[^a-z0-9+\.\#/ ]", " "))
    .withColumn("clean_text", regexp_replace("clean_text", r"\s+", " "))
)

In [0]:
df_text.select("clean_text").display()

In [0]:
from pyspark.sql.functions import length

df_text.select(
    length("clean_text").alias("text_length")
).summary().show()


In [0]:
df_nlp = df_text.select(
    "job_id",
    "title",
    "clean_text",
    "description"
)

In [0]:
# Skill vocabulary grouped by theme. The theme labels are documentation only;
# the flattened list below is what the rest of the notebook consumes.
_SKILLS_BY_THEME = (
    ("languages", ("python", "sql", "java", "scala")),
    ("big data", ("spark", "pyspark", "hadoop", "hive", "kafka")),
    ("cloud", ("aws", "azure", "gcp", "s3", "redshift", "bigquery")),
    ("orchestration / etl", ("airflow", "dbt", "etl", "elt")),
    ("databases", ("postgres", "mysql", "snowflake", "databricks")),
    ("formats / tools", ("parquet", "delta", "delta lake")),
    ("misc", ("ci/cd", "git", "docker", "kubernetes")),
)

# Flat list of skills, in theme order and then in within-theme order.
data_engineer_skills = [
    skill
    for _theme, skills in _SKILLS_BY_THEME
    for skill in skills
]

In [0]:
import re


def _whole_word(term):
    """Compile a regex that matches ``term`` only between word boundaries."""
    return re.compile(r"\b" + re.escape(term) + r"\b")


# Maps each skill name to a precompiled whole-word pattern for it.
# The value is the compiled search rule, not the skill string itself.
skill_patterns = {skill: _whole_word(skill) for skill in data_engineer_skills}



In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def extract_skills(text):
    """Return the skills whose pattern in ``skill_patterns`` occurs in ``text``.

    Args:
        text: The string to search; may be ``None`` or empty.

    Returns:
        A list of skill names in the iteration order of ``skill_patterns``.
        The list is empty when ``text`` is falsy or nothing matches.
    """
    if not text:
        return []
    return [name for name, regex in skill_patterns.items() if regex.search(text)]

extract_skills_udf = udf(extract_skills, ArrayType(StringType()))


In [0]:
df_skills = df_nlp.withColumn(
    "extracted_skills",
    extract_skills_udf("clean_text")
)


In [0]:
df_skills.select("title", "extracted_skills").show(5, truncate=False)


In [0]:
from pyspark.sql.functions import explode, round

skill_counts = (
    df_skills
    .select(explode("extracted_skills").alias("skill"))
    .groupBy("skill")
    .count()
    .orderBy("count", ascending=False)
)


In [0]:
skill_counts.show(20, truncate=False)


In [0]:
total_jobs = df_skills.count()

skill_stats = skill_counts.withColumn(
    "percentage",
    round((col("count") / total_jobs) * 100,1)
)

skill_stats.show(20, truncate=False)


In [0]:
skill_stats.orderBy("percentage", ascending=False).show(15, truncate=False)


### Skill Normalization and Ontology
### Right now the output treats `apache spark`, `pyspark`, and `spark` as three different skills, but they all refer to the same capability, so we need to count and treat them as one entity.

In [0]:
# Ontology dictionary: canonical skill -> surface forms counted as that skill.
# A posting is credited with the canonical skill when any one variant matches.
# Every entry of the flat data_engineer_skills vocabulary is covered by some
# variant here, so normalization merges skills without silently dropping any.
skill_ontology = {
    "python": ["python"],
    # "postgres" is listed separately: a whole-word match on "postgresql"
    # does not match the shorter spelling.
    "sql": ["sql", "postgres", "postgresql", "mysql", "sqlite"],

    "spark": ["spark", "pyspark", "apache spark"],
    "hadoop": ["hadoop", "hdfs"],

    "aws": ["aws", "amazon web services", "s3", "ec2", "redshift"],
    "azure": ["azure", "adf", "synapse"],
    "gcp": ["gcp", "bigquery", "cloud composer"],

    "airflow": ["airflow", "apache airflow"],
    "dbt": ["dbt"],

    "kafka": ["kafka", "apache kafka"],

    "databricks": ["databricks"],
    "snowflake": ["snowflake"],

    "docker": ["docker"],
    "kubernetes": ["kubernetes", "k8s"],

    "etl": ["etl", "elt"],
    "delta lake": ["delta", "delta lake"],

    # Skills tracked in data_engineer_skills that previously had no canonical
    # entry and therefore vanished from the normalized statistics.
    "java": ["java"],
    "scala": ["scala"],
    "hive": ["hive"],
    "parquet": ["parquet"],
    "git": ["git"],
    # Text cleaning may turn "/" into a space, so both spellings are listed.
    "ci/cd": ["ci/cd", "ci cd"],
}


In [0]:
import re

# Maps each canonical skill to a list of precompiled whole-word patterns,
# one per variant, in the order the variants are listed in skill_ontology.
normalized_patterns = dict(
    (
        canonical,
        [re.compile(r"\b" + re.escape(variant) + r"\b") for variant in variants],
    )
    for canonical, variants in skill_ontology.items()
)


In [0]:
from pyspark.sql.functions import udf
# ArrayType and StringType are defined in pyspark.sql.types. Importing them
# from pyspark.sql.functions only works if that module happens to re-export
# them, which is not part of its documented API.
from pyspark.sql.types import ArrayType, StringType

def extract_normalized_skills(text):
    """Return the canonical skills that have at least one variant in ``text``.

    Args:
        text: The string to search; may be ``None`` or empty.

    Returns:
        A list of canonical skill names in the iteration order of
        ``normalized_patterns``. Each name appears at most once, however many
        of its variants match. The list is empty when ``text`` is falsy.
    """
    if not text:
        return []
    return [
        canonical_skill
        for canonical_skill, patterns in normalized_patterns.items()
        # any() stops at the first matching variant.
        if any(pattern.search(text) for pattern in patterns)
    ]

extract_normalized_skills_udf = udf(
    extract_normalized_skills,
    ArrayType(StringType())
)

In [0]:
df_norm = df_nlp.withColumn(
    "normalized_skills",
    extract_normalized_skills_udf("clean_text")
)


In [0]:
df_norm.select("title", "normalized_skills").show(5)

In [0]:
from pyspark.sql.functions import explode, col, round

# Denominator is every row of df_norm, including postings with no match.
total_jobs = df_norm.count()

# Number of postings per canonical skill (one exploded row per posting/skill).
skill_counts_norm = (
    df_norm
    .select(explode(col("normalized_skills")).alias("skill"))
    .groupBy("skill")
    .count()
)

# Percentage of postings per canonical skill, highest first.
skill_stats_norm = (
    skill_counts_norm
    .withColumn("percentage", round(col("count") / total_jobs * 100, 1))
    .orderBy(col("percentage").desc())
)

skill_stats_norm.show(n=20, truncate=False)


## Implemented a skill ontology and normalization layer that consolidates variant skill mentions into canonical capabilities before downstream analytics. This answers which skills are in demand for Data Engineers.

Next, we try to answer questions like:
- Which skills appear together?
- Are there any distinct skill bundles?
- Are there any sub-roles inside Data Engineer?

Co-occurrence matters more than frequency here: frequency tells us that Python is mentioned in 78% of the jobs, while co-occurrence tells us that Python, SQL, and AWS often appear together. That is the difference between popularity and capability profiles.

In [0]:
df_skill_long = (
    df_norm
    .select(
        col("job_id"),
        explode("normalized_skills").alias("skills")
    )
)

In [0]:
df_skill_long.show(10)

In [0]:
skill_job_counts = (
    df_skill_long
    .groupBy("skills")
    .agg({"job_id": "count"})
    .withColumnRenamed("count(job_id)", "job_count")
    .orderBy("job_count", ascending = False)
)

skill_job_counts.show(10)

In [0]:
df_pairs = (
    df_skill_long.alias("a")
    .join(
        df_skill_long.alias("b"),
        on="job_id"
    )
    .where(col("a.skills") < col("b.skills"))  # avoid duplicates & self-pairs
    .select(
        col("a.skills").alias("skill_a"),
        col("b.skills").alias("skill_b")
    )
)

In [0]:
co_occurrence = (
    df_pairs
    .groupBy("skill_a", "skill_b")
    .count()
    .withColumnRenamed("count", "intersection")
)


In [0]:
co_occurrence.orderBy("intersection", ascending=False).show(10)

In [0]:
jaccard = (
    co_occurrence
    .join(
        skill_job_counts
            .withColumnRenamed("skills", "skill_a")
            .withColumnRenamed("job_count", "count_a"),
        on="skill_a"
    )
    .join(
        skill_job_counts
            .withColumnRenamed("skills", "skill_b")
            .withColumnRenamed("job_count", "count_b"),
        on="skill_b"
    )
)


In [0]:
from pyspark.sql.functions import round

jaccard = jaccard.withColumn(
    "jaccard_similarity",
    round(
        col("intersection") /
        (col("count_a") + col("count_b") - col("intersection")),
        3
    )
)


In [0]:
jaccard.orderBy("jaccard_similarity", ascending=False).show(15, truncate=False)
