In [14]:
#import dask.dataframe as dd
#import pandas as pd
#import numpy as np
#import cv2
import os

from pathlib import Path

In [15]:
# Project root on the SageMaker notebook instance. Set DEFECT_DETECTION_ROOT to run elsewhere.
root_path = Path(os.environ.get("DEFECT_DETECTION_ROOT", "/home/ec2-user/SageMaker/defect_detection/"))

# Directory the processing script is written to; parents are created too so a fresh checkout works.
code_path = root_path / "notebooks/WM-811K/src/"
code_path.mkdir(parents=True, exist_ok=True)
# Raw MIR-WM811K dataset directory (the processing job reads LSWMD.pkl from here).
data_path = root_path / "data/MIR-WM811K/"

## Creating a SageMaker Processing Job

### Build a Container for Dask Processing

Build a Docker image that runs the processing job on a Dask cluster, then push it to Amazon ECR so SageMaker Processing can pull it. The code below is adapted from [this AWS example](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker_processing/feature_transformation_with_sagemaker_processing_dask/feature_transformation_with_sagemaker_processing_dask.ipynb).

In [7]:
%%sh
# Build the Dask processing image. The Dockerfile directory is passed directly as the build
# context, so no bash-only pushd/popd is needed (%%sh runs /bin/sh), and a missing directory
# makes docker fail instead of silently building from the current directory.
# The path is relative to the notebook's working directory (the notebooks/ folder).
# NOTE(review): the saved output below shows WM-811K/src/container being built; confirm which
# directory actually holds the Dockerfile.
docker build -t wafer-data-processing WM-811K/src/data_processing

~/SageMaker/defect_detection/notebooks/WM-811K/src/container ~/SageMaker/defect_detection/notebooks
Sending build context to Docker daemon  16.38kB
Step 1/21 : FROM continuumio/miniconda3:4.7.12
 ---> 406f2b43ea59
Step 2/21 : RUN apt-get update
 ---> Using cache
 ---> bee7c1789cd8
Step 3/21 : RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil ffmpeg libsm6 libxext6
 ---> Using cache
 ---> ca8821c9b19a
Step 4/21 : RUN pip3 install py4j psutil==5.6.5 numpy==1.17.4
 ---> Using cache
 ---> 44a6c1b8958e
Step 5/21 : RUN apt-get clean
 ---> Using cache
 ---> 0b6d3e8f0d43
Step 6/21 : RUN rm -rf /var/lib/apt/lists/*
 ---> Using cache
 ---> 1de28fac9d35
Step 7/21 : ENV PYTHONHASHSEED 0
 ---> Using cache
 ---> 9119ae001b0b
Step 8/21 : ENV PYTHONIOENCODING UTF-8
 ---> Using cache
 ---> d8659b8f85c9
Step 9/21 : ENV PIP_DISABLE_PIP_VERSION_CHECK 1
 ---> Using cache
 ---> b5ec0227bc57
Step 10/21 : RUN conda install --yes     -c conda-forge   

In [10]:
import boto3

# Account and region of the current AWS credentials; together they identify the ECR registry.
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name
if region is None:
    raise RuntimeError("No AWS region configured; set AWS_DEFAULT_REGION or configure a default region.")

ecr_repository = 'wafer-data-processing'
tag = ':latest'
# ECR registries in the AWS China partition live under the amazonaws.com.cn domain.
uri_suffix = 'amazonaws.com.cn' if region in ('cn-north-1', 'cn-northwest-1') else 'amazonaws.com'
dask_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository + tag)

In [8]:
# Create ECR repository and push docker image
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $dask_repository_uri
!docker push $dask_repository_uri

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded

An error occurred (RepositoryAlreadyExistsException) when calling the CreateRepository operation: The repository with name 'wafer-data-processing' already exists in the registry with id '160951647621'
The push refers to repository [160951647621.dkr.ecr.us-east-1.amazonaws.com/wafer-data-processing]

[1Be3d65572: Preparing 
[1B8882d34b: Preparing 
[1Be8fd58aa: Preparing 
[1B60aea3a7: Preparing 
[1B19ebf025: Preparing 
[1Bb5ea98e6: Preparing 
[1Bdf52a3f7: Preparing 
[1B1063c032: Preparing 
[1B95b5fb80: Preparing 
[1B25f60f51: Preparing 
[1Bb9b0dbf8: Preparing 
[1B533a7525: Preparing 
[1B6f4e0e2d: Preparing 
[1Bbea216ea: Preparing 
[1B4da79a36: Preparing 
[1Bebd8fc35: Preparing 
[1Bcb249b79: Preparing 
[2Bcb249b79: Waiting g 
[1B4bce66cd: Layer already exists [13A[2K[9A[2K[2A[2Klatest: digest: sha256:375870f763f68e8876ea6e61fc9873cd25cb755d174f977d99f41e2ac5b4a4f6 size:

In [17]:
# Disabled cell: it previously downloaded a pre-release SageMaker Python SDK build
# (sagemaker-2.9.2.dev0) from the `gianpo-public` S3 bucket, installed it with pip, and then
# restarted the kernel so the new package would be imported. The output below is from that run.
# NOTE(review): installing a package archive from what appears to be a personal bucket is a
# supply-chain risk; prefer a released, pinned version, e.g. `%pip install sagemaker==<version>`.
# import sys
# import IPython
# dist_version = '2.9.2.dev0'
# !aws s3 cp s3://gianpo-public/sagemaker-{dist_version}.tar.gz .
# !{sys.executable} -m pip install -q -U pip
# !{sys.executable} -m pip install -q sagemaker-{dist_version}.tar.gz
# IPython.Application.instance().kernel.do_shutdown(True)

download: s3://gianpo-public/sagemaker-2.9.2.dev0.tar.gz to ./sagemaker-2.9.2.dev0.tar.gz


{'status': 'ok', 'restart': True}

In [17]:
# Write the script executed inside the Dask container. It reads LSWMD.pkl from
# /opt/ml/processing/input and writes data.npz (arrays: x, y, label_classes) to
# /opt/ml/processing/train; the processing job maps both directories to S3.
with open(code_path / "data_processing.py", "w", encoding="utf-8") as data_processing_script:
    data_processing_script.write(
        """
import sys
import logging
import dask.dataframe as dd
import pandas as pd
import numpy as np
import cv2
from pathlib import Path
from dask.distributed import Client

# Side length (in dies) that every wafer map is normalized to.
IMG_SIZE = 26
# One-hot depth: die values in a wafer map are expected to be 0, 1 or 2.
N_CHANNELS = 3


def hot_encode(img_arr):
    # One-hot encode flattened wafer maps along a new trailing channel axis.
    # A single map of IMG_SIZE * IMG_SIZE values becomes (IMG_SIZE, IMG_SIZE, N_CHANNELS);
    # a batch shaped (N, IMG_SIZE * IMG_SIZE) becomes (N, IMG_SIZE, IMG_SIZE, N_CHANNELS).
    codes = np.asarray(img_arr).astype(np.intp)
    one_hot = np.eye(N_CHANNELS)[codes]
    return one_hot.reshape(codes.shape[:-1] + (IMG_SIZE, IMG_SIZE, N_CHANNELS))


def to_square_map(row):
    # Return the row's wafer map as an (IMG_SIZE, IMG_SIZE, 1) array. Maps of other sizes are
    # resized with nearest-neighbour interpolation: die values are categorical codes, and the
    # default linear interpolation would blend neighbours into other codes (e.g. 0 and 2 -> 1).
    height, width = row.waferMapDim[0], row.waferMapDim[1]
    wafer_map = row.waferMap.reshape(height, width)
    if height != IMG_SIZE:
        wafer_map = cv2.resize(wafer_map, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_NEAREST)
    return wafer_map.reshape(IMG_SIZE, IMG_SIZE, 1)


if __name__ == '__main__':
    # Without a configured handler the root logger stays at WARNING and drops every info message.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    # The scheduler address is the last command-line argument (presumably appended by the
    # container's bootstrap script).
    scheduler_ip = sys.argv[-1]
    root_path = Path('/opt/ml/processing')
    input_path = root_path / 'input'
    output_path = root_path / 'train'
    output_path.mkdir(parents=True, exist_ok=True)

    # Best effort: if the scheduler is unreachable, continue on Dask's default local scheduler.
    try:
        client = Client(f"tcp://{scheduler_ip}:8786")
        logging.info(f"Printing cluster information: {client}")
    except Exception as err:
        logging.exception(err)
        logging.warning("Could not connect to the Dask scheduler; falling back to the local scheduler")

    logging.info("Loading data")
    # NOTE: read_pickle can execute arbitrary code; only use it on trusted input such as this dataset.
    input_df = dd.from_pandas(
        pd.read_pickle(
            input_path / "LSWMD.pkl"
        ).astype(
            {"waferIndex": "int32"}
        ),
        npartitions=100)

    logging.info("Cleaning data")
    clean_df = input_df.drop('waferIndex', axis=1)
    clean_df['waferMapDim'] = clean_df.waferMap.apply(lambda x: x.shape, meta=pd.Series({'waferMapDim': [(0, 0)]}))
    # Keep square wafer maps only.
    clean_df = clean_df[clean_df.waferMapDim.apply(lambda x: x[0] == x[1], meta=pd.Series({'x': True}))]
    # failureType holds the label in a 2-D array; rows without one are marked "unknown" and dropped.
    clean_df['label'] = clean_df.failureType.apply(lambda x: x[0, 0] if (isinstance(x, np.ndarray) and x.shape[0] > 0) else 'unknown', meta=pd.Series({"x": "none"}))
    clean_df = clean_df[clean_df.label != "unknown"]
    # Materialize the cleaned frame once; it feeds several separate computations below.
    clean_df = clean_df.persist()

    label_dist = clean_df.groupby('label').size().compute()
    # Inverse label frequency, normalized to sum to 1 (rarer labels get larger weights).
    inv_prob = 1 / (label_dist / label_dist.sum())
    inv_prob_label = dict((inv_prob / inv_prob.sum()).items())
    logging.info(f"Labels: {inv_prob_label}")

    x = np.stack(
        clean_df.apply(
            to_square_map, axis=1, meta=pd.Series({'x': [np.zeros((IMG_SIZE, IMG_SIZE, 1))]})
        ).compute().values
    )
    x = hot_encode(x.reshape(-1, IMG_SIZE * IMG_SIZE))
    y = clean_df.label.compute().values
    # Label name -> integer index, stored as a 0-d object array (load it with allow_pickle=True).
    label_classes = np.array({l: i for (i, l) in enumerate(label_dist.index.values)})

    logging.info(f"x: {x.shape}")
    logging.info(f"y: {y.shape}")
    np.savez_compressed(output_path / "data.npz", x=x, y=y, label_classes=label_classes)
    """
    )

In [18]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor

region = boto3.session.Session().region_name

role = get_execution_role()

# Processing job definition: the Dask image pushed to ECR, started through its bootstrap entry
# point. instance_type="local" runs the job in SageMaker local mode (Docker on this machine)
# instead of on managed instances; the runtime is capped at 20 minutes.
dask_processor = ScriptProcessor(
    role=role,
    image_uri=dask_repository_uri,
    command=["/opt/program/bootstrap.py"],
    base_job_name="wafer-data-processing",
    instance_type="local",
    instance_count=4,
    max_runtime_in_seconds=20 * 60,
)

In [19]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

In [20]:
# Launch the job: LSWMD.pkl is uploaded as the input, and whatever the script writes to
# /opt/ml/processing/train is collected as the 'autoencoder/train' output.
dask_processor.run(
    code=str(code_path.joinpath('data_processing.py')),
    inputs=[
        ProcessingInput(
            destination='/opt/ml/processing/input',
            source=str(data_path.joinpath("LSWMD.pkl")),
        ),
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/train',
            output_name='autoencoder/train',
        ),
    ],
)


Job Name:  wafer-data-processing-2020-10-01-23-28-07-823
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-160951647621/wafer-data-processing-2020-10-01-23-28-07-823/input/input-1/LSWMD.pkl', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-160951647621/wafer-data-processing-2020-10-01-23-28-07-823/input/code/data_processing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'autoencoder/train', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-160951647621/wafer-data-processing-2020-10-01-23-28-07-823/output/autoencoder/train', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}]
Creating ug67ffm7bs-algo-3

In [27]:
# S3 URI of the (single) output of the most recent processing job.
processed_data = (
    dask_processor.latest_job.describe()
    ['ProcessingOutputConfig']['Outputs'][0]
    ['S3Output']['S3Uri']
)

In [38]:
# "s3://<bucket>/<key prefix>" -> bucket name and key prefix (empty when there is no prefix).
bucket, path = (processed_data.split("/", 3)[2:] + [""])[:2]
print(bucket, path)

In [39]:
# Fetch the processed archive written by the job into a local scratch file.
sagemaker.utils.download_file(
    bucket,
    f"{path}/data.npz",
    "/tmp/data.npz",
    sagemaker.session.Session(),
)

In [44]:
import numpy as np

# The processing script saves arrays by keyword (x=..., y=..., label_classes=...), so they must be
# read back under those names; 'arr_0'/'arr_1' only exist for positionally saved arrays.
# allow_pickle is needed because y (label strings) and label_classes (a dict) are object arrays.
# Only enable it for files you produced yourself: unpickling untrusted data can run arbitrary code.
with np.load("/tmp/data.npz", allow_pickle=True) as data:
    x = data['x']
    y = data['y']
    # 0-d object array wrapping the {label: index} mapping.
    label_classes = data['label_classes'].item()