
# Analyze Data Quality with SageMaker Processing Jobs and Spark

Typically, a machine learning (ML) process consists of a few steps: gathering data with various ETL jobs, pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.

Often, distributed data processing frameworks such as Spark are used to process and analyze data sets in order to detect data quality issues and prepare them for model training.  

In this notebook we'll use Amazon SageMaker Processing with a library called [**Deequ**](https://github.com/awslabs/deequ), and leverage the power of Spark with a managed SageMaker Processing Job to run our data processing workloads.

Here is a great blog post on Deequ for more information:  https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/

![Deequ](img/deequ.png)

![](img/processing.jpg)

# Amazon Customer Reviews Dataset

https://s3.amazonaws.com/amazon-reviews-pds/readme.html

### Dataset Columns:

- `marketplace`: 2-letter country code (in this case all "US").
- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.
- `review_id`: A unique ID for the review.
- `product_id`: The Amazon Standard Identification Number (ASIN).  `http://www.amazon.com/dp/<ASIN>` links to the product's detail page.
- `product_parent`: The parent of that ASIN.  Multiple ASINs (color or format variations of the same product) can roll up into a single parent.
- `product_title`: Title description of the product.
- `product_category`: Broad product category that can be used to group reviews (in this case digital videos).
- `star_rating`: The review's rating (1 to 5 stars).
- `helpful_votes`: Number of helpful votes for the review.
- `total_votes`: Number of total votes the review received.
- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?
- `verified_purchase`: Was the review from a verified purchase?
- `review_headline`: The title of the review itself.
- `review_body`: The text of the review.
- `review_date`: The date the review was written.

In [1]:
import sagemaker

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

# Build a Spark Docker Image to Run the Processing Job

An example Spark container is included in the `./container` directory of this example. The container handles the bootstrapping of all Spark configuration, and serves as a wrapper around the `spark-submit` CLI. At a high level the container provides:
* A set of default Spark/YARN/Hadoop configurations
* A bootstrapping script for configuring and starting up Spark master/worker nodes
* A wrapper around the `spark-submit` CLI to submit a Spark application


After the container build and push process is complete, use the Amazon SageMaker Python SDK to submit a managed, distributed Spark application that performs our dataset preprocessing.

Build the example Spark container.

In [2]:
!pygmentize container/Dockerfile

FROM openjdk:8-jre-slim

RUN apt-get update
RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
RUN pip3 install py4j psutil==5.6.5 numpy==1.17.4
RUN apt-get clean
RUN rm -rf /var/lib/apt/lists/*

# http://blog.stuart.axelbrooke.com/python-3-on-spark-return-of-the-pythonhashseed
ENV PYTHONHASHSEED 0
ENV PYTHONIOENCODING UTF-8
ENV PIP_DISABLE_PIP_VERSION_CHECK 1

# Install Hadoop
ENV HADOOP_VERSION 3.2.1
ENV HADOOP_HOME /usr/hadoop-$HADOOP_VERSION
ENV HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
ENV PATH

In [3]:
docker_repo = 'amazon-reviews-spark-analyzer'
docker_tag = 'latest'

In [23]:
!docker build -t $docker_repo:$docker_tag -f container/Dockerfile ./container

Sending build context to Docker daemon  4.441MB
Step 1/33 : FROM openjdk:8-jre-slim
 ---> d2f9f3c77c25
Step 2/33 : RUN apt-get update
 ---> Using cache
 ---> de849c6fc99d
Step 3/33 : RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
 ---> Using cache
 ---> b573599ff428
Step 4/33 : RUN pip3 install py4j psutil==5.6.5 numpy==1.17.4
 ---> Using cache
 ---> c301bde7719a
Step 5/33 : RUN apt-get clean
 ---> Using cache
 ---> 7540ff0ab66c
Step 6/33 : RUN rm -rf /var/lib/apt/lists/*
 ---> Using cache
 ---> a79c398a7313
Step 7/33 : ENV PYTHONHASHSEED 0
 ---> Using cache
 ---> 069665758a56
Step 8/33 : ENV PYTHONIOENCODING UTF-8
 ---> Using cache
 ---> bea202ee24bf
Step 9/33 : ENV PIP_DISABLE_PIP_VERSION_CHECK 1
 ---> Using cache
 ---> 25d205e83039
Step 10/33 : ENV HADOOP_VERSION 3.2.1
 ---> Using cache
 ---> a1c381898ac2
Step 11/33 : ENV HADOOP_HOME /usr/hadoop-$HADOOP_VERSION
 ---> Using cache
 ---> 942dd8611a11
Step 12/33 : ENV HADOOP

# Check the Docker Image
If the image did not build properly, re-run the cell above.

In [5]:
!docker inspect $docker_repo:$docker_tag

[]
Error: No such object: amazon-reviews-spark-analyzer:latest
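The output above shows the case where the image is missing locally, in which case the later tag and push steps cannot succeed. A minimal sketch of a guard for that situation, assuming the Docker CLI is on the `PATH`. `image_exists` is a hypothetical helper name, and its `run` parameter exists only so the check can be exercised without Docker:

```python
import subprocess


def image_exists(image, run=subprocess.run):
    """Return True if `docker image inspect` finds the image locally."""
    result = run(
        ["docker", "image", "inspect", image],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # docker exits with a non-zero status when the image is unknown.
    return result.returncode == 0
```

In this notebook it could be called as `image_exists('{}:{}'.format(docker_repo, docker_tag))` before tagging and pushing.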


# Push the Image to a Private Docker Repo (Amazon ECR)

In [6]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

image_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account_id, region, docker_repo, docker_tag)
print(image_uri)

032934710550.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews-spark-analyzer:latest


In [7]:
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded


In [8]:
!aws ecr describe-repositories --repository-names $docker_repo || aws ecr create-repository --repository-name $docker_repo


An error occurred (RepositoryNotFoundException) when calling the DescribeRepositories operation: The repository with name 'amazon-reviews-spark-analyzer' does not exist in the registry with id '032934710550'
{
    "repository": {
        "repositoryArn": "arn:aws:ecr:us-west-2:032934710550:repository/amazon-reviews-spark-analyzer",
        "registryId": "032934710550",
        "repositoryName": "amazon-reviews-spark-analyzer",
        "repositoryUri": "032934710550.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews-spark-analyzer",
        "createdAt": 1598123779.0,
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": false
        },
        "encryptionConfiguration": {
            "encryptionType": "AES256"
        }
    }
}
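The shell cell above relies on `||` to create the repository only when the lookup fails. The same idea can be sketched with boto3. `ensure_repository` is a hypothetical helper, and the ECR client is passed in as a parameter rather than created inside the function:

```python
def ensure_repository(ecr_client, name):
    """Return the repository URI, creating the repository if it is missing."""
    try:
        response = ecr_client.describe_repositories(repositoryNames=[name])
        return response["repositories"][0]["repositoryUri"]
    except ecr_client.exceptions.RepositoryNotFoundException:
        # Same fallback as the `||` branch of the shell command.
        response = ecr_client.create_repository(repositoryName=name)
        return response["repository"]["repositoryUri"]
```

A call such as `ensure_repository(boto3.client('ecr'), docker_repo)` would then return the repository URI in either case.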


In [9]:
!docker tag $docker_repo:$docker_tag $image_uri

Error response from daemon: No such image: amazon-reviews-spark-analyzer:latest


In [10]:
!docker push $image_uri

The push refers to repository [032934710550.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews-spark-analyzer]
An image does not exist locally with the tag: 032934710550.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews-spark-analyzer


# Run the Analysis Job using a SageMaker Processing Job

Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the Spark container that was just built with our Spark script.

# Review the Spark preprocessing script.

In [11]:
!pygmentize preprocess-deequ.py

from __future__ import print_function
from __future__ import unicode_literals

import time
import sys
import os
import shutil
import csv

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def main():
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    
    

In [12]:
!pygmentize deequ/preprocess-deequ.scala

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.suggestions.

In [13]:
from sagemaker.processing import ScriptProcessor

processor = ScriptProcessor(base_job_name='spark-amazon-reviews-analyzer',
                            image_uri=image_uri,
                            command=['/opt/program/submit'],
                            role=role,
                            instance_count=2, # instance_count needs to be > 1 or you will see the following error:  "INFO yarn.Client: Application report for application_ (state: ACCEPTED)"
                            instance_type='ml.r5.2xlarge',
                            env={
                                'mode': 'jar',
                                'main_class': 'Main'
                            })

In [14]:
s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)
print(s3_input_data)

s3://sagemaker-us-west-2-032934710550/amazon-reviews-pds/tsv/


In [15]:
!aws s3 ls $s3_input_data

2020-08-22 17:44:30   18997559 amazon_reviews_us_Digital_Software_v1_00.tsv.gz
2020-08-22 17:44:34   27442648 amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz


## Setup Output Data

In [16]:
from time import gmtime, strftime
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

output_prefix = 'amazon-reviews-spark-analyzer-{}'.format(timestamp_prefix)
processing_job_name = 'amazon-reviews-spark-analyzer-{}'.format(timestamp_prefix)

print('Processing job name:  {}'.format(processing_job_name))

Processing job name:  amazon-reviews-spark-analyzer-2020-08-22-19-16-22


In [17]:
s3_output_analyze_data = 's3://{}/{}/output'.format(bucket, output_prefix)

print(s3_output_analyze_data)

s3://sagemaker-us-west-2-032934710550/amazon-reviews-spark-analyzer-2020-08-22-19-16-22/output


## Start the Spark Processing Job

_Notes on Invoking from Lambda:_
* If we start the job with the boto3 SDK instead (i.e. from a Lambda function), we need to copy the `preprocess.py` file to S3 ourselves and specify everything explicitly, including `--py-files`, etc.
* We would need to do the following before invoking the Lambda:
     !aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/code/preprocess.py
     !aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/py_files/preprocess.py
* Then reference the `s3://<location>` paths above in `--py-files`, etc.
* See the Lambda example code in this same project for more details.
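
As a rough sketch of what such a Lambda might send, the function below assembles keyword arguments for the boto3 `create_processing_job` call. The function names, the `--py-files` argument layout, and the `py_files` local path are assumptions for illustration. The actual arguments depend on how the container's `/opt/program/submit` wrapper parses them.

```python
def _s3_input(name, s3_uri, local_path):
    """Describe one S3 prefix that SageMaker copies into the container."""
    return {
        "InputName": name,
        "S3Input": {
            "S3Uri": s3_uri,
            "LocalPath": local_path,
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
            "S3DataDistributionType": "FullyReplicated",
        },
    }


def build_processing_request(job_name, image_uri, role_arn,
                             code_s3_uri, py_files_s3_uri, script_args):
    """Assemble keyword arguments for boto3's create_processing_job."""
    code_dir = "/opt/ml/processing/input/code"
    py_files_dir = "/opt/ml/processing/input/py_files"  # assumed location
    return {
        "ProcessingJobName": job_name,
        "RoleArn": role_arn,
        "AppSpecification": {
            "ImageUri": image_uri,
            "ContainerEntrypoint": ["/opt/program/submit"],
            # Assumed argument layout; adjust to the container's wrapper.
            "ContainerArguments": [
                "--py-files", py_files_dir + "/preprocess.py",
                code_dir + "/preprocess.py",
            ] + list(script_args),
        },
        "ProcessingInputs": [
            _s3_input("code", code_s3_uri, code_dir),
            _s3_input("py_files", py_files_s3_uri, py_files_dir),
        ],
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": 2,
                "InstanceType": "ml.r5.2xlarge",
                "VolumeSizeInGB": 30,
            }
        },
    }
```

The resulting dictionary could then be passed as `boto3.client('sagemaker').create_processing_job(**request)`.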

_Notes on not using ProcessingInput and ProcessingOutput:_
* Since Spark natively reads from and writes to S3 using `s3a://`, we can avoid the copy required by ProcessingInput and ProcessingOutput (FullyReplicated or ShardedByS3Key) and just specify the S3 input and output buckets/prefixes.
* See https://github.com/awslabs/amazon-sagemaker-examples/issues/994 for issues related to using `/opt/ml/processing/input/` and `output/`.
* If we use ProcessingInput, the data is copied to each node, which we don't want here since Spark already handles data distribution.
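
Inside a Spark script, an `s3://` location passed in as an argument would typically be rewritten to the `s3a://` scheme before reading. A minimal sketch of that rewrite follows. `to_s3a` is a hypothetical helper, not something taken from this notebook's script:

```python
def to_s3a(uri):
    """Rewrite an s3:// URI to the s3a:// scheme used by Hadoop's S3A connector."""
    prefix = "s3://"
    if uri.startswith(prefix):
        return "s3a://" + uri[len(prefix):]
    # Leave anything else (including existing s3a:// URIs) untouched.
    return uri
```

With such a helper, a script could read the input with something like `spark.read.csv(to_s3a(args['s3_input_data']), sep='\t', header=True)`.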

In [18]:
from sagemaker.processing import ProcessingOutput

processor.run(code='preprocess-deequ.py',
              arguments=['s3_input_data', s3_input_data,
                         's3_output_analyze_data', s3_output_analyze_data,
              ],
              # See https://github.com/aws/sagemaker-python-sdk/issues/1341 
              #   for why we need to specify a null-output
              outputs=[
                  ProcessingOutput(s3_upload_mode='EndOfJob',
                                   output_name='null-output',
                                   source='/opt/ml/processing/output')
              ],
              logs=True,
              wait=False
)

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  spark-amazon-reviews-analyzer-2020-08-22-19-16-22-357
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-032934710550/spark-amazon-reviews-analyzer-2020-08-22-19-16-22-357/input/code/preprocess-deequ.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'null-output', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-032934710550/spark-amazon-reviews-analyzer-2020-08-22-19-16-22-357/output/null-output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
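
The `arguments` list alternates names and values. The script shown earlier turns `sys.argv[1:]` into a dictionary by zipping a single iterator with itself, which consumes the list two items at a time. A small standalone sketch of that idiom (the `parse_pairs` name and the sample values are illustrative only):

```python
def parse_pairs(argv):
    """Pair up a flat [name, value, name, value, ...] list into a dict."""
    args_iter = iter(argv)
    # zip pulls from the same iterator twice per step: first the name, then the value.
    return dict(zip(args_iter, args_iter))


example = parse_pairs([
    "s3_input_data", "s3://bucket/in/",
    "s3_output_analyze_data", "s3://bucket/out/",
])
print(example["s3_input_data"])
```

One consequence of this approach is that a trailing name without a value is silently dropped, so the list should always have an even length.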


In [19]:
from IPython.display import display, HTML

processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']

display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, processing_job_name)))


In [20]:
from IPython.display import display, HTML

s3_job_output_prefix = output_prefix

display(HTML('<b>Review <a target="_blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, s3_job_output_prefix, region)))


# Please Wait Until the Processing Job Completes!

In [21]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=processing_job_name,
                                                                            sagemaker_session=sagemaker_session)

processing_job_description = running_processor.describe()

processing_job_status = processing_job_description['ProcessingJobStatus']
print('\n')
print(processing_job_status)
print('\n')

print(processing_job_description)



InProgress


{'ProcessingInputs': [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-032934710550/spark-amazon-reviews-analyzer-2020-08-22-19-16-22-357/input/code/preprocess-deequ.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'null-output', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-032934710550/spark-amazon-reviews-analyzer-2020-08-22-19-16-22-357/output/null-output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]}, 'ProcessingJobName': 'spark-amazon-reviews-analyzer-2020-08-22-19-16-22-357', 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 2, 'InstanceType': 'ml.r5.2xlarge', 'VolumeSizeInGB': 30}}, 'StoppingCondition': {'MaxRuntimeInSeconds': 86400}, 'AppSpecification': {'ImageUri': '032934710550.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews

# _Please Wait Until the ^^ Processing Job ^^ Completes Above._

In [22]:
running_processor.wait()

...............
..

UnexpectedStatusException: Error for Processing job spark-amazon-reviews-analyzer-2020-08-22-19-16-22-357: Failed. Reason: ClientError: API error (404): manifest for 032934710550.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews-spark-analyzer:latest not found: manifest unknown: Requested image not found
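
When a job ends in `Failed`, as in the output above, the `describe()` response carries the explanation in its `FailureReason` field. A sketch of pulling out the two fields worth checking first follows. `summarize_job` is a hypothetical helper:

```python
def summarize_job(description):
    """Return (status, reason) from a processing job description dict."""
    status = description["ProcessingJobStatus"]
    # FailureReason is only present when the job has failed.
    reason = description.get("FailureReason", "")
    return status, reason
```

In this notebook it could be used as `status, reason = summarize_job(running_processor.describe())`. Here the reason points at the missing image in ECR, which suggests re-running the build, tag, and push cells before resubmitting.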

# Inspect the Processed Output 

## These are the quality checks on our dataset.

## _The next cells will not work properly until the job completes above._

In [None]:
!aws s3 ls --recursive $s3_output_analyze_data/

## Copy the Output from S3 to Local
* dataset-metrics/
* constraint-checks/
* success-metrics/
* constraint-suggestions/


In [None]:
!aws s3 cp --recursive $s3_output_analyze_data ./amazon-reviews-spark-analyzer/ --exclude="*" --include="*.csv"

## Analyze Constraint Checks

In [None]:
import glob
import pandas as pd
import os

def load_dataset(path, sep, header):
    # Read every CSV part file under `path` and stack them into a single DataFrame.
    data = pd.concat([pd.read_csv(f, sep=sep, header=header) for f in glob.glob('{}/*.csv'.format(path))], ignore_index = True)

    return data

In [None]:
df_constraint_checks = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-checks/', sep='\t', header=0)
df_constraint_checks[['check', 'constraint', 'constraint_status', 'constraint_message']]

## Analyze Dataset Metrics

In [None]:
df_dataset_metrics = load_dataset(path='./amazon-reviews-spark-analyzer/dataset-metrics/', sep='\t', header=0)
df_dataset_metrics

## Analyze Success Metrics

In [None]:
df_success_metrics = load_dataset(path='./amazon-reviews-spark-analyzer/success-metrics/', sep='\t', header=0)
df_success_metrics

## Analyze Constraint Suggestions

In [None]:
df_constraint_suggestions = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-suggestions/', sep='\t', header=0)
df_constraint_suggestions.columns=['column_name', 'description', 'code']
df_constraint_suggestions

# Save for the Next Notebook(s)

In [None]:
%store df_dataset_metrics

In [None]:
%%javascript
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();