# Super SLO-MO with FILM model

## Environment

Make sure that you are using the following environment:

| Field | Value |
| :--- | :---------- |
| Instance type | ml.g4dn.xlarge (Note: other g4/g5 instances also work) |
| Kernel | Python 3 |

### > Setup

Installs the dependencies required to process your video.


In [None]:
%%sh
sudo apt-get update && sudo apt upgrade -y

sudo apt-get install ffmpeg -y

pip install -U sagemaker

In [None]:
import sagemaker
from sagemaker.utils import name_from_base
import time
import shutil
import helper
import os
import json
import boto3

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()  # execution role for the endpoint
region = sagemaker_session.boto_region_name  # public attribute; avoids the private _region_name

default_bucket = sagemaker_session.default_bucket()

prefix = "slow-mo"
inference_prefix = f"{prefix}/inference"

### > Download pretrained models

In [None]:
CDN_URL = "https://d2yqlwoly7fl0b.cloudfront.net/super-slomo"

model_path = "slow_mo_generator/model"

# Download the pretrained model and unzip it into the model_path folder

PRETRAINED_MODEL = "Style-20230929T132001Z-001.zip"
!wget -L {CDN_URL}/pretrained_models/{PRETRAINED_MODEL} -O {PRETRAINED_MODEL}
!unzip {PRETRAINED_MODEL} -d {model_path}

# Download the sample video file
SAMPLE_VIDEO = "westiepoo.mov"
!wget -L {CDN_URL}/samples/{SAMPLE_VIDEO} -O {SAMPLE_VIDEO}

### > Setup Deep Java Library (DJL) Container

We will use SageMaker Asynchronous Inference and the SageMaker managed Deep Java Library (DJL) container to generate the slow-mo frames.

Asynchronous endpoints are designed for large payloads (up to 1 GB), long-running processing (up to one hour), and near real-time inference. As long as our workload stays within the payload and processing time limits, we can take advantage of the endpoint's built-in queue and notifications to handle our requests. This eliminates the need to manage a queue ourselves with additional components.

We also chose DJLServing because the SageMaker managed inference container already has many of the training libraries we need, such as Transformers, Accelerate, and s5cmd.
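
As a quick sanity check before sending a job, the limits mentioned above can be expressed in a few lines. This is a minimal sketch: the function name `fits_async_limits` is hypothetical, and the constants simply restate the 1 GB and one hour figures from the text.

```python
# Limits quoted in the text above for SageMaker asynchronous inference.
MAX_PAYLOAD_BYTES = 1_000_000_000   # roughly 1 GB
MAX_PROCESSING_SECONDS = 3600       # one hour


def fits_async_limits(payload_bytes: int, expected_seconds: float) -> bool:
    """Return True when a job stays within both the payload and the time limit."""
    payload_ok = 0 < payload_bytes <= MAX_PAYLOAD_BYTES
    time_ok = 0 < expected_seconds <= MAX_PROCESSING_SECONDS
    return payload_ok and time_ok


# Hypothetical job: a 500 MB frame archive expected to take about 10 minutes.
print(fits_async_limits(500_000_000, 600))
```

If the check fails, the input would need to be split into smaller batches of frames before it is submitted.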

In [None]:
inference_image_uri = (
    f"763104351884.dkr.ecr.{region}.amazonaws.com/djl-inference:0.21.0-deepspeed0.8.3-cu117"
)
print(f"Image going to be used is ---- > {inference_image_uri}")

Write the S3 bucket and prefix into the `serving.properties` file.

In [None]:
!sed -i 's@option.s3_bucket=.*@option.s3_bucket={default_bucket}@g' slow_mo_generator/serving.properties
!sed -i 's@option.s3_prefix=.*@option.s3_prefix={inference_prefix}@g' slow_mo_generator/serving.properties

In [None]:
!pygmentize slow_mo_generator/serving.properties | cat -n
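
If `sed` is not available, the same substitution could be done in Python. The sketch below works on an in-memory string; the sample file content and the `set_property` helper are illustrative assumptions, not the actual contents of `slow_mo_generator/serving.properties`.

```python
import re


def set_property(text: str, key: str, value: str) -> str:
    """Replace the value of every `key=...` line, leaving other lines untouched."""
    pattern = re.compile(rf"^{re.escape(key)}=.*$", flags=re.MULTILINE)
    # A function replacement avoids backslash escapes in `value` being interpreted.
    return pattern.sub(lambda _match: f"{key}={value}", text)


# Hypothetical file content, for illustration only.
sample = "engine=Python\noption.s3_bucket=placeholder\noption.s3_prefix=placeholder\n"

updated = set_property(sample, "option.s3_bucket", "my-bucket")
updated = set_property(updated, "option.s3_prefix", "slow-mo/inference")
print(updated)
```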

Package the model for DJLServing on SageMaker.

In [None]:
!rm -rf `find -type d -name .ipynb_checkpoints`

In [None]:
!find . | grep -E "(/__pycache__$|\.pyc$|\.pyo$)" | xargs rm -rf

In [None]:
!tar czvf model.tar.gz slow_mo_generator/
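
The cleanup and packaging steps above could also be combined in Python with the standard `tarfile` module, which filters out caches while building the archive. This is a sketch under assumptions: `package_model` is a hypothetical helper, and the demo files (`model.py`, the `.pyc` file) are placeholders created in a temporary directory.

```python
import os
import tarfile
import tempfile

EXCLUDED_DIRS = {"__pycache__", ".ipynb_checkpoints"}
EXCLUDED_SUFFIXES = (".pyc", ".pyo")


def _keep(tarinfo):
    """Return None to drop cache directories and compiled files from the archive."""
    parts = tarinfo.name.split("/")
    if EXCLUDED_DIRS.intersection(parts) or tarinfo.name.endswith(EXCLUDED_SUFFIXES):
        return None
    return tarinfo


def package_model(source_dir, archive_path):
    """Write source_dir into a gzip-compressed tarball and return the member names."""
    top_level = os.path.basename(os.path.normpath(source_dir))
    with tarfile.open(archive_path, "w:gz") as tar:
        tar.add(source_dir, arcname=top_level, filter=_keep)
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(tar.getnames())


# Demo on a throwaway directory with placeholder files.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "slow_mo_generator")
    os.makedirs(os.path.join(src, "__pycache__"))
    placeholders = (
        "serving.properties",
        "model.py",
        os.path.join("__pycache__", "model.cpython-39.pyc"),
    )
    for rel in placeholders:
        with open(os.path.join(src, rel), "w") as f:
            f.write("placeholder")
    names = package_model(src, os.path.join(tmp, "model.tar.gz"))
    print(names)
```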

In [None]:
inference_artifact = sagemaker_session.upload_data("model.tar.gz", default_bucket, f"{prefix}/inference")
print(f"S3 Code or Model tar ball uploaded to --- > {inference_artifact}")

### > Create a SageMaker Asynchronous Endpoint

Create a function to package the container information, model files, and the IAM role into a single SageMaker model. Then deploy that model.

In [None]:
from sagemaker.model import Model
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig

def deploy_model(image_uri, model_data, role, endpoint_name, instance_type, env, sagemaker_session, async_inference_config):
    """Helper function to create the SageMaker Endpoint resources and return a predictor"""

    # Bundle the container image, model artifact, IAM role, and environment into one SageMaker model
    model = Model(
        image_uri=image_uri,
        model_data=model_data,
        role=role,
        env=env,
        sagemaker_session=sagemaker_session,
    )

    # Create the endpoint; passing async_inference_config makes it an asynchronous endpoint
    model.deploy(
        initial_instance_count=1,
        instance_type=instance_type,
        endpoint_name=endpoint_name,
        async_inference_config=async_inference_config,
    )

    predictor = sagemaker.Predictor(
        endpoint_name=endpoint_name,
        sagemaker_session=sagemaker_session,
    )

    return predictor

Create an endpoint configuration that defines how our Async Inference will be served.

In [None]:
# create async endpoint configuration
async_config = AsyncInferenceConfig(
    output_path=f"s3://{default_bucket}/{prefix}/async_inference/output" , # Where our results will be stored
    max_concurrent_invocations_per_instance=1,
    # notification_config={
            #   "SuccessTopic": "arn:aws:sns:us-east-2:123456789012:MyTopic",
            #   "ErrorTopic": "arn:aws:sns:us-east-2:123456789012:MyTopic",
    # }, #  Notification configuration
)

In [None]:
endpoint_name = sagemaker.utils.name_from_base("slow-mo")
slow_mo_inference = deploy_model(image_uri=inference_image_uri,
                            model_data=inference_artifact,
                            role=role,
                            endpoint_name=endpoint_name, 
                            instance_type="ml.g5.4xlarge", 
                            sagemaker_session=sagemaker_session,
                            env={
                                'SERVING_MAX_REQUEST_SIZE': '1000000000',
                                "PREDICT_TIMEOUT": "3600", 
                                "MODEL_LOADING_TIMEOUT": "3600"
                            },
                            async_inference_config=async_config)

### > Setup AutoScaling Policy (Optional)

This section describes how to configure autoscaling on your asynchronous endpoint using Application Auto Scaling. You first register your endpoint variant with Application Auto Scaling, and then define a scaling policy.

In [None]:
client = boto3.client(
    "application-autoscaling"
)  # Common class representing Application Auto Scaling for SageMaker amongst other services

resource_id = (
    "endpoint/" + endpoint_name + "/variant/" + "AllTraffic"
)  # This is the format in which application autoscaling references the endpoint

# Configure Autoscaling on asynchronous endpoint down to zero instances
response = client.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=0,
    MaxCapacity=5,
)

response = client.put_scaling_policy(
    PolicyName=f"{name_from_base(prefix)}-invoc-scaling",
    ServiceNamespace="sagemaker",  # The namespace of the AWS service that provides the resource.
    ResourceId=resource_id,  # Endpoint name
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",  # SageMaker supports only Instance Count
    PolicyType="TargetTrackingScaling",  # 'StepScaling'|'TargetTrackingScaling'
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 5.0,  # The target value for the metric. - here the metric is - ApproximateBacklogSizePerInstance
        "CustomizedMetricSpecification": {
            "MetricName": "ApproximateBacklogSizePerInstance",
            "Namespace": "AWS/SageMaker",
            "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
            "Statistic": "Average",
        },
        "ScaleInCooldown": 600,  # The cooldown period helps you prevent your Auto Scaling group from launching or terminating
        # additional instances before the effects of previous activities are visible.
        # You can configure the length of time based on your instance startup time or other application needs.
        # ScaleInCooldown - The amount of time, in seconds, after a scale in activity completes before another scale in activity can start.
        "ScaleOutCooldown": 300  # ScaleOutCooldown - The amount of time, in seconds, after a scale out activity completes before another scale out activity can start.
        # 'DisableScaleIn': True|False - Indicates whether scale in by the target tracking policy is disabled.
        # If the value is true , scale in is disabled and the target tracking policy won't remove capacity from the scalable resource.
    },
)
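
As a rough mental model of what the target tracking policy aims for, the backlog divided by the target value gives the instance count that keeps `ApproximateBacklogSizePerInstance` near the target. The sketch below is a simplification under assumptions: `estimate_desired_instances` is a hypothetical helper, and it ignores cooldowns, alarm evaluation delays, and the scale-from-zero case that Application Auto Scaling handles differently.

```python
import math


def estimate_desired_instances(backlog_size, target_per_instance=5.0,
                               min_capacity=0, max_capacity=5):
    """Approximate the instance count that keeps the per-instance backlog near the target."""
    if backlog_size <= 0:
        return min_capacity
    wanted = math.ceil(backlog_size / target_per_instance)
    # Clamp to the registered MinCapacity / MaxCapacity range.
    return max(min_capacity, min(max_capacity, wanted))


for backlog in (0, 3, 12, 100):
    print(backlog, "queued requests ->", estimate_desired_instances(backlog), "instance(s)")
```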

### > Define a scaling policy that scales up from zero for new requests (Optional)

You might have a use case where you have sporadic requests or periods with low numbers of requests. If your endpoint has been scaled down to zero instances during these periods, then your endpoint won’t scale up again until the number of requests in the queue exceeds the target specified in your scaling policy. 

To create a scaling policy for your endpoint that scales up from zero instances, do the following:

1) Set up a step scaling policy that scales your endpoint up when the queue is greater than zero and the current instance count for your endpoint is also zero.
2) Create a CloudWatch alarm on the `HasBacklogWithoutCapacity` metric. When triggered, the alarm initiates the previously defined scaling policy.
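
The interplay of the alarm and the step policy can be sketched in plain Python. This is an illustrative simulation only: `alarm_fires` and `next_capacity` are hypothetical helpers that mimic an "M out of N datapoints" alarm and a `+1` capacity adjustment, and they do not model how CloudWatch treats missing data.

```python
def alarm_fires(datapoints, threshold=1, evaluation_periods=2, datapoints_to_alarm=2):
    """Return True when enough of the most recent datapoints reach the threshold."""
    recent = datapoints[-evaluation_periods:]
    if len(recent) < evaluation_periods:
        return False  # not enough data yet to evaluate the alarm
    breaching = sum(1 for value in recent if value >= threshold)
    return breaching >= datapoints_to_alarm


def next_capacity(current, datapoints, max_capacity=5):
    """Apply a +1 step adjustment when the alarm fires, capped at max_capacity."""
    if alarm_fires(datapoints):
        return min(max_capacity, current + 1)
    return current


# One-minute samples of a backlog-without-capacity indicator (1 = requests waiting, no instances).
print(next_capacity(0, [0, 1, 1]))  # two consecutive breaching datapoints
print(next_capacity(0, [1, 0]))     # backlog cleared before the second datapoint
```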

In [None]:
# Create a ChangeInCapacity policy
response = client.put_scaling_policy(
    PolicyName=f"{name_from_base(prefix)}-zero-scaling",
    ServiceNamespace="sagemaker",  # The namespace of the service that provides the resource.
    ResourceId=resource_id,  # Endpoint name
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",  # SageMaker supports only Instance Count
    PolicyType="StepScaling",  # 'StepScaling' or 'TargetTrackingScaling'
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity", # Specifies whether the ScalingAdjustment value in the StepAdjustment property is an absolute number or a percentage of the current capacity. 
        "MetricAggregationType": "Average", # The aggregation type for the CloudWatch metrics.
        "Cooldown": 300, # The amount of time, in seconds, to wait for a previous scaling activity to take effect. 
        "StepAdjustments": # A set of adjustments that enable you to scale based on the size of the alarm breach.
        [ 
            {
              "MetricIntervalLowerBound": 0,
              "ScalingAdjustment": 1
            }
          ]
    },    
)

step_scaling_policy_arn = response["PolicyARN"]
print(f"Policy arn: {step_scaling_policy_arn}...")

# Create a CloudWatch alarm
cw_client = boto3.client('cloudwatch')

response = cw_client.put_metric_alarm(
    AlarmName=f"{name_from_base(prefix)}-zero-scale-alarm",
    MetricName='HasBacklogWithoutCapacity',
    Namespace='AWS/SageMaker',
    Statistic='Average',
    EvaluationPeriods= 2,
    DatapointsToAlarm= 2,
    Threshold= 1,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    TreatMissingData='missing',
    Dimensions=[
        { 'Name':'EndpointName', 'Value':endpoint_name },
    ],
    Period= 60,
    AlarmActions=[step_scaling_policy_arn]
)

### > Prepare The Video

This solution expects pre-extracted video frames as input, runs inference (frame interpolation) on them, and stores the generated slow-mo frames in S3. The input frames need to be compressed into a `tar.gz` file.

In [None]:
sm_runtime = boto3.client("sagemaker-runtime")

### > Preview the video

In [None]:
from IPython.display import Video
Video(SAMPLE_VIDEO, width=640, height=360)

Extract the frames

In [None]:
frame_dir = helper.extract_frames(SAMPLE_VIDEO)

!ls {frame_dir}

### > Set configuration parameters for the inference job

| Parameter | Value | Description |
| :--- | :---------- | :---- |
| ALIGN | 64 | Pads the height and width so that they align to this boundary |
| BLOCK_HEIGHT | 1 | Breaks the image height into smaller blocks if the image is very large |
| BLOCK_WIDTH | 1 | Breaks the image width into smaller blocks if the image is very large |
| TIME_TO_INTERPOLATE | 2 | Number of interpolation passes to run between two frames |
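
To get a feel for these parameters, the sketch below computes the padded size for a given `ALIGN` value and estimates the number of output frames. Both helpers are hypothetical. The frame count formula is an assumption: it presumes each interpolation pass inserts one midpoint between every pair of neighboring frames, as in FILM's recursive scheme, which may differ from what this endpoint's model code does.

```python
def padded_size(size, align=64):
    """Round a height or width up to the next multiple of `align`."""
    return -(-size // align) * align  # integer ceiling division


def frames_after_interpolation(num_input_frames, time_to_interpolate):
    """Estimate output frames, ASSUMING every pass doubles the gaps between frames."""
    if num_input_frames < 2:
        return num_input_frames
    return (num_input_frames - 1) * 2 ** time_to_interpolate + 1


print(padded_size(1080), padded_size(1920))   # height and width of a 1080p frame
print(frames_after_interpolation(10, 2))      # 10 input frames, 2 passes
```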

In [None]:
config = {
  "align": 64,
  "block_height": 1, 
  "block_width": 1,
  "time_to_interpolate": 2 
}

with open(f"{frame_dir}/config.json", "w") as f:
  json.dump(config, f, indent=4)

!ls {frame_dir}

Generate the `tar.gz` and upload to S3

In [None]:
frames_tarfile = helper.make_tar(frame_dir)

input_s3_loc = sagemaker_session.upload_data(frames_tarfile, default_bucket, f"{prefix}/input_frames")

print(f"input file uploaded here --> {input_s3_loc}")

### > Invoke the endpoint to trigger the inference job

In [None]:
response = sm_runtime.invoke_endpoint_async(
    EndpointName=slow_mo_inference.endpoint_name,
    InputLocation=input_s3_loc,
    InvocationTimeoutSeconds=3600)

**WAIT FOR FRAME INTERPOLATION TO COMPLETE**

Depending on the number of frames you need to process, this could take a few minutes.

In [None]:
# Poll S3 until the async inference output file appears or the wait time runs out
from botocore.exceptions import ClientError

max_wait_time = 60 * 25 # 25 minutes
current_wait_time = 0

def check_s3_file_exists(s3_path):
    # Split the S3 path into its components
    s3_components = s3_path.replace("s3://", "").split("/")
    bucket_name = s3_components[0]
    file_key = "/".join(s3_components[1:])
    
    # Create an S3 client
    s3 = boto3.client("s3")
    
    # Check if the object exists
    try:
        s3.head_object(Bucket=bucket_name, Key=file_key)
        return True
    except ClientError:
        # head_object raises ClientError (for example a 404) when the object is not there yet
        return False

status = "Processing"
print("Status: " + status)
s3_path = response["OutputLocation"]

while status == "Processing":
    time.sleep(60)
    current_wait_time += 60
    if check_s3_file_exists(s3_path):
        status = "Complete"
    elif current_wait_time > max_wait_time:
        status = "Failed - Model did not complete in the expected time. Check the endpoint CloudWatch logs for more information."

    print("Status: " + status)
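
The two building blocks of this cell, splitting an S3 URI and polling with a timeout, can also be written as small reusable helpers that are easy to test without AWS access. This is a sketch: `split_s3_uri` and `wait_until` are hypothetical names, and the `check` callable stands in for the `head_object` lookup.

```python
import time
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key/parts' into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def wait_until(check, timeout_seconds, interval_seconds, sleep=time.sleep):
    """Call check() every interval until it returns True or the timeout elapses."""
    waited = 0
    while waited <= timeout_seconds:
        if check():
            return True
        sleep(interval_seconds)
        waited += interval_seconds
    return False


# Demo with a fake check that succeeds on the third attempt and a no-op sleep.
answers = iter([False, False, True])
print(split_s3_uri("s3://example-bucket/slow-mo/async_inference/output/result.out"))
print(wait_until(lambda: next(answers), timeout_seconds=10, interval_seconds=1, sleep=lambda _s: None))
```

Injecting `sleep` keeps the helper fast in tests while defaulting to `time.sleep` in the notebook.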

### > Generate Slow Mo Video

Download and Assemble the frames

In [None]:
output_file = "output.json"
!aws s3 cp {response["OutputLocation"]} {output_file}

In [None]:
# Load a JSON file into a dictionary
def load_json_file(file_name):
    with open(file_name) as f:
        return json.load(f)
    
results = load_json_file(output_file)

if results['status'] == "SUCCESS":

    output_frames = "slow-mo-frames"

    # Remove all files and folders
    if os.path.exists(output_frames):
        shutil.rmtree(output_frames)

    # Recreate empty directory 
    os.makedirs(output_frames)
    
    # Use the AWS CLI to sync the generated frames from S3 into the local directory
    !aws s3 sync {results['output_location']} {output_frames}
    
    print(f"Slow-Mo frames downloaded here: {output_frames}")
else:
    # Without a successful result there are no frames to download
    print(f"Inference did not succeed: {results}")
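
If you prefer the notebook to stop on a failed job rather than continue, the status check can be wrapped in a small function that raises. This is a sketch: `frames_location` is a hypothetical helper, the sample JSON is made up, and only the `status` and `output_location` fields used above are assumed to exist.

```python
import json


def frames_location(result_text):
    """Return the S3 location of the generated frames, or raise if the job failed."""
    results = json.loads(result_text)
    status = results.get("status")
    if status != "SUCCESS":
        raise RuntimeError(f"Frame interpolation did not succeed (status: {status!r})")
    return results["output_location"]


# Made-up example of the endpoint's output document.
sample = '{"status": "SUCCESS", "output_location": "s3://example-bucket/slow-mo/frames/"}'
print(frames_location(sample))
```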

In [None]:
output_video = "output.mp4"

helper.create_video(output_frames, # slow-mo frames
                    output_video,  # generated video
                    fr=2)          # frame rate of the video
 
time.sleep(5)

Video(output_video, width=640, height=360)
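
The implementation of `helper.create_video` is not shown in this notebook. As one possible way to assemble frames into a video, the sketch below builds an `ffmpeg` command line. It assumes the frames are PNG files whose names sort in playback order, and `build_ffmpeg_command` is a hypothetical helper that only constructs the argument list without running it.

```python
import os


def build_ffmpeg_command(frames_dir, output_path, frame_rate, pattern="*.png"):
    """Build an ffmpeg argument list that encodes a folder of frames as H.264 video."""
    return [
        "ffmpeg", "-y",                      # overwrite the output file if it exists
        "-framerate", str(frame_rate),       # input frame rate
        "-pattern_type", "glob",             # match input frames with a glob pattern
        "-i", os.path.join(frames_dir, pattern),
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",               # widely supported pixel format for playback
        output_path,
    ]


cmd = build_ffmpeg_command("slow-mo-frames", "output.mp4", frame_rate=30)
print(" ".join(cmd))
```

The list can be passed to `subprocess.run(cmd, check=True)` once the pattern and frame rate match your frames.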