# In [5]:
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


# Glue/Spark bootstrap: reuse the active SparkContext (or start one), wrap it
# in a GlueContext, and take the SparkSession that the GlueContext exposes.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# `Job` lives in `awsglue.job` and must be imported at the top of the file;
# without that import this line fails with NameError.
job = Job(glueContext)
job.init("analysis_job")

# S3 locations of the two input datasets.
s3_bucket = "s3://bls-gov-dataset/"
csv_path = f"{s3_bucket}bls-data/pr.data.0.Current"
json_path = f"{s3_bucket}datausa_population.json"

# Read the tab-delimited file, taking column names from its first line.
csv_reader = spark.read.options(header="true", delimiter="\t")
df_csv = csv_reader.csv(csv_path)

# Remove whitespace padding from the column names ...
df_csv = df_csv.toDF(*map(str.strip, df_csv.columns))

# ... and from every cell value, keeping each column under its own name.
trimmed_cells = (trim(col(name)).alias(name) for name in df_csv.columns)
df_csv = df_csv.select(*trimmed_cells)

# trim() yields strings, so convert `year` to an integer column.
df_csv = df_csv.withColumn("year", col("year").cast("int"))

# The JSON input is parsed in multiline mode.
df_json = spark.read.options(multiline="true").json(json_path)

# Part 3.1: mean and standard deviation of the annual US population,
# restricted to the years 2013 through 2018 (both ends included).
years_in_scope = col("Year").between(2013, 2018)
df_population_filtered = df_json.where(years_in_scope)
df_population_stats = df_population_filtered.agg(
    mean("Population").alias("Mean_Population"),
    stddev("Population").alias("StdDev_Population"),
)
df_population_stats.show()

# Part 3.2: for every series_id, report the year whose summed `value` is the
# largest, using Spark SQL over a temporary view of the CSV data.
#
# Notes on the query:
#   * `value` is cast to DOUBLE explicitly so the sum does not depend on
#     implicit string-to-number coercion of the trimmed text column.
#   * `year` is a secondary sort key so that, when two years tie on the sum,
#     the earliest year is picked deterministically instead of arbitrarily.
#   * round() already produces a whole number; the surrounding ceil() is kept
#     so the reported `max_value` has the same value and type as before.
df_csv.createOrReplaceTempView("csv_table")
df_best_year = spark.sql("""
    WITH annual_values AS (
        SELECT series_id,
               CAST(year AS INT) AS year,
               SUM(CAST(value AS DOUBLE)) AS annual_value_sum
        FROM csv_table
        GROUP BY series_id, year
    ), ranked_values AS (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY series_id
                   ORDER BY annual_value_sum DESC, year ASC
               ) AS row_num
        FROM annual_values
    )
    SELECT series_id, year, ceil(round(annual_value_sum)) AS max_value
    FROM ranked_values
    WHERE row_num = 1
""")
df_best_year.show()

# Part 3.3: first-quarter (Q01) rows of series PRS30006032, each paired with
# the population recorded for the same year.
is_target_series = col("series_id") == "PRS30006032"
is_first_quarter = col("period") == "Q01"
df_filtered_series = df_csv.where(is_target_series & is_first_quarter)

# Columns are taken from their source frame explicitly, since both frames
# carry a year column.
same_year = df_filtered_series["year"] == df_json["Year"]
report_columns = [
    df_filtered_series[name]
    for name in ("series_id", "year", "period", "value")
]
report_columns.append(df_json["Population"])

# Left join, then discard rows that ended up without a population figure.
df_joined = (
    df_filtered_series.join(df_json, same_year, "left")
    .select(*report_columns)
    .where(col("Population").isNotNull())
)
df_joined.show()

# Finish the Glue job started by job.init() above.
# NOTE(review): per the AWS Glue docs, commit() is what persists job-bookmark
# state; confirm whether bookmarks are enabled for this job.
job.commit()


