In [32]:
import boto3
import time

session = boto3.Session()

kinesis = session.client('kinesis')
ec2 = session.resource('ec2')
ec2_client = session.client('ec2')

In [33]:
response = kinesis.create_stream(StreamName = 'test_stream',
                                 ShardCount = 1
                                )

# Is the stream active and ready to be written to/read from? Wait until it exists before moving on:
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName='test_stream')

In [34]:
instances = ec2.create_instances(ImageId='ami-0915e09cc7ceee3ab',
                                 MinCount=1,
                                 MaxCount=2,
                                 InstanceType='t2.micro',
                                 KeyName='YileC',
                                 SecurityGroupIds=['sg-09db00853c375ee8c'],
                                 SecurityGroups=['Q3'],
                                 IamInstanceProfile=
                                     {'Name': 'EMR_EC2_DefaultRole'},
                                )

# Wait until EC2 instances are running before moving on
waiter = ec2_client.get_waiter('instance_running')
waiter.wait(InstanceIds=[instance.id for instance in instances])

In [35]:
#create an sns topic and subscribe
sns = boto3.client('sns', region_name='us-east-1')
response = sns.create_topic(Name = 'Price_Alert')
topic_arn = response['TopicArn']
response1 = sns.subscribe(TopicArn=topic_arn, Protocol = 'email', 
                          Endpoint = 'yilec@uchicago.edu')

In [49]:
#check the topic_arn
topic_arn

'arn:aws:sns:us-east-1:698849050473:Price_Alert'

In [36]:
%%file producer.py

import boto3
import random
import datetime
import json

kinesis = boto3.client('kinesis', region_name='us-east-1')

def getReferrer():
    data = {}
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['EVENT_TIME'] = str_now
    data['TICKER'] = 'AAPL'
    price = random.random() * 100 # Assume price is in USD
    data['PRICE'] = round(price, 2)
    return data

while 1 == 1:
    data = getReferrer()
    kinesis.put_record(StreamName = "test_stream",
                       Data = json.dumps(data),
                       PartitionKey = "partitionkey"
                      )

Overwriting producer.py


In [62]:
%%file consumer.py

import boto3
import random
import datetime
import json
import time

sns = boto3.client('sns', region_name='us-east-1')
kinesis = boto3.client('kinesis', region_name='us-east-1')

shard_it = kinesis.get_shard_iterator(StreamName = "test_stream",
                                     ShardId = 'shardId-000000000000',
                                     ShardIteratorType = 'LATEST'
                                     )["ShardIterator"]


while 1==1:
    out = kinesis.get_records(ShardIterator = shard_it,
                              Limit = 1)
    o = out['Records'][0]
    jdat = json.loads(o['Data'])
    price = jdat['PRICE']
    event_time = jdat['EVENT_TIME']
        
    print("Price: ", price)
    print("Time:", event_time)
    print("\n")
    
    #####For Question 3b####
    if price < 3:
        response = sns.publish(TopicArn = 'arn:aws:sns:us-east-1:698849050473:Price_Alert',
                               Message = 'Current Price: {Price}, Current Time: {Time}'.format(Price=price, Time=event_time),
                               Subject = 'Price Alert')
                
        break
    #######################    
    
    
    shard_it = out['NextShardIterator']
    time.sleep(0.2)

Overwriting consumer.py


In [63]:
instance_dns = [instance.public_dns_name 
                 for instance in ec2.instances.all() 
                 if instance.state['Name'] == 'running'
               ]

code = ['producer.py', 'consumer.py']

In [64]:
import paramiko
from scp import SCPClient
ssh_producer, ssh_consumer = paramiko.SSHClient(), paramiko.SSHClient()

# Initialization of SSH tunnels takes a bit of time; otherwise get connection error on first attempt
time.sleep(5)

# Install boto3 on each EC2 instance and Copy our producer/consumer code onto producer/consumer EC2 instances
instance = 0
stdin, stdout, stderr = [[None, None] for i in range(3)]
for ssh in [ssh_producer, ssh_consumer]:
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(instance_dns[instance],
                username = 'ec2-user',
                key_filename='/Users/miao/Desktop/YileC.pem')
    
    with SCPClient(ssh.get_transport()) as scp:
        scp.put(code[instance])
    
    stdin[instance], stdout[instance], stderr[instance] = \
            ssh.exec_command("sudo pip install boto3")
    instance += 1


# Block until Producer has installed boto3 and testdata, then start running Producer script:
producer_exit_status = stdout[0].channel.recv_exit_status() 
if producer_exit_status == 0:
    ssh_producer.exec_command("python %s" % code[0])
    print("Producer Instance is Running producer.py\n.........................................")
else:
    print("Error", producer_exit_status)

# Close ssh and show connection instructions for manual access to Consumer Instance
ssh_consumer.close; ssh_producer.close()

print("Connect to Consumer Instance by running: ssh -i \"YileC.pem\" ec2-user@%s" % instance_dns[1])

Producer Instance is Running producer.py
.........................................
Connect to Consumer Instance by running: ssh -i "YileC.pem" ec2-user@ec2-52-72-230-251.compute-1.amazonaws.com


In [65]:
# Terminate EC2 Instances:
ec2_client.terminate_instances(InstanceIds=[instance.id for instance in instances])

# Confirm that EC2 instances were terminated:
waiter = ec2_client.get_waiter('instance_terminated')
waiter.wait(InstanceIds=[instance.id for instance in instances])
print("EC2 Instances Successfully Terminated")

# Delete Kinesis Stream (if it currently exists):
try:
    response = kinesis.delete_stream(StreamName='test_stream')
except kinesis.exceptions.ResourceNotFoundException:
    pass

# Confirm that Kinesis Stream was deleted:
waiter = kinesis.get_waiter('stream_not_exists')
waiter.wait(StreamName='test_stream')
print("Kinesis Stream Successfully Deleted")

#Delete SNS topic
sns.delete_topic(TopicArn=topic_arn)

EC2 Instances Successfully Terminated
Kinesis Stream Successfully Deleted
