In [10]:
# Optional: remove old folder if it exists
!rm -rf /content/linkedin_jobs

# Unzip into /content/linkedin_jobs
!unzip "archive (1).zip" -d /content/linkedin_jobs

Archive:  archive (1).zip
  inflating: /content/linkedin_jobs/companies/companies.csv  
  inflating: /content/linkedin_jobs/companies/company_industries.csv  
  inflating: /content/linkedin_jobs/companies/company_specialities.csv  
  inflating: /content/linkedin_jobs/companies/employee_counts.csv  
  inflating: /content/linkedin_jobs/jobs/benefits.csv  
  inflating: /content/linkedin_jobs/jobs/job_industries.csv  
  inflating: /content/linkedin_jobs/jobs/job_skills.csv  
  inflating: /content/linkedin_jobs/jobs/salaries.csv  
  inflating: /content/linkedin_jobs/mappings/industries.csv  
  inflating: /content/linkedin_jobs/mappings/skills.csv  
  inflating: /content/linkedin_jobs/postings.csv  


In [11]:
!ls /content/linkedin_jobs


companies  jobs  mappings  postings.csv


In [12]:
# === Cell 1: Spark setup and imports ===

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = (
    SparkSession.builder
    .appName("DS5110_LinkedIn_DataEngineering")
    .getOrCreate()
)

# CHANGE THIS: folder where you extracted Kaggle "LinkedIn Job Postings 2023–2024" archive (1).zip
# The folder should contain: postings.csv, jobs/, mappings/, companies/
base_path = "/content/linkedin_jobs"


In [13]:
# === Cell 2: Load raw LinkedIn Kaggle tables ===

postings_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("multiLine", True)   # <-- important
    .option("quote", '"')        # standard CSV quoting
    .option("escape", '"')       # handle embedded quotes
    .csv(f"{base_path}/postings.csv")
)


salaries_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/jobs/salaries.csv")
)

job_skills_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/jobs/job_skills.csv")
)

job_industries_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/jobs/job_industries.csv")
)

benefits_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/jobs/benefits.csv")
)

skills_map_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/mappings/skills.csv")
)

industries_map_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/mappings/industries.csv")
)

companies_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("multiLine", True)   # <-- also important here
    .option("quote", '"')
    .option("escape", '"')
    .csv(f"{base_path}/companies/companies.csv")
)


company_industries_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/companies/company_industries.csv")
)

company_specialities_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/companies/company_specialities.csv")
)

employee_counts_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{base_path}/companies/employee_counts.csv")
)


In [14]:
# === Cell 3: Clean and standardize job postings ===

# Rename posting-level company_name to avoid collision with companies table
postings_clean = postings_df.withColumnRenamed("company_name", "posting_company_name")

# Cast IDs and numeric fields
postings_clean = (
    postings_clean
    .withColumn("job_id", F.col("job_id").cast("long"))
    .withColumn("company_id", F.col("company_id").cast("long"))
    .withColumn("views", F.col("views").cast("int"))
    .withColumn("applies", F.col("applies").cast("int"))
    .withColumn("remote_allowed", F.col("remote_allowed").cast("int"))
    .withColumn("max_salary", F.col("max_salary").cast("double"))
    .withColumn("med_salary", F.col("med_salary").cast("double"))
    .withColumn("min_salary", F.col("min_salary").cast("double"))
    .withColumn("normalized_salary", F.col("normalized_salary").cast("double"))
)

# Trim / normalize key text fields
postings_clean = (
    postings_clean
    .withColumn("posting_company_name", F.trim(F.col("posting_company_name")))
    .withColumn("title", F.trim(F.col("title")))
    .withColumn("description", F.trim(F.col("description")))
    .withColumn("location_raw", F.trim(F.col("location")))
    .withColumn("pay_period", F.upper(F.col("pay_period")))
    .withColumn("currency", F.upper(F.col("currency")))
    .withColumn("work_type", F.upper(F.col("work_type")))
    .withColumn("formatted_work_type", F.trim(F.col("formatted_work_type")))
    .withColumn("formatted_experience_level", F.trim(F.col("formatted_experience_level")))
    .withColumn("posting_domain", F.trim(F.col("posting_domain")))
    .withColumn("application_type", F.trim(F.col("application_type")))
    .withColumn("application_url", F.trim(F.col("application_url")))
    .withColumn("job_posting_url", F.trim(F.col("job_posting_url")))
)

# Drop rows missing critical text fields
postings_clean = postings_clean.filter(
    F.col("title").isNotNull() & F.col("description").isNotNull()
)

# Convert millisecond epoch columns to timestamp
for c in ["original_listed_time", "listed_time", "expiry", "closed_time"]:
    postings_clean = postings_clean.withColumn(
        f"{c}_ts",
        F.to_timestamp(F.from_unixtime((F.col(c) / F.lit(1000)).cast("bigint")))
    )

# Parse location into city / state / country (best effort)
loc_split = F.split(F.col("location_raw"), ",")

postings_clean = (
    postings_clean
    .withColumn("job_city", F.trim(loc_split.getItem(0)))
    .withColumn(
        "job_state",
        F.when(F.size(loc_split) > 1, F.trim(loc_split.getItem(1)))
    )
    .withColumn(
        "job_country",
        F.when(F.lower(F.col("location_raw")).contains("united states"), F.lit("US"))
    )
)

# Remote flag standardized to 0/1
postings_clean = postings_clean.withColumn(
    "remote_flag",
    F.when(F.col("remote_allowed") == 1, F.lit(1)).otherwise(F.lit(0))
)

In [15]:
# === Cell 4: Salary integration + skills / industries / benefits ===

# ---- Salaries ----
salaries_std = (
    salaries_df
    .select(
        F.col("job_id").cast("long"),
        F.col("max_salary").cast("double").alias("max_salary_sal"),
        F.col("med_salary").cast("double").alias("med_salary_sal"),
        F.col("min_salary").cast("double").alias("min_salary_sal"),
        F.upper(F.col("pay_period")).alias("pay_period_sal"),
        F.upper(F.col("currency")).alias("currency_sal"),
        F.upper(F.col("compensation_type")).alias("compensation_type_sal"),
    )
)

jobs_with_salary = (
    postings_clean.alias("p")
    .join(salaries_std.alias("s"), on="job_id", how="left")
)

# Coalesce salary fields from postings + salaries table
jobs_with_salary = (
    jobs_with_salary
    .withColumn(
        "salary_min",
        F.coalesce(F.col("p.min_salary"), F.col("min_salary_sal"))
    )
    .withColumn(
        "salary_max",
        F.coalesce(F.col("p.max_salary"), F.col("max_salary_sal"))
    )
    .withColumn(
        "salary_med",
        F.coalesce(F.col("p.med_salary"), F.col("med_salary_sal"))
    )
    .withColumn(
        "salary_pay_period",
        F.coalesce(F.col("p.pay_period"), F.col("pay_period_sal"))
    )
    .withColumn(
        "salary_currency",
        F.coalesce(F.col("p.currency"), F.col("currency_sal"))
    )
    .withColumn(
        "salary_comp_type",
        F.coalesce(F.col("p.compensation_type"), F.col("compensation_type_sal"))
    )
)

# Base salary from min/med/max
jobs_with_salary = (
    jobs_with_salary
    .withColumn(
        "salary_midpoint",
        (F.col("salary_min") + F.col("salary_max")) / F.lit(2.0)
    )
    .withColumn(
        "salary_base",
        F.coalesce(
            F.col("salary_med"),
            F.col("salary_midpoint"),
            F.col("salary_max"),
            F.col("salary_min")
        )
    )
)

# Annualize salary and fall back to normalized_salary (already annualized USD in Kaggle)
jobs_with_salary = (
    jobs_with_salary
    .withColumn(
        "salary_annual_usd_calc",
        F.when(F.col("salary_pay_period") == "YEARLY", F.col("salary_base"))
        .when(F.col("salary_pay_period") == "HOURLY", F.col("salary_base") * F.lit(2080))
        .when(F.col("salary_pay_period") == "WEEKLY", F.col("salary_base") * F.lit(52))
        .when(F.col("salary_pay_period") == "BIWEEKLY", F.col("salary_base") * F.lit(26))
        .when(F.col("salary_pay_period") == "MONTHLY", F.col("salary_base") * F.lit(12))
    )
    .withColumn(
        "salary_annual_usd_raw",
        F.coalesce(F.col("normalized_salary"), F.col("salary_annual_usd_calc"))
    )
)

# Filter out clearly unrealistic salaries
jobs_with_salary = (
    jobs_with_salary
    .withColumn(
        "salary_annual_usd",
        F.when(
            (F.col("salary_annual_usd_raw") >= F.lit(10000)) &
            (F.col("salary_annual_usd_raw") <= F.lit(1000000)),
            F.col("salary_annual_usd_raw")
        )
    )
    .withColumn("salary_annual_usd", F.round(F.col("salary_annual_usd")))
)

# ---- Skills ----
job_skills_named = (
    job_skills_df
    .join(skills_map_df, on="skill_abr", how="left")
)

skills_agg = (
    job_skills_named
    .groupBy("job_id")
    .agg(
        F.array_sort(F.collect_set("skill_name")).alias("skills")
    )
    .withColumn("skills_str", F.concat_ws(", ", F.col("skills")))
)

# ---- Job industries ----
job_industries_named = (
    job_industries_df
    .join(industries_map_df, on="industry_id", how="left")
)

job_industries_agg = (
    job_industries_named
    .groupBy("job_id")
    .agg(
        F.array_sort(F.collect_set("industry_name")).alias("job_industries")
    )
    .withColumn("job_industries_str", F.concat_ws(", ", F.col("job_industries")))
)

# ---- Benefits ----
benefits_agg = (
    benefits_df
    .groupBy("job_id")
    .agg(
        F.array_sort(F.collect_set("type")).alias("benefits")
    )
    .withColumn("benefits_str", F.concat_ws(", ", F.col("benefits")))
)

# Attach skills / industries / benefits to job-level data
jobs_enriched = (
    jobs_with_salary
    .join(skills_agg, on="job_id", how="left")
    .join(job_industries_agg, on="job_id", how="left")
    .join(benefits_agg, on="job_id", how="left")
)


In [16]:
# === Cell 5: Company enrichment + final analytics-ready dataset ===

# Clean and rename company columns
companies_clean = (
    companies_df
    .withColumn("company_id", F.col("company_id").cast("long"))
    .withColumn("company_name", F.trim(F.col("name")))
    .withColumn("company_description", F.trim(F.col("description")))
    .withColumn("company_city", F.trim(F.col("city")))
    .withColumn("company_state", F.trim(F.col("state")))
    .withColumn("company_country", F.trim(F.col("country")))
    .withColumn("company_zip_code", F.col("zip_code"))
    .withColumn("company_address", F.trim(F.col("address")))
    .withColumn("company_url", F.trim(F.col("url")))
    .withColumn("company_size", F.trim(F.col("company_size")))
    .drop("name", "description", "city", "state", "country", "zip_code", "address", "url")
)

# Aggregate company industries and specialities
company_industries_agg = (
    company_industries_df
    .groupBy("company_id")
    .agg(
        F.array_sort(F.collect_set("industry")).alias("company_industries")
    )
    .withColumn("company_industries_str", F.concat_ws(", ", F.col("company_industries")))
)

company_specialities_agg = (
    company_specialities_df
    .groupBy("company_id")
    .agg(
        F.array_sort(F.collect_set("speciality")).alias("company_specialities")
    )
    .withColumn("company_specialities_str", F.concat_ws(", ", F.col("company_specialities")))
)

# Latest employee_count / follower_count per company
employee_counts_df = (
    employee_counts_df
    .withColumn("company_id", F.col("company_id").cast("long"))
    .withColumn("employee_count", F.col("employee_count").cast("int"))
    .withColumn("follower_count", F.col("follower_count").cast("int"))
)

w = Window.partitionBy("company_id").orderBy(F.desc("time_recorded"))

employee_latest = (
    employee_counts_df
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

company_full = (
    companies_clean
    .join(company_industries_agg, on="company_id", how="left")
    .join(company_specialities_agg, on="company_id", how="left")
    .join(employee_latest, on="company_id", how="left")
)

# Join company data into job dataset
jobs_final = (
    jobs_enriched
    .withColumn("company_id", F.col("company_id").cast("long"))
    .join(company_full, on="company_id", how="left")
)

# Select clean schema for modeling / EDA
jobs_feature_df = jobs_final.select(
    # IDs
    "job_id",
    "company_id",

    # Job-level text
    "title",
    "description",

    # Posting-level company name (raw) and cleaned company name
    "posting_company_name",
    "company_name",

    # Location
    "location_raw",
    "job_city",
    "job_state",
    "job_country",
    "zip_code",
    "fips",

    # Time
    "original_listed_time_ts",
    "listed_time_ts",
    "expiry_ts",
    "closed_time_ts",

    # Work / experience / remote
    "formatted_work_type",
    "work_type",
    "formatted_experience_level",
    "remote_flag",

    # Salary
    "salary_min",
    "salary_max",
    "salary_med",
    "salary_pay_period",
    "salary_currency",
    "salary_comp_type",
    "salary_annual_usd",

    # Engagement
    "views",
    "applies",
    "sponsored",
    "posting_domain",
    "job_posting_url",
    "application_url",
    "application_type",

    # Skills / industries / benefits
    "skills",
    "skills_str",
    "job_industries",
    "job_industries_str",
    "benefits",
    "benefits_str",

    # Company attributes
    "company_description",
    "company_city",
    "company_state",
    "company_country",
    "company_zip_code",
    "company_address",
    "company_url",
    "company_size",
    "employee_count",
    "follower_count",
    "company_industries",
    "company_industries_str",
    "company_specialities",
    "company_specialities_str",
)

# Quick sanity checks
jobs_feature_df.printSchema()
jobs_feature_df.show(5, truncate=80)

# Write analytics-ready dataset to Parquet for downstream ML / EDA
output_path = "./clean_linkedin_jobs_parquet"  # relative to notebook directory

(
    jobs_feature_df
    .coalesce(1)  # adjust based on cluster size
    .write
    .mode("overwrite")
    .parquet(output_path)
)

print(f"Saved cleaned dataset to: {output_path}")


root
 |-- job_id: long (nullable = true)
 |-- company_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- posting_company_name: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- location_raw: string (nullable = true)
 |-- job_city: string (nullable = true)
 |-- job_state: string (nullable = true)
 |-- job_country: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- fips: integer (nullable = true)
 |-- original_listed_time_ts: timestamp (nullable = true)
 |-- listed_time_ts: timestamp (nullable = true)
 |-- expiry_ts: timestamp (nullable = true)
 |-- closed_time_ts: timestamp (nullable = true)
 |-- formatted_work_type: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- formatted_experience_level: string (nullable = true)
 |-- remote_flag: integer (nullable = false)
 |-- salary_min: double (nullable = true)
 |-- salary_max: double (nullable = true)
 |-- salary_med: d

In [17]:
# === Cell 6: Basic EDA on jobs_feature_df ===

from pyspark.sql import functions as F

print("Total rows in jobs_feature_df:")
print(jobs_feature_df.count())

# 1. Top job locations (raw)
print("\nTop raw locations by number of postings:")
(
    jobs_feature_df
    .groupBy("location_raw")
    .count()
    .orderBy(F.desc("count"))
    .show(10, truncate=False)
)

# 1b. Top job cities by number of postings (non-null only)
print("\nTop job cities by number of postings (job_city not null):")
(
    jobs_feature_df
    .where(F.col("job_city").isNotNull())
    .groupBy("job_city")
    .count()
    .orderBy(F.desc("count"))
    .show(10, truncate=False)
)

# 2. Salary annual USD summary (for non-null salaries)
print("\nSalary (annual USD) summary for non-null values:")
(
    jobs_feature_df
    .select("salary_annual_usd")
    .where(F.col("salary_annual_usd").isNotNull())
    .describe()
    .show()
)

# 3. Salary bucket distribution
print("\nSalary bucket distribution:")
salary_bucket_df = (
    jobs_feature_df
    .select(
        F.when(F.col("salary_annual_usd") < 50000, "<50k")
         .when((F.col("salary_annual_usd") >= 50000) & (F.col("salary_annual_usd") < 100000), "50k–100k")
         .when((F.col("salary_annual_usd") >= 100000) & (F.col("salary_annual_usd") < 150000), "100k–150k")
         .when((F.col("salary_annual_usd") >= 150000) & (F.col("salary_annual_usd") < 200000), "150k–200k")
         .when(F.col("salary_annual_usd") >= 200000, ">=200k")
         .otherwise("Unknown")
         .alias("salary_bucket")
    )
    .groupBy("salary_bucket")
    .count()
    .orderBy(F.desc("count"))
)
salary_bucket_df.show(truncate=False)

# 4. Most common skills across all postings
print("\nMost common skills across postings:")
skills_exploded = jobs_feature_df.select(F.explode_outer("skills").alias("skill"))
(
    skills_exploded
    .groupBy("skill")
    .count()
    .orderBy(F.desc("count"))
    .show(20, truncate=False)
)


Total rows in jobs_feature_df:
123842

Top raw locations by number of postings:
+-------------+-----+
|location_raw |count|
+-------------+-----+
|United States|8122 |
|New York, NY |2755 |
|Chicago, IL  |1834 |
|Houston, TX  |1762 |
|Dallas, TX   |1383 |
|Atlanta, GA  |1363 |
|Boston, MA   |1176 |
|Austin, TX   |1083 |
|Charlotte, NC|1075 |
|Phoenix, AZ  |1059 |
+-------------+-----+
only showing top 10 rows


Top job cities by number of postings (job_city not null):
+-------------+-----+
|job_city     |count|
+-------------+-----+
|United States|8122 |
|New York     |3403 |
|Chicago      |1836 |
|Houston      |1776 |
|Dallas       |1394 |
|Atlanta      |1369 |
|Austin       |1325 |
|Boston       |1202 |
|Washington   |1118 |
|Los Angeles  |1100 |
+-------------+-----+
only showing top 10 rows


Salary (annual USD) summary for non-null values:
+-------+-----------------+
|summary|salary_annual_usd|
+-------+-----------------+
|  count|            35617|
|   mean|96590.29463458461|
| s

In [18]:
from pyspark.sql import functions as F

print("Sample of job_city vs location_raw where job_city looks suspicious:")
(
    jobs_feature_df
    .select("job_city", "location_raw", "title")
    .where(F.col("job_city").isin(
        "Full-time", "national origin", "gender identity",
        "sexual orientation", "age", "religion", "disability", "sex", "gender"
    ))
    .show(50, truncate=False)
)


Sample of job_city vs location_raw where job_city looks suspicious:
+--------+------------+-----+
|job_city|location_raw|title|
+--------+------------+-----+
+--------+------------+-----+

