In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
  .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
 .config("spark.hadoop.fs.s3a.vectored.read.min.seek.size", "131072") \
    .config("spark.hadoop.fs.s3a.vectored.read.max.merged.size", "2097152") \
       .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/17 02:17:32 WARN Utils: Your hostname, MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 172.20.10.3 instead (on interface en0)
26/02/17 02:17:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/anantkhannekar/.ivy2.5.2/cache
The jars for the packages stored in: /Users/anantkhannekar/.ivy2.5.2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5b2afc46-5371-477b-80f5-e9734db729a9;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution r

In [74]:
df_bls = spark.read.parquet("s3a://rearc-assignment-bls/raw/bls_data/part-0.parquet")
df_bls.printSchema()
df_bls.show(5)

root
 |-- series_id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- period: string (nullable = true)
 |-- value: double (nullable = true)
 |-- footnote_codes: string (nullable = true)

+-----------------+----+------+-----+--------------+
|        series_id|year|period|value|footnote_codes|
+-----------------+----+------+-----+--------------+
|PRS30006011      |1988|   Q01|  1.9|          NULL|
|PRS30006011      |1988|   Q02|  2.2|          NULL|
|PRS30006011      |1988|   Q03|  1.9|          NULL|
|PRS30006011      |1988|   Q04|  1.1|          NULL|
|PRS30006011      |1988|   Q05|  1.8|          NULL|
+-----------------+----+------+-----+--------------+
only showing top 5 rows


In [77]:
from pyspark.sql.functions import col, trim

df_bls = (
    df_bls
    .withColumn("series_id", trim(col("series_id")))
    .withColumn("year", trim(col("year")).cast("int"))
)

In [78]:
df_raw_population  = spark.read.option("multiLine", "true") \
    .json("s3a://rearc-assignment-bls/raw/datausa/population.json")
df_raw_population.printSchema()

root
 |-- annotations: struct (nullable = true)
 |    |-- dataset_link: string (nullable = true)
 |    |-- dataset_name: string (nullable = true)
 |    |-- source_description: string (nullable = true)
 |    |-- source_name: string (nullable = true)
 |    |-- subtopic: string (nullable = true)
 |    |-- table_id: string (nullable = true)
 |    |-- topic: string (nullable = true)
 |-- columns: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Nation: string (nullable = true)
 |    |    |-- Nation ID: string (nullable = true)
 |    |    |-- Population: double (nullable = true)
 |    |    |-- Year: long (nullable = true)
 |-- page: struct (nullable = true)
 |    |-- limit: long (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- total: long (nullable = true)



In [79]:
from pyspark.sql.functions import explode

df_population = df_raw_population.select(explode("data").alias("record")) \
    .select("record.*")

df_population = (
    df_population
    .withColumn("year", trim(col("Year")).cast("int"))
    .withColumn(
    "population",col("Population").cast("long")
)
)
df_population.show()
df_population.printSchema()

+-------------+---------+----------+----+
|       Nation|Nation ID|population|year|
+-------------+---------+----------+----+
|United States|  01000US| 316128839|2013|
|United States|  01000US| 318857056|2014|
|United States|  01000US| 321418821|2015|
|United States|  01000US| 323127515|2016|
|United States|  01000US| 325719178|2017|
|United States|  01000US| 327167439|2018|
|United States|  01000US| 328239523|2019|
|United States|  01000US| 331893745|2021|
|United States|  01000US| 333287562|2022|
|United States|  01000US| 334914896|2023|
+-------------+---------+----------+----+

root
 |-- Nation: string (nullable = true)
 |-- Nation ID: string (nullable = true)
 |-- population: long (nullable = true)
 |-- year: integer (nullable = true)



In [80]:
"""1. Using the dataframe from the population data API (Part 2), generate the mean and the standard deviation of the annual US population across the years [2013, 2018] inclusive.
"""

from pyspark.sql.functions import col, mean, stddev

df_stats = (
    df_population
    .filter((col("Year") >= 2013) & (col("Year") <= 2018))
    .agg(
        mean("Population").alias("mean_population"),
        stddev("Population").alias("stddev_population")
    )
)

# df_stats.show()

result = df_stats.collect()[0]

mean_population = result["mean_population"]
stddev_population = result["stddev_population"]

print("Mean:", mean_population)
print("StdDev:", stddev_population)


Mean: 322069808.0
StdDev: 4158441.040908092


In [81]:
"""2. Using the dataframe from the time-series (Part 1), For every series_id, find the best year: the year with the max/largest sum of "value" for all quarters in that year. Generate a report with each series id, the best year for that series, and the summed value for that year. For example, if the table had the following values:
"""

from pyspark.sql.functions import sum as _sum

df_year_data = (
    df_bls
    .groupBy("series_id", "year")
    .agg(_sum("value").alias("year_total"))
)


In [82]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

window_spec = Window.partitionBy("series_id").orderBy(desc("year_total"))

df_ranked = (
    df_year_data
    .withColumn("rn", row_number().over(window_spec))
)


In [41]:
from pyspark.sql.functions import col, round


df_best_year_data = df_ranked.select(
    "series_id",
    col("year").alias("best_year"),
    round(col("year_total").cast("double"), 2).alias("summed_value")
)


df_best_year_data.show()

+-----------------+---------+------------+
|        series_id|best_year|summed_value|
+-----------------+---------+------------+
|PRS30006033      |     2018|      508.86|
|PRS30006063      |     1999|      411.95|
|PRS30006063      |     2010|      403.22|
|PRS30006081      |     1989|         6.4|
|PRS30006113      |     2017|      499.99|
|PRS30006132      |     1995|         4.5|
|PRS30006152      |     2007|        -3.3|
|PRS30006162      |     1998|        17.7|
|PRS30006163      |     2001|       347.7|
|PRS30006172      |     2011|        -6.8|
|PRS30006213      |     1993|       351.4|
|PRS31006022      |     2018|        -3.6|
|PRS31006031      |     1988|        19.7|
|PRS31006033      |     2003|      563.23|
|PRS31006033      |     2012|      482.58|
|PRS31006131      |     2003|         0.9|
|PRS31006152      |     2023|        13.0|
|PRS32006021      |     1995|        -2.5|
|PRS32006022      |     2000|        -3.1|
|PRS32006061      |     2009|       -29.6|
+----------

In [83]:
"""3.Using both dataframes from Part 1 and Part 2, generate a report that will provide the value for series_id = PRS30006032 and period = Q01 and the population for that given year (if available in the population dataset). The below table shows an example of one row that might appear in the resulting table:
"""

from pyspark.sql.functions import col

df_filtered_bls = (
    df_bls
    .filter(
        (col("series_id") == "PRS30006032") &
        (col("period") == "Q01")
    )
)


In [84]:
df_filtered_bls.show()

+-----------+----+------+-----+--------------+
|  series_id|year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006032|1988|   Q01|  2.1|          NULL|
|PRS30006032|1989|   Q01|  1.8|          NULL|
|PRS30006032|1990|   Q01| -4.6|          NULL|
|PRS30006032|1991|   Q01| -7.9|          NULL|
|PRS30006032|1992|   Q01| -3.1|          NULL|
|PRS30006032|1993|   Q01|  1.3|          NULL|
|PRS30006032|1994|   Q01|  1.7|          NULL|
|PRS30006032|1995|   Q01|  0.0|          NULL|
|PRS30006032|1996|   Q01| -4.2|          NULL|
|PRS30006032|1997|   Q01|  2.8|          NULL|
|PRS30006032|1998|   Q01|  0.9|          NULL|
|PRS30006032|1999|   Q01| -4.1|          NULL|
|PRS30006032|2000|   Q01|  0.5|          NULL|
|PRS30006032|2001|   Q01| -6.3|          NULL|
|PRS30006032|2002|   Q01| -6.6|          NULL|
|PRS30006032|2003|   Q01| -5.7|          NULL|
|PRS30006032|2004|   Q01|  2.0|          NULL|
|PRS30006032|2005|   Q01| -0.5|          NULL|
|PRS30006032|

In [85]:
df_population_clean = (
    df_population
    .withColumnRenamed("Year", "year")
)


In [86]:
df_report = (
    df_filtered_bls
    .join(
        df_population_clean,
        on="year",
        how="left"
    )
    .select(
        "series_id",
        "year",
        "period",
        "value",
        "population"
    )
)


In [72]:
df_report.show()

+-----------+----+------+-----+----------+
|  series_id|year|period|value|Population|
+-----------+----+------+-----+----------+
|PRS30006032|1988|   Q01|  2.1|      NULL|
|PRS30006032|1989|   Q01|  1.8|      NULL|
|PRS30006032|1990|   Q01| -4.6|      NULL|
|PRS30006032|1991|   Q01| -7.9|      NULL|
|PRS30006032|1992|   Q01| -3.1|      NULL|
|PRS30006032|1993|   Q01|  1.3|      NULL|
|PRS30006032|1994|   Q01|  1.7|      NULL|
|PRS30006032|1995|   Q01|  0.0|      NULL|
|PRS30006032|1996|   Q01| -4.2|      NULL|
|PRS30006032|1997|   Q01|  2.8|      NULL|
|PRS30006032|1998|   Q01|  0.9|      NULL|
|PRS30006032|1999|   Q01| -4.1|      NULL|
|PRS30006032|2000|   Q01|  0.5|      NULL|
|PRS30006032|2001|   Q01| -6.3|      NULL|
|PRS30006032|2002|   Q01| -6.6|      NULL|
|PRS30006032|2003|   Q01| -5.7|      NULL|
|PRS30006032|2004|   Q01|  2.0|      NULL|
|PRS30006032|2005|   Q01| -0.5|      NULL|
|PRS30006032|2006|   Q01|  1.8|      NULL|
|PRS30006032|2007|   Q01| -0.8|      NULL|
+----------

In [87]:
from pyspark.sql.functions import col

df_report_not_null = df_report.filter(col("population").isNotNull())

df_report_not_null.show()

+-----------+----+------+-----+----------+
|  series_id|year|period|value|population|
+-----------+----+------+-----+----------+
|PRS30006032|2013|   Q01|  0.5| 316128839|
|PRS30006032|2014|   Q01| -0.1| 318857056|
|PRS30006032|2015|   Q01| -1.7| 321418821|
|PRS30006032|2016|   Q01| -1.4| 323127515|
|PRS30006032|2017|   Q01|  0.9| 325719178|
|PRS30006032|2018|   Q01|  0.5| 327167439|
|PRS30006032|2019|   Q01| -1.6| 328239523|
|PRS30006032|2021|   Q01|  0.7| 331893745|
|PRS30006032|2022|   Q01|  5.3| 333287562|
|PRS30006032|2023|   Q01|  0.3| 334914896|
+-----------+----+------+-----+----------+

