In [2]:
import os
os.environ["PYSPARK_PYTHON"] = "/Users/ashwinsahay/rearc_data_quest/.venv/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/Users/ashwinsahay/rearc_data_quest/.venv/bin/python"

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PopulationAnalytics").getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/10 12:40:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
DATA READING START

In [30]:
#READ ts_data
ts_data = spark.read.csv('/Users/ashwinsahay/rearc_data_quest/data/pr.data.0.Current', 
                    sep="\t",
                    header=True,
                    inferSchema=True)

# Strip leading/trailing whitespace from all column names
ts_data = ts_data.toDF(*[col_name.strip() for col_name in ts_data.columns])

# ts_data=ts_data.withColumn('series_id',trim(col("series_id")))
# ts_data.show(5)

ts_data.printSchema()


root
 |-- series_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- period: string (nullable = true)
 |-- value: double (nullable = true)
 |-- footnote_codes: string (nullable = true)



In [3]:
import json

with open("/Users/ashwinsahay/rearc_data_quest/data/population_data.json") as f:
    json_data = json.load(f)

In [4]:
records = json_data["data"]

In [6]:
print(records)

[{'ID Nation': '01000US', 'Nation': 'United States', 'ID Year': 2023, 'Year': '2023', 'Population': 332387540, 'Slug Nation': 'united-states'}, {'ID Nation': '01000US', 'Nation': 'United States', 'ID Year': 2022, 'Year': '2022', 'Population': 331097593, 'Slug Nation': 'united-states'}, {'ID Nation': '01000US', 'Nation': 'United States', 'ID Year': 2021, 'Year': '2021', 'Population': 329725481, 'Slug Nation': 'united-states'}, {'ID Nation': '01000US', 'Nation': 'United States', 'ID Year': 2020, 'Year': '2020', 'Population': 326569308, 'Slug Nation': 'united-states'}, {'ID Nation': '01000US', 'Nation': 'United States', 'ID Year': 2019, 'Year': '2019', 'Population': 324697795, 'Slug Nation': 'united-states'}, {'ID Nation': '01000US', 'Nation': 'United States', 'ID Year': 2018, 'Year': '2018', 'Population': 322903030, 'Slug Nation': 'united-states'}, {'ID Nation': '01000US', 'Nation': 'United States', 'ID Year': 2017, 'Year': '2017', 'Population': 321004407, 'Slug Nation': 'united-states'}

In [15]:
population_df = spark.createDataFrame(records)
population_df.show(5)


population_data = population_df.withColumn("Year", col("Year").cast("int")) 
population_data.printSchema()

+---------+-------+-------------+----------+-------------+----+
|ID Nation|ID Year|       Nation|Population|  Slug Nation|Year|
+---------+-------+-------------+----------+-------------+----+
|  01000US|   2023|United States| 332387540|united-states|2023|
|  01000US|   2022|United States| 331097593|united-states|2022|
|  01000US|   2021|United States| 329725481|united-states|2021|
|  01000US|   2020|United States| 326569308|united-states|2020|
|  01000US|   2019|United States| 324697795|united-states|2019|
+---------+-------+-------------+----------+-------------+----+
only showing top 5 rows

root
 |-- ID Nation: string (nullable = true)
 |-- ID Year: long (nullable = true)
 |-- Nation: string (nullable = true)
 |-- Population: long (nullable = true)
 |-- Slug Nation: string (nullable = true)
 |-- Year: integer (nullable = true)



DATA READ COMPLETE 

DATA ANALYSIS BELOW

In [19]:
#Task 1: Mean and Std Dev of US Population (2013–2018)
from pyspark.sql.functions import col,avg,stddev,round,sum
population_filtered = population_data.filter((col("Year") >= 2013) & (col("Year") <= 2018))

population_filtered.select(
    round(avg("Population"),4).alias("Mean_Population"),
    round(stddev("Population"),4).alias("StdDev_Population")
).show()


+---------------+-----------------+
|Mean_Population|StdDev_Population|
+---------------+-----------------+
|   3.17437383E8|     4257089.5415|
+---------------+-----------------+



In [32]:
#Task 2: Best Year by Total Value per Series ID
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as sum, row_number, trim

# Remove whitespace from series_id
ts_data = ts_data.withColumn("series_id", trim(col("series_id")))

# Sum value by series_id and year
aggregated = ts_data.groupBy("series_id", "year").agg(sum("value").alias("year_total"))

# Use row_number to pick year with max total for each series
window_spec = Window.partitionBy("series_id").orderBy(col("year_total").desc())

best_years = aggregated.withColumn("rank", row_number().over(window_spec)) \
                       .filter(col("rank") == 1) \
                       .select("series_id", "year", col("year_total").alias("value"))

best_years.show(truncate=False)


+-----------+----+------------------+
|series_id  |year|value             |
+-----------+----+------------------+
|PRS30006011|2022|20.5              |
|PRS30006012|2022|17.1              |
|PRS30006013|1998|705.895           |
|PRS30006021|2010|17.7              |
|PRS30006022|2010|12.399999999999999|
|PRS30006023|2014|503.21600000000007|
|PRS30006031|2022|20.4              |
|PRS30006032|2021|17.1              |
|PRS30006033|1998|702.672           |
|PRS30006061|2022|37.0              |
|PRS30006062|2021|31.6              |
|PRS30006063|2024|647.479           |
|PRS30006081|2021|24.4              |
|PRS30006082|2021|24.4              |
|PRS30006083|2021|110.742           |
|PRS30006091|2002|43.3              |
|PRS30006092|2002|44.39999999999999 |
|PRS30006093|2013|514.1560000000001 |
|PRS30006101|2020|33.5              |
|PRS30006102|2020|36.2              |
+-----------+----+------------------+
only showing top 20 rows



In [34]:
#Task 3: Join PRS30006032 Q01 Records with Population

# Filter time-series data
target_series = ts_data.filter(
    (trim(col("series_id")) == "PRS30006032") &
    (trim(col("period")) == "Q01")
)

# Prepare population for join
population_df = population_data.withColumnRenamed("Year", "year")

# Join
joined = target_series.join(population_df, on="year", how="left") \
                      .select("series_id", "year", "period", "value", "Population")

joined.show(truncate=False)


+-----------+----+------+-----+----------+
|series_id  |year|period|value|Population|
+-----------+----+------+-----+----------+
|PRS30006032|2003|Q01   |-5.7 |NULL      |
|PRS30006032|2007|Q01   |-0.8 |NULL      |
|PRS30006032|2015|Q01   |-1.7 |316515021 |
|PRS30006032|2006|Q01   |1.8  |NULL      |
|PRS30006032|2013|Q01   |0.5  |311536594 |
|PRS30006032|1997|Q01   |2.8  |NULL      |
|PRS30006032|2014|Q01   |-0.1 |314107084 |
|PRS30006032|2004|Q01   |2.0  |NULL      |
|PRS30006032|1996|Q01   |-4.2 |NULL      |
|PRS30006032|1998|Q01   |0.9  |NULL      |
|PRS30006032|2012|Q01   |2.5  |NULL      |
|PRS30006032|2009|Q01   |-21.0|NULL      |
|PRS30006032|1995|Q01   |0.0  |NULL      |
|PRS30006032|2001|Q01   |-6.3 |NULL      |
|PRS30006032|2005|Q01   |-0.5 |NULL      |
|PRS30006032|2000|Q01   |0.5  |NULL      |
|PRS30006032|2010|Q01   |3.2  |NULL      |
|PRS30006032|2011|Q01   |1.5  |NULL      |
|PRS30006032|2008|Q01   |-3.5 |NULL      |
|PRS30006032|1999|Q01   |-4.1 |NULL      |
+----------

In [None]:
DATA ANALYSIS COMPLETE