### Setup: Paths and Imports

In [0]:
from pyspark.sql.functions import col, explode, trim, avg, stddev_pop, row_number, round
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.window import Window

In [0]:
# Input locations: both raw datasets live in the same S3 bucket (s3a scheme)
bucket_name = "rearc-quest-897729141725-us-east-1"
bls_path = "s3a://" + bucket_name + "/bls/files/pr.data.0.Current"
population_path = "s3a://" + bucket_name + "/datausa/population.json"

### Load & Clean BLS Data (Tab-delimited)

In [0]:
# Read the tab-separated BLS file, taking column names from the first line
bls_df = spark.read.options(delimiter="\t", header=True).csv(bls_path)

# Standardize headers: strip surrounding whitespace, lower-case, spaces -> underscores
bls_df = bls_df.select(*[
    col(raw_name).alias(raw_name.strip().lower().replace(" ", "_"))
    for raw_name in bls_df.columns
])

# Strip surrounding whitespace from every string-typed column; other types pass through
bls_df = bls_df.select(*[
    (trim(col(name)).alias(name) if dtype == "string" else col(name))
    for name, dtype in bls_df.dtypes
])

# Convert the fields used numerically downstream: value -> double, year -> int
bls_df = (
    bls_df
    .withColumn("value", col("value").cast("double"))
    .withColumn("year", col("year").cast("int"))
)

### Load & Clean Population Data (JSON)


In [0]:
# Read the multi-line JSON document, then turn each element of its 'data'
# array into one row with the element's fields as top-level columns
population_raw = spark.read.options(multiLine=True).json(population_path)
population_df = (
    population_raw
    .select(explode(col("data")).alias("record"))
    .select("record.*")
)

# Standardize headers: strip surrounding whitespace, lower-case, spaces -> underscores
population_df = population_df.select(*[
    col(raw_name).alias(raw_name.strip().lower().replace(" ", "_"))
    for raw_name in population_df.columns
])

# Strip surrounding whitespace from every string-typed column; other types pass through
population_df = population_df.select(*[
    (trim(col(name)).alias(name) if dtype == "string" else col(name))
    for name, dtype in population_df.dtypes
])

# Convert the fields used numerically downstream to integers
population_df = (
    population_df
    .withColumn("year", col("year").cast("int"))
    .withColumn("population", col("population").cast("int"))
)


### Task 1: Population Stats (2013–2018)


In [0]:
# Mean and population standard deviation of 'population' over the years
# 2013 through 2018 (both bounds inclusive), each rounded to 2 decimals.
in_range = col("year").between(2013, 2018)
pop_stats_df = population_df.filter(in_range).agg(
    round(avg("population"), 2).alias("mean_population"),
    round(stddev_pop("population"), 2).alias("stddev_population"),
)
pop_stats_df.display()

mean_population,stddev_population
317437383.0,3886173.29


### Task 2: Best Year per Series ID

In [0]:
# Aggregate total value by series_id/year
bls_yearly = bls_df.groupBy("series_id", "year").sum("value").withColumnRenamed("sum(value)", "yearly_sum")

# Round yearly_sum to 2 decimal places
bls_yearly = bls_yearly.withColumn("yearly_sum", round(col("yearly_sum"), 2))

# Rank and extract top year per series_id
window_spec = Window.partitionBy("series_id").orderBy(col("yearly_sum").desc())
best_years_df = bls_yearly.withColumn("rank", row_number().over(window_spec)).filter(col("rank") == 1).drop("rank")

best_years_df.display()

series_id,year,yearly_sum
PRS30006011,2022,20.5
PRS30006012,2022,17.1
PRS30006013,1998,705.9
PRS30006021,2010,17.7
PRS30006022,2010,12.4
PRS30006023,2014,503.22
PRS30006031,2022,20.5
PRS30006032,2021,17.1
PRS30006033,1998,702.67
PRS30006061,2022,37.0


### Task 3: Join PRS30006032 Q01 with Population

In [0]:
# Keep only the first-quarter (Q01) rows of series PRS30006032
target_df = (
    bls_df
    .filter(col("series_id") == "PRS30006032")
    .filter(col("period") == "Q01")
)

# Attach the population for the matching year; an inner join keeps only
# the years present in both datasets
joined_df = target_df.join(population_df, "year", "inner")

report_columns = ["series_id", "year", "period", "value", "population"]
final_result_df = joined_df.select(*report_columns)
final_result_df.display()

series_id,year,period,value,population
PRS30006032,2013,Q01,0.5,311536594
PRS30006032,2014,Q01,-0.1,314107084
PRS30006032,2015,Q01,-1.7,316515021
PRS30006032,2016,Q01,-1.4,318558162
PRS30006032,2017,Q01,0.9,321004407
PRS30006032,2018,Q01,0.5,322903030
PRS30006032,2019,Q01,-1.6,324697795
PRS30006032,2020,Q01,-7.0,326569308
PRS30006032,2021,Q01,0.7,329725481
PRS30006032,2022,Q01,5.3,331097593
