# Assignment 2 - Question 3
### Heather Chen
#### 3. Streaming Stock Data

In [1]:
import boto3
import time

# Pin the region explicitly instead of relying on the ambient default: the SNS
# client and the producer/consumer scripts in this notebook all hardcode
# us-east-1, so the stream and instances created from this session must live
# in the same region for those later cells to find them.
session = boto3.Session(region_name='us-east-1')

# Kinesis client: used to create the stream and wait for it to exist.
kinesis = session.client('kinesis')
# EC2 resource (object-style API): used to launch and enumerate instances.
ec2 = session.resource('ec2')
# EC2 low-level client: used for waiters.
ec2_client = session.client('ec2')

In [2]:
# Provision the single-shard Kinesis stream that the producer writes to and
# the consumer reads from.
response = kinesis.create_stream(StreamName='stock_stream', ShardCount=1)

# Block here until the 'stream_exists' waiter reports the stream, so later
# cells do not try to use it too early.
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName='stock_stream')

In [3]:
# Launch the two EC2 instances used later in this notebook: one runs the
# producer script and the other hosts the consumer.
# MinCount is 2 (not 1) because the deployment cell indexes two DNS names; with
# MinCount=1 the call could succeed with a single instance and the notebook
# would only fail later with an IndexError.
instances = ec2.create_instances(ImageId='ami-0915e09cc7ceee3ab',
                                 MinCount=2,
                                 MaxCount=2,
                                 InstanceType='t2.micro',
                                 KeyName='Heather_Chen',
                                 SecurityGroupIds=['sg-0766f5a606dc4c8c5'],
                                 SecurityGroups=['Lab5'],
                                 IamInstanceProfile=
                                     {'Name': 'EMR_EC2_DefaultRole'},
                                )

# Wait until EC2 instances are running before moving on
waiter = ec2_client.get_waiter('instance_running')
waiter.wait(InstanceIds=[instance.id for instance in instances])

In [4]:
# SNS client used for the stock price alert topic.
sns = boto3.client('sns', region_name='us-east-1')

# Create the alert topic and keep its ARN for subscribing and publishing.
topic_arn = sns.create_topic(Name='stock_price_alerts')['TopicArn']

# Subscribe the alert recipient's email address to the topic.
response = sns.subscribe(
    TopicArn=topic_arn,
    Protocol='email',
    Endpoint='chichen@uchicago.edu',
)

In [5]:
%%file producer.py

import boto3
import random
import datetime
import json

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Continuously write simulated stock data into the Kinesis stream.
# Each record carries the current local timestamp (ISO 8601), a fixed ticker,
# and a random price in [0, 100) rounded to two decimals.
while True:
    record = {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': 'AAPL',
        'price': round(random.random() * 100, 2),
    }
    kinesis.put_record(
        StreamName="stock_stream",
        Data=json.dumps(record),
        PartitionKey="partitionkey",
    )

Overwriting producer.py


In [6]:
%%file consumer.py

import boto3
import time
import json

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Start reading at the tip of the stream's shard: only records written after
# this iterator is obtained are returned.
shard_it = kinesis.get_shard_iterator(StreamName = "stock_stream",
                                     ShardId = 'shardId-000000000000',
                                     ShardIteratorType = 'LATEST'
                                     )["ShardIterator"]

# Poll the shard forever, printing each record exactly once.
# Printing happens inside the record loop so that a poll which returns no new
# records prints nothing, instead of repeating the previous price.
while True:
    out = kinesis.get_records(ShardIterator = shard_it, Limit = 1)
    for o in out['Records']:
        jdat = json.loads(o['Data'])
        print("Current Stock Price: ", jdat['price'])
        print("Current Time:", jdat['event_time'])
        print("\n")

    # Advance to the next position in the shard and pause briefly between polls.
    shard_it = out['NextShardIterator']
    time.sleep(0.2)

Overwriting consumer.py


In [7]:
# Scripts to deploy; index 0 goes to the producer host, index 1 to the consumer host.
code = ['producer.py', 'consumer.py']

# Public DNS names of every instance currently in the 'running' state.
# NOTE(review): this enumerates all instances visible to the `ec2` resource,
# not only the ones launched above — confirm no other instances are running.
instance_dns = [
    node.public_dns_name
    for node in ec2.instances.all()
    if node.state['Name'] == 'running'
]

In [8]:
import paramiko
from scp import SCPClient
ssh_producer, ssh_consumer = paramiko.SSHClient(), paramiko.SSHClient()

# Initialization of SSH tunnels takes a bit of time; otherwise get connection error on first attempt
time.sleep(5)

# Per-instance handles for the install commands (index 0 = producer, 1 = consumer).
stdin, stdout, stderr = [[None, None] for _ in range(3)]
install_commands = ["sudo pip install boto3 testdata",  # producer
                    "sudo pip install boto3"]           # consumer

# Install boto3 on each EC2 instance and copy our producer/consumer code onto
# the producer/consumer EC2 instances.
for instance, ssh in enumerate([ssh_producer, ssh_consumer]):
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(instance_dns[instance],
                username = 'ec2-user',
                key_filename='/Users/heatherchen/.ssh/Heather_Chen.pem')

    with SCPClient(ssh.get_transport()) as scp:
        scp.put(code[instance])

    stdin[instance], stdout[instance], stderr[instance] = \
        ssh.exec_command(install_commands[instance])

# Block until Producer has installed boto3 and testdata, then start running Producer script:
producer_exit_status = stdout[0].channel.recv_exit_status()
if producer_exit_status == 0:
    ssh_producer.exec_command("python %s" % code[0])
    print("Producer Instance is Running producer.py\n.........................................")
else:
    print("Error", producer_exit_status)

# Also block until the Consumer's install has finished, so its connection is
# not closed while pip may still be running, and surface a failed install.
consumer_exit_status = stdout[1].channel.recv_exit_status()
if consumer_exit_status != 0:
    print("Error", consumer_exit_status)

# Close both SSH connections (the original referenced `ssh_consumer.close`
# without calling it, so the consumer connection was never closed) and show
# connection instructions for manual access to the Consumer Instance.
ssh_consumer.close()
ssh_producer.close()

print("Connect to Consumer Instance by running: ssh -i \"~/.ssh/Heather_Chen.pem\" ec2-user@%s" % instance_dns[1])

Producer Instance is Running producer.py
.........................................
Connect to Consumer Instance by running: ssh -i "~/.ssh/Heather_Chen.pem" ec2-user@ec2-54-234-46-117.compute-1.amazonaws.com


In [9]:
# Run the same consumer loop here in the notebook; once the price exceeds the threshold it sends the alert and terminates the AWS resources
import json

kinesis = boto3.client('kinesis', region_name='us-east-1')
ec2 = boto3.client('ec2', region_name='us-east-1')

# Start reading at the tip of the shard: only records written after this
# iterator is obtained are returned.
shard_it = kinesis.get_shard_iterator(StreamName = "stock_stream",
                                     ShardId = 'shardId-000000000000',
                                     ShardIteratorType = 'LATEST'
                                     )["ShardIterator"]

# Poll the stream; as soon as a record's price exceeds 3, email an alert and
# tear down the EC2 instances, the Kinesis stream and the SNS topic.
while True:
    out = kinesis.get_records(ShardIterator = shard_it, Limit = 1)
    records = out['Records']
    for o in records:
        jdat = json.loads(o['Data'])
        price = jdat['price']
        event_time = jdat['event_time']
        # Print inside the loop so each record is reported exactly once.
        print("Current Stock Price: ", price)
        print("Current Time:", event_time)
        print("\n")

    # Only test the threshold when this poll actually returned a record;
    # otherwise `price` may not exist yet (NameError on an empty first poll).
    if records and price > 3:
        sns = boto3.client('sns', region_name='us-east-1')
        # Look the topic up by name rather than taking whichever topic happens
        # to be listed first; create_topic returns the existing topic's ARN
        # when a topic with this name already exists.
        topic_arn = sns.create_topic(Name='stock_price_alerts')['TopicArn']

        message = 'The stock price is {str_price} at {str_time}!'.format(str_price = price, str_time = event_time)
        response = sns.publish(TopicArn=topic_arn, Message = message, Subject = 'Stock Price Alert!')

        # Terminate the EC2 instances.
        # NOTE(review): this targets every running instance visible in
        # us-east-1, not only the ones this notebook launched — confirm.
        ec2_resource = boto3.resource('ec2', region_name='us-east-1')
        # Materialise the IDs once. The filter result is a lazy collection, so
        # iterating it again after termination would re-query and no longer
        # match the (now shutting-down) instances, handing the waiter an empty list.
        instance_ids = [
            instance.id
            for instance in ec2_resource.instances.filter(
                Filters=[{'Name': 'instance-state-name', 'Values': ['running']}])
        ]
        if instance_ids:
            ec2.terminate_instances(InstanceIds=instance_ids)

            waiter = ec2.get_waiter('instance_terminated')
            waiter.wait(InstanceIds=instance_ids)
            print("EC2 Instances Successfully Terminated")
        else:
            print("No running EC2 instances to terminate")

        # Delete Kinesis Stream (if it currently exists):
        try:
            response = kinesis.delete_stream(StreamName='stock_stream')
        except kinesis.exceptions.ResourceNotFoundException:
            pass

        # Confirm that Kinesis Stream was deleted:
        waiter = kinesis.get_waiter('stream_not_exists')
        waiter.wait(StreamName='stock_stream')
        print("Kinesis Stream Successfully Deleted")

        #Delete SNS topic
        sns.delete_topic(TopicArn=topic_arn)
        print("SNS Topic Sucessfully Deleted")

        break

    # Advance to the next position in the shard and pause briefly between polls.
    shard_it = out['NextShardIterator']
    time.sleep(0.2)

Current Stock Price:  23.97
Current Time: 2020-11-11T13:12:34.377281


EC2 Instances Successfully Terminated
Kinesis Stream Successfully Deleted
SNS Topic Sucessfully Deleted
