# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


####  Run this cell to set up and start your interactive session.


In [5]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.8X
%number_of_workers 8

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.8X
Previous number of workers: None
Setting new number of workers to: 8


Importing neccesary library

In [1]:
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DoubleType, LongType
from pyspark.sql.functions import col, trim, mean, stddev, min, max, count, sum, expr, row_number, desc, explode
from pyspark.sql.types import IntegerType, DoubleType, LongType
import sys
from datetime import datetime
import json
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.8X
Number of Workers: 8
Idle Timeout: 2880
Session ID: b07cc753-c701-4998-9a03-be953fc6bf9c
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session b07cc753-c701-4998-9a03-be953fc6bf9c to get into ready status...
Session b07cc753-c701-4998-9a03-be953fc6bf9c has been created.



In [2]:
## initializing spark and glue context
from awsglue.context import GlueContext
from pyspark.context import SparkContext

def initialize_spark_context():
    """Initialize AWS Glue and Spark context with optimal configurations."""

    # Get existing SparkContext (do NOT create a new one)
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session

    # Optional: Set Spark configurations (some may be restricted in AWS Glue)
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    
    print("AWS Glue and PySpark initialized successfully!")
    print(f"Spark Version: {spark.version}")
    print(f"Glue Context: {glueContext}")

    return sc, glueContext, spark

# Initialize contexts
sc, glueContext, spark = initialize_spark_context()

AWS Glue and PySpark initialized successfully!
Spark Version: 3.5.4-amzn-0
Glue Context: <awsglue.context.GlueContext object at 0x7f268799ec50>


In [3]:
#  Define S3 bucket pathsfor source files needed
bucket_name = 'yemi-data-quest'
bls_key = 'bls/pr/pub/time.series/pr/pr.data.0.Current'
population_key = 'API_DATA/population_data.json'




In [4]:
####part3-0(a) data ingestion for bls data 
def load_dynamic_frame_from_s3(glueContext, bucket_name, key, file_format="csv", separator="\t", with_header=True):
    """Generic function to load a DynamicFrame from S3."""
    return glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": [f"s3://{bucket_name}/{key}"],
            "recurse": True
        },
        format=file_format,
        format_options={
            "withHeader": with_header,
            "separator": separator
        }
    )

def dynamic_frame_to_df(dynamic_frame):
    """Convert a DynamicFrame to a Spark DataFrame."""
    return dynamic_frame.toDF()

def display_df_info(df, name="DataFrame", show_rows=5):
    """Display information about a DataFrame."""
    print(f"{name} loaded successfully!")
    print(f"Shape: ({df.count()}, {len(df.columns)})")
    print(f"Columns: {df.columns}")
    print("\nSchema:")
    df.printSchema()
    print(f"\nFirst {show_rows} rows:")
    df.show(show_rows, truncate=False)

def load_bls_data(glueContext, bucket_name, bls_key):
    """Main loader for BLS data with column casting."""
    try:
        dynamic_frame = load_dynamic_frame_from_s3(glueContext, bucket_name, bls_key)
        df = dynamic_frame_to_df(dynamic_frame)

        # Cast columns to correct types
        df = df.select(
            df["series_id"].cast("string"),
            df["year"].cast("int"),
            df["period"].cast("string"),
            df["value"].cast("double"),
            df["footnote_codes"].cast("string")
        )

        display_df_info(df, name="BLS Data")
        return df
    except Exception as e:
        print(f"Error loading BLS data from S3: {str(e)}")
        raise

bls_df = load_bls_data(glueContext, bucket_name, bls_key)
bls_df.show(5, truncate=False)

BLS Data loaded successfully!
Shape: (37239, 5)
Columns: ['series_id', 'year', 'period', 'value', 'footnote_codes']

Schema:
root
 |-- series_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- period: string (nullable = true)
 |-- value: double (nullable = true)
 |-- footnote_codes: string (nullable = true)


First 5 rows:
+-----------------+----+------+-----+--------------+
|series_id        |year|period|value|footnote_codes|
+-----------------+----+------+-----+--------------+
|PRS30006011      |1995|Q01   |2.6  |              |
|PRS30006011      |1995|Q02   |2.1  |              |
|PRS30006011      |1995|Q03   |0.9  |              |
|PRS30006011      |1995|Q04   |0.1  |              |
|PRS30006011      |1995|Q05   |1.4  |              |
+-----------------+----+------+-----+--------------+
only showing top 5 rows

+-----------------+----+------+-----+--------------+
|series_id        |year|period|value|footnote_codes|
+-----------------+----+------+-----+----------

In [5]:
## part3-0(b) data ingestion from s3 bucket
def load_dynamic_frame_from_json(glueContext, bucket_name, key):
    """Load a JSON file from S3 into a DynamicFrame."""
    return glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": [f"s3://{bucket_name}/{key}"],
            "recurse": True
        },
        format="json"
    )

def extract_nested_json(df, nested_field="data"):
    """Extract data from a nested JSON column if it exists."""
    if nested_field in df.columns:
        return df.select(explode(col(nested_field)).alias("data")).select("data.*")
    return df

def display_df_info(df, name="DataFrame", show_rows=10):
    """Display information about a DataFrame."""
    print(f"{name} loaded successfully!")
    print(f"Shape: ({df.count()}, {len(df.columns)})")
    print(f"Columns: {df.columns}")
    print("\nSchema:")
    df.printSchema()
    print(f"\nFirst {show_rows} rows:")
    df.show(show_rows, truncate=False)

def load_population_data(glueContext, bucket_name, population_key):
    """Load and process population data from S3 JSON using Glue."""
    try:
        dynamic_frame = load_dynamic_frame_from_json(glueContext, bucket_name, population_key)
        df = dynamic_frame.toDF()
        df = extract_nested_json(df, nested_field="data")
        display_df_info(df, name="Population Data", show_rows=10)
        return df
    except Exception as e:
        print(f"Error loading population data: {str(e)}")
        raise
population_df = load_population_data(glueContext, bucket_name, population_key)
population_df.show(5, truncate=False)


Population Data loaded successfully!
Shape: (10, 4)
Columns: ['Nation ID', 'Nation', 'Year', 'Population']

Schema:
root
 |-- Nation ID: string (nullable = true)
 |-- Nation: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Population: double (nullable = true)


First 10 rows:
+---------+-------------+----+------------+
|Nation ID|Nation       |Year|Population  |
+---------+-------------+----+------------+
|01000US  |United States|2013|3.16128839E8|
|01000US  |United States|2014|3.18857056E8|
|01000US  |United States|2015|3.21418821E8|
|01000US  |United States|2016|3.23127515E8|
|01000US  |United States|2017|3.25719178E8|
|01000US  |United States|2018|3.27167439E8|
|01000US  |United States|2019|3.28239523E8|
|01000US  |United States|2021|3.31893745E8|
|01000US  |United States|2022|3.33287562E8|
|01000US  |United States|2023|3.34914896E8|
+---------+-------------+----+------------+

+---------+-------------+----+------------+
|Nation ID|Nation       |Year|Population  |

In [6]:
### data cleansing and transformstion to get ready for part 3-1
def clean_bls_data(bls_df):
    """Clean the BLS DataFrame by trimming strings, casting types, and removing nulls."""
    print("Cleaning BLS data...")
    print(f"Original BLS shape: ({bls_df.count()}, {len(bls_df.columns)})")
    
    df_clean = bls_df.select(
        trim(col("series_id")).alias("series_id"),
        col("year").cast(IntegerType()).alias("year"),
        trim(col("period")).alias("period"),
        col("value").cast(DoubleType()).alias("value")
    ).filter(
        col("series_id").isNotNull() &
        col("year").isNotNull() &
        col("period").isNotNull() &
        col("value").isNotNull()
    )

    removed_rows = bls_df.count() - df_clean.count()
    print(f"Cleaned BLS shape: ({df_clean.count()}, {len(df_clean.columns)})")
    print(f"Removed {removed_rows} rows with missing values")
    df_clean.cache()
    return df_clean

def clean_population_data(population_df):
    """Clean the Population DataFrame by trimming strings, casting types, and removing nulls."""
    print("\nCleaning Population data...")
    print(f"Original Population shape: ({population_df.count()}, {len(population_df.columns)})")
    
    df_clean = population_df.select(
        trim(col("Nation ID")).alias("Nation_ID"),
        trim(col("Nation")).alias("Nation"),
        col("Year").cast(IntegerType()).alias("Year"),
        col("Population").cast(LongType()).alias("Population")
    ).filter(
        col("Nation_ID").isNotNull() &
        col("Nation").isNotNull() &
        col("Year").isNotNull() &
        col("Population").isNotNull()
    )

    removed_rows = population_df.count() - df_clean.count()
    print(f"Cleaned Population shape: ({df_clean.count()}, {len(df_clean.columns)})")
    print(f"Removed {removed_rows} rows with missing values")
    df_clean.cache()
    return df_clean

def display_final_data(bls_df_clean, population_df_clean):
    """Display schema and preview of cleaned DataFrames."""
    print("\nBLS Data Schema:")
    bls_df_clean.printSchema()
    print("\nPopulation Data Schema:")
    population_df_clean.printSchema()

    print("\nFirst 5 records of cleaned BLS Data:")
    bls_df_clean.show(5, truncate=False)

    print("\nFirst 5 records of cleaned Population Data:")
    population_df_clean.show(5, truncate=False)
bls_df_clean = clean_bls_data(bls_df)
population_df_clean = clean_population_data(population_df)
display_final_data(bls_df_clean, population_df_clean)

Cleaning BLS data...
Original BLS shape: (37239, 5)
Cleaned BLS shape: (37239, 4)
Removed 0 rows with missing values

Cleaning Population data...
Original Population shape: (10, 4)
Cleaned Population shape: (10, 4)
Removed 0 rows with missing values

BLS Data Schema:
root
 |-- series_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- period: string (nullable = true)
 |-- value: double (nullable = true)


Population Data Schema:
root
 |-- Nation_ID: string (nullable = true)
 |-- Nation: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Population: long (nullable = true)


First 5 records of cleaned BLS Data:
+-----------+----+------+-----+
|series_id  |year|period|value|
+-----------+----+------+-----+
|PRS30006011|1995|Q01   |2.6  |
|PRS30006011|1995|Q02   |2.1  |
|PRS30006011|1995|Q03   |0.9  |
|PRS30006011|1995|Q04   |0.1  |
|PRS30006011|1995|Q05   |1.4  |
+-----------+----+------+-----+
only showing top 5 rows


First 5 records of cleaned Populat

In [7]:
### part 3-1 generatingmean and STD logic
def filter_population_by_year(df, start_year, end_year):
    """Filter population data between specified years."""
    filtered_df = df.filter(
        (col("Year") >= start_year) & (col("Year") <= end_year)
    )
    print(f"\n Population Data for Years {start_year}-{end_year}:")
    filtered_df.select("Year", "Population").orderBy("Year").show()
    return filtered_df

def compute_population_statistics(df):
    """Compute summary statistics for population."""
    stats_row = df.agg(
        mean(col("Population")).alias("mean_population"),
        stddev(col("Population")).alias("std_population"),
        min(col("Population")).alias("min_population"),
        max(col("Population")).alias("max_population"),
        count(col("Population")).alias("count")
    ).collect()[0]
    
    stats = {
        "mean": stats_row["mean_population"],
        "std": stats_row["std_population"],
        "min": stats_row["min_population"],
        "max": stats_row["max_population"],
        "count": stats_row["count"]
    }

    print(f"\n Population Statistics:")
    print(f"Mean Population: {stats['mean']:,.0f}")
    print(f"Standard Deviation: {stats['std']:,.0f}")
    print(f"Min Population: {stats['min']:,.0f}")
    print(f"Max Population: {stats['max']:,.0f}")
    print(f"Count: {stats['count']}")
    
    return stats

def convert_population_to_pandas(df):
    """Convert Spark DataFrame to pandas for visualization."""
    return df.select("Year", "Population").orderBy("Year").toPandas()
# Step 1: Filter population data for 2013–2018
population_2013_2018 = filter_population_by_year(population_df_clean, 2013, 2018)

# Step 2: Compute statistics
population_stats = compute_population_statistics(population_2013_2018)

# Step 3: Prepare for visualization (if needed)
population_data_pandas = convert_population_to_pandas(population_2013_2018)


 Population Data for Years 2013-2018:
+----+----------+
|Year|Population|
+----+----------+
|2013| 316128839|
|2014| 318857056|
|2015| 321418821|
|2016| 323127515|
|2017| 325719178|
|2018| 327167439|
+----+----------+


 Population Statistics:
Mean Population: 322,069,808
Standard Deviation: 4,158,441
Min Population: 316,128,839
Max Population: 327,167,439
Count: 6


In [None]:
#optional visualization with output named poplulation_trend.png
def plot_population_trend(population_df_pandas, mean_population):
    """Plot population trend and histogram using matplotlib."""
    plt.figure(figsize=(12, 6))

    # Line chart: Population over years
    plt.subplot(1, 2, 1)
    plt.plot(
        population_df_pandas['Year'],
        population_df_pandas['Population'],
        'bo-', linewidth=2, markersize=8
    )
    plt.axhline(y=mean_population, color='r', linestyle='--', alpha=0.7,
                label=f'Mean: {mean_population:,.0f}')
    plt.xlabel('Year')
    plt.ylabel('Population')
    plt.title('US Population Trend (2013–2018)')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Histogram: Population distribution
    plt.subplot(1, 2, 2)
    plt.hist(
        population_df_pandas['Population'],
        bins=6,
        alpha=0.7,
        color='skyblue',
        edgecolor='black'
    )
    plt.axvline(x=mean_population, color='r', linestyle='--', alpha=0.7,
                label=f'Mean: {mean_population:,.0f}')
    plt.xlabel('Population')
    plt.ylabel('Frequency')
    plt.title('Population Distribution (2013–2018)')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

def prepare_population_stats_dict(stats):
    """Prepare a dictionary of population statistics for reporting."""
    return {
        'mean': stats['mean'],
        'std': stats['std'],
        'min': stats['min'],
        'max': stats['max'],
        'count': stats['count']
    }
# Step 4: Plot charts
plot_population_trend(population_data_pandas, population_stats["mean"])

# Step 5: Prepare final stats for reporting
population_stats_dict = prepare_population_stats_dict(population_stats)




In [9]:
### 3-2 Best Year by Series ID
# --- Step 1: Compute yearly totals ---
def compute_yearly_sums(df):
    """Group by series_id and year, then compute total value per year."""
    yearly_sums = df.groupBy("series_id", "year").agg(
        sum("value").alias("total_value")
    ).orderBy("series_id", "year")
    
    print("Yearly Sums by Series ID (Sample):")
    yearly_sums.show(10)
    
    # Debug: Show all years for PRS30006011
    print("\nDebug: Yearly totals for PRS30006011")
    yearly_sums.filter(col("series_id") == "PRS30006011").orderBy("year").show(100)
    
    return yearly_sums

# --- Step 2: Get best year per series_id ---
def get_best_year_per_series(yearly_sums):
    """For each series_id, get the year with the max total_value using window function."""
    window_spec = Window.partitionBy("series_id").orderBy(desc("total_value"))
    
    best_years = yearly_sums.withColumn("rank", row_number().over(window_spec)) \
                            .filter(col("rank") == 1) \
                            .select("series_id", "year", "total_value") \
                            .orderBy("series_id")
    
    print("\nBest Year Report:")
    print(f"Total series analyzed: {best_years.count()}")
    best_years.show(20)
    
    return best_years


# --- Step 4: Summary statistics ---
def summarize_best_year_stats(best_years):
    """Compute summary statistics from best year data using PySpark."""
    summary = best_years.agg(
        count("*").alias("total_series"),
        min("year").alias("min_year"),
        max("year").alias("max_year"),
        min("total_value").alias("min_value"),
        max("total_value").alias("max_value"),
        mean("total_value").alias("mean_value"),
        expr("percentile_approx(total_value, 0.5)").alias("median_value")
    ).collect()[0]

    print(f"\nBest Year Report Summary:")
    print(f"Total series analyzed: {summary['total_series']}")
    print(f"Year range: {summary['min_year']} - {summary['max_year']}")
    print(f"Value range: {summary['min_value']:.2f} - {summary['max_value']:.2f}")
    print(f"Mean best year value: {summary['mean_value']:.2f}")
    print(f"Median best year value: {summary['median_value']:.2f}")
    
    return summary

# --- Execution ---
# Replace `bls_df_clean` with your cleaned BLS DataFrame
yearly_sums = compute_yearly_sums(bls_df_clean)
best_years = get_best_year_per_series(yearly_sums)

# Optional: Convert to pandas for plots and export
best_years_pandas = best_years.toPandas()
plot_best_year_insights(best_years_pandas)

# Summary
best_year_summary = summarize_best_year_stats(best_years)


NameError: name 'plot_best_year_insights' is not defined


In [None]:
# --- Step 3: Visualize using matplotlib ---optionalllll
def plot_best_year_insights(best_years_pandas, s3_uri):
    """Create a 2x2 set of matplotlib visualizations from best year data and save to S3."""
    fig = plt.figure(figsize=(15, 8))  # Capture the figure object

    # Plot 1: Distribution of best year values
    plt.subplot(2, 2, 1)
    plt.hist(best_years_pandas['total_value'], bins=20, alpha=0.7, color='lightgreen', edgecolor='black')
    plt.xlabel('Sum of Values')
    plt.ylabel('Frequency')
    plt.title('Distribution of Best Year Values')
    plt.grid(True, alpha=0.3)

    # Plot 2: Distribution of years
    plt.subplot(2, 2, 2)
    year_counts = best_years_pandas['year'].value_counts().sort_index()
    plt.bar(year_counts.index, year_counts.values, alpha=0.7, color='orange')
    plt.xlabel('Year')
    plt.ylabel('Number of Series')
    plt.title('Best Years Distribution')
    plt.xticks(rotation=45)
    plt.grid(True, alpha=0.3)

    # Plot 3: Top 10 series by total_value
    plt.subplot(2, 2, 3)
    top_10 = best_years_pandas.nlargest(10, 'total_value')
    plt.barh(range(len(top_10)), top_10['total_value'], alpha=0.7, color='purple')
    plt.yticks(range(len(top_10)), top_10['series_id'])
    plt.xlabel('Sum of Values')
    plt.title('Top 10 Series by Best Year Value')
    plt.grid(True, alpha=0.3)

    # Plot 4: Scatter of value vs year
    plt.subplot(2, 2, 4)
    plt.scatter(best_years_pandas['year'], best_years_pandas['total_value'], alpha=0.6, s=50)
    plt.xlabel('Year')
    plt.ylabel('Sum of Values')
    plt.title('Best Year Values vs Year')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    save_plot_to_s3(fig, s3_uri)
    
    plot_best_year_insights(
    best_years_pandas,
    "s3://yemi-data-quest/rearc-bls-data/best_year_insights.png"

In [None]:
## 3-3 population for a given year
def get_series_with_population(bls_df, pop_df, target_series_id, target_period):
    """Join filtered BLS data with population data on year."""
    
    # Filter BLS data for given series_id and period
    filtered_bls = bls_df.filter(
        (trim(col("series_id")) == target_series_id) &
        (trim(col("period")) == target_period)
    ).select("series_id", "year", "period", "value")
    
    print("\nFiltered BLS Data:")
    filtered_bls.show()

    # Trim column names in population_df (optional, for cleanliness)
    pop_df_cleaned = pop_df.select(
        trim(col("Year")).alias("year"),
        col("Population")
    )

    # Join on 'year'
    result = filtered_bls.join(
        pop_df_cleaned,
        on="year",
        how="left"
    ).select("series_id", "year", "period", "value", "Population")

    print("\nJoined BLS and Population Data:")
    result.show()

    return result
# Step: Generate the final report for series PRS30006032 and period Q01
series_with_population = get_series_with_population(
    bls_df_clean,
    population_df_clean,
    target_series_id="PRS30006032",
    target_period="Q01"
)
