In [1]:
import datetime as dt
import getpass
import json
import os
import time

import boto3
import numpy as np
import psycopg2

  """)


In [2]:
# Kinesis stream that receives every generated record, and the client used to publish to it.
STREAM_NAME = "st-sentiment-demo-stream"
client = boto3.client('kinesis')

In [3]:
# Database connection settings. Every value can be overridden through an
# environment variable; non-secret values fall back to the demo defaults.
HOST = os.environ.get('DB_HOST') or "st-deploy-ds-apps-db.cypzti2esilk.us-east-1.rds.amazonaws.com"
DB_NAME = os.environ.get('DB_NAME') or "stdemo"
USER = os.environ.get('DB_USER') or "odsc"
# The password is never hard-coded: read it from DB_PASSWORD, otherwise prompt
# for it (Jupyter renders getpass as a masked input field).
PASSWORD = os.environ.get('DB_PASSWORD') or getpass.getpass("Postgres password: ")

In [4]:
# Universe of ticker symbols the data generator samples from.
SYMBOLS = [
    "FB",
    "AMZN",
    "AAPL",
    "NFLX",
    "GOOG",
]

In [5]:
# Helpers that fabricate the fields of one synthetic sentiment record.
def generate_symbol():
    """Return one ticker drawn uniformly at random (numpy global RNG) from SYMBOLS."""
    return np.random.choice(SYMBOLS)

def generate_sentiment_score():
    """Return a random sentiment score rounded to 5 decimal places.

    The unrounded value is uniform on [-1, 1); after rounding it lies in [-1, 1].
    Uses numpy's global RNG, so np.random.seed makes it reproducible.
    """
    # Stretch a [0, 1) draw to width 2, then shift it down to start at -1.
    unit_draw = np.random.random()
    score = unit_draw * 2 - 1
    return round(score, 5)

In [6]:
# Open the Postgres connection used by all later cells.
# Passing the settings as keyword arguments lets psycopg2/libpq quote them,
# so a password containing spaces, quotes or '=' cannot corrupt the DSN.
try:
    conn = psycopg2.connect(host=HOST, dbname=DB_NAME, user=USER, password=PASSWORD)
except psycopg2.Error as e:
    print("Error: Could not make connection to the Postgres database")
    print(e)
    # Later cells need `conn`; stop here instead of failing with a NameError.
    raise

In [7]:
# Autocommit makes every statement below (CREATE TABLE, each INSERT) take
# effect immediately instead of waiting for an explicit conn.commit().
conn.autocommit = True

# Cursor used for all SQL in the following cells.
try:
    cur = conn.cursor()
except psycopg2.Error as e:
    print("Error: Could not get cursor to the Database")
    print(e)
    # Later cells need `cur`; stop here instead of failing with a NameError.
    raise

In [8]:
# Create the table that receives every generated record (no-op if it already exists).
try:
    cur.execute("CREATE TABLE IF NOT EXISTS raw_sent_stream (created_at timestamp, message_id int, symbol varchar, sent_score numeric) ;")
except psycopg2.Error as e:
    print("Error: Issue creating raw_sent_stream table")
    print(e)
    # The generator cell inserts into this table; stop rather than fail on every row.
    raise

In [None]:
# Generate synthetic sentiment records for RUN_SECONDS. Each record is inserted
# into Postgres and published to the Kinesis stream.
RUN_SECONDS = 600      # total run time: 10 minutes
SLEEP_SECONDS = 0.05   # pause between records: 50 ms

# Elapsed time uses a monotonic clock, so wall-clock adjustments (NTP sync,
# manual changes) cannot shorten or extend the run.
t0 = time.monotonic()
seconds_elapsed = 0.0
message_id = 0
while seconds_elapsed < RUN_SECONDS:
    created_at = dt.datetime.utcnow()
    message_id += 1
    symbol = generate_symbol()
    sent_score = generate_sentiment_score()

    print("Created At:", created_at, "Message Id:", message_id, "Symbol:", symbol, "Sentiment Score:", sent_score)
    try:
        cur.execute(
            "INSERT INTO raw_sent_stream (created_at, message_id, symbol, sent_score) VALUES (%s, %s, %s, %s)",
            (created_at, message_id, symbol, sent_score),
        )
    except psycopg2.Error as e:
        # Best-effort: a failed insert is reported, but the stream keeps running.
        print("Error: Inserting Row")
        print(e)

    # Publish the same record to Kinesis. created_at is serialized at second
    # precision, while the Postgres row keeps the full datetime value.
    # NOTE(review): the constant PartitionKey routes every record to one shard.
    stream_data = {
        "created_at": created_at.strftime("%Y-%m-%d %H:%M:%S"),
        "message_id": message_id,
        "symbol": symbol,
        "sent_score": sent_score,
    }
    client.put_record(StreamName=STREAM_NAME, Data=json.dumps(stream_data), PartitionKey="partitionkey")

    time.sleep(SLEEP_SECONDS)
    seconds_elapsed = time.monotonic() - t0

Created At: 2019-06-28 23:57:31.466692 Message Id: 1 Symbol: AMZN Sentiment Score: 0.67755
Created At: 2019-06-28 23:57:31.606877 Message Id: 2 Symbol: AMZN Sentiment Score: -0.20824
Created At: 2019-06-28 23:57:31.668864 Message Id: 3 Symbol: AAPL Sentiment Score: 0.90402
Created At: 2019-06-28 23:57:31.729990 Message Id: 4 Symbol: NFLX Sentiment Score: 0.69035
Created At: 2019-06-28 23:57:31.790881 Message Id: 5 Symbol: NFLX Sentiment Score: -0.96557
Created At: 2019-06-28 23:57:31.855852 Message Id: 6 Symbol: FB Sentiment Score: 0.19316
Created At: 2019-06-28 23:57:31.918218 Message Id: 7 Symbol: NFLX Sentiment Score: -0.25987
Created At: 2019-06-28 23:57:31.979809 Message Id: 8 Symbol: NFLX Sentiment Score: -0.76798
Created At: 2019-06-28 23:57:32.041760 Message Id: 9 Symbol: AAPL Sentiment Score: -0.78111
Created At: 2019-06-28 23:57:32.102675 Message Id: 10 Symbol: GOOG Sentiment Score: -0.56041
Created At: 2019-06-28 23:57:32.164729 Message Id: 11 Symbol: NFLX Sentiment Score: 0