# FOMC Data Pipeline - Productivity Analysis

This notebook loads BLS and DataUSA data from S3 and closes with a data quality and sync history review of the pipeline. It generates three reports:
1. Population statistics (mean & std dev, 2013-2018)
2. Best year by series_id
3. Series + Population join for PRS30006032 Q01

In [None]:
import json
import sys
sys.path.insert(0, '..')

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Build (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .appName('FOMC-Analysis')
    .master('local[*]')
    .getOrCreate()
)
print(f'Spark version: {spark.version}')

## Load Data from S3

In [None]:
from src.analytics.reports import load_population_from_s3, load_bls_from_s3

# Load both source datasets through the project loaders.
pop_df = load_population_from_s3(spark)
bls_df = load_bls_from_s3(spark)

# Report the row count of each dataset, one at a time.
pop_record_count = pop_df.count()
print(f'Population records: {pop_record_count}')
bls_record_count = bls_df.count()
print(f'BLS records: {bls_record_count}')

# Preview: default show() for population, first 5 rows for BLS.
pop_df.show()
bls_df.show(5)

## Report 1: Population Statistics (2013-2018)

Calculate mean and standard deviation of annual US population for years 2013-2018 inclusive.

In [None]:
from src.analytics.reports import report_population_stats

# The report result is indexed by 'mean' and 'stddev' below.
pop_stats = report_population_stats(pop_df)
print("Population Statistics (2013-2018):")
# Labels are padded so both values line up in the same column.
for label, key in (("Mean:  ", "mean"), ("StdDev:", "stddev")):
    print(f"  {label} {pop_stats[key]:,.0f}")

## Report 2: Best Year by Series ID

For every series_id, find the year with the largest sum of quarterly values.

In [None]:
from src.analytics.reports import report_best_year_by_series

best_year_df = report_best_year_by_series(bls_df)
series_count = best_year_df.count()
print(f'Series with best years: {series_count}')

# Show up to 50 rows, ordered by series_id, without truncating column values.
best_year_sorted = best_year_df.orderBy('series_id')
best_year_sorted.show(50, truncate=False)

## Report 3: Series + Population Join

Join BLS data (series_id=PRS30006032, period=Q01) with population data by year.

In [None]:
from src.analytics.reports import report_series_population_join

join_df = report_series_population_join(bls_df, pop_df)

# Display the joined rows in year order with full column values.
join_by_year = join_df.orderBy('year')
join_by_year.show(truncate=False)

---

## Data Quality & Sync History

Load sync logs from S3 and visualize data pipeline health.

In [None]:
import json
from datetime import datetime
from src.helpers.aws_client import get_client

# Shared S3 client from the project helper; load_sync_log and load_state in
# this notebook read objects through it.
s3 = get_client('s3')

def load_sync_log(bucket, prefix, client=None):
    """Load a sync log stored as JSON Lines in S3.

    Reads ``{prefix}sync_log.jsonl`` from ``bucket`` and parses every
    non-blank line as one JSON document. Lines that are not valid JSON are
    skipped (and counted in a printed notice) so that one corrupt record
    does not discard the rest of the log.

    Args:
        bucket: Name of the S3 bucket holding the log.
        prefix: Key prefix, concatenated directly with ``sync_log.jsonl``
            (include the trailing ``/`` when one is needed).
        client: Optional object exposing ``get_object(Bucket=..., Key=...)``.
            Defaults to the notebook-level ``s3`` client.

    Returns:
        A list of parsed entries in file order. An empty list is returned
        when the object cannot be fetched or decoded; the failure is printed
        rather than raised so the rest of the notebook keeps running.
    """
    key = f'{prefix}sync_log.jsonl'
    try:
        if client is None:
            client = s3
        response = client.get_object(Bucket=bucket, Key=key)
        text = response['Body'].read().decode()
    except Exception as e:
        print(f'Could not load {bucket}/{key}: {e}')
        return []

    entries = []
    skipped = 0
    # Split on '\n' only: JSON strings may legally contain other Unicode line
    # separators, which str.splitlines() would break apart.
    for line in text.split('\n'):
        if not line.strip():
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            skipped += 1
    if skipped:
        print(f'Skipped {skipped} malformed line(s) in {bucket}/{key}')
    return entries

# Fetch the sync log for each source bucket.
bls_log = load_sync_log('fomc-bls-raw', '_sync_state/pr/')
datausa_log = load_sync_log('fomc-datausa-raw', '_sync_state/')

# One summary line per source with the number of parsed log entries.
for source_name, entries in (('BLS', bls_log), ('DataUSA', datausa_log)):
    print(f'{source_name} sync log entries: {len(entries)}')

In [None]:
import matplotlib
# NOTE(review): the non-interactive Agg backend renders to files only, so
# plt.show() below presumably displays nothing inline; the saved PNG is the
# output. Confirm this is intended for headless runs.
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from collections import Counter, defaultdict
from datetime import datetime, timezone


def _parse_utc(ts):
    """Parse an ISO-8601 timestamp string into a timezone-aware UTC datetime.

    A trailing 'Z' is accepted. Timestamps without an offset are assumed to
    already be UTC (the charts below label their time axis as UTC).
    Returns None for missing, non-string, or unparseable values so callers
    can skip bad entries instead of crashing the whole cell.
    """
    if not ts or not isinstance(ts, str):
        return None
    try:
        parsed = datetime.fromisoformat(ts.replace('Z', '+00:00'))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Chart 1: BLS sync actions (count of log entries per action type)
ax = axes[0, 0]
if bls_log:
    # Missing or null actions are grouped under 'unknown'.
    bls_actions = Counter(str(entry.get('action') or 'unknown') for entry in bls_log)
    ax.bar(
        list(bls_actions.keys()),
        list(bls_actions.values()),
        color=['#2ecc71', '#3498db', '#e74c3c', '#95a5a6'],
    )
    ax.set_title('Chart 1: BLS Sync Actions')
    ax.set_ylabel('Count')
else:
    ax.text(0.5, 0.5, 'No BLS sync data', ha='center', va='center')
    ax.set_title('Chart 1: BLS Sync Actions')

# Chart 2: Data Freshness (whole days since the newest log entry per source)
ax = axes[0, 1]
sources = []
days_since = []
# Compare in UTC on both sides; a naive local "now" would skew the result by
# the machine's UTC offset.
now = datetime.now(timezone.utc)
for log, name in [(bls_log, 'BLS/pr'), (datausa_log, 'DataUSA')]:
    stamps = [
        stamp
        for stamp in (_parse_utc(e.get('timestamp')) for e in log)
        if stamp is not None
    ]
    if stamps:
        sources.append(name)
        # Clamp at zero so small clock skew cannot produce negative ages.
        days_since.append(max((now - max(stamps)).days, 0))

if sources:
    # Green: synced within a week; orange: within 30 days; red: older.
    colors = ['#2ecc71' if d <= 7 else '#f39c12' if d <= 30 else '#e74c3c' for d in days_since]
    ax.barh(sources, days_since, color=colors)
    ax.set_xlabel('Days Since Last Sync')
    ax.set_title('Chart 2: Data Freshness')
else:
    ax.text(0.5, 0.5, 'No sync data', ha='center', va='center')
    ax.set_title('Chart 2: Data Freshness')

# Chart 3: File Size Changes (BLS)
ax = axes[1, 0]
if bls_log:
    file_sizes = defaultdict(list)
    for entry in bls_log:
        if 'bytes' in entry and 'file' in entry:
            file_sizes[entry['file']].append(entry['bytes'])
    if file_sizes:
        # Plot only the first five files seen to keep the legend readable.
        for fname, sizes in list(file_sizes.items())[:5]:
            ax.plot(range(len(sizes)), sizes, marker='o', label=fname[:20])
        ax.set_title('Chart 3: File Size Changes (BLS)')
        ax.set_ylabel('Bytes')
        ax.legend(fontsize=7)
    else:
        ax.text(0.5, 0.5, 'No file size data', ha='center', va='center')
        ax.set_title('Chart 3: File Size Changes')
else:
    ax.text(0.5, 0.5, 'No BLS sync data', ha='center', va='center')
    ax.set_title('Chart 3: File Size Changes')

# Chart 4: Update Frequency (UTC hour of 'updated'/'added' BLS entries)
ax = axes[1, 1]
if bls_log:
    hours = []
    for entry in bls_log:
        if entry.get('action') not in ('updated', 'added'):
            continue
        stamp = _parse_utc(entry.get('timestamp'))
        if stamp is not None:
            hours.append(stamp.hour)
    if hours:
        ax.hist(hours, bins=24, range=(0, 24), color='#3498db', edgecolor='white')
        ax.set_xlabel('Hour of Day (UTC)')
        ax.set_ylabel('Updates')
        ax.set_title('Chart 4: Update Frequency by Hour')
    else:
        ax.text(0.5, 0.5, 'No update data', ha='center', va='center')
        ax.set_title('Chart 4: Update Frequency')
else:
    ax.text(0.5, 0.5, 'No sync data', ha='center', va='center')
    ax.set_title('Chart 4: Update Frequency')

plt.tight_layout()
plt.savefig('sync_history.png', dpi=100, bbox_inches='tight')
plt.show()
print('Charts saved to sync_history.png')

In [None]:
# Summary Table: Current state of all data sources
def load_state(bucket, key, client=None):
    """Load a JSON state document from S3 as a dict.

    Args:
        bucket: Name of the S3 bucket.
        key: Full object key of the JSON state file.
        client: Optional object exposing ``get_object(Bucket=..., Key=...)``.
            Defaults to the notebook-level ``s3`` client.

    Returns:
        The parsed JSON object. An empty dict is returned when the object
        cannot be fetched or parsed, or when the payload is not a JSON
        object; in both cases the reason is printed so that access or
        network failures are not mistaken for a source with no data.
    """
    try:
        if client is None:
            client = s3
        response = client.get_object(Bucket=bucket, Key=key)
        state = json.loads(response['Body'].read())
    except Exception as e:
        print(f'Could not load {bucket}/{key}: {e}')
        return {}
    # Callers use dict methods on the result, so reject any other JSON type.
    if not isinstance(state, dict):
        print(f'Unexpected state format in {bucket}/{key}: {type(state).__name__}')
        return {}
    return state

bls_state = load_state('fomc-bls-raw', '_sync_state/pr/latest_state.json')
datausa_state = load_state('fomc-datausa-raw', '_sync_state/latest_state.json')

print(f"{'Source':<15} {'Last Sync':<22} {'Files':<8} {'Status'}")
print('-' * 60)

# `or` fallbacks also cover keys that are present but null in the state JSON.
bls_sync = bls_state.get('last_sync') or 'N/A'
bls_files = len(bls_state.get('files') or {})

du_sync = datausa_state.get('last_sync') or 'N/A'
du_hash = datausa_state.get('content_hash') or 'N/A'
# The DataUSA row counts one file only when a content hash is recorded, so
# the Files column cannot contradict a "No data" status.
du_files = 1 if du_hash != 'N/A' else 0

for source, last_sync, n_files in (
    ('BLS/pr', bls_sync, bls_files),
    ('DataUSA', du_sync, du_files),
):
    status = 'Current' if n_files > 0 else 'No data'
    # Truncate to 19 characters: the length of 'YYYY-MM-DDTHH:MM:SS'.
    print(f"{source:<15} {str(last_sync)[:19]:<22} {n_files:<8} {status}")

In [None]:
# Stop the Spark session created at the top of the notebook now that all
# reports have run.
spark.stop()
print('Analysis complete.')