In [1]:
import requests
import os
import json
import pandas as pd
from pymongo import MongoClient
import dns
from pyspark import SparkContext
import postgre_queries as pg
from pyspark.sql import SQLContext
import findspark
findspark.init()
import datetime
findspark.find()
import re
from pyspark.sql import SparkSession
import nltk
from nltk.corpus import stopwords
from collections import defaultdict

[NbConvertApp] Converting notebook postgre_queries.ipynb to script
[NbConvertApp] Writing 4458 bytes to postgre_queries.py


In [2]:
#Set the bearer token environment variable as the token obtained from Twitter Developer API
os.environ['BEARER_TOKEN'] = 'AAAAAAAAAAAAAAAAAAAAAP8HXQEAAAAAKhiRO6HAg4IkfvqWZaqJs%2F2VMng%3Dy5pWOO4eC8BMAIDvgJSNt3ILsOFGitzJHsxzt9vTuEcRBvzrZ0'

In [3]:
#Get information about the environment variable for API bearer token
def auth():
    return os.getenv('BEARER_TOKEN')

In [4]:
#Create 
def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers

In [5]:
def create_url(keyword, max_results):
    
    #Change to the endpoint you want to collect data from
    search_url = "https://api.twitter.com/2/tweets/search/recent" 

    #Change params based on the endpoint being used
    query_params = {'query': keyword,
                    'max_results': max_results,
                    'tweet.fields': 'created_at',
                    'expansions': 'author_id'
                    }
    return (search_url, query_params)

In [6]:
def connect_to_endpoint(url, headers, params, next_token = None):
    params['next_token'] = next_token   #params object received from create_url function
    response = requests.request("GET", url, headers = headers, params = params)
    #print(type(response))
    #print("Endpoint Response Code: " + str(response.status_code))
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

In [7]:
def read_txt_file(filename):
    file_content = []
    with open(filename, "r") as file:
        for f in file:
            file_content.append(f.strip())
    file.close()
    return file_content

In [8]:
countries = read_txt_file("countries.txt")
n = len(countries)

In [9]:
def get_recent_tweets(keyword):
    bearer_token = auth()
    headers = create_headers(bearer_token)
    max_results = 10
    url = create_url(keyword, max_results)
    json_response = connect_to_endpoint(url[0], headers, url[1])
    return json_response

In [10]:
def augment_country(json_response, country):
    for data in json_response['data']:
        data['country'] = country
    return json_response

In [11]:
response_data = []
operators = "has:hashtags -is:retweet is:verified lang:en"
for i in range(5):
    country = countries[i%n]
    keyword = country + " " + operators
    json_response = get_recent_tweets(keyword)
    json_response = augment_country(json_response, country)
    response_data.extend(json_response['data'])

In [12]:
df = pd.DataFrame(response_data)
df = df.reindex(columns = ['id', 'author_id', 'text', 'created_at', 'country'])
#df.to_csv('data.csv')

In [13]:
'''
- Load data into postgre tables
- extract tweets for given date through spark sql
- run spark commands to obtain tokens and hashtags, store that in MongoDB by date
- setup airflow scheduler to load data from API and run all these above procedures sequentially
- Create a dashboard to get the most recent tokens
- Integrate Google news into this
'''

'\n- Load data into postgre tables\n- extract tweets for given date through spark sql\n- run spark commands to obtain tokens and hashtags, store that in MongoDB by date\n- setup airflow scheduler to load data from API and run all these above procedures sequentially\n- Create a dashboard to get the most recent tokens\n- Integrate Google news into this\n'

In [14]:
df.head(20)

Unnamed: 0,id,author_id,text,created_at,country
0,1533165953349038080,947147264,1st ODI: Afghanistan thrash Zimbabwe by 60 run...,2022-06-04T19:17:02.000Z,Afghanistan
1,1533161843929260032,294155637,#Afghanistan Exports to #Pakistan Mark Histori...,2022-06-04T19:00:43.000Z,Afghanistan
2,1533157440774094848,354365376,ACB chairman @MirwaisAshraf16 speaking to @bbc...,2022-06-04T18:43:13.000Z,Afghanistan
3,1533156894042312704,947147264,1st ODI: Afghanistan thrash Zimbabwe by 60 run...,2022-06-04T18:41:02.000Z,Afghanistan
4,1533151391669272576,29183527,Ongoing disaster on many levels in #Afghanista...,2022-06-04T18:19:11.000Z,Afghanistan
5,1533151024705417216,168259012,Blinken Concerned With Religious Freedom in Af...,2022-06-04T18:17:43.000Z,Afghanistan
6,1533149612068192256,3245129953,In today's matches:\nAfghanistan set a target ...,2022-06-04T18:12:06.000Z,Afghanistan
7,1533148971354312704,17028306,“It seemed to have been written into history t...,2022-06-04T18:09:33.000Z,Afghanistan
8,1533148328577441793,1340093874,Afghanistan and Cameroon have been added to th...,2022-06-04T18:07:00.000Z,Afghanistan
9,1533148159048052736,828841083257778176,ZIM vs AFG: Afghanistan Go Above India in Cric...,2022-06-04T18:06:20.000Z,Afghanistan


In [15]:
#sc.stop()
sc = SparkContext()
sqlContext = SQLContext(sc)
tweets_rdd = sqlContext.createDataFrame(df).rdd



In [16]:
print(tweets_rdd.take(5))

[Row(id='1533165953349038080', author_id='947147264', text='1st ODI: Afghanistan thrash Zimbabwe by 60 runs, take 1-0 lead https://t.co/G0E6pBv0Oz #Acer #Afghanistan #BCCI #Chase #ICC', created_at='2022-06-04T19:17:02.000Z', country='Afghanistan'), Row(id='1533161843929260032', author_id='294155637', text='#Afghanistan Exports to #Pakistan Mark Historic Rise https://t.co/LBDl5MMh57', created_at='2022-06-04T19:00:43.000Z', country='Afghanistan'), Row(id='1533157440774094848', author_id='354365376', text='ACB chairman @MirwaisAshraf16 speaking to @bbcpashto TV on #Afghanistan #Cricket @ACBofficials \n\nhttps://t.co/dbcyJguD7a', created_at='2022-06-04T18:43:13.000Z', country='Afghanistan'), Row(id='1533156894042312704', author_id='947147264', text='1st ODI: Afghanistan thrash Zimbabwe by 60 runs, take 1-0 lead https://t.co/OdyFCkWz6p #Acer #Afghanistan #BCCI #Chase #ICC Read Full  👇', created_at='2022-06-04T18:41:02.000Z', country='Afghanistan'), Row(id='1533151391669272576', author_id='2

In [17]:
def get_date(utc_timestamp):
    '''
    date = 'to_date("' + utc_timestamp.split('T')[0] + '", "YYYY-MM-DD")'
    #print(date)
    return date
    '''
    return utc_timestamp.split('T')[0]

In [18]:
def process_text(text):
    processed_text = re.sub("['\\$]", "", text)
    return processed_text

In [19]:
tweet_tuples = tweets_rdd.map(lambda x: (x[0], x[1], process_text(x[2]), get_date(x[3]), x[4])).collect()

In [20]:
tweet_tuples = tuple(tweet_tuples)

In [21]:
print(tweet_tuples)

(('1533165953349038080', '947147264', '1st ODI: Afghanistan thrash Zimbabwe by 60 runs, take 1-0 lead https://t.co/G0E6pBv0Oz #Acer #Afghanistan #BCCI #Chase #ICC', '2022-06-04', 'Afghanistan'), ('1533161843929260032', '294155637', '#Afghanistan Exports to #Pakistan Mark Historic Rise https://t.co/LBDl5MMh57', '2022-06-04', 'Afghanistan'), ('1533157440774094848', '354365376', 'ACB chairman @MirwaisAshraf16 speaking to @bbcpashto TV on #Afghanistan #Cricket @ACBofficials \n\nhttps://t.co/dbcyJguD7a', '2022-06-04', 'Afghanistan'), ('1533156894042312704', '947147264', '1st ODI: Afghanistan thrash Zimbabwe by 60 runs, take 1-0 lead https://t.co/OdyFCkWz6p #Acer #Afghanistan #BCCI #Chase #ICC Read Full  👇', '2022-06-04', 'Afghanistan'), ('1533151391669272576', '29183527', 'Ongoing disaster on many levels in #Afghanistan \n\nhttps://t.co/SOG0dxgbQP', '2022-06-04', 'Afghanistan'), ('1533151024705417216', '168259012', 'Blinken Concerned With Religious Freedom in Afghanistan\n#TOLOnews \n\nhttp

In [23]:
conn, cursor = pg.connect_to_postgre()
pg.drop_tables(cursor)
pg.create_tables(cursor)
pg.insert_records(cursor, "tweets_staging", tweet_tuples)
#pg.insert_records(cursor, "tweets_staging", tweet_tuples[1])

Tables dropped successfully
Tables created successfully
Records inserted successfully


In [24]:
records = pg.get_records(cursor, "tweets_staging", "tweet_date >= to_date('2022-05-22', 'YYYY-MM-DD')")
print(records)

Records fetched successfully
[('1533165953349038080', '947147264', '1st ODI: Afghanistan thrash Zimbabwe by 60 runs, take 1-0 lead https://t.co/G0E6pBv0Oz #Acer #Afghanistan #BCCI #Chase #ICC', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533161843929260032', '294155637', '#Afghanistan Exports to #Pakistan Mark Historic Rise https://t.co/LBDl5MMh57', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533157440774094848', '354365376', 'ACB chairman @MirwaisAshraf16 speaking to @bbcpashto TV on #Afghanistan #Cricket @ACBofficials \\n\\nhttps://t.co/dbcyJguD7a', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533156894042312704', '947147264', '1st ODI: Afghanistan thrash Zimbabwe by 60 runs, take 1-0 lead https://t.co/OdyFCkWz6p #Acer #Afghanistan #BCCI #Chase #ICC Read Full  👇', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533151391669272576', '29183527', 'Ongoing disaster on many levels in #Afghanistan \\n\\nhttps://t.co/SOG0dxgbQP', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533151024705

In [25]:
len(records)

50

In [26]:
rdd = sc.parallelize(records)

In [27]:
print(rdd.collect())

[('1533165953349038080', '947147264', '1st ODI: Afghanistan thrash Zimbabwe by 60 runs, take 1-0 lead https://t.co/G0E6pBv0Oz #Acer #Afghanistan #BCCI #Chase #ICC', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533161843929260032', '294155637', '#Afghanistan Exports to #Pakistan Mark Historic Rise https://t.co/LBDl5MMh57', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533157440774094848', '354365376', 'ACB chairman @MirwaisAshraf16 speaking to @bbcpashto TV on #Afghanistan #Cricket @ACBofficials \\n\\nhttps://t.co/dbcyJguD7a', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533156894042312704', '947147264', '1st ODI: Afghanistan thrash Zimbabwe by 60 runs, take 1-0 lead https://t.co/OdyFCkWz6p #Acer #Afghanistan #BCCI #Chase #ICC Read Full  👇', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533151391669272576', '29183527', 'Ongoing disaster on many levels in #Afghanistan \\n\\nhttps://t.co/SOG0dxgbQP', datetime.date(2022, 6, 4), 'Afghanistan'), ('1533151024705417216', '168259012', 'Blinke

In [28]:
def get_tokens(tweet):
    tokens = tweet.split(" ")
    return tokens

In [29]:
def get_date(date):
    return str(date)

In [30]:
def get_sorted_token_list(word_counts):
    print("haha")
    return sorted(word_counts, key=lambda x: (-x[1], x[0]))

In [31]:
country_date_word_counts_rdd = rdd.map(lambda x: ((x[4], get_date(x[3])), list(get_tokens(x[2])))).flatMapValues(lambda x: x)\
          .map(lambda x: ((x[0][0], x[0][1], x[1]), 1)).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] > 5).cache()

In [32]:
sorted_word_counts_rdd = country_date_word_counts_rdd.map(lambda x: ((x[0][0], x[0][1]), (x[0][2], x[1])))\
                        .groupByKey().mapValues(list).map(lambda x: (x[0], get_sorted_token_list(x[1])))

In [33]:
print(sorted_word_counts_rdd.take(5))

[(('Australia', '2022-06-04'), [('the', 10), ('and', 8)]), (('China', '2022-06-04'), [('the', 14), ('in', 9), ('to', 8), ('and', 6)]), (('India', '2022-06-04'), [('the', 10), ('in', 9), ('a', 7), ('of', 6)]), (('Canada', '2022-06-04'), [('the', 14), ('and', 11), ('in', 10), ('of', 10), ('Canada', 9), ('to', 9), ('my', 6)]), (('Afghanistan', '2022-06-04'), [('#Afghanistan', 7)])]


In [35]:
def transform_data(tuples):
    input_data = []
    for t in tuples:
        records = {}
        records['country'] = t[0][0]
        records['date'] = t[0][1]
        records['tokens'] = t[1]
        input_data.append(records)
    return input_data

In [36]:
data = transform_data(tuples)
data

[{'country': 'Australia',
  'date': '2022-06-04',
  'tokens': [('the', 10), ('and', 8)]},
 {'country': 'China',
  'date': '2022-06-04',
  'tokens': [('the', 14), ('in', 9), ('to', 8), ('and', 6)]},
 {'country': 'India',
  'date': '2022-06-04',
  'tokens': [('the', 10), ('in', 9), ('a', 7), ('of', 6)]},
 {'country': 'Canada',
  'date': '2022-06-04',
  'tokens': [('the', 14),
   ('and', 11),
   ('in', 10),
   ('of', 10),
   ('Canada', 9),
   ('to', 9),
   ('my', 6)]},
 {'country': 'Afghanistan',
  'date': '2022-06-04',
  'tokens': [('#Afghanistan', 7)]}]

In [37]:
mongo_url = "mongodb+srv://admin:admin@cluster0.ydaey.mongodb.net/TwitterAPI?retryWrites=true&w=majority"
'''
try:
    client = MongoClient(mongo_url)
    print("Connected successfully!")
except:  
    print("Could not connect to MongoDB")
'''  
client = MongoClient(mongo_url)
db = client.TwitterAPI

In [38]:
db.token_counts.insert_many(data)

<pymongo.results.InsertManyResult at 0x264389e4bc8>

In [None]:
#db.token_counts.find({})

In [None]:
!jupyter nbconvert --to script First_pipeline.ipynb