## Running Spark History Server: 
#### (Note: this feature only works in a local development environment with Docker installed, or on a SageMaker Notebook Instance. It does not currently work in SageMaker Studio.)

In [None]:
# import packages
import json
import ast
import boto3
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# SageMaker session handed to the Spark processor further down
sagemaker_session = sagemaker.Session()

# S3 resource handle used to fetch the JSON configuration files
s3 = boto3.resource('s3')

# The pipeline parameters are stored as a UTF-8 JSON document in S3
params_object = s3.Object(
    'example-infra-bucket-pyspark-blogpost',
    'ml_pipeline/params/pipeline_params.json',
)
params_text = params_object.get()['Body'].read().decode('utf-8')
pipeline_params = json.loads(params_text)

# Bucket name reused by the log location and both table arguments
data_bucket = pipeline_params["data_bucket"]

# S3 location where the Spark UI event logs of this trial are written
process_spark_ui_log_output = "s3://{}/spark_ui_logs/{}".format(
    data_bucket, pipeline_params["trial"]
)

# Arguments passed to the processing script; the configured table names
# are format templates that get filled with the data bucket name
input_table = pipeline_params["pyspark_process_data_input"].format(data_bucket)
output_table = pipeline_params["pyspark_process_data_output"].format(data_bucket)
process_args = [
    "--input_table", input_table,
    "--output_table", output_table,
]

# Both code artifacts live in the infra bucket as s3://<bucket>/<key>/<file>
infra_bucket = pipeline_params["infra_bucket"]
s3_uri_template = "s3://{}/{}/{}"

# Main PySpark application submitted to the processor
process_code = s3_uri_template.format(
    infra_bucket,
    pipeline_params["processing_key"],
    pipeline_params["pyspark_process_code"],
)

# Supporting python files shipped alongside the application
process_helpers = [
    s3_uri_template.format(
        infra_bucket,
        pipeline_params["helper_key"],
        pipeline_params["data_utils_code"],
    )
]

# import spark config used in pipeline run
content_object = s3.Object(
    pipeline_params["infra_bucket"],
    pipeline_params["spark_key"] + "/" + pipeline_params["spark_config"],
)
file_content = content_object.get()['Body'].read().decode('utf-8')
# json.loads already yields native Python objects (list/dict), so the parsed
# value can be passed to the processor as-is. The previous
# json.dumps + ast.literal_eval round trip was redundant and raised
# ValueError on valid JSON containing true/false/null, which are not
# Python literals.
spark_conf = json.loads(file_content)

# Constructor settings for the Spark processor, all taken from the pipeline
# parameters except for the shared SageMaker session
processor_settings = {
    "base_job_name": pipeline_params["pyspark_process_name"],
    "framework_version": pipeline_params["pyspark_framework_version"],
    "role": pipeline_params["pipeline_role"],
    "instance_count": pipeline_params["pyspark_process_instance_count"],
    "instance_type": pipeline_params["pyspark_process_instance_type"],
    "sagemaker_session": sagemaker_session,
}
spark_processor = PySparkProcessor(**processor_settings)

# Submit the PySpark application together with its helper files, arguments
# and Spark configuration; event logs go to the Spark UI log location
process_kms_key = pipeline_params["pyspark_process_volume_kms"]
spark_processor.run(
    submit_app=process_code,
    submit_py_files=process_helpers,
    arguments=process_args,
    spark_event_logs_s3_uri=process_spark_ui_log_output,
    logs=False,
    kms_key=process_kms_key,
    configuration=spark_conf,
)

# Start the Spark history server on the event logs to show the Spark UI
spark_processor.start_history_server(
    spark_event_logs_s3_uri=process_spark_ui_log_output
)

# Shut the Spark history server down again
# NOTE(review): when run as one straight script this executes immediately
# after the start call above; presumably these were separate notebook
# cells - confirm before running end-to-end.
spark_processor.terminate_history_server()
