In [1]:
from pyspark.sql.functions import col, mean, stddev, sum, trim, max_by, row_number
from pyspark.sql import SparkSession, Window

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, sum

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Rearc Data Analysis") \
    .master("local[2]") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2") \
    .getOrCreate()

# Set up Hadoop AWS configurations
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")


print("SPARK SESSION CREATED SUCCESSFULLY")



25/03/18 00:30:19 WARN Utils: Your hostname, Abhijeets-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
25/03/18 00:30:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/abhijeet/.ivy2/cache
The jars for the packages stored in: /Users/abhijeet/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-87e33019-14ea-4a64-b5b9-9f0c208bb336;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/abhijeet/Desktop/Mine/abhijeet/rearc-data-quest/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 121ms :: artifacts dl 7ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-87e33019-14ea-4a64-b5b9-9f0c208bb336
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/4ms)
25/03/18 00:30:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your pla

SPARK SESSION CREATED SUCCESSFULLY


In [3]:
# AWS S3 Configuration
S3_BUCKET = "abh-de-rearc"
CSV_S3_PATH = f"s3a://{S3_BUCKET}/bls-data/pr.data.0.Current"
JSON_S3_PATH = f"s3a://{S3_BUCKET}/api-data/us_population.json"

# Load CSV file from S3 into PySpark DataFrame
print("Loading CSV data from S3...")
csv_df = spark.read.option("header", "false").option("delimiter", "\t").csv(CSV_S3_PATH)

# Rename columns properly
csv_df = csv_df.toDF("series_id", "year", "period", "value", "footnote_codes")

# Trim strings to remove unwanted spaces
csv_df = csv_df.withColumn("series_id", trim(col("series_id"))) \
               .withColumn("period", trim(col("period"))) \
               .withColumn("value", col("value").cast("float"))  # Ensure value is float

# Load JSON file from S3 into PySpark DataFrame
print("Loading JSON data from S3...")
json_df = spark.read.option("multiline", "true").json(JSON_S3_PATH)

# Extract relevant columns from JSON
population_df = json_df.selectExpr("explode(data) as data").select(
    col("data.Year").alias("year"),
    col("data.Population").alias("population")
)

# Convert population to integer
population_df = population_df.withColumn("population", col("population").cast("int"))



Loading CSV data from S3...


25/03/18 00:37:32 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/03/18 00:37:32 WARN BasicProfileConfigLoader: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.


Loading JSON data from S3...


In [4]:
# --- Task 1: Calculate Mean & Std Dev of Population (2013-2018) ---
pop_stats_df = population_df.filter((col("year") >= 2013) & (col("year") <= 2018))
pop_summary_df = pop_stats_df.select(mean("population").alias("mean_population"),
                                     stddev("population").alias("stddev_population"))

print("Population Statistics (2013-2018):")
pop_summary_df.show()

Population Statistics (2013-2018):
+---------------+-----------------+
|mean_population|stddev_population|
+---------------+-----------------+
|   3.17437383E8| 4257089.54152933|
+---------------+-----------------+



In [7]:
# --- Task 2: Find Best Year for Each `series_id` (Max Sum of `value`) ---
agg_df = csv_df.groupBy("series_id", "year").agg(sum("value").alias("total_value"))

# Define a window specification to rank years by total_value per series_id
window_spec = Window.partitionBy("series_id").orderBy(col("total_value").desc())

# Assign ranks to each year and keep only the best (highest sum) year for each series_id
best_year_df = agg_df.withColumn("rank", row_number().over(window_spec)).filter(col("rank") == 1).drop("rank")

# Order the final results by `year` before displaying
best_year_df = best_year_df.orderBy("year")

print("Best Year for Each Series ID:")
best_year_df.show(50, truncate=False)



Best Year for Each Series ID:


[Stage 17:>                                                         (0 + 1) / 1]

+-----------+----+------------------+
|series_id  |year|total_value       |
+-----------+----+------------------+
|PRS32006013|1995|726.1410064697266 |
|PRS32006033|1995|710.8509826660156 |
|PRS30006212|1997|38.80000019073486 |
|PRS31006023|1997|503.9239959716797 |
|PRS31006211|1997|47.60000038146973 |
|PRS31006212|1997|55.5              |
|PRS84006023|1997|518.505989074707  |
|PRS85006023|1997|519.088996887207  |
|PRS88003023|1997|517.8190002441406 |
|PRS31006033|1998|705.6860046386719 |
|PRS30006013|1998|705.8950042724609 |
|PRS31006013|1998|707.8460083007812 |
|PRS30006033|1998|702.6719970703125 |
|PRS32006173|1998|137.18699645996094|
|PRS31006101|2000|34.59999942779541 |
|PRS30006173|2001|123.10700225830078|
|PRS31006173|2001|112.14299774169922|
|PRS84006173|2001|559.6599960327148 |
|PRS85006173|2001|558.5790023803711 |
|PRS88003173|2001|558.625           |
|PRS30006091|2002|43.299999713897705|
|PRS30006092|2002|44.39999961853027 |
|PRS30006162|2002|48.100000858306885|
|PRS32006091

                                                                                

In [9]:
# --- Task 3: Report for `series_id = PRS30006032` and `period = Q01` ---
from pyspark.sql.functions import col

# Ensure "year" is integer in both DataFrames
csv_df = csv_df.withColumn("year", col("year").cast("int"))
population_df = population_df.withColumn("year", col("year").cast("int"))


# Ensure we **retain all years** from the CSV (using full outer join for debugging)
final_report_df = (
    csv_df
    .filter(col("series_id") == "PRS30006032")  # Filter for the specific series
    .filter(col("period") == "Q01")
    .join(population_df, on="year", how="inner")  # inner join retains all CSV years
    .select("series_id", "year", "period", "value", "population")  # Clean output
)

#DEBUG STEP: Show the results before saving
final_report_df.orderBy("year").show()  # Ensure all years are present


+-----------+----+------+-----+----------+
|  series_id|year|period|value|population|
+-----------+----+------+-----+----------+
|PRS30006032|2013|   Q01|  0.5| 311536594|
|PRS30006032|2014|   Q01| -0.1| 314107084|
|PRS30006032|2015|   Q01| -1.7| 316515021|
|PRS30006032|2016|   Q01| -1.4| 318558162|
|PRS30006032|2017|   Q01|  0.9| 321004407|
|PRS30006032|2018|   Q01|  0.5| 322903030|
|PRS30006032|2019|   Q01| -1.6| 324697795|
|PRS30006032|2020|   Q01| -7.0| 326569308|
|PRS30006032|2021|   Q01|  0.7| 329725481|
|PRS30006032|2022|   Q01|  5.2| 331097593|
+-----------+----+------+-----+----------+

