In [0]:
###INFO7374: Digital Marketing Analytics:
###LAB: AWS Mobile Gaming Analytics Pipeline
###Date: 06/28/2019
###Instructor: Prof. Srikanth Krishnamurthy
###TA: Mr. Pramod Nagare and Mr. Syed Haroon

In [0]:
# Architectural Components:
# S3 Bucket: https://aws.amazon.com/s3/
# Kinesis Data Stream: https://aws.amazon.com/kinesis/data-streams/
# Kinesis Delivery Stream: https://aws.amazon.com/kinesis/data-firehose/
# Redshift Cluster: https://aws.amazon.com/redshift/
# IAM Policy: https://aws.amazon.com/iam/
# IAM Role: https://aws.amazon.com/iam/

<img src="AWS_GAP_Architecture_New.jpg">

In [1]:
#Importing all required libraries:
#pip install boto3
#pip install psycopg2

import boto3
import json
import psycopg2
import datetime

In [2]:
#Setting up the AWS Access keys:
#NOTE: DO NOT EXPOSE THE ACCESS & SECRET KEYS

config_file = open(r'Config.json')
config_file = json.loads(config_file.read())

access_key = config_file['access_key']
secret_access_key = config_file['secret_access_key']


In [87]:
#Configuration Parameters:

bucket_name = "info7374s3alyceteam4"#------------------------------------S3: Name of the creating AWS S3 Bucket

policy_name = "info7374policyalyceteam4"#----------------------------------IAM: Access policy name
iam_role_name = "info7374rolealyceteam4"#----------------------------------IAM: IAM Role for the architecural access

db_name = "info7374dbalyceteam4"#-------------------------------------Redshift: Database Name for gaming data
cluster_identifier = "info7374clusteralyceteam4"#---------------------Redshift: Redshift Cluster Name
master_username = "root"#----------------------------------------Redshift: Admin Username
master_password = "Info7374gap"#---------------------------------Redshift: Admin Password
node_type = "dc2.large"#-----------------------------------------Redshift: Cluster configuration
cluster_type = "single-node"#------------------------------------Redshift: Cluster Type
availability_zone = "us-east-1a"#--------------------------------Redshift: Cluster Availability Zone
#------------------------------------Redshift: Database table name
kinesis_data_stream_name = "info7374datastreamalyceteam4"#-------------Kinesis: Data Stream Name
shard_count = 100#------------------------------------------------Kinesis: Data Stream Shard Count

log_group_name = "info7374loggroupalyceteam4"#------------------------CloudWatch: Log Group Name
log_stream_name = "info7374logstreamalyceteam4"#------------------------CloudWatch: Log Group Name
delivery_stream_name = "info7374deliverystreamalyceteam4"#--------------Kinesis: Delivery Stream Name
stream_type = "KinesisStreamAsSource"#-----------------------------Kinesis: Delivery Stream Type


In [88]:
#Creating AWS S3 Bucket:

s3_client = boto3.client('s3', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_s3 = s3_client.create_bucket(ACL="private", Bucket=bucket_name)
print(response_s3)

{'ResponseMetadata': {'RequestId': 'AD76F0F08BBE823A', 'HostId': 'WLaH59AgmlejFhAITpJ/SqDQIRKstKubuAxbBTzoNceku994JJRdbhPzQDY/zc2q9+5dzqQTMEM=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'WLaH59AgmlejFhAITpJ/SqDQIRKstKubuAxbBTzoNceku994JJRdbhPzQDY/zc2q9+5dzqQTMEM=', 'x-amz-request-id': 'AD76F0F08BBE823A', 'date': 'Fri, 12 Jul 2019 03:59:01 GMT', 'location': '/info7374s3alyceteam4', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Location': '/info7374s3alyceteam4'}


In [89]:
bucket_arn = "arn:aws:s3:::"+bucket_name#---------------------------S3: Bucker ARN
print(bucket_arn)

arn:aws:s3:::info7374s3alyceteam4


In [90]:
#Creating IAM Policy for Architecural access:

iam_client = boto3.client('iam', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)


In [91]:

policy_details = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "glue:GetTableVersions"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": "*"
        }
    ]
}

In [92]:
response_iam_policy = iam_client.create_policy(PolicyName=policy_name, PolicyDocument=json.dumps(policy_details))
print(response_iam_policy)

EntityAlreadyExistsException: An error occurred (EntityAlreadyExists) when calling the CreatePolicy operation: A policy called info7374policyalyceteam4 already exists. Duplicate names are not allowed.

In [93]:
policy_arn = response_iam_policy['Policy']['Arn']#------------------------IAM: Policy ID for assignment
print(policy_arn)

arn:aws:iam::730681287143:policy/info7374policyalyceteam4


In [94]:
#Creating IAM Role for Architectural access:

assume_role_policy_doc = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "730681287143"
        }
      }
    }
  ]
}

response_iam_role = iam_client.create_role(RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc))

print(response_iam_role)

EntityAlreadyExistsException: An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name info7374rolealyceteam4 already exists.

In [95]:
role_arn = response_iam_role['Role']['Arn']#----------------------------IAM: Role ARN
role_arn

'arn:aws:iam::730681287143:role/info7374rolealyceteam4'

In [96]:
#Attaching a Policy to a Role:

response_iam_role_policy_attach = iam_client.attach_role_policy(PolicyArn=policy_arn,RoleName=iam_role_name)
print(response_iam_role_policy_attach)

{'ResponseMetadata': {'RequestId': '7f83cf34-a459-11e9-b6d5-9fbc25767c03', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '7f83cf34-a459-11e9-b6d5-9fbc25767c03', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Fri, 12 Jul 2019 03:59:50 GMT'}, 'RetryAttempts': 0}}


In [83]:
#Creating AWS Redshift Cluster:

redshift_client = boto3.client('redshift',region_name='us-east-1', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_redshift = redshift_client.create_cluster(
    DBName=db_name,
    ClusterIdentifier=cluster_identifier,
    ClusterType=cluster_type,
    NodeType=node_type,
    MasterUsername=master_username,
    MasterUserPassword=master_password,
    AvailabilityZone=availability_zone,
    IamRoles=[role_arn],
    PubliclyAccessible=True)

print(response_redshift)

ClusterAlreadyExistsFault: An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists

In [0]:
#Note: Wait for 5 min to get AWS Redshift cluster to setup as we need to create a Database table.

In [41]:
#Describe AWS Redshift Cluster to get an endpoint:

response_redshift_desc = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)
print(response_redshift_desc)

{'Clusters': [{'ClusterIdentifier': 'info7374clusteralyceteam4', 'NodeType': 'dc2.large', 'ClusterStatus': 'available', 'ClusterAvailabilityStatus': 'Available', 'MasterUsername': 'root', 'DBName': 'info7374dbalyceteam4', 'Endpoint': {'Address': 'info7374clusteralyceteam4.cebdot7camvq.us-east-1.redshift.amazonaws.com', 'Port': 5439}, 'ClusterCreateTime': datetime.datetime(2019, 7, 11, 23, 45, 8, 731000, tzinfo=tzutc()), 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-f15e31aa', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-da6b0ba0', 'AvailabilityZone': 'us-east-1a', 'PreferredMaintenanceWindow': 'mon:09:30-mon:10:00', 'PendingModifiedValues': {}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 1, 'PubliclyAccessible': True

In [42]:
hostname = response_redshift_desc['Clusters'][0]['Endpoint']['Address']#----------------Redshift: Hostname for database
port_number = response_redshift_desc['Clusters'][0]['Endpoint']['Port']#----------------Redshift: Port Number for databse
print (hostname)
print(port_number)


info7374clusteralyceteam4.cebdot7camvq.us-east-1.redshift.amazonaws.com
5439


In [43]:
#Creating Database table on Redshift:

con = psycopg2.connect(dbname= db_name, host= hostname, port= port_number, user= master_username, password= master_password)

con.set_session(autocommit=True)
cur = con.cursor()

In [33]:
#alyce_services

query = "create table alyce_services(\
service_id integer not null,\
service_name varchar not null,\
primary key(service_id))"
print(query)


create table alyce_services(service_id integer not null,service_name varchar not null,primary key(service_id))


In [45]:
#alyce_client
query = "create table alyce_client(\
client_id integer not null,\
client_name varchar not null,\
client_sector varchar not null,\
client_city varchar not null,\
client_statecode varchar not null,\
client_zipcode integer not null,\
primary key(client_id))"
print(query)
cur.execute(query)


create table alyce_client(client_id integer not null,client_name varchar not null,client_sector varchar not null,client_city varchar not null,client_statecode varchar not null,client_zipcode integer not null,primary key(client_id))


In [46]:
#alyce_recipient
query="create table alyce_recipient(\
recipient_id integer not null,\
recipient_name varchar,\
recipient_sector varchar,\
recipient_city varchar,\
recipient_statecode varchar,\
recipient_zipcode integer,\
primary key(recipient_id))"
print(query)
cur.execute(query)

create table alyce_recipient(recipient_id integer not null,recipient_name varchar,recipient_sector varchar,recipient_city varchar,recipient_statecode varchar,recipient_zipcode integer,primary key(recipient_id))


In [47]:
#alyce_giftdata
query="create table alyce_giftdata(\
gift_id integer not null,\
gift_name varchar,\
primary key(gift_id))"
print(query)
cur.execute(query)

create table alyce_giftdata(gift_id integer not null,gift_name varchar,primary key(gift_id))


In [50]:
#alyce_clientexpenditure
query="create table alyce_clientexpenditure(\
recipient_id integer not null,\
client_id integer not null,\
gift_id integer not null,\
quantity integer not null,\
price integer,\
status varchar,\
isconverted varchar,\
foreign key(recipient_id) references alyce_recipient(recipient_id),\
foreign key(client_id) references alyce_client(client_id),\
foreign key(gift_id) references alyce_giftdata(gift_id))"
print(query)
cur.execute(query)

create table alyce_clientexpenditure(recipient_id integer not null,client_id integer not null,gift_id integer not null,quantity integer not null,price integer,status varchar,isconverted varchar,foreign key(recipient_id) references alyce_recipient(recipient_id),foreign key(client_id) references alyce_client(client_id),foreign key(gift_id) references alyce_giftdata(gift_id))


In [51]:
#alyce_facts
query="create table alyce_facts(\
client_id integer not null,\
service_id integer not null,\
total_gifts integer,\
total_amount integer,\
successful integer,\
unsuccessful integer,\
foreign key(client_id) references alyce_client(client_id),\
foreign key(service_id) references alyce_services(service_id))"
print(query)
cur.execute(query)


create table alyce_facts(client_id integer not null,service_id integer not null,total_gifts integer,total_amount integer,successful integer,unsuccessful integer,foreign key(client_id) references alyce_client(client_id),foreign key(service_id) references alyce_services(service_id))


In [97]:
#Creating Kinesis Stream:

kinesis_client = boto3.client('kinesis',region_name='us-east-1', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_kinesis_data_stream = kinesis_client.create_stream(StreamName=kinesis_data_stream_name,ShardCount=shard_count)
print(response_kinesis_data_stream)

ResourceInUseException: An error occurred (ResourceInUseException) when calling the CreateStream operation: Stream info7374datastreamalyceteam4 under account 730681287143 already exists.

In [98]:
response_kinesis_data_stream_desc = kinesis_client.describe_stream(StreamName=kinesis_data_stream_name)
print(response_kinesis_data_stream_desc)

{'StreamDescription': {'StreamName': 'info7374datastreamalyceteam4', 'StreamARN': 'arn:aws:kinesis:us-east-1:730681287143:stream/info7374datastreamalyceteam4', 'StreamStatus': 'ACTIVE', 'Shards': [{'ShardId': 'shardId-000000000000', 'HashKeyRange': {'StartingHashKey': '0', 'EndingHashKey': '3402823669209384634633746074317682113'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49597519892371678831354531156200982143467141429100281858'}}, {'ShardId': 'shardId-000000000001', 'HashKeyRange': {'StartingHashKey': '3402823669209384634633746074317682114', 'EndingHashKey': '6805647338418769269267492148635364228'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49597519892393979576553061779342517861739789790606262290'}}, {'ShardId': 'shardId-000000000002', 'HashKeyRange': {'StartingHashKey': '6805647338418769269267492148635364229', 'EndingHashKey': '10208471007628153903901238222953046342'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49597519892416280321751592402484053580012438152

In [99]:
kinesis_stream_arn = response_kinesis_data_stream_desc['StreamDescription']['StreamARN']#-----------------------Kinesis: Datastream ARN
print(kinesis_stream_arn)

arn:aws:kinesis:us-east-1:730681287143:stream/info7374datastreamalyceteam4


In [100]:
#Creating Kinesis Delivery Stream: Firehose

firehose_client = boto3.client('firehose',region_name='us-east-1',aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

cluster_jdbc_url = "jdbc:redshift://"+hostname+":"+str(port_number)+"/"+db_name
print(cluster_jdbc_url)

jdbc:redshift://info7374clusteralyceteam4.cebdot7camvq.us-east-1.redshift.amazonaws.com:5439/info7374dbalyceteam4


In [101]:
table_name = "alyce_facts"

In [102]:
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            'DataTableName': table_name,
            'DataTableColumns': 'client_id,service_id,total_gifts,total_amount',
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

{'DeliveryStreamARN': 'arn:aws:firehose:us-east-1:730681287143:deliverystream/info7374deliverystreamalyceteam4', 'ResponseMetadata': {'RequestId': 'd1eeafc6-636d-2f3b-844c-a7f4209ec5f9', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd1eeafc6-636d-2f3b-844c-a7f4209ec5f9', 'x-amz-id-2': 'JCxTQ+4SjgwhIeCAKlnu5xDAtFSYbrlDgn1oqDneKPImegq2+s27Vr+pb9PZ0GenzHX6H3Kqh/7kyI1fTRggYkxPcFYLn3CP', 'content-type': 'application/x-amz-json-1.1', 'content-length': '111', 'date': 'Fri, 12 Jul 2019 04:00:50 GMT'}, 'RetryAttempts': 0}}


In [0]:
#Cogratulation!!! At this stage we have successfully created our Architacure

In [103]:
#Real-time data streaming:

kinesis_delivery_client = boto3.client('kinesis', region_name="us-east-1", aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

In [0]:
for i in range(0,20):
    now = datetime.datetime.now()
    data = {"player_id":i,"player_name":"player_"+str(i),"event_time":now.strftime("%Y-%m-%d %H:%M")}
    data = json.dumps(data)
    data = bytes(data, 'utf-8')
    formatted_records = [{'PartitionKey': "pk",'Data': data}]
    print(formatted_records)
    delivery_stream_name = kinesis_data_stream_name
    response = kinesis_delivery_client.put_records(StreamName=delivery_stream_name, Records=formatted_records)

In [0]:
response

In [0]:
#Now we will demolish the complete architecture:

#Delete Redshift Cluster:
response_delete_redshift = redshift_client.delete_cluster(
    ClusterIdentifier=cluster_identifier,
    SkipFinalClusterSnapshot=True
)

print(response_delete_redshift)

In [0]:
#Delete Kinesis Delivery stream:
response_delete_firehose = firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
print(response_delete_firehose)

In [0]:
#Delete Kinesis Data stream:
response_delete_data_stream = kinesis_client.delete_stream(StreamName=kinesis_data_stream_name,EnforceConsumerDeletion=True)
print(response_delete_data_stream)

In [0]:
#Delete S3 Bucket:

#All of the Objects in a bucket must be deleted before the bucket itself can be deleted:

s3 = boto3.resource('s3', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)
bucket = s3.Bucket(bucket_name)

for key in bucket.objects.all():
    key.delete()
bucket.delete()

print("Bucket deleted successfully!")

In [0]:
#Delete IAM Policy:



In [0]:
#Delete IAM Role:

response_detach_policy = iam_client.detach_role_policy(PolicyArn=policy_arn,RoleName=iam_role_name)
print(response_detach_policy)

response_delete_role = iam_client.delete_role(RoleName=iam_role_name)
print(response_delete_role)

In [0]:
#Delete IAM Policy:

response_delete_policy = iam_client.delete_policy(PolicyArn=policy_arn)
print(response_delete_policy)