
# Analyze Data Quality with SageMaker Processing Jobs and Spark

Typically, a machine learning (ML) process consists of a few steps: first, gathering data with various ETL jobs; then pre-processing the data; next, featurizing the dataset by incorporating standard techniques or prior knowledge; and finally, training an ML model using an algorithm.

Distributed data processing frameworks such as Spark are often used to analyze datasets, detect data quality issues, and prepare the data for model training.

In this notebook, we'll use Amazon SageMaker Processing with a library called [**Deequ**](https://github.com/awslabs/deequ), which is built on top of Spark, and run our data quality analysis as a managed SageMaker Processing Job.

Here are some great resources on Deequ: 
* Blog Post:  https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/
* Research Paper:  https://assets.amazon.science/4a/75/57047bd343fabc46ec14b34cdb3b/towards-automated-data-quality-management-for-machine-learning.pdf

![Deequ](img/deequ.png)

![](img/processing.jpg)

# Amazon Customer Reviews Dataset

https://s3.amazonaws.com/amazon-reviews-pds/readme.html

### Dataset Columns:

- `marketplace`: 2-letter country code (in this case all "US").
- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.
- `review_id`: A unique ID for the review.
- `product_id`: The Amazon Standard Identification Number (ASIN).  `http://www.amazon.com/dp/<ASIN>` links to the product's detail page.
- `product_parent`: The parent of that ASIN.  Multiple ASINs (color or format variations of the same product) can roll up into a single parent.
- `product_title`: Title description of the product.
- `product_category`: Broad product category that can be used to group reviews (in this notebook, Digital Software and Digital Video Games).
- `star_rating`: The review's rating (1 to 5 stars).
- `helpful_votes`: Number of helpful votes for the review.
- `total_votes`: Number of total votes the review received.
- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?
- `verified_purchase`: Was the review from a verified purchase?
- `review_headline`: The title of the review itself.
- `review_body`: The text of the review.
- `review_date`: The date the review was written.

In [1]:
import sagemaker

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

# Pull the Spark-Deequ Docker Image

In [2]:
public_image_uri='docker.io/datascienceonaws/spark-deequ:1.0.0'

In [3]:
!docker pull $public_image_uri

1.0.0: Pulling from datascienceonaws/spark-deequ
Digest: sha256:4c897e8742a77beee7c473c463c21acbc1eb23da37bfface9da10d92e994ed2c

# Push the Image to a Private Docker Repo

In [4]:
private_docker_repo = 'spark-deequ'
private_docker_tag = '1.0.0'

In [5]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

private_image_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account_id, region, private_docker_repo, private_docker_tag)
print(private_image_uri)

889926741212.dkr.ecr.us-east-1.amazonaws.com/spark-deequ:1.0.0


In [6]:
!docker tag $public_image_uri $private_image_uri

In [7]:
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
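The `aws ecr get-login` command above exists only in AWS CLI v1. AWS CLI v2 replaced it with `aws ecr get-login-password`, whose output you pipe into `docker login --username AWS --password-stdin <registry>`. If you need the same credentials from Python, the minimal sketch below shows one way to get them. It assumes the response shape of boto3's ECR `get_authorization_token()`, where `authorizationToken` is a base64-encoded `username:password` string. The helper name `decode_ecr_authorization` is ours, not part of boto3.

```python
import base64


def decode_ecr_authorization(authorization_data):
    """Split one `authorizationData` entry from ECR's GetAuthorizationToken
    response into (username, password, registry_endpoint).

    The token is base64("<username>:<password>"); we split on the first ':'
    only, so a password containing ':' would still be returned intact.
    """
    token = base64.b64decode(authorization_data["authorizationToken"]).decode("utf-8")
    username, password = token.split(":", 1)
    return username, password, authorization_data["proxyEndpoint"]


# Usage (requires AWS credentials allowed to call ecr:GetAuthorizationToken):
#   import boto3
#   auth = boto3.client("ecr").get_authorization_token()["authorizationData"][0]
#   username, password, registry = decode_ecr_authorization(auth)
#   # then feed `password` to: docker login --username <username> --password-stdin <registry>
```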


In [8]:
!aws ecr describe-repositories --repository-names $private_docker_repo || aws ecr create-repository --repository-name $private_docker_repo


An error occurred (RepositoryNotFoundException) when calling the DescribeRepositories operation: The repository with name 'spark-deequ' does not exist in the registry with id '889926741212'
{
    "repository": {
        "repositoryArn": "arn:aws:ecr:us-east-1:889926741212:repository/spark-deequ",
        "registryId": "889926741212",
        "repositoryName": "spark-deequ",
        "repositoryUri": "889926741212.dkr.ecr.us-east-1.amazonaws.com/spark-deequ",
        "createdAt": 1600543740.0,
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": false
        },
        "encryptionConfiguration": {
            "encryptionType": "AES256"
        }
    }
}
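The `describe-repositories || create-repository` idiom makes the cell safe to re-run. On the first run, the describe call fails with the `RepositoryNotFoundException` shown above and the repository is created. On later runs, the describe call simply succeeds. The sketch below applies the same idempotent pattern in boto3. `ensure_ecr_repository` is a hypothetical helper, and it relies on the ECR client's modeled `RepositoryNotFoundException`.

```python
def ensure_ecr_repository(ecr_client, repository_name):
    """Return the repository description, creating the repository if it is missing.

    Equivalent in spirit to:
      aws ecr describe-repositories --repository-names NAME || aws ecr create-repository --repository-name NAME
    """
    try:
        response = ecr_client.describe_repositories(repositoryNames=[repository_name])
        return response["repositories"][0]
    except ecr_client.exceptions.RepositoryNotFoundException:
        # First run: the repository does not exist yet, so create it.
        response = ecr_client.create_repository(repositoryName=repository_name)
        return response["repository"]


# Usage (requires AWS credentials):
#   import boto3
#   repo = ensure_ecr_repository(boto3.client("ecr"), "spark-deequ")
#   print(repo["repositoryUri"])
```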


In [9]:
!docker push $private_image_uri

The push refers to repository [889926741212.dkr.ecr.us-east-1.amazonaws.com/spark-deequ]

# Run the Analysis Job using a SageMaker Processing Job

Next, use the Amazon SageMaker Python SDK to submit a processing job that runs our Spark script in the Spark container image we just pushed to ECR.

# Review the Spark Preprocessing Scripts

In [10]:
!pygmentize preprocess-deequ.py

from __future__ import print_function
from __future__ import unicode_literals

import time
import sys
import os
import shutil
import csv

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def main():
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))

In [11]:
!pygmentize preprocess-deequ.scala

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.suggestions.

In [12]:
from sagemaker.processing import ScriptProcessor

processor = ScriptProcessor(base_job_name='spark-amazon-reviews-analyzer',
                            image_uri=private_image_uri,
                            command=['/opt/program/submit'],
                            role=role,
                            instance_count=2, # instance_count needs to be > 1 or you will see the following error:  "INFO yarn.Client: Application report for application_ (state: ACCEPTED)"
                            instance_type='ml.r5.2xlarge',
                            env={
                                'mode': 'jar',
                                'main_class': 'Main'
                            })

In [13]:
s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)
print(s3_input_data)

s3://sagemaker-us-east-1-889926741212/amazon-reviews-pds/tsv/


In [14]:
!aws s3 ls $s3_input_data

2020-09-19 18:20:56   18997559 amazon_reviews_us_Digital_Software_v1_00.tsv.gz
2020-09-19 18:20:59   27442648 amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz


## Set Up Output Data

In [15]:
from time import gmtime, strftime
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

output_prefix = 'amazon-reviews-spark-analyzer-{}'.format(timestamp_prefix)
processing_job_name = 'amazon-reviews-spark-analyzer-{}'.format(timestamp_prefix)

print('Processing job name:  {}'.format(processing_job_name))

Processing job name:  amazon-reviews-spark-analyzer-2020-09-19-19-29-30
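The timestamp suffix keeps each job name unique across runs. SageMaker also constrains processing job names. As we understand the documented rule, a name must be 1 to 63 characters long, use only letters, digits, and hyphens, and start and end with a letter or digit. Below is a minimal sketch that builds a name the same way as the cell above and checks it against that assumed rule. `make_job_name` is a hypothetical helper.

```python
import re
import time

# Assumed naming rule for SageMaker processing jobs: 1-63 characters,
# letters/digits/hyphens only, starting and ending with a letter or digit.
_JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9])*")
_MAX_JOB_NAME_LENGTH = 63


def make_job_name(prefix, when=None):
    """Return '<prefix>-<UTC timestamp>' and validate it against the naming rule."""
    stamp = time.strftime("%Y-%m-%d-%H-%M-%S", when if when is not None else time.gmtime())
    name = "{}-{}".format(prefix, stamp)
    if len(name) > _MAX_JOB_NAME_LENGTH or not _JOB_NAME_PATTERN.fullmatch(name):
        raise ValueError("invalid processing job name: {!r}".format(name))
    return name


print(make_job_name("amazon-reviews-spark-analyzer"))
```

A prefix longer than about 40 characters leaves no room for the 19-character timestamp plus its separating hyphen, so shorter prefixes are the safer choice.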


In [16]:
s3_output_analyze_data = 's3://{}/{}/output'.format(bucket, output_prefix)

print(s3_output_analyze_data)

s3://sagemaker-us-east-1-889926741212/amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output


## Start the Spark Processing Job

_Notes on Invoking from Lambda:_
* Unlike `processor.run()`, which uploads the script to S3 for us, calling the boto3 SDK directly (e.g., from a Lambda function) requires us to copy the `preprocess.py` file to S3 ourselves and to specify everything explicitly, including `--py-files`, etc.
* We would need to do the following before invoking the Lambda:
     `!aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/code/preprocess.py`
     `!aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/py_files/preprocess.py`
* Then reference the `s3://<location>` paths above in `--py-files`, etc.
* See the Lambda example code in this same project for more details.
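For the boto3 route, the request can be assembled by hand. The sketch below is modeled on the job description the SDK prints when `processor.run()` is called in this notebook: the code staged to `/opt/ml/processing/input/code`, the `/opt/program/submit` command, and the `mode`/`main_class` environment. It assumes the SDK forms the container entrypoint by appending the staged script path to `command`. It omits the `--py-files` handling and the null output, so verify it against the Lambda example in this project. `build_processing_job_request` is a hypothetical helper, and `VolumeSizeInGB=30` is an assumed default.

```python
def build_processing_job_request(job_name, image_uri, role_arn, code_s3_uri,
                                 arguments, instance_count=2,
                                 instance_type="ml.r5.2xlarge",
                                 volume_size_gb=30, env=None):
    """Build keyword arguments for boto3's sagemaker create_processing_job()."""
    local_code_dir = "/opt/ml/processing/input/code"
    script_name = code_s3_uri.rstrip("/").split("/")[-1]
    return {
        "ProcessingJobName": job_name,
        "RoleArn": role_arn,
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": instance_count,   # > 1, as noted for ScriptProcessor
                "InstanceType": instance_type,
                "VolumeSizeInGB": volume_size_gb,
            }
        },
        "AppSpecification": {
            "ImageUri": image_uri,
            # Assumed: container command followed by the staged script path.
            "ContainerEntrypoint": ["/opt/program/submit",
                                    "{}/{}".format(local_code_dir, script_name)],
            "ContainerArguments": list(arguments),
        },
        "Environment": dict(env if env is not None else {"mode": "jar", "main_class": "Main"}),
        "ProcessingInputs": [{
            "InputName": "code",
            "S3Input": {
                "S3Uri": code_s3_uri,
                "LocalPath": local_code_dir,
                "S3DataType": "S3Prefix",
                "S3InputMode": "File",
                "S3DataDistributionType": "FullyReplicated",
            },
        }],
    }
```

Submitting the job would then be `boto3.client('sagemaker').create_processing_job(**request)`.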

_Notes on not using ProcessingInput and Output:_
* Since Spark natively reads from and writes to S3 using `s3a://`, we can skip the copy required by ProcessingInput and ProcessingOutput (FullyReplicated or ShardedByS3Key) and just pass the S3 input and output buckets/prefixes to the script.
* See https://github.com/awslabs/amazon-sagemaker-examples/issues/994 for issues related to using /opt/ml/processing/input/ and output/
* If we use ProcessingInput, the data will be copied to each node (which we don't want in this case since Spark already handles this)
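The job log further down prints the input and output locations with an `s3a://` scheme, even though we pass `s3://` URIs. The script therefore presumably rewrites the scheme before handing the paths to Spark. The script's own code for this step is not shown. A minimal version of such a rewrite might look like this (`to_s3a` is a hypothetical helper):

```python
def to_s3a(uri):
    """Rewrite an s3:// URI to the s3a:// scheme used by Hadoop's S3A connector.

    URIs with any other scheme (including s3a:// itself) are returned unchanged.
    """
    if uri.startswith("s3://"):
        return "s3a://" + uri[len("s3://"):]
    return uri


print(to_s3a("s3://sagemaker-us-east-1-889926741212/amazon-reviews-pds/tsv/"))
```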

In [17]:
from sagemaker.processing import ProcessingOutput

processor.run(code='preprocess-deequ.py',
              arguments=['s3_input_data', s3_input_data,
                         's3_output_analyze_data', s3_output_analyze_data,
              ],
              # See https://github.com/aws/sagemaker-python-sdk/issues/1341 
              #   for why we need to specify a null-output
              outputs=[
                  ProcessingOutput(s3_upload_mode='EndOfJob',
                                   output_name='null-output',
                                   source='/opt/ml/processing/output')
              ],
              logs=True,
              wait=False
)

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  spark-amazon-reviews-analyzer-2020-09-19-19-29-30-295
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-889926741212/spark-amazon-reviews-analyzer-2020-09-19-19-29-30-295/input/code/preprocess-deequ.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'null-output', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-889926741212/spark-amazon-reviews-analyzer-2020-09-19-19-29-30-295/output/null-output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
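The `arguments` list passed to `processor.run()` alternates names and values. The script turns that list into a dictionary with `dict(zip(args_iter, args_iter))`. Because both inputs to `zip()` are the *same* iterator, each tuple consumes two consecutive items. The sketch below isolates that idiom. It assumes, as the script's use of `sys.argv[1:]` suggests, that the container's `submit` wrapper forwards the arguments unchanged. `parse_pair_args` is a hypothetical name, and the bucket names are placeholders.

```python
def parse_pair_args(argv):
    """Turn ['k1', 'v1', 'k2', 'v2', ...] into {'k1': 'v1', 'k2': 'v2'}."""
    args_iter = iter(argv)
    # Both zip() inputs are the same iterator, so each tuple takes (key, value)
    # from consecutive positions. zip() silently drops a trailing unpaired item.
    return dict(zip(args_iter, args_iter))


# Same shape as the arguments passed to processor.run() (placeholder buckets):
example = parse_pair_args(["s3_input_data", "s3://example-bucket/in/",
                           "s3_output_analyze_data", "s3://example-bucket/out/"])
print(example["s3_input_data"])
```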


In [18]:
from IPython.display import display, HTML

processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']

display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">Processing Job</a></b>'.format(region, processing_job_name)))


In [19]:
from IPython.display import display, HTML

processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']

display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After a Few Minutes</b>'.format(region, processing_job_name)))


In [20]:
from IPython.display import display, HTML

s3_job_output_prefix = output_prefix

display(HTML('<b>Review <a target="_blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, s3_job_output_prefix, region)))


# Please Wait Until the Processing Job Completes!

In [21]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=processing_job_name,
                                                                            sagemaker_session=sagemaker_session)
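`running_processor.wait()` blocks the notebook and streams the logs. Outside a notebook (for example, in the Lambda scenario described earlier), you could poll the job status instead. This sketch assumes a boto3 SageMaker client, whose `describe_processing_job` response carries `ProcessingJobStatus` and, on failure, `FailureReason`. boto3 also offers a built-in `processing_job_completed_or_stopped` waiter. The explicit loop here just makes the status handling visible. `wait_for_processing_job` is a hypothetical helper, and the 30-second interval is arbitrary.

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_processing_job(sagemaker_client, job_name, poll_seconds=30, sleep=time.sleep):
    """Poll DescribeProcessingJob until the job reaches a terminal status.

    Returns the final status, or raises RuntimeError if the job failed.
    """
    while True:
        desc = sagemaker_client.describe_processing_job(ProcessingJobName=job_name)
        status = desc["ProcessingJobStatus"]
        if status in TERMINAL_STATUSES:
            if status == "Failed":
                raise RuntimeError("{} failed: {}".format(job_name, desc.get("FailureReason", "unknown")))
            return status
        sleep(poll_seconds)  # still InProgress or Stopping


# Usage (requires AWS credentials):
#   import boto3
#   wait_for_processing_job(boto3.client("sagemaker"), processing_job_name)
```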



# _Please Wait Until the Processing Job Above Completes._

In [22]:
running_processor.wait()

2020-09-19 19:33:12,686 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = algo-1/10.0.176.27
STARTUP_MSG:   args = [-format, -force]
STARTUP_MSG:   version = 3.2.1
STARTUP_MSG:   classpath = /usr/hadoop-3.2.1/etc/hadoop:/usr/hadoop-3.2.1/share/hadoop/common/lib/audience-annotations-0.5.0.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/jsr311-api-1.1.1.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/jetty-security-9.3.24.v20180605.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/jersey-json-1.19.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/hadoop-auth-3.2.1.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/jackson-core-asl-1.9.13.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/commons-text-1.4.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/kerb-simplekdc-1.0.1.jar:/usr/hadoop-3.2.1/share/hadoop/common/lib/commons-math3-3.1.1.jar:/usr/hadoop

Starting resourcemanager
Starting nodemanagers
localhost: /usr/hadoop-3.2.1/bin/../libexec/hadoop-functions.sh: line 982: ssh: command not found
2020-09-19 19:33:24,865 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
s3a://sagemaker-us-east-1-889926741212/amazon-reviews-pds/tsv/
s3a://sagemaker-us-east-1-889926741212/amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output
2020-09-19 19:33:25,419 INFO spark.SparkContext: Running Spark version 2.4.6
2020-09-19 19:33:25,437 INFO spark.SparkContext: Submitted application: Amazon_Reviews_Spark_Analyzer
2020-09-19 19:33:25,478 INFO spark.SecurityManager: Changing view acls to: root
2020-09-19 19:33:25,478 INFO spark.SecurityManager: Changing modify acls to: root
2020-09-19 19:33:25,478 INFO spark.SecurityManager: Changing view acls groups to: 
2020-09-19 19:

2020-09-19 19:33:30,973 INFO yarn.Client: Application report for application_1600544003530_0001 (state: ACCEPTED)
2020-09-19 19:33:30,975 INFO yarn.Client: 
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1600544009702
     final status: UNDEFINED
     tracking URL: http://algo-1:8088/proxy/application_1600544003530_0001/
     user: root
2020-09-19 19:33:31,978 INFO yarn.Client: Application report for application_1600544003530_0001 (state: ACCEPTED)
2020-09-19 19:33:32,980 INFO yarn.Client: Application report for application_1600544003530_0001 (state: ACCEPTED)
2020-09-19 19:33:33,984 INFO yarn.Client: Application report for application_1600544003530_0001 (state: ACCEPTED)
2020-09-19 19:33:

2020-09-19 19:34:00,632 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on algo-2:44167 (size: 42.9 KB, free: 24.1 GB)
2020-09-19 19:34:04,631 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5003 ms on algo-2 (executor 1) (1/2)
2020-09-19 19:34:04,633 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4887 ms on algo-2 (executor 1) (2/2)
2020-09-19 19:34:04,634 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
2020-09-19 19:34:04,639 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (collect at AnalysisRunner.scala:303) finished in 5.054 s
2020-09-19 19:34:04,639 INFO scheduler.DAGScheduler: looking for newly runnable stages
2020-09-19 19:34:04,639 INFO scheduler.DAGScheduler: running: Set()
2020-09-19 19:34:04,640 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1)
2020-09-19 19:34:04,640 INFO schedule

2020-09-19 19:34:10,499 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 6.0 (TID 13) in 679 ms on algo-2 (executor 1) (1/2)
2020-09-19 19:34:10,682 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 12) in 862 ms on algo-2 (executor 1) (2/2)
2020-09-19 19:34:10,682 INFO cluster.YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool 
2020-09-19 19:34:10,683 INFO scheduler.DAGScheduler: ShuffleMapStage 6 (count at GroupingAnalyzers.scala:76) finished in 0.879 s
2020-09-19 19:34:10,683 INFO scheduler.DAGScheduler: looking for newly runnable stages
2020-09-19 19:34:10,683 INFO scheduler.DAGScheduler: running: Set()
2020-09-19 19:34:10,683 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 7)
2020-09-19 19:34:10,683 INFO scheduler.DAGScheduler: failed: Set()
2020-09-19 19:34:10,683 INFO scheduler.DAGScheduler: Submitting ResultStage 7 (MapPartitionsRDD[24] at coun

2020-09-19 19:34:22,642 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 15.0 (TID 231) in 3556 ms on algo-2 (executor 1) (1/2)
2020-09-19 19:34:23,398 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 15.0 (TID 230) in 4312 ms on algo-2 (executor 1) (2/2)
2020-09-19 19:34:23,398 INFO cluster.YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool 
2020-09-19 19:34:23,399 INFO scheduler.DAGScheduler: ShuffleMapStage 15 (collect at AnalysisRunner.scala:303) finished in 4.320 s
2020-09-19 19:34:23,399 INFO scheduler.DAGScheduler: looking for newly runnable stages
2020-09-19 19:34:23,399 INFO scheduler.DAGScheduler: running: Set()
2020-09-19 19:34:23,399 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 16)
2020-09-19 19:34:23,399 INFO scheduler.DAGScheduler: failed: Set()
2020-09-19 19:34:23,399 INFO scheduler.DAGScheduler: Submitting ResultStage 16 (MapPartitionsRDD[5

2020-09-19 19:34:32,207 INFO spark.SparkContext: Invoking stop() from shutdown hook
2020-09-19 19:34:32,211 INFO server.AbstractConnector: Stopped Spark@74f1bae5{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2020-09-19 19:34:32,213 INFO ui.SparkUI: Stopped Spark web UI at http://10.0.176.27:4040
2020-09-19 19:34:32,217 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
2020-09-19 19:34:32,234 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
2020-09-19 19:34:32,234 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
2020-09-19 19:34:32,237 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
2020-09-19 19:34:32,238 INFO cluster.YarnClientSchedulerBackend: Stopped
2020-09-19 19:34:32,242 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint sto

# Inspect the Processed Output 

## These are the quality checks on our dataset.

## _The next cells will not work properly until the job completes above._

In [23]:
!aws s3 ls --recursive $s3_output_analyze_data/

2020-09-19 19:34:17          0 amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/constraint-checks/_SUCCESS
2020-09-19 19:34:16        768 amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/constraint-checks/part-00000-22187485-4b51-47ac-b112-ce04b05728ee-c000.csv
2020-09-19 19:34:32          0 amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/constraint-suggestions/_SUCCESS
2020-09-19 19:34:32       2289 amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/constraint-suggestions/part-00000-fe1487a3-8f4c-4db9-a821-462ddc759542-c000.csv
2020-09-19 19:34:08          0 amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/dataset-metrics/_SUCCESS
2020-09-19 19:34:08        364 amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/dataset-metrics/part-00000-6bfff2a4-c345-43cc-bb20-41c9020e523a-c000.csv
2020-09-19 19:34:19          0 amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/success-metrics/_SUCCESS
2020-09-19 19:34:18        277 amazon-re

## Copy the Output from S3 to Local
* dataset-metrics/
* constraint-checks/
* success-metrics/
* constraint-suggestions/


In [24]:
!aws s3 cp --recursive $s3_output_analyze_data ./amazon-reviews-spark-analyzer/ --exclude="*" --include="*.csv"

download: s3://sagemaker-us-east-1-889926741212/amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/constraint-checks/part-00000-22187485-4b51-47ac-b112-ce04b05728ee-c000.csv to amazon-reviews-spark-analyzer/constraint-checks/part-00000-22187485-4b51-47ac-b112-ce04b05728ee-c000.csv
download: s3://sagemaker-us-east-1-889926741212/amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/constraint-suggestions/part-00000-fe1487a3-8f4c-4db9-a821-462ddc759542-c000.csv to amazon-reviews-spark-analyzer/constraint-suggestions/part-00000-fe1487a3-8f4c-4db9-a821-462ddc759542-c000.csv
download: s3://sagemaker-us-east-1-889926741212/amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/success-metrics/part-00000-6331d7fd-ba89-4f89-8191-73b8603a521a-c000.csv to amazon-reviews-spark-analyzer/success-metrics/part-00000-6331d7fd-ba89-4f89-8191-73b8603a521a-c000.csv
download: s3://sagemaker-us-east-1-889926741212/amazon-reviews-spark-analyzer-2020-09-19-19-29-30/output/dataset-metrics/part-0000
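The `--exclude="*" --include="*.csv"` filters skip Spark's empty `_SUCCESS` markers and copy only the `part-*.csv` files, preserving the folder layout under the output prefix. The same mapping can be sketched with boto3. `plan_csv_downloads` and `download_csv_outputs` are hypothetical helpers. Note that they take the S3 *key* prefix (without `s3://<bucket>/`), not the full URI.

```python
import os


def plan_csv_downloads(keys, s3_prefix, local_dir):
    """Map S3 keys under `s3_prefix` to local paths, keeping only *.csv objects.

    The part of each key after the prefix becomes the relative local path,
    like `aws s3 cp --recursive ... --exclude="*" --include="*.csv"`.
    """
    prefix = s3_prefix.rstrip("/") + "/"
    plan = []
    for key in keys:
        if not key.startswith(prefix) or not key.endswith(".csv"):
            continue  # outside the prefix, or a non-CSV object such as _SUCCESS
        relative = key[len(prefix):]
        plan.append((key, os.path.join(local_dir, *relative.split("/"))))
    return plan


def download_csv_outputs(s3_client, bucket, s3_prefix, local_dir):
    """List every object under the prefix and download the CSV files."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = [obj["Key"]
            for page in paginator.paginate(Bucket=bucket, Prefix=s3_prefix)
            for obj in page.get("Contents", [])]
    for key, local_path in plan_csv_downloads(keys, s3_prefix, local_dir):
        os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
        s3_client.download_file(bucket, key, local_path)


# Usage (requires AWS credentials):
#   import boto3
#   download_csv_outputs(boto3.client("s3"), bucket, "{}/output".format(output_prefix),
#                        "./amazon-reviews-spark-analyzer")
```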

## Analyze Constraint Checks

In [25]:
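import glob
import os

import pandas as pd

def load_dataset(path, sep, header):
    # Spark writes one part-*.csv file per partition; read them all and stack them.
    files = glob.glob(os.path.join(path, '*.csv'))
    if not files:
        raise FileNotFoundError('No CSV files found in {}'.format(path))
    data = pd.concat([pd.read_csv(f, sep=sep, header=header) for f in files], ignore_index=True)

    return data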

In [26]:
df_constraint_checks = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-checks/', sep='\t', header=0)
df_constraint_checks[['check', 'constraint', 'constraint_status', 'constraint_message']]

| | check | constraint | constraint_status | constraint_message |
|---|---|---|---|---|
| 0 | Review Check | SizeConstraint(Size(None)) | Success | |
| 1 | Review Check | MinimumConstraint(Minimum(star_rating,None)) | Success | |
| 2 | Review Check | MaximumConstraint(Maximum(star_rating,None)) | Success | |
| 3 | Review Check | CompletenessConstraint(Completeness(review_id,... | Success | |
| 4 | Review Check | UniquenessConstraint(Uniqueness(List(review_id))) | Success | |
| 5 | Review Check | CompletenessConstraint(Completeness(marketplac... | Success | |
| 6 | Review Check | ComplianceConstraint(Compliance(marketplace co... | Success | |
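Each row above is one constraint from the `Review Check`, and all of them passed. To make their semantics concrete, the same kinds of checks can be expressed in plain pandas. This is an illustrative sketch, not Deequ. The marketplace list comes from the success metrics below. The `Size > 0` threshold and the 1 to 5 star-rating bounds are assumptions, because the script's actual thresholds are not shown. `run_review_checks` is a hypothetical helper.

```python
import pandas as pd


def run_review_checks(df, allowed_marketplaces=("US", "UK", "DE", "JP", "FR")):
    """Evaluate constraints similar to the 'Review Check' above, in plain pandas.

    Returns one row per constraint with a Success/Failure status.
    """
    results = {
        "Size > 0": len(df) > 0,                                   # threshold assumed
        "star_rating min >= 1": df["star_rating"].min() >= 1,      # bound assumed
        "star_rating max <= 5": df["star_rating"].max() <= 5,      # bound assumed
        "review_id is complete": df["review_id"].notna().all(),
        "review_id is unique": df["review_id"].is_unique,
        "marketplace is complete": df["marketplace"].notna().all(),
        "marketplace in allowed set": df["marketplace"].isin(allowed_marketplaces).all(),
    }
    return pd.DataFrame(
        [(name, "Success" if ok else "Failure") for name, ok in results.items()],
        columns=["constraint", "constraint_status"],
    )
```

Deequ evaluates the equivalent constraints in Spark in a single pass over the data, which is why the notebook runs it as a distributed job rather than in pandas.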


## Analyze Dataset Metrics

In [27]:
df_dataset_metrics = load_dataset(path='./amazon-reviews-spark-analyzer/dataset-metrics/', sep='\t', header=0)
df_dataset_metrics

| | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | review_id | Completeness | 1.0 |
| 1 | Column | review_id | ApproxCountDistinct | 238027.0 |
| 2 | Mutlicolumn | total_votes,star_rating | Correlation | -0.080881 |
| 3 | Dataset | * | Size | 247515.0 |
| 4 | Column | star_rating | Mean | 3.723706 |
| 5 | Column | top star_rating | Compliance | 0.663338 |
| 6 | Mutlicolumn | total_votes,helpful_votes | Correlation | 0.980529 |
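These metrics come from the Deequ analyzers imported in the Scala script (`Size`, `Completeness`, `ApproxCountDistinct`, `Mean`, `Compliance`, `Correlation`). To make their meaning concrete, here is a small single-machine pandas analogue. This is a swapped-in technique for illustration, not Deequ's implementation. It uses an exact distinct count where Deequ uses an approximate sketch. It assumes a `star_rating >= 4` predicate for the "top star_rating" compliance metric, since the script's actual predicate is not shown, and it treats `Correlation` as Pearson correlation. `compute_metrics` is a hypothetical helper.

```python
import pandas as pd


def compute_metrics(df):
    """Rough pandas analogues of the Deequ metrics in the table above."""
    return {
        # Size: number of rows in the dataset
        "Size": float(len(df)),
        # Completeness: fraction of non-null values in the column
        "Completeness(review_id)": df["review_id"].notna().mean(),
        # Exact distinct count; Deequ's ApproxCountDistinct estimates this
        "ApproxCountDistinct(review_id)": float(df["review_id"].nunique()),
        # Mean of a numeric column
        "Mean(star_rating)": df["star_rating"].mean(),
        # Compliance: fraction of rows satisfying a predicate (predicate assumed)
        "Compliance(top star_rating)": (df["star_rating"] >= 4).mean(),
        # Pearson correlation between two numeric columns
        "Correlation(total_votes,helpful_votes)": df["total_votes"].corr(df["helpful_votes"]),
    }
```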


## Analyze Success Metrics

In [28]:
df_success_metrics = load_dataset(path='./amazon-reviews-spark-analyzer/success-metrics/', sep='\t', header=0)
df_success_metrics

| | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | review_id | Completeness | 1.0 |
| 1 | Column | review_id | Uniqueness | 1.0 |
| 2 | Dataset | * | Size | 247515.0 |
| 3 | Column | star_rating | Maximum | 5.0 |
| 4 | Column | star_rating | Minimum | 1.0 |
| 5 | Column | marketplace contained in US,UK,DE,JP,FR | Compliance | 1.0 |
| 6 | Column | marketplace | Completeness | 1.0 |
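Both metrics tables use a long format: one row per (entity, instance, name), with a single `value`. A small lookup helper makes cross-table comparisons easy. For example, the success metrics report `Uniqueness` 1.0 for `review_id`. The dataset metrics, however, show `ApproxCountDistinct` of 238,027 against a `Size` of 247,515, which is about 3.8% lower. The gap is therefore presumably approximation error in the distinct-count estimate rather than duplicate IDs. `metric` is a hypothetical helper.

```python
import pandas as pd


def metric(df_metrics, name, instance):
    """Look up one value in a long-format Deequ metrics table (entity, instance, name, value)."""
    match = df_metrics[(df_metrics["name"] == name) & (df_metrics["instance"] == instance)]
    if len(match) != 1:
        raise KeyError((name, instance))
    return float(match["value"].iloc[0])


# Usage against the tables loaded above:
#   ratio = metric(df_dataset_metrics, "ApproxCountDistinct", "review_id") / metric(df_dataset_metrics, "Size", "*")
#   uniqueness = metric(df_success_metrics, "Uniqueness", "review_id")
```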


## Analyze Constraint Suggestions

In [29]:
df_constraint_suggestions = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-suggestions/', sep='\t', header=0)
df_constraint_suggestions.columns=['column_name', 'description', 'code']
df_constraint_suggestions

| | column_name | description | code |
|---|---|---|---|
| 0 | review_id | 'review_id' is not null | `.isComplete("review_id")` |
| 1 | customer_id | 'customer_id' is not null | `.isComplete("customer_id")` |
| 2 | customer_id | 'customer_id' has type Integral | `.hasDataType("customer_id", ConstrainableData...` |
| 3 | customer_id | 'customer_id' has no negative values | `.isNonNegative("customer_id")` |
| 4 | review_date | 'review_date' is not null | `.isComplete("review_date")` |
| 5 | helpful_votes | 'helpful_votes' is not null | `.isComplete("helpful_votes")` |
| 6 | helpful_votes | 'helpful_votes' has no negative values | `.isNonNegative("helpful_votes")` |
| 7 | star_rating | 'star_rating' is not null | `.isComplete("star_rating")` |
| 8 | star_rating | 'star_rating' has no negative values | `.isNonNegative("star_rating")` |
| 9 | product_title | 'product_title' is not null | `.isComplete("product_title")` |
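Deequ derives these suggestions by profiling each column and applying rules to the resulting profile. The sketch below is a heavily simplified stand-in written in pandas, not Deequ's rule set. It proposes only `isComplete` and `isNonNegative`, in the same `column_name`/`description`/`code` shape as the table above. `suggest_constraints` is a hypothetical helper.

```python
import pandas as pd


def suggest_constraints(df):
    """Propose isComplete / isNonNegative constraints from the observed data."""
    suggestions = []
    for column in df.columns:
        series = df[column]
        # No nulls observed -> suggest a completeness constraint
        if series.notna().all():
            suggestions.append((column, "'{}' is not null".format(column),
                                '.isComplete("{}")'.format(column)))
        # Numeric column with no negative values observed -> suggest non-negativity
        if pd.api.types.is_numeric_dtype(series) and (series.dropna() >= 0).all():
            suggestions.append((column, "'{}' has no negative values".format(column),
                                '.isNonNegative("{}")'.format(column)))
    return pd.DataFrame(suggestions, columns=["column_name", "description", "code"])
```

Suggestions are only as good as the sample they were derived from. A column that happens to have no nulls today may legitimately contain nulls tomorrow, so review each suggestion before adopting it as a check.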


# Save for the Next Notebook(s)

In [30]:
%store df_dataset_metrics

Stored 'df_dataset_metrics' (DataFrame)


In [None]:
%%javascript
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();