In [1]:
pip install boto3

Collecting boto3
  Downloading boto3-1.40.46-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.41.0,>=1.40.46 (from boto3)
  Downloading botocore-1.40.46-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.15.0,>=0.14.0 (from boto3)
  Downloading s3transfer-0.14.0-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.40.46-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.40.46-py3-none-any.whl (14.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.1/14.1 MB[0m [31m54.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.14.0-py3-none-any.whl (85 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.7/85.7 kB[0m [31m3.9 MB/s[0m eta [36m0:0

# Configure AWS credentials

In [40]:
import os

# AWS configuration read by the S3 client setup below.
# Secrets are never written in the notebook: export AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# in the shell that launches Jupyter, or rely on boto3's default credential chain
# (shared config/profile, instance role, ...). Credential variables are deliberately NOT
# assigned here, so real values already present in the environment are never overwritten.
os.environ["AWS_DEFAULT_REGION"] = "ap-south-1"

# Surface missing credentials up front instead of failing later inside download_file.
_missing_creds = [
    name
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if not os.environ.get(name)
]
if _missing_creds:
    print(
        "AWS credentials not set in environment:",
        ", ".join(_missing_creds),
        "- boto3 will fall back to its default credential chain.",
    )

# Download inputs and start the Spark session
* Download the Part 1 time-series file and the Part 2 population JSON from S3 to local disk
* Start a Spark session to perform the analysis

In [39]:
import os, json, boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, when, sum as _sum, avg, stddev, row_number, format_number
from pyspark.sql.window import Window

# S3 location of both inputs.
BUCKET     = "rearc-assessment-ss"
TS_KEY     = "part1/pr.data.0.Current"   # Part 1 time-series file
POP_KEY    = "part2/population.json"     # Part 2 population API response

# Local copies the rest of the notebook reads from.
TS_LOCAL   = "/tmp/pr.data.0.Current"
POP_LOCAL  = "/tmp/population.json"

# Build the S3 client from the environment configured above; unset values are passed as
# None, which lets boto3 resolve them itself.
_env = os.environ.get
s3 = boto3.client(
    "s3",
    region_name=_env("AWS_DEFAULT_REGION"),
    aws_access_key_id=_env("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=_env("AWS_SECRET_ACCESS_KEY"),
)

# Fetch every input first, then report what landed on disk.
_downloads = ((TS_KEY, TS_LOCAL), (POP_KEY, POP_LOCAL))
for _key, _dest in _downloads:
    s3.download_file(BUCKET, _key, _dest)

print(" Downloaded:")
for _, _dest in _downloads:
    print(" -", _dest)

# Reuse an existing session if the kernel already has one.
spark = SparkSession.builder.appName("rearc-part3").getOrCreate()


 Downloaded:
 - /tmp/pr.data.0.Current
 - /tmp/population.json


# Load DataUSA population data (Part 2)
* The API response has already been pushed from the API to S3 and was downloaded locally above

In [15]:
# Read the downloaded population JSON directly on the driver.
# spark.read.text(...).collect()[0][0] would return only the FIRST LINE of the file, so a
# pretty-printed (multi-line) response would fail to parse; it would also require the
# driver-local /tmp path to be visible to Spark executors.
with open(POP_LOCAL, encoding="utf-8") as fh:
    pop_raw = fh.read()
pop_json = json.loads(pop_raw)
pop_rows = pop_json["data"]

# Fail with a clear message rather than Spark's "can not infer schema from empty dataset".
assert pop_rows, f"{POP_LOCAL} contains no records under 'data'"

pop_df = (spark.createDataFrame(pop_rows)
          # Drop the space from the source column name so it can be referenced plainly.
          .withColumnRenamed("Nation ID", "NationID")
          .select(
              col("NationID"),
              col("Nation"),
              col("Year").cast("int").alias("Year"),
              col("Population").cast("double").alias("Population"),
          ))

print("Population schema")
pop_df.printSchema()
pop_df.show()

Population schema
root
 |-- NationID: string (nullable = true)
 |-- Nation: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Population: double (nullable = true)

+--------+-------------+----+------------+
|NationID|       Nation|Year|  Population|
+--------+-------------+----+------------+
| 01000US|United States|2013|3.16128839E8|
| 01000US|United States|2014|3.18857056E8|
| 01000US|United States|2015|3.21418821E8|
| 01000US|United States|2016|3.23127515E8|
| 01000US|United States|2017|3.25719178E8|
| 01000US|United States|2018|3.27167439E8|
| 01000US|United States|2019|3.28239523E8|
| 01000US|United States|2021|3.31893745E8|
| 01000US|United States|2022|3.33287562E8|
| 01000US|United States|2023|3.34914896E8|
+--------+-------------+----+------------+



# Load Part 1 time-series data (tab-separated)
* The file has already been pushed to S3 by a scheduler that monitors the source for changes
* Create a DataFrame from it

In [34]:
# Reader settings for the Part 1 file: tab-separated with a header row, quoting disabled
# ("\u0000") so quote characters are taken literally, and surrounding whitespace ignored.
_ts_options = {
    "header": True,
    "sep": "\t",
    "inferSchema": True,
    "quote": "\u0000",
    "ignoreLeadingWhiteSpace": "true",
    "ignoreTrailingWhiteSpace": "true",
}
ts_df = spark.read.options(**_ts_options).csv(TS_LOCAL)

ts_df.show(5, truncate=False)

# Normalise columns: strip padding from the string keys and force numeric types.
_ts_fixes = (
    ("series_id", trim(col("series_id"))),
    ("period",    trim(col("period"))),
    ("year",      col("year").cast("int")),
    ("value",     col("value").cast("double")),
)
for _name, _expr in _ts_fixes:
    ts_df = ts_df.withColumn(_name, _expr)

print("Schema")
ts_df.printSchema()
ts_df.show(5, truncate=False)

+-----------+----+------+-----+--------------+
|series_id  |year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006011|1995|Q01   |2.6  |NULL          |
|PRS30006011|1995|Q02   |2.1  |NULL          |
|PRS30006011|1995|Q03   |0.9  |NULL          |
|PRS30006011|1995|Q04   |0.1  |NULL          |
|PRS30006011|1995|Q05   |1.4  |NULL          |
+-----------+----+------+-----+--------------+
only showing top 5 rows

Schema
root
 |-- series_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- period: string (nullable = true)
 |-- value: double (nullable = true)
 |-- footnote_codes: string (nullable = true)

+-----------+----+------+-----+--------------+
|series_id  |year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006011|1995|Q01   |2.6  |NULL          |
|PRS30006011|1995|Q02   |2.1  |NULL          |
|PRS30006011|1995|Q03   |0.9  |NULL          |
|PRS30006011|1995|Q04   |0.1  |NULL          |
|PRS3000601

# PART 3 : Task 1
Using the dataframe from the population data API (Part 2), generate the mean and the standard deviation of the annual US population across the years [2013, 2018] inclusive.

In [22]:
# Years 2013..2018; Column.between is inclusive on both ends.
pop_13_18 = pop_df.filter(col("Year").between(2013, 2018))

# Spark's `stddev` is the SAMPLE standard deviation (alias of stddev_samp).
stats_df = pop_13_18.agg(
    avg("Population").alias("mean_population_2013_2018"),
    stddev("Population").alias("stddev_population_2013_2018"),
)

# Display-only formatting: thousands separators, 0 decimals for the mean, 2 for the stddev.
mean_fmt = format_number(col("mean_population_2013_2018"), 0).alias("mean_population_2013_2018")
std_fmt = format_number(col("stddev_population_2013_2018"), 2).alias("stddev_population_2013_2018")
stats_df.select(mean_fmt, std_fmt).show()


+-------------------------+---------------------------+
|mean_population_2013_2018|stddev_population_2013_2018|
+-------------------------+---------------------------+
|              322,069,808|               4,158,441.04|
+-------------------------+---------------------------+



# PART 3 : Task 2
Using the dataframe from the time-series (Part 1), for every series_id, find the best year: the year with the max/largest sum of "value" for all quarters in that year. Generate a report with each series id, the best year for that series, and the summed value for that year.

In [28]:
# Keep rows whose period looks like "Q" + two digits.
# NOTE(review): this pattern also admits Q05 (present in the sample rows above); in BLS
# period codes Q05 is believed to be an annual figure rather than a quarter — confirm
# whether it should count toward the yearly sum.
ts_q = ts_df.where(col("period").rlike("^Q\\d{2}$"))

# Total value per (series, year).
year_sums = ts_q.groupBy("series_id", "year").agg(_sum("value").alias("year_sum"))

# Rank years within each series: largest sum first, ties resolved by the earliest year.
w = (Window
     .partitionBy("series_id")
     .orderBy(col("year_sum").desc(), col("year").asc()))

ranked_years = year_sums.withColumn("rn", row_number().over(w))
best_years = (ranked_years
              .where(col("rn") == 1)
              .select("series_id", col("year"), format_number(col("year_sum"), 2).alias("year_sum")))

# Show every series rather than Spark's default 20-row preview.
best_years.show(n=best_years.count(), truncate=False)

+-----------+----+--------+
|series_id  |year|year_sum|
+-----------+----+--------+
|PRS30006011|2022|20.50   |
|PRS30006012|2022|17.10   |
|PRS30006013|1998|705.89  |
|PRS30006021|2010|17.70   |
|PRS30006022|2010|12.40   |
|PRS30006023|2014|503.22  |
|PRS30006031|2022|20.50   |
|PRS30006032|2021|17.10   |
|PRS30006033|1998|702.67  |
|PRS30006061|2022|37.00   |
|PRS30006062|2021|31.60   |
|PRS30006063|2024|646.75  |
|PRS30006081|2021|24.40   |
|PRS30006082|2021|24.40   |
|PRS30006083|2021|110.74  |
|PRS30006091|2002|43.30   |
|PRS30006092|2002|44.40   |
|PRS30006093|2013|514.16  |
|PRS30006101|2020|33.50   |
|PRS30006102|2020|36.20   |
|PRS30006103|2024|644.92  |
|PRS30006111|2020|34.20   |
|PRS30006112|2008|42.70   |
|PRS30006113|2024|654.21  |
|PRS30006131|2021|18.50   |
|PRS30006132|2021|18.50   |
|PRS30006133|2021|114.16  |
|PRS30006151|2020|26.50   |
|PRS30006152|2020|31.10   |
|PRS30006153|2020|529.71  |
|PRS30006161|2010|50.60   |
|PRS30006162|2002|48.10   |
|PRS30006163|2014|51

# PART 3 : Task 3
* Using both dataframes from Part 1 and Part 2, generate a report that will provide the value for series_id = PRS30006032 and period = Q01 and the population for that given year (if available in the population dataset).

In [35]:
series_id_target = "PRS30006032"

# Q01 observations of the target series.
q01 = (ts_df
       .where((col("series_id") == series_id_target) & (col("period") == "Q01"))
       .select("series_id", "year", "period", "value"))

# Population keyed by a lower-case `year` column so the join can be done by name.
# The left join keeps years with no population record (Population shows as NULL).
pop_by_year = pop_df.select(col("Year").alias("year"), col("Population"))

report_c = (q01
            .join(pop_by_year, on="year", how="left")
            .select(
                "series_id",
                "year",
                "period",
                "value",
                format_number(col("Population"), 0).alias("Population"),
            )
            .orderBy("year"))

# Spot-check one year, then print the full report.
report_c.where(col("year") == 2018).show(truncate=False)

report_c.show(n=report_c.count(), truncate=False)

+-----------+----+------+-----+-----------+
|series_id  |year|period|value|Population |
+-----------+----+------+-----+-----------+
|PRS30006032|2018|Q01   |0.5  |327,167,439|
+-----------+----+------+-----+-----------+

+-----------+----+------+-----+-----------+
|series_id  |year|period|value|Population |
+-----------+----+------+-----+-----------+
|PRS30006032|1995|Q01   |0.0  |NULL       |
|PRS30006032|1996|Q01   |-4.2 |NULL       |
|PRS30006032|1997|Q01   |2.8  |NULL       |
|PRS30006032|1998|Q01   |0.9  |NULL       |
|PRS30006032|1999|Q01   |-4.1 |NULL       |
|PRS30006032|2000|Q01   |0.5  |NULL       |
|PRS30006032|2001|Q01   |-6.3 |NULL       |
|PRS30006032|2002|Q01   |-6.6 |NULL       |
|PRS30006032|2003|Q01   |-5.7 |NULL       |
|PRS30006032|2004|Q01   |2.0  |NULL       |
|PRS30006032|2005|Q01   |-0.5 |NULL       |
|PRS30006032|2006|Q01   |1.8  |NULL       |
|PRS30006032|2007|Q01   |-0.8 |NULL       |
|PRS30006032|2008|Q01   |-3.5 |NULL       |
|PRS30006032|2009|Q01   |-21.0|