## Imports

In [None]:
import sagemaker as sm
from sagemaker.spark.processing import PySparkProcessor

### SageMaker Parameters

In [None]:
# IAM role the notebook runs under; handed to the processing job further down.
role              = sm.get_execution_role()
# Session wrapper used to resolve the region and the default S3 bucket.
sagemaker_session = sm.session.Session()
# Use the public attribute rather than the private `_region_name` field.
region            = sagemaker_session.boto_region_name
# Default bucket of this session; used for job input, output and Spark event logs.
bucket            = sagemaker_session.default_bucket()

### Create PySpark Script

In [None]:
%%writefile ./data_processing_pyspark.py

import sys
import pyspark

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StructField
from pyspark.sql.functions import *


def main(args):
    """Create the Spark session and declare the input schema for the job.

    Args:
        args: Mapping of command-line keys to values, as assembled by the
            ``__main__`` guard of this script. The template does not read
            it yet.
    """
    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

    # Set the output committer class on the underlying Hadoop configuration.
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set(
        "mapred.output.committer.class",
        "org.apache.hadoop.mapred.FileOutputCommitter",
    )

    # One nullable double column as a placeholder; list the real columns here.
    fields = [StructField("column1", DoubleType(), True)]
    schema = StructType(fields)
    # Place processing code here


if __name__ == "__main__":
    # Arguments arrive as alternating "key value" tokens: pair each
    # even-positioned token with the one that follows it. An unpaired
    # trailing token is dropped by zip().
    cli_tokens = sys.argv[1:]
    args = dict(zip(cli_tokens[0::2], cli_tokens[1::2]))
    main(args)

### Set parameters to pass to the Spark Container

In [None]:
# Key prefixes passed to the PySpark script as its input and output locations.
input_prefix_pyspark = "csv_bucket"
output_prefix_pyspark = "csv_bucket_processed"

# S3 location handed to the processing job for Spark event logs.
spark_event_logs_s3_uri = f"s3://{bucket}/data_processing/store-spark-events"

### Create PySpark Processor

In [None]:
spark_processor = PySparkProcessor(
    # Job identity and permissions.
    base_job_name="spark-preprocessor",
    role=role,
    # Spark container version and environment.
    framework_version="2.4",
    env={"mode": "python"},
    # Cluster size and runtime limit (seconds).
    instance_type="ml.m5.xlarge",
    instance_count=2,
    max_runtime_in_seconds=1200,
)

### Run PySpark Processing Job

In [None]:
spark_processor.run(
    submit_app="data_processing_pyspark.py",
    # Flat list of alternating key/value tokens; the submitted script pairs
    # them back into a dict before calling main().
    arguments=[
        "s3_input_bucket", bucket,
        "s3_input_key_prefix", input_prefix_pyspark,
        "s3_output_bucket", bucket,
        "s3_output_key_prefix", output_prefix_pyspark,
    ],
    spark_event_logs_s3_uri=spark_event_logs_s3_uri,
)