# prepare environment

In [1]:
import sagemaker
import boto3
sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml
sagemaker role arn: arn:aws:iam::452145973879:role/sagemaker_full_access
sagemaker bucket: sagemaker-us-west-2-452145973879
sagemaker session region: us-west-2


# prepare model and upload the model artifact

## create a new model.tar.gz file in the file 

In [2]:
!pwd

/home/ec2-user/SageMaker/efs/Projects/skillful_nowcasting/sagemaker-deploy


In [3]:
%cd dgmr

/home/ec2-user/SageMaker/efs/Projects/skillful_nowcasting/sagemaker-deploy/dgmr


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


In [4]:
!rm -rf .ipynb_checkpoints

In [5]:
!tar zcvf model.tar.gz * 

config.json
inference.py
model.tar.gz
pytorch_model.bin
README.md


In [6]:
%cd ../
!pwd

/home/ec2-user/SageMaker/efs/Projects/skillful_nowcasting/sagemaker-deploy
/home/ec2-user/SageMaker/efs/Projects/skillful_nowcasting/sagemaker-deploy


## upload the new model with inference code

In [7]:
local_path = "dgmr"
model_id = "dgmr"
# model_id=repository.split("/")[-1]
s3_location=f"s3://{sess.default_bucket()}/custom_inference/{model_id}/model.tar.gz"

In [8]:
!aws s3 cp $local_path/model.tar.gz $s3_location

upload: dgmr/model.tar.gz to s3://sagemaker-us-west-2-452145973879/custom_inference/dgmr/model.tar.gz


## deploy

In [9]:
import boto3
import sagemaker
from datetime import datetime

current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

sagemaker_client = boto3.client(service_name="sagemaker")
role = sagemaker.get_execution_role()

model_name = f"nowcast-dgmr-{current_datetime}"

primary_container = {
    "Image": '452145973879.dkr.ecr.us-west-2.amazonaws.com/dgmr_py310:latest',
    "ModelDataUrl": s3_location
}

create_model_response = sagemaker_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=primary_container)

In [10]:
create_model_response

{'ModelArn': 'arn:aws:sagemaker:us-west-2:452145973879:model/nowcast-dgmr-2024-05-20-13-23-09',
 'ResponseMetadata': {'RequestId': 'b6816e4d-87a2-44fe-9894-57404bc3dae0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b6816e4d-87a2-44fe-9894-57404bc3dae0',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '94',
   'date': 'Mon, 20 May 2024 13:23:10 GMT'},
  'RetryAttempts': 0}}

# Create an Endpoint Configuration

In [11]:
endpoint_config_name = f"nowcast-dgmr-config-{current_datetime}"

sagemaker_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        "InstanceType": "ml.g5.2xlarge",
        "InitialVariantWeight": 1,
        "InitialInstanceCount": 1,
        "ModelName": model_name,
        "VariantName": "AllTraffic"}])

{'EndpointConfigArn': 'arn:aws:sagemaker:us-west-2:452145973879:endpoint-config/nowcast-dgmr-config-2024-05-20-13-23-09',
 'ResponseMetadata': {'RequestId': 'cc99d255-0000-4e14-a051-fda0b617de61',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cc99d255-0000-4e14-a051-fda0b617de61',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '120',
   'date': 'Mon, 20 May 2024 13:23:10 GMT'},
  'RetryAttempts': 0}}

# Create an Endpoint

In [12]:
endpoint_name = f"nowcast-dgmr-endpoint-{current_datetime}"

sagemaker_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)

{'EndpointArn': 'arn:aws:sagemaker:us-west-2:452145973879:endpoint/nowcast-dgmr-endpoint-2024-05-20-13-23-09',
 'ResponseMetadata': {'RequestId': 'e07a60f1-7eb0-41c8-a3fa-4f9da29cf236',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e07a60f1-7eb0-41c8-a3fa-4f9da29cf236',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '109',
   'date': 'Mon, 20 May 2024 13:23:10 GMT'},
  'RetryAttempts': 0}}

In [13]:
response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
response["EndpointStatus"]

'Creating'

# Invoke the Endpoint

In [14]:
import zipfile
import os
import re
from functools import cmp_to_key
from datetime import datetime, timedelta
from PIL import Image
import json
import random
# import pickle
import webdataset as wds
import numpy as np
from tqdm import tqdm
import pandas as pd
import time
import uuid

data_dir = "../data/zuimei-radar"

def get_file_paths(directory):
    file_paths = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith('.zip'):
                file_path = os.path.join(root, file)
                file_paths.append(file_path)
                
                # try:
                #     with zipfile.ZipFile(file_path, 'r') as zip_file:
                #         # Check if the ZIP file is complete
                #         bad_file = zip_file.testzip()
                #         if bad_file is None:
                #             # The ZIP file is complete
                #             file_paths.append(file_path)
                #         else:
                #             print(f"Warning: '{file_path}' is a corrupted ZIP file.")
                # except zipfile.BadZipFile:
                #     print(f"Warning: '{file_path}' is not a valid ZIP file.")
                    
    return file_paths


def sort_key(file_path):
    match = re.search(r'BABJ_(\d+)_P', file_path)
    if match:
        return int(match.group(1))
    else:
        return float('inf')


# Example usage
file_paths = get_file_paths(data_dir)
print(f"number of files: {len(file_paths)}")
print("before sort: \n", file_paths[:5])

file_paths.sort(key=cmp_to_key(lambda x, y: (sort_key(x) > sort_key(y)) - (sort_key(x) < sort_key(y))))

print("after sort: \n", file_paths[:5])

# get consecutive time periods
def get_time_from_path(file_path, return_type='datetime'):
    match = re.search(r'BABJ_(\d+)_P', file_path)
    if match:
        time_str = match.group(1)
        if return_type == 'datetime':
            return datetime.strptime(time_str, '%Y%m%d%H%M%S')
        else:
            return time_str
    else:
        return None

times = [get_time_from_path(path) for path in file_paths]
times.sort()

periods = []
current_period = []

for i in range(len(times)):
    current_time = times[i]
    if i == 0 or (times[i] - times[i - 1]).total_seconds() >= 355 and (times[i] - times[i - 1]).total_seconds() <= 365:
        current_period.append(file_paths[i])
    else:
        periods.append(current_period)
        current_period = [file_paths[i]]

if current_period:
    periods.append(current_period)

print(f"Periods of consecutive file paths: {len(periods)}")
# for period in periods[:5]:
#     print(period)

def read_data(file_path):
    # Open the ZIP file
    with zipfile.ZipFile(file_path, 'r') as zip_file:
        # Get the name of the file inside the ZIP archive
        file_name = zip_file.namelist()[0]

        # Open the file inside the ZIP archive
        with zip_file.open(file_name) as file:
            # Load the NumPy array from the file
            data = np.loadtxt(file)
    
    return data

        
# Print the loaded NumPy array
data = read_data(file_paths[0])
print(type(data), data.shape)

count = sum([1 for period in periods if len(period)>=24])

lens = [len(period) for period in periods if len(period)>=0]
print(count)
print(lens)

def read_data(file_path):
    # Open the ZIP file
    with zipfile.ZipFile(file_path, 'r') as zip_file:
        # Get the name of the file inside the ZIP archive
        file_name = zip_file.namelist()[0]

        # Open the file inside the ZIP archive
        with zip_file.open(file_name) as file:
            # Load the NumPy array from the file
            data = np.loadtxt(file)
    
    return data


def read_frames(file_paths, vmin=0, vmax=75):
    
    frames = []
    for file_path in file_paths:
        try:
            data = read_data(file_path)
            frames.append(data)
        except Exception as e:
            print(f"{e}: {file_path}")
    
    frames = np.stack(frames, axis=0)
    frames[np.isnan(frames)] = 0
    frames[frames>vmax] = vmax
    frames[frames<vmin] = vmin
    frames = frames.astype(np.float32)
    
    return frames

number of files: 1084
before sort: 
 ['../data/zuimei-radar/20240305/Z_RADA_C_BABJ_20240305112414_P_ACHN.QREF.20240305.111800.bin.zip', '../data/zuimei-radar/20240305/Z_RADA_C_BABJ_20240305043016_P_ACHN.QREF.20240305.042400.bin.zip', '../data/zuimei-radar/20240305/Z_RADA_C_BABJ_20240304192416_P_ACHN.QREF.20240304.191800.bin.zip', '../data/zuimei-radar/20240305/Z_RADA_C_BABJ_20240305041216_P_ACHN.QREF.20240305.040600.bin.zip', '../data/zuimei-radar/20240305/Z_RADA_C_BABJ_20240305141214_P_ACHN.QREF.20240305.140600.bin.zip']
after sort: 
 ['../data/zuimei-radar/20240301/Z_RADA_C_BABJ_20240229160014_P_ACHN.QREF.20240229.155400.bin.zip', '../data/zuimei-radar/20240301/Z_RADA_C_BABJ_20240229160614_P_ACHN.QREF.20240229.160000.bin.zip', '../data/zuimei-radar/20240301/Z_RADA_C_BABJ_20240229161214_P_ACHN.QREF.20240229.160600.bin.zip', '../data/zuimei-radar/20240301/Z_RADA_C_BABJ_20240229161814_P_ACHN.QREF.20240229.161200.bin.zip', '../data/zuimei-radar/20240301/Z_RADA_C_BABJ_20240229162415_P_ACH

In [15]:
num_total_frames = 24
period_frames_raw = read_frames(periods[5][:num_total_frames])
_, width_raw, height_raw = period_frames_raw.shape
period_frames_raw.shape

(24, 4200, 6200)

In [16]:
def pad_frames(arr, base_num=32):
    # print("Original array shape:", arr.shape)

    # Calculate the padding needed for each dimension
    pad_dim1 = (base_num - arr.shape[1] % base_num) % base_num
    pad_dim2 = (base_num - arr.shape[2] % base_num) % base_num

    # Calculate the padding amount for each side
    pad_width = ((0, 0), (0, pad_dim1), (0, pad_dim2))

    # Pad the array with zeros
    padded_arr = np.pad(arr, pad_width, mode='constant', constant_values=0)
    padded_arr = padded_arr.astype(np.float32)
    
    return padded_arr

In [17]:
period_frames_pad = pad_frames(period_frames_raw)
period_frames_pad.shape

(24, 4224, 6208)

In [18]:
import copy

period_frames = period_frames_pad

In [19]:
num_input_frames = 4
num_forecast_frames = 20

input_frames = period_frames[:num_input_frames]
target_frames = period_frames[num_input_frames:num_input_frames+num_forecast_frames]

print(input_frames.shape, target_frames.shape)

(4, 4224, 6208) (20, 4224, 6208)


In [20]:
width, height = input_frames.shape[1], input_frames.shape[2]
print(f"width: {width}, height: {height}")

num_subframe = 4

width: 4224, height: 6208


In [21]:
endpoint_name

'nowcast-dgmr-endpoint-2024-05-20-13-23-09'

In [23]:
import torch
import json

predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name)
# predictor.serializer = DataSerializer(content_type="image/x-image")
# Set the content type to 'application/json'
predictor.content_type = 'application/json'
# predictor.content_type = 'application/x-npy'

payload = {
    "input_frames": input_frames.tolist(), 
    "forecast_steps": num_forecast_frames,
    "latent_stack_shape": (8, width//num_subframe//32, height//32),
}

# Serialize the payload to JSON
serialized_payload = json.dumps(payload)

# serialized_payload = json.dumps({})
# Make the prediction
response = predictor.predict(serialized_payload)

# Deserialize the response
response_dict = json.loads(response.decode('utf-8'))

print(response_dict)

# predictor.predict(payload)

SSLError: SSL validation failed for https://runtime.sagemaker.us-west-2.amazonaws.com/endpoints/nowcast-dgmr-endpoint-2024-05-20-13-23-09/invocations EOF occurred in violation of protocol (_ssl.c:2426)

In [None]:
# with open(file_name, "rb") as f:
#     payload = f.read()


sagemaker_runtime = boto3.client("runtime.sagemaker")
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='application/json', #"image/x-image",
    Body=serialized_payload # payload
)

response = json.loads(response["Body"].read().decode())

In [None]:
# from sagemaker.huggingface.model import HuggingFaceModel

# # create Hugging Face Model Class
# huggingface_model = HuggingFaceModel(
#    model_data=s3_location,       # path to your model and script
#    role=role,                    # iam role with permissions to create an Endpoint
#    image_uri='452145973879.dkr.ecr.us-west-2.amazonaws.com/dgmr_py310:latest', #
#    # transformers_version="4.26",  # transformers version used
#    # pytorch_version="2.2",        # pytorch version used
#    # py_version='py310',            # python version used
# )

# # deploy the endpoint endpoint
# predictor = huggingface_model.deploy(
#     initial_instance_count=1,
#     instance_type="ml.g5.2xlarge"
#     )

### if already deployed

In [None]:
# from sagemaker.huggingface.model import HuggingFacePredictor

# predictor1 = HuggingFacePredictor(endpoint_name="hf64infer-2023-11-07-03-38-49-969", sagemaker_session=sess)
# predictor1

## inference test

In [None]:
data = {
    "inputs": "the mesmerizing performances of the leads keep the film grounded and keep the audience riveted .",
    "top_k": 10,
    "prob_thr": 0
}

data = {
    "inputs": "鲜花",
    "top_k": 10,
    "prob_thr": 0.1
}

data = {
    "inputs": "止门阀",
    "top_k": 12,
}


res = predictor.predict(data=data)
print(res)
print(len(res))

## delete model and endpoint

In [None]:
predictor.delete_model()
predictor.delete_endpoint()