## Environment setup

In [None]:
!pip install -U -q sagemaker

In [None]:
import warnings

# Suppress every Python warning for the remainder of this kernel session.
warnings.filterwarnings("ignore")

In [None]:
import pandas as pd
import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline

from sagemaker import get_execution_role

# One SageMaker session for the notebook; the region is read from it instead
# of constructing a second, throwaway Session just for that lookup.
sess = sagemaker.session.Session()
region = sess.boto_region_name
# IAM role of this notebook; reused below for the processors and pipeline.upsert.
role = get_execution_role()

In [None]:
import boto3

# S3 bucket that stores the ETL outputs.
bucket_name = 'sagemaker-etl-prod'


def create_s3_bucket(bucket_name, region='ap-southeast-1'):
    """Create ``bucket_name`` in ``region`` unless it already exists.

    S3 client errors are reported with ``print`` rather than raised, so
    re-running this cell when the bucket already exists is harmless.

    Args:
        bucket_name: Name of the S3 bucket to create.
        region: AWS region for the bucket. ``None`` means the client's
            default region.
    """
    # Build the client outside the try block. If construction failed inside
    # it, the except clause below would reference an unbound ``s3_client``.
    s3_client = boto3.client('s3', region_name=region)
    effective_region = s3_client.meta.region_name

    # us-east-1 rejects an explicit LocationConstraint; every other region
    # requires one.
    create_kwargs = {'Bucket': bucket_name}
    if effective_region and effective_region != 'us-east-1':
        create_kwargs['CreateBucketConfiguration'] = {
            'LocationConstraint': effective_region,
        }

    try:
        s3_client.create_bucket(**create_kwargs)
        print("Bucket created successfully.")
    except s3_client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
            print("Bucket already exists and is owned by you.")
        else:
            print("An error occurred:", e)


# Check and create the bucket
create_s3_bucket(bucket_name)

## Processing folder & env scripts

In [None]:
!mkdir src

In [None]:
%%writefile src/__init__.py

# You can leave this file empty or write any desired content if needed.
# For example, you can define package-level attributes, import submodules, etc.
# If you don't need any specific content, you can leave the file empty.

# Content of __init__.py


In [None]:
%%writefile src/requirements.txt

psycopg2-binary  
pyathena
boto3
openpyxl

## Dependency scripts - SQL_dict.py

In [None]:
%%writefile src/SQL_dict.py

"""SQL statements consumed by preprocessing.py, one full-table SELECT per source."""

query_BH_OLTP = "SELECT * FROM BH_OLTP"
query_C_OLAP = "SELECT * FROM C_OLAP"
query_Bounty_OLAP = "SELECT * FROM Bounty_OLAP"
query_Bounty_OLTP = "SELECT * FROM Bounty_OLTP"


In [None]:
%cp src/SQL_dict.py SQL_dict.py

## Processing script - preprocessing.py

In [None]:
# !pip install -r src/requirements.txt

In [None]:
%%writefile src/preprocessing.py

#! /usr/bin/env python

import psycopg2  # pip install psycopg2-binary
from pyathena import connect
import os
import pandas as pd
import numpy as np
import pickle
from urllib.parse import urlparse
import re

# Ignore all warnings.
# NOTE(review): this filter is installed after the third-party imports above,
# so warnings those imports emit at import time are not suppressed.
import warnings
warnings.filterwarnings("ignore")

# Module-level AWS handles.
# NOTE(review): s3, session and region_name are not referenced by
# RDS_extraction or the __main__ code in this script, which create their own
# clients; they look like candidates for removal.
import boto3
s3 = boto3.client("s3")
session = boto3.session.Session()
region_name = session.region_name

# Run date as YYYY-MM-DD from the container's local clock (naive datetime).
# It suffixes the dated CSV copies under logs/ and appears in the SNS message.
from datetime import datetime, timedelta
now = datetime.now()
today_date = now.strftime("%Y-%m-%d")

# Query strings live in SQL_dict.py, which an earlier notebook cell writes
# into the same src/ directory as this script.
# NOTE(review): reload re-executes SQL_dict. In a one-shot processing job the
# module was just imported, so this appears to have no effect.
import SQL_dict
import importlib
importlib.reload(SQL_dict)


def RDS_extraction(query):
    """Run ``query`` against the production PostgreSQL database and return the rows.

    Connection settings come from the environment variables ``RDS_DBNAME``,
    ``RDS_HOST``, ``RDS_PORT``, ``RDS_USER`` and ``RDS_PASSWORD``. Any unset
    variable falls back to the hard-coded value below.

    Args:
        query: SQL statement to execute.

    Returns:
        The result set as a pandas DataFrame (via ``pd.read_sql``).
    """
    # NOTE(review): credentials should not live in source; inject them via the
    # environment variables read here (e.g. processor ``env=``) or fetch them
    # from AWS Secrets Manager.
    dbname = os.environ.get('RDS_DBNAME', 'dbname')
    host = os.environ.get(
        'RDS_HOST',
        'hostname-prod-psql.cluster-xxxxxxxxxxxx.ap-southeast-1.rds.amazonaws.com',
    )
    port = os.environ.get('RDS_PORT', '5432')  # default PostgreSQL port
    user = os.environ.get('RDS_USER', 'user')
    password = os.environ.get('RDS_PASSWORD', 'password')

    print('conn start')
    conn = psycopg2.connect(
        dbname=dbname, host=host, port=port, user=user, password=password
    )
    try:
        print('read_sql_query')
        df_endpoint = pd.read_sql(query, conn)
    finally:
        # Release the connection even when the query fails. The previous
        # version never closed it and leaked one connection per call.
        conn.close()

    return df_endpoint



# Local output directory inside the processing container. The ProcessingOutput
# in the notebook uploads its contents to S3, by default only after the job ends.
OUTPUT_DIR = "/opt/ml/processing/output"
LOGS_DIR = f"{OUTPUT_DIR}/logs"
EXCEL_FILE = f"{OUTPUT_DIR}/summary_data.xlsx"

AWS_REGION = "ap-southeast-1"
S3_CONSOLE_URL = (
    "https://s3.console.aws.amazon.com/s3/buckets/sagemaker-etl-prod"
    "?region=ap-southeast-1&bucketType=general&tab=objects"
)
SNS_TOPIC_ARN = "arn:aws:sns:ap-southeast-1:963727426434:milton"
SENDER_EMAIL = "milton@domain.tech"
DESTINATION_EMAILS = "milton@domain.tech"  # comma-separated list


def _extract_all():
    """Query every source table; return {output name: DataFrame} in output order."""
    # NOTE(review): SQL_dict defines query_BH_OLTP; the previous reference to
    # the nonexistent query_BH_OLAP raised AttributeError. Confirm whether the
    # BH extract should really come from the BH_OLTP table.
    return {
        "BH_OLAP": RDS_extraction(SQL_dict.query_BH_OLTP),
        "C_OLAP": RDS_extraction(SQL_dict.query_C_OLAP),
        "Bounty_OLAP": RDS_extraction(SQL_dict.query_Bounty_OLAP),
        "Bounty_OLTP": RDS_extraction(SQL_dict.query_Bounty_OLTP),
    }


def _write_outputs(extracts, run_date):
    """Write each extract as CSV (latest plus a dated copy in logs/) and one XLSX.

    Returns the local path of the XLSX workbook.
    """
    # makedirs creates OUTPUT_DIR too; exist_ok makes re-runs harmless.
    os.makedirs(LOGS_DIR, exist_ok=True)

    print('CSV exporting')
    for name, frame in extracts.items():
        frame.to_csv(f'{OUTPUT_DIR}/df_{name}.csv', index=False)
        frame.to_csv(f'{LOGS_DIR}/df_{name}_{run_date}.csv', index=False)
    print("Wrote CSV files successfully")

    # Workbook sheets use abbreviated names and this fixed order.
    sheets = {
        "BH_OLAP": extracts["BH_OLAP"],
        "B_OLAP": extracts["Bounty_OLAP"],
        "B_OLTP": extracts["Bounty_OLTP"],
        "C_OLAP": extracts["C_OLAP"],
    }
    with pd.ExcelWriter(EXCEL_FILE) as writer:
        for sheet_name, frame in sheets.items():
            frame.to_excel(writer, sheet_name=sheet_name, index=False)
    print("Wrote XLSX files successfully")
    return EXCEL_FILE


def _publish_sns(run_date):
    """Announce completion on the SNS topic with a link to the S3 console."""
    sns_client = boto3.client('sns', region_name=AWS_REGION)
    message = (
        f"Daily ETL transformation done. Logs Date: {run_date}. "
        f"Please find the file from S3 at the following URL: {S3_CONSOLE_URL}"
    )
    sns_client.publish(
        TopicArn=SNS_TOPIC_ARN,
        Message=message,
        Subject='S3 File Notification',
    )
    print('Message published to SNS topic.')


def _email_summary(excel_path):
    """Email the workbook at ``excel_path`` as an attachment through Amazon SES."""
    from email import policy
    from email.message import EmailMessage

    # Read the workbook just written locally. The S3 copy is not uploaded
    # until the job ends, so fetching it from S3 here returned the previous
    # run's file (or NoSuchKey on the first run).
    with open(excel_path, 'rb') as fh:
        workbook = fh.read()

    msg = EmailMessage()
    msg['From'] = SENDER_EMAIL
    msg['To'] = DESTINATION_EMAILS
    msg['Subject'] = 'Daily Summary Data'
    msg.set_content('Attachment files from Sagemaker ETL')
    msg.add_attachment(
        workbook,
        maintype='application',
        subtype='vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        filename=os.path.basename(excel_path),
    )

    ses_client = boto3.client('ses', region_name=AWS_REGION)
    ses_client.send_raw_email(
        Source=SENDER_EMAIL,
        Destinations=DESTINATION_EMAILS.split(','),
        # SMTP policy emits RFC 5322 CRLF line endings.
        RawMessage={'Data': msg.as_bytes(policy=policy.SMTP)},
    )
    print('File sent!')


def main():
    """Extract the source tables, write outputs, then notify via SNS and SES.

    Raises:
        Exception: any error while writing outputs is printed and re-raised,
            so the processing job fails instead of announcing success.
    """
    import argparse

    parser = argparse.ArgumentParser()
    # Accepted for CLI compatibility. Unknown arguments are ignored. The
    # earlier read of <input-path>/input/bank-additional-full.csv was dropped:
    # its result was unused, and no ProcessingInput provides that file.
    parser.add_argument("--input-path", type=str, default="/opt/ml/processing")
    parser.parse_known_args()

    extracts = _extract_all()

    try:
        excel_path = _write_outputs(extracts, today_date)
    except Exception as e:
        print("Failed to write the files")
        print(e)
        raise

    print("Completed running the processing job")

    _publish_sns(today_date)
    _email_summary(excel_path)


if __name__ == "__main__":
    main()


In [None]:
from sagemaker import get_execution_role
from sagemaker.network import NetworkConfig
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch.processing import PyTorchProcessor

# VPC placement for the processing job. Replace these placeholders with your
# own security groups and subnets (they must be able to reach the RDS host).
security_group_ids = ["sg-xxxxxxxxxxxxxxxxx"]
subnets = [
    "subnet-xxxxxxxxxxxxxxxxx",
    "subnet-xxxxxxxxxxxxxxxxx",
    "subnet-xxxxxxxxxxxxxxxxx",
]
network_config = NetworkConfig(
    enable_network_isolation=False,
    security_group_ids=security_group_ids,
    subnets=subnets,
)

pytorch_processor = PyTorchProcessor(
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    framework_version="1.13",
    py_version="py39",
    network_config=network_config,
)

# NOTE: "injecion" is the object key as it exists in S3; do not "fix" the spelling here.
input_filepath_inject = 's3://sagemaker-lighthouse-log-prod/external/injecion_cheatsheet.csv'
output_path_ai_h1s1 = 's3://sagemaker-etl-prod'

# Mounted at /opt/ml/processing/inject inside the container.
classify_inputs = [
    ProcessingInput(source=input_filepath_inject, destination="/opt/ml/processing/inject"),
]

# Everything the script writes under /opt/ml/processing/output lands in the bucket root.
classify_outputs = [
    ProcessingOutput(
        output_name="output_ETL",
        source="/opt/ml/processing/output",
        destination=output_path_ai_h1s1,
    ),
]

# preprocessing.py only defines --input-path and uses parse_known_args, so
# these arguments are accepted but ignored by the script.
classify_arguments = [
    "--file-name", "reviews.tsv.gz",
    "--model-name", "distilbert-base-uncased",
    "--train-ratio", "0.7",
    "--val-ratio", "0.15",
    "--star-threshold", "4",
]

In [None]:
# Quick test: run the processing job directly, outside the pipeline.
# source_dir must be the folder the %%writefile cells populate (src/ holds
# preprocessing.py, SQL_dict.py and requirements.txt). The previous value,
# "scripts", does not exist in this notebook.
pytorch_processor.run(
    code="preprocessing.py",
    source_dir="src",
    inputs=classify_inputs,
    outputs=classify_outputs,
    arguments=classify_arguments,
)

In [None]:
from sagemaker.network import NetworkConfig
# FrameworkProcessor can be swapped for another framework processor
# (e.g. HuggingFaceProcessor). Background on third-party packages via
# source_dir: https://github.com/aws/sagemaker-python-sdk/issues/2656
from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.pytorch.processing import PyTorch, PyTorchProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

session = PipelineSession()

processor = FrameworkProcessor(
    estimator_cls=PyTorch,
    framework_version='1.13',
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",  # ml.m5.4xlarge was used previously
    sagemaker_session=session,
    py_version="py39",
    network_config=network_config,
)

# Under the PipelineSession, the result of run() is handed to ProcessingStep
# as step_args.
classify_args = processor.run(
    code='preprocessing.py',
    source_dir="src",  # holds preprocessing.py, SQL_dict.py and requirements.txt
    inputs=classify_inputs,
    outputs=classify_outputs,
    arguments=classify_arguments,
)

# The daily ETL step of the pipeline.
classify_process = ProcessingStep(name="ETL_daily", step_args=classify_args)


In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "ETL-daily"

# Single-step pipeline containing only the ETL processing step.
# TODO: add pipeline parameters (input data, processing instance type/count,
# processed data) and extraction/reporting steps once they exist.
etl_steps = [classify_process]
pipeline = Pipeline(name=pipeline_name, steps=etl_steps)

In [None]:
import json

raw_definition = pipeline.definition()  # JSON text of the pipeline
definition = json.loads(raw_definition)
definition  # last expression: displayed as the cell output

In [None]:
# Register the pipeline definition with SageMaker (Pipeline.upsert creates or updates it).
pipeline.upsert(role_arn=role)

In [None]:
# Start a pipeline execution; the returned handle is polled in the next cell.
execution = pipeline.start()

In [None]:
from time import sleep

# Poll the execution until it reaches a terminal state. "Stopping" is
# transitional, so keep waiting through it as well. Sleeping before each
# re-poll avoids an extra wait after the final status arrives.
POLL_SECONDS = 10

status = execution.describe()['PipelineExecutionStatus']
print(status + " ", end="")

while status in ('Executing', 'Stopping'):
    sleep(POLL_SECONDS)
    print(".", end="", flush=True)
    status = execution.describe()['PipelineExecutionStatus']

print(f"\n{status}")