In [0]:
# s3fs (built on fsspec) is what lets pandas.read_csv open the "s3://..." path used in the data-loading cell.
# TODO: pin versions (e.g. `%pip install -q fsspec==<ver> s3fs==<ver>`) so a fresh re-run is reproducible.
%pip install fsspec
%pip install s3fs

Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
import boto3
import pandas as pd
import requests
from pyspark.sql import SparkSession
# NB: pyspark's `sum` is aliased to `_sum`, but `round` deliberately shadows the Python builtin in this notebook.
from pyspark.sql.functions import avg, col, lower, mean, round, row_number, stddev, sum as _sum, trim
from pyspark.sql.window import Window



In [0]:
# Obtain the Spark session used by every cell below (getOrCreate reuses an already-active session if there is one).
spark = (
    SparkSession.builder
    .appName("PopulationAnalysis")
    .getOrCreate()
)

In [0]:
# S3 client for the quest bucket. No keys are written here: boto3's default credential chain
# resolves them from the environment (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY), the shared
# ~/.aws config files, or an attached IAM role. Never commit real keys to a notebook -- they
# end up in version control and in exported copies.
# NOTE(review): this client is not used by the cells shown; pd.read_csv("s3://...") goes through
# s3fs, which resolves credentials on its own.
s3 = boto3.client(
    's3',
    region_name='ap-south-1'
)

## Step 0:
Load both the CSV file from Part 1 (`pr.data.0.Current`) and the JSON file from Part 2 as dataframes (Spark, PySpark, pandas, Koalas, etc.).

In [0]:
# Data sources
SERIES_DATA = "s3://rerac-quest-s3-bucket/pr.data.0.Current"
# NOTE(review): virtual-hosted S3 URLs normally look like https://<bucket>.s3.<region>.amazonaws.com/<key>;
# this host has no ".s3" segment -- confirm it resolves before relying on it.
POPULATION_DATA = "https://rerac-quest-s3-bucket.amazonaws.com/population.json"
HTTP_TIMEOUT_SECONDS = 30

# Load the BLS time series (tab-separated). Its header and text cells are whitespace-padded
# (e.g. "series_id        "), so strip them once here instead of patching exact padded names later.
series_pd = pd.read_csv(SERIES_DATA, delimiter="\t")
series_pd.columns = series_pd.columns.str.strip()
for text_col in series_pd.select_dtypes(include="object").columns:
    # isinstance guard: object columns may hold NaN (or mixed types), which must pass through untouched.
    series_pd[text_col] = series_pd[text_col].map(lambda v: v.strip() if isinstance(v, str) else v)
series_df = spark.createDataFrame(series_pd)

# Load population JSON data. The timeout stops a stalled connection from hanging the notebook,
# and raise_for_status surfaces HTTP errors (403/404) directly instead of as a JSON-decode error.
population_response = requests.get(POPULATION_DATA, timeout=HTTP_TIMEOUT_SECONDS)
population_response.raise_for_status()
population_json = population_response.json()
population_pd = pd.json_normalize(population_json, record_path="data")
population_df = spark.createDataFrame(population_pd)

print("Series Data:")
series_df.show(5)

print("Population Data:")
population_df.show(5)


Series Data:
+-----------------+----+------+------------+--------------+
|series_id        |year|period|       value|footnote_codes|
+-----------------+----+------+------------+--------------+
|PRS30006011      |1995|   Q01|         2.6|          null|
|PRS30006011      |1995|   Q02|         2.1|          null|
|PRS30006011      |1995|   Q03|         0.9|          null|
|PRS30006011      |1995|   Q04|         0.1|          null|
|PRS30006011      |1995|   Q05|         1.4|          null|
+-----------------+----+------+------------+--------------+
only showing top 5 rows

Population Data:
+---------+-------------+-------+----+----------+-------------+
|ID Nation|       Nation|ID Year|Year|Population|  Slug Nation|
+---------+-------------+-------+----+----------+-------------+
|  01000US|United States|   2023|2023| 332387540|united-states|
|  01000US|United States|   2022|2022| 331097593|united-states|
|  01000US|United States|   2021|2021| 329725481|united-states|
|  01000US|United Sta

## Step 1:
Using the dataframe from the population data API (Part 2), compute the mean and the standard deviation of the annual US population across the years [2013, 2018], inclusive. Note that Spark's `stddev` is the *sample* standard deviation (`stddev_samp`).

In [0]:
# Keep only the requested window of years; Column.between is inclusive at both ends.
population_filtered = population_df.where(
    col("Year").cast("int").between(2013, 2018)
)

# Compute both statistics in a single aggregation and fetch the one resulting row.
stats = population_filtered.agg(
    mean("Population").alias("mean_population"),
    stddev("Population").alias("stddev_population"),
).first()

print(f"Mean Population: {stats['mean_population']}")
print(f"Standard Deviation of Population: {stats['stddev_population']}")


Mean Population: 317437383.0
Standard Deviation of Population: 4257089.5415293295


## Step 2:
Using the dataframe from the time series (Part 1), for every `series_id`, find the best year: the year with the largest sum of `value` across all quarters in that year. Generate a report with each series id, the best year for that series, and the summed value for that year.

In [0]:
# Normalise the BLS headers, which are whitespace-padded in the source file (e.g. "series_id        ").
# Stripping every name handles any padding width and is a no-op once names are clean, so re-running
# the cell is safe. series_id values carry the same padding, so trim them too.
# series_df is reassigned on purpose: Step 3 relies on the clean column names.
series_df = (
    series_df
    .toDF(*[name.strip() for name in series_df.columns])
    .withColumn("series_id", trim(col("series_id")))
)

# Sum every period's value per (series_id, year).
# NOTE(review): BLS encodes the annual figure as period "Q05"; the task asks for a sum over quarters,
# so consider filtering to Q01-Q04 first -- confirm the intended definition before changing results.
grouped_df = (
    series_df
    .groupBy("series_id", "year")
    .agg(_sum("value").alias("total_value"))
    # Round to 2 dp to hide floating-point noise from summing decimal values.
    .withColumn("total_value", round(col("total_value"), 2))
)

# Best year = highest total; ties are broken by the earliest year so the report is deterministic
# (row_number alone would pick an arbitrary row among equal totals).
window_spec = Window.partitionBy("series_id").orderBy(col("total_value").desc(), col("year").asc())

ranked_df = grouped_df.withColumn("rank", row_number().over(window_spec))

max_value_series = (
    ranked_df
    .filter(col("rank") == 1)
    .drop("rank")
    .orderBy("series_id")
)

max_value_series.show()

+-----------------+----+-----------+
|        series_id|year|total_value|
+-----------------+----+-----------+
|PRS30006011      |2022|       20.5|
|PRS30006012      |2022|       17.1|
|PRS30006013      |1998|      705.9|
|PRS30006021      |2010|       17.7|
|PRS30006022      |2010|       12.4|
|PRS30006023      |2014|     503.22|
|PRS30006031      |2022|       20.5|
|PRS30006032      |2021|       17.1|
|PRS30006033      |1998|     702.67|
|PRS30006061      |2022|       37.0|
|PRS30006062      |2021|       31.6|
|PRS30006063      |2024|     647.52|
|PRS30006081      |2021|       24.4|
|PRS30006082      |2021|       24.4|
|PRS30006083      |2021|     110.74|
|PRS30006091      |2002|       43.3|
|PRS30006092      |2002|       44.4|
|PRS30006093      |2013|     514.16|
|PRS30006101      |2020|       33.5|
|PRS30006102      |2020|       36.2|
+-----------------+----+-----------+
only showing top 20 rows



## Step 3:
Using both dataframes from Part 1 and Part 2, generate a report that provides the `value` for `series_id = PRS30006032` and `period = Q01`, together with the population for that year (if it is available in the population dataset).

In [0]:
# Exact match, insensitive to padding and case. A substring `contains` test would also match any
# longer id or period that merely contains these strings.
TARGET_SERIES_ID = "prs30006032"
TARGET_PERIOD = "q01"

specific_series = series_df.filter(
    (lower(trim(col("series_id"))) == TARGET_SERIES_ID)
    & (lower(trim(col("period"))) == TARGET_PERIOD)
)

# Build the join input under a new name instead of overwriting population_df.
# Year is renamed so it cannot collide with the series' `year`, because Spark resolves column names
# case-insensitively by default. Population is cast to long so large counts cannot overflow an int.
population_by_year = population_df.select(
    col("Year").cast("int").alias("population_year"),
    col("Population").cast("long").alias("Population"),
)

# Left join: keep every matching series row, with a null Population where that year is absent.
result = specific_series.join(
    population_by_year,
    specific_series["year"] == population_by_year["population_year"],
    "left",
)

# Join output order is not guaranteed, so sort by year explicitly.
result.select("series_id", "year", "period", "value", "Population").orderBy("year").show(100)


+-----------------+----+------+-----+----------+
|        series_id|year|period|value|Population|
+-----------------+----+------+-----+----------+
|PRS30006032      |1995|   Q01|  0.0|      null|
|PRS30006032      |1996|   Q01| -4.2|      null|
|PRS30006032      |1997|   Q01|  2.8|      null|
|PRS30006032      |1998|   Q01|  0.9|      null|
|PRS30006032      |1999|   Q01| -4.1|      null|
|PRS30006032      |2000|   Q01|  0.5|      null|
|PRS30006032      |2001|   Q01| -6.3|      null|
|PRS30006032      |2002|   Q01| -6.6|      null|
|PRS30006032      |2003|   Q01| -5.7|      null|
|PRS30006032      |2004|   Q01|  2.0|      null|
|PRS30006032      |2005|   Q01| -0.5|      null|
|PRS30006032      |2006|   Q01|  1.8|      null|
|PRS30006032      |2007|   Q01| -0.8|      null|
|PRS30006032      |2008|   Q01| -3.5|      null|
|PRS30006032      |2009|   Q01|-21.0|      null|
|PRS30006032      |2010|   Q01|  3.2|      null|
|PRS30006032      |2011|   Q01|  1.5|      null|
|PRS30006032      |2

In [0]:
# Release the Spark session's resources at the end of the notebook; re-run the session cell before re-executing earlier cells.
spark.stop()