In [None]:
import sagemaker_pyspark
from pyspark.sql import SparkSession
import os
# --- Step 0: Clear any empty/conflicting AWS environment variables ---
# This ensures that the S3A connector does not pick up empty credentials.
os.environ.pop("AWS_ACCESS_KEY_ID", None)
os.environ.pop("AWS_SECRET_ACCESS_KEY", None)
# ------------------------------------------
# 1. Initialize SparkSession with SageMaker Jars
# ------------------------------------------
# This will add the SageMaker-related jar files to the Spark driver classpath.
classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder \
    .appName("XGBoostSageMakerExample") \
    .config("spark.driver.extraClassPath", classpath) \
    .config("spark.driver.userClassPathFirst", "true") \
    .config("spark.executor.userClassPathFirst", "true") \
    .config("spark.hadoop.fs.s3a.access.key", "AKIAR3HUOTHCI6ZEO5FV") \
    .config("spark.hadoop.fs.s3a.secret.key", "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") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

In [14]:
# 3. (Optional) Also explicitly set these properties in the Hadoop configuration.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "AKIAR3HUOTHCI6ZEO5FV")
hadoop_conf.set("fs.s3a.secret.key",
                "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")
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
hadoop_conf.set("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# 4. Define region and S3 path for the training data.
region = "us-east-1"
data_path = "s3a://sagemaker-sample-data-{}/spark/mnist/train/".format(region)

# 5. Load the training and test data in libsvm format.
training_data = spark.read.format("libsvm") \
    .option("numFeatures", "784") \
    .load(data_path)

test_data = spark.read.format("libsvm") \
    .option("numFeatures", "784") \
    .load(data_path)

print("Training data schema:", training_data.schema)

Training data schema: StructType([StructField('label', DoubleType(), True), StructField('features', VectorUDT(), True)])


In [16]:
training_data.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [17]:
# ------------------------------------------
# 4. Configure the XGBoost SageMaker Estimator
# ------------------------------------------
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator
from sagemaker_pyspark import IAMRole

# Define the IAM role to use.
iam_role = "arn:aws:iam::127214197188:role/sagemaker-service-role"  # <-- Update this!

# Create an estimator instance with the desired configuration.
# Note that the instance types and counts can be adjusted to suit your needs.
xgboost_estimator = XGBoostSageMakerEstimator(
    # trainingInstanceType="ml.m4.xlarge",
    trainingInstanceType="local",
    trainingInstanceCount=1,
    # endpointInstanceType="ml.m4.xlarge",
    endpointInstanceType="local",
    endpointInitialInstanceCount=1,
    sagemakerRole=IAMRole(iam_role)
)

# Set the hyperparameters required by the XGBoost algorithm.
# In this example, we perform multi-class classification:
# - 'multi:softmax' sets the objective for multi-class classification.
# - 'numRound' is the number of boosting rounds.
# - 'numClasses' defines the number of classes.
xgboost_estimator.setObjective("multi:softmax")
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(10)

In [19]:
# # ------------------------------------------
# # 5. Train the Model and Deploy as a SageMaker Endpoint
# # ------------------------------------------
# # Calling fit() will:
# #   - Launch a SageMaker training job using the training_data.
# #   - Deploy the resulting model as a hosted endpoint.
# xgboost_model = xgboost_estimator.fit(training_data)

# # ------------------------------------------
# # 6. Use the Deployed Model for Predictions
# # ------------------------------------------
# # The returned xgboost_model is a SageMakerModel. Calling transform() sends the test data
# # to the deployed endpoint and returns a DataFrame with predictions.
# predictions = xgboost_model.transform(test_data)
# predictions.show(truncate=False)

# # ------------------------------------------
# # Optional: Cleanup
# # ------------------------------------------
# # Depending on the SDK behavior, the endpoint might persist after this code runs.
# # If you are done with the endpoint, make sure to delete it (either manually or via code)
# # to avoid incurring ongoing costs.

In [23]:
from sagemaker_pyspark import IAMRole
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator
from pyspark.sql import SparkSession
import sagemaker_pyspark
import os
# Clear any conflicting AWS environment variables.
os.environ.pop("AWS_ACCESS_KEY_ID", None)
os.environ.pop("AWS_SECRET_ACCESS_KEY", None)


# Get SageMaker jar files from the sagemaker_pyspark package.
sagemaker_classpath = ":".join(sagemaker_pyspark.classpath_jars())

# (Optional) If you want to force a particular AWS SDK version on the driver,
# you can try adding a compatible AWS SDK bundle jar.
# For example:
# aws_sdk_bundle_jar = "/path/to/aws-java-sdk-bundle-1.11.375.jar"
# combined_classpath = sagemaker_classpath + ":" + aws_sdk_bundle_jar
# Otherwise, we can continue using sagemaker_classpath alone.
combined_classpath = sagemaker_classpath

# Build the SparkSession.
spark = SparkSession.builder \
    .appName("XGBoostSageMakerLocalExample") \
    .config("spark.driver.extraClassPath", combined_classpath) \
    .config("spark.executor.extraClassPath", combined_classpath) \
    .config("spark.jars", combined_classpath) \
    .config("spark.driver.userClassPathFirst", "true") \
    .config("spark.executor.userClassPathFirst", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "AKIAR3HUOTHCI6ZEO5FV") \
    .config("spark.hadoop.fs.s3a.secret.key", "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") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()

# Also set these explicitly in the Hadoop configuration.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "AKIAR3HUOTHCI6ZEO5FV")
hadoop_conf.set("fs.s3a.secret.key",
                "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")
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
hadoop_conf.set("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

# Define region and S3 path for training data.
region = "us-east-1"
data_path = "s3a://sagemaker-sample-data-{}/spark/mnist/train/".format(region)

# Load training and test data in libsvm format.
training_data = spark.read.format("libsvm") \
    .option("numFeatures", "784") \
    .load(data_path)
test_data = spark.read.format("libsvm") \
    .option("numFeatures", "784") \
    .load(data_path)

print("Training data schema:")
training_data.printSchema()

# --------------------------------------------------------------------
# Configure the XGBoost SageMaker Estimator in local mode.
# --------------------------------------------------------------------

# Specify a training image known to work in local mode.
# For example, use the Amazon ECR image for XGBoost:1.0-1-cpu-py3.
# (Make sure you have Docker running so that local mode can pull and run this image.)
training_image = "382416733822.dkr.ecr.us-east-1.amazonaws.com/xgboost:1.0-1-cpu-py3"

xgboost_estimator = XGBoostSageMakerEstimator(
    # Run training in local (Docker) mode.
    trainingInstanceType="local",
    trainingInstanceCount=1,
    endpointInstanceType="local",           # Run inference in local mode.
    endpointInitialInstanceCount=1,
    sagemakerRole=IAMRole(
        "arn:aws:iam::127214197188:role/sagemaker-service-role"),
    trainingImage=training_image            # Force use of the specified image.
)

# Set hyperparameters for multi-class classification.
xgboost_estimator.setObjective("multi:softmax")
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(10)

# --------------------------------------------------------------------
# Train the model locally (using Docker) and get predictions.
# --------------------------------------------------------------------
xgboost_model = xgboost_estimator.fit(training_data)
predictions = xgboost_model.transform(test_data)
predictions.show(truncate=False)

25/02/05 20:20:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Exception ignored in: <function JavaWrapper.__del__ at 0x109e21120>
Traceback (most recent call last):
  File "/Users/user/projects/sagemaker-spark/venv/lib/python3.12/site-packages/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
                                              ^^^^^^^^^^^^^^
AttributeError: 'XGBoostSageMakerEstimator' object has no attribute '_java_obj'


Training data schema:
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



TypeError: XGBoostSageMakerEstimator.__init__() got an unexpected keyword argument 'trainingImage'

In [None]:
# 4o
from pyspark.sql import SparkSession

# Initialize PySpark
spark = SparkSession.builder.appName("SageMakerPySparkExample").getOrCreate()

# AWS default dataset (Change region if needed)
region = "us-east-1"
training_data_path = f"s3a://sagemaker-sample-data-{region}/spark/mnist/train/"

# Load training data
training_data = spark.read.format("libsvm").option("numFeatures", "784").load(training_data_path)

test_data = spark.read.format("libsvm").option("numFeatures", "784").load(training_data_path)

print("Training data schema:")
training_data.printSchema()

Training data schema:
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [26]:
# R1 
from pyspark.sql import SparkSession
import sagemaker_pyspark

# Load SageMaker JARs
classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()

# Load sample data
region = "us-east-1"
training_data = spark.read.format("libsvm") \
  .option("numFeatures", "784") \
  .load(f"s3a://sagemaker-sample-data-{region}/spark/mnist/train/")

test_data = spark.read.format("libsvm") \
  .option("numFeatures", "784") \
  .load(f"s3a://sagemaker-sample-data-{region}/spark/mnist/train/")
print("Training data schema:")
training_data.printSchema()

Training data schema:
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

