# Producer Development


### Notebook for testing out the interface to Twitter and IEX

In [8]:
#Import
import numpy as np
import pandas as pd

#Twitter requirements
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy import API

#IEX requirements
import urllib.request


import json

### Create Company List

In [9]:
companies = {
    "AAPL":"Apple",
    "FB":"Facebook",
    "GOOG":"Google"
    }

#company_df = pd.DataFrame(companies.items(), columns=['Company Name','Ticker'])
company_df = pd.DataFrame.from_dict(companies, orient='index')
company_df.index.name = 'Ticker'
company_df.columns=['Company']
company_df.head()

Unnamed: 0_level_0,Company
Ticker,Unnamed: 1_level_1
AAPL,Apple
FB,Facebook
GOOG,Google


In [91]:
#Write to CSV
company_df.to_csv('companies.csv')

### Twitter Ingestion

In [140]:
#Created a twitter tokens file. That has the tokens. This will not be in GIT
from twitter_tokens import *

In [10]:
#Add Code to read CSV file. In order to account for changes in the list, have to restart
companies = pd.read_csv('companies.csv')
companies.set_index('Ticker', inplace=True)

#Add code to add ticker symbol
companies['tweet_ticker']=companies.index.map(lambda x: '$'+x)
companies

Unnamed: 0_level_0,Company,tweet_ticker
Ticker,Unnamed: 1_level_1,Unnamed: 2_level_1
AAPL,Apple,$AAPL
FB,Facebook,$FB
GOOG,Google,$GOOG


In [11]:
#tickers = companies['tweet_ticker'].tolist()
tickers = companies['tweet_ticker'].tolist()
#Tweets with the ticker in front come in very slowly. May take a while to build up. But these are official tweets

In [142]:
#Create Filter function to filter attributes and Add Company name to the dictionary.

#Want to create a simple list, but how do I handle the users sub?
attributes = ['created_at',
             'id_str',
             'text',
              'quote_count',
              'reply_count',
              'retweet_count',
              'favorite_count',
              'retweeted',
              ['user','name'],
              ['user','followers_count'],
              ['user','statuses_count'],
              ['user','screen_name'],
              ['entities','hashtags'],
              ['entities','symbols']
               ]
def filter_attr(data):
    output = {}
    #Choose filter attributes
    for element in attributes:
        if isinstance(element, str):
            output[element]=data[element]
        else:
            string = str(element[0])+'_'+str(element[1])
            output[string]=data[element[0]][element[1]]
  
    #Need to also add the company name to output dictionary.
    #Add all companies tweet applies to in list
    attached_company = []
    
    for company in tickers:
        if data['text'].find(company) > -1:
            attached_company.append(company[1:])
            
    
    output['Company']=attached_company             
    
    return output

#This is a basic listener that just prints received tweets to stdout.
class TweetListener(StreamListener):
    global count
    count = 0
    def on_data(self, data):
        try:
            datajson = json.loads(data)
            filtered = filter_attr(datajson)
            #Check to see if a valid tweet
            if filtered['Company']:
                global count
                count += 1
                print(count)
                print(filtered)
            #print(datajson['text'])
            #coll_reference.insert_one(datajson)
            return True
        except Exception as e:
            print(e)

    def on_error(self, status):
        print('The error code is: ' + repr(status))
        #Continue even if there is an error
        return True

#This handles Twitter authetication and the connection to Twitter Streaming API
tweetlist = TweetListener(api=API(wait_on_rate_limit=True,wait_on_rate_limit_notify=True))
auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
stream = Stream(auth, tweetlist)

#Filters by the ticker names
print('Filtering: ' + str(tickers))
stream.filter(track=tickers)

Filtering: ['$AAPL', '$FB', '$GOOG']
1
{'created_at': 'Sun Jan 28 03:08:48 +0000 2018', 'id_str': '957450356350439424', 'text': 'RT @OMillionaires: Some implied moves for next week earnings, it is a big one:\n\n$AAPL 4.7%\n$BABA 6.9%\n$AMZN 6.8%\n$FB 5.8%\n$GOOGL 5%\n$MSFT 5…', 'quote_count': 0, 'reply_count': 0, 'retweet_count': 0, 'favorite_count': 0, 'retweeted': False, 'user_name': 'Nosson Herzka', 'user_followers_count': 1570, 'user_statuses_count': 24813, 'user_screen_name': 'nosmh', 'entities_hashtags': [], 'entities_symbols': [{'text': 'AAPL', 'indices': [80, 85]}, {'text': 'BABA', 'indices': [91, 96]}, {'text': 'AMZN', 'indices': [102, 107]}, {'text': 'FB', 'indices': [113, 116]}, {'text': 'GOOGL', 'indices': [122, 128]}, {'text': 'MSFT', 'indices': [132, 137]}], 'Company': ['AAPL', 'FB', 'GOOG']}


KeyboardInterrupt: 

# Stock - IEX Ingestion

In [1]:
!pip install schedule

Collecting schedule
  Downloading schedule-0.5.0-py2.py3-none-any.whl
Installing collected packages: schedule
Successfully installed schedule-0.5.0


In [2]:
!pip install boto3



In [4]:
import urllib.request
import json


In [5]:
response = urllib.request.urlopen("https://api.iextrading.com/1.0/stock/nati/quote")
str_response = response.read().decode('utf-8')
obj = json.loads(str_response)
obj

{'avgTotalVolume': 523792,
 'calculationPrice': 'close',
 'change': 0.86,
 'changePercent': 0.01867,
 'close': 46.93,
 'closeTime': 1517000400351,
 'companyName': 'National Instruments Corporation',
 'delayedPrice': 46.93,
 'delayedPriceTime': 1517003144777,
 'high': 46.97,
 'iexAskPrice': None,
 'iexAskSize': None,
 'iexBidPrice': None,
 'iexBidSize': None,
 'iexLastUpdated': None,
 'iexMarketPercent': None,
 'iexRealtimePrice': None,
 'iexRealtimeSize': None,
 'iexVolume': None,
 'latestPrice': 46.93,
 'latestSource': 'Close',
 'latestTime': 'January 26, 2018',
 'latestUpdate': 1517000400351,
 'latestVolume': 436306,
 'low': 44.74,
 'marketCap': 6135863882,
 'open': 46.18,
 'openTime': 1516977000156,
 'peRatio': 53.33,
 'previousClose': 46.07,
 'primaryExchange': 'Nasdaq Global Select',
 'sector': 'Technology',
 'symbol': 'NATI',
 'week52High': 46.97,
 'week52Low': 30.5621,
 'ytdChange': 0.10945626477541379}

In [17]:
import time
#Convert to seconds
ctime = obj['openTime']/1000

time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ctime))

'2018-01-26 08:30:00'

In [14]:
stock_tickers = companies.index.tolist()
stock_tickers

['AAPL', 'FB', 'GOOG']

### Function Definition

In [30]:
#Want to create a simple list, and then convert the time stamps
attributes = ['latestUpdate',
             'companyName',
             'latestPrice',
              'latestVolume',
              'marketCap',
              'open',
              'previousClose',
              'sector',
              'high',
              'low',
              'ytdChange',
              'peRatio',
              'week52High',
              'week52Low'
               ]
def filter_stock_attributes(data):
    output = {}
    #Choose filter attributes
    for element in attributes:
        output[element]=data[element]
    
    #Convert time
    ctime = output['latestUpdate']/1000
    new_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ctime))
    output['latestUpdate']= new_time
    
    return output

#Create function to fetch the stock data. This is to prepare for the schedule.
def fetch_stock_data(stocks=[]):
    
    for ticker in stocks:
        url = "https://api.iextrading.com/1.0/stock/{}/quote".format(ticker)
        response = urllib.request.urlopen(url)
        str_response = response.read().decode('utf-8')
        obj = json.loads(str_response)
        filtered = filter_stock_attributes(obj)
        #<----- Insert to Kinesis Stream ------->
        print(filtered)
    
    

### Script

In [35]:
import schedule
import time

#Read CSV File with the companies and setup dataframe
companies = pd.read_csv('companies.csv')
companies.set_index('Ticker', inplace=True)
stock_tickers = companies.index.tolist()

#Setup Schedule
schedule.clear()
schedule.every(1).minutes.do(fetch_stock_data, stocks=stock_tickers)

#Execute
while True:
    schedule.run_pending()
    time.sleep(1)

{'latestUpdate': '2018-01-26 15:00:00', 'companyName': 'Apple Inc.', 'latestPrice': 171.51, 'latestVolume': 39062792, 'marketCap': 872480974560, 'open': 172, 'previousClose': 171.11, 'sector': 'Technology', 'high': 172, 'low': 170.06, 'ytdChange': -0.004353883664228492, 'peRatio': 18.64, 'week52High': 180.1, 'week52Low': 120.62}
{'latestUpdate': '2018-01-26 15:00:00', 'companyName': 'Facebook Inc.', 'latestPrice': 190, 'latestVolume': 17392522, 'marketCap': 552103662120, 'open': 187.66, 'previousClose': 187.48, 'sector': 'Technology', 'high': 190, 'low': 186.81, 'ytdChange': 0.04729357292470518, 'peRatio': 36.61, 'week52High': 190.66, 'week52Low': 129.5157}
{'latestUpdate': '2018-01-26 15:00:00', 'companyName': 'Alphabet Inc.', 'latestPrice': 1175.84, 'latestVolume': 1978815, 'marketCap': 816976114198, 'open': 1175.08, 'previousClose': 1170.37, 'sector': 'Technology', 'high': 1175.84, 'low': 1158.11, 'ytdChange': 0.10407511737089194, 'peRatio': None, 'week52High': 1179.86, 'week52Low':

KeyboardInterrupt: 

### Redis example

In [36]:
from redis import Redis
from os import environ
REDIS = Redis(host=environ['THIS_REDIS_PORT_6379_TCP_ADDR'])

KeyError: 'THIS_REDIS_PORT_6379_TCP_ADDR'