# Processing large datasets with Apache Spark and Amazon SageMaker

***This notebook runs on the `Data Science 3.0 - Python 3` kernel on a `ml.t3.large` instance***.

Amazon SageMaker Processing Jobs are used to analyze data and evaluate machine learning models on Amazon SageMaker. With Processing, you can use a simplified, managed experience on SageMaker to run your data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation. You can also use the Amazon SageMaker Processing APIs during the experimentation phase and after the code is deployed in production to evaluate performance.

        


![](https://docs.aws.amazon.com/images/sagemaker/latest/dg/images/Processing-1.png)

The preceding diagram shows how Amazon SageMaker spins up a Processing job. Amazon SageMaker takes your script, copies your data from Amazon Simple Storage Service (Amazon S3), and then pulls a processing container. The processing container image can either be an Amazon SageMaker built-in image or a custom image that you provide. The underlying infrastructure for a Processing job is fully managed by Amazon SageMaker. Cluster resources are provisioned for the duration of your job, and cleaned up when a job completes. The output of the Processing job is stored in the Amazon S3 bucket you specified.

## Our workflow for processing large amounts of data with SageMaker

We can divide our workflow into two steps:
    
1. Work with a small subset of the data with Spark running in local mode in a SageMaker Studio Notebook.

1. Once the code works on the small subset, provide the same code (as a Python script rather than a series of interactive steps) to SageMaker Processing, which launches a Spark cluster, runs our code, and terminates the cluster.

## In this notebook...

We will process the [News Summarization](https://www.kaggle.com/datasets/sbhatti/news-summarization) dataset from Kaggle, which is intended for advancing news summarization research. It combines three datasets (XSum, CNN/Daily Mail, Multi-News) into one easy-to-use CSV file, which we have converted to Parquet format for this lab.

⚠️ We have converted the CSV file to Parquet for this lab. Why do you think that was done? ⚠️

1. The data is available as two Parquet files: `s3://bigdatateaching/news/data1000.parquet` which is a small subset of data and `s3://bigdatateaching/news/data.parquet` which is the full dataset.

1. We will first read the small subset `s3://bigdatateaching/news/data1000.parquet` locally and do all our analysis in this notebook. We will run some analytics (the usual steps of deriving new features, then grouping and summarizing) and then some sentiment analysis using the [spark-nlp](https://sparknlp.org/) library.

1. We will then repeat the same operation on the full dataset `s3://bigdatateaching/news/data.parquet`. This dataset is about 800,000 rows. This operation is too big to be run on a `ml.t3.large` instance (2 vCPU, 8GB RAM) so we will run this on 6 machines of `ml.m5.xlarge` instance type (4 vCPUs, 16GB RAM).
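
To put the two environments side by side, the sketch below totals the vCPUs and memory quoted in the list above. It is plain arithmetic on the numbers in this notebook; the `Instance` class and the helper name are illustrative only, and the totals ignore the share of each machine that Spark and the operating system keep for themselves.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Instance:
    """Hypothetical container for the instance specs quoted in this notebook."""
    name: str
    vcpus: int
    memory_gb: int


def cluster_totals(instance: Instance, count: int) -> tuple:
    """Return (total vCPUs, total memory in GB) for `count` identical instances."""
    return instance.vcpus * count, instance.memory_gb * count


notebook = Instance("ml.t3.large", vcpus=2, memory_gb=8)
worker = Instance("ml.m5.xlarge", vcpus=4, memory_gb=16)

nb_vcpus, nb_mem = cluster_totals(notebook, 1)
cl_vcpus, cl_mem = cluster_totals(worker, 6)

print(f"notebook: {nb_vcpus} vCPUs, {nb_mem} GB")
print(f"cluster : {cl_vcpus} vCPUs, {cl_mem} GB")
print(f"ratio   : {cl_vcpus // nb_vcpus}x vCPUs, {cl_mem // nb_mem}x memory")
```

The six-machine cluster therefore offers twelve times the raw CPU and memory of the notebook instance.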

### Spark NLP

The Spark NLP library provided by _John Snow Labs_ is available to us in Python via the [spark-nlp==5.1.3](https://pypi.org/project/spark-nlp/) Python package and the [Spark NLP Assembly JAR spark-nlp-assembly-5.1.3.jar](https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.1.3.jar) Java Archive (JAR) file. BOTH the Python package and the Java archive are required to use Spark NLP; read more [here](https://sparknlp.org/docs/en/install).

Also see these compatibility tables ([1](https://github.com/JohnSnowLabs/spark-nlp#apache-spark-support), [2](https://github.com/JohnSnowLabs/spark-nlp#scala-and-python-support)) to check which versions of Spark, Python, Java and Scala each Spark NLP release supports.

>Why do we need an Assembly JAR file? Because the Spark NLP JAR has dependencies, all of which must also be downloaded to use Spark NLP. John Snow Labs has made this convenient by bundling all the JAR files into a single Assembly JAR.
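
Since this notebook uses version 5.1.3 for both the Python package and the JAR, one way to keep the two in step is to derive every identifier from a single version string. The helpers below are a hypothetical sketch, not part of Spark NLP; the Maven coordinate format is the one used later in this notebook, and the assumption that other releases follow the same JAR URL pattern is not verified here.

```python
SPARK_NLP_VERSION = "5.1.3"  # version used throughout this notebook
SCALA_VERSION = "2.12"       # Scala build in the notebook's Maven coordinate


def pip_requirement(version: str = SPARK_NLP_VERSION) -> str:
    """Requirement string for the Python package."""
    return f"spark-nlp=={version}"


def maven_coordinate(version: str = SPARK_NLP_VERSION,
                     scala: str = SCALA_VERSION) -> str:
    """Coordinate suitable for the `spark.jars.packages` setting."""
    return f"com.johnsnowlabs.nlp:spark-nlp_{scala}:{version}"


def assembly_jar_url(version: str = SPARK_NLP_VERSION) -> str:
    """Download URL of the Assembly JAR.

    Assumption: other releases follow the same pattern as the 5.1.3 URL.
    """
    return ("https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/"
            f"spark-nlp-assembly-{version}.jar")


print(pip_requirement())
print(maven_coordinate())
print(assembly_jar_url())
```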

## Setup
We need an available Java installation to run pyspark. The easiest way to do this is to install JDK and set the proper paths using conda.

In [2]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.4.0

# install spark-nlp
%pip install spark-nlp==5.1.3

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 23.3.1
  latest version: 23.9.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.9.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2023.08.22 |       h06a4308_0         123 KB
    certifi-2023.7.22          |  py310h06a4308_0         153 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    openssl-1.1.1w             |       h7f8727e_0         3.7 MB
    ------------------------------------------------------------
                                           Total:       345.0 MB

The following NEW packages will be INSTALLED:


## Download the data locally

In [3]:
!aws s3 cp s3://bigdatateaching/news/data1000.parquet . --request-payer requester

download: s3://bigdatateaching/news/data1000.parquet to ./data1000.parquet


In [9]:
## Copy the sample and the full dataset to the SageMaker bucket
import sagemaker
sess = sagemaker.Session()
bucket = sess.default_bucket()
!aws s3 cp s3://bigdatateaching/news/data1000.parquet s3://{bucket}/lab8/news/ --request-payer requester
!aws s3 cp s3://bigdatateaching/news/data.parquet s3://{bucket}/lab8/news/ --request-payer requester

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
copy: s3://bigdatateaching/news/data1000.parquet to s3://sagemaker-us-east-1-038932893404/lab8/news/data1000.parquet
copy: s3://bigdatateaching/news/data.parquet to s3://sagemaker-us-east-1-038932893404/lab8/news/data.parquet


In [12]:
!aws s3 ls s3://{bucket}/lab8/news/

2023-10-15 20:15:00 1617893454 data.parquet
2023-10-15 20:14:58    2833149 data1000.parquet


## Work with the local PySpark cluster and Spark NLP

Now we are going to start a local [Spark NLP session with Python](https://sparknlp.org/docs/en/install#start-spark-nlp-session-from-python). ***Do NOT forget to remove the `.master("local[*]")` line when moving this code to a script that runs on a Spark cluster***.

In [2]:
import json
import sparknlp
import numpy as np
import pandas as pd
from sparknlp.base import *
from pyspark.ml import Pipeline
from sparknlp.annotator import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sparknlp.pretrained import PretrainedPipeline

In [3]:
# Import pyspark and build Spark session
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3")\
    .getOrCreate()


:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-804d719a-edab-44b8-b008-b10766fe2840;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.1.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.20.1 in central
	found com.google.guava#guava;31.1-jre in central
	found com.google.guava#failureaccess;1.0.1 

In [4]:
print(f"Spark version: {spark.version}")
print(f"sparknlp version: {sparknlp.version()}")

Spark version: 3.4.0
sparknlp version: 5.1.3
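
One way to avoid forgetting the `.master("local[*]")` line is to build the session from a single function that takes a flag. The sketch below is an assumption about how you might organise this, not code from the lab: `spark_nlp_conf` and `build_session` are hypothetical names, and the settings are copied from the session builder above. Setting `spark.master` through `.config` has the same effect as calling `.master(...)`.

```python
def spark_nlp_conf(local: bool) -> dict:
    """Return the Spark settings used in this notebook.

    `spark.master` is only set for local runs; on a cluster the
    launcher decides where the application runs.
    """
    conf = {
        "spark.driver.memory": "16G",
        "spark.driver.maxResultSize": "0",
        "spark.kryoserializer.buffer.max": "2000M",
        "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3",
    }
    if local:
        conf["spark.master"] = "local[*]"
    return conf


def build_session(local: bool):
    """Create (or reuse) a SparkSession from the settings above."""
    # Imported lazily so the configuration helper works without PySpark.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName("Spark NLP")
    for key, value in spark_nlp_conf(local).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


print(spark_nlp_conf(local=True)["spark.master"])
print("spark.master" in spark_nlp_conf(local=False))
```

In the notebook you would call `build_session(local=True)`; in a script submitted to a cluster, `build_session(local=False)`.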


### Reading data into a Spark Dataframe
Note that we will be using the "s3a" adapter (read more [here](https://aws.amazon.com/blogs/opensource/community-collaboration-the-s3a-story)). S3A enables Hadoop to directly read and write Amazon S3 objects.

In [5]:
%%time
fpath = "data1000.parquet"
# Parquet files carry their own schema, so no header option is needed
df = spark.read.parquet(fpath)
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------+
|                  ID|             Content|             Summary|       Dataset|
+--------------------+--------------------+--------------------+--------------+
|f49ee725a0360aa68...|New York police a...|Police have inves...|CNN/Daily Mail|
|808fe317a53fbd313...|By . Ryan Lipman ...|Porn star Angela ...|CNN/Daily Mail|
|98fd67bd343e58bc4...|This was, Sergio ...|American draws in...|CNN/Daily Mail|
|e12b5bd7056287049...|An Ebola outbreak...|World Health Orga...|CNN/Daily Mail|
|b83e8bcfcd5141984...|By . Associated P...|A sinkhole opened...|CNN/Daily Mail|
|9c2b9de4b8928f63b...|Jerusalem woke up...|Two Palestinians ...|CNN/Daily Mail|
|550c7ea14b4ec91db...|An Australian fat...|Zia Abdul Haq is ...|CNN/Daily Mail|
|c6dbfd89aa8485511...|A mother whose pr...|Jocelyn Bennett a...|CNN/Daily Mail|
|85fa186e116866297...|A community stalw...|Rahmat Ali Raja, ...|CNN/Daily Mail|
|f6e79e2f206634f24...|(CNN) -- Congress.

                                                                                

In [6]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Content: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Dataset: string (nullable = true)



In [7]:
df = df.repartition(64)

### Analytics operations

Let us now do a few analytics operations locally in this notebook.

Dataframe shape

In [8]:
%%time
print(f"shape of the dataframe is {df.count():,}x{len(df.columns)}")



shape of the dataframe is 1,000x4
CPU times: user 5.71 ms, sys: 411 µs, total: 6.13 ms
Wall time: 2.2 s


                                                                                

### Construct four dummy variables using pyspark and the regex function `rlike`. You will make each dummy variable using the regex statement provided below:

|dummy | regex|
|-----------|-----------|
|politics|**(?i)politics\|(?i)political\|(?i)senate\|(?i)government\|(?i)president\|(?i)prime minister\|(?i)congress**|
|sports|**(?i)sport\|(?i)ball\|(?i)coach\|(?i)goal\|(?i)baseball\|(?i)football\|(?i)basketball**|
|arts|**(?i)art\|(?i)painting\|(?i)artist\|(?i)museum\|(?i)photography\|(?i)sculpture**|
|history|**(?i)history\|(?i)historical\|(?i)ancient\|(?i)archaeology\|(?i)heritage\|(?i)fossil**|

In [9]:
# Keep each pattern on a single line: a line break inside the string would become part of the regex
df = df\
    .withColumn('politics', F.col("Content").rlike("""(?i)politics|(?i)political|(?i)senate|(?i)democrats|(?i)republicans|(?i)government|(?i)president|(?i)prime minister|(?i)congress"""))\
    .withColumn('sports', F.col("Content").rlike("""(?i)sport|(?i)ball|(?i)coach|(?i)goal|(?i)baseball|(?i)football|(?i)basketball"""))\
    .withColumn('arts', F.col("Content").rlike("""(?i)art|(?i)painting|(?i)artist|(?i)museum|(?i)photography|(?i)sculpture"""))\
    .withColumn('history', F.col("Content").rlike("""(?i)history|(?i)historical|(?i)ancient|(?i)archaeology|(?i)heritage|(?i)fossil""")).persist()


In [10]:
df.select('politics','sports','arts','history').show(5)

+--------+------+----+-------+
|politics|sports|arts|history|
+--------+------+----+-------+
|   false| false|true|  false|
|   false|  true|true|  false|
|   false| false|true|  false|
|    true| false|true|  false|
|   false|  true|true|  false|
+--------+------+----+-------+
only showing top 5 rows



### Show counts of each dummy variable in the dataset. Save the result for the dummy count of `arts` to the variable name specified

In [11]:
categories = ['politics', 'arts', 'sports', 'history']
for c in categories:
    df.groupBy(c).count().show()

                                                                                

+--------+-----+
|politics|count|
+--------+-----+
|    true|  326|
|   false|  674|
+--------+-----+



                                                                                

+-----+-----+
| arts|count|
+-----+-----+
| true|  809|
|false|  191|
+-----+-----+



                                                                                

+------+-----+
|sports|count|
+------+-----+
|  true|  342|
| false|  658|
+------+-----+





+-------+-----+
|history|count|
+-------+-----+
|   true|  131|
|  false|  869|
+-------+-----+



                                                                                

In [12]:
df_art_soln = df.groupBy('arts').count().toPandas().to_dict(orient='records')
df_art_soln

                                                                                

[{'arts': True, 'count': 809}, {'arts': False, 'count': 191}]
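
The `arts` flag is true for 809 of the 1,000 articles, which is worth a second look. `rlike` succeeds when the pattern matches anywhere in the string, so the term `art` also matches inside words such as "party" or "start". The sketch below illustrates this with Python's `re` module as a stand-in for the Java regex engine that Spark uses; the sample sentences are made up, and the word-boundary variant is only one possible refinement, not part of the lab's specification.

```python
import re

ARTS_TERMS = ["art", "painting", "artist", "museum", "photography", "sculpture"]

# Unanchored, case-insensitive alternation, like the lab's `arts` pattern
SUBSTRING_PATTERN = re.compile("|".join(ARTS_TERMS), flags=re.IGNORECASE)

# Variant that only accepts whole words (an assumption, not the lab's regex)
WHOLE_WORD_PATTERN = re.compile(
    r"\b(?:" + "|".join(ARTS_TERMS) + r")\b", flags=re.IGNORECASE
)

samples = [
    "The museum opened a new wing.",
    "The party will start at noon.",
    "Police closed the road.",
]

for text in samples:
    substring_hit = SUBSTRING_PATTERN.search(text) is not None
    whole_word_hit = WHOLE_WORD_PATTERN.search(text) is not None
    print(f"{text!r}: substring={substring_hit}, whole_word={whole_word_hit}")
```

The second sentence has nothing to do with the arts, yet the substring pattern flags it because "party" and "start" both contain "art".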

### Build a SparkNLP Pipeline to construct positive/negative sentiment

We are going to make a sentiment model work for this dataset. Remember, we are working with a smaller dataset before we move on to the larger dataset.

In [13]:
MODEL_NAME='sentimentdl_use_twitter'

documentAssembler = DocumentAssembler()\
    .setInputCol("Content")\
    .setOutputCol("document")
    
use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")


sentimentdl = SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("sentiment")

nlpPipeline = Pipeline(
      stages = [
          documentAssembler,
          use,
          sentimentdl
      ])

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[ | ]tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[ / ]Download done! Loading the resource.
[ \ ]

2023-10-15 18:45:59.242674: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


[ / ]

2023-10-15 18:46:04.996808: W external/org_tensorflow/tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 60236800 exceeds 10% of free system memory.
2023-10-15 18:46:05.063314: W external/org_tensorflow/tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 60236800 exceeds 10% of free system memory.
2023-10-15 18:46:05.127811: W external/org_tensorflow/tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 60236800 exceeds 10% of free system memory.
2023-10-15 18:46:05.192137: W external/org_tensorflow/tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 60236800 exceeds 10% of free system memory.
2023-10-15 18:46:05.256899: W external/org_tensorflow/tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 60236800 exceeds 10% of free system memory.


[ — ]



[OK!]
sentimentdl_use_twitter download started this may take some time.
Approximate size to download 11.4 MB
[ | ]sentimentdl_use_twitter download started this may take some time.
Approximate size to download 11.4 MB
[ / ]Download done! Loading the resource.
[OK!]


In [14]:
pipelineModel = nlpPipeline.fit(df)
results = pipelineModel.transform(df)

In [15]:
results.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Content: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Dataset: string (nullable = true)
 |-- politics: boolean (nullable = true)
 |-- sports: boolean (nullable = true)
 |-- arts: boolean (nullable = true)
 |-- history: boolean (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- sentence_embeddings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)

### Pull out the sentiment output into its own column in the main dataframe. Create a new dataframe that includes sentiment, news source, and your four dummy variables.

In [16]:
results=results.withColumn('sentiment',F.explode(results.sentiment.result))
final_data=results.select('politics','arts','sports','history', F.expr("Dataset").alias("news_source"),'sentiment')
final_data.persist()
final_data.show()

[Stage 42:>                                                         (0 + 1) / 1]

+--------+-----+------+-------+--------------+---------+
|politics| arts|sports|history|   news_source|sentiment|
+--------+-----+------+-------+--------------+---------+
|   false| true| false|  false|CNN/Daily Mail| negative|
|   false| true|  true|  false|CNN/Daily Mail| negative|
|   false| true| false|  false|CNN/Daily Mail| negative|
|    true| true| false|  false|CNN/Daily Mail| negative|
|   false| true|  true|  false|CNN/Daily Mail| negative|
|    true| true| false|  false|CNN/Daily Mail| negative|
|   false| true| false|  false|          XSum| negative|
|   false| true| false|  false|          XSum| negative|
|    true| true| false|   true|CNN/Daily Mail| negative|
|   false| true| false|  false|CNN/Daily Mail| negative|
|   false|false| false|  false|CNN/Daily Mail| negative|
|   false| true| false|  false|CNN/Daily Mail| negative|
|   false| true|  true|  false|    Multi-News| negative|
|   false| true| false|  false|CNN/Daily Mail| negative|
|   false| true|  true|   true|
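
The `sentiment` column produced by the pipeline is an array of annotations, so `results.sentiment.result` is an array of labels for each row, and `F.explode` turns every array element into its own row. The pure-Python sketch below mimics that behaviour on made-up records (the IDs and labels are hypothetical) so the reshaping is easy to see without a Spark session. As with `F.explode`, rows whose array is empty produce no output row.

```python
from typing import Iterable, Iterator


def explode_rows(rows: Iterable[dict], column: str) -> Iterator[dict]:
    """Yield one copy of each row per element of the list stored in `column`."""
    for row in rows:
        for value in row[column]:
            # Replace the list with a single element, keep the other fields
            yield {**row, column: value}


# Hypothetical rows shaped like the pipeline output after selecting `.result`
annotated = [
    {"ID": "a", "sentiment": ["negative"]},
    {"ID": "b", "sentiment": ["positive"]},
    {"ID": "c", "sentiment": []},  # no label: dropped by explode
]

flat = list(explode_rows(annotated, "sentiment"))
for row in flat:
    print(row)
```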

                                                                                

### Create a summary table of the count of articles grouped by your `politics` dummy variable, news source `news_source`, and sentiment classification `sentiment`. Save the resulting dataframe into a variable called `df_sent_baseline`, similar to the previous step for saving the Pandas dataframe.

***The following code raises an exception because the `ml.t3.large` instance runs out of memory, and we cannot change the instance type to one with more memory. The same code works when we run it on a Spark cluster as part of a script***.

In [19]:
"""
sum_counts = final_data.groupBy(['politics', 'news_source', 'sentiment']).count()
df_sent_baseline = sum_counts.toPandas().to_dict(orient='records')
df_sent_baseline
"""

"\nsum_counts = final_data.groupBy(['politics', 'news_source', 'sentiment']).count()\ndf_sent_baseline = sum_counts.toPandas().to_dict(orient='records')\ndf_sent_baseline\n"

## Process S3 data with SageMaker Processing Job `PySparkProcessor`

We are going to move the above processing code into a Python file and then submit that file to a SageMaker Processing job using [`PySparkProcessor`](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#pysparkprocessor).

### Download the Spark NLP Assembly JAR file

Notice how we are able to write the file directly to S3 without having to first store it in this notebook locally.

In [None]:
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()
!wget -qO- https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.1.3.jar | aws s3 cp - s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar
!aws s3 ls s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar

In [4]:
!mkdir -p ./code

In [35]:
%%writefile ./code/process.py

import os
import sys
import logging
import argparse

# PySpark types for the input schema (SQL functions are used through the `F` alias imported below)
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

import json
import sparknlp
import numpy as np
import pandas as pd
from sparknlp.base import *
from pyspark.ml import Pipeline
from sparknlp.annotator import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sparknlp.pretrained import PretrainedPipeline

# basicConfig installs a single handler on the root logger; pointing it at stdout
# avoids adding a second handler, which would print every message twice
logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG, stream=sys.stdout)
logger = logging.getLogger()

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()
    logger.info(f"args={args}")
    
    spark = SparkSession.builder \
    .appName("Spark NLP")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3")\
    .getOrCreate()
    
    logger.info(f"Spark version: {spark.version}")
    logger.info(f"sparknlp version: {sparknlp.version()}")
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Schema expected for the input data. Parquet files embed their own schema,
    # so this documents the columns the script relies on and applies them on read
    schema = StructType(
        [
            StructField("ID", StringType(), True),
            StructField("Content", StringType(), True),
            StructField("Summary", StringType(), True),
            StructField("Dataset", StringType(), True)
        ]
    )
    
    # Reading the data from S3 into a Dataframe
    logger.info(f"going to read {args.s3_dataset_path}")
    df = spark.read.schema(schema).parquet(args.s3_dataset_path)
    df = df.repartition(64)
    logger.info(f"finished reading files...")
    
    # get count
    row_count = df.count()
    # create a one-row DataFrame holding the count and save it to S3
    line = [f"count={row_count}"]
    logger.info(line)
    l = [('count', row_count)]
    tmp_df = spark.createDataFrame(l)
    s3_path = "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "count")
    logger.info(f"going to save count to {s3_path}")
    # we want to write to a single file so coalesce
    tmp_df.coalesce(1).write.format('csv').option('header', 'false').mode("overwrite").save(s3_path)
    
    df = df\
    .withColumn('politics', F.col("Content").rlike("""(?i)politics|(?i)political|(?i)senate|(?i)democrats|(?i)republicans|(?i)government|(?i)president|(?i)prime minister|(?i)congress"""))\
    .withColumn('sports', F.col("Content").rlike("""(?i)sport|(?i)ball|(?i)coach|(?i)goal|(?i)baseball|(?i)football|(?i)basketball"""))\
    .withColumn('arts', F.col("Content").rlike("""(?i)art|(?i)painting|(?i)artist|(?i)museum|(?i)photography|(?i)sculpture"""))\
    .withColumn('history', F.col("Content").rlike("""(?i)history|(?i)historical|(?i)ancient|(?i)archaeology|(?i)heritage|(?i)fossil""")).persist()
    
    categories = ['politics', 'arts', 'sports', 'history']
    for c in categories:
        df_soln = df.groupBy(c).count() #.toPandas().to_dict(orient='records')        
        s3_path = "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, c)
        logger.info(f"going to save dataframe to {s3_path}")
        # we want to write to a single file so coalesce
        df_soln.coalesce(1).write.format('csv').option('header', 'false').mode("overwrite").save(s3_path)

    # sentiment analysis
    MODEL_NAME = 'sentimentdl_use_twitter'
    logger.info(f"setting up an nlp pipeline with model={MODEL_NAME}")
    documentAssembler = DocumentAssembler()\
    .setInputCol("Content")\
    .setOutputCol("document")
    
    use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
     .setInputCols(["document"])\
     .setOutputCol("sentence_embeddings")

    sentimentdl = SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("sentiment")

    nlp_pipeline = Pipeline(
      stages = [
          documentAssembler,
          use,
          sentimentdl
      ])
    logger.info(f"going to fit and transform pipeline on dataframe")
    pipeline_model = nlp_pipeline.fit(df)
    results = pipeline_model.transform(df)
    logger.info(f"done with fit and transform pipeline on dataframe")
    
    results=results.withColumn('sentiment', F.explode(results.sentiment.result))
    final_data=results.select('politics', 'arts', 'sports', 'history', F.expr("Dataset").alias("news_source"),'sentiment')
    final_data.persist()
    #final_data.show()
    cols = ['politics', 'news_source', 'sentiment']
    logger.info(f"going to run a group by and count on columns={cols}")
    sum_counts = final_data.groupBy(cols).count()
    # keep the result as a Spark DataFrame; converting it with toPandas() is not needed to write CSV
    df_sent_baseline = sum_counts
    logger.info(df_sent_baseline)
    s3_path = "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "sentiment_baseline")
    logger.info(f"going to save dataframe to {s3_path}")
    # we want to write to a single file so coalesce
    df_sent_baseline.coalesce(1).write.format('csv').option('header', 'false').mode("overwrite").save(s3_path)
    logger.info("all done")
    
if __name__ == "__main__":
    main()

Overwriting ./code/process.py
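
The script receives its inputs as command-line arguments, and it builds each output location from the bucket and key prefix it is given. The sketch below replays that part of `process.py` on its own so you can check the resulting paths without starting Spark. The bucket name `example-bucket` is a placeholder, and `posixpath.join` is used instead of `os.path.join` so the separators are always forward slashes; that substitution is a choice made for this sketch.

```python
import argparse
import posixpath


def parse_args(argv: list) -> argparse.Namespace:
    """Parse the same three options that process.py expects."""
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    return parser.parse_args(argv)


def output_path(args: argparse.Namespace, name: str) -> str:
    """Build the S3 location for one result set, e.g. 'count' or 'arts'."""
    return "s3://" + posixpath.join(args.s3_output_bucket, args.s3_output_key_prefix, name)


# Placeholder values standing in for the list passed to the job
argv = [
    "--s3_dataset_path", "s3://example-bucket/lab8/news/data.parquet",
    "--s3_output_bucket", "example-bucket",
    "--s3_output_key_prefix", "lab8/data",
]

args = parse_args(argv)
for name in ["count", "politics", "arts", "sports", "history", "sentiment_baseline"]:
    print(output_path(args, name))
```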


Now submit this code as a SageMaker Processing job.

In [None]:
%%time
import boto3
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

account_id = boto3.client('sts').get_caller_identity()['Account']

# Set up the PySpark processor to run the job. Note the instance type and instance count parameters: SageMaker will create this many instances of this type for the Spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-lab8",
    image_uri=f"{account_id}.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark:latest",
    role=role,
    instance_count=6,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
s3_dataset_path = f"s3://{bucket}/lab8/news/data.parquet"
print(f"account_id={account_id}, s3_dataset_path={s3_dataset_path}")
output_prefix_data = f"lab8/data"
output_prefix_logs = f"lab8/spark_logs"


# Run the job now. The arguments list is passed as command-line arguments to the Python script (Spark code in this case).
spark_processor.run(
    submit_app="./code/process.py",
    submit_jars=[f"s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar"],
    arguments=[
        "--s3_dataset_path",
        s3_dataset_path,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        output_prefix_data,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
account_id=038932893404, s3_dataset_path=s3://sagemaker-us-east-1-038932893404/lab8/news/data.parquet


INFO:sagemaker:Creating processing-job with name sm-spark-lab8-2023-10-15-21-05-07-498


...................................................................................................................................................................................................

Once the processing job completes, you can check the actual processing time for which your account will be billed in the SageMaker Processing Job details as shown in the screenshot below.

![SageMaker Processing Job details](./img/sm-process-job.png)
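
When reading the billed time, remember that the job ran on several instances at once, so the billable quantity scales with the instance count. The sketch below shows the arithmetic only. The duration and the hourly price are placeholder values chosen for easy numbers, not figures from this job or from the AWS price list; substitute the duration shown in the job details and the current price for your instance type and region.

```python
def billable_instance_hours(duration_seconds: float, instance_count: int) -> float:
    """Total instance-hours consumed by a job that ran on `instance_count` machines."""
    return duration_seconds * instance_count / 3600


def estimated_cost(duration_seconds: float, instance_count: int,
                   price_per_instance_hour: float) -> float:
    """Instance-hours multiplied by the hourly price of one instance."""
    return billable_instance_hours(duration_seconds, instance_count) * price_per_instance_hour


# Placeholder inputs: a 20-minute job on 6 instances at a made-up hourly price
duration_seconds = 1200
instance_count = 6
price_per_instance_hour = 0.25  # hypothetical, not an actual AWS price

hours = billable_instance_hours(duration_seconds, instance_count)
cost = estimated_cost(duration_seconds, instance_count, price_per_instance_hour)
print(f"{hours} instance-hours, estimated cost {cost:.2f}")
```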

## Result files
All result files are now available in S3.

In [39]:
!aws s3 ls s3://{bucket}/lab8/ --recursive

2023-10-15 20:08:12  708534094 lab8/-
2023-10-15 21:15:52          0 lab8/data/arts/_SUCCESS
2023-10-15 21:15:52         29 lab8/data/arts/part-00000-8d5ba47c-0e7e-4b2d-86f2-28c8ec65da30-c000.csv
2023-10-15 21:12:43          0 lab8/data/count/_SUCCESS
2023-10-15 21:12:43         13 lab8/data/count/part-00000-4f8b5f90-f273-41f4-9cba-b187cefc41c9-c000.csv
2023-10-15 21:15:57          0 lab8/data/history/_SUCCESS
2023-10-15 21:15:57         29 lab8/data/history/part-00000-94a20b52-4a3e-4ba5-a04c-97de54817bde-c000.csv
2023-10-15 21:15:50          0 lab8/data/politics/_SUCCESS
2023-10-15 21:15:49         29 lab8/data/politics/part-00000-db25f8a8-c386-4abe-b3b4-a38e9f5a6232-c000.csv
2023-10-15 21:23:12          0 lab8/data/sentiment_baseline/_SUCCESS
2023-10-15 21:23:12        575 lab8/data/sentiment_baseline/part-00000-1de077ff-fbf7-4bad-9ffb-70e99f7bcdbb-c000.csv
2023-10-15 21:15:54          0 lab8/data/sports/_SUCCESS
2023-10-15 21:15:54         29 lab8/data/sports/part-00000-a0cbb054-166
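
Each category folder holds a single headerless CSV part file with one line per group, because the script coalesces to one partition and writes with `header` set to `false`. After downloading such a file, it can be parsed with the standard library alone. The sketch below uses the `arts` counts from the 1,000-row local run as sample content; the actual numbers in the S3 files come from the full dataset and will differ, and `parse_count_csv` is a hypothetical helper name.

```python
import csv
import io


def parse_count_csv(text: str) -> dict:
    """Parse headerless 'flag,count' lines into {bool: int}."""
    counts = {}
    for flag, count in csv.reader(io.StringIO(text)):
        # Spark writes boolean values as the strings 'true' and 'false'
        counts[flag == "true"] = int(count)
    return counts


# Sample content using the local 1,000-row `arts` counts from this notebook
sample = "true,809\nfalse,191\n"

arts_counts = parse_count_csv(sample)
print(arts_counts)
print(f"share flagged as arts: {arts_counts[True] / sum(arts_counts.values()):.1%}")
```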