### Import dependencies

In [1]:
import csv
import tweepy
import pywren
from dotenv import load_dotenv
import os
import time

### Set up twitter API with tweepy

In [2]:
# Load the Twitter API credentials from the environment
# (load_dotenv() is called first so values kept in a local .env file are picked up).
load_dotenv()
access_token = os.getenv('ACCESS_TOKEN')
access_token_secret = os.getenv('ACCESS_TOKEN_SECRET')
consumer_key = os.getenv('CONSUMER_KEY')
consumer_secret = os.getenv('CONSUMER_SECRET')

# Fail fast: os.getenv returns None for an unset variable, which would otherwise
# only show up later as an opaque authentication error when the API is called.
_missing = [
    name
    for name, value in (
        ('ACCESS_TOKEN', access_token),
        ('ACCESS_TOKEN_SECRET', access_token_secret),
        ('CONSUMER_KEY', consumer_key),
        ('CONSUMER_SECRET', consumer_secret),
    )
    if not value
]
if _missing:
    raise RuntimeError(
        'Missing Twitter API credentials in environment/.env: ' + ', '.join(_missing)
    )

In [3]:
# Authenticate with the consumer key pair, then attach the user access token.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# wait_on_rate_limit=True asks tweepy to wait instead of failing when rate-limited.
api = tweepy.API(auth, wait_on_rate_limit=True)

### Filter the fields needed for network analysis
The fields we need are user_id, user_screen_name, retweeted_user_screen_name, and retweeted_user_id. I later found out that I only need `user_screen_name` and `retweeted_user_screen_name` for network analysis, but here I used the original code. I first kept only the tweets that carry a retweet status. I then created two lists that store the node and edge information separately, which makes them easier to load into PySpark GraphFrames.

Note that the code caps the number of tweets returned per query by `api.search` at 200. This can easily be scaled up by increasing `max_tweets`.

In [4]:
# define a function that returns the nodes and edges of the retweet network for one query
def misInfoScraper(query, max_tweets=200):
    """Search English tweets matching ``query`` and build retweet-network records.

    Only statuses that carry a ``retweeted_status`` attribute (i.e. retweets)
    contribute to the output, since we are interested in retweet networks.

    Parameters
    ----------
    query : str
        Search string passed as ``q`` to the Twitter search endpoint.
    max_tweets : int, optional
        Upper bound on the number of statuses pulled from the cursor
        (default 200, the value previously hard-coded in the function).

    Returns
    -------
    tuple
        ``(nodes, edges)``. ``nodes`` is a list of ``{'id', 'type'}`` dicts,
        two per retweet (the retweeting user with type ``'user_name'`` followed
        by the retweeted user with type ``'retweeted_user_name'``); duplicates
        are not removed. ``edges`` is a list of ``{'source', 'dest'}`` dicts,
        one per retweet, pointing from the retweeting user to the retweeted user.
    """
    time.sleep(0.2)  # not to make process request too frequently

    # tweepy 4 renamed API.search to API.search_tweets; use whichever exists.
    search = getattr(api, 'search_tweets', None) or api.search
    statuses = tweepy.Cursor(
        search, q=query, lang="en", tweet_mode='extended'
    ).items(max_tweets)

    nodes = []
    edges = []
    for tweet in statuses:
        # skip anything that is not a retweet
        if not hasattr(tweet, "retweeted_status"):
            continue
        retweeter = tweet.user.screen_name
        original_author = tweet.retweeted_status.user.screen_name
        nodes.append({'id': retweeter, 'type': 'user_name'})
        nodes.append({'id': original_author, 'type': 'retweeted_user_name'})
        edges.append({'source': retweeter, 'dest': original_author})
    return (nodes, edges)


Here I included 8 queries, chosen after eyeballing tweets for misinformation; the list can easily be extended with more queries.

In [5]:
# Search phrases used to surface candidate misinformation tweets, in query order.
search_query = [
    'coronavirus lie',
    'covid lie',
    'corona lie',
    'not 5G corona',
    'no 5G corona',
    'fact 5G corona',
    'truth 5G corona',
    '5G responsible corona',
]
# Keep the individual query names bound as well.
(query1, query2, query3, query4,
 query5, query6, query7, query8) = search_query


In [6]:
%%time
# Set up a pywren executor and map the misInfoScraper function across all queries
pwex = pywren.default_executor()
futures = pwex.map(misInfoScraper, search_query)
results=[]
for f in futures:
    results.append(f.result())
    

CPU times: user 2.05 s, sys: 396 ms, total: 2.45 s
Wall time: 25.8 s


In [7]:
# Sanity check: results[0] should be a tuple ([nodes], [edges]) for the first query
results[0]

([{'id': 'smaddiebird', 'type': 'user_name'},
  {'id': 'MadisonSedona', 'type': 'retweeted_user_name'},
  {'id': 'PhydeauxL', 'type': 'user_name'},
  {'id': 'RepAndyBiggsAZ', 'type': 'retweeted_user_name'},
  {'id': 'connect2damon', 'type': 'user_name'},
  {'id': 'RepAndyBiggsAZ', 'type': 'retweeted_user_name'},
  {'id': 'nwokolo_leo', 'type': 'user_name'},
  {'id': 'mazinwakamma', 'type': 'retweeted_user_name'},
  {'id': 'AZPatriot16', 'type': 'user_name'},
  {'id': 'RepAndyBiggsAZ', 'type': 'retweeted_user_name'},
  {'id': 'superyayadize', 'type': 'user_name'},
  {'id': 'MadisonSedona', 'type': 'retweeted_user_name'},
  {'id': 'AZLD17GOP', 'type': 'user_name'},
  {'id': 'RepAndyBiggsAZ', 'type': 'retweeted_user_name'},
  {'id': 'Bernadettea777', 'type': 'user_name'},
  {'id': 'RepAndyBiggsAZ', 'type': 'retweeted_user_name'},
  {'id': 'drizzle2', 'type': 'user_name'},
  {'id': 'RepAndyBiggsAZ', 'type': 'retweeted_user_name'},
  {'id': 'DonaldEBain1', 'type': 'user_name'},
  {'id': 'Re

In [8]:
# Number of per-query results collected; search_query holds 8 queries, so 8 is expected
len(results)

8

In [9]:
# Combine the per-query (nodes, edges) tuples into two flat lists.
# Iterating over `results` directly, instead of a hard-coded range(0, 8),
# keeps this cell correct when queries are added to or removed from the list.
all_nodes = []
all_edges = []
for query_nodes, query_edges in results:
    all_nodes.extend(query_nodes)
    all_edges.extend(query_edges)
    

In [10]:
# Peek at the first combined node record
all_nodes[0]

{'id': 'smaddiebird', 'type': 'user_name'}

In [11]:
# Field names of a node record
all_nodes[0].keys()

dict_keys(['id', 'type'])

In [12]:
# Every retweet appends two node records (retweeter and retweeted user) and one edge
len(all_nodes) #length of all_nodes = 2*len(all_edges)

1438

In [13]:
# write all_nodes into local csv
nodestoCSV = all_nodes
# Take the header from the first record; fall back to the known node fields so an
# empty scrape still produces a header-only file instead of raising IndexError.
keys = list(nodestoCSV[0].keys()) if nodestoCSV else ['id', 'type']
# newline='' lets the csv module control line endings (otherwise blank rows appear
# on Windows); the explicit encoding avoids depending on the platform default.
with open('tweet_nodes_2.csv', 'w', newline='', encoding='utf-8') as output_nodes:
    dict_writer = csv.DictWriter(output_nodes, fieldnames=keys)
    dict_writer.writeheader()
    dict_writer.writerows(nodestoCSV)



In [14]:
# Field names of an edge record
all_edges[0].keys()

dict_keys(['source', 'dest'])

In [15]:
# Check the number of edges: one per retweet collected across all queries
len(all_edges)

719

In [16]:
# write all_edges into csv
edgestoCSV = all_edges
# Take the header from the first record; fall back to the known edge fields so an
# empty scrape still produces a header-only file instead of raising IndexError.
keys2 = list(edgestoCSV[0].keys()) if edgestoCSV else ['source', 'dest']
# newline='' lets the csv module control line endings (otherwise blank rows appear
# on Windows); the explicit encoding avoids depending on the platform default.
with open('tweet_edges_2.csv', 'w', newline='', encoding='utf-8') as output_edges:
    dict_writer = csv.DictWriter(output_edges, fieldnames=keys2)
    dict_writer.writeheader()
    dict_writer.writerows(edgestoCSV)

### Upload CSV files to S3

In [17]:
import boto3
# Create the S3 client from the explicit session; previously `session` was
# created but unused and the client came from boto3's implicit default session.
session = boto3.Session()
s3 = session.client('s3', region_name='us-east-1')


In [None]:
# create s3 bucket (no need to run since this bucket has already been created previously)
#bucket = s3.create_bucket(Bucket='finalproject2020') 

In [18]:
# Ask S3 for the buckets visible to these credentials
bucket_response=s3.list_buckets()

In [3]:
# check existing buckets (the target bucket 'finalproject2020' should be listed)
# NOTE(review): the rendered response includes request IDs and bucket names that
# appear to embed an account number; consider clearing this output before sharing.
bucket_response

{'ResponseMetadata': {'RequestId': '0E2B6ECB8105CA12',
  'HostId': 'lvQ9e7AORqakKiuCXKLru9dpcBDs7X3neN+S5IDq9WKx6Tue9efU/pw7IzhoN2ZcRQkW0hbnvsw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'lvQ9e7AORqakKiuCXKLru9dpcBDs7X3neN+S5IDq9WKx6Tue9efU/pw7IzhoN2ZcRQkW0hbnvsw=',
   'x-amz-request-id': '0E2B6ECB8105CA12',
   'date': 'Sun, 14 Jun 2020 00:01:08 GMT',
   'content-type': 'application/xml',
   'transfer-encoding': 'chunked',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Buckets': [{'Name': 'aws-emr-resources-436654078546-us-east-1',
   'CreationDate': datetime.datetime(2020, 5, 25, 15, 11, 28, tzinfo=tzutc())},
  {'Name': 'aws-logs-436654078546-us-east-1',
   'CreationDate': datetime.datetime(2020, 5, 16, 1, 33, 32, tzinfo=tzutc())},
  {'Name': 'finalproject2020',
   'CreationDate': datetime.datetime(2020, 6, 10, 19, 3, 16, tzinfo=tzutc())},
  {'Name': 'mrjob-8f80937eb31450f1',
   'CreationDate': datetime.datetime(2020, 5, 16, 2, 4, 31, tzinfo=tzutc())},
  {'Name': 

In [19]:
# Upload both CSV files to the project bucket under their local file names,
# with a public-read ACL so they are publicly available.
for csv_name in ('tweet_nodes_2.csv', 'tweet_edges_2.csv'):
    s3.upload_file(
        Filename=csv_name,
        Bucket='finalproject2020',
        Key=csv_name,
        ExtraArgs={'ACL': 'public-read'},
    )