Import Libraries and Set Up Spark

In [34]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import trim
import requests
from io import StringIO
from datetime import datetime

# Today's date as an ISO-8601 string (YYYY-MM-DD); the population cell below
# uses it to build the dated JSON file name.
current_date = datetime.now().date().isoformat()

# Create the Spark session, or reuse the one already running in this kernel.
spark = (
    SparkSession.builder
    .appName("Data Analytics")
    .getOrCreate()
)

Load Data from Part 1 (Tab-Separated Text File)

In [35]:

# Fetch the productivity time series. The file is tab-separated text (not
# comma-separated), hence sep='\t' below.
url = 'https://s3-rearcdataquest.s3.amazonaws.com/pr.data.0.Current'
# The timeout keeps the cell from hanging forever on a stalled connection.
# raise_for_status() fails fast on an HTTP error (e.g. 403/404) instead of
# silently parsing the error response body as if it were data.
response = requests.get(url, timeout=60)
response.raise_for_status()

df_pandas = pd.read_csv(StringIO(response.text), sep='\t')

# Convert the Pandas DataFrame to a Spark DataFrame.
df_spark = spark.createDataFrame(df_pandas)

# Strip leading/trailing whitespace from the column names so that lookups
# such as 'series_id' below match exactly.
df_spark = df_spark.select([F.col(c).alias(c.strip()) for c in df_spark.columns])

# Trim padding from the string key columns that later cells filter and group
# on. series_id values carry trailing whitespace. period is trimmed
# defensively so equality filters such as period == 'Q01' match.
df_spark = (
    df_spark
    .withColumn('series_id', trim(F.col('series_id')))
    .withColumn('period', trim(F.col('period')))
)

# Sanity check: the downstream cells rely on these columns being present.
expected_columns = {'series_id', 'year', 'period', 'value'}
missing_columns = expected_columns - set(df_spark.columns)
assert not missing_columns, f"Data file is missing expected columns: {sorted(missing_columns)}"

# Display the first few rows to check.
df_spark.show(5)



+-----------+----+------+-----+--------------+
|  series_id|year|period|value|footnote_codes|
+-----------+----+------+-----+--------------+
|PRS30006011|1995|   Q01|  2.6|           NaN|
|PRS30006011|1995|   Q02|  2.1|           NaN|
|PRS30006011|1995|   Q03|  0.9|           NaN|
|PRS30006011|1995|   Q04|  0.1|           NaN|
|PRS30006011|1995|   Q05|  1.4|           NaN|
+-----------+----+------+-----+--------------+
only showing top 5 rows



Load Population Data from Part 2 (JSON File)

In [36]:

# Fetch the population JSON whose file name is built from current_date.
# NOTE(review): because the name comes from the local current date, this cell
# only succeeds if a file for that exact date exists, and re-running it on a
# different day reads different data. TODO: confirm the intended naming/date
# convention (e.g. local vs UTC) with whatever produces these files.
url = f'https://s3-rearcdataquest.s3.amazonaws.com/data_{current_date}.json'
response = requests.get(url, timeout=60)
# Surface a missing/forbidden file as a clear HTTP error, rather than as a
# JSONDecodeError raised while parsing the error response body.
response.raise_for_status()
population_payload = response.json()

# The records live under the top-level 'data' key; flatten them into columns.
df_json = pd.json_normalize(population_payload['data'])

df_population_spark = spark.createDataFrame(df_json)

# Give "Year" an explicit integer type, so later range filters and joins
# compare numbers instead of relying on implicit casts. (The JSON may encode
# it as a string — TODO confirm; the cast is a no-op if it is already numeric.)
df_population_spark = df_population_spark.withColumn('Year', F.col('Year').cast('int'))

# Display the first few rows to check.
df_population_spark.show(5)

+---------+-------------+-------+----+----------+-------------+
|ID Nation|       Nation|ID Year|Year|Population|  Slug Nation|
+---------+-------------+-------+----+----------+-------------+
|  01000US|United States|   2022|2022| 331097593|united-states|
|  01000US|United States|   2021|2021| 329725481|united-states|
|  01000US|United States|   2020|2020| 326569308|united-states|
|  01000US|United States|   2019|2019| 324697795|united-states|
|  01000US|United States|   2018|2018| 322903030|united-states|
+---------+-------------+-------+----+----------+-------------+
only showing top 5 rows



Calculate Mean and Standard Deviation of Population (2013-2018, inclusive)

In [37]:
# Summary statistics of the population over an inclusive range of years.
start_year, end_year = 2013, 2018

# Cast defensively so the range test is numeric even if "Year" arrived as a
# string. between() is inclusive on both ends, matching the >= / <= bounds.
df_population_filtered = df_population_spark.filter(
    F.col("Year").cast("int").between(start_year, end_year)
)

# Compute all statistics in one aggregation (a single Spark job) rather than
# one collect() per statistic. Note that F.stddev is the *sample* standard
# deviation (an alias of stddev_samp).
population_stats = df_population_filtered.agg(
    F.count('Population').alias('n'),
    F.mean('Population').alias('mean'),
    F.stddev('Population').alias('std'),
).first()

# Without this guard, an empty range would silently print "None".
assert population_stats['n'] > 0, f"No population rows found for {start_year}-{end_year}"

mean_population = population_stats['mean']
std_population = population_stats['std']

# Display results
print(f"Mean Population ({start_year}-{end_year}): {mean_population}")
print(f"Standard Deviation Population ({start_year}-{end_year}): {std_population}")

Mean Population (2013-2018): 317437383.0
Standard Deviation Population (2013-2018): 4257089.541529327


Find the Best Year for Each series_id (the year with the largest summed `value`)

In [38]:
# For every series_id, find the "best" year: the year whose summed `value`
# (over all periods present for that year) is largest.
#
# NOTE(review): this sums every period row for a year. The loaded data shows
# Q05 rows alongside Q01-Q04. If Q05 is an annual figure rather than a
# quarter, it probably should be excluded before summing. TODO: confirm the
# intended definition before relying on these totals.
df_grouped = (
    df_spark
    .groupBy('series_id', 'year')
    .agg(F.sum('value').alias('total_value'))
)

# Structs compare field by field, so max(struct(total_value, year)) picks the
# largest total in one aggregation pass; on a tie it picks the later year.
df_best_year = (
    df_grouped
    .groupBy('series_id')
    .agg(F.max(F.struct('total_value', 'year')).alias('max_value_year'))
    .select('series_id', 'max_value_year.year', 'max_value_year.total_value')
    # groupBy output order is not guaranteed; sort so the report is reproducible.
    .orderBy('series_id')
)

# Display the final report
df_best_year.show()

+-----------+----+------------------+
|  series_id|year|       total_value|
+-----------+----+------------------+
|PRS30006011|2022|              20.5|
|PRS30006012|2022|17.099999999999998|
|PRS30006013|1998|           704.125|
|PRS30006021|2010|17.599999999999998|
|PRS30006022|2010|              12.5|
|PRS30006023|2014|           503.171|
|PRS30006031|2022|20.400000000000002|
|PRS30006032|2021|17.099999999999998|
|PRS30006033|1998|           700.712|
|PRS30006061|2022|              38.9|
|PRS30006062|2022|              31.7|
|PRS30006063|2023| 631.8059999999999|
|PRS30006081|2021|              23.4|
|PRS30006082|2021|              23.4|
|PRS30006083|2021|           112.459|
|PRS30006091|2002|43.400000000000006|
|PRS30006092|2002|              44.5|
|PRS30006093|2011|           520.086|
|PRS30006101|2020|              33.5|
|PRS30006102|2020|              36.0|
+-----------+----+------------------+
only showing top 20 rows



Generate Combined Report (series PRS30006032, period Q01, joined with population by year)

In [39]:
# Combined report: one series/period from the productivity data, joined to
# the population for the same year.
report_series_id = 'PRS30006032'
report_period = 'Q01'

df_filtered = df_spark.filter(
    (F.col('series_id') == report_series_id) & (F.col('period') == report_period)
)

# Build a separate, narrowed population frame rather than renaming a column on
# df_population_spark itself. Mutating the shared frame would make the
# population-statistics cell (which filters on "Year") fail if it is re-run
# after this one.
# The rename also avoids an ambiguous "year"/"Year" reference, because Spark
# resolves column names case-insensitively by default.
# The cast aligns the join key's type with df_spark.year, which comes from
# pandas (presumably int64 -> LongType; TODO confirm).
df_population_by_year = df_population_spark.select(
    F.col('Year').cast('long').alias('population_year'),
    'Population',
)

df_final = df_filtered.join(
    df_population_by_year,
    df_filtered['year'] == df_population_by_year['population_year'],
    'inner',
)

# Select the report columns; sort by year because join output order is not
# guaranteed.
df_report = (
    df_final
    .select('series_id', df_filtered['year'].alias('year'), 'period', 'value', 'Population')
    .orderBy('year')
)

df_report.show()

+-----------+----+------+-----+----------+
|  series_id|year|period|value|Population|
+-----------+----+------+-----+----------+
|PRS30006032|2013|   Q01|  0.8| 311536594|
|PRS30006032|2014|   Q01| -0.1| 314107084|
|PRS30006032|2015|   Q01| -1.6| 316515021|
|PRS30006032|2016|   Q01| -1.4| 318558162|
|PRS30006032|2017|   Q01|  0.7| 321004407|
|PRS30006032|2018|   Q01|  0.4| 322903030|
|PRS30006032|2019|   Q01| -1.6| 324697795|
|PRS30006032|2020|   Q01| -6.7| 326569308|
|PRS30006032|2021|   Q01|  1.2| 329725481|
|PRS30006032|2022|   Q01|  5.6| 331097593|
+-----------+----+------+-----+----------+

