1. run in shell: `python create_env.py`
    * dependencies: `install_pkgs.py`, `requirements-aws.txt`, `requirements.txt`
    * output: `rearc_project`
2. run in shell: `source create_kernel.zsh`
    * dependencies: `jupyter-lab`
3. select kernel: `rearc_project`

In [1]:
import os
import json
import boto3
import hashlib
import requests
import numpy as np
import pandas as pd
from pandas import json_normalize
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from botocore.exceptions import ClientError

In [2]:
from aws_cdk import core
from aws_cdk import aws_lambda as _lambda
from aws_cdk import aws_sqs as sqs
from aws_cdk import aws_s3 as s3
from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
from aws_cdk import aws_iam as iam
from aws_cdk import aws_s3_notifications
from aws_cdk.aws_s3_notifications import SqsDestination

##-- Initialize Spark session - OPTIONAL
# os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'
# from pyspark.sql import SparkSession
# import pyspark.pandas as ps
# from pyspark.sql import functions as F
# from pyspark.sql.window import Window
# spark = SparkSession.builder.appName("LoadData").getOrCreate()        
# spark.sparkContext.setLogLevel("ERROR")

In [3]:
##------------------------------------------##
##-- Load AWS Configurations
##------------------------------------------##
info = (lambda f: json.load(f))(open("info.txt", 'r'))
    
AWS_ACCESS_KEY = info["secrets"]["AWS_ACCESS_KEY"]
AWS_SECRET_ACCESS = info["secrets"]["AWS_SECRET_ACCESS"]
AWS_ARN = info["secrets"]["AWS_ARN"]
AWS_REGION = info["secrets"]["AWS_REGION"]

S3_BUCKET = info['pipeline']["S3_BUCKET"]
SQS_QUEUE = info['pipeline']["SQS_QUEUE"] 
LAMBDA_P1_P2 = info['pipeline']["LAMBDA_P1_P2"]
LAMBDA_P3 = info['pipeline']["LAMBDA_P3"]

##------------------------------------------##
##-- Initialize AWS Clients
##------------------------------------------##

s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_ACCESS,
    region_name=AWS_REGION
)

lambda_client = boto3.client(
    'lambda',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_ACCESS,
    region_name=AWS_REGION
)

sqs_client = boto3.client(
    'sqs',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_ACCESS,
    region_name=AWS_REGION
)

### Part 1: AWS S3 & Sourcing Datasets

1. Republish this open dataset (https://download.bls.gov/pub/time.series/pr) in Amazon S3 and share with us a link.
    * You may run into 403 Forbidden errors as you test accessing this data. There is a way to comply with the BLS data access policies and regain access to fetch this data programmatically - we have included some hints as to how to do this at the bottom of this README in the Q/A section.
2. Script this process so the files in the S3 bucket are kept in sync with the source when data on the website is updated, added, or deleted.
    * Don't rely on hard-coded names - the script should be able to handle added or removed files.
    * Ensure the script doesn't upload the same file more than once.


In [4]:
def md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

In [5]:
##-- Function to download and upload files
def fetch_and_upload(file_name):
    file_url = urljoin(base_url, file_name)
    r = requests.get(file_url, headers={'User-Agent': 'test'})
    local_file = f"/tmp/{file_name}"

    with open(local_file, 'wb') as f:
        f.write(r.content)

    current_md5 = md5(local_file)
    
    try:
        s3_md5 = s3_client.head_object(Bucket=S3_BUCKET, Key=file_name)['ETag'][1:-1]
        
        if current_md5 != s3_md5:
            s3_client.upload_file(local_file, S3_BUCKET, file_name)
            print(f"Uploaded {file_name} to s3://{S3_BUCKET}/")
        else:
            # print(f"File {file_name} already exists")
            print(f"File {file_name} already exists in s3://{S3_BUCKET}/")

    except ClientError as e:
        if e.response['Error']['Code'] in ('403', '404'):
            s3_client.upload_file(local_file, S3_BUCKET, file_name)
            print(f"Uploaded {file_name} to s3://{S3_BUCKET}/")

    os.remove(local_file)

##-- Fetch the webpage with file links
base_url = "https://download.bls.gov/pub/time.series/pr/"
r = requests.get(base_url, headers={'User-Agent': 'test'})
soup = BeautifulSoup(r.text, 'html.parser')

##-- Loop through and download/upload each file
for link in soup.find_all('a'):
    file_name = os.path.basename(urlparse(link.get('href')).path)
    if file_name:
        # print(file_name)
        fetch_and_upload(file_name)

File pr.class already exists in s3://bls-data-sample-rearc/
File pr.contacts already exists in s3://bls-data-sample-rearc/
File pr.data.0.Current already exists in s3://bls-data-sample-rearc/
File pr.data.1.AllData already exists in s3://bls-data-sample-rearc/
File pr.duration already exists in s3://bls-data-sample-rearc/
File pr.footnote already exists in s3://bls-data-sample-rearc/
File pr.measure already exists in s3://bls-data-sample-rearc/
File pr.period already exists in s3://bls-data-sample-rearc/
File pr.seasonal already exists in s3://bls-data-sample-rearc/
File pr.sector already exists in s3://bls-data-sample-rearc/
File pr.series already exists in s3://bls-data-sample-rearc/
File pr.txt already exists in s3://bls-data-sample-rearc/


### Part 2: APIs

1. Create a script that will fetch data from this API (https://datausa.io/api/data?drilldowns=Nation&measures=Population). You can read the documentation here

In [6]:
##-- Fetch data from API
api_url = "https://datausa.io/api/data?drilldowns=Nation&measures=Population"
response = requests.get(api_url)
data = response.json()

In [7]:
##-- Save to temporary JSON file
with open("/tmp/population_data.json", "w") as f:
    json.dump(data, f)

2. Save the result of this API call as a JSON file in S3

In [8]:
##-- Upload to S3
s3_client.upload_file("/tmp/population_data.json", S3_BUCKET, "population_data.json")
print(f"Data uploaded to s3://{S3_BUCKET}/population_data.json")

Data uploaded to s3://bls-data-sample-rearc/population_data.json


### Part 3: Data Analytics

0. Load both the CSV file from Part 1 pr.data.0.Current and the JSON file from Part 2 as dataframes (Spark, Pyspark, Pandas, Koalas, etc).

In [9]:
##-- Download file from S3 to local
s3_client.download_file(f'{S3_BUCKET}', 'population_data.json', 'population_data.json')

In [10]:
##-- Load JSON file and flatten
read_json = lambda filename: json.load(open(filename, 'r'))
json_obj = read_json('population_data.json')
flattened_data = json_obj['data']
data_types_p2 = {'id':'str', 'nation':'str', 'slug_nation':'str', 'id_year':'int','year':'int', 'population':'int'}
col_mapping_p2 = {'ID Nation':'id', 'Nation':'nation', 'Slug Nation':'slug_nation','ID Year':'id_year', 'Year':'year', 'Population':'population'}
df_p2 = pd.DataFrame(flattened_data)
df_p2.rename(columns=col_mapping_p2, inplace=True)
df_p2 = df_p2.astype(data_types_p2)
df_p2.head()

Unnamed: 0,id,nation,id_year,year,population,slug_nation
0,01000US,United States,2020,2020,326569308,united-states
1,01000US,United States,2019,2019,324697795,united-states
2,01000US,United States,2018,2018,322903030,united-states
3,01000US,United States,2017,2017,321004407,united-states
4,01000US,United States,2016,2016,318558162,united-states


1. Using the dataframe from the population data API (Part 2), generate the mean and the standard deviation of the annual US population across the years [2013, 2018] inclusive.

In [11]:
##-- Filter DataFrame: 2013-2018
filtered_df = df_p2[df_p2['year'].isin([year for year in range(2013, 2019)])]

##-- Calculate mean & standard deviation
mean_pop = filtered_df['population'].mean()
std_pop = filtered_df['population'].std()

print(f"Mean Population: {mean_pop:.2f}")
print(f"Standard Deviation: {std_pop:.2f}")

Mean Population: 317437383.00
Standard Deviation: 4257089.54


2. Using the dataframe from the time series (Part 1), for every series_id, find the best year: the year with the max/largest sum of "value" for all quarters in that year. Generate a report with each series id, the best year for that series, and the summed value for that year. 

In [12]:
##-- Download file from S3 to local
s3_client.download_file(f'{S3_BUCKET}', 'pr.data.0.Current', 'current_localfile.txt')

In [13]:
##-- Load CSV file 
data_types_p1 = {'series_id':'str', 'year':'int', 'period':'str', 'value':'float', 'footnote_codes':'str'}
df_p1 = pd.read_csv('current_localfile.txt', sep="\t", dtype=data_types_p1)
df_p1.rename(columns=lambda x: x.strip(), inplace=True)
df_p1 = df_p1.map(lambda x: x.replace(' ', '') if isinstance(x, str) else x)
df_p1.head()

Unnamed: 0,series_id,year,period,value,footnote_codes
0,PRS30006011,1995,Q01,2.6,
1,PRS30006011,1995,Q02,2.1,
2,PRS30006011,1995,Q03,0.9,
3,PRS30006011,1995,Q04,0.1,
4,PRS30006011,1995,Q05,1.4,


In [14]:
##-- Group by 'series_id', sum the 'value' and find the max
grouped_df = df_p1.groupby('series_id')['value'].sum().reset_index()
max_value_row = grouped_df[grouped_df['value'] == grouped_df['value'].max()]
grouped_df.head()

Unnamed: 0,series_id,value
0,PRS30006011,-127.0
1,PRS30006012,-124.3
2,PRS30006013,16590.938
3,PRS30006021,-6.3
4,PRS30006022,-5.4


3. Generate a report that will provide the value for series_id = PRS30006032 and period = Q01 and the population for that given year (if available in the population dataset)

In [15]:
##-- filter dataframe 1
filtered_df_p1 = df_p1[(df_p1['series_id'] == 'PRS30006032') & (df_p1['period'] == 'Q01')]
filtered_df_p1.head()

Unnamed: 0,series_id,year,period,value,footnote_codes
994,PRS30006032,1995,Q01,0.0,
999,PRS30006032,1996,Q01,-4.4,
1004,PRS30006032,1997,Q01,2.7,
1009,PRS30006032,1998,Q01,1.0,
1014,PRS30006032,1999,Q01,-4.1,


In [16]:
##-- filter dataframe 2
filtered_df_p2 = df_p2[ df_p2['year'].isin(filtered_df_p1['year']) ]
filtered_df_p2.head()

Unnamed: 0,id,nation,id_year,year,population,slug_nation
0,01000US,United States,2020,2020,326569308,united-states
1,01000US,United States,2019,2019,324697795,united-states
2,01000US,United States,2018,2018,322903030,united-states
3,01000US,United States,2017,2017,321004407,united-states
4,01000US,United States,2016,2016,318558162,united-states


In [17]:
##-- Merge dataframes 1 & 2
result = pd.merge(filtered_df_p1, filtered_df_p2, left_on='year', right_on='year', how='left')
report_df = result[['series_id', 'year', 'value', 'population']]
display(report_df)

Unnamed: 0,series_id,year,value,population
0,PRS30006032,1995,0.0,
1,PRS30006032,1996,-4.4,
2,PRS30006032,1997,2.7,
3,PRS30006032,1998,1.0,
4,PRS30006032,1999,-4.1,
5,PRS30006032,2000,0.1,
6,PRS30006032,2001,-6.0,
7,PRS30006032,2002,-7.0,
8,PRS30006032,2003,-5.7,
9,PRS30006032,2004,2.4,


### Part 4: Infrastructure as Code & Data Pipeline with AWS CDK

1. run in shell: `python create_env.py`
    * dependencies: `install_pkgs.py`, `requirements-aws.txt`, `requirements.txt`
    * output: `rearc_project`
2. run in shell: `source create_kernel.zsh`
    * dependencies: `jupyter-lab`
3. run in shell: `python main.py`
    * dependencies: 
        * `info.txt`
        * `bls-lambda-p1-p2-sample-rearc.py`
        * `bls-lambda-p1-p2-sample-rearc.zip`
        * `bls-lambda-p3-sample-rearc.py`
        * `bls-lambda-p3-sample-rearc.zip`
    * output: 
        * `population_data.json`
        * `current_localfile.txt`