In [None]:
!mkdir -p ~/.aws && cp /content/drive/MyDrive/AWS/d01_admin/* ~/.aws
!chmod 600 ~/.aws/credentials
!pip install awscli

In [None]:
%%capture
!pip install pymysql mariadb

In [None]:
%reload_ext sql

In [None]:
import pandas as pd
from sqlalchemy import create_engine
import boto3
import json

In [None]:
def get_secret(secret_name):
    region_name = "us-east-1"
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name)
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    get_secret_value_response = json.loads(get_secret_value_response['SecretString'])
    return get_secret_value_response

db_credentials = get_secret(secret_name='dev/detraining/rdssql')

USERNAME = db_credentials["sparsh_rds_postgres_username"]
PASSWORD = db_credentials["sparsh_rds_postgres_password"]
HOST = db_credentials["sparsh_rds_postgres_host"]
PORT = db_credentials["sparsh_rds_postgres_port"]
DBNAME = "sakila"

CONN = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}"

In [None]:
engine = create_engine(CONN)

In [None]:
pd.read_sql("SELECT * FROM actor LIMIT 10;", CONN)

Unnamed: 0,actor_id,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
1,2,NICK,WAHLBERG,2006-02-15 04:34:33
2,3,ED,CHASE,2006-02-15 04:34:33
3,4,JENNIFER,DAVIS,2006-02-15 04:34:33
4,5,JOHNNY,LOLLOBRIGIDA,2006-02-15 04:34:33
5,6,BETTE,NICHOLSON,2006-02-15 04:34:33
6,7,GRACE,MOSTEL,2006-02-15 04:34:33
7,8,MATTHEW,JOHANSSON,2006-02-15 04:34:33
8,9,JOE,SWANK,2006-02-15 04:34:33
9,10,CHRISTIAN,GABLE,2006-02-15 04:34:33


In [None]:
# !wget -q --show-progress https://downloads.mysql.com/docs/sakila-db.zip -O sakila.zip
# !unzip sakila.zip
# ref https://aws.amazon.com/premiumsupport/knowledge-center/rds-mysql-functions/

In [None]:
!aws s3api create-bucket --bucket sparsh-220712 --region us-east-1

{
    "Location": "/sparsh-220712"
}


In [None]:
%%writefile DataEngLambdaS3CWGluePolicy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:CreateLogGroup",
                "logs:CreateLogStream"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::sparsh-220712/*",
                "arn:aws:s3:::sparsh-220712"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:*"
            ],
            "Resource": "*"
        }
    ]
}

Writing DataEngLambdaS3CWGluePolicy.json


In [None]:
!aws iam create-policy --policy-name sparsh-policy-220712 --policy-document file://DataEngLambdaS3CWGluePolicy.json

{
    "Policy": {
        "PolicyName": "sparsh-policy-220712",
        "PolicyId": "ANPAVVYXO24ERWB5BKPVN",
        "Arn": "arn:aws:iam::390354360073:policy/sparsh-policy-220712",
        "Path": "/",
        "DefaultVersionId": "v1",
        "AttachmentCount": 0,
        "PermissionsBoundaryUsageCount": 0,
        "IsAttachable": true,
        "CreateDate": "2022-07-12T11:00:40Z",
        "UpdateDate": "2022-07-12T11:00:40Z"
    }
}


In [None]:
%%writefile role-trust.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Writing role-trust.json


In [None]:
!aws iam create-role --role-name sparsh-role-lambda-220712 --assume-role-policy-document file://role-trust.json

{
    "Role": {
        "Path": "/",
        "RoleName": "sparsh-role-lambda-220712",
        "RoleId": "AROAVVYXO24EUGAYQ4VNP",
        "Arn": "arn:aws:iam::390354360073:role/sparsh-role-lambda-220712",
        "CreateDate": "2022-07-12T11:01:09Z",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "lambda.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }
    }
}


In [None]:
%%writefile role-trust.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "dms.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Overwriting role-trust.json


In [None]:
!aws iam create-role --role-name sparsh-role-dms-220712 --assume-role-policy-document file://role-trust.json

{
    "Role": {
        "Path": "/",
        "RoleName": "sparsh-role-dms-220712",
        "RoleId": "AROAVVYXO24EX7BECHBL7",
        "Arn": "arn:aws:iam::390354360073:role/sparsh-role-dms-220712",
        "CreateDate": "2022-07-12T11:02:32Z",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "dms.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }
    }
}


In [None]:
!aws iam attach-role-policy --policy-arn arn:aws:iam::390354360073:policy/sparsh-policy-220712 --role-name sparsh-role-lambda-220712

In [None]:
!aws iam attach-role-policy --policy-arn arn:aws:iam::390354360073:policy/sparsh-policy-220712 --role-name sparsh-role-dms-220712

In [None]:
import boto3
import awswrangler as wr
from urllib.parse import unquote_plus

def lambda_handler(event, context):
    # Get the source bucket and object name as passed to the Lambda function
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
    
    # We will set the DB and table name based on the last two elements of 
    # the path prior to the file name. If key = 'dms/sakila/film/LOAD01.csv',
    # then the following lines will set db to sakila and table_name to 'film'
    key_list = key.split("/")
    print(f'key_list: {key_list}')
    db_name = key_list[len(key_list)-3]
    table_name = key_list[len(key_list)-2]
    
    print(f'Bucket: {bucket}')
    print(f'Key: {key}')
    print(f'DB Name: {db_name}')
    print(f'Table Name: {table_name}')
    
    input_path = f"s3://{bucket}/{key}"
    print(f'Input_Path: {input_path}')
    output_path = f"s3://{bucket}/{db_name}/cleaned/{table_name}"
    print(f'Output_Path: {output_path}')
    
    input_df = wr.s3.read_csv([input_path])
    
    current_databases = wr.catalog.databases()
    wr.catalog.databases()
    if db_name not in current_databases.values:
        print(f'- Database {db_name} does not exist ... creating')
        wr.catalog.create_database(db_name)
    else:
        print(f'- Database {db_name} already exists')
    
    result = wr.s3.to_parquet(
        df=input_df, 
        path=output_path, 
        dataset=True,
        database=db_name,
        table=table_name,
        mode="append")
        
    print("RESULT: ")
    print(f'{result}')
    
    return result

We will create a DMS replication instance (a managed EC2 instance that connects to the source endpoint, retrieves data, and writes to the target endpoint), and also configure the source and target endpoints. We will then create a database migration task that provides the configuration settings for the migration.



- Creation replication instance
- Create source and target endpoint
- Create task

In [None]:
!aws s3 cp s3://sparsh-220712/sakila-db/sakila/actor/LOAD00000001.csv actor.csv

Completed 7.2 KiB/7.2 KiB (141.4 KiB/s) with 1 file(s) remainingdownload: s3://sparsh-220712/sakila-db/sakila/actor/LOAD00000001.csv to ./actor.csv


In [None]:
pd.read_csv("actor.csv")

Unnamed: 0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
0,2,NICK,WAHLBERG,2006-02-15 04:34:33
1,3,ED,CHASE,2006-02-15 04:34:33
2,4,JENNIFER,DAVIS,2006-02-15 04:34:33
3,5,JOHNNY,LOLLOBRIGIDA,2006-02-15 04:34:33
4,6,BETTE,NICHOLSON,2006-02-15 04:34:33
...,...,...,...,...
194,196,BELA,WALKEN,2006-02-15 04:34:33
195,197,REESE,WEST,2006-02-15 04:34:33
196,198,MARY,KEITEL,2006-02-15 04:34:33
197,199,JULIA,FAWCETT,2006-02-15 04:34:33
