### Setup for streaming Reddit data

In [21]:
import getpass
import os
import re
import time

import boto3
import paramiko
from scp import SCPClient

In [22]:
session = boto3.Session()

# Create every AWS client/resource from the same session so they share credentials and region.
kinesis = session.client('kinesis')
ec2 = session.resource('ec2')
ec2_client = session.client('ec2')

# Turn on the stream. create_stream raises ResourceInUseException when a stream named
# 'reddit' already exists (e.g. this cell is re-run), so treat that case as "already on".
try:
    kinesis.create_stream(StreamName='reddit',
                          ShardCount=1)
except kinesis.exceptions.ResourceInUseException:
    print("Kinesis stream 'reddit' already exists; reusing it.")

# Make sure we wait until AWS reports the stream as ready before anything writes to it.
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName='reddit')

In [23]:
# Launch exactly two instances: index 0 becomes the producer host, index 1 the consumer host.
# MinCount == MaxCount == 2 makes the request fail outright if AWS cannot start both,
# instead of silently launching one and breaking the two-host indexing in later cells.
# NOTE(review): SecurityGroupIds and SecurityGroups presumably refer to the same group — confirm.
instances = ec2.create_instances(ImageId='ami-02e136e904f3da870',
                                MinCount=2,
                                MaxCount=2,
                                InstanceType='t2.micro',
                                KeyName='test', # pem name
                                SecurityGroupIds=['sg-04746102478f6abe5'],
                                SecurityGroups=['launch-wizard-1'],
                                IamInstanceProfile={'Name': 'EMR_EC2_DefaultRole'})

In [24]:
# Block until every instance launched above reports the 'running' state.
instance_ids = [inst.id for inst in instances]
waiter = ec2_client.get_waiter('instance_running')
waiter.wait(InstanceIds=instance_ids)

In [25]:
# Look up the public DNS names of the instances launched above, in launch order, so
# instance_dns[0] is the producer host and instance_dns[1] the consumer host.
# (Scanning ec2.instances.all() would also pick up unrelated running instances, in no fixed order.)
# reload() refreshes each Instance's cached attributes: the data captured at launch time
# was fetched before the public DNS name was assigned.
for inst in instances:
    inst.reload()
instance_dns = [inst.public_dns_name for inst in instances]
code = ['producer.py', 'consumer.py']  # code[i] is copied to / run on instance_dns[i]
ssh_producer, ssh_consumer = paramiko.SSHClient(), paramiko.SSHClient()

# sshd on a freshly "running" instance is often not accepting connections yet;
# without this pause the first connection attempt tends to fail.
SSH_WARMUP_SECONDS = 60
time.sleep(SSH_WARMUP_SECONDS)

In [26]:
# Copy each script onto its instance (producer.py -> producer host, consumer.py -> consumer host)
# and start installing the Python dependencies there.
# The .pem location can be overridden with the EC2_KEY_PATH environment variable.
KEY_PATH = os.environ.get(
    'EC2_KEY_PATH',
    'C:/Users/Beau Smit/Documents/UChicago/Fall_2021/30123_LSC/test.pem')
INSTALL_CMD = "sudo pip3 install boto3 pandas praw"

# One slot per host: index 0 = producer, index 1 = consumer.
stdin, stdout, stderr = [[None, None] for _ in range(3)]
for idx, ssh in enumerate([ssh_producer, ssh_consumer]):
    # Freshly launched hosts have unknown host keys; accept them automatically.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(instance_dns[idx],
                username='ec2-user',
                key_filename=KEY_PATH)

    with SCPClient(ssh.get_transport()) as scp:
        scp.put(code[idx])

    # exec_command returns without waiting for the install to finish; the next cell
    # blocks on stdout[idx].channel.recv_exit_status().
    stdin[idx], stdout[idx], stderr[idx] = ssh.exec_command(INSTALL_CMD)


In [27]:
# Block until each host's pip install has finished (recv_exit_status waits for the remote
# command to exit), then start producer.py on the producer host. consumer.py is NOT started
# here; connect to the consumer host manually using the command printed at the end.
producer_exit_status = stdout[0].channel.recv_exit_status()
if producer_exit_status == 0:
    # nohup + output redirection + '&' detach the script from this SSH session, so it keeps
    # running after ssh_producer.close() below (its stdout would otherwise be a closed pipe).
    # Output is written to producer.log on the instance.
    ssh_producer.exec_command("nohup python3 %s > producer.log 2>&1 &" % code[0])
    print("Producer Instance is Running producer.py\n.........................................")
else:
    print("Error", producer_exit_status, stderr[0].read().decode(errors='replace'))

consumer_exit_status = stdout[1].channel.recv_exit_status()
if consumer_exit_status == 0:
    print("Consumer Instance is ready; start consumer.py manually\n.........................................")
else:
    print("Error", consumer_exit_status, stderr[1].read().decode(errors='replace'))

# Close both SSH connections and show how to reach the Consumer Instance manually.
ssh_consumer.close()
ssh_producer.close()
print("Connect to Consumer Instance by running: ssh -i \"test.pem\" ec2-user@%s" % instance_dns[1])

Producer Instance is Running producer.py
.........................................
Consumer Instance is Running consumer.py
.........................................
Connect to Consumer Instance by running: ssh -i "test.pem" ec2-user@ec2-18-206-253-226.compute-1.amazonaws.com


### Shut down AWS resources

In [28]:
# Terminate EC2 instances.
# NOTE(review): this targets EVERY running instance visible to this session's account/region,
# not only the two launched above — narrow the filter if other instances may be running.
ec2 = session.resource('ec2')
ec2_client = session.client('ec2')

# Materialize the collection once: ec2.instances.filter(...) is lazy and re-queries AWS on every
# iteration. Once termination has started, a fresh 'running' query would return nothing, handing
# the waiter an empty ID list (which makes it describe every instance in the region).
instances = list(ec2.instances.filter(
    Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]))
instance_ids = [instance.id for instance in instances]
for instance in instances:
    print(instance.id, instance.instance_type)

if instance_ids:
    response = ec2_client.terminate_instances(InstanceIds=instance_ids)
    waiter = ec2_client.get_waiter('instance_terminated')
    waiter.wait(InstanceIds=instance_ids)
    print("Terminated EC2 instances.")
else:
    print("No running EC2 instances to terminate.")


# Delete the Kinesis stream. A missing stream (e.g. already deleted on an earlier run) is not an
# error; the waiter below returns once the stream no longer exists either way.
try:
    response = kinesis.delete_stream(StreamName='reddit')
except kinesis.exceptions.ResourceNotFoundException:
    print("Kinesis stream 'reddit' not found; nothing to delete.")

waiter = kinesis.get_waiter('stream_not_exists')
waiter.wait(StreamName='reddit')
print("Deleted reddit Kinesis stream.")

i-0bc9755c5e1d3073e t2.micro
i-015db72aff4522c37 t2.micro
Terminated EC2 instances.
Deleted reddit Kinesis stream.


### Streaming directly from the Reddit API (without Kinesis/EC2)

In [None]:
# Reddit API client. Credentials come from the REDDIT_CLIENT_ID / REDDIT_CLIENT_SECRET
# environment variables, or are prompted for, instead of being hardcoded in the notebook.
# NOTE(review): the client secret that used to be hardcoded here has been exposed in this
# notebook file — revoke it and generate a new one in Reddit's app preferences.
import praw  # only this section needs praw on the local machine

reddit = praw.Reddit(
    client_id=os.environ.get("REDDIT_CLIENT_ID") or getpass.getpass("Reddit client_id: "),
    client_secret=os.environ.get("REDDIT_CLIENT_SECRET") or getpass.getpass("Reddit client_secret: "),
    user_agent="Scraping")

In [None]:
# Build a single alternation regex from the 'Key Terms' column, where each entry holds a
# comma-separated list of terms. Terms are stripped and regex-escaped so characters such as
# '+', '.' or '(' match literally. Blank entries are dropped, because an empty alternative
# would match every title.
# assumes key_terms['Key Terms'] yields strings (e.g. a pandas column without NaN) — TODO confirm
term_list = []
for terms in key_terms['Key Terms']:
    for term in terms.split(','):
        term = term.strip()
        if term:
            term_list.append(term)
assert term_list, "no key terms found in key_terms['Key Terms']"
search_terms = "|".join(re.escape(term) for term in term_list)

In [None]:
# Look for our words of interest across all new submissions on r/all.
# The pattern is compiled up front so an invalid pattern fails before the long-running stream opens.
search_pattern = re.compile(search_terms)
for submission in reddit.subreddit("all").stream.submissions():
    if search_pattern.search(submission.title):
        print(submission.title)