In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Phase2_Test")
    .getOrCreate()
)


In [2]:
from pyspark.sql import functions as F;

jobs_path = "linkedin_job_postings.csv";
skills_path = "job_skills.csv";
summ_path = "job_summary.csv";

jobs_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(jobs_path)
);

skills_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(skills_path)
);

summ_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(summ_path)
);

def trim_cols(df):
    return df.toDF(*[c.strip() for c in df.columns]);

jobs = trim_cols(jobs_raw);
skills = trim_cols(skills_raw);
summ = trim_cols(summ_raw);

print("jobs_raw rows:", jobs.count());
print("skills_raw rows:", skills.count());
print("summ_raw rows:", summ.count());

jobs_raw rows: 1348488
skills_raw rows: 1296381
summ_raw rows: 48219735


In [3]:
jobs_sk = jobs.join(skills, on="job_link", how="left");

total_rows = jobs_sk.count();
missing_job_skills_count = jobs_sk.filter(F.col("job_skills").isNull()).count();
missing_job_link_count = jobs_sk.filter(F.col("job_link").isNull()).count();

pct_missing_job_link = (missing_job_link_count / total_rows) * 100;
pct_missing_job_skills = (missing_job_skills_count / total_rows) * 100;

rows_before = total_rows;

dup_count = (
    jobs_sk.select("job_link").count()
    - jobs_sk.select("job_link").dropDuplicates(["job_link"]).count()
);

jobs_sk = jobs_sk.dropDuplicates(["job_link"]);

rows_after = jobs_sk.count();

print("Total rows (before dedup):", rows_before);
print("pct_missing_job_link:", round(pct_missing_job_link, 4));
print("pct_missing_job_skills:", round(pct_missing_job_skills, 4));
print("Rows before cleaning:", rows_before);
print("Rows after cleaning:", rows_after);
print("Duplicates found and removed:", dup_count);

Total rows (before dedup): 1348488
pct_missing_job_link: 0.0
pct_missing_job_skills: 4.0129
Rows before cleaning: 1348488
Rows after cleaning: 1348463
Duplicates found and removed: 25


In [4]:
truthy = ["true", "t", "yes", "y", "1"];
falsy = ["false", "f", "no", "n", "0"];

bool_cols = ["got_summary", "got_ner", "is_being_worked"];

for col in bool_cols:
    s = F.lower(F.trim(F.col(col).cast("string")));
    jobs_sk = jobs_sk.withColumn(
        col,
        F.when(s.isin(truthy), F.lit(1))
         .when(s.isin(falsy), F.lit(0))
         .otherwise(F.lit(0))
    );

text_cols = ["job_link", "job_title", "company",
             "job_location", "job_level", "job_type"];

for col in text_cols:
    jobs_sk = jobs_sk.withColumn(
        col,
        F.when(F.col(col).isNull(), None).otherwise(
            F.regexp_replace(F.col(col).cast("string"), r"\s+", " ")
        )
    );
    jobs_sk = jobs_sk.withColumn(col, F.trim(F.col(col)));

print("type conversions and text cleanup.");

type conversions and text cleanup.


In [5]:
jobs_sk = jobs_sk.withColumn(
    "job_skills_clean",
    F.when(F.col("job_skills").isNull(), "")
     .otherwise(F.lower(F.col("job_skills").cast("string")))
);

jobs_sk = jobs_sk.withColumn(
    "job_skills_clean",
    F.regexp_replace("job_skills_clean", "[;|]", ",")
);

jobs_sk = jobs_sk.withColumn(
    "job_skills_clean",
    F.regexp_replace("job_skills_clean", r"\s*,\s*", ",")
);

jobs_sk = jobs_sk.withColumn(
    "job_skills_clean",
    F.regexp_replace("job_skills_clean", r"\s+", " ")
);

jobs_sk = jobs_sk.withColumn("skills_arr_raw", F.split("job_skills_clean", ","));

jobs_sk = jobs_sk.withColumn(
    "skills_list",
    F.expr("filter(transform(skills_arr_raw, x -> trim(x)), x -> x <> '')")
);

jobs_sk = jobs_sk.drop("skills_arr_raw");

jobs_sk = jobs_sk.withColumn("skill_count", F.size("skills_list"));

clean_cols = [
    "job_link", "job_title", "company",
    "job_location", "job_level", "job_type",
    "job_skills_clean"
];

for col in clean_cols:
    jobs_sk = jobs_sk.withColumn(
        col,
        F.regexp_replace(F.col(col), r"\s+", " ")
    );
    jobs_sk = jobs_sk.withColumn(col, F.trim(F.col(col)));

print("job_skills cleaning and text normalization.");

job_skills cleaning and text normalization.


In [6]:
from pyspark.sql import functions as F;

final_rows = rows_after;
final_cols = len(jobs_sk.columns);

print("Total number of rows after cleaning:", final_rows);
print("Total number of columns after cleaning:", final_cols);
print("Percentage of missing values in job_link column:", round(pct_missing_job_link, 4));
print("Percentage of missing values in job_skills column:", round(pct_missing_job_skills, 4));

Total number of rows after cleaning: 1348463
Total number of columns after cleaning: 18
Percentage of missing values in job_link column: 0.0
Percentage of missing values in job_skills column: 4.0129


In [None]:
from pyspark.sql import functions as F;

rows_after = jobs_sk.count();
num_columns_after = len(jobs_sk.columns);

jobs_sk_export = jobs_sk.withColumn(
    "skills_list",
    F.concat_ws(";", F.col("skills_list"))
);

output_dir = "linkedin_jobs_skills_clean_pyspark";

(
    jobs_sk_export
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(output_dir)
);

print("rows_after:", rows_after);
print("num_columns_after:", num_columns_after);
