# Part 1 : Data Preparation and Process

### Install required and/or update third-party libraries

In [1]:
!python -m pip install -Uq pip
!python -m pip install -Uq sagemaker boto3

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes


### Loading stored variables
If you ran this notebook before, you may want to re-use the resources you aready created with AWS. Run the cell below to load any prevously created variables. You should see a print-out of the existing variables. If you don't see anything printed then it's probably the first time you are running the notebook!

In [7]:
%store -r
%store

Stored variables and their in-db values:
base_uri                      -> 's3://sagemaker-us-east-1-631450739534/abalone'
batch_data_uri                -> 's3://sagemaker-us-east-1-631450739534/abalone/aba
input_data                    -> 's3://sagemaker-us-east-1-631450739534/sagemaker/D
input_data_uri                -> 's3://sagemaker-us-east-1-631450739534/abalone/aba
input_zones                   -> 's3://sagemaker-us-east-1-631450739534/sagemaker/D
local_path                    -> 'data/abalone-dataset.csv'
model_url                     -> 's3://sagemaker-us-east-1-631450739534/sagemaker/D
process_script                -> '/root/Amazon-SageMaker-Workshop-Custom/preprocess
test_path                     -> 's3://sagemaker-us-east-1-631450739534/sagemaker/D
train_path                    -> 's3://sagemaker-us-east-1-631450739534/sagemaker/D
training_job_name             -> 'DEMO-xgboost-tripfare-train-2021-09-30-03-08-58-4
validation_path               -> 's3://sagemaker-us-east-1-631

: You must have run the previous sequential notebooks to retrieve variables using the StoreMagic command.

In [11]:
# cell 01
import sagemaker
bucket=sagemaker.Session().default_bucket()
prefix = 'sagemaker/DEMO-xgboost-tripfare'
 
# Define IAM role
import boto3
import re
import os
from sagemaker import get_execution_role

role = get_execution_role()

In [12]:
# cell 02
import numpy as np                                # For matrix operations and numerical processing
import pandas as pd                               # For munging tabular data
import matplotlib.pyplot as plt                   # For charts and visualizations
from IPython.display import Image                 # For displaying images in the notebook
from IPython.display import display               # For displaying outputs in the notebook
from time import gmtime, strftime                 # For labeling SageMaker models, endpoints, etc.
import sys                                        # For writing outputs to notebook
import math                                       # For ceiling function
import json                                       # For parsing hosting outputs
import os                                         # For manipulating filepath names
import sagemaker 
import zipfile     # Amazon SageMaker's Python SDK provides many helper functions

In [13]:
sagemaker.__version__

'2.59.5'

In [27]:
input_source = f's3://{bucket}/{prefix}/input/'
input_source

's3://sagemaker-us-east-1-631450739534/sagemaker/DEMO-xgboost-tripfare/input/'

In [28]:
input_data = input_source + 'data/'
input_zones = input_source + 'zones/'

In [29]:
%store input_data
%store input_zones

Stored 'input_data' (str)
Stored 'input_zones' (str)


In [30]:
!aws s3 cp --recursive 's3://nyc-tlc/trip data/' $input_data --exclude '*' --include 'green_tripdata_2018-1*'
# !aws s3 cp 's3://nyc-tlc/trip data/green_tripdata_2018-02.csv' $input_data
!aws s3 cp 's3://nyc-tlc/misc/taxi_zones.zip' $input_zones

copy: s3://nyc-tlc/trip data/green_tripdata_2018-10.csv to s3://sagemaker-us-east-1-631450739534/sagemaker/DEMO-xgboost-tripfare/input/data/green_tripdata_2018-10.csv
copy: s3://nyc-tlc/trip data/green_tripdata_2018-12.csv to s3://sagemaker-us-east-1-631450739534/sagemaker/DEMO-xgboost-tripfare/input/data/green_tripdata_2018-12.csv
copy: s3://nyc-tlc/trip data/green_tripdata_2018-11.csv to s3://sagemaker-us-east-1-631450739534/sagemaker/DEMO-xgboost-tripfare/input/data/green_tripdata_2018-11.csv
copy: s3://nyc-tlc/misc/taxi_zones.zip to s3://sagemaker-us-east-1-631450739534/sagemaker/DEMO-xgboost-tripfare/input/zones/taxi_zones.zip


In [31]:
!aws s3 ls $input_data

2021-10-06 02:42:53   63666398 green_tripdata_2018-10.csv
2021-10-06 02:42:53   58812608 green_tripdata_2018-11.csv
2021-10-06 02:42:53   61371115 green_tripdata_2018-12.csv


In [49]:
# cell 05
from sagemaker import Session

sess = Session()

# Create Feature Store

In [36]:
!aws s3 cp $train_path/train-algo-1.csv .

download: s3://sagemaker-us-east-1-631450739534/sagemaker/DEMO-xgboost-tripfare/train/train-algo-1.csv to ./train-algo-1.csv


In [37]:
df = pd.read_csv('train-algo-1.csv')

In [38]:
df.dtypes

9.5                   float64
1                       int64
-73.94575022751276    float64
40.790010676294       float64
-73.90512246226346    float64
40.84905828506662     float64
7.397563308633733     float64
8                       int64
3                       int64
12                      int64
dtype: object

In [39]:
column_schemas = [
    {
        "name": "fare_amount",
        "type": "float"
    },
    {
        "name": "passenger_count",
        "type": "long"
    },
    {
        "name": "pickup_latitude",
        "type": "float"
    },
    {
        "name": "pickup_longitude",
        "type": "float"
    },
    {
        "name": "dropoff_latitude",
        "type": "float"
    },
    {
        "name": "dropoff_longitude",
        "type": "float"
    },
    {
        "name": "geo_distance",
        "type": "float"
    },
    {
        "name": "hour",
        "type": "int"
    },
    {
        "name": "weekday",
        "type": "int"
    },
    {
        "name": "month",
        "type": "int"
    },
    {
        "name": "FS_ID",
        "type": "long"
    },
    {
        "name": "FS_time",
        "type": "float"
    },
]

In [40]:
from sagemaker.feature_store.feature_definition import FeatureDefinition
from sagemaker.feature_store.feature_definition import FeatureTypeEnum

default_feature_type = FeatureTypeEnum.STRING
column_to_feature_type_mapping = {
    "float": FeatureTypeEnum.FRACTIONAL,
    "long": FeatureTypeEnum.INTEGRAL
}

feature_definitions = [
    FeatureDefinition(
        feature_name=column_schema['name'], 
        feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type)
    ) for column_schema in column_schemas
]

### Initialize & Create Feature Group

In [98]:
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

In [99]:
current_timestamp = strftime('%m-%d-%H-%M', gmtime())

# prefix to track all the feature groups created as part of feature store champions workshop (fscw)
fs_prefix = 'fscw-'

tripfare_feature_group_name = f'{fs_prefix}tripfare-{current_timestamp}'
%store tripfare_feature_group_name

print(f'Customers feature group name = {tripfare_feature_group_name}')

Stored 'tripfare_feature_group_name' (str)
Customers feature group name = fscw-tripfare-10-06-06-01


Feature group is initialized and created below

In [100]:
from sagemaker.feature_store.feature_group import FeatureGroup

enable_online_store=True
feature_store_offline_s3_uri = 's3://' + bucket

record_identifier_feature_name = 'FS_ID'
event_time_feature_name = 'FS_time'

feature_group = FeatureGroup(
    name=tripfare_feature_group_name, sagemaker_session=feature_store_session, feature_definitions=feature_definitions)

feature_group.create(
    s3_uri=feature_store_offline_s3_uri,
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=enable_online_store
)

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:631450739534:feature-group/fscw-tripfare-10-06-06-01',
 'ResponseMetadata': {'RequestId': '2d56c240-bef7-4ec2-8cb3-bdf50f2ffaad',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '2d56c240-bef7-4ec2-8cb3-bdf50f2ffaad',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Wed, 06 Oct 2021 06:01:48 GMT'},
  'RetryAttempts': 0}}

In [101]:
import time
def wait_for_feature_group_creation_complete(feature_group):
    """Helper function to wait for the completions of creating a feature group"""
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise SystemExit(f"Failed to create feature group {feature_group.name}: {status}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

wait_for_feature_group_creation_complete(feature_group=feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup fscw-tripfare-10-06-06-01 successfully created.


# Processing job

### Create Sklearn SageMaker Processing script

In [110]:
%%writefile preprocess.py

import glob
import logging
import os
import subprocess
import sys
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker'])

from zipfile import ZipFile
# from time import gmtime, strftime
import socket
import shutil
import json
import time
import argparse
import boto3

n_cores = os.cpu_count()
# host_name = socket.gethostname()
# print(host_name)
# print(os.environ)

# Install geopandas dependency before including pandas
subprocess.check_call([sys.executable, "-m", "pip", "install", "geopandas==0.9.0"])

import pandas as pd  # noqa: E402
import geopandas as gpd  # noqa: E402
from sklearn.model_selection import train_test_split  # noqa: E402

import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.
    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        `sagemaker.session.Session instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
#     runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
#         sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )


logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

def parse_args() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument('--fg_name', type=str)
    parser.add_argument('--region', type=str)
    parser.add_argument('--bucket', type=str)
    args, _ = parser.parse_known_args()
    return args

def extract_zones(zones_file: str, zones_dir: str):
    logger.info(f"Extracting zone file: {zones_file}")
    with ZipFile(zones_file, "r") as zip:
        zip.extractall(zones_dir)


def load_zones(zones_dir: str):
    logging.info(f"Loading zones from {zones_dir}")
    # Load the shape file and get the geometry and lat/lon
    zone_df = gpd.read_file(os.path.join(zones_dir, "taxi_zones.shp"))
    # Get centroids as EPSG code of 3310 to measure distance
    zone_df["centroid"] = zone_df.geometry.centroid.to_crs(epsg=3310)
    # Convert cordinates to the WSG84 lat/long CRS has a EPSG code of 4326.
    zone_df["latitude"] = zone_df.centroid.to_crs(epsg=4326).x
    zone_df["longitude"] = zone_df.centroid.to_crs(epsg=4326).y
    return zone_df


def load_data(file_list: list):
    # Define dates, and columns to use
    use_cols = [
        "fare_amount",
        "lpep_pickup_datetime",
        "lpep_dropoff_datetime",
        "passenger_count",
        "PULocationID",
        "DOLocationID",
    ]
    # Concat input files with select columns
    dfs = []
    for file in file_list:
        dfs.append(pd.read_csv(file, usecols=use_cols))
    return pd.concat(dfs, ignore_index=True)


def enrich_data(trip_df: pd.DataFrame, zone_df: pd.DataFrame):
    # Join trip DF to zones for poth pickup and drop off locations
    trip_df = gpd.GeoDataFrame(
        trip_df.join(zone_df, on="PULocationID").join(
            zone_df, on="DOLocationID", rsuffix="_DO", lsuffix="_PU"
        )
    )
    trip_df["geo_distance"] = (
        trip_df["centroid_PU"].distance(trip_df["centroid_DO"]) / 1000
    )

    # Add date parts
    trip_df["lpep_pickup_datetime"] = pd.to_datetime(trip_df["lpep_pickup_datetime"])
    trip_df["hour"] = trip_df["lpep_pickup_datetime"].dt.hour
    trip_df["weekday"] = trip_df["lpep_pickup_datetime"].dt.weekday
    trip_df["month"] = trip_df["lpep_pickup_datetime"].dt.month

    # Get calculated duration in minutes
    trip_df["lpep_dropoff_datetime"] = pd.to_datetime(trip_df["lpep_dropoff_datetime"])
    trip_df["duration_minutes"] = (
        trip_df["lpep_dropoff_datetime"] - trip_df["lpep_pickup_datetime"]
    ).dt.seconds / 60

    # Rename and filter cols
    trip_df = trip_df.rename(
        columns={
            "latitude_PU": "pickup_latitude",
            "longitude_PU": "pickup_longitude",
            "latitude_DO": "dropoff_latitude",
            "longitude_DO": "dropoff_longitude",
        }
    )
    
    trip_df['FS_ID'] = trip_df.index + 1000
    current_time_sec = int(round(time.time()))
    trip_df["FS_time"] = pd.Series([current_time_sec]*len(trip_df), dtype="float64")
    return trip_df


def clean_data(trip_df: pd.DataFrame):
    # Remove outliers
    trip_df = trip_df[
        (trip_df.fare_amount > 0)
        & (trip_df.fare_amount < 200)
        & (trip_df.passenger_count > 0)
        & (trip_df.duration_minutes > 0)
        & (trip_df.duration_minutes < 120)
        & (trip_df.geo_distance > 0)
        & (trip_df.geo_distance < 121)
    ].dropna()

    # Filter columns
    cols = [
        "fare_amount",
        "passenger_count",
        "pickup_latitude",
        "pickup_longitude",
        "dropoff_latitude",
        "dropoff_longitude",
        "geo_distance",
        "hour",
        "weekday",
        "month",
    ]
    
    cols_fg = [
        "fare_amount",
        "passenger_count",
        "pickup_latitude",
        "pickup_longitude",
        "dropoff_latitude",
        "dropoff_longitude",
        "geo_distance",
        "hour",
        "weekday",
        "month",
        "FS_ID",
        "FS_time"
    ]
    return trip_df[cols], trip_df[cols_fg]

def ingest_data(data_fg: pd.DataFrame, fg_name: str, sagemaker_session) -> None:
    
    # 4 threads per python process
    num_workers = 4
    num_processes = n_cores
    logger.info(f'Ingesting into feature group [{fg_name}] using {num_processes} processes and {num_workers} workers')
    fg = FeatureGroup(name=fg_name, sagemaker_session=sagemaker_session)
    response = fg.ingest(data_frame=data_fg, max_processes=num_processes, max_workers=num_workers, wait=True)
    """
    The ingest call above returns an IngestionManagerPandas instance as a response. Zero based indices of rows 
    that failed to be ingested are captured via failed_rows in this response. By asserting this count to be 0,
    we validated that all rows were successfully ingested without a failure.
    """
    assert len(response.failed_rows) == 0


def save_files(base_dir: str, data_df: pd.DataFrame, data_fg: pd.DataFrame, fg_name: str, 
               val_size=0.2, test_size=0.05, current_host=None, sagemaker_session=None):
        
    logger.info(f"Splitting {len(data_df)} rows of data into train, val, test.")
    if current_host == 'algo-1':
        train_df, val_df = train_test_split(data_df, test_size=val_size, random_state=42)
        val_df, test_df = train_test_split(val_df, test_size=test_size, random_state=42)

        logger.info(f"Writing out datasets to {base_dir}")
        train_df.to_csv(f"{base_dir}/train/train-{current_host}.csv", header=False, index=False)
        val_df.to_csv(f"{base_dir}/validation/validation-{current_host}.csv", header=False, index=False)

        # Save test data without header
        test_df.to_csv(f"{base_dir}/test/test-{current_host}.csv", header=False, index=False)
    else:
        logger.info(f"Writing out datasets to {base_dir}")
        data_df.to_csv(f"{base_dir}/train/train-{current_host}.csv", header=False, index=False)
    
    # batch ingestion to the feature group of all the data
    ingest_data(data_fg, fg_name, sagemaker_session)

    return 

def _read_json(path):  # type: (str) -> dict
    """Read a JSON file.
    Args:
        path (str): Path to the file.
    Returns:
        (dict[object, object]): A dictionary representation of the JSON file.
    """
    with open(path, "r") as f:
        return json.load(f)

def main(base_dir: str, args: argparse.Namespace):
    # Input data files
    input_dir = os.path.join(base_dir, "input/data")
    input_file_list = glob.glob(f"{input_dir}/*.csv")
    logger.info(f"Input file list: {input_file_list}")
    
    config_file_list = glob.glob("/opt/ml/config/*.json")
    logger.info(f"config file list: {config_file_list}")

    hosts = _read_json("/opt/ml/config/resourceconfig.json")
    logger.info(hosts)
    current_host = hosts["current_host"]
    logger.info(current_host)
        
    if len(input_file_list) == 0:
        raise Exception(f"No input files found in {input_dir}")

    # Input zones file
    zones_dir = os.path.join(base_dir, "input/zones")
    zones_file = os.path.join(zones_dir, "taxi_zones.zip")
    if not os.path.exists(zones_file):
        raise Exception(f"Zones file {zones_file} does not exist")

    # Extract and load taxi zones geopandas dataframe
    extract_zones(zones_file, zones_dir)
    zone_df = load_zones(zones_dir)

    # Load input files
    data_df = load_data(input_file_list)
    data_df = enrich_data(data_df, zone_df)
    data_df, data_fg = clean_data(data_df)

    fg_name = args.fg_name
    
    sagemaker_session = get_session(args.region, args.bucket)
    
    return save_files(base_dir, data_df, data_fg, fg_name, current_host=current_host, sagemaker_session=sagemaker_session)


if __name__ == "__main__":
    logger.info("Starting preprocessing.")
    args = parse_args()
    base_dir = "/opt/ml/processing"
    main(base_dir, args)
    logger.info("Done")


Overwriting preprocess.py


In [107]:
process_script = os.getcwd() + '/preprocess.py'
%store process_script

Stored 'process_script' (str)


In [104]:
# cell 07
train_path = f"s3://{bucket}/{prefix}/train"
validation_path = f"s3://{bucket}/{prefix}/validation"
test_path = f"s3://{bucket}/{prefix}/test"


In [111]:
# cell 08
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role


sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=get_execution_role(),
    instance_type="ml.m5.4xlarge",
    instance_count=2, 
    base_job_name='sm-immday-skprocessing'
)

sklearn_processor.run(
    code='preprocess.py',
    arguments = ['--fg_name', tripfare_feature_group_name,
                 '--region', region,
                 '--bucket', bucket,
                ],
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input/data",
            s3_data_distribution_type="ShardedByS3Key",
        ),
        ProcessingInput(
            source=input_zones,
            destination="/opt/ml/processing/input/zones",
            s3_data_distribution_type="FullyReplicated",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=train_path),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=validation_path),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=test_path),
    ]
        
)


Job Name:  sm-immday-skprocessing-2021-10-06-06-53-03-126
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-631450739534/sagemaker/DEMO-xgboost-tripfare/input/data/', 'LocalPath': '/opt/ml/processing/input/data', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-631450739534/sagemaker/DEMO-xgboost-tripfare/input/zones/', 'LocalPath': '/opt/ml/processing/input/zones', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-631450739534/sm-immday-skprocessing-2021-10-06-06-53-03-126/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistr

In [19]:
%store train_path
%store validation_path
%store test_path

Stored 'train_path' (str)
Stored 'validation_path' (str)
Stored 'test_path' (str)
