# Part 3: Analysis

## Analysis Section 0
### Load the CSV file
pr.data.0.Current is tab-delimited, so I added "sep='\t'" to split the fields on tabs.
I also added code to strip leading and trailing spaces from the column names and from the values in every string column.

In [131]:
from pyspark.sql.functions import trim, col

# Load the CSV file and view it
df_csv = spark.read.csv("s3://rearc-chardee-project/bls-data/pr.data.0.Current", header=True, sep = '\t', inferSchema=True)

# The column names have extra leading and trailing spaces, so strip them.
df_csv = df_csv.toDF(*[c.strip() for c in df_csv.columns])

# Trim leading and trailing spaces from every string column (dtypes looked up once)
string_cols = [name for name, dtype in df_csv.dtypes if dtype == 'string']
for c in string_cols:
    df_csv = df_csv.withColumn(c, trim(col(c)))
df_csv.show(5)


+-----------+----+------+-----+--------------+
|  series_id|year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006011|1995|   Q01|  2.6|          null|
|PRS30006011|1995|   Q02|  2.1|          null|
|PRS30006011|1995|   Q03|  0.9|          null|
|PRS30006011|1995|   Q04|  0.1|          null|
|PRS30006011|1995|   Q05|  1.4|          null|
+-----------+----+------+-----+--------------+
only showing top 5 rows
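The load-and-trim steps above can be cross-checked without Spark. The sketch below uses Python's standard `csv` module on a small inline sample shaped like pr.data.0.Current; the sample rows and their padding are hypothetical, modeled on the output above. It splits on tabs and strips whitespace from both the header names and the values.

```python
import csv
import io

# Hypothetical sample in the same layout as pr.data.0.Current:
# tab-delimited, with padded header names and padded values.
SAMPLE = (
    "series_id        \tyear\tperiod\t       value\tfootnote_codes\n"
    "PRS30006011      \t1995\tQ01\t         2.6\t\n"
    "PRS30006011      \t1995\tQ02\t         2.1\t\n"
)

def load_tab_delimited(text):
    """Parse tab-delimited text, stripping spaces from headers and values."""
    reader = csv.reader(io.StringIO(text), delimiter="\t")
    header = [name.strip() for name in next(reader)]
    rows = []
    for record in reader:
        rows.append({name: value.strip() for name, value in zip(header, record)})
    return header, rows

header, rows = load_tab_delimited(SAMPLE)
print(header)  # ['series_id', 'year', 'period', 'value', 'footnote_codes']
print(rows[0])
```

Unlike Spark with `inferSchema=True`, the `csv` module leaves every field as a string, so converting `year` and `value` to numbers is left to the caller.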


### Load the JSON file

In [132]:
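# Load the JSON file.
# By default, Spark reads JSON Lines (one complete record per line). This file is a single
# JSON document spread over multiple lines, so a plain read flags it as corrupt (_corrupt_record).
# "multiLine=True" tells Spark to parse the whole file as one JSON document.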

df_json = spark.read.json("s3://rearc-chardee-project/api_out.json", multiLine=True)
df_json.printSchema()


root
 |-- annotations: struct (nullable = true)
 |    |-- dataset_link: string (nullable = true)
 |    |-- dataset_name: string (nullable = true)
 |    |-- source_description: string (nullable = true)
 |    |-- source_name: string (nullable = true)
 |    |-- subtopic: string (nullable = true)
 |    |-- table_id: string (nullable = true)
 |    |-- topic: string (nullable = true)
 |-- columns: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Nation: string (nullable = true)
 |    |    |-- Nation ID: string (nullable = true)
 |    |    |-- Population: double (nullable = true)
 |    |    |-- Year: long (nullable = true)
 |-- page: struct (nullable = true)
 |    |-- limit: long (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- total: long (nullable = true)
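The difference that "multiLine=True" makes can be reproduced with the standard `json` module. The default reader treats each line as a separate record, which fails on a document that spans several lines; parsing the text as one document succeeds. The sample document below is hypothetical and only mimics the top-level keys in the schema above.

```python
import json

# Hypothetical multi-line document with the same top-level keys as api_out.json
DOC = """{
  "annotations": {"source_name": "example"},
  "columns": ["Nation ID", "Nation", "Year", "Population"],
  "data": [
    {"Nation ID": "01000US", "Nation": "United States",
     "Year": 2013, "Population": 316128839.0}
  ],
  "page": {"limit": 0, "offset": 0, "total": 1}
}"""

def parse_as_json_lines(text):
    """Mimic the default JSON Lines reader: parse every non-empty line on its own."""
    good, corrupt = [], []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            good.append(json.loads(line))
        except json.JSONDecodeError:
            corrupt.append(line)  # roughly what ends up in _corrupt_record
    return good, corrupt

good, corrupt = parse_as_json_lines(DOC)
whole = json.loads(DOC)  # the multiLine=True equivalent: one document, one record
print(len(good), len(corrupt))  # 0 9
```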


In [133]:
# The output confirms it needs more work: the whole JSON document loads as a single row,
# with the population records nested inside the "data" array.

df_json.show(5)

+--------------------+--------------------+--------------------+----------+
|         annotations|             columns|                data|      page|
+--------------------+--------------------+--------------------+----------+
|{http://www.censu...|[Nation ID, Natio...|[{United States, ...|{0, 0, 10}|
+--------------------+--------------------+--------------------+----------+


### Explode the JSON file

In [134]:
# To turn this into multiple records, we will need to explode the data.
# Explode and view the data
from pyspark.sql.functions import explode

df_json_explode = df_json.select(explode("data").alias("row"))
df_json_explode.show(5, truncate=False)


+--------------------------------------------+
|row                                         |
+--------------------------------------------+
|{United States, 01000US, 3.16128839E8, 2013}|
|{United States, 01000US, 3.18857056E8, 2014}|
|{United States, 01000US, 3.21418821E8, 2015}|
|{United States, 01000US, 3.23127515E8, 2016}|
|{United States, 01000US, 3.25719178E8, 2017}|
+--------------------------------------------+
only showing top 5 rows
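`explode` emits one output row per array element and drops input rows whose array is null or empty; `explode_outer` keeps such rows with a null instead. The plain-Python sketch below mirrors that behavior on a list of dicts standing in for DataFrame rows. The function name `explode_rows` and the sample rows are hypothetical.

```python
def explode_rows(rows, column, alias, outer=False):
    """Yield one dict per element of row[column], like explode() / explode_outer()."""
    for row in rows:
        values = row.get(column) or []  # treat a null (None) array like an empty one
        if not values and outer:
            yield {alias: None}         # explode_outer keeps the row with a null
        for value in values:
            yield {alias: value}

# Hypothetical input: one row holding two records, plus one row with an empty array
rows = [
    {"data": [{"Year": 2013}, {"Year": 2014}]},
    {"data": []},
]

exploded = list(explode_rows(rows, "data", "row"))
exploded_outer = list(explode_rows(rows, "data", "row", outer=True))
print(len(exploded), len(exploded_outer))  # 2 3
```

As with `df_json.select(explode("data").alias("row"))`, only the exploded column is kept in the output.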


### Flatten the JSON file

In [135]:
# This is closer, but all of the data is still in one struct column, so the next step is to flatten it into separate columns.
df_json_flat = df_json_explode.select(
    df_json_explode.row["Nation ID"].alias("Nation_ID"),
    df_json_explode.row["Nation"].alias("Nation"),
    df_json_explode.row["Year"].alias("Year"),
    df_json_explode.row["Population"].alias("Population")
)

df_json_flat.show()

+---------+-------------+----+------------+
|Nation_ID|       Nation|Year|  Population|
+---------+-------------+----+------------+
|  01000US|United States|2013|3.16128839E8|
|  01000US|United States|2014|3.18857056E8|
|  01000US|United States|2015|3.21418821E8|
|  01000US|United States|2016|3.23127515E8|
|  01000US|United States|2017|3.25719178E8|
|  01000US|United States|2018|3.27167439E8|
|  01000US|United States|2019|3.28239523E8|
|  01000US|United States|2021|3.31893745E8|
|  01000US|United States|2022|3.33287562E8|
|  01000US|United States|2023|3.34914896E8|
+---------+-------------+----+------------+
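Selecting each struct field by name, as above, is one way to flatten; the part that needs care is the field name with a space ("Nation ID"). In PySpark, `df_json_explode.select("row.*")` would expand every field at once, although "Nation ID" would keep its space. The standard-library sketch below flattens exploded records and replaces spaces in field names with underscores. The helper name `flatten_record` is hypothetical; the values are taken from the output above.

```python
def flatten_record(record):
    """Turn one exploded struct into a flat dict, replacing spaces in keys with '_'."""
    return {key.replace(" ", "_"): value for key, value in record.items()}

# Two exploded records, using values from the flattened output above
exploded = [
    {"Nation": "United States", "Nation ID": "01000US", "Population": 3.16128839e8, "Year": 2013},
    {"Nation": "United States", "Nation ID": "01000US", "Population": 3.18857056e8, "Year": 2014},
]

flat = [flatten_record(r) for r in exploded]

# Reorder the fields to match the select() above
columns = ["Nation_ID", "Nation", "Year", "Population"]
table = [tuple(row[c] for c in columns) for row in flat]
print(table[0])  # ('01000US', 'United States', 2013, 316128839.0)
```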


## Analysis Section 1
### Filter the data from 2013 to 2018 inclusive

In [136]:
# Now the data is in a format I can use.

# Recap of the instructions:

## Using the dataframe from the population data API (Part 2), generate the mean and the standard deviation
## of the annual US population across the years [2013, 2018] inclusive.

df_json_filter = df_json_flat.filter(
    (df_json_flat['Year'] >= 2013) & (df_json_flat['Year'] <= 2018)
)

df_json_filter.show()

# Now we have the date range we need (2013 to 2018 inclusive) in df_json_filter.

+---------+-------------+----+------------+
|Nation_ID|       Nation|Year|  Population|
+---------+-------------+----+------------+
|  01000US|United States|2013|3.16128839E8|
|  01000US|United States|2014|3.18857056E8|
|  01000US|United States|2015|3.21418821E8|
|  01000US|United States|2016|3.23127515E8|
|  01000US|United States|2017|3.25719178E8|
|  01000US|United States|2018|3.27167439E8|
+---------+-------------+----+------------+
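The two comparisons above make both endpoints inclusive. `F.col("Year").between(2013, 2018)` is an equivalent PySpark spelling, since `Column.between` is also inclusive on both ends. The plain-Python check below applies the same bounds to the (year, population) pairs copied from the flattened output.

```python
# (Year, Population) pairs copied from the flattened output above
POPULATION = [
    (2013, 316128839), (2014, 318857056), (2015, 321418821),
    (2016, 323127515), (2017, 325719178), (2018, 327167439),
    (2019, 328239523), (2021, 331893745), (2022, 333287562),
    (2023, 334914896),
]

def filter_years(pairs, start, end):
    """Keep pairs whose year lies in [start, end], both ends inclusive."""
    return [(year, pop) for year, pop in pairs if start <= year <= end]

subset = filter_years(POPULATION, 2013, 2018)
print([year for year, _ in subset])  # [2013, 2014, 2015, 2016, 2017, 2018]
```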


### Calculate the mean

In [137]:
# Calculate the mean of the Population column
from pyspark.sql import functions as F
df_json_filter.agg(F.mean('Population')).collect()[0][0]


322069808.0
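As a sanity check, Python's `statistics.mean` over the six filtered populations (copied from the output above) reproduces the Spark result exactly: the values sum to 1,932,418,848, and dividing by 6 gives 322,069,808.

```python
import statistics

# Populations for 2013-2018, copied from df_json_filter above
pops_2013_2018 = [316128839, 318857056, 321418821, 323127515, 325719178, 327167439]

total = sum(pops_2013_2018)
mean_pop = statistics.mean(pops_2013_2018)
print(total)            # 1932418848
print(float(mean_pop))  # 322069808.0
```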


### Calculate the standard deviation

In [138]:
# Calculate the sample standard deviation of the Population column (F.stddev is an alias for stddev_samp).
stddev_value = df_json_filter.agg(F.stddev("Population")).collect()[0][0]
print(stddev_value)

4158441.040908092
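`F.stddev` computes the sample standard deviation (dividing by n - 1). If the six years are treated as the entire population of interest, `F.stddev_pop` divides by n instead, and both can be computed in one pass with `df_json_filter.agg(F.stddev_samp("Population"), F.stddev_pop("Population"))`. The standard-library sketch below computes both on the same six values: `statistics.stdev` should agree with the Spark result above to within floating-point error, and `statistics.pstdev` comes out smaller by a factor of sqrt(5/6).

```python
import math
import statistics

# Populations for 2013-2018, copied from df_json_filter above
pops_2013_2018 = [316128839, 318857056, 321418821, 323127515, 325719178, 327167439]

sample_sd = statistics.stdev(pops_2013_2018)       # divides by n - 1, like F.stddev / stddev_samp
population_sd = statistics.pstdev(pops_2013_2018)  # divides by n, like F.stddev_pop

print(round(sample_sd, 2))
print(round(population_sd, 2))
print(math.isclose(population_sd, sample_sd * math.sqrt(5 / 6)))
```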


***

## Analysis Section 2

Here are the instructions:

Using the dataframe from the time-series (Part 1), for every series_id, find the best year: the year with the max/largest sum of "value" for all quarters in that year. Generate a report with each series id, the best year for that series, and the summed value for that year.

This data is already loaded into df_csv at the top of this file.

In [139]:
df_csv.show(5)

+-----------+----+------+-----+--------------+
|  series_id|year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006011|1995|   Q01|  2.6|          null|
|PRS30006011|1995|   Q02|  2.1|          null|
|PRS30006011|1995|   Q03|  0.9|          null|
|PRS30006011|1995|   Q04|  0.1|          null|
|PRS30006011|1995|   Q05|  1.4|          null|
+-----------+----+------+-----+--------------+
only showing top 5 rows


In [140]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sum "value" for every series_id and year
df_csv_grp = df_csv.groupBy("series_id", "year") \
                   .agg(F.sum("value").alias("Total_Value"))

# Create a window partitioned by series_id, ordered by Total_Value (largest first)
wind = Window.partitionBy("series_id").orderBy(F.col("Total_Value").desc())

# Rank the years within each series; tied years share the same rank
df_ranked = df_csv_grp.withColumn("ranker", F.rank().over(wind))

# Keep only the top-ranked year(s) per series, then select the output columns
df_output1 = df_ranked.filter(F.col("ranker") == 1) \
                      .select("series_id", "year", F.col("Total_Value").alias("value"))

# Round the summed values to 1 decimal place
df_output2 = df_output1.withColumn("value", F.round(F.col("value"), 1))
df_output2.show(10)


+-----------------+----+-----+
|        series_id|year|value|
+-----------------+----+-----+
|PRS30006012      |2022| 17.1|
|PRS30006023      |2014|503.2|
|PRS31006093      |2014|507.3|
|PRS31006131      |2021| 12.7|
|PRS32006102      |2003| 34.6|
|PRS32006132      |2021| 21.3|
|PRS32006232      |2021| 22.6|
|PRS84006151      |2020| 33.2|
|PRS85006152      |2020| 42.7|
|PRS85006162      |2020| 26.3|
+-----------------+----+-----+
only showing top 10 rows
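The group-sum-rank logic can be checked on a small sample without Spark. The sketch below sums "value" per (series_id, year), then keeps every year that ties for the largest total, matching `F.rank()` (switching to `F.row_number()` would keep exactly one year per series). The `exclude_periods` parameter is an addition for illustration: in BLS quarterly series, period Q05 usually denotes the annual figure rather than a fifth quarter, so it may be worth excluding from a sum over quarters; confirm this against the BLS period codes before relying on it. The function name and sample rows are hypothetical.

```python
from collections import defaultdict

def best_years(rows, exclude_periods=()):
    """Return {series_id: (best_years, max_total)}; ties all count as best, like F.rank() == 1."""
    totals = defaultdict(float)
    for series_id, year, period, value in rows:
        if period in exclude_periods:
            continue
        totals[(series_id, year)] += value

    report = {}
    for (series_id, year), total in totals.items():
        years, best = report.get(series_id, ([], float("-inf")))
        if total > best:
            report[series_id] = ([year], total)
        elif total == best:
            years.append(year)  # same list object stored in report
    return {sid: (sorted(years), round(total, 1)) for sid, (years, total) in report.items()}

# Hypothetical rows: (series_id, year, period, value)
rows = [
    ("SERIES_A", 2020, "Q01", 3.0),
    ("SERIES_A", 2020, "Q02", 2.0),
    ("SERIES_A", 2021, "Q01", 4.0),
    ("SERIES_A", 2021, "Q05", 9.0),  # hypothetical annual row
    ("SERIES_B", 2020, "Q01", 5.0),
    ("SERIES_B", 2021, "Q01", 5.0),
]

print(best_years(rows))
print(best_years(rows, exclude_periods={"Q05"}))
```

With the Q05 row included, SERIES_A's best year is 2021; excluding it moves the best year to 2020, which is why the choice matters.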


***
## Analysis Section 3
Using both dataframes from Part 1 and Part 2, generate a report that will provide the value for series_id = PRS30006032 and period = Q01 and the population for that given year (if available in the population dataset).

In [169]:
# For this I will use df_csv (Part 1) and df_json_flat (Part 2).
from pyspark.sql import functions as F

# Keep only the requested series and period for 2018 (year is numeric, so compare to an int)
df_filter_prs = df_csv.filter(
    (F.col("series_id") == "PRS30006032") &
    (F.col("period") == "Q01") &
    (F.col("year") == 2018)
)

# Population for 2018, cast from double to a whole number.
# The year column is renamed so it does not clash with df_csv's "year".
df_pop_2018 = (
    df_json_flat
    .filter(F.col("Year") == 2018)
    .select(F.col("Year").alias("pop_year"),
            F.col("Population").cast("long").alias("Population"))
)

# Join on the year and keep the report columns
df_join_final = (
    df_filter_prs
    .join(df_pop_2018, df_filter_prs["year"] == df_pop_2018["pop_year"], how="inner")
    .select("series_id", "year", "period", "value", "Population")
)

df_join_final.show()



+-----------+----+------+-----+----------+
|  series_id|year|period|value|Population|
+-----------+----+------+-----+----------+
|PRS30006032|2018|   Q01|  0.5| 327167439|
+-----------+----+------+-----+----------+
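The cell above fixes the year at 2018. A reading of the instruction that reports every year for PRS30006032 / Q01, with the population filled in only where the population dataset has that year, would drop the `year == 2018` filter and pass `how="left"` to `join`. The plain-Python sketch below shows that left-join behavior. The population figures come from the flattened output above (which has no 2020 row); only the 2018 value of 0.5 comes from the output, and the other time-series rows are hypothetical placeholders, as is the function name `build_report`.

```python
# Population by year, copied from df_json_flat above (note: no 2020 entry)
POPULATION = {
    2013: 316128839, 2014: 318857056, 2015: 321418821, 2016: 323127515,
    2017: 325719178, 2018: 327167439, 2019: 328239523, 2021: 331893745,
    2022: 333287562, 2023: 334914896,
}

# (series_id, year, period, value); only the 2018 Q01 value comes from the output above,
# every other row is a hypothetical placeholder
SERIES_ROWS = [
    ("PRS30006032", 2018, "Q01", 0.5),
    ("PRS30006032", 2019, "Q01", 1.0),
    ("PRS30006032", 2020, "Q01", 2.0),
    ("PRS30006032", 2018, "Q02", 9.9),
    ("PRS30006011", 2018, "Q01", 9.9),
]

def build_report(rows, population, series_id="PRS30006032", period="Q01"):
    """Filter to one series/period, then attach population by year (None if missing)."""
    return [
        (sid, year, per, value, population.get(year))  # left join: keep rows without a match
        for sid, year, per, value in rows
        if sid == series_id and per == period
    ]

for record in build_report(SERIES_ROWS, POPULATION):
    print(record)
```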
