In [2]:
import os
import logging
from datetime import datetime

In [3]:


import boto3
import sagemaker
from sagemaker.session import TrainingInput
from sagemaker import image_uris
from sagemaker import hyperparameters

sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\hemch\AppData\Local\sagemaker\sagemaker\config.yaml


In [4]:
boto3.set_stream_logger(name="botocore.credentials", level=logging.WARNING)

In [5]:
region = sagemaker.Session().boto_region_name
print(region)

us-east-1


In [6]:
%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv

import json
import logging
import sys
from pathlib import Path

import ipytest

#CODE_FOLDER = Path("code")
#sys.path.extend([f"./{CODE_FOLDER}"])

#DATA_FILEPATH = "../data/penguins.csv"

ipytest.autoconfig(raise_on_error=True)

# By default, The SageMaker SDK logs events related to the default
# configuration using the INFO level. To prevent these from spoiling
# the output of this notebook cells, we can change the logging
# level to ERROR instead.
logging.getLogger("sagemaker.config").setLevel(logging.ERROR)

In [7]:
import os

bucket = os.environ["BUCKET"]
role = os.environ["ROLE"]

COMET_API_KEY = os.environ.get("COMET_API_KEY", None)
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME", None)

In [8]:
bucket

'beercafe-learn-sagemaker'

In [9]:
from pathlib import Path
DATA_LOC = Path("../data/")
print(DATA_LOC)

S3_LOCATION = f"s3://{bucket}/uci_abalone"
print(S3_LOCATION)

..\data
s3://beercafe-learn-sagemaker/uci_abalone


In [10]:
job_name_base = 'uci-abalone-processor'

# Processing data with inbuilt sklearn container

## Create Processor

In [11]:
from sagemaker.session import Session
sagemaker_session=Session(default_bucket='beercafe-learn-sagemaker', default_bucket_prefix='abalone-processing')

In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor


sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    volume_size_in_gb=5,
    base_job_name=job_name_base,
    sagemaker_session=sagemaker_session
)

## Download data from sagemaker itself

In [13]:
import pandas as pd

DATA_LOC_ABALONE = DATA_LOC / 'abalone'
print(DATA_LOC_ABALONE)

s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-example-files-prod-{sagemaker_session.boto_region_name}",
    "datasets/tabular/uci_abalone/abalone.csv",
    f"{DATA_LOC_ABALONE}/abalone.csv",
)


..\data\abalone


In [14]:
df = pd.read_csv(f"{DATA_LOC_ABALONE}/abalone.csv",names=['sex','length','diameter','height','whole_weight','shucked_weight','viscera_weight','shell_weight','rings'],header=None)
df.to_csv(f"{DATA_LOC_ABALONE}/dataset.csv")
df.head()

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
0,M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


## Prepare Processing script

In [15]:
CODE_FOLDER = Path('../code/abalone')
print(CODE_FOLDER)

..\code\abalone


In [16]:
%%writefile {CODE_FOLDER}/preprocessing.py
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import csv
import os
import shutil
import sys
import time

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)


def csv_line(data):
    r = ",".join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Defining the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )

    # Downloading the data from S3 into a Dataframe
    total_df = spark.read.csv(
        ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")),
        header=False,
        schema=schema,
    )

    # StringIndexer on the sex column which has categorical value
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

    # one-hot-encoding is being performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format
    assembler = VectorAssembler(
        inputCols=[
            "sex_vec",
            "length",
            "diameter",
            "height",
            "whole_weight",
            "shucked_weight",
            "viscera_weight",
            "shell_weight",
        ],
        outputCol="features",
    )

    # The pipeline is comprised of the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

    # This step trains the feature transformers
    model = pipeline.fit(total_df)

    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)

    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
    )

    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
    )


if __name__ == "__main__":
    main()

Overwriting ..\code\abalone/preprocessing.py


## Run Processing job

## Upload Data

In [17]:
from sagemaker.spark.processing import PySparkProcessor
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
input_prefix_abalone = "{}/input/raw/abalone".format(prefix)
input_preprocessed_prefix_abalone = "{}/input/preprocessed/abalone".format(prefix)

sagemaker_session.upload_data(
    path=f"{DATA_LOC_ABALONE}/abalone.csv", bucket=bucket, key_prefix=input_prefix_abalone
)



's3://beercafe-learn-sagemaker/sagemaker/spark-preprocess-demo/2024-04-17-13-56-13/input/raw/abalone/abalone.csv'

## Run the processing job

In [18]:
# Run the processing job
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    sagemaker_session = sagemaker_session
)



spark_processor.run(
    submit_app= '../code/abalone/preprocessing.py',
    arguments=[
        "--s3_input_bucket",
        bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
    logs=False,
)

INFO:sagemaker:Creating processing-job with name sm-spark-2024-04-17-13-56-17-873


.....................................................................!

In [31]:
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 ../data/abalone/

Top 5 rows from s3://beercafe-learn-sagemaker/sagemaker/spark-preprocess-demo/2024-04-17-13-30-27/input/preprocessed/abalone/train/


In [1]:
spark_processor.start_history_server()

NameError: name 'spark_processor' is not defined