## INFO7374: Digital Marketing Analytics
### AWS Analytics Pipeline

### Architectural Components:
 S3 Bucket: https://aws.amazon.com/s3/
 
 Kinesis Data Stream: https://aws.amazon.com/kinesis/data-streams/
 
 Kinesis Delivery Stream: https://aws.amazon.com/kinesis/data-firehose/
 
 Redshift Cluster: https://aws.amazon.com/redshift/
 
 IAM Policy: https://aws.amazon.com/iam/
 
 IAM Role: https://aws.amazon.com/iam/

In [None]:
#Importing all required libraries:
#pip install boto3
#pip install psycopg2

import boto3
import json
import psycopg2
import datetime

In [None]:
#Setting up the AWS Access keys:
#NOTE: DO NOT EXPOSE THE ACCESS & SECRET KEYS

config_file = open(r'Config.json')
config_file = json.loads(config_file.read())

access_key = config_file['access_key']
secret_access_key = config_file['secret_access_key']


In [None]:
#Configuration Parameters:

bucket_name = "info7374s3alyceteam4"#------------------------------------S3: Name of the creating AWS S3 Bucket

policy_name = "info7374policyalyceteam4"#----------------------------------IAM: Access policy name
iam_role_name = "info7374rolealyceteam4"#----------------------------------IAM: IAM Role for the architecural access

db_name = "info7374dbalyceteam4"#-------------------------------------Redshift: Database Name
cluster_identifier = "info7374clusteralyceteam4"#---------------------Redshift: Redshift Cluster Name
master_username = "root"#----------------------------------------Redshift: Admin Username
master_password = "Info7374gap"#---------------------------------Redshift: Admin Password
node_type = "dc2.large"#-----------------------------------------Redshift: Cluster configuration
cluster_type = "single-node"#------------------------------------Redshift: Cluster Type
availability_zone = "us-east-1a"#--------------------------------Redshift: Cluster Availability Zone
table_name1 = "alyce_facts"#------------------------------------Redshift: Database table name
table_name2 = "alyce_services"#------------------------------------Redshift: Database table name
table_name3 = "alyce_recipient"#------------------------------------Redshift: Database table name
table_name4 = "alyce_giftdata"#------------------------------------Redshift: Database table name
table_name5 = "alyce_clientexpenditure"#------------------------------------Redshift: Database table name
table_name6 = "alyce_client"#------------------------------------Redshift: Database table name
kinesis_data_stream_name = "info7374datastreamalyceteam4"#-------------Kinesis: Data Stream Name
shard_count = 100#------------------------------------------------Kinesis: Data Stream Shard Count

log_group_name = "info7374loggroupalyceteam4"#------------------------CloudWatch: Log Group Name
log_stream_name = "info7374logstreamalyceteam4"#------------------------CloudWatch: Log Group Name
delivery_stream_name = "info7374deliverystreamalyceteam4"#--------------Kinesis: Delivery Stream Name
stream_type = "KinesisStreamAsSource"#-----------------------------Kinesis: Delivery Stream Type


In [None]:
#Creating AWS S3 Bucket:

s3_client = boto3.client('s3', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_s3 = s3_client.create_bucket(ACL="private", Bucket=bucket_name)
print(response_s3)

In [None]:
bucket_arn = "arn:aws:s3:::"+bucket_name#---------------------------S3: Bucker ARN
print(bucket_arn)

In [None]:
#Creating IAM Policy for Architecural access:

iam_client = boto3.client('iam', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)


In [None]:

policy_details = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "glue:GetTableVersions"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": "*"
        }
    ]
}

In [None]:
response_iam_policy = iam_client.create_policy(PolicyName=policy_name, PolicyDocument=json.dumps(policy_details))
print(response_iam_policy)

In [None]:
policy_arn = response_iam_policy['Policy']['Arn']#------------------------IAM: Policy ID for assignment
print(policy_arn)

In [None]:
#Creating IAM Role for Architectural access:

assume_role_policy_doc = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "730681287143"
        }
      }
    }
  ]
}

response_iam_role = iam_client.create_role(RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc))

print(response_iam_role)

In [None]:
role_arn = response_iam_role['Role']['Arn']#----------------------------IAM: Role ARN
role_arn

In [None]:
#Attaching a Policy to a Role:

response_iam_role_policy_attach = iam_client.attach_role_policy(PolicyArn=policy_arn,RoleName=iam_role_name)
print(response_iam_role_policy_attach)

In [None]:
#Creating AWS Redshift Cluster:

redshift_client = boto3.client('redshift',region_name='us-east-1', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_redshift = redshift_client.create_cluster(
    DBName=db_name,
    ClusterIdentifier=cluster_identifier,
    ClusterType=cluster_type,
    NodeType=node_type,
    MasterUsername=master_username,
    MasterUserPassword=master_password,
    AvailabilityZone=availability_zone,
    IamRoles=[role_arn],
    PubliclyAccessible=True)

print(response_redshift)

In [None]:
#Note: Wait for 5 min to get AWS Redshift cluster to setup as we need to create a Database table.

In [None]:
#Describe AWS Redshift Cluster to get an endpoint:

response_redshift_desc = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)
print(response_redshift_desc)

In [None]:
hostname = response_redshift_desc['Clusters'][0]['Endpoint']['Address']#----------------Redshift: Hostname for database
port_number = response_redshift_desc['Clusters'][0]['Endpoint']['Port']#----------------Redshift: Port Number for databse
print (hostname)
print(port_number)


In [None]:
#Creating Database table on Redshift:

con = psycopg2.connect(dbname= db_name, host= hostname, port= port_number, user= master_username, password= master_password)

con.set_session(autocommit=True)
cur = con.cursor()

In [None]:
#alyce_services
query = "create table alyce_services(\
service_id integer not null,\
service_name varchar not null,\
primary key(service_id))"
print(query)

In [None]:
#alyce_client
query = "create table alyce_client(\
client_id integer not null,\
client_name varchar not null,\
client_sector varchar not null,\
client_city varchar not null,\
client_statecode varchar not null,\
client_zipcode integer not null,\
primary key(client_id))"
print(query)
cur.execute(query)

In [None]:
#alyce_recipient
query="create table alyce_recipient(\
recipient_id integer not null,\
recipient_name varchar,\
recipient_sector varchar,\
recipient_city varchar,\
recipient_statecode varchar,\
recipient_zipcode integer,\
primary key(recipient_id))"
print(query)
cur.execute(query)

In [None]:
#alyce_giftdata
query="create table alyce_giftdata(\
gift_id integer not null,\
gift_name varchar,\
primary key(gift_id))"
print(query)
cur.execute(query)

In [None]:
#alyce_clientexpenditure
query="create table alyce_clientexpenditure(\
recipient_id integer not null,\
client_id integer not null,\
gift_id integer not null,\
quantity integer not null,\
price integer,\
status varchar,\
isconverted varchar,\
foreign key(recipient_id) references alyce_recipient(recipient_id),\
foreign key(client_id) references alyce_client(client_id),\
foreign key(gift_id) references alyce_giftdata(gift_id))"
print(query)
cur.execute(query)

In [None]:
#alyce_facts
query="create table alyce_facts(\
client_id integer not null,\
service_id integer not null,\
total_gifts integer,\
total_amount integer,\
successful integer,\
unsuccessful integer,\
foreign key(client_id) references alyce_client(client_id),\
foreign key(service_id) references alyce_services(service_id))"
print(query)
cur.execute(query)

In [None]:
#Creating Kinesis Stream:

kinesis_client = boto3.client('kinesis',region_name='us-east-1', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_kinesis_data_stream = kinesis_client.create_stream(StreamName=kinesis_data_stream_name,ShardCount=shard_count)
print(response_kinesis_data_stream)

In [None]:
response_kinesis_data_stream_desc = kinesis_client.describe_stream(StreamName=kinesis_data_stream_name)
print(response_kinesis_data_stream_desc)

In [None]:
kinesis_stream_arn = response_kinesis_data_stream_desc['StreamDescription']['StreamARN']#-----------------------Kinesis: Datastream ARN
print(kinesis_stream_arn)

In [None]:
#Creating Kinesis Delivery Stream: Firehose

firehose_client = boto3.client('firehose',region_name='us-east-1',aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

cluster_jdbc_url = "jdbc:redshift://"+hostname+":"+str(port_number)+"/"+db_name
print(cluster_jdbc_url)

In [None]:
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            'DataTableName': table_name1,
            'DataTableColumns': 'client_id,service_id,total_gifts,total_amount',
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

In [None]:
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            'DataTableName': table_name2,
            'DataTableColumns': 'service_id,service_name',
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

In [None]:
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            'DataTableName': table_name3,
            'DataTableColumns': 'recipient_id,recipient_name,recipient_sector,recipient_city,recipient_statecode,recipient_zipcode',
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

In [None]:
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            'DataTableName': table_name4,
            'DataTableColumns': 'gift_id,gift_name',
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

In [None]:
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            'DataTableName': table_name5,
            'DataTableColumns': 'recipient_id,client_id,gift_id,quantity,price,status,isconverted',
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

In [None]:
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            'DataTableName': table_name6,
            'DataTableColumns': 'client_id,client_name,client_sector,client_city,client_statecode,client_zipcode',
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

### At this stage we have successfully created our Architacure

In [None]:
#Real-time data streaming:

kinesis_delivery_client = boto3.client('kinesis', region_name="us-east-1", aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

In [None]:
for i in range(0,20):
    now = datetime.datetime.now()
    data = {"player_id":i,"player_name":"player_"+str(i),"event_time":now.strftime("%Y-%m-%d %H:%M")}
    data = json.dumps(data)
    data = bytes(data, 'utf-8')
    formatted_records = [{'PartitionKey': "pk",'Data': data}]
    print(formatted_records)
    delivery_stream_name = kinesis_data_stream_name
    response = kinesis_delivery_client.put_records(StreamName=delivery_stream_name, Records=formatted_records)

In [None]:
response

### Code to demolish the complete architecture:

In [None]:
#Now we will demolish the complete architecture:

#Delete Redshift Cluster:
response_delete_redshift = redshift_client.delete_cluster(
    ClusterIdentifier=cluster_identifier,
    SkipFinalClusterSnapshot=True
)

print(response_delete_redshift)

In [None]:
#Delete Kinesis Delivery stream:
response_delete_firehose = firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
print(response_delete_firehose)

In [None]:
#Delete Kinesis Data stream:
response_delete_data_stream = kinesis_client.delete_stream(StreamName=kinesis_data_stream_name,EnforceConsumerDeletion=True)
print(response_delete_data_stream)

In [None]:
#Delete S3 Bucket:

#All of the Objects in a bucket must be deleted before the bucket itself can be deleted:

s3 = boto3.resource('s3', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)
bucket = s3.Bucket(bucket_name)

for key in bucket.objects.all():
    key.delete()
bucket.delete()

print("Bucket deleted successfully!")

In [None]:
#Delete IAM Role:

response_detach_policy = iam_client.detach_role_policy(PolicyArn=policy_arn,RoleName=iam_role_name)
print(response_detach_policy)

response_delete_role = iam_client.delete_role(RoleName=iam_role_name)
print(response_delete_role)

In [None]:
#Delete IAM Policy:

response_delete_policy = iam_client.delete_policy(PolicyArn=policy_arn)
print(response_delete_policy)