## SM Pipelines + Custom Processing Pyspark -- Delta

In [27]:
import boto3
import sagemaker

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.processing import ScriptProcessor, ProcessingInput

pipeline_session = PipelineSession()
role = sagemaker.get_execution_role()

In [4]:
!mkdir docker

In [15]:
%%writefile docker/Dockerfile

FROM 173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:3.2-cpu-py39-v1.0

RUN pip3 install pandas pyspark==3.2.0 delta-spark tldextract spark-nlp==3.4.1 azure-cosmos==4.2.0 pycryptodome

ENV PYTHONUNBUFFERED=TRUE

ENTRYPOINT ["python3"]

Overwriting docker/Dockerfile


In [16]:
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.Session().region_name
tag = ':latest'
ecr_repository = 'sm-container'
processing_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, ecr_repository + tag)

In [17]:
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com 
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin 173754725891.dkr.ecr.{region}.amazonaws.com 
!docker build -t $ecr_repository docker
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
Sending build context to Docker daemon  2.048kB
Step 1/4 : FROM 173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:3.2-cpu-py39-v1.0
 ---> a0c2fe62b00d
Step 2/4 : RUN pip3 install pandas pyspark==3.2.0 delta-spark tldextract spark-nlp==3.4.1 azure-cosmos==4.2.0 pycryptodome
 ---> Running in 8f6ad062e086
Collecting pyspark==3.2.0
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.3/281.3 MB 5.9 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting delta-spark
  Downloading delta_spark-2.1.1-py3-none-any.whl (20 kB)
Collecting tldextract
  Downloading tldextract-3.4.0-py3-none-any.whl (93 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 93.9/93.9 KB 966.3 kB/s et

In [21]:
input_location = "s3://aws-ml-blog/artifacts/delta-lake-bring-your-own-container/delta-table/california-housing/"

In [22]:
script_processor = ScriptProcessor(command=['python3'],
                image_uri=processing_repository_uri,
                role=role,
                instance_count=1,
                instance_type='ml.m5.xlarge',
                sagemaker_session=pipeline_session,
                )

In [26]:
processor_args = script_processor.run(
    inputs=[
        ProcessingInput(source=input_location, destination="/opt/ml/processing/input/")
    ],
    code="deltaprocess.py",
)

step_process = ProcessingStep(name="Delta-Process", step_args=processor_args)


Job Name:  sm-container-2022-11-02-22-53-38-392
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://aws-ml-blog/artifacts/delta-lake-bring-your-own-container/delta-table/california-housing/', 'LocalPath': '/opt/ml/processing/input/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-238023316787/sm-container-2022-11-02-22-53-38-392/input/code/deltaprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  []


In [28]:
pipeline_name = f"SM-Process-Pipeline"
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process]
)

In [29]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:238023316787:pipeline/sm-process-pipeline',
 'ResponseMetadata': {'RequestId': '83295a4a-405b-4ba4-97b1-b2ee91e0bf12',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '83295a4a-405b-4ba4-97b1-b2ee91e0bf12',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '87',
   'date': 'Wed, 02 Nov 2022 22:54:53 GMT'},
  'RetryAttempts': 0}}

In [30]:
execution = pipeline.start()