In [0]:
# All imports needed
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Just for fun
from pyspark.testing import assertDataFrameEqual

In [0]:
# Variables / parameters
# Name of the notebook widget holding the S3 bucket; kept in one place so the
# registration and the lookup below cannot drift apart.
BUCKET_WIDGET = "bucket_name"

# Register the text widget (default value "af-rearc-quest") and read back
# whatever value it currently holds.
dbutils.widgets.text(BUCKET_WIDGET, "af-rearc-quest", "AWS S3 Bucket Name")
bucket_name = dbutils.widgets.get(BUCKET_WIDGET)

In [0]:
# Helper Functions
def read_from_s3(path: str, format: str, **options):
    """Load a dataset through the notebook's global ``spark`` session.

    Args:
        path: Location handed to the reader's ``load`` call unchanged.
        format: Data source format name given to ``spark.read.format``
            (the notebook uses "csv" and "json").
        **options: Optional reader options, forwarded as keyword arguments
            to the reader's ``options`` call. When none are supplied the
            ``options`` call is skipped altogether.

    Returns:
        Whatever the configured reader's ``load(path)`` returns.
    """
    base_reader = spark.read.format(format)
    # Only touch .options() when the caller actually supplied some.
    configured_reader = base_reader.options(**options) if options else base_reader
    return configured_reader.load(path)

# Just for fun
def dataframe_equal(df1, df2):
    """Compare two DataFrames and describe the outcome as text.

    Delegates the comparison to ``assertDataFrameEqual`` and converts its
    result into a message instead of letting the assertion propagate.

    Args:
        df1: First DataFrame, passed to ``assertDataFrameEqual`` as-is.
        df2: Second DataFrame, passed to ``assertDataFrameEqual`` as-is.

    Returns:
        A string: a fixed "equal" message when the assertion passes,
        otherwise a "not equal" message that embeds the text of the
        raised ``AssertionError``. Other exception types are not caught.
    """
    try:
        assertDataFrameEqual(df1, df2)
    except AssertionError as mismatch:
        return f"DataFrames are not equal. Differences:\n{mismatch}\n"
    else:
        return "DataFrames are equal.\n"

In [0]:
# Read S3 Files into Data Frames and display them
# Both paths are built from the `bucket_name` widget value so that changing
# the widget actually changes which bucket is read (the default widget value
# keeps the original "af-rearc-quest" bucket).
s3_root = f"s3://{bucket_name}"

## Read BLS/pr.data.0.Current into Dataframe
# Read as tab-separated CSV with a header row; leading/trailing whitespace is
# trimmed and column types are inferred by the reader.
bls_df = read_from_s3(
    path=f"{s3_root}/BLS/pr.data.0.Current",
    format="csv",
    sep="\t",
    header=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    inferSchema=True,
)
display(bls_df)

## Read honolulu-api/yearly_population.json into Dataframe
api_df = read_from_s3(
    path=f"{s3_root}/honolulu-api/yearly_population.json",
    format="json",
)
display(api_df)

In [0]:
# Transform honolulu-api dataframe to retrieve the data column as multiple rows
# NOTE: this cell rebinds `api_df`; after it runs the frame no longer has the
# original array-valued "data" column, so re-running this cell requires
# re-running the load cell first.
api_df = api_df.select(F.explode(F.col("data")).alias("data"))
display(api_df)

# Transform honolulu-api dataframe to retrieve the data column as a proper table
# Each field of the exploded "data" struct becomes its own top-level column;
# "Nation ID" is renamed to "Nation_ID" so the column name has no space.
api_df = api_df.select(
    F.col("data.Nation").alias("Nation"),
    F.col("data.`Nation ID`").alias("Nation_ID"),
    F.col("data.Population").alias("Population"),
    F.col("data.Year").alias("Year"),
)
display(api_df)

In [0]:
# Create windowSpec for window function usage
# Rows are partitioned by series_id and ranked from the highest to the lowest
# "value". "year" (ascending) is a secondary sort key: without it, when two
# years of the same series have an equal value, which of them row_number()
# numbers first is arbitrary and can change between runs. With the tie-breaker
# the earliest tied year is always ranked first.
windowSpec = Window.partitionBy("series_id").orderBy(F.desc("value"), F.asc("year"))

In [0]:
# Creates the Dataframe for the best year by series and displays it
# Step 1: total the "value" column per (series_id, year).
yearly_totals_df = bls_df.groupBy("series_id", "year").agg(F.sum("value").alias("value"))

# Step 2: number the yearly totals within each series using windowSpec.
ranked_totals_df = yearly_totals_df.withColumn("row_num", F.row_number().over(windowSpec))

# Step 3: keep only the first-ranked row of each series and drop the helper column.
bls_best_year_by_series_df = (
    ranked_totals_df
    .filter(F.col("row_num") == 1)
    .select("series_id", "year", "value")
)
display(bls_best_year_by_series_df)

In [0]:
# Attach the population figure to the BLS rows of one series/period.
# Join condition: BLS "year" equals the population table's "Year".
year_matches = bls_df.year == api_df.Year
# Row filter: only series PRS30006032, first-quarter (Q01) observations.
is_target_row = (bls_df.series_id == 'PRS30006032') & (bls_df.period == 'Q01')

# Left join: BLS rows whose year has no population entry are kept, with a
# null Population. `bls_df.year` is selected by column object because both
# frames carry a year column.
bls_api_df = (
    bls_df.join(api_df, on=year_matches, how="left")
    .filter(is_target_row)
    .select("series_id", bls_df.year, "period", "value", "Population")
)
display(bls_api_df)

In [0]:
# Data Analytics
## BLS/pr.data.0.Current - Best Year by Series: 
### bls_best_year_by_series_df

## BLS/pr.data.0.Current joined with honolulu-api/yearly_population.json - Value and Population by Year for series_id = PRS30006032 and period = Q01:
### bls_api_df