In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import json

# Create (or reuse) the Spark session for this notebook, with adaptive
# query execution switched on.
spark = (
    SparkSession.builder
    .appName("Rearc_Data_Quest")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

print("Spark session initialized successfully!")
print(f"Spark version: {spark.version}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/07 23:02:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/07 23:02:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark session initialized successfully!
Spark version: 4.0.0


In [2]:
# Load both the CSV file from Part 1 (time series data) and the JSON file from Part 2 (population data).

# The time-series file is tab-delimited with a header row. It is read without a
# schema first because the header names carry padding whitespace.
df_timeseries_raw = (
    spark.read
    .option("header", "true")
    .option("delimiter", "\t")
    .csv("tmp/pr.data.0.Current")
)

# Header names with the surrounding whitespace removed, in file order
clean_columns = [raw_name.strip() for raw_name in df_timeseries_raw.columns]

# Rename only those columns whose header actually changed after stripping
df_temp = df_timeseries_raw
for padded_name, stripped_name in zip(df_timeseries_raw.columns, clean_columns):
    if padded_name == stripped_name:
        continue
    df_temp = df_temp.withColumnRenamed(padded_name, stripped_name)

# Keep only the needed columns, cast them to their intended types and, in the
# same projection, trim whitespace from the string key columns
# (series_id and period) so later equality filters match.
df_timeseries = df_temp.select(
    trim(col("series_id").cast(StringType())).alias("series_id"),
    col("year").cast(IntegerType()),
    trim(col("period").cast(StringType())).alias("period"),
    col("value").cast(DoubleType()),
    col("footnote_codes").cast(StringType()),
)

print(f"Total rows: {df_timeseries.count():,}")
df_timeseries.show(10)

# Load JSON population data.
# JSON is UTF-8 encoded, so the encoding is passed explicitly rather than
# relying on the platform default used by open().
with open("tmp/population_data.json", "r", encoding="utf-8") as f:
    json_data = json.load(f)

# The records live under the top-level "data" key of the JSON document
population_data = json_data["data"]

# Explicit schema for the population records; every field is nullable
population_schema = StructType([
    StructField("ID Nation", StringType(), True),
    StructField("Nation", StringType(), True),
    StructField("ID Year", IntegerType(), True),
    StructField("Year", StringType(), True),
    StructField("Population", LongType(), True),
    StructField("Slug Nation", StringType(), True)
])

# Build the DataFrame from the list of records using the schema above
df_population = spark.createDataFrame(population_data, population_schema)

print(f"Total rows: {df_population.count():,}")
df_population.show(10)

Total rows: 37,002
+-----------+----+------+-----+--------------+
|  series_id|year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006011|1995|   Q01|  2.6|          NULL|
|PRS30006011|1995|   Q02|  2.1|          NULL|
|PRS30006011|1995|   Q03|  0.9|          NULL|
|PRS30006011|1995|   Q04|  0.1|          NULL|
|PRS30006011|1995|   Q05|  1.4|          NULL|
|PRS30006011|1996|   Q01| -0.2|          NULL|
|PRS30006011|1996|   Q02| -0.3|          NULL|
|PRS30006011|1996|   Q03| -0.1|          NULL|
|PRS30006011|1996|   Q04|  0.2|          NULL|
|PRS30006011|1996|   Q05| -0.1|          NULL|
+-----------+----+------+-----+--------------+
only showing top 10 rows


                                                                                

Total rows: 11
+---------+-------------+-------+----+----------+-------------+
|ID Nation|       Nation|ID Year|Year|Population|  Slug Nation|
+---------+-------------+-------+----+----------+-------------+
|  01000US|United States|   2023|2023| 332387540|united-states|
|  01000US|United States|   2022|2022| 331097593|united-states|
|  01000US|United States|   2021|2021| 329725481|united-states|
|  01000US|United States|   2020|2020| 326569308|united-states|
|  01000US|United States|   2019|2019| 324697795|united-states|
|  01000US|United States|   2018|2018| 322903030|united-states|
|  01000US|United States|   2017|2017| 321004407|united-states|
|  01000US|United States|   2016|2016| 318558162|united-states|
|  01000US|United States|   2015|2015| 316515021|united-states|
|  01000US|United States|   2014|2014| 314107084|united-states|
+---------+-------------+-------+----+----------+-------------+
only showing top 10 rows


In [3]:
# Part 2 analysis, using the population DataFrame:
# mean and standard deviation of the annual US population across the
# years 2013 through 2018, both endpoints included.
in_year_range = (col("ID Year") >= 2013) & (col("ID Year") <= 2018)
population_stats = df_population.filter(in_year_range).select("ID Year", "Population")

print("Population data for 2013-2018:")
population_stats.orderBy("ID Year").show()

# `stddev` is the sample standard deviation (n - 1 denominator)
stats_result = population_stats.select("Population").agg(
    mean("Population").alias("mean_population"),
    stddev("Population").alias("std_population"),
)

print("POPULATION STATISTICS (2013-2018):")

stats_result.show()

Population data for 2013-2018:
+-------+----------+
|ID Year|Population|
+-------+----------+
|   2013| 311536594|
|   2014| 314107084|
|   2015| 316515021|
|   2016| 318558162|
|   2017| 321004407|
|   2018| 322903030|
+-------+----------+

POPULATION STATISTICS (2013-2018):
+---------------+------------------+
|mean_population|    std_population|
+---------------+------------------+
|   3.17437383E8|4257089.5415293295|
+---------------+------------------+



In [4]:
# Part 1 analysis, using the time-series DataFrame:
# for every series_id, find the "best year", i.e. the year whose summed
# "value" over all of its periods is the largest. This cell computes the
# per-(series_id, year) sums that the ranking step builds on.
# NOTE(review): the loaded data contains a "Q05" period alongside Q01-Q04 and
# it is included in the sum here — confirm that is intended.
# `sum` is the Spark aggregate from pyspark.sql.functions (star import),
# not the Python builtin.
yearly_sums = (
    df_timeseries
    .groupBy("series_id", "year")
    .agg(sum("value").alias("yearly_sum"))
)

print("Sample yearly sums:")
yearly_sums.orderBy("series_id", "year").show(10)

Sample yearly sums:
+-----------+----+-------------------+
|  series_id|year|         yearly_sum|
+-----------+----+-------------------+
|PRS30006011|1995|                7.1|
|PRS30006011|1996|               -0.5|
|PRS30006011|1997|                4.4|
|PRS30006011|1998|                4.2|
|PRS30006011|1999| -7.699999999999999|
|PRS30006011|2000|               -1.5|
|PRS30006011|2001|              -22.9|
|PRS30006011|2002|              -35.9|
|PRS30006011|2003|-23.900000000000002|
|PRS30006011|2004|               -6.9|
+-----------+----+-------------------+
only showing top 10 rows


In [5]:
# Find the year with the maximum sum for each series_id.
# Rows are ranked within each series by yearly_sum, largest first. The
# secondary ordering on year makes the pick deterministic when two years of the
# same series have an identical sum: the earliest such year wins. Without it,
# row_number() would choose arbitrarily among the tied rows and the report
# could differ between runs.
window_spec = Window.partitionBy("series_id").orderBy(
    desc("yearly_sum"),
    col("year").asc(),
)

# Keep only the top-ranked row per series and drop the helper rank column
best_years = yearly_sums \
    .withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") == 1) \
    .select("series_id", "year", "yearly_sum") \
    .orderBy("series_id")

print("BEST YEAR FOR EACH SERIES (Top 20):")
best_years.show(20)

print(f"Total unique series analyzed: {best_years.count()}")

BEST YEAR FOR EACH SERIES (Top 20):
+-----------+----+------------------+
|  series_id|year|        yearly_sum|
+-----------+----+------------------+
|PRS30006011|2022|              20.5|
|PRS30006012|2022|              17.1|
|PRS30006013|1998|           705.895|
|PRS30006021|2010|              17.7|
|PRS30006022|2010|12.399999999999999|
|PRS30006023|2014|503.21600000000007|
|PRS30006031|2022|              20.5|
|PRS30006032|2021|              17.1|
|PRS30006033|1998|           702.672|
|PRS30006061|2022|              37.0|
|PRS30006062|2021|              31.6|
|PRS30006063|2024|           646.748|
|PRS30006081|2021|              24.4|
|PRS30006082|2021|              24.4|
|PRS30006083|2021|           110.742|
|PRS30006091|2002|              43.3|
|PRS30006092|2002| 44.39999999999999|
|PRS30006093|2013| 514.1560000000001|
|PRS30006101|2020|              33.5|
|PRS30006102|2020|              36.2|
+-----------+----+------------------+
only showing top 20 rows
Total unique series analyze

In [6]:
# Combined Part 1 + Part 2 report: the value for series_id = PRS30006032 and
# period = Q01, together with the population for that year when the population
# dataset has it. This cell selects the matching time-series rows.
target_series = "PRS30006032"
target_period = "Q01"

# Row predicate: the requested series and the requested period
is_target_row = (col("series_id") == target_series) & (col("period") == target_period)
filtered_ts = df_timeseries.filter(is_target_row).select(
    "series_id", "year", "period", "value"
)

print(f"Data for {target_series}, {target_period}:")
filtered_ts.orderBy("year").show()

Data for PRS30006032, Q01:
+-----------+----+------+-----+
|  series_id|year|period|value|
+-----------+----+------+-----+
|PRS30006032|1995|   Q01|  0.0|
|PRS30006032|1996|   Q01| -4.2|
|PRS30006032|1997|   Q01|  2.8|
|PRS30006032|1998|   Q01|  0.9|
|PRS30006032|1999|   Q01| -4.1|
|PRS30006032|2000|   Q01|  0.5|
|PRS30006032|2001|   Q01| -6.3|
|PRS30006032|2002|   Q01| -6.6|
|PRS30006032|2003|   Q01| -5.7|
|PRS30006032|2004|   Q01|  2.0|
|PRS30006032|2005|   Q01| -0.5|
|PRS30006032|2006|   Q01|  1.8|
|PRS30006032|2007|   Q01| -0.8|
|PRS30006032|2008|   Q01| -3.5|
|PRS30006032|2009|   Q01|-21.0|
|PRS30006032|2010|   Q01|  3.2|
|PRS30006032|2011|   Q01|  1.5|
|PRS30006032|2012|   Q01|  2.5|
|PRS30006032|2013|   Q01|  0.5|
|PRS30006032|2014|   Q01| -0.1|
+-----------+----+------+-----+
only showing top 20 rows


In [7]:
# Only the join key and the population figure are needed from the population data
population_by_year = df_population.select("ID Year", "Population")

# Left join keeps every time-series row; Population is NULL for years that the
# population dataset does not cover.
combined_report = (
    filtered_ts
    .join(
        population_by_year,
        filtered_ts["year"] == population_by_year["ID Year"],
        "left",
    )
    .select("series_id", "year", "period", "value", "Population")
    .orderBy("year")
)

print(f"Combined report {target_series}, {target_period}:")
combined_report.show()

print(f"Records found: {combined_report.count()}")

Combined report PRS30006032, Q01:
+-----------+----+------+-----+----------+
|  series_id|year|period|value|Population|
+-----------+----+------+-----+----------+
|PRS30006032|1995|   Q01|  0.0|      NULL|
|PRS30006032|1996|   Q01| -4.2|      NULL|
|PRS30006032|1997|   Q01|  2.8|      NULL|
|PRS30006032|1998|   Q01|  0.9|      NULL|
|PRS30006032|1999|   Q01| -4.1|      NULL|
|PRS30006032|2000|   Q01|  0.5|      NULL|
|PRS30006032|2001|   Q01| -6.3|      NULL|
|PRS30006032|2002|   Q01| -6.6|      NULL|
|PRS30006032|2003|   Q01| -5.7|      NULL|
|PRS30006032|2004|   Q01|  2.0|      NULL|
|PRS30006032|2005|   Q01| -0.5|      NULL|
|PRS30006032|2006|   Q01|  1.8|      NULL|
|PRS30006032|2007|   Q01| -0.8|      NULL|
|PRS30006032|2008|   Q01| -3.5|      NULL|
|PRS30006032|2009|   Q01|-21.0|      NULL|
|PRS30006032|2010|   Q01|  3.2|      NULL|
|PRS30006032|2011|   Q01|  1.5|      NULL|
|PRS30006032|2012|   Q01|  2.5|      NULL|
|PRS30006032|2013|   Q01|  0.5| 311536594|
|PRS30006032|2014|  

In [8]:
import os

# Make sure the output directory exists before any report is written
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

print("Saving results...")

# Each entry: (DataFrame to save, destination path, confirmation message).
# Every report is collapsed to a single partition and written as CSV with a
# header row, replacing any previous output at that path.
reports_to_save = [
    (stats_result, f"{output_dir}/population_stats", "Population statistics saved"),
    (best_years, f"{output_dir}/best_years", "Best years report saved"),
    (combined_report, f"{output_dir}/combined_report", "Combined report saved"),
]

for report_df, destination, done_message in reports_to_save:
    (
        report_df.coalesce(1).write
        .mode("overwrite")
        .option("header", "true")
        .csv(destination)
    )
    print(done_message)

Saving results...
Population statistics saved
Best years report saved
Combined report saved
