In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime
from delta import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
import findspark
spark=None

def initiate_spark_session():
    """
    To instantiate the Spark session object with
    required libs to read and manipulate data from S3
    """
    findspark.init()

    # Create a Spark session
    # Provide valid AWS Access Key Id and Value
    global spark 
    spark = SparkSession.builder \
        .appName("Find Best Series Year") \
        .master('local') \
        .config("spark.hadoop.fs.s3a.access.key", "<AWS_Access_Key_Id>") \
        .config("spark.hadoop.fs.s3a.secret.key", "<AWS_Access_Key_Value>") \
        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1").getOrCreate()
    
     
    # my_packages = ["org.apache.hadoop:hadoop-aws:3.3.1"]

    # spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
    print("spark version::", spark.version)

initiate_spark_session()

spark version:: 3.5.3


In [7]:
def read_bls_data():
    """
    To read the BLS data from S3, pre-process and 
    return the transformed dataframe
    """
    # S3 bucket and file path
    s3_bucket = "rearc-assessment"
    s3_file_path = f"s3a://{s3_bucket}/bls/pr.data.0.Current"

    # Read BLS data CSV file from S3
    bls_df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("delimiter", "\t") \
        .load(s3_file_path)
    
    # Trim leading and tralining spaces in column names and values
    trimmed_columns = [col.strip() for col in bls_df.columns]  # Use strip() to trim spaces
    trimmed_df = bls_df.toDF(*trimmed_columns)
    trimmed_df=trimmed_df.select([trim(col).alias(col) for col in trimmed_df.columns])

    qtr_window = Window.partitionBy("series_id", "year").orderBy(desc("value"))
    year_window = Window.partitionBy("series_id").orderBy(desc("value"))

    final_df = trimmed_df.withColumn("qtr_row_no", row_number().over(qtr_window)).filter(col("qtr_row_no") == 1)\
        .withColumn("year_row_no", row_number().over(year_window)).filter(col("year_row_no") == 1)
    # Sort and Display the DataFrame schema and data
    final_df.orderBy(col("series_id").asc()).drop("year_row_no", "qtr_row_no", "period", "footnote_codes").show(truncate=False)
    return bls_df # Return the source dataframe for Part 3.c

bls_df=read_bls_data()

+-----------+----+------+
|series_id  |year|value |
+-----------+----+------+
|PRS30006011|2022|4.9   |
|PRS30006012|2020|6.5   |
|PRS30006013|2009|99.726|
|PRS30006021|2021|5.3   |
|PRS30006022|1996|6.7   |
|PRS30006023|1998|99.992|
|PRS30006031|2021|9.9   |
|PRS30006032|2020|8.9   |
|PRS30006033|2020|99.84 |
|PRS30006061|2021|9.5   |
|PRS30006062|2004|9.7   |
|PRS30006063|2017|99.204|
|PRS30006081|1995|9.0   |
|PRS30006082|1995|9.0   |
|PRS30006083|2008|99.78 |
|PRS30006091|2010|9.0   |
|PRS30006092|2009|9.6   |
|PRS30006093|2018|99.954|
|PRS30006101|2000|8.1   |
|PRS30006102|2004|9.9   |
+-----------+----+------+
only showing top 20 rows



In [8]:
def read_usa_population_data():
    """
    To read the USA population data from S3, pre-process and 
    return the transformed dataframe
    """
    # S3 bucket and file path
    s3_bucket = "rearc-assessment"
    s3_file_path = f"s3a://{s3_bucket}/usa_data/usa_population_data.json"

    # USA data population schema
    data_schema = StructType([
        StructField("ID Nation", StringType(), True),
        StructField("Nation", StringType(), True),
        StructField("ID Year", IntegerType(), True),
        StructField("Year", StringType(), True),
        StructField("Population", LongType(), True),
        StructField("Slug Nation", StringType(), True)
    ])

    # Read USA data population JSON file from S3
    pop_df = spark.read.schema(data_schema).option("multiline", "true")\
        .option("encoding", "UTF-8").json(s3_file_path)
    
    # Filter the DataFrame for years 2013 to 2018. Since the records are available from 2013.
    filtered_df = pop_df.filter((col("Year") >= "2013") & (col("Year") <= "2018"))

    # Calculate mean and standard deviation of the population
    stats_df = filtered_df.agg(
        mean(col("Population")).alias("Mean Population"),
        stddev(col("Population")).alias("Standard Deviation")
    )
    stats_df.show(truncate=False)
    return pop_df # Return the source dataframe for Part 3.c

pop_df=read_usa_population_data()

+---------------+------------------+
|Mean Population|Standard Deviation|
+---------------+------------------+
|3.17437383E8   |4257089.54152933  |
+---------------+------------------+



In [9]:
def join_bls_pop_data(bls_df, pop_df):
    """
    To join the BLS and Population data to derive the final output
    """
    join_df = bls_df.filter(col("year") >=2013).join(pop_df, bls_df["year"] == pop_df["year"], "left")\
        .select(bls_df["*"], pop_df["Population"]).drop("footnote_codes").show(truncate=False)
    return join_df

join_bls_pop_data(bls_df, pop_df)

+-----------------+----+------+------------+----------+
|series_id        |year|period|       value|Population|
+-----------------+----+------+------------+----------+
|PRS30006011      |2013|Q01   |1.4         |311536594 |
|PRS30006011      |2013|Q02   |0.4         |311536594 |
|PRS30006011      |2013|Q03   |0.3         |311536594 |
|PRS30006011      |2013|Q04   |0.7         |311536594 |
|PRS30006011      |2013|Q05   |0.7         |311536594 |
|PRS30006011      |2014|Q01   |0.6         |314107084 |
|PRS30006011      |2014|Q02   |1.1         |314107084 |
|PRS30006011      |2014|Q03   |1.5         |314107084 |
|PRS30006011      |2014|Q04   |1.9         |314107084 |
|PRS30006011      |2014|Q05   |1.3         |314107084 |
|PRS30006011      |2015|Q01   |1.5         |316515021 |
|PRS30006011      |2015|Q02   |1.5         |316515021 |
|PRS30006011      |2015|Q03   |1.4         |316515021 |
|PRS30006011      |2015|Q04   |0.8         |316515021 |
|PRS30006011      |2015|Q05   |1.3         |3165