<a href="https://colab.research.google.com/github/honyango/test/blob/main/DATA_INGESTION_CLUSTER2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

INSTALL DEPENDENCIES

In [3]:
# Install Java (Spark requires JDK 8)
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download and extract Spark (includes Hadoop binaries for HDFS simulation)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Install findspark (helps Python locate Spark)
!pip install -q findspark

# Set environment variables (mimics cluster config)
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Initialize findspark and create Spark session
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create Spark session in local mode: "local[*]" uses all cores for parallelism
# appName: For job tracking (like in YARN on EMR)
# Scaling note: In Dataproc/EMR, replace with .master("yarn") and .config("spark.executor.memory", "4g")
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("LinkedInJobPostingsIngestion") \
    .config("spark.executor.memory", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Verify setup
print("Spark Version:", spark.version)
print("Hadoop Version (simulated):", spark.conf.get("spark.hadoopVersion", "3.3.0"))
spark.sparkContext.setLogLevel("WARN")  # Reduce log noise

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Spark Version: 3.5.0
Hadoop Version (simulated): 3.3.0


In [7]:
# Replace the string 'NULL' with actual null values
df = df.withColumn("job_level", when(col("job_level") == "NULL", None).otherwise(col("job_level")))

# Fill the null values with 'Unknown'
df = df.na.fill({'job_level': 'Unknown'})

# Show the unique values again to verify
df.select("job_level").distinct().show()

+--------------------+
|           job_level|
+--------------------+
|           Associate|
|             Unknown|
|          Mid senior|
|Job Development S...|
|Chief Computer Pr...|
+--------------------+



In [6]:
df.select("job_level").distinct().show()

+--------------------+
|           job_level|
+--------------------+
|           Associate|
|          Mid senior|
|Job Development S...|
|Chief Computer Pr...|
|                NULL|
+--------------------+



In [5]:
print(df.columns)

['job_link', 'last_processed_time', 'got_summary', 'got_ner', 'is_being_worked', 'job_title', 'company', 'job_location', 'first_seen', 'search_city', 'search_country', 'search_position', 'job_level', 'job_type']


show count of jobs

In [10]:
# show count of jobs
df.count()


639433

In [4]:
# Load the dataset
# inferSchema: Spark attempts to automatically determine the data types of columns
# header: Treat the first row as the header
df = spark.read.csv("/content/linkedin_job_postings.csv", inferSchema=True, header=True)

# Display the schema and a few rows to verify
df.printSchema()
df.show(5)

root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: string (nullable = true)
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)

+--------------------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+--------+
|            job_link| last_processed_time|got_summary|got_ner|is_being_worked|           job_title|             company|        job_location|first_seen|search_city|sear

INGEST THE DATA INTO SPARK

In [8]:
# Read CSV into Spark DataFrame
# header=True: Uses first row as column names
# inferSchema=True: Auto-detects data types (e.g., string for title, int for criteria_max_salary)
# Options for distribution: Spark auto-partitions based on file size (e.g., 124K rows -> ~10-20 partitions)
df = spark.read \
    .option("header", "True") \
    .option("inferSchema", "True") \
    .csv("linkedin_job_postings.csv")

# Cache for reuse (like in-memory distribution across nodes)
df.cache()

# Show schema (data types, nullable)
df.printSchema()

# Show first 5 rows and count
print("Total rows:", df.count())
df.show(5, truncate=False)

# Basic stats: Handle formats by checking for nulls/malformed rows
df.describe().show()

root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: string (nullable = true)
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)

Total rows: 639433
+------------------------------------------------------------------------------------------------------------------------------+-----------------------------+-----------+-------+---------------+--------------------------------------------------------------------------+----------------------------+--------------------+----------+-----------+--------------+---------------

VERIFY DATA DISTRIBUTION AND BASIC PROCESS

In [9]:
# Check partitions (mimics HDFS block distribution)
print("Number of partitions:", df.rdd.getNumPartitions())

# Repartition for even distribution (e.g., scale to 4 "nodes")
df_repartitioned = df.repartition(4)
print("Repartitioned to:", df_repartitioned.rdd.getNumPartitions())

# Example: Filter and aggregate (distributed computation)
# Group by company, count jobs (runs in parallel across partitions)
top_companies = df.groupBy("company").count().orderBy(col("count").desc()).limit(10)
top_companies.show()

# Convert to Parquet for efficient storage (like HDFS output)
df.write.mode("overwrite").parquet("linkedin_jobs_parquet")
print("Saved as Parquet (compressed, distributed format)")

Number of partitions: 2
Repartitioned to: 4
+--------------------+-----+
|             company|count|
+--------------------+-----+
|     Health eCareers|15268|
|   Jobs for Humanity|12546|
|      Dollar General| 6492|
|   TravelNurseSource| 5731|
|Gotham Enterprise...| 4310|
|      Energy Jobline| 4290|
|               Jobot| 4102|
|      VolunteerMatch| 3469|
|        PracticeLink| 3341|
|       ClearanceJobs| 3304|
+--------------------+-----+

Saved as Parquet (compressed, distributed format)
