In [4]:
import requests
import requests.auth
from requests_oauthlib import OAuth1
import oauth2 as oauth
import time
import os
import json
import pprint
import csv
import base64
from datetime import datetime
from kafka import KafkaProducer
from json import dumps


In [5]:
# Import credentials from local source:
reader = csv.reader(open("/Users/mchifala/Desktop/ATLS_5412/Credentials.csv"))
credentials = {}
for line in reader:
    credentials[line[0]] = line[1]
    

In [6]:
# Generate Reddit credentials for API:
def getRedditHeaders():
    reddit_id = credentials['reddit_id']
    reddit_secret = credentials['reddit_secret']
    reddit_username = credentials['reddit_username']
    reddit_password = credentials['reddit_password']
    reddit_user_agent = credentials['reddit_user_agent']

    reddit_auth = requests.auth.HTTPBasicAuth(reddit_id, reddit_secret)
    reddit_post_data = {'grant_type': 'password', 'username': reddit_username, 'password': reddit_password}
    reddit_token_headers = {'User-Agent': reddit_user_agent}
    r_auth = requests.post("https://www.reddit.com/api/v1/access_token", auth= reddit_auth, data=reddit_post_data, headers=reddit_token_headers)

    reddit_token = r_auth.json()['token_type'] + ' ' + r_auth.json()['access_token']
    
    # Return header dictionary
    return {"Authorization": reddit_token, "User-Agent": reddit_user_agent, 'Content-Type': 'application/json'}


In [7]:
# Get Twitter credentials:
def getTwitterAuth():
    twitter_key = credentials['twitter_key']
    twitter_secret =  credentials['twitter_secret']
    twitter_token = credentials['twitter_token']
    twitter_secret_token = credentials['twitter_secret_token']
    
    #Return authorization object
    return OAuth1(twitter_key, twitter_secret, twitter_token, twitter_secret_token)

getTwitterAuth()

<requests_oauthlib.oauth1_auth.OAuth1 at 0x10870cfd0>

In [8]:
# Get all available Twitter trends:
twitter_auth = getTwitterAuth()
r_trends = requests.get('https://api.twitter.com/1.1/trends/available.json', auth = twitter_auth)

In [12]:
# Extract the location ID codes for trends:
world_codes = []
for i in range(0,len(r_trends.json())):
    world_codes.append(r_trends.json()[i]['woeid'])

test_codes = world_codes[20:30]
print(test_codes)

[26042, 26062, 26734, 28218, 28869, 30079, 30720, 32185, 32452, 32566]


In [13]:
# Search each location ID for its specific trends:
topics = []
for code in world_codes:
    r_trends2 = requests.get('https://api.twitter.com/1.1/trends/place.json?id='+str(code), auth = twitter_auth)
    for i in range(0,len(r_trends2.json()[0])):
        print(r_trends2.json()[0]['trends'][i]['name'])
        topics.append((r_trends2.json()[0]['trends'][i]['name']))

#MuellerReport
#NAPARS
Dave East
Ontas
#CELLYSZN
Happy Easter
#MuellerReport
#EasterWeekend
Happy Easter
#MuellerReport
#ocsbLent
Stations of the Cross
#CELLYSZN
Happy Easter
#MuellerReport
#EasterWeekend
Québec
#CELLYSZN
Happy Easter
#MuellerReport
Toronto Public Health
Happy Easter
endangered species act
#longweekend
Albertans
Calgary
Kenney
#CELLYSZN
#CELLYSZN
Happy Easter
#MuellerReport
#EasterWeekend
#CELLYSZN
Happy Easter
#MuellerReport
#EasterWeekend
#COYS
Spurs
Easter
Liverpool
#Blackpool
Easter
#BWFC
Napoli
Easter
Spurs
Liverpool
#COYS
Napoli
#NAPARS
#ClimateCatastrophe
#CHESLA
Easter
Liverpool
Napoli
#NAPARS
Good Friday
#COYS
Spurs
Easter
#COYG
#EasterWeekend
#MCITOT
Happy Easter
#Easter
Happy Easter
Spurs
HTFP
Napoli
#NAPARS
#ClimateCatastrophe
#CHESLA
Napoli
Easter
Spurs
Alex McLeish
Napoli
#NAPARS
#ClimateCatastrophe
#CHESLA
Napoli
#NAPARS
#ClimateCatastrophe
#CHESLA
#COYS
Liverpool
Easter
Napoli
Easter
Barca
Ajax
Napoli
Napoli
#NAPARS
#ClimateCatastrophe
#CHESLA
Easter
Na

KeyError: 0

In [61]:
# Function to control the number of calls to Reddit API; sleep when limit is exceeded
def redditSearch(reddit_headers, elastic_headers, elastic_index, ratelimit_calls, ratelimit_reset, trends):
    for query in set(trends):
        if (int(float(ratelimit_calls)) > 0):
            #print('Before call')
            #print(ratelimit_calls, ratelimit_reset)
            ratelimit_calls, ratelimit_reset = redditQuery(reddit_headers, elastic_headers, elastic_index, ratelimit_calls, ratelimit_reset, query)  
            #print('After call')
            #print(ratelimit_calls, ratelimit_reset)
        else:
            print('Sleep')
            time.sleep(int(ratelimit_reset))
            ratelimit_calls, ratelimit_reset = redditQuery(reddit_headers, elastic_headers, elastic_index, ratelimit_calls, ratelimit_reset, query)

In [114]:
# Original function for publishing directly to Elasticsearch
def redditQuery(reddit_headers,elastic_headers,elastic_index, ratelimit_calls, ratelimit_reset, query):
    r_get = requests.get('https://oauth.reddit.com/r/all/search?q='+query+'&t=day&limit=25', headers = reddit_headers)
    posts = r_get.json()['data']['children']
    
    data = []
    fields_to_keep = ['id','title', 'author', 'score', 'subreddit', 'num_comments', 'created_utc', 'url']
    for post in posts:
        temp = {k: post['data'][k] for k in fields_to_keep}
        temp.update({'hashtag': query, 'source': 'reddit'})
        temp['post_date'] = datetime.isoformat(datetime.utcfromtimestamp(temp['created_utc']))
        temp['upvotes'] = temp['score']
        data.append({'index': {"_index": elastic_index, '_type': '_doc', '_id': 'r_'+ temp['id'] }})
        url_split = temp['url'].split('.')[-1]
        if (url_split != "jpg" and url_split != "png"):
            del temp['url']
        del temp['id']
        del temp['created_utc']
        del temp['score']
        data.append(temp)

    if data:
        data_to_post = '\n'.join(json.dumps(d) for d in data)
        print(data_to_post)
        r_post = requests.post('http://34.73.60.209:9200/'+elastic_index+'/_bulk', headers = elastic_headers, data=data_to_post+'\n')
        print(r_post.text)
    
    print(r_get.headers['x-ratelimit-remaining'], r_get.headers['x-ratelimit-reset'])
    
    return r_get.headers['x-ratelimit-remaining'], r_get.headers['x-ratelimit-reset']

In [47]:
# Revised function for publishing to Kakfa-Spark-Elasticsearch pipeline
def redditQuery(reddit_headers,elastic_headers,elastic_index, ratelimit_calls, ratelimit_reset, query, producer, topic):
    r_get = requests.get('https://oauth.reddit.com/r/all/search?q='+query+'&t=day&limit=25', headers = reddit_headers)
    posts = r_get.json()['data']['children']
    
    fields_to_keep = ['id','title', 'author', 'score', 'subreddit', 'num_comments', 'created_utc', 'url']
    for post in posts:
        
        data = {k: post['data'][k] for k in fields_to_keep}
        time = datetime.isoformat(datetime.utcfromtimestamp(data['created_utc']))
        data.update({'hashtag': query, 'source': 'reddit', 'id': 'r_'+ data['id'], 'upvotes': data['score'], 'post_date': time })    
        url_split = data['url'].split('.')[-1]
        if (url_split != "jpg" and url_split != "png"):
            del data['url']
            
        del data['created_utc']
        del data['score']
        
        producer.send(topic, value=data)

    return r_get.headers['x-ratelimit-remaining'], r_get.headers['x-ratelimit-reset']

producer = KafkaProducer(bootstrap_servers=['35.232.117.118:9092'], 
                        value_serializer=lambda x: dumps(x).encode('utf-8'),
                        api_version = (0,10))
topic = 'trending'
redditQuery(reddit_headers, elastic_headers, elastic_index, ratelimit_calls, ratelimit_reset, query, producer, topic)
    

('598.0', '172')

In [46]:
# Testing Enviroment for Rate Limits:
reddit_headers = getRedditHeaders()
elastic_headers = {'Content-Type':'application/json'}
elastic_index = 'hi_yash2'

r_get = requests.get('https://oauth.reddit.com', headers = reddit_headers)
ratelimit_calls =  r_get.headers['x-ratelimit-remaining']
ratelimit_reset = r_get.headers['x-ratelimit-reset']

redditSearch(reddit_headers, elastic_headers, elastic_index, ratelimit_calls, ratelimit_reset, topics)

In [None]:
# This is a working example of bulk uploads to Elasticsearch: make sure the indexes match
data = [
    { "index" : { "_index" : "test5", "_type" : "type1", "_id" : "1" } },
    { "field1" : "value1" },
    { "delete" : { "_index" : "test5", "_type" : "type1", "_id" : "2" }, },
    { "create" : { "_index" : "test5", "_type" : "type1", "_id" : "3" }, },
    { "field1" : "value3" },
    { "update" : {"_id" : "1", "_type" : "type1", "_index" : "test5"} },
    { "doc" : {"field2" : "value2"} }
]

data_to_post = '\n'.join(json.dumps(d) for d in data)
print(data_to_post)
r = requests.post('http://34.73.60.209:9200/test5/_bulk', headers = headers, data=data_to_post + '\n')
print(r.text)

In [9]:
# This is a working example of a single record uplaod to Elasticsearch:
requests.post('http://34.73.60.209:9200/hi_yash/_doc/',  json = {"user" : "mchifala",
  "post_date" : "2019-03-10T15:41:12",
  "message" : "Hi Yash"})


datetime.datetime(2019, 3, 10, 21, 19, 8)