
# AWS Pipeline

In [1]:
#Importing all required libraries:
#pip install boto3
#pip install psycopg2

import datetime
import json

import boto3
import pandas as pd
import psycopg2

In [2]:
#Setting up the AWS Access keys:
#NOTE: DO NOT EXPOSE THE ACCESS & SECRET KEYS

#Read the credentials inside a context manager so the file handle is closed
#as soon as the JSON has been parsed.
with open(r'Config.json') as config_handle:
    config_file = json.load(config_handle)

#Both keys are mandatory; a missing entry raises KeyError right here rather
#than surfacing later as an opaque AWS authentication error.
access_key = config_file['access_key']
secret_access_key = config_file['secret_access_key']

In [3]:
#Configuration Parameters:

bucket_name = "info7374s3fnalprojectteam8"#------------------------------------S3: Name of the S3 bucket to create

policy_name = "info7374policyfnalprojectteam8"#----------------------------------IAM: Access policy name
iam_role_name = "info7374rolefnalprojectteam8"#----------------------------------IAM: IAM Role for the architectural access

db_name = "info7374dbfnalprojectteam8"#-------------------------------------Redshift: Database Name for the customer data
cluster_identifier = "info7374clusterfnalprojectteam8"#---------------------Redshift: Redshift Cluster Name
master_username = "root"#----------------------------------------Redshift: Admin Username
#Redshift: Admin Password. Prefer a "master_password" entry in Config.json so
#the secret lives outside the notebook; the literal is kept only as a
#backward-compatible fallback for existing Config.json files.
master_password = config_file.get("master_password", "Info7374gap")
node_type = "dc2.large"#-----------------------------------------Redshift: Cluster configuration
cluster_type = "single-node"#------------------------------------Redshift: Cluster Type
availability_zone = "us-east-1a"#--------------------------------Redshift: Cluster Availability Zone
table_name = "OlistCustomers"#------------------------------------Redshift: Database table name
kinesis_data_stream_name = "info7374datastreamfnalprojectteam8"#-------------Kinesis: Data Stream Name
shard_count = 100#------------------------------------------------Kinesis: Data Stream Shard Count

log_group_name = "info7374loggroupfnalprojectteam8"#------------------------CloudWatch: Log Group Name
log_stream_name = "info7374logstreamfnalprojectteam8"#------------------------CloudWatch: Log Stream Name
delivery_stream_name = "info7374deliverystreamfnalprojectteam8"#--------------Kinesis: Delivery Stream Name
stream_type = "KinesisStreamAsSource"#-----------------------------Kinesis: Delivery Stream Type

In [4]:
#Creating AWS S3 Bucket:

#Pin the client to us-east-1 like the Redshift/Kinesis/Firehose clients below.
#create_bucket without a CreateBucketConfiguration is only accepted by the
#us-east-1 endpoint, so relying on the machine's default region can fail.
s3_client = boto3.client(
    's3',
    region_name='us-east-1',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key,
)

response_s3 = s3_client.create_bucket(ACL="private", Bucket=bucket_name)
print(response_s3)

{'ResponseMetadata': {'RequestId': '1E587CBE07B973EE', 'HostId': 'nvhni2oqag2vuaJ2tEvNEGyAZzZDuli9EpRXUmeXJKUqEpt1Hm7GkymU2I1x0iNBbUGYZK2sRoc=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'nvhni2oqag2vuaJ2tEvNEGyAZzZDuli9EpRXUmeXJKUqEpt1Hm7GkymU2I1x0iNBbUGYZK2sRoc=', 'x-amz-request-id': '1E587CBE07B973EE', 'date': 'Sun, 11 Aug 2019 23:00:01 GMT', 'location': '/info7374s3fnalprojectteam8', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Location': '/info7374s3fnalprojectteam8'}


In [5]:
#S3: Bucket ARN, built from the fixed S3 ARN prefix and the bucket name
bucket_arn = "arn:aws:s3:::{}".format(bucket_name)
print(bucket_arn)

arn:aws:s3:::info7374s3fnalprojectteam8


In [6]:
#IAM client used by the policy and role cells that follow:

iam_client = boto3.client(
    'iam',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key,
)

In [7]:

#IAM policy document attached to the pipeline role further down.
#The S3 statement is scoped to the pipeline bucket (the bucket itself for the
#bucket-level actions, "<bucket>/*" for the object-level ones) instead of
#granting access to every bucket in the account.
policy_details = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "glue:GetTableVersions"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": [
                bucket_arn,
                bucket_arn + "/*"
            ]
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": "*"
        }
    ]
}

In [8]:
#Register the policy with IAM; the document is passed as a JSON string.
policy_document_json = json.dumps(policy_details)
response_iam_policy = iam_client.create_policy(
    PolicyName=policy_name,
    PolicyDocument=policy_document_json,
)
print(response_iam_policy)

{'Policy': {'PolicyName': 'info7374policyfnalprojectteam8', 'PolicyId': 'ANPA2LJT624XOCNLPCL5K', 'Arn': 'arn:aws:iam::711462934318:policy/info7374policyfnalprojectteam8', 'Path': '/', 'DefaultVersionId': 'v1', 'AttachmentCount': 0, 'PermissionsBoundaryUsageCount': 0, 'IsAttachable': True, 'CreateDate': datetime.datetime(2019, 8, 11, 23, 1, 3, tzinfo=tzutc()), 'UpdateDate': datetime.datetime(2019, 8, 11, 23, 1, 3, tzinfo=tzutc())}, 'ResponseMetadata': {'RequestId': 'e472b710-bc8b-11e9-b732-47dad839ecd3', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e472b710-bc8b-11e9-b732-47dad839ecd3', 'content-type': 'text/xml', 'content-length': '795', 'date': 'Sun, 11 Aug 2019 23:01:02 GMT'}, 'RetryAttempts': 0}}


In [9]:
#IAM: ARN of the created policy, used to attach/detach/delete it later
created_policy = response_iam_policy['Policy']
policy_arn = created_policy['Arn']
print(policy_arn)

arn:aws:iam::711462934318:policy/info7374policyfnalprojectteam8


In [10]:
#Creating IAM Role for Architectural access:

#Resolve the account id of the credentials in use instead of hard-coding it,
#so the trust policy is correct in whichever AWS account runs this notebook.
sts_client = boto3.client(
    'sts',
    region_name='us-east-1',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key,
)
account_id = sts_client.get_caller_identity()['Account']

#Trust policy: only the Firehose service may assume the role, and only when it
#presents this account's id as the external id.
assume_role_policy_doc = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": account_id
        }
      }
    }
  ]
}

response_iam_role = iam_client.create_role(RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc))

print(response_iam_role)

{'Role': {'Path': '/', 'RoleName': 'info7374rolefnalprojectteam8', 'RoleId': 'AROA2LJT624XJ54EQIOTK', 'Arn': 'arn:aws:iam::711462934318:role/info7374rolefnalprojectteam8', 'CreateDate': datetime.datetime(2019, 8, 11, 23, 6, 59, tzinfo=tzutc()), 'AssumeRolePolicyDocument': {'Version': '2012-10-17', 'Statement': [{'Sid': '', 'Effect': 'Allow', 'Principal': {'Service': 'firehose.amazonaws.com'}, 'Action': 'sts:AssumeRole', 'Condition': {'StringEquals': {'sts:ExternalId': '711462934318'}}}]}}, 'ResponseMetadata': {'RequestId': 'b8fef24c-bc8c-11e9-8330-9ba199808f57', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'b8fef24c-bc8c-11e9-8330-9ba199808f57', 'content-type': 'text/xml', 'content-length': '948', 'date': 'Sun, 11 Aug 2019 23:06:58 GMT'}, 'RetryAttempts': 0}}


In [11]:
#IAM: Role ARN, taken from the create_role response
created_role = response_iam_role['Role']
role_arn = created_role['Arn']

In [12]:
#Attach the access policy created above to the pipeline role:

response_iam_role_policy_attach = iam_client.attach_role_policy(
    RoleName=iam_role_name,
    PolicyArn=policy_arn,
)
print(response_iam_role_policy_attach)

{'ResponseMetadata': {'RequestId': 'c8bf8a0e-bc8c-11e9-8330-9ba199808f57', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c8bf8a0e-bc8c-11e9-8330-9ba199808f57', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Sun, 11 Aug 2019 23:07:25 GMT'}, 'RetryAttempts': 0}}


In [13]:
#Creating AWS Redshift Cluster:

redshift_client = boto3.client(
    'redshift',
    region_name='us-east-1',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key,
)

#All cluster settings come from the configuration cell; the pipeline role is
#attached to the cluster and the cluster is created publicly accessible.
cluster_settings = {
    'DBName': db_name,
    'ClusterIdentifier': cluster_identifier,
    'ClusterType': cluster_type,
    'NodeType': node_type,
    'MasterUsername': master_username,
    'MasterUserPassword': master_password,
    'AvailabilityZone': availability_zone,
    'IamRoles': [role_arn],
    'PubliclyAccessible': True,
}
response_redshift = redshift_client.create_cluster(**cluster_settings)

print(response_redshift)

{'Cluster': {'ClusterIdentifier': 'info7374clusterfnalprojectteam8', 'NodeType': 'dc2.large', 'ClusterStatus': 'creating', 'ClusterAvailabilityStatus': 'Modifying', 'MasterUsername': 'root', 'DBName': 'info7374dbfnalprojectteam8', 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-5147e80b', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-99cba6e3', 'AvailabilityZone': 'us-east-1a', 'PreferredMaintenanceWindow': 'thu:10:00-thu:10:30', 'PendingModifiedValues': {'MasterUserPassword': '****'}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 1, 'PubliclyAccessible': True, 'Encrypted': False, 'Tags': [], 'EnhancedVpcRouting': False, 'IamRoles': [{'IamRoleArn': 'arn:aws:iam::711462934318:role/info7374rolefnalprojectteam8', 'ApplyStatu

In [14]:
#Describe AWS Redshift Cluster to get an endpoint:

#create_cluster returns while the cluster is still in 'creating' state, and the
#next cell reads the 'Endpoint' entry from this response. Block until the
#cluster is available so a top-to-bottom run does not fail on a missing endpoint.
redshift_client.get_waiter('cluster_available').wait(ClusterIdentifier=cluster_identifier)

response_redshift_desc = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)
print(response_redshift_desc)

{'Clusters': [{'ClusterIdentifier': 'info7374clusterfnalprojectteam8', 'NodeType': 'dc2.large', 'ClusterStatus': 'available', 'ClusterAvailabilityStatus': 'Unavailable', 'MasterUsername': 'root', 'DBName': 'info7374dbfnalprojectteam8', 'Endpoint': {'Address': 'info7374clusterfnalprojectteam8.cf3cwiqaf3ud.us-east-1.redshift.amazonaws.com', 'Port': 5439}, 'ClusterCreateTime': datetime.datetime(2019, 8, 11, 23, 14, 48, 141000, tzinfo=tzutc()), 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-5147e80b', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-99cba6e3', 'AvailabilityZone': 'us-east-1a', 'PreferredMaintenanceWindow': 'thu:10:00-thu:10:30', 'PendingModifiedValues': {}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 1, 'Publ

In [15]:
#Redshift: endpoint of the first cluster in the describe response
cluster_endpoint = response_redshift_desc['Clusters'][0]['Endpoint']
hostname = cluster_endpoint['Address']#----------------Redshift: Hostname for database
port_number = cluster_endpoint['Port']#----------------Redshift: Port Number for database

In [16]:
#Show the Redshift endpoint address read from the describe_clusters response.
print(hostname)

info7374clusterfnalprojectteam8.cf3cwiqaf3ud.us-east-1.redshift.amazonaws.com


In [17]:
#Open a connection to the Redshift database for creating the table:

con = psycopg2.connect(
    dbname=db_name,
    host=hostname,
    port=port_number,
    user=master_username,
    password=master_password,
)

#Autocommit: every statement run on this connection is committed immediately.
con.set_session(autocommit=True)
cur = con.cursor()

In [None]:
#Build the CREATE TABLE statement from the configured table_name so the table
#created here always matches the configuration cell. IF NOT EXISTS makes the
#cell safe to re-run against a cluster where the table already exists.
column_definitions = [
    "customer_id varchar(200)",
    "customer_unique_id varchar(200)",
    "customer_zip_code_prefix Integer",
    "customer_city varchar(200)",
    "customer_state varchar(200)",
]
query = "create table if not exists {} (\n{});".format(
    table_name,
    ",\n".join(column_definitions),
)

print(query)

In [None]:
#Run the CREATE TABLE statement; the connection was put in autocommit mode
#above, so no explicit commit is needed.
cur.execute(query)

In [None]:
#Creating Kinesis Stream:

kinesis_client = boto3.client(
    'kinesis',
    region_name='us-east-1',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key,
)

#Stream name and shard count come from the configuration cell.
response_kinesis_data_stream = kinesis_client.create_stream(
    StreamName=kinesis_data_stream_name,
    ShardCount=shard_count,
)
print(response_kinesis_data_stream)

In [None]:
#create_stream returns before the stream is usable. Wait until it is ACTIVE so
#the delivery stream and the producer below do not hit a stream that is still
#being created when the notebook is run top to bottom.
kinesis_client.get_waiter('stream_exists').wait(StreamName=kinesis_data_stream_name)

response_kinesis_data_stream_desc = kinesis_client.describe_stream(StreamName=kinesis_data_stream_name)
print(response_kinesis_data_stream_desc)

In [None]:
#Kinesis: Datastream ARN, read from the describe_stream response
stream_description = response_kinesis_data_stream_desc['StreamDescription']
kinesis_stream_arn = stream_description['StreamARN']
print(kinesis_stream_arn)

In [None]:
#Creating Kinesis Delivery Stream: Firehose

firehose_client = boto3.client(
    'firehose',
    region_name='us-east-1',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key,
)

#JDBC URL of the Redshift database: jdbc:redshift://<host>:<port>/<database>
cluster_jdbc_url = "jdbc:redshift://{}:{}/{}".format(hostname, port_number, db_name)
print(cluster_jdbc_url)

In [None]:
#Look the role ARN up by name instead of hard-coding an account-specific ARN.
#This also works after a kernel restart, when the create_role response from
#the earlier cell is no longer in memory.
role_arn = iam_client.get_role(RoleName=iam_role_name)['Role']['Arn']

In [None]:
#Create the Firehose delivery stream that reads from the Kinesis data stream,
#stages the records in the S3 bucket and copies them into the Redshift table.
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            #Use the configured table name so this stays in sync with the
            #configuration cell instead of repeating the literal.
            'DataTableName': table_name,
            'DataTableColumns': 'customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state',
            #Records are produced as JSON objects whose keys are the column names.
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

In [None]:
#Real-time data streaming: Kinesis client used by the producer loop below

kinesis_delivery_client = boto3.client(
    'kinesis',
    region_name="us-east-1",
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key,
)

In [None]:
#Reading the csv file
#(pandas is imported as pd together with the other imports in the first cell)
data_file = r'olist_customers_dataset.csv'
data = pd.read_csv(data_file)
data.head()

In [None]:
#List the column labels of the loaded dataset.
data.columns

In [None]:
#Number of rows in the dataset; the streaming loop below iterates over every row.
data.shape[0]

In [None]:
#Print the DataFrame summary produced by pandas' DataFrame.info().
data.info()

In [None]:
#Stream every customer row into the Kinesis data stream.
#
#Differences from a naive per-row loop, and why:
# * the records go to kinesis_data_stream_name directly; delivery_stream_name
#   is left untouched because the teardown cell needs it to delete the
#   Firehose delivery stream;
# * the partition key is the customer id rather than a constant, so records are
#   spread over the shards instead of all landing on a single one;
# * records are sent in batches instead of one API call per row;
# * rejected records are counted instead of being silently ignored.

KINESIS_MAX_BATCH = 500#------------------Kinesis: put_records accepts at most 500 records per request


def _to_kinesis_record(row):
    """Turn one dataset row (a namedtuple from DataFrame.itertuples) into a put_records entry.

    The payload is a UTF-8 encoded JSON object whose keys are the dataset's
    column names. The zip code prefix is cast to a plain int so it can be
    JSON-serialised.
    """
    payload = {"customer_id": row.customer_id,
               "customer_unique_id": row.customer_unique_id,
               "customer_zip_code_prefix": int(row.customer_zip_code_prefix),
               "customer_city": row.customer_city,
               "customer_state": row.customer_state}
    return {
        'PartitionKey': str(row.customer_id),
        'Data': bytes(json.dumps(payload), 'utf-8'),
    }


formatted_records = [_to_kinesis_record(row) for row in data.itertuples(index=False)]

response = None#--------------------------response of the last put_records call (None if there was no data)
failed_record_count = 0
for batch_start in range(0, len(formatted_records), KINESIS_MAX_BATCH):
    batch = formatted_records[batch_start:batch_start + KINESIS_MAX_BATCH]
    response = kinesis_delivery_client.put_records(StreamName=kinesis_data_stream_name, Records=batch)
    #put_records can partially fail without raising; keep track of rejections.
    failed_record_count += response['FailedRecordCount']

print("Sent {} records to {} ({} rejected by Kinesis)".format(
    len(formatted_records), kinesis_data_stream_name, failed_record_count))

In [None]:
#Display the response of the last put_records call made by the streaming cell.
response

In [None]:
#Teardown: from here on the complete architecture is removed again.

#Delete Redshift Cluster (no final snapshot is taken):
delete_cluster_settings = {
    'ClusterIdentifier': cluster_identifier,
    'SkipFinalClusterSnapshot': True,
}
response_delete_redshift = redshift_client.delete_cluster(**delete_cluster_settings)

print(response_delete_redshift)

In [None]:
#Delete Kinesis Delivery stream (Firehose):
response_delete_firehose = firehose_client.delete_delivery_stream(
    DeliveryStreamName=delivery_stream_name,
)
print(response_delete_firehose)

In [None]:
#Delete Kinesis Data stream, with consumer deletion enforced:
response_delete_data_stream = kinesis_client.delete_stream(
    StreamName=kinesis_data_stream_name,
    EnforceConsumerDeletion=True,
)
print(response_delete_data_stream)

In [None]:
#Delete S3 Bucket:

#All of the Objects in a bucket must be deleted before the bucket itself can be deleted:

s3 = boto3.resource('s3', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)
bucket = s3.Bucket(bucket_name)

#Use the collection's batch delete rather than issuing one DELETE request per
#object; Firehose may have staged many objects in the bucket.
bucket.objects.all().delete()
bucket.delete()

print("Bucket deleted successfully!")

In [None]:
#Delete IAM Role:

#The policy is detached first, then the role itself is deleted.
response_detach_policy = iam_client.detach_role_policy(
    RoleName=iam_role_name,
    PolicyArn=policy_arn,
)
print(response_detach_policy)

response_delete_role = iam_client.delete_role(
    RoleName=iam_role_name,
)
print(response_delete_role)

In [None]:
#Delete IAM Policy (it was detached from the role in the previous cell):

response_delete_policy = iam_client.delete_policy(
    PolicyArn=policy_arn,
)
print(response_delete_policy)