In [1]:
#import json, numpy, datetime, boto3, random, time
import paramiko, boto3
from scp import SCPClient

In [2]:
# One shared boto3 session; every handle below is derived from it.
session = boto3.Session()

# EC2: high-level resource (instance objects) plus low-level client (waiters).
ec2 = session.resource('ec2')
ec2_client = session.client('ec2')

# Kinesis client used to create, inspect and delete the stream.
kinesis = session.client('kinesis')

In [4]:
# Create the single-shard stream. Tolerate a stream left over from an earlier
# run so the cell can be re-executed (mirrors the teardown cell, which ignores
# ResourceNotFoundException on delete).
try:
    response = kinesis.create_stream(StreamName = 'assignment2_stream', ShardCount = 1)
except kinesis.exceptions.ResourceInUseException:
    pass

# Block until the stream is reported as existing before anything uses it.
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName='assignment2_stream')

In [5]:
# Launch exactly two instances. Later cells index the running hosts as
# [0] (producer) and [1] (consumer), so MinCount must be 2: with MinCount=1
# EC2 is allowed to launch a single instance and the consumer step would
# fail with an IndexError.
instances = ec2.create_instances(ImageId='ami-0915e09cc7ceee3ab',
                                 MinCount=2,
                                 MaxCount=2,
                                 InstanceType='t2.micro',
                                 KeyName='bjcliang_macs',
                                 SecurityGroupIds=['sg-04884b077c7fe478b'],
                                 SecurityGroups=['assignment2'],
                                 IamInstanceProfile=
                                     {'Name': 'EMR_EC2_DefaultRole'},
                                )

# Block until every launched instance reaches the "running" state.
waiter = ec2_client.get_waiter('instance_running')
waiter.wait(InstanceIds=[instance.id for instance in instances])

In [6]:
# Create the alert topic and subscribe an email endpoint to it.
sns = boto3.client('sns')
topic = sns.create_topic(Name = 'w2_alerts')
topic_arn = topic['TopicArn']

# The subscription stays pending until the recipient confirms it by email.
response = sns.subscribe(
    TopicArn=topic_arn,
    Protocol = 'email',
    Endpoint = 'bjcliang@gmail.com',
)
response

{'SubscriptionArn': 'pending confirmation',
 'ResponseMetadata': {'RequestId': '5c751b38-58a8-50da-997b-4c0433cce451',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5c751b38-58a8-50da-997b-4c0433cce451',
   'content-type': 'text/xml',
   'content-length': '298',
   'date': 'Mon, 18 May 2020 10:53:24 GMT'},
  'RetryAttempts': 0}}

In [7]:
%%file producer.py
import boto3, datetime, random, json

# AWS clients for the producer script, both pinned to us-east-1.
# NOTE(review): `sns` is not referenced anywhere else in this script.
kinesis = boto3.client('kinesis', region_name='us-east-1')
sns = boto3.client('sns', region_name='us-east-1')

def getReferrer():
    """Build one synthetic AAPL price tick stamped with the current time.

    Returns:
        dict: ``EVENT_TIME`` (ISO-8601 string from ``datetime.datetime.now()``),
        ``TICKER`` (always ``'AAPL'``) and ``PRICE`` (a random value between
        0 and 100, rounded to two decimals).
    """
    timestamp = datetime.datetime.now().isoformat()
    # Assume price is in USD
    usd_price = round(random.random() * 100, 2)
    return {
        'EVENT_TIME': timestamp,
        'TICKER': 'AAPL',
        'PRICE': usd_price,
    }

# Push synthetic ticks into the stream until the process is killed.
while True:
    data = getReferrer()
    payload = json.dumps(data)
    kinesis.put_record(
        StreamName = "assignment2_stream",
        Data = payload,
        PartitionKey = "partitionkey",
    )
    

Overwriting producer.py


In [9]:
# Scratch check: display the notebook host's current local time (naive datetime).
import datetime
datetime.datetime.now()

datetime.datetime(2020, 5, 18, 5, 53, 55, 422859)

In [17]:
# Smoke test: fetch a single record from the start of the shard (TRIM_HORIZON)
# to confirm that data is arriving in the stream.
kinesis = boto3.client('kinesis', region_name='us-east-1')
iterator_response = kinesis.get_shard_iterator(
    StreamName = "assignment2_stream",
    ShardId = 'shardId-000000000000',
    ShardIteratorType = 'TRIM_HORIZON',
)
shard_it = iterator_response["ShardIterator"]
kinesis.get_records(ShardIterator = shard_it, Limit = 1)

{'Records': [{'SequenceNumber': '49607116597743420983118306085219934947394337729130004482',
   'ApproximateArrivalTimestamp': datetime.datetime(2020, 5, 18, 5, 55, 48, 416000, tzinfo=tzlocal()),
   'Data': b'{"PRICE": 96.87, "TICKER": "AAPL", "EVENT_TIME": "2020-05-18T10:55:48.374421"}',
   'PartitionKey': 'partitionkey'}],
 'NextShardIterator': 'AAAAAAAAAAEO4rNg4mcYLlBwqaPgA5WFaheZvI5Jtwpz/ckdy/r1REx0b5VVK47YcacYM/iOp28XsTjYcJlEoxIQnf2SQqj7WX/n3PDwvfdJX4fDQUGGQnvoTAlAT6Vg04rISz71yDDfi14zj6aMjXK5sSjQIDhCGM+xZXGq0KmVruGX7zrI6iniupYDOApxAkpKrRLY9TcLhWAvnshxShshK4BzSrjDqaklpqJZmkshtQD/1P6tLw==',
 'MillisBehindLatest': 25000,
 'ResponseMetadata': {'RequestId': 'cd61c0cd-12bd-e95a-9a5d-acc68b426d8f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cd61c0cd-12bd-e95a-9a5d-acc68b426d8f',
   'x-amz-id-2': '4joj8mHZlKZNtpbUYKyunhKIT9paUH1TMliaAnxxG0rn117EYuXFbDmFxxvlsLezICxT+zPWhnmxJ39qR4wi0GtE9EQURfwe',
   'date': 'Mon, 18 May 2020 10:56:11 GMT',
   'content-type': 'application

In [20]:
%%file consumer.py
import boto3, json, datetime, time, ast

def run_all():
    """Poll the Kinesis stream and send one SNS alert when the price drops below 3.

    Reads ``assignment2_stream`` one record per request, starting from the
    beginning of the shard (``TRIM_HORIZON``). The first record whose
    ``PRICE`` is below 3 triggers a single publish to the ``w2_alerts``
    topic, after which the function returns. It also returns if Kinesis stops
    handing back a next shard iterator.

    Returns:
        None
    """
    sns = boto3.client('sns', region_name='us-east-1')
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    shard_it = kinesis.get_shard_iterator(StreamName = "assignment2_stream",
                                          ShardId = 'shardId-000000000000',
                                          ShardIteratorType = 'TRIM_HORIZON',
                                          )["ShardIterator"]

    while shard_it is not None:
        out = kinesis.get_records(ShardIterator = shard_it, Limit = 1)
        # Advance the iterator up front so every path below resumes from the
        # correct position; a missing value ends the loop.
        shard_it = out.get('NextShardIterator')

        # GetRecords may return an empty batch (e.g. the producer has not
        # written yet), so iterate instead of indexing Records[0].
        for record in out['Records']:
            # The producer serializes with json.dumps, so parse as JSON.
            data = json.loads(record['Data'].decode("UTF-8"))
            price = data['PRICE']

            if price < 3:
                message = 'Current Price = ' + str(price)
                message += ', Current Time =' + str(datetime.datetime.now())
                sns.publish(TopicArn = 'arn:aws:sns:us-east-1:191178721407:w2_alerts',
                            Message = message,
                            Subject = 'Stock Price Alert')
                # One alert is enough; stop consuming.
                return

        # Throttle polling between GetRecords calls.
        time.sleep(0.2)
    return
    
# Script entry point: start polling as soon as consumer.py is executed.
run_all()

Overwriting consumer.py


In [11]:
# Only consider the instances launched by this notebook. Scanning
# ec2.instances.all() would also pick up unrelated running instances in the
# account, so positions 0/1 could point at the wrong hosts.
launched_ids = [instance.id for instance in instances]
instance_dns = [instance.public_dns_name
                for instance in ec2.instances.filter(InstanceIds=launched_ids)
                if instance.state['Name'] == 'running']

# Script deployed to each host, by position: [0] producer, [1] consumer.
code = ['producer.py', 'consumer.py']
instance_dns

['ec2-3-90-64-193.compute-1.amazonaws.com',
 'ec2-3-92-0-78.compute-1.amazonaws.com']

In [15]:
ssh_producer, ssh_consumer = paramiko.SSHClient(), paramiko.SSHClient()

# Private key matching the EC2 key pair used at launch.
key_file = '/Users/chen.liang/bjcliang_macs.pem'

# Per-host handles of the boto3 install command, indexed like `code` and
# `instance_dns`: [0] producer, [1] consumer.
stdin, stdout, stderr = [[None, None] for _ in range(3)]

for instance, ssh in enumerate([ssh_producer, ssh_consumer]):
    # Freshly launched hosts are not in known_hosts; accept their keys.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(instance_dns[instance],
                username = 'ec2-user',
                key_filename=key_file)

    # Copy this host's script into the remote login directory.
    with SCPClient(ssh.get_transport()) as scp:
        scp.put(code[instance])

    # Start installing boto3; the exit status is checked in a later cell.
    stdin[instance], stdout[instance], stderr[instance] = \
            ssh.exec_command("sudo pip install boto3")

In [22]:
# Wait for the boto3 install on the producer host to finish, then start
# producer.py only if the install succeeded.
producer_exit_status = stdout[0].channel.recv_exit_status()
if producer_exit_status != 0:
    print("Error", producer_exit_status)
else:
    ssh_producer.exec_command("python %s" % code[0])
    print("Producer Instance is Running producer.py")

Producer Instance is Running producer.py


In [23]:
# Wait for the boto3 install on the consumer host to finish before starting
# consumer.py; otherwise the script can start before boto3 is importable.
consumer_exit_status = stdout[1].channel.recv_exit_status()
if consumer_exit_status != 0:
    raise RuntimeError("boto3 install failed on consumer instance "
                       "(exit status %s)" % consumer_exit_status)
ssh_consumer.exec_command("python %s" % code[1])

(<paramiko.ChannelFile from <paramiko.Channel 3 (open) window=2097152 -> <paramiko.Transport at 0x113bbf60 (cipher aes128-ctr, 128 bits) (active; 1 open channel(s))>>>,
 <paramiko.ChannelFile from <paramiko.Channel 3 (open) window=2097152 -> <paramiko.Transport at 0x113bbf60 (cipher aes128-ctr, 128 bits) (active; 1 open channel(s))>>>,
 <paramiko.ChannelFile from <paramiko.Channel 3 (open) window=2097152 -> <paramiko.Transport at 0x113bbf60 (cipher aes128-ctr, 128 bits) (active; 1 open channel(s))>>>)

In [24]:
# Release both SSH connections.
for client in (ssh_producer, ssh_consumer):
    client.close()

In [72]:
# Terminate the EC2 instances launched by this notebook:
launched_ids = [instance.id for instance in instances]
ec2_client.terminate_instances(InstanceIds=launched_ids)

# Block until every one of them is reported as terminated:
waiter = ec2_client.get_waiter('instance_terminated')
waiter.wait(InstanceIds=launched_ids)
print("EC2 Instances Successfully Terminated")

# Delete the Kinesis stream; a stream that is already gone is not an error:
try:
    response = kinesis.delete_stream(StreamName='assignment2_stream')
except kinesis.exceptions.ResourceNotFoundException:
    pass

# Block until the stream no longer exists:
waiter = kinesis.get_waiter('stream_not_exists')
waiter.wait(StreamName='assignment2_stream')
print("Kinesis Stream Successfully Deleted")

EC2 Instances Successfully Terminated
Kinesis Stream Successfully Deleted


## Email Result

<img src="email_notification.png" width="600" align="left" />