In [1]:
from core.spark_session import create_spark_session

# Date fragment of the bronze path to load (year/month/day).
DATE_PATH = "2026/01/30"

spark = create_spark_session()

# Glob matching every JSON file stored under the selected date folder.
bronze_glob = f"s3a://data-lake/bronze/adzuna/{DATE_PATH}/*.json"

# multiLine=True lets one JSON document span several physical lines of a file.
df = spark.read.json(bronze_glob, multiLine=True)

# Quick look at the first rows of the loaded batches.
df.show(n=5)


+--------------------+--------------------+------------+--------------------+------+
|            batch_id|         ingested_at|record_count|             records|source|
+--------------------+--------------------+------------+--------------------+------+
|32942ca8-ec46-4e7...|2026-01-30T00:22:...|         100|[{Adzuna::API::Re...|adzuna|
|bc265b51-97a1-41c...|2026-01-30T00:22:...|         100|[{Adzuna::API::Re...|adzuna|
|28cde783-3e4b-4ff...|2026-01-30T00:22:...|          50|[{Adzuna::API::Re...|adzuna|
+--------------------+--------------------+------------+--------------------+------+



In [2]:
from pyspark.sql.functions import explode, col

# Keep only batches that actually carry a "records" array ...
non_empty_batches = df.where(col("records").isNotNull())

# ... then emit one row per array element, exposed as the struct column "job".
jobs_df = non_empty_batches.select(explode(col("records")).alias("job"))

# Show the nested structure of a single job posting.
jobs_df.printSchema()

root
 |-- job: struct (nullable = true)
 |    |-- __CLASS__: string (nullable = true)
 |    |-- adref: string (nullable = true)
 |    |-- category: struct (nullable = true)
 |    |    |-- __CLASS__: string (nullable = true)
 |    |    |-- label: string (nullable = true)
 |    |    |-- tag: string (nullable = true)
 |    |-- company: struct (nullable = true)
 |    |    |-- __CLASS__: string (nullable = true)
 |    |    |-- display_name: string (nullable = true)
 |    |-- contract_time: string (nullable = true)
 |    |-- contract_type: string (nullable = true)
 |    |-- created: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- __CLASS__: string (nullable = true)
 |    |    |-- area: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- display_name: string (nullable = true)
 |    

In [3]:
# Preview the headline fields of each job posting.
# company.display_name and location.display_name would both surface as
# "display_name", so each is given an explicit, unambiguous alias.
(
    jobs_df
    .select(
        col("job.title").alias("title"),
        col("job.company.display_name").alias("company_name"),
        col("job.location.display_name").alias("location_name"),
        col("job.salary_min").alias("salary_min"),
        col("job.salary_max").alias("salary_max"),
        col("job.contract_type").alias("contract_type"),
    )
    .show(10, truncate=False)
)


+-------------------------------------------------------------+--------------------+----------------------------+----------+----------+-------------+
|title                                                        |display_name        |display_name                |salary_min|salary_max|contract_type|
+-------------------------------------------------------------+--------------------+----------------------------+----------+----------+-------------+
|Speech and Language Therapist                                |Outcomes First Group|Kilhallon, Par              |53200.0   |53200.0   |permanent    |
|Speech and Language Therapist                                |Outcomes First Group|Chadbury, Evesham           |0.0       |53200.0   |permanent    |
|Principal Structural Engineer                                |BAE Systems         |Chapels, Kirkby-In-Furness  |50141.93  |50141.93  |NULL         |
|Senior Planning, Monitoring & Control Professional – Planning|BAE Systems         |Heysham, Morecam

In [4]:
# NOTE(review): this is the same preview as the preceding cell; consider
# removing one of the two.
# The company and location names are aliased so the output does not contain
# two columns that are both called "display_name".
(
    jobs_df
    .select(
        col("job.title").alias("title"),
        col("job.company.display_name").alias("company_name"),
        col("job.location.display_name").alias("location_name"),
        col("job.salary_min").alias("salary_min"),
        col("job.salary_max").alias("salary_max"),
        col("job.contract_type").alias("contract_type"),
    )
    .show(10, truncate=False)
)


+-------------------------------------------------------------+--------------------+----------------------------+----------+----------+-------------+
|title                                                        |display_name        |display_name                |salary_min|salary_max|contract_type|
+-------------------------------------------------------------+--------------------+----------------------------+----------+----------+-------------+
|Speech and Language Therapist                                |Outcomes First Group|Kilhallon, Par              |53200.0   |53200.0   |permanent    |
|Speech and Language Therapist                                |Outcomes First Group|Chadbury, Evesham           |0.0       |53200.0   |permanent    |
|Principal Structural Engineer                                |BAE Systems         |Chapels, Kirkby-In-Furness  |50141.93  |50141.93  |NULL         |
|Senior Planning, Monitoring & Control Professional – Planning|BAE Systems         |Heysham, Morecam

In [5]:
from pyspark.sql.functions import col, count, when

# Flatten the "job" struct so every top-level job field becomes its own column.
# Reuses jobs_df (one row per posting) rather than re-exploding df: explode
# emits no rows for a null array, so the resulting rows are the same, and the
# cell no longer needs `explode`, which its own import line does not provide.
check_null_job_df = jobs_df.select("job.*")

# One output row holding, per column, the number of rows where it is NULL.
# when(...) yields a non-null marker only for NULL cells (and NULL otherwise),
# and count() counts just the non-null markers.
null_stats = check_null_job_df.select([
    count(when(col(field).isNull(), 1)).alias(field)
    for field in check_null_job_df.columns
])

null_stats.show(truncate=False)

+---------+-----+--------+-------+-------------+-------------+-------+-----------+---+--------+--------+---------+------------+-------------------+----------+----------+-----+
|__CLASS__|adref|category|company|contract_time|contract_type|created|description|id |latitude|location|longitude|redirect_url|salary_is_predicted|salary_max|salary_min|title|
+---------+-----+--------+-------+-------------+-------------+-------+-----------+---+--------+--------+---------+------------+-------------------+----------+----------+-----+
|0        |0    |0       |0      |78           |158          |0      |0          |0  |21      |0       |21       |0           |0                  |1         |0         |0    |
+---------+-----+--------+-------+-------------+-------------+-------+-----------+---+--------+--------+---------+------------+-------------------+----------+----------+-----+

