
# Analyze Data Quality with SageMaker Processing Jobs and Spark

Typically, a machine learning (ML) process consists of a few steps: gathering data with various ETL jobs, pre-processing the data, featurizing the dataset using standard techniques or prior knowledge, and finally training an ML model with an algorithm.

Distributed data processing frameworks such as Spark are often used to analyze datasets, detect data quality issues, and prepare the data for model training.

In this notebook, we use Amazon SageMaker Processing with [Deequ](https://github.com/awslabs/deequ), an open-source data quality library built on top of Apache Spark, and run the Spark workload as a managed SageMaker Processing Job.

For more information on Deequ, see this blog post: https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/

![Deequ](img/deequ.png)

![](img/processing.jpg)

# Amazon Customer Reviews Dataset

https://s3.amazonaws.com/amazon-reviews-pds/readme.html

### Dataset Columns:

- `marketplace`: 2-letter country code (in this case all "US").
- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.
- `review_id`: A unique ID for the review.
- `product_id`: The Amazon Standard Identification Number (ASIN).  `http://www.amazon.com/dp/<ASIN>` links to the product's detail page.
- `product_parent`: The parent of that ASIN.  Multiple ASINs (color or format variations of the same product) can roll up into a single parent.
- `product_title`: Title description of the product.
- `product_category`: Broad product category that can be used to group reviews (in this notebook, Digital Software and Digital Video Games).
- `star_rating`: The review's rating (1 to 5 stars).
- `helpful_votes`: Number of helpful votes for the review.
- `total_votes`: Number of total votes the review received.
- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?
- `verified_purchase`: Was the review from a verified purchase?
- `review_headline`: The title of the review itself.
- `review_body`: The text of the review.
- `review_date`: The date the review was written.
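The dataset ships as gzip-compressed, tab-separated files. To peek at a few rows locally before running Spark, a minimal standard-library sketch might look like the following. It assumes you have downloaded one of the `.tsv.gz` files and that its header row names the columns listed above; `read_reviews` is a hypothetical helper, not part of this project.

```python
import csv
import gzip

# Columns described above as numeric counts or ratings.
INT_COLUMNS = ("star_rating", "helpful_votes", "total_votes")


def read_reviews(path, limit=5):
    """Yield up to `limit` reviews from a gzip-compressed TSV file.

    Assumes the first row is a header with the column names listed above.
    """
    with gzip.open(path, "rt", encoding="utf-8", newline="") as f:
        # QUOTE_NONE: review text may contain unbalanced double quotes.
        reader = csv.DictReader(f, delimiter="\t", quoting=csv.QUOTE_NONE)
        for i, row in enumerate(reader):
            if i >= limit:
                break
            for col in INT_COLUMNS:
                row[col] = int(row[col])
            yield row
```

For example, `for review in read_reviews("amazon_reviews_us_Digital_Software_v1_00.tsv.gz"): print(review["star_rating"], review["review_headline"])`. With the default quoting mode, a stray double quote in a review body could swallow several rows into one field, which is why the sketch disables quote handling.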

In [1]:
import sagemaker

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

# Create a Docker Container with Spark and our Python Code

An example Spark container is included in the `./container` directory of this example. The container handles the bootstrapping of all Spark configuration, and serves as a wrapper around the `spark-submit` CLI. At a high level the container provides:
* A set of default Spark/YARN/Hadoop configurations
* A bootstrapping script for configuring and starting up Spark master/worker nodes
* A wrapper around the `spark-submit` CLI to submit a Spark application
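The container's own `submit` entrypoint is not shown in this notebook. As a rough illustration of what a wrapper around `spark-submit` assembles, here is a hypothetical helper; the function name, the YARN client-mode defaults, and any jar paths you pass in are assumptions, not the container's actual script. `--master`, `--deploy-mode`, `--jars`, and `--py-files` are standard `spark-submit` options.

```python
def build_spark_submit_cmd(app_path, app_args, master="yarn", deploy_mode="client",
                           jars=None, py_files=None):
    """Assemble a spark-submit command line as an argument list (hypothetical helper)."""
    cmd = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    if jars:
        # Extra JVM dependencies, e.g. a jar containing the Deequ-based Scala code.
        cmd += ["--jars", ",".join(jars)]
    if py_files:
        # Extra Python modules shipped to the executors.
        cmd += ["--py-files", ",".join(py_files)]
    # The application itself, followed by its own arguments.
    cmd.append(app_path)
    cmd += list(app_args)
    return cmd
```

A wrapper would then run the list with `subprocess.run(cmd, check=True)`. Building an argument list instead of a shell string avoids quoting problems with S3 URIs.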


After the container build and push process is complete, use the Amazon SageMaker Python SDK to submit a managed, distributed Spark application that performs our dataset preprocessing.

Build the example Spark container.

In [2]:
!cat container/Dockerfile

FROM openjdk:8-jre-slim

RUN apt-get update
RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
RUN pip3 install py4j psutil==5.6.5 numpy==1.17.4
RUN apt-get clean
RUN rm -rf /var/lib/apt/lists/*

# http://blog.stuart.axelbrooke.com/python-3-on-spark-return-of-the-pythonhashseed
ENV PYTHONHASHSEED 0
ENV PYTHONIOENCODING UTF-8
ENV PIP_DISABLE_PIP_VERSION_CHECK 1

# Install Hadoop
ENV HADOOP_VERSION 3.2.1
ENV HADOOP_HOME /usr/hadoop-$HADOOP_VERSION
ENV HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
ENV PATH $PATH:$HADOOP_HOME/bin
RUN curl -sL --retry 3 \
  "http://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz" \
  | gunzip \
  | tar -x -C /usr/ \
 && rm -rf $HADOOP_HOME/share/doc \
 && chown -R root:root $HADOOP_HOME

# Install Spark
ENV SPARK_VERSION 2.4.6
ENV SPARK_PACKAGE spark-${SPARK_VERSION}-bin-without-hadoop
ENV SPARK_HOME /usr/spark-${SPARK_VERSION}
ENV SP

In [3]:
docker_repo = 'amazon-reviews-spark-analyzer'
docker_tag = 'latest'

In [4]:
!docker build -t $docker_repo:$docker_tag -f container/Dockerfile ./container

Sending build context to Docker daemon  4.441MB
Step 1/33 : FROM openjdk:8-jre-slim
8-jre-slim: Pulling from library/openjdk

c9369e08: Pulling fs layer
e9b77806: Pulling fs layer
6b4d80d1: Pulling fs layer
Digest: sha256:bcbdeafc77c4ed16ae625f7b02f233ad5d21d070f72a0ee44710923f0bd7d13c
Status: Downloaded newer image for openjdk:8-jre-slim
 ---> f2e91f81bf2c
Step 2/33 : RUN apt-get update
 ---> Running in 2dee308e47ea
Get:1 http://deb.debian.org/debian buster InRelease [121 kB]
Get:2 http://security.debian.org/debian-security buster/updates InRelease [65.4 kB]
Get:3 http://deb.debian.org/debian buster-updates InRelease [51.9 kB]
Get:4 http://deb.debian.org/debian buster/main amd64 Packages [7905 kB]
Get:5 http://security.debian.org/debian-security buster/updates/main amd64 Packages [213 kB]


Get:17 http://deb.debian.org/debian buster/main amd64 python2.7 amd64 2.7.16-2+deb10u1 [305 kB]
Get:18 http://deb.debian.org/debian buster/main amd64 libpython2-stdlib amd64 2.7.16-1 [20.8 kB]
Get:19 http://deb.debian.org/debian buster/main amd64 libpython-stdlib amd64 2.7.16-1 [20.8 kB]
Get:20 http://deb.debian.org/debian buster/main amd64 python2 amd64 2.7.16-1 [41.6 kB]
Get:21 http://deb.debian.org/debian buster/main amd64 python amd64 2.7.16-1 [22.8 kB]
Get:22 http://deb.debian.org/debian buster/main amd64 liblocale-gettext-perl amd64 1.07-3+b4 [18.9 kB]
Get:23 http://deb.debian.org/debian buster/main amd64 libpython3.7-minimal amd64 3.7.3-2+deb10u1 [589 kB]
Get:24 http://deb.debian.org/debian buster/main amd64 python3.7-minimal amd64 3.7.3-2+deb10u1 [1736 kB]
Get:25 http://deb.debian.org/debian buster/main amd64 python3-minimal amd64 3.7.3-1 [36.6 kB]
Get:26 http://deb.debian.org/debian buster/main amd64 libmpdec2 amd64 2.4.2-2 [87.2 kB]
Get:27 http://deb.debian.org/debian buster/

Get:110 http://deb.debian.org/debian buster/main amd64 gnupg all 2.2.12-1+deb10u1 [715 kB]
Get:111 http://deb.debian.org/debian buster/main amd64 libalgorithm-diff-perl all 1.19.03-2 [47.9 kB]
Get:112 http://deb.debian.org/debian buster/main amd64 libalgorithm-diff-xs-perl amd64 0.04-5+b1 [11.8 kB]
Get:113 http://deb.debian.org/debian buster/main amd64 libalgorithm-merge-perl all 0.08-3 [12.7 kB]
Get:114 http://deb.debian.org/debian buster/main amd64 libexpat1-dev amd64 2.2.6-2+deb10u1 [153 kB]
Get:115 http://deb.debian.org/debian buster/main amd64 libfile-fcntllock-perl amd64 0.22-3+b5 [35.4 kB]
Get:116 http://deb.debian.org/debian buster/main amd64 libglib2.0-data all 2.58.3-2+deb10u2 [1110 kB]
Get:117 http://deb.debian.org/debian buster/main amd64 libicu63 amd64 63.1-6+deb10u1 [8300 kB]
Get:118 http://deb.debian.org/debian buster/main amd64 libpython2.7 amd64 2.7.16-2+deb10u1 [1036 kB]
Get:119 http://deb.debian.org/debian buster/main amd64 libpython2.7-dev amd64 2.7.16-2+deb10u1 [31

Selecting previously unselected package liblocale-gettext-perl.
Preparing to unpack .../liblocale-gettext-perl_1.07-3+b4_amd64.deb ...
Unpacking liblocale-gettext-perl (1.07-3+b4) ...
Selecting previously unselected package libpython3.7-minimal:amd64.
Preparing to unpack .../libpython3.7-minimal_3.7.3-2+deb10u1_amd64.deb ...
Unpacking libpython3.7-minimal:amd64 (3.7.3-2+deb10u1) ...
Selecting previously unselected package python3.7-minimal.
Preparing to unpack .../python3.7-minimal_3.7.3-2+deb10u1_amd64.deb ...
Unpacking python3.7-minimal (3.7.3-2+deb10u1) ...
Setting up libpython3.7-minimal:amd64 (3.7.3-2+deb10u1) ...
Setting up libexpat1:amd64 (2.2.6-2+deb10u1) ...
Setting up python3.7-minimal (3.7.3-2+deb10u1) ...
Selecting previously unselected package python3-minimal.
(Reading database ... 9963 files and directories currently installed.)
Preparing to unpack .../python3-minimal_3.7.3-1_amd64.deb ...
Unpacking python3-minimal (3.7.3-1) ...
Selecting previously unselected package lib

Selecting previously unselected package patch.
Preparing to unpack .../042-patch_2.7.6-3+deb10u1_amd64.deb ...
Unpacking patch (2.7.6-3+deb10u1) ...
Selecting previously unselected package dpkg-dev.
Preparing to unpack .../043-dpkg-dev_1.19.7_all.deb ...
Unpacking dpkg-dev (1.19.7) ...
Selecting previously unselected package build-essential.
Preparing to unpack .../044-build-essential_12.6_amd64.deb ...
Unpacking build-essential (12.6) ...
Selecting previously unselected package libkeyutils1:amd64.
Preparing to unpack .../045-libkeyutils1_1.6-6_amd64.deb ...
Unpacking libkeyutils1:amd64 (1.6-6) ...
Selecting previously unselected package libkrb5support0:amd64.
Preparing to unpack .../046-libkrb5support0_1.17-3_amd64.deb ...
Unpacking libkrb5support0:amd64 (1.17-3) ...
Selecting previously unselected package libk5crypto3:amd64.
Preparing to unpack .../047-libk5crypto3_1.17-3_amd64.deb ...
Unpacking libk5crypto3:amd64 (1.17-3) ...
Selecting previously unselected package libkrb5-3:amd64.


Selecting previously unselected package libpython2-dev:amd64.
Preparing to unpack .../091-libpython2-dev_2.7.16-1_amd64.deb ...
Unpacking libpython2-dev:amd64 (2.7.16-1) ...
Selecting previously unselected package libpython-dev:amd64.
Preparing to unpack .../092-libpython-dev_2.7.16-1_amd64.deb ...
Unpacking libpython-dev:amd64 (2.7.16-1) ...
Selecting previously unselected package libpython3.7:amd64.
Preparing to unpack .../093-libpython3.7_3.7.3-2+deb10u1_amd64.deb ...
Unpacking libpython3.7:amd64 (3.7.3-2+deb10u1) ...
Selecting previously unselected package libpython3.7-dev:amd64.
Preparing to unpack .../094-libpython3.7-dev_3.7.3-2+deb10u1_amd64.deb ...
Unpacking libpython3.7-dev:amd64 (3.7.3-2+deb10u1) ...
Selecting previously unselected package libpython3-dev:amd64.
Preparing to unpack .../095-libpython3-dev_3.7.3-1_amd64.deb ...
Unpacking libpython3-dev:amd64 (3.7.3-1) ...
Selecting previously unselected package libsasl2-modules:amd64.
Preparing to unpack .../096-libsasl2-module

Setting up gnupg-l10n (2.2.12-1+deb10u1) ...
Setting up librtmp1:amd64 (2.4+20151223.gitfa8646d.1-2) ...
Setting up libdbus-1-3:amd64 (1.12.16-1) ...
Setting up dbus (1.12.16-1) ...
invoke-rc.d: could not determine current runlevel
invoke-rc.d: policy-rc.d denied execution of start.
Setting up xz-utils (5.2.4-1) ...
update-alternatives: using /usr/bin/xz to provide /usr/bin/lzma (lzma) in auto mode
Setting up libquadmath0:amd64 (8.3.0-6) ...
Setting up libmpc3:amd64 (1.1.0-1) ...
Setting up libatomic1:amd64 (8.3.0-6) ...
Setting up patch (2.7.6-3+deb10u1) ...
Setting up libk5crypto3:amd64 (1.17-3) ...
Setting up libsasl2-2:amd64 (2.1.27+dfsg-1+deb10u1) ...
Setting up libmpx2:amd64 (8.3.0-6) ...
Setting up libubsan1:amd64 (8.3.0-6) ...
Setting up libisl19:amd64 (0.20-2) ...
Setting up libgirepository-1.0-1:amd64 (1.58.3-2) ...
Setting up libssh2-1:amd64 (1.8.0-2.1) ...
Setting up netbase (5.6) ...
Setting up python-pip-whl (18.1-5) ...
Setting up libkrb5-3:amd64 (1.17-3) ...
Setting up 

Removing intermediate container bfdedbdbc58c
 ---> 4c0b61a8e5a6
Step 6/33 : RUN rm -rf /var/lib/apt/lists/*
 ---> Running in 6fdeb1acf1b5
Removing intermediate container 6fdeb1acf1b5
 ---> a29d440c3512
Step 7/33 : ENV PYTHONHASHSEED 0
 ---> Running in f589f82dd066
Removing intermediate container f589f82dd066
 ---> 44e45fa97bce
Step 8/33 : ENV PYTHONIOENCODING UTF-8
 ---> Running in 2e7018cc9cd8
Removing intermediate container 2e7018cc9cd8
 ---> 3a635a1b310f
Step 9/33 : ENV PIP_DISABLE_PIP_VERSION_CHECK 1
 ---> Running in c7891304c4d1
Removing intermediate container c7891304c4d1
 ---> 00533a1ded45
Step 10/33 : ENV HADOOP_VERSION 3.2.1
 ---> Running in 802d0b42e56a
Removing intermediate container 802d0b42e56a
 ---> 56e303fe9daa
Step 11/33 : ENV HADOOP_HOME /usr/hadoop-$HADOOP_VERSION
 ---> Running in d11b038bf94a
Removing intermediate container d11b038bf94a
 ---> bd2df5877dee
Step 12/33 : ENV HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
 ---> Running in 81bdac5f7f05
Removing intermediate cont

Create an Amazon Elastic Container Registry (Amazon ECR) repository for the Spark container and push the image.

In [5]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

image_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account_id, region, docker_repo, docker_tag)
print(image_uri)

393371431575.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews-spark-analyzer:latest


Log in to Amazon ECR, create the repository if it does not exist yet, and push the Docker image.

In [6]:
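# `aws ecr get-login` exists only in AWS CLI v1. With AWS CLI v2, use instead:
# !aws ecr get-login-password --region $region | docker login --username AWS --password-stdin $account_id.dkr.ecr.$region.amazonaws.com
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)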

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
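`aws ecr get-login` prints a ready-made `docker login` command. The same credentials can be fetched with boto3's `get_authorization_token()`, whose `authorizationToken` is the base64 encoding of `AWS:<password>`. The sketch below decodes one `authorizationData` entry; the helper name is hypothetical, and the commented lines show one way to feed the result to `docker login --password-stdin`.

```python
import base64


def decode_ecr_token(authorization_data):
    """Return (username, password, registry) from one ECR authorizationData entry.

    `authorization_data` is assumed to be an element of
    boto3.client("ecr").get_authorization_token()["authorizationData"].
    """
    # The token is base64("AWS:<password>"); split on the first colon only.
    decoded = base64.b64decode(authorization_data["authorizationToken"]).decode("utf-8")
    username, password = decoded.split(":", 1)
    return username, password, authorization_data["proxyEndpoint"]


# Example usage (requires AWS credentials and a local Docker daemon):
# import boto3, subprocess
# entry = boto3.client("ecr", region_name=region).get_authorization_token()["authorizationData"][0]
# user, password, registry = decode_ecr_token(entry)
# subprocess.run(["docker", "login", "--username", user, "--password-stdin", registry],
#                input=password.encode("utf-8"), check=True)
```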


In [7]:
!aws ecr describe-repositories --repository-names $docker_repo || aws ecr create-repository --repository-name $docker_repo


An error occurred (RepositoryNotFoundException) when calling the DescribeRepositories operation: The repository with name 'amazon-reviews-spark-analyzer' does not exist in the registry with id '393371431575'
{
    "repository": {
        "repositoryArn": "arn:aws:ecr:us-west-2:393371431575:repository/amazon-reviews-spark-analyzer",
        "registryId": "393371431575",
        "repositoryName": "amazon-reviews-spark-analyzer",
        "repositoryUri": "393371431575.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews-spark-analyzer",
        "createdAt": 1595702964.0,
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": false
        }
    }
}
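The `describe-repositories || create-repository` idiom above makes the cell safe to re-run. A boto3 equivalent could look like the following sketch; `ensure_ecr_repository` is a hypothetical helper, and it relies on the `RepositoryNotFoundException` that boto3 exposes through the ECR client's `exceptions` attribute.

```python
def ensure_ecr_repository(ecr_client, repository_name):
    """Return the repository URI, creating the repository only if it is missing.

    `ecr_client` is expected to be boto3.client("ecr").
    """
    try:
        response = ecr_client.describe_repositories(repositoryNames=[repository_name])
        return response["repositories"][0]["repositoryUri"]
    except ecr_client.exceptions.RepositoryNotFoundException:
        # Same fallback as `|| aws ecr create-repository` in the shell version.
        response = ecr_client.create_repository(repositoryName=repository_name)
        return response["repository"]["repositoryUri"]
```

For example, `ensure_ecr_repository(boto3.client("ecr"), docker_repo)` returns the same `repositoryUri` shown in the output above, whether or not the repository already existed.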


In [8]:
!docker tag $docker_repo:$docker_tag $image_uri

In [9]:
!docker push $image_uri

The push refers to repository [393371431575.dkr.ecr.us-west-2.amazonaws.com/amazon-reviews-spark-analyzer]

634d3e47: Preparing
6d81b659: Preparing
99a25dd4: Preparing
e84826f8: Preparing
04eaa3b2: Preparing
5d56d659: Preparing
20285432: Preparing
333168eb: Preparing
2c370ca9: Preparing
37da49ee: Preparing
d076f217: Preparing
c95dcfbb: Preparing
38d128fe: Preparing
e510e849: Preparing
37da49ee: Pushed   490.3MB/481.8MB
Pushing  46.14MB/103.6MB

# Run our Analysis Job as a SageMaker Processing Job

Next, use the Amazon SageMaker Python SDK to submit a processing job that runs our Spark script inside the container we just built.

## Review the Spark Preprocessing Script

In [10]:
!cat preprocess-deequ.py

from __future__ import print_function
from __future__ import unicode_literals

import time
import sys
import os
import shutil
import csv

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def main():
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    
    # Retrieve the args and replace 's3://' with 's3a://' (used by Spark)
    s3_input_data = args['s3_input_data'].replace('s3://', 's3a://')
    print(s3_input_data)
    s3_output_analyze_data = args['s3_output_analyze_data'].replace('s3://', 's3a://')
    print(s3_output_analyze_data)
    
    spark = SparkSession.builder \
        .appName("Amazon_Reviews_Spark_Analyzer") \
        .getOrCreate()

    # Invoke Main from preprocess-deequ.jar
    getattr(spark._jvm.SparkAmazonReviewsAnalyzer, "run")(s3_input_data, s3_output_analyze_data)

if __name__ == "__main__":
    main()
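The script receives its arguments as a flat list of alternating keys and values (`'s3_input_data', <uri>, ...`). `dict(zip(args_iter, args_iter))` works because both arguments to `zip` are the same iterator, so each step consumes two consecutive items. The standalone sketch below isolates that idiom and the scheme rewrite; the function names are illustrative, and `to_s3a` is a slightly stricter variant that rewrites only a leading `s3://`.

```python
def parse_pairs(argv):
    """Turn ['k1', 'v1', 'k2', 'v2'] into {'k1': 'v1', 'k2': 'v2'}."""
    # Passing the same iterator twice makes zip() take two consecutive
    # items per step; an unpaired trailing key is silently dropped.
    args_iter = iter(argv)
    return dict(zip(args_iter, args_iter))


def to_s3a(uri):
    """Rewrite a leading s3:// scheme to s3a://, the scheme Hadoop's S3A connector uses."""
    if uri.startswith("s3://"):
        return "s3a://" + uri[len("s3://"):]
    return uri
```

In the script, this corresponds to `args = parse_pairs(sys.argv[1:])` followed by `to_s3a(args['s3_input_data'])`.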


In [11]:
!pygmentize deequ/preprocess-deequ.scala

[34mimport[39;49;00m [04m[36mcom.amazon.deequ.analyzers.runners.[39;49;00m{[04m[32mAnalysisRunner[39;49;00m, [04m[32mAnalyzerContext[39;49;00m}
[34mimport[39;49;00m [04m[36mcom.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame[39;49;00m
[34mimport[39;49;00m [04m[36mcom.amazon.deequ.analyzers.[39;49;00m{[04m[32mCompliance[39;49;00m, [04m[32mCorrelation[39;49;00m, [04m[32mSize[39;49;00m, [04m[32mCompleteness[39;49;00m, [04m[32mMean[39;49;00m, [04m[32mApproxCountDistinct[39;49;00m}
[34mimport[39;49;00m [04m[36mcom.amazon.deequ.[39;49;00m{[04m[32mVerificationSuite[39;49;00m, [04m[32mVerificationResult[39;49;00m}
[34mimport[39;49;00m [04m[36mcom.amazon.deequ.VerificationResult.checkResultsAsDataFrame[39;49;00m
[34mimport[39;49;00m [04m[36mcom.amazon.deequ.checks.[39;49;00m{[04m[32mCheck[39;49;00m, [04m[32mCheckLevel[39;49;00m}
[34mimport[39;49;00m [04m[36mcom.amazon.deequ.suggestions.[39;49;0

In [12]:
from sagemaker.processing import ScriptProcessor

processor = ScriptProcessor(base_job_name='spark-amazon-reviews-analyzer',
                            image_uri=image_uri,
                            command=['/opt/program/submit'],
                            role=role,
                            # instance_count must be > 1; otherwise the job appears to hang, repeatedly logging
                            # "INFO yarn.Client: Application report for application_ (state: ACCEPTED)"
                            instance_count=2,
                            instance_type='ml.r5.2xlarge',
                            env={
                                'mode': 'jar',
                                'main_class': 'Main'
                            })

In [13]:
# Inputs
s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)
print(s3_input_data)

s3://sagemaker-us-west-2-393371431575/amazon-reviews-pds/tsv/


In [14]:
!aws s3 ls $s3_input_data

2020-07-25 17:13:26   18997559 amazon_reviews_us_Digital_Software_v1_00.tsv.gz
2020-07-25 17:13:29   27442648 amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz


## Set Up Output Data

In [15]:
from time import gmtime, strftime
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

output_prefix = 'amazon-reviews-spark-analyzer-{}'.format(timestamp_prefix)
processing_job_name = 'amazon-reviews-spark-analyzer-{}'.format(timestamp_prefix)

print('Processing job name:  {}'.format(processing_job_name))

Processing job name:  amazon-reviews-spark-analyzer-2020-07-25-18-49-52
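SageMaker rejects processing job names longer than 63 characters or containing anything other than letters, digits, and hyphens (a name also may not start or end with a hyphen). When names are built from a prefix and a timestamp as above, a quick local check catches a too-long prefix before the API call. `make_job_name` is a hypothetical helper; the length limit and pattern are the ones documented for `ProcessingJobName` in the `CreateProcessingJob` API.

```python
import re
from time import gmtime, strftime

# Documented constraints for ProcessingJobName: at most 63 characters and
# matching this pattern (letters, digits, hyphens; no leading/trailing hyphen).
_JOB_NAME_RE = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")
_MAX_JOB_NAME_LEN = 63


def make_job_name(prefix, now=None):
    """Return '<prefix>-<UTC timestamp>' after checking SageMaker's naming rules."""
    timestamp = strftime("%Y-%m-%d-%H-%M-%S", now or gmtime())
    name = "{}-{}".format(prefix, timestamp)
    if len(name) > _MAX_JOB_NAME_LEN or not _JOB_NAME_RE.fullmatch(name):
        raise ValueError("invalid SageMaker job name: {!r}".format(name))
    return name
```

With the prefix used in this notebook, the generated name is 49 characters, well within the limit.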


In [16]:
s3_output_analyze_data = 's3://{}/{}/output'.format(bucket, output_prefix)

print(s3_output_analyze_data)

s3://sagemaker-us-west-2-393371431575/amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output


## Start the Spark Processing Job

_Notes on invoking from Lambda:_
* The SageMaker Python SDK uploads the script for us. If we instead use the boto3 SDK (i.e., from a Lambda function), we need to copy the script to S3 ourselves and specify everything explicitly, including `--py-files`.
* Before invoking the Lambda, we would run:
  * `!aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/code/preprocess.py`
  * `!aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/py_files/preprocess.py`
* Then reference the `s3://<location>` paths above in `--py-files`, etc.
* See the Lambda example code in this same project for more details.

_Notes on not using ProcessingInput and ProcessingOutput:_
* Since Spark natively reads from and writes to S3 using `s3a://`, we can skip the copy step required by `ProcessingInput` and `ProcessingOutput` (`FullyReplicated` or `ShardedByS3Key`) and simply pass the S3 input and output buckets/prefixes as arguments.
* See https://github.com/awslabs/amazon-sagemaker-examples/issues/994 for issues related to using `/opt/ml/processing/input/` and `/opt/ml/processing/output/`.
* If we use `ProcessingInput`, the data is copied to the processing nodes' local volumes before the job starts, which we don't need here because Spark reads directly from S3.
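For the Lambda path described above, the request passed to boto3's `create_processing_job` has to spell out what `ScriptProcessor.run` fills in for us. The sketch below only builds that request as a dictionary. It assumes the script has already been uploaded to S3 and that the entrypoint is the container's `submit` wrapper followed by the script's local path, which is how `ScriptProcessor` appears to assemble it. The helper name is hypothetical; the defaults mirror this notebook's two `ml.r5.2xlarge` instances and the 30 GB volume and 24-hour limit visible in the job description. No `ProcessingOutputConfig` is included because the script writes to S3 directly.

```python
def build_processing_job_request(job_name, image_uri, role_arn, code_s3_uri, arguments,
                                 instance_count=2, instance_type="ml.r5.2xlarge",
                                 environment=None):
    """Build keyword arguments for boto3.client("sagemaker").create_processing_job()."""
    local_code_dir = "/opt/ml/processing/input/code"
    script_name = code_s3_uri.rstrip("/").split("/")[-1]
    return {
        "ProcessingJobName": job_name,
        "RoleArn": role_arn,
        "AppSpecification": {
            "ImageUri": image_uri,
            # Assumed to match ScriptProcessor: command + [local script path].
            "ContainerEntrypoint": ["/opt/program/submit",
                                    "{}/{}".format(local_code_dir, script_name)],
            "ContainerArguments": list(arguments),
        },
        # Only the script is copied into the container; the data stays in S3.
        "ProcessingInputs": [{
            "InputName": "code",
            "S3Input": {
                "S3Uri": code_s3_uri,
                "LocalPath": local_code_dir,
                "S3DataType": "S3Prefix",
                "S3InputMode": "File",
                "S3DataDistributionType": "FullyReplicated",
            },
        }],
        "ProcessingResources": {"ClusterConfig": {
            "InstanceCount": instance_count,
            "InstanceType": instance_type,
            "VolumeSizeInGB": 30,
        }},
        "StoppingCondition": {"MaxRuntimeInSeconds": 86400},
        "Environment": dict(environment or {}),
    }
```

Inside a Lambda handler, the result would be submitted with `boto3.client("sagemaker").create_processing_job(**request)`.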

In [17]:
from sagemaker.processing import ProcessingOutput

processor.run(code='preprocess-deequ.py',
              # Alternating key/value pairs; the script parses them into a dict
              arguments=['s3_input_data', s3_input_data,
                         's3_output_analyze_data', s3_output_analyze_data],
              # See https://github.com/aws/sagemaker-python-sdk/issues/1341 for why we need to specify a dummy output
              outputs=[
                  ProcessingOutput(s3_upload_mode='EndOfJob',
                                   output_name='dummy-output',
                                   source='/opt/ml/processing/output')
              ],
              # logs only takes effect when wait=True; with wait=False the call returns immediately
              logs=True,
              wait=False)

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  spark-amazon-reviews-analyzer-2020-07-25-18-49-52-819
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-393371431575/spark-amazon-reviews-analyzer-2020-07-25-18-49-52-819/input/code/preprocess-deequ.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'dummy-output', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-393371431575/spark-amazon-reviews-analyzer-2020-07-25-18-49-52-819/output/dummy-output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]


In [18]:
processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']

from IPython.display import display, HTML
display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, processing_job_name)))

In [19]:
from IPython.display import display, HTML

s3_job_output_prefix = output_prefix

display(HTML('<b>Review <a target="_blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, s3_job_output_prefix, region)))

# Please Wait Until the Processing Job Completes!

In [20]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=processing_job_name,
                                                                            sagemaker_session=sagemaker_session)

processing_job_description = running_processor.describe()

processing_job_status = processing_job_description['ProcessingJobStatus']
print('\n')
print(processing_job_status)
print('\n')

print(processing_job_description)



InProgress


{'ProcessingInputs': [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-393371431575/spark-amazon-reviews-analyzer-2020-07-25-18-49-52-819/input/code/preprocess-deequ.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'dummy-output', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-393371431575/spark-amazon-reviews-analyzer-2020-07-25-18-49-52-819/output/dummy-output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]}, 'ProcessingJobName': 'spark-amazon-reviews-analyzer-2020-07-25-18-49-52-819', 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 2, 'InstanceType': 'ml.r5.2xlarge', 'VolumeSizeInGB': 30}}, 'StoppingCondition': {'MaxRuntimeInSeconds': 86400}, 'AppSpecification': {'ImageUri': '393371431575.dkr.ecr.us-west-2.amazonaws.com/amazon-revie
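`running_processor.wait()` blocks and streams logs. Outside the SDK (for example, in a Lambda or a script that only has boto3), a polling loop over `describe_processing_job` serves the same purpose. In this sketch, `describe` is any callable with the signature of `boto3.client("sagemaker").describe_processing_job`, and `sleep` is injectable so the loop can be exercised without waiting; both are assumptions of the sketch, and the helper name is hypothetical. The terminal statuses are `Completed`, `Failed`, and `Stopped`.

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_processing_job(describe, job_name, poll_seconds=30,
                            timeout_seconds=3600, sleep=time.sleep):
    """Poll until the job reaches a terminal status; return the final description."""
    waited = 0
    while True:
        desc = describe(ProcessingJobName=job_name)
        status = desc["ProcessingJobStatus"]
        if status in TERMINAL_STATUSES:
            if status != "Completed":
                # Failed or Stopped: surface the reason SageMaker reports, if any.
                raise RuntimeError("{} ended with status {}: {}".format(
                    job_name, status, desc.get("FailureReason", "no reason given")))
            return desc
        if waited >= timeout_seconds:
            raise TimeoutError("{} still {} after {}s".format(job_name, status, waited))
        sleep(poll_seconds)
        waited += poll_seconds
```

With boto3, the call would be `wait_for_processing_job(boto3.client("sagemaker").describe_processing_job, processing_job_name)`.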

# _Please Wait Until the Processing Job Above Completes._

In [21]:
running_processor.wait()

2020-07-25 18:53:05,581 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = algo-1/10.0.68.27
STARTUP_MSG:   args = [-format, -force]
STARTUP_MSG:   version = 3.2.1
STARTUP_MSG:   classpath = /usr/hadoop-3.2.1/etc/hadoop:/usr/hadoop-3.2.1/share/hadoop/common/lib/kerby-util-1.0.1.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/jersey-json-1.19.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/javax.servlet-api-3.1.0.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/kerb-util-1.0.1.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/kerby-xdr-1.0.1.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/stax2-api-3.1.4.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/jul-to-slf4j-1.7.25.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/kerb-simplekdc-1.0.1.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/hadoop-annotations-3.2.1.jar:/usr/hadoop-3.2.1/share/hadoop/co

2020-07-25 18:53:23,519 INFO yarn.Client: Application report for application_1595703196239_0001 (state: ACCEPTED)
2020-07-25 18:53:23,521 INFO yarn.Client:
    client token: N/A
    diagnostics: AM container is launched, waiting for AM container to Register with RM
    ApplicationMaster host: N/A
    ApplicationMaster RPC port: -1
    queue: default
    start time: 1595703202412
    final status: UNDEFINED
    tracking URL: http://algo-1:8088/proxy/application_1595703196239_0001/
    user: root
2020-07-25 18:53:24,523 INFO yarn.Client: Application report for application_1595703196239_0001 (state: ACCEPTED)
2020-07-25 18:53:25,526 INFO yarn.Client: Application report for application_1595703196239_0001 (state: ACCEPTED)
2020-07-25 18:53:26,528 INFO yarn.Client: Application report for application_1595703196239_0001 (state: ACCEPTED)
2020-07-25 18:53:

2020-07-25 18:53:53,303 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on algo-2:36157 (size: 42.8 KB, free: 24.1 GB)
2020-07-25 18:53:56,423 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 3902 ms on algo-2 (executor 1) (1/2)
2020-07-25 18:53:56,777 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4379 ms on algo-2 (executor 1) (2/2)
2020-07-25 18:53:56,778 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
2020-07-25 18:53:56,779 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (collect at AnalysisRunner.scala:303) finished in 4.418 s
2020-07-25 18:53:56,780 INFO scheduler.DAGScheduler: looking for newly runnable stages
2020-07-25 18:53:56,780 INFO scheduler.DAGScheduler: running: Set()
2020-07-25 18:53:56,780 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1)
2020-07-25 18:53:56,780 INFO schedule

2020-07-25 18:54:03,672 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 8.0 (TID 16) in 1179 ms on algo-2 (executor 1) (1/2)
2020-07-25 18:54:04,075 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 8.0 (TID 15) in 1582 ms on algo-2 (executor 1) (2/2)
2020-07-25 18:54:04,075 INFO cluster.YarnScheduler: Removed TaskSet 8.0, whose tasks have all completed, from pool
2020-07-25 18:54:04,075 INFO scheduler.DAGScheduler: ShuffleMapStage 8 (collect at AnalysisRunner.scala:499) finished in 1.587 s
2020-07-25 18:54:04,075 INFO scheduler.DAGScheduler: looking for newly runnable stages
2020-07-25 18:54:04,076 INFO scheduler.DAGScheduler: running: Set()
2020-07-25 18:54:04,076 INFO scheduler.DAGScheduler: waiting: Set(ShuffleMapStage 9, ResultStage 10)
2020-07-25 18:54:04,076 INFO scheduler.DAGScheduler: failed: Set()
2020-07-25 18:54:04,076 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 9 (Ma

2020-07-25 18:54:13,186 INFO datasources.FileSourceStrategy: Pruning directories with:
2020-07-25 18:54:13,186 INFO datasources.FileSourceStrategy: Post-Scan Filters:
2020-07-25 18:54:13,186 INFO datasources.FileSourceStrategy: Output Data Schema: struct<customer_id: string, product_parent: string, star_rating: int, helpful_votes: int, total_votes: int ... 3 more fields>
2020-07-25 18:54:13,187 INFO execution.FileSourceScanExec: Pushed Filters:
2020-07-25 18:54:13,243 INFO codegen.CodeGenerator: Code generated in 24.41274 ms
2020-07-25 18:54:13,254 INFO codegen.CodeGenerator: Code generated in 5.862414 ms
2020-07-25 18:54:13,257 INFO memory.MemoryStore: Block broadcast_22 stored as values in memory (estimated size 400.9 KB, free 364.3 MB)
2020-07-25 18:54:13,268 INFO memory.MemoryStore: Block broadcast_22_piece0 stored as bytes in memory (estimated size 42.8 KB, free 364.2 MB)
2020-07-25 18:54:13,268 INFO s

[35m2020-07-25 18:54:21[0m
[35mFinished Yarn configuration files setup.
[0m
[35mReceived end of job signal, exiting...[0m
[34mFinished Yarn configuration files setup.
[0m



# Inspect the Processed Output 

## These are the quality checks on our dataset.

## _The next cells will not work properly until the processing job above has completed._

In [22]:
!aws s3 ls --recursive $s3_output_analyze_data/

2020-07-25 18:54:07          0 amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/constraint-checks/_SUCCESS
2020-07-25 18:54:06        768 amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/constraint-checks/part-00000-c58055c7-5656-4ed3-a361-0efa8f8a1bde-c000.csv
2020-07-25 18:54:21          0 amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/constraint-suggestions/_SUCCESS
2020-07-25 18:54:20       2289 amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/constraint-suggestions/part-00000-a64adfea-ee96-4e1b-979e-f6345649748c-c000.csv
2020-07-25 18:54:00          0 amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/dataset-metrics/_SUCCESS
2020-07-25 18:53:59        364 amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/dataset-metrics/part-00000-b0d06b04-b7e1-40e2-a223-3272ca996b84-c000.csv
2020-07-25 18:54:09          0 amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/success-metrics/_SUCCESS
2020-07-25 18:54:08        277 amazon-re
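Each Deequ result is written by Spark as a folder of `part-*.csv` files plus an empty `_SUCCESS` marker, which Hadoop's `FileOutputCommitter` writes when the write job commits. Before copying results, it can be worth checking that every folder has its marker. The hypothetical helper below works on a list of S3 keys such as the ones printed above; in practice the keys could come from a boto3 `list_objects_v2` paginator.

```python
from collections import defaultdict


def folders_missing_success(keys, prefix):
    """Return output folders under `prefix` that lack a `_SUCCESS` marker."""
    prefix = prefix.rstrip("/") + "/"
    files_by_folder = defaultdict(set)
    for key in keys:
        if not key.startswith(prefix):
            continue
        # "constraint-checks/_SUCCESS" -> ("constraint-checks", "/", "_SUCCESS")
        folder, _, filename = key[len(prefix):].partition("/")
        if filename:
            files_by_folder[folder].add(filename)
    return sorted(folder for folder, names in files_by_folder.items()
                  if "_SUCCESS" not in names)
```

An empty result means every result folder was committed; a non-empty one points at output that may be partial.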

## Copy the Output from S3 to Local
* dataset-metrics/
* constraint-checks/
* success-metrics/
* constraint-suggestions/


In [23]:
!aws s3 cp --recursive $s3_output_analyze_data ./amazon-reviews-spark-analyzer/ --exclude="*" --include="*.csv"

Completed 364 Bytes/3.6 KiB (10.5 KiB/s) with 4 file(s) remainingdownload: s3://sagemaker-us-west-2-393371431575/amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/dataset-metrics/part-00000-b0d06b04-b7e1-40e2-a223-3272ca996b84-c000.csv to amazon-reviews-spark-analyzer/dataset-metrics/part-00000-b0d06b04-b7e1-40e2-a223-3272ca996b84-c000.csv
Completed 364 Bytes/3.6 KiB (10.5 KiB/s) with 3 file(s) remainingCompleted 1.1 KiB/3.6 KiB (25.4 KiB/s) with 3 file(s) remaining  download: s3://sagemaker-us-west-2-393371431575/amazon-reviews-spark-analyzer-2020-07-25-18-49-52/output/constraint-checks/part-00000-c58055c7-5656-4ed3-a361-0efa8f8a1bde-c000.csv to amazon-reviews-spark-analyzer/constraint-checks/part-00000-c58055c7-5656-4ed3-a361-0efa8f8a1bde-c000.csv
Completed 1.1 KiB/3.6 KiB (25.4 KiB/s) with 2 file(s) remainingCompleted 3.3 KiB/3.6 KiB (61.1 KiB/s) with 2 file(s) remainingdownload: s3://sagemaker-us-west-2-393371431575/amazon-reviews-spark-analyzer-2020-07-25-18-49-52/ou

## Analyze Constraint Checks

In [24]:
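import glob
import os

import pandas as pd

def load_dataset(path, sep, header):
    # Spark writes one part-*.csv file per partition; read them all and stack them.
    files = glob.glob(os.path.join(path, '*.csv'))
    if not files:
        raise FileNotFoundError('No CSV files found in {}'.format(path))
    data = pd.concat([pd.read_csv(f, sep=sep, header=header) for f in files], ignore_index=True)

    return data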

In [25]:
df_constraint_checks = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-checks/', sep='\t', header=0)
df_constraint_checks[['check', 'constraint', 'constraint_status', 'constraint_message']]

| | check | constraint | constraint_status | constraint_message |
|---|---|---|---|---|
| 0 | Review Check | SizeConstraint(Size(None)) | Success | |
| 1 | Review Check | MinimumConstraint(Minimum(star_rating,None)) | Success | |
| 2 | Review Check | MaximumConstraint(Maximum(star_rating,None)) | Success | |
| 3 | Review Check | CompletenessConstraint(Completeness(review_id,... | Success | |
| 4 | Review Check | UniquenessConstraint(Uniqueness(List(review_id))) | Success | |
| 5 | Review Check | CompletenessConstraint(Completeness(marketplac... | Success | |
| 6 | Review Check | ComplianceConstraint(Compliance(marketplace co... | Success | |
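All checks pass here, but with more checks, scanning the table by eye gets tedious. A small filter that returns rows whose `constraint_status` is not `Success` can gate a pipeline. The sketch below uses only the standard library and reads the tab-separated check results directly; `failed_constraints` is a hypothetical helper.

```python
import csv
import io


def failed_constraints(tsv_text):
    """Return (constraint, message) pairs for checks whose status is not 'Success'.

    Expects tab-separated text with a header row that includes the
    constraint, constraint_status, and constraint_message columns.
    """
    reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
    return [(row["constraint"], row["constraint_message"])
            for row in reader if row["constraint_status"] != "Success"]
```

With the DataFrame already loaded, the pandas equivalent is `df_constraint_checks[df_constraint_checks['constraint_status'] != 'Success']`.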


## Analyze Dataset Metrics

In [26]:
df_dataset_metrics = load_dataset(path='./amazon-reviews-spark-analyzer/dataset-metrics/', sep='\t', header=0)
df_dataset_metrics

| | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | review_id | Completeness | 1.0 |
| 1 | Column | review_id | ApproxCountDistinct | 238027.0 |
| 2 | Mutlicolumn | total_votes,star_rating | Correlation | -0.080881 |
| 3 | Dataset | * | Size | 247515.0 |
| 4 | Column | star_rating | Mean | 3.723706 |
| 5 | Column | top star_rating | Compliance | 0.663338 |
| 6 | Mutlicolumn | total_votes,helpful_votes | Correlation | 0.980529 |


## Analyze Success Metrics

In [27]:
df_success_metrics = load_dataset(path='./amazon-reviews-spark-analyzer/success-metrics/', sep='\t', header=0)
df_success_metrics

| | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | review_id | Completeness | 1.0 |
| 1 | Column | review_id | Uniqueness | 1.0 |
| 2 | Dataset | * | Size | 247515.0 |
| 3 | Column | star_rating | Maximum | 5.0 |
| 4 | Column | star_rating | Minimum | 1.0 |
| 5 | Column | marketplace contained in US,UK,DE,JP,FR | Compliance | 1.0 |
| 6 | Column | marketplace | Completeness | 1.0 |
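The success metrics can also serve as quality gates: compare each metric against a minimum and stop the pipeline on a violation. The helper below is a hypothetical sketch, and any threshold values are up to you; none are defined in this notebook.

```python
def check_thresholds(metrics, thresholds):
    """Compare Deequ-style metric rows against minimum acceptable values.

    `metrics` is an iterable of (entity, instance, name, value) tuples, like the
    rows of the success-metrics table; `thresholds` maps (instance, name) to a
    minimum. Returns a list of (key, observed, minimum) violations; a threshold
    whose metric is absent counts as a violation with observed value None.
    """
    observed = {(instance, name): float(value) for _, instance, name, value in metrics}
    violations = []
    for key, minimum in thresholds.items():
        value = observed.get(key)
        if value is None or value < minimum:
            violations.append((key, value, minimum))
    return violations
```

With the DataFrame above, `check_thresholds(df_success_metrics[['entity', 'instance', 'name', 'value']].itertuples(index=False), {('review_id', 'Uniqueness'): 1.0})` returns an empty list, since `review_id` uniqueness is 1.0.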


## Analyze Constraint Suggestions

In [28]:
df_constraint_suggestions = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-suggestions/', sep='\t', header=0)
df_constraint_suggestions.columns=['column_name', 'description', 'code']
df_constraint_suggestions

| | column_name | description | code |
|---|---|---|---|
| 0 | review_id | 'review_id' is not null | `.isComplete("review_id")` |
| 1 | customer_id | 'customer_id' is not null | `.isComplete("customer_id")` |
| 2 | customer_id | 'customer_id' has type Integral | `.hasDataType("customer_id", ConstrainableData...` |
| 3 | customer_id | 'customer_id' has no negative values | `.isNonNegative("customer_id")` |
| 4 | review_date | 'review_date' is not null | `.isComplete("review_date")` |
| 5 | helpful_votes | 'helpful_votes' is not null | `.isComplete("helpful_votes")` |
| 6 | helpful_votes | 'helpful_votes' has no negative values | `.isNonNegative("helpful_votes")` |
| 7 | star_rating | 'star_rating' is not null | `.isComplete("star_rating")` |
| 8 | star_rating | 'star_rating' has no negative values | `.isNonNegative("star_rating")` |
| 9 | product_title | 'product_title' is not null | `.isComplete("product_title")` |
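The `code` column holds Scala snippets that Deequ generated for each suggested constraint. One way to review them is to chain the snippets into a single `Check` expression that could be pasted into the Scala job, for example inside `VerificationSuite().onData(df).addCheck(...)`. The helper below is a hypothetical sketch that only builds source text; suggestions are candidates and should be reviewed before they become enforced checks.

```python
def suggestions_to_check(codes, description="Suggested constraints", level="Error"):
    """Chain Deequ suggestion snippets into the source text of one Scala Check.

    `codes` is an iterable of strings such as '.isComplete("review_id")'.
    Returns Scala source text only; nothing is executed.
    """
    # Escape backslashes and double quotes so the description stays a valid Scala string.
    escaped = description.replace("\\", "\\\\").replace('"', '\\"')
    lines = ['Check(CheckLevel.{}, "{}")'.format(level, escaped)]
    # Each snippet starts with '.', so an indented line continues the method chain.
    lines += ["  " + code.strip() for code in codes]
    return "\n".join(lines)
```

For example, `print(suggestions_to_check(df_constraint_suggestions['code']))` prints the chained expression for all ten suggestions.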


In [None]:
%%javascript
// Save a checkpoint of this notebook, then shut down its kernel session to free resources.
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();