In [ ]:
# Import PySpark
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DecimalType, IntegerType, StringType
from pyspark import SparkConf
import pyspark.sql.functions as sf
import sagemaker_pyspark
import botocore.session


spark = (
    SparkSession
    .builder
    .config('spark.driver.extraClassPath', ":".join(sagemaker_pyspark.classpath_jars()))
    .appName("rerac_part_3")
    .getOrCreate()
)


Used white space cleanup and schema structuring with the load for easier use

In [2]:
src_bucket = 'yossi-rearc-test'
csv_path = 'api_rest/pr.data.0.Current'


schema = StructType(
    [
        StructField("series_id", StringType()),
        StructField("year", IntegerType()),
        StructField("period", StringType()),
        StructField("value", DecimalType()),
        StructField("footnote_codes", StringType())
    ]
)

df_csv = spark.read.format("csv").option("header", "true").\
option("delimiter", "\t").option("ignoreTrailingWhiteSpace", "true").\
option("ignoreLeadingWhiteSpace", "true").schema(schema).load(f's3a://{src_bucket}/{csv_path}')
df_csv.show(3)



24/02/13 17:23:12 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+-----------+----+------+-----+--------------+
|  series_id|year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006011|1995|   Q01|    3|          NULL|
|PRS30006011|1995|   Q02|    2|          NULL|
|PRS30006011|1995|   Q03|    1|          NULL|
+-----------+----+------+-----+--------------+


In [7]:
json_path = 'json_api/us_pop.json'
df_json = spark.read.format("json").option("multiline","true").load(f's3a://{src_bucket}/{json_path}')
df_json.show(10)

+---------+-------+-------------+----------+-------------+----+
|ID Nation|ID Year|       Nation|Population|  Slug Nation|Year|
+---------+-------+-------------+----------+-------------+----+
|  01000US|   2021|United States| 329725481|united-states|2021|
|  01000US|   2020|United States| 326569308|united-states|2020|
|  01000US|   2019|United States| 324697795|united-states|2019|
|  01000US|   2018|United States| 322903030|united-states|2018|
|  01000US|   2017|United States| 321004407|united-states|2017|
|  01000US|   2016|United States| 318558162|united-states|2016|
|  01000US|   2015|United States| 316515021|united-states|2015|
|  01000US|   2014|United States| 314107084|united-states|2014|
|  01000US|   2013|United States| 311536594|united-states|2013|
+---------+-------+-------------+----------+-------------+----+


* Renamed columns for easier use in the future

In [4]:
df_json = df_json.withColumnRenamed("ID Nation", "id_nation").\
    withColumnRenamed("ID Year", "id_year").\
    withColumnRenamed("Nation", "nation").\
    withColumnRenamed("Population", "population").\
    withColumnRenamed("Slug Nation", "slug_nation").\
    withColumnRenamed("Year", "year")

pop_filtered_df = df_json.select("id_nation", "nation", "id_year", "population").filter(df_json.id_year.between(2013, 2018))

pop_result_df = pop_filtered_df.agg(sf.stddev("population").alias("pop_stdev"), sf.mean("population").alias("pop_mean"))
pop_result_df.show()

+----------------+------------+
|       pop_stdev|    pop_mean|
+----------------+------------+
|4257089.54152933|3.17437383E8|
+----------------+------------+


I assumed I don't have to normalize the data by choosing only years with 4 quarters (first time I saw a year with 5 quarters)

In [5]:
##### df_csv.show(3)

win_1 = Window.partitionBy("series_id", "year")
win_2 = Window.partitionBy("series_id").orderBy(sf.col("sum_value").desc())
bls_win_df = df_csv.withColumn("sum_value", sf.sum("value").over(win_1)).select("series_id", "year", "sum_value").distinct()
result_df = bls_win_df.withColumn("rank", sf.rank().over(win_2)).filter(sf.col("rank") == 1).drop("rank")
result_df.show(50)

[Stage 8:>                                                          (0 + 1) / 1]

+-----------+----+---------+
|  series_id|year|sum_value|
+-----------+----+---------+
|PRS30006011|2022|       20|
|PRS30006012|2022|       17|
|PRS30006013|1998|      703|
|PRS30006021|2010|       18|
|PRS30006022|2010|       13|
|PRS30006023|2014|      504|
|PRS30006031|2022|       22|
|PRS30006032|2021|       17|
|PRS30006033|1998|      699|
|PRS30006061|2022|       40|
|PRS30006062|2021|       33|
|PRS30006063|2023|      631|
|PRS30006081|2021|       24|
|PRS30006082|2021|       24|
|PRS30006083|2013|      112|
|PRS30006083|2014|      112|
|PRS30006083|2021|      112|
|PRS30006091|2002|       44|
|PRS30006092|2002|       44|
|PRS30006093|2011|      521|
|PRS30006101|2020|       34|
|PRS30006102|2020|       37|
|PRS30006103|2023|      620|
|PRS30006111|2020|       32|
|PRS30006112|2008|       43|
|PRS30006113|2023|      632|
|PRS30006131|2021|       18|
|PRS30006132|2021|       18|
|PRS30006133|2021|      115|
|PRS30006151|2020|       26|
|PRS30006152|2020|       31|
|PRS30006153|2

                                                                                

With predefined schemas at upload, join was easier

In [6]:
bls_results_df = df_csv.filter((df_csv.series_id == 'PRS30006032') & (df_csv.period == 'Q01'))
results_df = bls_results_df.join(df_json, bls_results_df.year == df_json.year, 'left').select(bls_results_df.series_id, bls_results_df.year, bls_results_df.period, bls_results_df.value, df_json.population)
results_df.show()

+-----------+----+------+-----+----------+
|  series_id|year|period|value|population|
+-----------+----+------+-----+----------+
|PRS30006032|1995|   Q01|    0|      NULL|
|PRS30006032|1996|   Q01|   -4|      NULL|
|PRS30006032|1997|   Q01|    3|      NULL|
|PRS30006032|1998|   Q01|    1|      NULL|
|PRS30006032|1999|   Q01|   -4|      NULL|
|PRS30006032|2000|   Q01|    0|      NULL|
|PRS30006032|2001|   Q01|   -6|      NULL|
|PRS30006032|2002|   Q01|   -7|      NULL|
|PRS30006032|2003|   Q01|   -6|      NULL|
|PRS30006032|2004|   Q01|    2|      NULL|
|PRS30006032|2005|   Q01|   -1|      NULL|
|PRS30006032|2006|   Q01|    3|      NULL|
|PRS30006032|2007|   Q01|    0|      NULL|
|PRS30006032|2008|   Q01|   -3|      NULL|
|PRS30006032|2009|   Q01|  -21|      NULL|
|PRS30006032|2010|   Q01|    4|      NULL|
|PRS30006032|2011|   Q01|    2|      NULL|
|PRS30006032|2012|   Q01|    3|      NULL|
|PRS30006032|2013|   Q01|    1| 311536594|
|PRS30006032|2014|   Q01|    0| 314107084|
+----------