# Data Processing With a Dask Processing Job

This notebook creates and runs a [Processing Job](https://aws.amazon.com/blogs/aws/amazon-sagemaker-processing-fully-managed-data-processing-and-model-evaluation/) that uses [Dask](https://dask.org) to perform the data-cleansing task we designed during local data processing. It runs the same steps as the "Data Processing with Dask" notebook, but in an automated, repeatable way.

## Setup

Let's start by specifying:
* The S3 bucket and prefixes that you use for training and model data. Use the default bucket specified by the Amazon SageMaker session.
* The IAM role ARN used to give processing and training access to the dataset.

In [1]:
from time import gmtime, strftime
import sagemaker
import boto3
import os

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
s3client = boto3.client('s3')

prefix = "sagemaker/muse-dask-preprocess-demo"
input_prefix = prefix + "/input/book-depository/raw"
code_prefix = prefix + "/code"
input_preprocessed_prefix = prefix + "/input/book-depository/preprocessed"
input_descriptions_prefix = prefix + "/input/book-depository/descriptions"
input_rejected_prefix = prefix + "/input/book-depository/rejected"
input_reports_prefix = prefix + "/input/book-depository/reports"

## Using Amazon SageMaker to run a Dask Processing Job

The dataset used here is the [book depository dataset](https://www.kaggle.com/sp1thas/book-depository-dataset), a KDD dataset hosted on Kaggle. It has previously been uploaded to the input prefix specified above.

In this example, we use Dask distributed to preprocess and transform the data so that it is ready for embedding generation. The raw data must be in an S3 bucket that Amazon SageMaker can access; here, that is the session's default bucket under `input_prefix`.

### Build a dask container for running the preprocessing job

An example Dask container is included in the `./container` directory of this example. The container handles bootstrapping the Dask scheduler and mapping each instance to a Dask worker. At a high level, the container provides:

* A set of default worker/scheduler configurations
* A bootstrapping script for configuring and starting the scheduler and worker nodes
* A Dask cluster started from all the instances, including the scheduler node
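The `bootstrap.py` script inside `./container` is not shown in this notebook, so the following is only a sketch of how such a bootstrap could decide each instance's role. It assumes that every processing instance can read `/opt/ml/config/resourceconfig.json` with `current_host` and `hosts` keys, and it uses a hypothetical rule: the first host (sorted by name) runs the scheduler, and every host, including that one, runs a worker pointed at port 8786 (the port `preprocessing.py` connects to).

```python
import json
from pathlib import Path

# Assumption: each processing instance can read this file, which lists
# "current_host" and "hosts"; the real bootstrap.py may work differently.
RESOURCE_CONFIG = Path("/opt/ml/config/resourceconfig.json")
SCHEDULER_PORT = 8786  # default Dask scheduler port; preprocessing.py connects to it


def load_resource_config(path=RESOURCE_CONFIG):
    """Read this instance's view of the processing cluster."""
    with open(path) as f:
        return json.load(f)


def dask_commands_for_host(resource_config):
    """Return the Dask CLI commands this host should launch.

    Hypothetical convention: the first host (sorted by name) runs the
    scheduler, and every host, including the scheduler host, runs a worker.
    """
    hosts = sorted(resource_config["hosts"])
    scheduler_host = hosts[0]
    worker = ["dask-worker", f"tcp://{scheduler_host}:{SCHEDULER_PORT}"]
    if resource_config["current_host"] == scheduler_host:
        return [["dask-scheduler"], worker]
    return [worker]
```

For `{"current_host": "algo-2", "hosts": ["algo-1", "algo-2", "algo-3"]}` the function returns `[['dask-worker', 'tcp://algo-1:8786']]`. Sorting the host list keeps the choice deterministic, because every instance computes the same answer independently. A real bootstrap would also have to launch these commands (for example with `subprocess.Popen`) and start the processing script; how the actual container does that is not shown here.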


After the container has been built and pushed, use the Amazon SageMaker Python SDK to submit a managed, distributed Dask application that performs the dataset preprocessing.

In [2]:
!pygmentize container/Dockerfile

FROM continuumio/miniconda3:4.7.12


RUN apt-get update
RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
RUN pip3 install py4j psutil==5.6.5 numpy==1.17.4
RUN apt-get clean
RUN rm -rf /var/lib/apt/lists/*

ENV PYTHONHASHSEED 0
ENV PYTHONIOENCODING UTF-8
ENV PIP_DISABLE_PIP_VERSION_CHECK 1


RUN conda install --yes \
    -c conda-forge \
    python==3.8 \
    python-blosc \
    cytoolz \
    dask==2.17.2 \
    distributed==2.20.0 \
    lz4

### Build the container

You only need to rebuild the container when its libraries or configuration change. Otherwise, use the latest pre-built image.

In [116]:
ecr_repository = 'sagemaker-dask-muse'

In [118]:
%cd container
!docker build -t {ecr_repository} .
%cd ../

/home/ec2-user/SageMaker/MUSE-sagemaker-development/notebooks/data-preparation/container
Sending build context to Docker daemon  17.92kB
Step 1/21 : FROM continuumio/miniconda3:4.7.12
 ---> 406f2b43ea59
Step 2/21 : RUN apt-get update
 ---> Using cache
 ---> 42e88a27af6f
Step 3/21 : RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
 ---> Using cache
 ---> bc2cbaba76b9
Step 4/21 : RUN pip3 install py4j psutil==5.6.5 numpy==1.17.4
 ---> Using cache
 ---> 52d2ef796f81
Step 5/21 : RUN apt-get clean
 ---> Using cache
 ---> 18e57ab33c50
Step 6/21 : RUN rm -rf /var/lib/apt/lists/*
 ---> Using cache
 ---> ed93c77f791e
Step 7/21 : ENV PYTHONHASHSEED 0
 ---> Using cache
 ---> 1fc6e8976218
Step 8/21 : ENV PYTHONIOENCODING UTF-8
 ---> Using cache
 ---> b74fa120645d
Step 9/21 : ENV PIP_DISABLE_PIP_VERSION_CHECK 1
 ---> Using cache
 ---> bb33dd73e6fd
Step 10/21 : RUN conda install --yes     -c conda-forge     python==3.8     python-blosc    

### Create an Amazon Elastic Container Registry (Amazon ECR) repository for the Dask container and push the image

In [119]:
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

tag = ':latest'
uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
dask_repository_uri = f'{account_id}.dkr.ecr.{region}.{uri_suffix}/{ecr_repository + tag}'

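# Log in to ECR, create the repository if it does not exist, and push the Docker image.
# `aws ecr get-login` was removed in AWS CLI v2; `get-login-password` piped into `docker login` works with v2.
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.{uri_suffix}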
!aws ecr describe-repositories --repository-names $ecr_repository || aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $dask_repository_uri
!docker push $dask_repository_uri
print(f"Image {ecr_repository + tag} was pushed to {dask_repository_uri}")

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
{
    "repositories": [
        {
            "repositoryArn": "arn:aws:ecr:eu-west-1:113147044314:repository/sagemaker-dask-muse",
            "registryId": "113147044314",
            "repositoryName": "sagemaker-dask-muse",
            "repositoryUri": "113147044314.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-dask-muse",
            "createdAt": 1594041931.0,
            "imageTagMutability": "MUTABLE",
            "imageScanningConfiguration": {
                "scanOnPush": false
            }
        }
    ]
}
The push refers to repository [113147044314.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-dask-muse]

0a84af46: Preparing
ae5c86a7: Preparing
7005b026: Preparing
cbf681dc: Preparing
565545ff: Preparing
144646f4: Preparing
f0098f8e: Preparing
a923921f: Preparing
050018b6: Preparing
d1ae440b: Preparing
25bf8b36: Preparing
8acb45c2

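The image URI built in the ECR cell above can be factored into a small function. This is a sketch with a hypothetical name (`ecr_image_uri`); unlike the cell, it treats any region whose name starts with `cn-` as part of the China partition instead of listing the two regions explicitly.

```python
def ecr_image_uri(account_id, region, repository, tag="latest"):
    """Build the ECR image URI used for `docker tag` and `docker push`."""
    # China regions use a different domain suffix than the standard partition.
    suffix = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
    return f"{account_id}.dkr.ecr.{region}.{suffix}/{repository}:{tag}"
```

For example, `ecr_image_uri(account_id, region, ecr_repository)` reproduces `dask_repository_uri` for the `:latest` tag.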
### Run the preprocessing job using Amazon SageMaker Processing on Dask Cluster

Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the custom Dask container that was just built, and an adaptation of the processing code we developed for local processing.

The main changes compared to the local version are:
- We add a `__main__` block to the script that calls the transform function.
- We parse the parameters expected by the code. That is done in the `parse_arguments` function.
- We pass inputs and outputs to the processing job. That is done in the `parse_processing_job_config` function:
    - Inputs:
        - `dataset`: the only input; it points to the single dataset file on S3.
    - Outputs:
        - `processed-dataset`: the result of all the processing steps.
        - `descriptions-dataset`: only the description field, in JSON Lines format, to be used later for batch transformation.
        - `rejected-dataset`: where all rejected records are saved, one file per rejection type. See `dump_rejected` for details.
        - `dataset-reports`: where reports on the job are saved. Currently all reports are printed to standard output, and SageMaker forwards them to CloudWatch Logs.
          See `report_transformation` for details.
    - Inputs are automatically copied to the container before processing begins, and outputs are automatically copied to S3 after it ends.
    - The script only has to read from the local folder(s) defined in `ProcessingInput` and write to the local folders defined in `ProcessingOutput`.
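To make the input/output mapping concrete, here is a minimal sketch that writes a trimmed-down, illustrative `processingjobconfig.json` to a temporary file and parses it with the same logic as the script. The sample keeps only the keys `parse_processing_job_config` reads, with paths taken from the job log further below; the real file SageMaker provides contains many more fields.

```python
import json
import os
import tempfile

# Illustrative config: only the keys parse_processing_job_config reads.
sample_config = {
    "ProcessingInputs": [
        {"InputName": "dataset",
         "S3Input": {"LocalPath": "/opt/ml/processing/input"}},
    ],
    "ProcessingOutputConfig": {
        "Outputs": [
            {"OutputName": "processed-dataset",
             "S3Output": {"LocalPath": "/opt/ml/processing/processed/"}},
            {"OutputName": "dataset-reports",
             "S3Output": {"LocalPath": "/opt/ml/processing/reports"}},
        ]
    },
}


def parse_processing_job_config(config_file):
    """Map each input/output name to its local path inside the container."""
    with open(config_file) as f:
        config = json.load(f)
    inputs = {i["InputName"]: i["S3Input"]["LocalPath"]
              for i in config["ProcessingInputs"]}
    outputs = {o["OutputName"]: o["S3Output"]["LocalPath"]
               for o in config["ProcessingOutputConfig"]["Outputs"]}
    return inputs, outputs


with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    json.dump(sample_config, tmp)

inputs, outputs = parse_processing_job_config(tmp.name)
os.remove(tmp.name)
print(inputs)
print(outputs)
```

The script then only needs the output name (for example `processed-dataset`) to find the local directory it should write to.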

In [111]:
%%writefile preprocessing.py
import argparse
import json
import logging
import os
import sys
import time
import csv
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
from langdetect import detect as detect_lang, DetectorFactory
DetectorFactory.seed = 0


SUPPORTED_LANGUAGES = {'ar', 'nl', 'en', 'de', 'fr', 'it', 'pt', 'es', 'ja', 'ko', 'ru', 'pl', 'tr', 'zh', 'zh-tw', 'th'}

def detect_descr_lang(row: pd.Series):
    if row.lang == 'en':  # Accept that English is correct
        return 'en'
    try:
        detected_lang = detect_lang(row.description)
    except Exception:  # langdetect raises an exception when it cannot detect a language
        detected_lang = 'unknown'
    if row.lang == 'ru' and detected_lang != 'en':  # Reported as Russian and not detected as English: trust the reported language
        detected_lang = 'ru'
    elif detected_lang in {'zh-cn', 'ko', 'zh-tw'}:  # Consolidate Chinese variants and Korean as general Chinese
        detected_lang = 'zh'
    return detected_lang
    
def detect_df_lang(df):
    return(df.apply(detect_descr_lang, axis=1))

def report_transformation(report_dest_dir, max_descr_length, langs_orig_dataset, truncated_descriptions):
    # TODO: Send reports to a file - currently sending to logs
    print("Generating reports...")
    print("---------------------------------------------")
    print(f"{langs_orig_dataset.num_books.sum()} records in total in the original dataset.")
    print(f"Number of books per language in the original dataset:\n{langs_orig_dataset.sort_values('num_books', ascending=False).to_string()}")
    print("---------------------------------------------")
    if max_descr_length > 0 and truncated_descriptions is not None:
        num_truncated = truncated_descriptions.shape[0].compute()
        print(f"{num_truncated} descriptions were truncated at {max_descr_length} characters")

def dump_rejected(rejected_dest_dir, dropped_na_description, dropped_non_supported_lang, dataset_langs_filtered_out, english_wrong, short_descriptions):
    def dump_df_one_file(df, dest):
        try:
            next(df.iterrows())  # Checking if there is anything on the dataframe
            df.to_csv(dest, compute=True, index=False, single_file=True, quoting=csv.QUOTE_NONNUMERIC)
            return(len(df))
        except StopIteration:
            return(0)
    print("---------------------------------------------")
    print("Saving rejected records...")
    cum_num_dropped = 0
    num_rows = dump_df_one_file(dropped_na_description, f'{rejected_dest_dir}/dropped_na.csv')
    cum_num_dropped += num_rows
    print(f"{num_rows} records rejected because description is empty.")
    num_rows = dump_df_one_file(dropped_non_supported_lang, f'{rejected_dest_dir}/dropped_non_supported_lang.csv')
    cum_num_dropped += num_rows
    print(f"{num_rows} records rejected because language is not supported.")
    num_rows = dump_df_one_file(dataset_langs_filtered_out, f'{rejected_dest_dir}/lang_filtered_out.csv')
    cum_num_dropped += num_rows
    print(f"{num_rows} records rejected because language was filtered out.")
    num_rows = dump_df_one_file(english_wrong, f'{rejected_dest_dir}/english_wrong.csv')
    cum_num_dropped += num_rows
    print(f"{num_rows} records rejected because english was wrongly reported as language.")
    num_rows = dump_df_one_file(short_descriptions, f'{rejected_dest_dir}/short_descriptions.csv')
    cum_num_dropped += num_rows
    print(f"{num_rows} records rejected because description was too short.")
    print(f"{cum_num_dropped} records rejected in total.")
    print('Rejected records saved...')
    print("---------------------------------------------")


def save_description(dest_file, df):
    print("---------------------------------------------")
    print(f"Saving descriptions to {dest_file}")
    with open(dest_file, 'w') as dest:
        for _, descr in df.iteritems():
            try:
                # One JSON object per line (JSON Lines)
                dest.write(json.dumps({"description": descr}) + '\n')
            except Exception as err:  # e.g. a value json.dumps cannot serialize
                print(f'Description rejected ({err}): {descr}')
    print("Descriptions saved.")
    print("---------------------------------------------")


def gen_cleaned_data(source_data_dir, dest_data_dir, descr_data_dir, rejected_data_dir, reports_dir, drop_languages, max_descr_length,
                     supported_languages=SUPPORTED_LANGUAGES, block_size='32MB', sample=1.0): 
    print("---------------------------------------------")
    print(f"Loading data from {source_data_dir}.")
    if sample < 1.0:
        print(f"Taking a fraction of {sample:0.2f} of the data")
    print(f"Rejected data will be sent to {rejected_data_dir}.")
    print(f"Reports (if any) will be sent to {reports_dir}.")
    print("---------------------------------------------")
    raw_df = dd.read_csv(
        f'{source_data_dir}/dataset.csv', header=0, 
        usecols=['description', 'authors', 'categories', 'lang', 'title'],
        blocksize=block_size,
    ).repartition(partition_size=block_size).sample(frac=sample)
    
    langs_orig_df = raw_df[['lang', 'title']].groupby('lang').count().compute().rename(columns={'title': 'num_books'})
    
    dropped_na_description_df = raw_df[raw_df.description.isna()]
    non_na_df = raw_df[~ raw_df.description.isna()]
    
    # Truncating descriptions if requested
    if max_descr_length > 0:
        truncated_descriptions_df = non_na_df[non_na_df.description.str.len() > max_descr_length]
        non_na_df.description = non_na_df.description.str.slice(stop=max_descr_length)
    else:
        truncated_descriptions_df = None
    non_na_df['descr_len_words'] = non_na_df.map_partitions(lambda df: df.description.apply(lambda t: len(t.split(' '))), meta=pd.Series(name='descr_len_words', dtype='i4'))
    non_na_df['detected_lang'] = non_na_df.map_partitions(detect_df_lang, meta=pd.Series(name='detected_lang', dtype='U'))
    
    dropped_non_supported_lang_df = non_na_df[~(non_na_df.lang.isin(supported_languages) | non_na_df.detected_lang.isin(supported_languages))]
    supported_lang_df = non_na_df[non_na_df.lang.isin(supported_languages) | non_na_df.detected_lang.isin(supported_languages)]
    
    langs_filtered_out_df = supported_lang_df[supported_lang_df.lang.isin(drop_languages)]
    filtered_df = supported_lang_df[~supported_lang_df.lang.isin(drop_languages)]  # Removing languages we were asked to filter out
    
    # Keep detected non-english (for language diversity) or detected and reported english (drop all reported english but detected something else)
    english_wrong_df = filtered_df[(filtered_df.detected_lang == 'en') & ~(filtered_df.detected_lang == filtered_df.lang)]
    non_english_or_lang_match_df = filtered_df[(filtered_df.detected_lang != 'en') | (filtered_df.detected_lang == filtered_df.lang)]

    # Removing very short descriptions from dataset. We keep all chinese because the language is more expressive.
    short_descriptions_df = non_english_or_lang_match_df[(non_english_or_lang_match_df.descr_len_words < 8) &
                                                (non_english_or_lang_match_df.detected_lang != 'zh')]
    processed_df = non_english_or_lang_match_df[(non_english_or_lang_match_df.descr_len_words >= 8) |
                                                (non_english_or_lang_match_df.detected_lang == 'zh')]  
    
    report_transformation(reports_dir, max_descr_length, langs_orig_df, truncated_descriptions_df)
    print(f"Saving transformed dataset to {dest_data_dir}")
    processed_df.to_csv(f'{dest_data_dir}/dataset-*.csv', compute=True, index=False, quoting=csv.QUOTE_NONNUMERIC)
    save_description(f'{descr_data_dir}/dataset.jsonl', processed_df.description)
    dump_rejected(rejected_data_dir, dropped_na_description_df, dropped_non_supported_lang_df, langs_filtered_out_df, english_wrong_df, short_descriptions_df)
    
    
def start_dask_cluster(scheduler_ip):
    # Start the Dask cluster client
    try:
        client = Client("tcp://{ip}:8786".format(ip=scheduler_ip))
        logging.info("Cluster information: {}".format(client))
    except Exception as err:
        logging.exception(err)


def parse_processing_job_config(config_file="/opt/ml/config/processingjobconfig.json"):
    with open(config_file, "r") as f:
        config = json.load(f)
    inputs = {in_path["InputName"]: in_path["S3Input"]["LocalPath"] for in_path in config["ProcessingInputs"]}
    outputs = {out_path["OutputName"]: out_path["S3Output"]["LocalPath"] for out_path in config["ProcessingOutputConfig"]["Outputs"]}
    return (inputs, outputs)
    
def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-to-process", type=str, default="dataset")
    parser.add_argument("--data-to-generate", type=str, default="processed-dataset")
    parser.add_argument("--descriptions", type=str, default="descriptions-dataset")
    parser.add_argument("--rejected-data", type=str, default="rejected-dataset")
    parser.add_argument("--reports", type=str, default="dataset-reports")
    parser.add_argument("--supported-languages", nargs='+', default=SUPPORTED_LANGUAGES)
    parser.add_argument("--max-description-length", type=int, default=1024)
    parser.add_argument("--drop-languages", nargs="+", default=['ja', 'ar', 'ko', 'th'])
    parser.add_argument("--block-size", type=str, default="32MB")
    parser.add_argument("--scheduler-ip", type=str, default=sys.argv[-1])
    parser.add_argument("--sample", type=float, default=1.0)
    args, _ = parser.parse_known_args()
    
    print(f'Supported Languages: {args.supported_languages}')
    print(f'Languages {args.drop_languages} will be dropped from the dataset')
    # Collect the raw script arguments as flag/value pairs
    args_iter = iter(sys.argv[1:])
    script_args = dict(zip(args_iter, args_iter))
    return(args, script_args)

if __name__ == '__main__':
    inputs, outputs = parse_processing_job_config()
    args, script_args = parse_arguments()
    start_dask_cluster(args.scheduler_ip)
    
    print('----------------------------------------------------')
    print('Starting processing')
    print('----------------------------------------------------')
    gen_cleaned_data(
        source_data_dir=inputs[args.data_to_process], 
        dest_data_dir=outputs[args.data_to_generate],
        descr_data_dir=outputs[args.descriptions],
        rejected_data_dir=outputs[args.rejected_data],
        reports_dir=outputs[args.reports],
        drop_languages=set(args.drop_languages), 
        max_descr_length=args.max_description_length,
        supported_languages=args.supported_languages,
        block_size=args.block_size,
        sample=args.sample
    )
    print('----------------------------------------------------')
    print('Processing finished')
    print('----------------------------------------------------')


Overwriting preprocessing.py
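The filtering steps in `gen_cleaned_data` form a cascade: each record either lands in exactly one rejected file or in the processed dataset. The sketch below restates that cascade for a single record in plain Python, as a hypothetical helper (`rejection_reason`) that can be handy for reasoning about or unit-testing the rules. It takes the detected language as a parameter, so `langdetect` is not needed, and it uses the script's defaults (1024-character truncation, 8-word minimum, same language sets).

```python
# Hypothetical, pure-Python restatement of the filter cascade in gen_cleaned_data.
SUPPORTED = {'ar', 'nl', 'en', 'de', 'fr', 'it', 'pt', 'es', 'ja', 'ko',
             'ru', 'pl', 'tr', 'zh', 'zh-tw', 'th'}
DROP = {'ja', 'ar', 'ko', 'th'}


def rejection_reason(description, lang, detected_lang,
                     supported=SUPPORTED, drop=DROP, max_len=1024, min_words=8):
    """Return the rejected-file name a record would land in, or None if kept."""
    if description is None:                      # missing description
        return "dropped_na"
    if max_len > 0:
        description = description[:max_len]      # truncation, not a rejection
    if lang not in supported and detected_lang not in supported:
        return "dropped_non_supported_lang"
    if lang in drop:                             # filtered by reported language
        return "lang_filtered_out"
    if detected_lang == "en" and lang != "en":   # reported language is wrong
        return "english_wrong"
    if len(description.split(" ")) < min_words and detected_lang != "zh":
        return "short_descriptions"
    return None
```

As in the script, the description is truncated before its words are counted, and Chinese descriptions are exempt from the minimum length. For example, `rejection_reason("Kurze Beschreibung.", "de", "de")` returns `"short_descriptions"`.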


The cell below defines the processing job. It takes the URI of the Docker image we built and pushed earlier, plus the number and type of instances. It also takes a timeout (`max_runtime_in_seconds`) so that a frozen process cannot run forever.

In [107]:
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

dask_processor = ScriptProcessor(
    base_job_name="dask-preprocessor",
    image_uri=dask_repository_uri,
    command=["/opt/program/bootstrap.py"],
    role=role,
    instance_count=10,
    instance_type="ml.m5.large",
    max_runtime_in_seconds=1200,
)

Once we have defined our processing job, we define the inputs and outputs and execute it. Since the executor needs to access the processing script in S3, we upload it there.

We have one input, which points to our fixed source dataset on S3.
We have output destinations for the full processed dataset, the descriptions, the rejected data, and the reports.
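The four outputs in the next cell share one pattern: a local `source` directory, an S3 `destination` with the run's timestamp appended, and an `output_name` that must match the names `preprocessing.py` looks up (its `parse_arguments` defaults). A hypothetical helper that builds those keyword arguments as plain dictionaries might look like this:

```python
def output_specs(bucket, prefixes, timestamp):
    """Build keyword arguments for one ProcessingOutput per named output.

    `prefixes` maps output_name -> (local source directory, S3 key prefix).
    """
    return [
        {
            "output_name": name,
            "source": source,
            "destination": f"s3://{bucket}/{s3_prefix}/{timestamp}",
        }
        for name, (source, s3_prefix) in prefixes.items()
    ]
```

Each dictionary could then be passed as `ProcessingOutput(**spec)`. Keeping the names in one place makes it harder for the notebook and the script's defaults to drift apart.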

When you run the processing job, SageMaker starts the instances and streams their logs into the notebook (each color represents one instance). You can turn this off by passing `logs=False` to `run()`; the logs are always captured in CloudWatch Logs, under the log group `/aws/sagemaker/ProcessingJobs`.
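The `arguments` given to `run()` in the next cell (`['--sample', '0.5']`) reach `preprocessing.py` on its command line. The script's `--scheduler-ip` defaults to `sys.argv[-1]`, which suggests that the container's bootstrap appends the scheduler IP as the last argument; that is an inference, not something this notebook shows. The sketch below reproduces the relevant parts of `parse_arguments` on a hypothetical command line:

```python
import argparse


def parse(argv):
    """Mimic how preprocessing.py's parse_arguments reads its command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--sample", type=float, default=1.0)
    # As in the script, the scheduler IP defaults to the last argv entry.
    parser.add_argument("--scheduler-ip", type=str, default=argv[-1])
    args, extras = parser.parse_known_args(argv)
    pairs_iter = iter(argv)
    script_args = dict(zip(pairs_iter, pairs_iter))  # pairs flag/value tokens
    return args, extras, script_args


# Hypothetical command line: the job arguments plus an appended scheduler IP.
args, extras, script_args = parse(["--sample", "0.5", "10.0.0.1"])
print(args.sample, args.scheduler_ip)
print(extras)
print(script_args)
```

`parse_known_args` leaves the trailing IP among the unknown extras, and the flag/value pairing silently drops it because `zip` stops at the shorter side. The convention is fragile: if nothing were appended, the default would be whatever the last job argument happens to be (here `'0.5'`).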

In [112]:
%%time
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

s3_code_location = f"s3://{bucket}/{code_prefix}/preprocessing.py"
s3client.upload_file("preprocessing.py", Bucket=bucket, Key=f"{code_prefix}/preprocessing.py")

input_data = ProcessingInput(source=f"s3://{bucket}/{input_prefix}", destination='/opt/ml/processing/input', input_name='dataset')
output_data = ProcessingOutput(source='/opt/ml/processing/processed/', destination=f"s3://{bucket}/{input_preprocessed_prefix}/{timestamp_prefix}", output_name='processed-dataset')
descriptions_data = ProcessingOutput(source='/opt/ml/processing/descriptions/', destination=f"s3://{bucket}/{input_descriptions_prefix}/{timestamp_prefix}", output_name='descriptions-dataset')
rejected_data = ProcessingOutput(source='/opt/ml/processing/rejected', destination=f"s3://{bucket}/{input_rejected_prefix}/{timestamp_prefix}", output_name='rejected-dataset')
reports_on_data = ProcessingOutput(source='/opt/ml/processing/reports', destination=f"s3://{bucket}/{input_reports_prefix}/{timestamp_prefix}", output_name='dataset-reports')

dask_processor.run(code=s3_code_location,
                   inputs=[input_data],
                   outputs=[output_data, descriptions_data, rejected_data, reports_on_data],
                   job_name=f'muse-dask-processing-{timestamp_prefix}',
                   arguments=['--sample', '0.5']
                  )


Job Name:  muse-dask-processing-2020-07-06-20-47-56
Inputs:  [{'InputName': 'dataset', 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-113147044314/sagemaker/muse-dask-preprocess-demo/input/book-depository/raw', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-113147044314/sagemaker/muse-dask-preprocess-demo/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'processed-dataset', 'S3Output': {'S3Uri': 's3://sagemaker-eu-west-1-113147044314/sagemaker/muse-dask-preprocess-demo/input/book-depository/preprocessed/2020-07-06-20-47-56', 'LocalPath': '/opt/ml/processing/processed/', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'descriptions-dat

Let's check the result of the processing job. In case of failure, the logs are available above, and also in the [CloudWatch console](https://console.aws.amazon.com/cloudwatch/) under *Log groups > /aws/sagemaker/ProcessingJobs*.

In [113]:
preprocessing_job_description = dask_processor.jobs[-1].describe()

In [114]:
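job_name = preprocessing_job_description['ProcessingJobName']
if preprocessing_job_description['ProcessingJobStatus'] != "Completed":
    # FailureReason is only present when the job did not succeed
    reason = preprocessing_job_description.get('FailureReason', 'unknown reason')
    raise RuntimeError(f"Processing job {job_name} did not complete ({reason}). Please check the logs.")
else:
    print(f"Processing job {job_name} completed successfully.")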

Processing job muse-dask-processing-2020-07-06-20-47-56 completed successfully.


## Verifying our Processing Job Results

Assuming everything executed correctly, let's download the results and check them.

In [115]:
# Map each output name to its S3 destination
job_outputs = {output['OutputName']: output['S3Output']['S3Uri']
               for output in preprocessing_job_description['ProcessingOutputConfig']['Outputs']}
s3_processed_data_path = job_outputs['processed-dataset']
s3_descriptions_data_path = job_outputs['descriptions-dataset']
print(f"The processed dataset is available at {s3_processed_data_path}")
print(f"Pure processed descriptions dataset is available at {s3_descriptions_data_path}")
local_processed_path = '/home/ec2-user/SageMaker/MUSE-sagemaker-development/data/book-depository/preprocessed'

The processed dataset is available at s3://sagemaker-eu-west-1-113147044314/sagemaker/muse-dask-preprocess-demo/input/book-depository/preprocessed/2020-07-06-20-47-56
Pure processed descriptions dataset is available at s3://sagemaker-eu-west-1-113147044314/sagemaker/muse-dask-preprocess-demo/input/book-depository/descriptions/2020-07-06-20-47-56


In [20]:
import sys
import shutil

print(f"Clearing {local_processed_path}...")
shutil.rmtree(local_processed_path, ignore_errors=True)
os.makedirs(local_processed_path)

for dataset_part in s3client.list_objects_v2(Bucket=bucket, Prefix='/'.join(s3_processed_data_path.split('//')[-1].split('/')[1:]))['Contents']:
    local_name = os.path.basename(dataset_part['Key'])
    sys.stdout.write(f' Downloading {local_name}...')
    s3client.download_file(Bucket=bucket, Key=dataset_part['Key'], Filename=f'{local_processed_path}/{local_name}')
    sys.stdout.write('downloaded.')

Clearing /home/ec2-user/SageMaker/MUSE-sagemaker-development/data/book-depository/preprocessed...
 Downloading dataset-00.csv...downloaded. Downloading dataset-01.csv...downloaded. Downloading dataset-02.csv...downloaded. Downloading dataset-03.csv...downloaded. Downloading dataset-04.csv...downloaded. Downloading dataset-05.csv...downloaded. Downloading dataset-06.csv...downloaded. Downloading dataset-07.csv...downloaded. Downloading dataset-08.csv...downloaded. Downloading dataset-09.csv...downloaded. Downloading dataset-10.csv...downloaded. Downloading dataset-11.csv...downloaded. Downloading dataset-12.csv...downloaded. Downloading dataset-13.csv...downloaded. Downloading dataset-14.csv...downloaded. Downloading dataset-15.csv...downloaded. Downloading dataset-16.csv...downloaded. Downloading dataset-17.csv...downloaded. Downloading dataset-18.csv...downloaded. Downloading dataset-19.csv...downloaded. Downloading dataset-20.csv...downloaded. Downloading dataset-21.csv...downloaded.
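The cell above extracts the key prefix by splitting the URI string and reads a single `list_objects_v2` response, which returns at most 1,000 keys. For larger outputs, a sketch using `urllib.parse` and a boto3 paginator could look like this (the function names are hypothetical):

```python
import os
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key/prefix' into ('bucket', 'key/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def download_prefix(s3client, uri, local_dir):
    """Download every object under an S3 prefix, following pagination."""
    bucket, prefix = split_s3_uri(uri)
    prefix = prefix.rstrip("/") + "/"  # avoid matching sibling prefixes
    os.makedirs(local_dir, exist_ok=True)
    paginator = s3client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            local_name = os.path.basename(obj["Key"])
            if not local_name:  # skip "folder" placeholder keys ending in "/"
                continue
            s3client.download_file(Bucket=bucket, Key=obj["Key"],
                                   Filename=os.path.join(local_dir, local_name))
```

It could be called as `download_prefix(s3client, s3_processed_data_path, local_processed_path)`. The trailing slash added to the prefix keeps a timestamped prefix from also matching sibling prefixes that merely start with the same characters.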

We'll need Dask and langdetect to double-check the processing results.

In [21]:
!pip install -U "dask[complete]>=2.17.2" "cloudpickle>=1.3.0"

You are using pip version 10.0.1, however version 20.2b1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


In [22]:
!pip install langdetect

You are using pip version 10.0.1, however version 20.2b1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


We then proceed to load the input and final output.

In [23]:
import dask.dataframe as dd
import pandas as pd

final_df = dd.read_csv(
    f'{local_processed_path}/dataset-*.csv', header=0,
    usecols=['authors', 'categories', 'description', 'lang', 'title', 'detected_lang']
)

raw_df = dd.read_csv(
    '/home/ec2-user/SageMaker/MUSE-sagemaker-development/data/book-depository/raw/dataset.csv', header=0,
    usecols=['authors', 'categories', 'description', 'lang', 'title']
)

### Emulating the Processing Job Results

The variable below controls whether we replicate the job's processing locally for comparison. Set it to `True` for a full check, but note that local Dask processing takes about 25 minutes.

In [83]:
replicate_job = False

In [25]:
if replicate_job:
    from langdetect import detect as detect_lang, DetectorFactory
    DetectorFactory.seed = 0

    def detect_descr_lang(row: pd.Series):
        if row.lang == 'en':  # Accept that English is correct
            return 'en'
        try:
            detected_lang = detect_lang(row.description)
        except Exception:  # langdetect raises an exception when it cannot detect a language
            detected_lang = 'unknown'
        if row.lang == 'ru' and detected_lang != 'en':  # Reported as Russian and not detected as English: trust the reported language
            detected_lang = 'ru'
        elif detected_lang in {'zh-cn', 'ko', 'zh-tw'}:  # Consolidate Chinese variants and Korean as general Chinese
            detected_lang = 'zh'
        return detected_lang

    def detect_df_lang(df):
        return(df.apply(detect_descr_lang, axis=1))

In [26]:
if replicate_job:
    supported_languages = {'ar', 'nl', 'en', 'de', 'fr', 'it', 'pt', 'es', 'ja', 'ko', 'ru', 'pl', 'tr', 'zh', 'zh-tw', 'th'}
    drop_languages = ['ja', 'ar', 'ko', 'th']

    non_na_df = raw_df[~ raw_df.description.isna()]
    truncated_descriptions_df = non_na_df[non_na_df.description.str.len() > 1024]
    non_na_df.description = non_na_df.description.str.slice(stop=1024).str.replace('"', ' ', regex=False)
    non_na_df['descr_len_words'] = non_na_df.map_partitions(lambda df: df.description.apply(lambda t: len(t.split(' '))), meta=pd.Series(name='descr_len_words', dtype='i4'))
    non_na_df['detected_lang'] = non_na_df.map_partitions(detect_df_lang, meta=pd.Series(name='detected_lang', dtype='U'))

    # A record is supported if either its reported or its detected language is supported
    is_supported = non_na_df.lang.isin(supported_languages) | non_na_df.detected_lang.isin(supported_languages)
    dropped_non_supported_lang_df = non_na_df[~is_supported]
    supported_lang_df = non_na_df[is_supported]
    langs_filtered_out_df = supported_lang_df[supported_lang_df.lang.isin(drop_languages)]
    filtered_df = supported_lang_df[~supported_lang_df.lang.isin(drop_languages)]
    english_wrong_df = filtered_df[(filtered_df.detected_lang == 'en') & ~(filtered_df.detected_lang == filtered_df.lang)]
    non_english_or_lang_match_df = filtered_df[(filtered_df.detected_lang != 'en') | (filtered_df.detected_lang == filtered_df.lang)]

    # Removing very short descriptions from dataset. We keep all Chinese because the language is more expressive.
    short_descriptions_df = non_english_or_lang_match_df[(non_english_or_lang_match_df.descr_len_words < 8) &
                                                         (non_english_or_lang_match_df.detected_lang != 'zh')]
    processed_df = non_english_or_lang_match_df[(non_english_or_lang_match_df.descr_len_words >= 8) |
                                                (non_english_or_lang_match_df.detected_lang == 'zh')]

In [27]:
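from IPython.display import display

if replicate_job:
    # Expressions inside an `if` block are not displayed automatically, so call display() explicitly
    display(non_na_df[['lang', 'title']].groupby('lang').count().compute().rename(columns={'title': 'num_books'}).sort_values('num_books', ascending=False))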

### Reporting Result Counts

First, we compare the number of books per reported language before and after processing. Some of the data was rejected by the process.

In [28]:
raw_df[['lang', 'title']].groupby('lang').count().compute().rename(columns={'title': 'num_books'}).join(
    final_df[['lang', 'title']].groupby('lang').count().compute().rename(columns={'title': 'num_books_new'})
).sort_values('num_books', ascending=False).head(15)

| lang | num_books | num_books_new |
|------|-----------|---------------|
| en | 1080384 | 979722.0 |
| de | 40130 | 31425.0 |
| es | 21018 | 13473.0 |
| fr | 11604 | 6383.0 |
| pl | 8341 | 7305.0 |
| it | 3116 | 2013.0 |
| ru | 1508 | 1005.0 |
| pt | 1459 | 1098.0 |
| nl | 746 | 224.0 |
| hi | 478 | |
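The counts above can be turned into a per-language retention rate. The sketch below (hypothetical helper `retention_by_language`) uses a few rows copied from the table; a missing entry in the processed counts means that no books in that language survived. If the processed data came from a sampled run (`--sample` below 1.0), these ratios also include the sampling fraction.

```python
def retention_by_language(raw_counts, new_counts):
    """Fraction of books kept per reported language (0.0 if none survived)."""
    return {
        lang: new_counts.get(lang, 0) / total
        for lang, total in raw_counts.items()
        if total > 0
    }


# Counts copied from the table above (reported language).
raw = {"en": 1080384, "de": 40130, "nl": 746, "hi": 478}
new = {"en": 979722, "de": 31425, "nl": 224}
for lang, ratio in retention_by_language(raw, new).items():
    print(f"{lang}: {ratio:.1%} kept")
```

With these rows, about 90.7% of the English books and 30.0% of the Dutch books are kept, while no Hindi books remain, which is consistent with `hi` not being in `SUPPORTED_LANGUAGES`.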


Now, let's compare the reported original language with the detected language.

In [30]:
raw_df[['lang', 'title']].groupby('lang').count().compute().rename(columns={'title': 'num_books'}).join(
    final_df[['detected_lang', 'title']].groupby('detected_lang').count().compute().rename(columns={'title': 'num_books_new'})
).sort_values('num_books', ascending=False).head(15)

| lang | num_books | num_books_new |
|------|-----------|---------------|
| en | 1080384 | 979722.0 |
| de | 40130 | 31958.0 |
| es | 21018 | 13360.0 |
| fr | 11604 | 6286.0 |
| pl | 8341 | 7349.0 |
| it | 3116 | 2013.0 |
| ru | 1508 | 1005.0 |
| pt | 1459 | 1158.0 |
| nl | 746 | 233.0 |
| hi | 478 | |


In [31]:
rows, columns = final_df.shape
rows = rows.compute()
print(f'The dataset has {rows} rows and {columns} columns.')

The dataset has 1043465 rows and 6 columns.
