# Process data with PySpark running in SageMaker

The following demonstrates how to prepare a dataset with PySpark, both locally and using SageMaker Processing jobs.

Amazon SageMaker lets you run data pre- and post-processing, feature engineering, data validation, and model evaluation workloads on managed infrastructure. This reduces the overhead of provisioning compute capacity and managing environments.

<div style="text-align:center">
    <img src="media/manual.png" width="800"/>
</div>

## Part 1: you can develop and debug with Spark locally using sample data

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
df = spark.read.csv('data/netflix_titles.csv', header=True)

In [None]:
# DO PROCESSING HERE
df.show(3)

In [None]:
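# SAVE OUTPUT FILES
# mode='overwrite' lets this cell be re-run without failing because data/output already exists
df.write.save('data/output', format='csv', header=True, mode='overwrite')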

## Part 2: when ready, you can scale Spark jobs with SageMaker Processing

### Import SageMaker Python SDK

In [None]:
!pip install -q sagemaker==2.16.1

In [None]:
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role() # we are using the notebook instance role for processing in this example
bucket = sagemaker_session.default_bucket() # you can specify a bucket name here
prefix = 'spark'

### Push data to S3 from the Notebook Instance

Let's now push this dataset to the default S3 bucket from our SageMaker notebook instance:

In [None]:
s3_input = sagemaker_session.upload_data(
    path='data/netflix_titles.csv', 
    bucket=bucket, 
    key_prefix=f'{prefix}/input'
)
print(s3_input)
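
For a single file, the SDK documentation describes the returned URI as `s3://{bucket}/{key_prefix}/{file name}`. The sketch below only illustrates that shape; `expected_upload_uri` and the bucket name `example-bucket` are hypothetical and are not part of the SageMaker SDK or of this notebook.

```python
import posixpath


def expected_upload_uri(bucket, key_prefix, local_path):
    """Build the URI shape that upload_data is documented to return for one file.

    Hypothetical helper for illustration only; it does not call S3.
    """
    file_name = posixpath.basename(local_path)
    return f"s3://{bucket}/{key_prefix}/{file_name}"


# 'example-bucket' is a placeholder; the notebook uses the session's default bucket.
print(expected_upload_uri("example-bucket", "spark/input", "data/netflix_titles.csv"))
# prints: s3://example-bucket/spark/input/netflix_titles.csv
```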

### Launch SageMaker Processing job

<div style="text-align:center">
    <img src="media/processing_spark.jpeg" width="700"/>
</div>

In [None]:
pyspark_processor = PySparkProcessor(
    role=role,
    instance_count=1,
    instance_type='ml.c5.xlarge',
    base_job_name='spark-processing',
    framework_version='2.4'
)

In [None]:
s3_output = f's3://{bucket}/{prefix}/output'
s3_spark_event_logs = f's3://{bucket}/{prefix}/spark_event_logs'

pyspark_processor.run(
    submit_app='code/prepare_data.py',
    arguments=[
        f'--s3_input={s3_input}',
        f'--s3_output={s3_output}'
    ],
    spark_event_logs_s3_uri=s3_spark_event_logs,
    logs=False
)
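
The script `code/prepare_data.py` is not shown in this notebook. As a rough sketch only, a script that accepts the two arguments passed above could look like the following. The function names, the `dropDuplicates()` placeholder transformation, and `mode="overwrite"` are assumptions, not the actual script, and the sketch also assumes the Spark container can read and write the `s3://` URIs it receives.

```python
# Hypothetical sketch of code/prepare_data.py (the real script is not shown here).
import argparse


def parse_args(argv=None):
    """Parse the --s3_input and --s3_output arguments passed by run()."""
    parser = argparse.ArgumentParser(description="Prepare a dataset with PySpark")
    parser.add_argument("--s3_input", type=str, required=True,
                        help="S3 URI of the input CSV file")
    parser.add_argument("--s3_output", type=str, required=True,
                        help="S3 URI prefix for the output files")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)

    # Imported here so argument parsing can be tried without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    df = spark.read.csv(args.s3_input, header=True)

    # Placeholder transformation (an assumption): drop exact duplicate rows.
    df = df.dropDuplicates()

    df.write.save(args.s3_output, format="csv", header=True, mode="overwrite")
    spark.stop()


# Parsing demo with placeholder URIs; argparse accepts the --key=value form used above.
demo = parse_args([
    "--s3_input=s3://example-bucket/spark/input/netflix_titles.csv",
    "--s3_output=s3://example-bucket/spark/output",
])
print(demo.s3_input, demo.s3_output)
```

In a script submitted through `submit_app`, the demo lines at the bottom would be replaced by a call to `main()` under an `if __name__ == "__main__":` guard.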

## Part 3: using the Spark History Server

While the script is running, or after it has finished, you can view the Spark UI by running the history server locally or in the notebook. By default, the S3 URI you provided as spark_event_logs_s3_uri in the previous run() call is used as the Spark event log source, but you can also specify a different URI. Finally, you can terminate the history server with terminate_history_server(). Note that only one history server process runs at a time. [See here](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#spark-history-server) for more details.
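
One way to make sure the history server is always shut down is to wrap the start and terminate calls in a context manager. This is a minimal sketch: `spark_history_server` is a hypothetical helper, not part of the SageMaker SDK, and it assumes the processor exposes `start_history_server(spark_event_logs_s3_uri=None)` and `terminate_history_server()` as described in the linked documentation.

```python
from contextlib import contextmanager


@contextmanager
def spark_history_server(processor, event_logs_s3_uri=None):
    """Start the Spark history server and terminate it when the block exits.

    Hypothetical helper. With event_logs_s3_uri=None, the processor is
    expected to fall back to the URI given to its previous run() call.
    """
    processor.start_history_server(spark_event_logs_s3_uri=event_logs_s3_uri)
    try:
        yield processor
    finally:
        # Runs even if the body raises, so no server process is left behind.
        processor.terminate_history_server()


# Example usage in the notebook (commented out because it needs a real processor):
# with spark_history_server(pyspark_processor):
#     input("Browse the Spark UI, then press Enter to stop the history server")
```

Because only one history server process runs at a time, scoping it to a `with` block keeps its lifetime explicit.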