In [9]:
!pip install boto3 pandas pyathena pyarrow



In [10]:
import boto3
import time
import s3fs
import json
import pandas as pd

In [11]:
# At this point we have last week's predictions for the current week
# and the current week's actuals.
# TODO: merge the latest weekly actuals with last week's predictions.
# Open question: how do we retrieve the latest weekly_actuals?

In [12]:
# Initialize the Boto3 Athena client (region us-east-2); it is used below to
# start the query and to poll its execution status.
athena_client = boto3.client('athena', region_name='us-east-2')

In [18]:
# Disabled variant: combine historical (weekly) and latest (weekly_staging) data.
# Kept for reference only; the active query below reads from `weekly` alone.
#sql_query = """
#SELECT * FROM (
#    SELECT item_id, year, week, date, label, new_cases
#    FROM weekly
#    UNION ALL
#    SELECT item_id, year, week, date, label, new_cases
#    FROM weekly_staging
#) AS combined_data
#"""

# Active query: every row of the `weekly` table, restricted to the columns used downstream.
sql_query = """
SELECT item_id, year, week, date, label, new_cases 
FROM weekly
"""

# Athena database to query and the S3 prefix under which query results are written
database = 'cdc_nndss'
s3_output_constant = 's3://nndss/query_results/weekly_combined_data'

# Submit the query; results go to the fixed output prefix defined above
response = athena_client.start_query_execution(
    QueryString=sql_query,
    QueryExecutionContext={'Database': database},
    ResultConfiguration={'OutputLocation': s3_output_constant}
)

# Execution ID of the submitted query; used to poll its status and to build the result file path
query_execution_id = response['QueryExecutionId']

# Function to check the query execution status
def wait_for_query_completion(client, query_id, poll_interval=5, timeout=None):
    """Poll Athena until the query reaches a terminal state and return that state.

    Args:
        client: Object exposing ``get_query_execution(QueryExecutionId=...)``
            (the Boto3 Athena client in this notebook).
        query_id: Execution ID of the query to watch.
        poll_interval: Seconds to sleep between status checks. Defaults to 5,
            the interval that was previously hard-coded.
        timeout: Optional maximum number of seconds to wait. ``None`` (the
            default) waits indefinitely, as before.

    Returns:
        One of ``'SUCCEEDED'``, ``'FAILED'`` or ``'CANCELLED'``.

    Raises:
        TimeoutError: If ``timeout`` is given and the query is still not in a
            terminal state once that many seconds have elapsed.
    """
    terminal_states = ('SUCCEEDED', 'FAILED', 'CANCELLED')
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        response = client.get_query_execution(QueryExecutionId=query_id)
        state = response['QueryExecution']['Status']['State']
        if state in terminal_states:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Query {query_id} still in state {state} after {timeout} seconds"
            )
        time.sleep(poll_interval)

# Wait for the query to complete
query_state = wait_for_query_completion(athena_client, query_execution_id)
if query_state != 'SUCCEEDED':
    # Stop here: the following cells read the result file, which does not
    # exist for a failed or cancelled query.
    raise RuntimeError(f"Query did not complete successfully. State: {query_state}")

# The result is a single CSV named after the execution ID under the output
# prefix; this is the same path the next cell reads.
print(f"Query completed successfully. Results are stored in: {s3_output_constant}/{query_execution_id}.csv")

Query completed successfully. Results are stored in the directory: s3://nndss/query_results/weekly_combined_data/f745c878-7b09-44c1-8aff-8f848b122886/


In [19]:
# Path of the CSV for this query execution under the configured output prefix
result_file_path = f"{s3_output_constant}/{query_execution_id}.csv"
# NOTE(review): reading an s3:// URL with pandas presumably relies on the s3fs import above — confirm
df = pd.read_csv(result_file_path)

# Preview the first rows of the loaded data
df.head()

Unnamed: 0,item_id,year,week,date,label,new_cases
0,ALABAMA_Anthrax,2022,1,2022-01-03 00:00:00.000,Anthrax,
1,ALABAMA_Anthrax,2022,2,2022-01-10 00:00:00.000,Anthrax,
2,ALABAMA_Anthrax,2022,3,2022-01-17 00:00:00.000,Anthrax,
3,ALABAMA_Anthrax,2022,4,2022-01-24 00:00:00.000,Anthrax,
4,ALABAMA_Anthrax,2022,5,2022-01-31 00:00:00.000,Anthrax,


In [20]:
# Parse 'date' into datetimes, then order the rows by series and by time so
# that each item_id's observations are in chronological order.
df = (
    df
    .assign(date=pd.to_datetime(df['date']))
    .sort_values(by=['item_id', 'date'])
)

In [23]:
#df = df[df.date<pd.to_datetime("2024-03-11")]

In [24]:
# Spot-check the rows of a single series
df.loc[df['item_id'] == 'WYOMING_Zika virus disease, non-congenital']

Unnamed: 0,item_id,year,week,date,label,new_cases
697452,"WYOMING_Zika virus disease, non-congenital",2022,1,2022-01-03,"Zika virus disease, non-congenital",
697453,"WYOMING_Zika virus disease, non-congenital",2022,2,2022-01-10,"Zika virus disease, non-congenital",
697454,"WYOMING_Zika virus disease, non-congenital",2022,3,2022-01-17,"Zika virus disease, non-congenital",
697455,"WYOMING_Zika virus disease, non-congenital",2022,4,2022-01-24,"Zika virus disease, non-congenital",
697456,"WYOMING_Zika virus disease, non-congenital",2022,5,2022-01-31,"Zika virus disease, non-congenital",
...,...,...,...,...,...,...
697561,"WYOMING_Zika virus disease, non-congenital",2024,6,2024-02-05,"Zika virus disease, non-congenital",
697562,"WYOMING_Zika virus disease, non-congenital",2024,7,2024-02-12,"Zika virus disease, non-congenital",
697563,"WYOMING_Zika virus disease, non-congenital",2024,8,2024-02-19,"Zika virus disease, non-congenital",
697564,"WYOMING_Zika virus disease, non-congenital",2024,9,2024-02-26,"Zika virus disease, non-congenital",


In [25]:
# S3 resource handle (used later via s3.Object(...).put) and the bucket/key of the DeepAR dataset file
s3 = boto3.resource('s3')
output_bucket = 'nndss'
output_key = 'deepar_input_data/deepar_dataset.jsonl'
# Full s3:// URI assembled from the bucket and key above
s3_output_path = f's3://{output_bucket}/{output_key}'

In [26]:
def convert_target(target_series):
    """Return the values as a list, with missing entries encoded as the string "NaN".

    Every non-missing value is cast to ``float``; every value pandas treats as
    missing becomes the literal string ``"NaN"``.
    """
    converted = []
    for value in target_series:
        converted.append("NaN" if pd.isna(value) else float(value))
    return converted


In [27]:
# Serialize each item_id's history as one JSON Lines record and remember which
# line (0-based) belongs to which item_id.
time_series_mapping = {}  # item_id -> index of its record in the JSON Lines file
json_lines = []  # one JSON document per series, in groupby order

for idx, (item_id, group) in enumerate(df.groupby('item_id')):
    # First row of the group supplies the start date (assumes 'date' is already
    # a datetime column and the rows are in chronological order).
    first_date = group['date'].dt.date.iloc[0]
    time_series = dict(
        start=str(first_date),
        target=convert_target(group['new_cases']),
    )
    json_lines.append(json.dumps(time_series))
    time_series_mapping[item_id] = idx

# Newline-separated payload: one record per line
json_lines_str = "\n".join(json_lines)

# S3 keys for the JSON Lines file and for the item_id -> index mapping file
json_lines_key = 'deepar_input_data/deepar_dataset.jsonl'
mapping_key = 'deepar_input_data/time_series_mapping.json'


In [28]:
# Upload the JSON Lines dataset first
jsonl_object = s3.Object(output_bucket, json_lines_key)
jsonl_object.put(Body=json_lines_str)

# Then serialize and upload the item_id -> index mapping
mapping_str = json.dumps(time_series_mapping)
mapping_object = s3.Object(output_bucket, mapping_key)
mapping_object.put(Body=mapping_str)

# Report both destinations once both uploads have gone through
for description, saved_key in (("JSON Lines", json_lines_key), ("Mapping", mapping_key)):
    print(f"{description} file saved to s3://{output_bucket}/{saved_key}")

JSON Lines file saved to s3://nndss/deepar_input_data/deepar_dataset.jsonl
Mapping file saved to s3://nndss/deepar_input_data/time_series_mapping.json


In [29]:
# Inspect the record left in `time_series` by the loop above, i.e. the last series that was built
print(time_series)

{'start': '2022-01-03', 'target': ['NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN']}


In [35]:
import sagemaker
from sagemaker import image_uris, Session
from sagemaker.estimator import Estimator
from sagemaker.session import get_execution_role

In [36]:
# SageMaker session, the region it is bound to, and the execution role for the training job
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()  # IAM role passed to the Estimator below

In [37]:
# Look up the 'forecasting-deepar' algorithm image URI for the session's region
container = image_uris.retrieve('forecasting-deepar', region)

In [38]:
# Configure the estimator: DeepAR image, execution role, a single ml.c4.xlarge
# training instance, and model artifacts written under deepar/output in the bucket
deepar = Estimator(
    container,
    role,
    instance_count=1,
    instance_type='ml.c4.xlarge',
    output_path=f's3://{output_bucket}/deepar/output',
    sagemaker_session=sagemaker_session,
)

In [39]:
# Training hyperparameters for the weekly series: the model forecasts one step
# (prediction_length=1), which later cells treat as the week after the last known date.
# NOTE(review): context_length=1 gives the model a single step of history as context — confirm this is intended
deepar.set_hyperparameters(
    time_freq='W',
    epochs=20,
    early_stopping_patience=10,
    prediction_length=1,
    context_length=1,
    num_cells=40,
    num_layers=2,
    mini_batch_size=64,
    learning_rate=0.001,
    dropout_rate=0.05,
    likelihood='negative-binomial'
)

In [40]:
# Specify data channels. The train channel is built from the key that was
# actually uploaded (json_lines_key), so the two locations cannot drift apart.
data_channels = {
    'train': f's3://{output_bucket}/{json_lines_key}',
}

In [41]:
# Start training on the configured channels (only a 'train' channel is provided)
deepar.fit(inputs=data_channels)

INFO:sagemaker:Creating training-job with name: forecasting-deepar-2024-03-23-17-21-42-127


2024-03-23 17:21:42 Starting - Starting the training job...
2024-03-23 17:21:57 Starting - Preparing the instances for training......
2024-03-23 17:22:51 Downloading - Downloading input data...
2024-03-23 17:23:21 Downloading - Downloading the training image...............
2024-03-23 17:26:12 Training - Training image download completed. Training in progress...
Docker entrypoint called with argument(s): train
Running default environment configuration script
Running custom environment configuration script
  if num_device is 1 and 'dist' not in kvstore:
[03/23/2024 17:26:24 INFO 140470849898304] Reading default configuration from /opt/amazon/lib/python3.8/site-packages/algorithm/resources/default-input.json: {'_kvstore': 'auto', '_num_gpus': 'auto', '_num_kv_servers': 'auto', '_tuning_objective_metric': '', 'cardinality': 'auto', 'dropout_rate': '0.10', 'early_stopping_patience': '', 'embedding_dimension': '10', 'learning_rate': '0.001', 'likelihood': '

In [42]:
# Use a dedicated name for the low-level client: rebinding `s3` here would
# overwrite the S3 *resource* that the upload cell calls as s3.Object(...),
# so re-running that cell afterwards would fail.
s3_client = boto3.client('s3')
bucket_name = 'nndss'
key = 'deepar_input_data/deepar_dataset.jsonl'

# Download the JSON Lines training file and decode it as UTF-8 text
obj = s3_client.get_object(Bucket=bucket_name, Key=key)
data = obj['Body'].read().decode('utf-8')

# One JSON document per line; blank lines (e.g. a trailing newline or an empty
# file) are skipped instead of crashing json.loads.
deepar_training = [json.loads(line) for line in data.split('\n') if line.strip()]


In [None]:
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

# Deploy the trained model to a real-time endpoint; the endpoint is removed
# again by predictor.delete_endpoint() a few cells below.
predictor = deepar.deploy(
    initial_instance_count=1,  # Number of instances backing the endpoint
    instance_type='ml.m4.xlarge',  # Instance type that hosts the endpoint
    serializer=JSONSerializer(),  # Requests are sent as JSON
    deserializer=JSONDeserializer()  # Responses are parsed from JSON
)


INFO:sagemaker:Creating model with name: forecasting-deepar-2024-03-23-17-27-57-227
INFO:sagemaker:Creating endpoint-config with name forecasting-deepar-2024-03-23-17-27-57-227
INFO:sagemaker:Creating endpoint with name forecasting-deepar-2024-03-23-17-27-57-227


In [None]:
# Preparing predictor input: every training series is sent as an instance, and
# the response should carry the mean plus the 0.01 / 0.5 / 0.99 quantiles that
# the post-processing cell reads back by those exact keys.
# NOTE(review): all series go out in a single request — confirm the payload fits the endpoint's request size limit
predictor_input = {
    "instances": deepar_training,
    "configuration": {
        "num_samples": 100,
        "output_types": ["mean", "quantiles"],
        "quantiles": ["0.01", "0.5", "0.99"]
    }
}


In [None]:
# Invoke the endpoint; the response is indexed later as prediction['predictions']
prediction = predictor.predict(predictor_input)

In [None]:
# Tear down the endpoint now that the predictions have been retrieved
predictor.delete_endpoint()

In [None]:
from datetime import datetime, timedelta

def find_max_date(deepar_training):
    """Return the latest end date across all series, assuming weekly spacing.

    Each series is a dict with a ``'start'`` date string (``YYYY-MM-DD``) and a
    ``'target'`` list. Its end date is the start plus one week for every
    observation after the first. The result is the maximum of these end dates,
    as a ``datetime``.
    """
    return max(
        datetime.strptime(entry['start'], "%Y-%m-%d")
        + timedelta(weeks=len(entry['target']) - 1)
        for entry in deepar_training
    )

# Latest end date across all series in the training data
last_known_date = find_max_date(deepar_training)

# The forecast targets the period right after the last known date: one week later, as a pandas Timestamp
prediction_for_date = pd.to_datetime(last_known_date + timedelta(weeks=1))
print(f"The prediction is for the date: {prediction_for_date.strftime('%Y-%m-%d')}")

In [None]:
import pandas as pd
import numpy as np

# Initialize a list to store prediction data
prediction_data = []

# Invert the item_id -> index mapping once, so each prediction resolves its
# item_id with a dictionary lookup instead of rebuilding and scanning two
# lists per prediction.
index_to_item_id = {index: mapped_id for mapped_id, index in time_series_mapping.items()}

# Iterate through each prediction and its corresponding item_id.
# Assumes predictions come back in the same order as the instances sent.
for idx, pred in enumerate(prediction['predictions']):
    # Retrieve the item_id using the index
    item_id = index_to_item_id[idx]

    # Extract quantiles and mean (each is a list with one value per forecast step)
    pred_lower = pred['quantiles']['0.01']
    pred_upper = pred['quantiles']['0.99']
    pred_median = pred['quantiles']['0.5']
    pred_mean = pred['mean']

    # Append the data to the list
    prediction_data.append({
        'item_id': item_id,
        'pred_mean': pred_mean,
        'pred_median': pred_median,
        'pred_lower': pred_lower,
        'pred_upper': pred_upper
    })

# Convert the list to a DataFrame
prediction_df = pd.DataFrame(prediction_data)
prediction_df['prediction_for_date'] = prediction_for_date

# Each forecast column holds a list; keep its first element (the single
# forecast step) or None when the list is empty.
for column in ('pred_mean', 'pred_median', 'pred_lower', 'pred_upper'):
    prediction_df[column] = prediction_df[column].apply(lambda x: x[0] if x else None)



In [None]:
# Preview the first rows of the prediction table
prediction_df.head()

In [None]:
# Rows whose mean forecast is greater than 1
prediction_df[prediction_df.pred_mean>1]

In [None]:
# Number of rows in the prediction table (one row is appended per returned prediction)
len(prediction_df)

In [None]:
# Check the column dtypes before the table is written to Parquet
prediction_df.dtypes

In [None]:
# Destination in S3: bucket, folder, and a file name stamped with the date the prediction is for
bucket_name = 'nndss'
folder_path = 'predictions'
file_name = f"weekly_predictions_{prediction_for_date.strftime('%Y-%m-%d')}.parquet"
s3_path = f's3://{bucket_name}/{folder_path}/{file_name}'

# Save DataFrame to Parquet directly in S3 (pyarrow engine, DataFrame index not written)
prediction_df.to_parquet(s3_path, engine='pyarrow', index=False)

In [15]:
# TODO: append the latest weekly data (from weekly_staging) to weekly, then delete the file in weekly_staging