In [1]:
# Installs
# pip install boto3

In [2]:
!python --version

Python 3.10.15


In [3]:
# Imports
import pandas as pd
from pandas import DataFrame, read_csv, concat
import boto3
import os
from os import remove
from datetime import datetime
import numpy as np
from sklearn.preprocessing import StandardScaler
import sagemaker
from sagemaker import get_execution_role
from sagemaker import Session
from sagemaker.inputs import TrainingInput
import json

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


# Functions

In [4]:
# from pandas import DataFrame, read_csv, concat
# import boto3
# from os import remove

def download_and_concat_s3_csvs(output_bucket_name, concatenated_csv):
    """
    Downloads CSV files from the specified S3 bucket, concatenates them into a single DataFrame, and returns it.
    
    Parameters:
    - output_bucket_name (str): The name of the S3 bucket containing the CSV files.
    - concatenated_csv (str): The name of the concatenated output file to be saved locally.

    Returns:
    - pd.DataFrame: The concatenated DataFrame.
    """
    # Initialize S3 client
    s3 = boto3.client('s3')

    # List files already in the output directory
    written_output_files = s3.list_objects(Bucket=output_bucket_name, Prefix='csv/', Delimiter='/')
    written_output_files = [i['Key'] for i in written_output_files.get('Contents', [])]

    # Filter out empty keys or directories
    csv_files = [key for key in written_output_files if key.endswith('.csv')]

    # Initialize an empty list to store DataFrames
    dataframes = []

    # Process each CSV file
    for csv_file in csv_files:
        local_filename = csv_file.split('/')[-1]  # Extract the file name
        # Download the file locally
        s3.download_file(output_bucket_name, csv_file, local_filename)
        
        # Load the CSV into a DataFrame
        df = pd.read_csv(local_filename)
        dataframes.append(df)
        
        # Clean up the local file
        remove(local_filename)

    # Concatenate all DataFrames
    concat_df = pd.concat(dataframes, ignore_index=True)
    
    # Save the concatenated DataFrame locally if needed
    concat_df.to_csv(concatenated_csv, index=False)

    return concat_df

# TODO: deal with this later
# # Save the concatenated DataFrame to a new CSV file
# final_df.to_csv(concatenated_csv, index=False)

# # Optional: Upload the concatenated CSV back to S3
# s3.upload_file(concatenated_csv, output_bucket_name, f'csv/{concatenated_csv}')

# # Remove the local concatenated file
# remove(concatenated_csv)

# print(f"Concatenated CSV saved to {concatenated_csv} and uploaded to bucket.")

In [5]:
# from datetime import datetime

# Take the datetime object and turn it into the other needed columns
def extract_date_info(dt):
    # Extract basic components
    year = dt.year
    month = dt.month
    day = dt.day
    hour = dt.hour
    minute = dt.minute
    weekday = dt.weekday()  # 0=Monday, 6=Sunday
    is_weekend = weekday >= 5

    # Calculate the week of the month
    first_day_of_month = dt.replace(day=1)
    week_of_month = (dt.day + first_day_of_month.weekday()) // 7 + 1

    # Determine the season (Northern Hemisphere)
    if month in (12, 1, 2):
        season = "Winter"
    elif month in (3, 4, 5):
        season = "Spring"
    elif month in (6, 7, 8):
        season = "Summer"
    else:
        season = "Fall"

    return year, month, day, hour, minute, is_weekend, week_of_month, season

In [6]:
# import pandas as pd
# import numpy as np
# from sklearn.preprocessing import StandardScaler

def preprocess_expanded_df(data):
    """
    Preprocess the expanded DataFrame for use with the AWS Random Cut Forest (RCF) model.
    
    This function:
    - Removes `date_time`.
    - Encodes and transforms categorical and cyclical features.
    - Normalizes numerical features.

    Parameters:
    - data (pd.DataFrame): Expanded DataFrame with columns:
      ['n_cars', 'traffic_speed', 'date_time', 'camera_id', 
       'year', 'month', 'day', 'hour', 'minute', 
       'is_weekend', 'week_of_month', 'season'].

    Returns:
    - pd.DataFrame: Processed DataFrame ready for RCF.
    """
    # Drop the `date_time` column
    data = data.drop(columns=["date_time"])
    
    # Encode `is_weekend` as integer (0 or 1)
    data["is_weekend"] = data["is_weekend"].astype(int)
    
    # Map `season` to numeric values if it's of type object
    if data["season"].dtype == "object":
        season_mapping = {"Winter": 0, "Spring": 1, "Summer": 2, "Fall": 3}
        data["season"] = data["season"].map(season_mapping)
        if data["season"].isna().any():
            raise ValueError("Unmapped season value detected in the data.")
    
    # Define a function for cyclical transformations
    def add_cyclical_features(data, column, max_value):
        data[f"{column}_sin"] = np.sin(2 * np.pi * data[column] / max_value)
        data[f"{column}_cos"] = np.cos(2 * np.pi * data[column] / max_value)
        return data
    
    # Apply cyclical transformations
    data = add_cyclical_features(data, "month", 12)          # Cyclical for months
    data = add_cyclical_features(data, "day", 31)            # Cyclical for days
    data = add_cyclical_features(data, "hour", 24)           # Cyclical for hours
    data = add_cyclical_features(data, "minute", 60)         # Cyclical for minutes
    data = add_cyclical_features(data, "week_of_month", 5)   # Cyclical for weeks of the month
    data = add_cyclical_features(data, "season", 4)          # Cyclical for seasons
    data = add_cyclical_features(data, "year", 10)           # Cyclical for years (assuming a 10-year cycle, adjust as needed)
    
    # Drop the original cyclical columns
    columns_to_drop = ["month", "day", "hour", "minute", "week_of_month", "season", "year"]
    data = data.drop(columns=columns_to_drop)
    
    # Assuming df is your DataFrame
    data.loc[data['traffic_speed'] == -1, 'traffic_speed'] = 0
    
    # Normalize numerical features
    numerical_columns = [
        "n_cars", "traffic_speed", "camera_id", "is_weekend"
    ]
    scaler = StandardScaler()
    data[numerical_columns] = scaler.fit_transform(data[numerical_columns])
    
    return data

# Model

In [7]:
def save_preprocessed_data_to_local(data, filename='data.csv'):
    """
    Saves the preprocessed data to a local CSV file.

    Parameters:
    - data (pd.DataFrame): Preprocessed traffic data.
    - filename (str): The name of the local file to be saved (default is 'data.csv').
    """
    # Save the preprocessed data to CSV
    data.to_csv(filename, header=False, index=False)
    return filename

In [8]:
def upload_to_s3(local_file_path, bucket_name, s3_key):
    """
    Uploads a local file to S3.

    Parameters:
    - local_file_path (str): The local file path of the CSV to be uploaded.
    - bucket_name (str): The name of the S3 bucket.
    - s3_key (str): The S3 object key (path where the file will be stored).
    """
    s3_client = boto3.client('s3')
    s3_client.upload_file(local_file_path, bucket_name, s3_key)
    
    # Return the full S3 URI
    return f"s3://{bucket_name}/{s3_key}"

In [9]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker import RandomCutForest
import pandas as pd

def train_rcf_model(input_data, bucket_name, output_data_file_path, region_name='us-east-1'):
    """
    Trains the RCF model using the preprocessed data stored locally or in S3.

    Parameters:
    - input_data (str): Local file path or S3 URI of the preprocessed data.
    - bucket_name (str): The name of the S3 bucket for output data.
    - output_data_file_path (str): The path in the S3 bucket where model output will be saved.
    - region_name (str): The AWS region for the SageMaker model.

    Returns:
    - sagemaker.estimator.Estimator: The trained RCF model.
    """
    # Get the SageMaker session and execution role
    session = sagemaker.Session()
    role = get_execution_role()

    # Upload the input data to S3 if it's a local file
    if input_data.startswith('s3://'):
        input_data_location = input_data  # Data is already in S3
    else:
        input_data_location = session.upload_data(path=input_data, bucket=bucket_name, key_prefix='rcf-clickstream')

    # Initialize the RandomCutForest estimator
    rcf = RandomCutForest(
        role=role,
        train_instance_count=1,
        train_instance_type='ml.m4.xlarge',
        data_location=input_data_location,
        output_path=f's3://{bucket_name}/{output_data_file_path}',  # Output location
        num_samples_per_tree=256,  # Hyperparameter
        num_trees=100,  # Hyperparameter
        sagemaker_session=session
    )

    # Load training data (assuming it's in CSV format)
    train_data = pd.read_csv(input_data)  # Assuming input is a CSV file
    
    # Ensure all data is numeric (e.g., convert to float32)
    train_data = train_data.apply(pd.to_numeric, errors='coerce')  # Convert all columns to numeric, non-convertible become NaN
    train_data = train_data.fillna(0)  # Replace NaN values with 0, if needed (or handle accordingly)
    
    # Convert to numpy array with float32 dtype
    train_data_numpy = train_data.to_numpy().astype('float32')

    # Convert the data to the record set format
    rcf_record_set = rcf.record_set(train_data_numpy, channel='train', encrypt=False)

    # Train the model
    rcf.fit(rcf_record_set)

    return rcf

In [10]:
# import json
# import numpy as np
# import sagemaker
# from sagemaker import get_execution_role
# from sagemaker.predictor import Predictor
# from sagemaker import RandomCutForest
# from sagemaker import Session

# # WARNING Duplicate
# # Deploy and predict
# def deploy_and_predict(rcf_model, inference_data):
#     """
#     Deploys the RCF model and makes a prediction.

#     Parameters:
#     - rcf_model (sagemaker.estimator.Estimator): The trained RCF model.
#     - inference_data (np.array): The data to be used for inference.

#     Returns:
#     - dict: The predicted anomaly scores.
#     """
#     # Deploy the model
#     rcf_predictor = rcf_model.deploy(
#         initial_instance_count=1,
#         instance_type='ml.t3.medium'
#     )
    
#     # Prepare the payload for inference (convert to numpy array and list)
#     payload = json.dumps(inference_data.tolist())

#     # Make a prediction (send payload to the deployed model)
#     response = rcf_predictor.predict(payload)
    
#     # The response needs to be decoded and parsed from the result
#     result = json.loads(response)

#     print("Anomaly scores:", result)

#     # Clean up the endpoint after inference
#     rcf_predictor.delete_endpoint()

#     return result

# Process

In [11]:
# Usage of the function
input_bucket_name = 'jfrechmsml650output'
concatenated_csv = 'concatenated_output.csv'
output_bucket_name = 'dulcichmsml650bucket' # Make and enter bucket name
output_path = 'output.csv' # ex. data/traffic_data.csv
local_file_path = ''

# Get the concatenated DataFrame
concat_df = download_and_concat_s3_csvs(input_bucket_name, concatenated_csv)

# Optionally, print the first few rows of the concatenated DataFrame
print(concat_df.head())

   n_cars  traffic_speed
0    29.0      14.734943
1    27.0      11.417725
2    40.0      14.436572
3    23.0      10.827753
4    30.0      12.084542


In [12]:
# TEMPORARY until james updates the data
# Data types
# Camera ID, Date-Time (Y-M-D-H-M) Weekend WeekofMonth Season Speed (traffic_speed) Density (n_cars)
# concat_df['date_time'] = pd.to_datetime('2024-11-18 15:45:00')
# concat_df['camera_id'] = '0'
# concat_df

# Final to fix date-time
concat_df['date_time'] = pd.to_datetime(concat_df['date_time'])


Unnamed: 0,n_cars,traffic_speed,date_time,camera_id
0,29.0,14.734943,2024-11-18 15:45:00,0
1,27.0,11.417725,2024-11-18 15:45:00,0
2,40.0,14.436572,2024-11-18 15:45:00,0
3,23.0,10.827753,2024-11-18 15:45:00,0
4,30.0,12.084542,2024-11-18 15:45:00,0
...,...,...,...,...
249,9.0,-1.000000,2024-11-18 15:45:00,0
250,17.0,2.959250,2024-11-18 15:45:00,0
251,37.0,2.663268,2024-11-18 15:45:00,0
252,15.0,3.130074,2024-11-18 15:45:00,0


In [13]:
# Extrapolate data
concat_df[['year', 'month', 'day', 'hour', 'minute', 'is_weekend', 'week_of_month', 'season']] = concat_df['date_time'].apply(lambda x: pd.Series(extract_date_info(x)))
concat_df

Unnamed: 0,n_cars,traffic_speed,date_time,camera_id,year,month,day,hour,minute,is_weekend,week_of_month,season
0,29.0,14.734943,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall
1,27.0,11.417725,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall
2,40.0,14.436572,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall
3,23.0,10.827753,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall
4,30.0,12.084542,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall
...,...,...,...,...,...,...,...,...,...,...,...,...
249,9.0,-1.000000,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall
250,17.0,2.959250,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall
251,37.0,2.663268,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall
252,15.0,3.130074,2024-11-18 15:45:00,0,2024,11,18,15,45,False,4,Fall


In [14]:
preprocessed_df = preprocess_expanded_df(concat_df)
preprocessed_df.head()

# Save unnormalized data locally and in s3
local_file_path = save_preprocessed_data_to_local(preprocessed_df)  # Save locally first
input_data = upload_to_s3(local_file_path, output_bucket_name, output_path)

In [15]:
# Test inference data
test_inference = preprocessed_df.head() # Get the first row
# test_inference_csv = test_inference.to_csv(index=False, header=False).encode('utf-8')
# Extract the first row

# Convert to numpy array and encode as float32
# test_inference_float32 = test_inference.to_numpy().astype(np.float32)

# # Verify the dtype
print(test_inference.to_numpy().dtype)

float64


In [16]:
# Train the model (input_data is a filepath)
rcf_model = train_rcf_model(input_data, bucket_name=output_bucket_name, output_data_file_path=output_path)

Couldn't call 'get_role' to get Role ARN from role name sagemaker_notebook_jupyter_lab to get Role path.
train_instance_count has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
train_instance_type has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
severe performance issues, see also https://github.com/dask/dask/issues/10276

To fix, you should specify a lower version bound on s3fs, or
update the current installation.

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating training-job with name: randomcutforest-2024-12-02-16-50-28-808


2024-12-02 16:50:29 Starting - Starting the training job...
2024-12-02 16:50:50 Starting - Preparing the instances for training...
2024-12-02 16:51:22 Downloading - Downloading input data...
2024-12-02 16:51:52 Downloading - Downloading the training image......
2024-12-02 16:53:03 Training - Training image download completed. Training in progress.......
2024-12-02 16:53:46 Uploading - Uploading generated training model
2024-12-02 16:53:46 Completed - Training job completed
..Training seconds: 145
Billable seconds: 145


In [17]:
# # TODO: Fix load model weights (save is handled by aws)
# # Define S3 bucket and model location
# bucket_name = session.default_bucket()
# model_artifacts_s3_path = 's3://{}/rcf-model-output/model.tar.gz'  # The S3 path where your model is stored

# # Load the trained model from S3
# rcf_model = RandomCutForest.attach(model_data=model_artifacts_s3_path)

In [18]:
import sagemaker
import pandas as pd
import numpy as np
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

def deploy_rcf_endpoint(rcf_model, instance_type='ml.m4.xlarge', initial_instance_count=1):
    """
    Deploys the trained RCF model to a SageMaker endpoint.

    Parameters:
    - rcf_model: The trained RCF model.
    - instance_type (str): The type of instance for the endpoint (default: 'ml.m4.xlarge').
    - initial_instance_count (int): The number of instances for the endpoint (default: 1).

    Returns:
    - Predictor: The deployed model's predictor object.
    """
    # Deploy the model
    rcf_predictor = rcf_model.deploy(
        instance_type=instance_type,
        initial_instance_count=initial_instance_count,
    )

    # Configure the serializer and deserializer
    rcf_predictor.serializer = CSVSerializer()  # Input will be CSV
    rcf_predictor.deserializer = JSONDeserializer()  # Output will be JSON
    
    return rcf_predictor

# Example usage

# Deploy the endpoint
rcf_detector = deploy_rcf_endpoint(rcf_model)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating model with name: randomcutforest-2024-12-02-16-54-15-915
INFO:sagemaker:Creating endpoint-config with name randomcutforest-2024-12-02-16-54-15-915
INFO:sagemaker:Creating endpoint with name randomcutforest-2024-12-02-16-54-15-915


-------!

In [19]:
print(rcf_detector.content_type, rcf_detector.accept)

text/csv ('application/json',)


In [20]:
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.predictor import Predictor

def invoke_rcf_endpoint(rcf_predictor, input_data):
    """
    Sends data to the deployed RCF endpoint for inference.

    Parameters:
    - rcf_predictor: The deployed model's predictor object.
    - input_data (pd.DataFrame or numpy.ndarray): Preprocessed input data.

    Returns:
    - dict: The JSON response from the endpoint.
    """
    # Ensure input_data is in the correct format for prediction (float32)
    if isinstance(input_data, pd.DataFrame):
        input_data = input_data.apply(pd.to_numeric, errors='coerce').fillna(0)
        input_data = input_data.to_numpy().astype('float32')
    elif isinstance(input_data, np.ndarray):
        input_data = input_data.astype('float32')

    # Convert numpy array to CSV string (no header, no index)
    input_data_csv = '\n'.join([','.join(map(str, row)) for row in input_data.tolist()])

    # Print the payload to inspect the format (optional)
    print("Payload sent to endpoint:")
    print(input_data_csv)  # CSV format (without header or index)

    try:
        # Invoke the endpoint for inference
        response = rcf_predictor.predict(input_data_csv)
        return response
    except Exception as e:
        print(f"Error invoking the endpoint: {e}")
        return None

# Example usage
# Assuming 'rcf_predictor' is your deployed RandomCutForest endpoint predictor

example_data = test_inference

result = invoke_rcf_endpoint(rcf_detector, example_data)

# Handle the result
if result:
    print("Prediction result:", result)
else:
    print("Failed to get prediction")

Payload sent to endpoint:
1.5181864500045776,0.11736033111810684,0.0,0.0,-0.5,0.8660253882408142,-0.4853019714355469,-0.8743466138839722,-0.7071067690849304,-0.7071067690849304,-1.0,-1.8369701465288538e-16,-0.9510565400123596,0.30901700258255005,-1.0,-1.8369701465288538e-16,0.5877852439880371,-0.80901700258255
1.2840520143508911,-0.17972011864185333,0.0,0.0,-0.5,0.8660253882408142,-0.4853019714355469,-0.8743466138839722,-0.7071067690849304,-0.7071067690849304,-1.0,-1.8369701465288538e-16,-0.9510565400123596,0.30901700258255005,-1.0,-1.8369701465288538e-16,0.5877852439880371,-0.80901700258255
2.8059256076812744,0.09063906967639923,0.0,0.0,-0.5,0.8660253882408142,-0.4853019714355469,-0.8743466138839722,-0.7071067690849304,-0.7071067690849304,-1.0,-1.8369701465288538e-16,-0.9510565400123596,0.30901700258255005,-1.0,-1.8369701465288538e-16,0.5877852439880371,-0.80901700258255
0.8157832622528076,-0.23255623877048492,0.0,0.0,-0.5,0.8660253882408142,-0.4853019714355469,-0.8743466138839722,-0.

In [23]:
import numpy as np
import pandas as pd
import boto3
import json

def invoke_rcf_endpoint(endpoint_name, input_data):
    """
    Sends data to a deployed Random Cut Forest endpoint for inference.

    Parameters:
    - endpoint_name (str): The name of the SageMaker endpoint.
    - input_data (pd.DataFrame or np.ndarray): Preprocessed input data.

    Returns:
    - list: A list of anomaly scores.
    """
    # Ensure input_data is in the correct format
    if isinstance(input_data, pd.DataFrame):
        input_data = input_data.apply(pd.to_numeric, errors='coerce').fillna(0)
        input_data = input_data.to_numpy().astype('float32')
    elif isinstance(input_data, np.ndarray):
        input_data = input_data.astype('float32')
    else:
        raise ValueError("Input data must be a pandas DataFrame or numpy ndarray.")

    # Convert numpy array to CSV format
    input_data_csv = '\n'.join([','.join(map(str, row)) for row in input_data.tolist()])

    # Initialize SageMaker runtime client
    sagemaker_runtime = boto3.client('sagemaker-runtime')

    try:
        # Invoke the SageMaker endpoint
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType='text/csv',
            Body=input_data_csv
        )

        # Parse the response body
        result = json.loads(response['Body'].read().decode())
        anomaly_scores = [score['score'] for score in result.get('scores', [])]
        return anomaly_scores

    except Exception as e:
        print(f"Error invoking the endpoint: {e}")
        return []

# Example usage
# Replace 'your-endpoint-name' with the actual endpoint name
example_input_data = test_inference

endpoint_name = "randomcutforest-2024-12-02-16-54-15-915"
anomaly_scores = invoke_rcf_endpoint(endpoint_name, example_input_data)

if anomaly_scores:
    print("Anomaly scores:", anomaly_scores)
else:
    print("Failed to retrieve anomaly scores.")

Anomaly scores: [0.7528421222, 0.73545973, 0.8202757705, 0.7006394211, 0.7585047419]


In [25]:
# Delete the endpoint (clean up)
rcf_detector.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: randomcutforest-2024-12-02-03-21-04-859
INFO:sagemaker:Deleting endpoint with name: randomcutforest-2024-12-02-03-21-04-859


# Temp Code

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.estimator import SKLearnEstimator
from sagemaker.model import Model
import boto3

# Initialize the SageMaker session and role
sagemaker_session = sagemaker.Session()
role = get_execution_role()
bucket = 'your-bucket-name'  # Replace with your S3 bucket name
prefix = 'sagemaker-pipeline'

# Step 1: Data preprocessing (use your preprocessing script)
script_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve('sklearn', sagemaker_session.boto_region_name, version='0.23-1'),
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    command=['python3']
)

# Preprocessing step (your custom preprocessing function here)
processing_step = ProcessingStep(
    name="DataPreprocessingStep",
    processor=script_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            source=f's3://{bucket}/input_data/',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            source='/opt/ml/processing/output',
            destination=f's3://{bucket}/preprocessed_data/'
        )
    ],
    code='scripts/preprocess.py'  # Preprocessing script path
)

# Define the pipeline
pipeline = Pipeline(
    name="RandomCutForestPipeline",
    steps=[processing_step, training_step, model_step]
)

# Execute the pipeline
pipeline.upsert(role_arn=role)
pipeline.start()

print(f"Pipeline started: {pipeline.name}")

In [None]:
results = json.loads(response["Body"].read().decode("utf-8"))
print("RCF Results:", results)

# Step 3: Save results to S3
s3_client.put_object(
    Bucket=bucket_name,
    Key=s3_key,
    Body=json.dumps(results),
    ContentType="application/json"
)

print(f"Results saved to s3://{bucket_name}/{s3_key}")