In [51]:
import random
import os
import glob
import logging
import pickle
import pandas as pd
import numpy as np
from datetime import datetime
import snscrape.modules.twitter as sntwitter
import itertools
from time import sleep
import pyarrow as pa
import pyarrow.parquet as pq
from joblib import Parallel, delayed

In [52]:
def get_conversations():
    """Load the cached conversations frame, or an empty placeholder.

    Returns the object unpickled from ``./data/raw/_tmp_conversations.pickle``
    when that file exists; otherwise an empty DataFrame holding a single
    integer-typed ``conversationId`` column.
    """
    cache_path = './data/raw/_tmp_conversations.pickle'
    if not os.path.isfile(cache_path):
        # No cache yet: hand back a frame with the expected column and no rows.
        empty_ids = pd.Series([], dtype='int')
        return pd.DataFrame({'conversationId': empty_ids})
    return pd.read_pickle(cache_path)

def get_users():
    """Load the cached users frame, or an empty placeholder.

    Returns the object unpickled from ``./data/raw/_tmp_users.pickle`` when
    that file exists; otherwise an empty DataFrame holding a single
    integer-typed ``userId`` column.
    """
    users_pickle = './data/raw/_tmp_users.pickle'
    if os.path.isfile(users_pickle):
        cached_users = pd.read_pickle(users_pickle)
        return cached_users
    # No cache on disk: return the empty frame with the expected column.
    return pd.DataFrame({'userId': pd.Series([], dtype='int')})

def get_page_covid_content(search):
    """Scrape every item matching ``search`` and split it into tweet and user tables.

    Parameters
    ----------
    search : str
        Query string handed to ``sntwitter.TwitterSearchScraper``.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        ``(tweets, users)``. ``tweets`` keeps the columns 'tweetId',
        'conversationId', 'userId', 'date', 'content', 'lang', 'sourceLabel',
        'replyCount', 'retweetCount', 'likeCount', 'quoteCount', 'longitude',
        'latitude' and 'place'. ``users`` has one row per distinct 'userId'.
        Two column-less empty DataFrames are returned when the search yields
        no rows.
    """
    # Materialise the scraper's iterator. The former islice(..., None) wrapper
    # imposed no limit, so it is dropped.
    df = pd.DataFrame(list(sntwitter.TwitterSearchScraper(search).get_items()))
    if df.shape[0] == 0:
        return pd.DataFrame(), pd.DataFrame()

    df = df.rename(columns={'id': 'tweetId'})

    # Build the user table from the per-tweet user records; both frames carry
    # the default index here, so the userId assignment lines up row by row.
    users_df = pd.DataFrame(list(df.user.values)).rename(columns={'id': 'userId'})
    df['userId'] = users_df['userId']
    users_df = users_df.drop_duplicates('userId')
    # errors='ignore': do not fail when the scraper result has no such column.
    users_df = users_df.drop(columns=['descriptionUrls'], errors='ignore')

    # Pull each axis out of the coordinates value; null entries pass through unchanged.
    for axis in ('longitude', 'latitude'):
        df[axis] = df.coordinates.apply(
            lambda point, axis=axis: point[axis] if np.all(pd.notnull(point)) else point
        )

    df = df[[
        'tweetId', 'conversationId', 'userId', 'date', 'content', 'lang', 'sourceLabel',
        'replyCount', 'retweetCount', 'likeCount', 'quoteCount', 'longitude', 'latitude', 'place'
    ]]

    return df, users_df

def check_page(page):
    """Return True if the temporary tweets parquet file for ``page`` exists.

    ``page`` is converted with ``str`` before it is spliced into the path, so
    integer page identifiers work as well as strings (plain concatenation
    raised TypeError for non-string values).
    """
    return os.path.exists('./data/raw/_tweets_tmp' + str(page) + '.parquet')

def append_to_parquet_table(dataframe, schema, filepath=None, writer=None):
    """Write ``dataframe`` through a Parquet writer and return that writer.

    The frame is converted to an Arrow table with ``schema`` (index not
    preserved). When ``writer`` is None a new gzip-compressed
    ``pq.ParquetWriter`` is opened on ``filepath``; otherwise the given
    writer is reused. The writer is returned so the caller can pass it back
    in on the next call.
    """
    # Convert first, so a conversion failure happens before any writer is opened.
    arrow_table = pa.Table.from_pandas(dataframe, schema=schema, preserve_index=False)
    active_writer = (
        writer if writer is not None
        else pq.ParquetWriter(filepath, schema, compression='gzip')
    )
    active_writer.write_table(table=arrow_table)
    return active_writer

def get_tweets_fields():
    """Return the list of pyarrow fields describing the tweets table columns."""
    column_types = (
        ('tweetId', pa.int64()),
        ('conversationId', pa.int64()),
        ('userId', pa.int64()),
        ('date', pa.timestamp('ns', tz='UTC')),
        ('content', pa.string()),
        ('lang', pa.string()),
        ('sourceLabel', pa.string()),
        ('replyCount', pa.int32()),
        ('retweetCount', pa.int32()),
        ('likeCount', pa.int32()),
        ('quoteCount', pa.int32()),
        ('longitude', pa.float32()),
        ('latitude', pa.float32()),
        ('place', pa.string()),
    )
    # Field order follows the tuple above.
    return [pa.field(column, arrow_type) for column, arrow_type in column_types]

def get_users_fields():
    """Return the list of pyarrow fields describing the users table columns."""
    # Insertion order of the mapping is the field order of the result.
    column_types = {
        'username': pa.string(),
        'displayname': pa.string(),
        'userId': pa.int64(),
        'description': pa.string(),
        'rawDescription': pa.string(),
        'verified': pa.bool_(),
        'created': pa.timestamp('ns', tz='UTC'),
        'followersCount': pa.int32(),
        'friendsCount': pa.int32(),
        'statusesCount': pa.int32(),
        'favouritesCount': pa.int32(),
        'listedCount': pa.int32(),
        'mediaCount': pa.int32(),
        'location': pa.string(),
        'protected': pa.bool_(),
        'linkUrl': pa.string(),
        'linkTcourl': pa.string(),
        'profileImageUrl': pa.string(),
        'profileBannerUrl': pa.string(),
    }
    fields = []
    for column, arrow_type in column_types.items():
        fields.append(pa.field(column, arrow_type))
    return fields

def split(a, n):
    """Lazily cut the sequence ``a`` into ``n`` consecutive slices.

    Slice sizes differ by at most one; the first ``len(a) % n`` slices get the
    extra element. Returns a generator of slices of ``a``. ``n == 0`` raises
    ZeroDivisionError at call time, because the division happens before the
    generator is created.
    """
    base_size, extras = divmod(len(a), n)

    def _slices():
        begin = 0
        for position in range(n):
            # The leading `extras` slices are one element longer than the rest.
            end = begin + base_size + (1 if position < extras else 0)
            yield a[begin:end]
            begin = end

    return _slices()

def simpleton_scrapper(batch):
    """Scrape one batch of conversations and store the result as parquet files.

    Parameters
    ----------
    batch : sequence
        Conversation ids searched together in one OR-joined, English-only
        query. The first id names the two output files.

    Returns
    -------
    bool
        True only when a KeyboardInterrupt was caught, signalling the caller
        to stop. False otherwise, including when the batch was empty or the
        scrape failed (failures are reported on stdout, not raised).
    """
    if len(batch) == 0:
        # Nothing to search for, and no first id to name the output files after.
        return False

    inner_break = False
    try:
        search = "conversation_id:" + " OR conversation_id:".join(map(str, batch)) + " lang:en"
        comments, users = get_page_covid_content(search)
        comments.to_parquet('./data/raw/tmp/comments/skynews/c_' + str(batch[0]) + '.parquet', index=False)
        # An empty search comes back as a column-less frame; de-duplicating it
        # on 'userId' would raise KeyError.
        if 'userId' in users.columns:
            users = users.drop_duplicates('userId')
        users.to_parquet('./data/raw/tmp/users/u_' + str(batch[0]) + '.parquet', index=False)
    except KeyboardInterrupt:
        inner_break = True
        print("Shutdown requested...exiting")
    except Exception as exc:
        # Best effort: keep going with the next batch, but say what went wrong.
        print('Problem with batch', batch[0], '-', repr(exc))
    return inner_break

def wrapper_scrapper(batches):
    """Run ``simpleton_scrapper`` over each batch in order.

    Stops early when ``simpleton_scrapper`` returns True (it caught a
    KeyboardInterrupt). Afterwards prints a completion line labelled with the
    first id of the first batch. An empty ``batches`` is a no-op, where
    indexing ``batches[0][0]`` used to raise IndexError. Returns None.
    """
    if len(batches) == 0:
        return

    for batch in batches:
        if simpleton_scrapper(batch):
            break

    first_batch = batches[0]
    # An empty first batch has no id to report.
    label = first_batch[0] if len(first_batch) > 0 else None
    print('Finish scrapping batches', label)

In [53]:
# Configurations: Arrow schemas built from the tweets and users field lists.
tweets_schema, users_schema = (
    pa.schema(build_fields())
    for build_fields in (get_tweets_fields, get_users_fields)
)

In [54]:
# Read the skynews news-tweets parquet file and keep only rows with at least one reply.
# NOTE(review): this path starts with ./../data while the scraper helpers write
# under ./data - confirm which working directory the notebook is meant to run from.
df = (
    pd.read_parquet('./../data/raw/tmp/news_tweets_skynews.parquet')
    .loc[lambda tweets: tweets['replyCount'] > 0]
)

In [55]:
# Shuffle the distinct conversation ids and cut them into search groups
# (one group per 12 conversations, so each group holds a dozen ids or slightly more).
conversations = list(df.conversationId.unique())
random.shuffle(conversations)
# Always ask for at least one group: with fewer than 12 conversations the
# count used to be 0 and split() raised ZeroDivisionError.
n_groups = max(1, len(conversations) // 12)
# Drop empty groups so later cells can safely read group[0].
search_groups = [group for group in split(conversations, n_groups) if len(group) > 0]
# The context manager closes the file handle that a bare open() left dangling.
with open('./../data/external/search_groups6.sav', 'wb') as groups_file:
    pickle.dump(search_groups, groups_file)

In [12]:
# Show the first search group (a list of conversation ids).
search_groups[0]

[1372935842654736387,
 1243175145755357186,
 1311791626168291330,
 1336467265949229057,
 1382328427655659524,
 1309240770973634564,
 1249624736923082752,
 1233679505828524035,
 1248186716848087046,
 1276067020916867072,
 1284871802607501314,
 1265590572116594691,
 1262155721388498945]

In [13]:
def batch_id_from_path(path):
    """Return the integer id embedded in a ``c_<id>.parquet`` file path.

    Works on the file name alone, so the result does not depend on how long
    the directory part of ``path`` is.
    """
    stem = os.path.splitext(os.path.basename(path))[0]
    return int(stem[len('c_'):])


# Ids of the batches that already have a comments file on disk. The previous
# fixed slice x[26:-8] started inside the 'skynews/' directory name, so int()
# failed as soon as a file existed.
downloaded = [
    batch_id_from_path(path)
    for path in glob.glob('./data/raw/tmp/comments/skynews/*.parquet')
]

In [15]:
# Keep only the groups whose first conversation id is not among the downloaded ids.
search_groups = list(filter(lambda group: group[0] not in downloaded, search_groups))

In [16]:
# Spread the search groups over batches, one batch per 64 groups. Never ask
# for fewer than one batch: with under 64 groups the count used to be 0 and
# split() raised ZeroDivisionError.
search_batches = list(split(search_groups, max(1, len(search_groups) // 64)))

In [19]:
# Pick the first batch of search groups.
batches = search_batches[0]

In [56]:
# Use the first search group as `batch` for the query built in the next cell.
batch = search_groups[0]

In [57]:
# Build the query: every conversation id in `batch` OR-ed together, restricted to English.
search = "conversation_id:" + " OR conversation_id:".join(map(str, batch)) + " lang:en"

In [59]:
# Replace the batch query with a query for one single conversation id.
search = "conversation_id:1387905838241697797"

In [33]:
# Show the current query string.
# NOTE(review): the saved output below shows a different conversation id than
# the assignment in the cell above - the cells were executed out of order.
search

'conversation_id:1372935842654736387'

In [37]:
# Let pandas display up to 100 columns.
pd.set_option('display.max_columns', 100)

In [27]:
# Run the scraper for the current query (the saved output below shows the request failing).
comments, users = get_page_covid_content(search)

Error retrieving https://api.twitter.com/2/search/adaptive.json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweets=true&q=conversation_id%3A1372935842654736387+OR+conversation_id%3A1243175145755357186+OR+conversation_id%3A1311791626168291330+OR+conversation_id%3A1336467265949229057+OR+conversation_id%3A1382328427655659524+OR+conversation_id%3A1309240770973634564+OR+conversation_id%3A1249624736923082752+OR+conversation_id%3A1233679505828524035+OR+conversation_id%3A1248186716848087046+OR+conversation_id%3A1276067020916867072+OR+conversation_id%3A1284871802607501

ScraperException: 4 requests to https://api.twitter.com/2/search/adaptive.json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweets=true&q=conversation_id%3A1372935842654736387+OR+conversation_id%3A1243175145755357186+OR+conversation_id%3A1311791626168291330+OR+conversation_id%3A1336467265949229057+OR+conversation_id%3A1382328427655659524+OR+conversation_id%3A1309240770973634564+OR+conversation_id%3A1249624736923082752+OR+conversation_id%3A1233679505828524035+OR+conversation_id%3A1248186716848087046+OR+conversation_id%3A1276067020916867072+OR+conversation_id%3A1284871802607501314+OR+conversation_id%3A1265590572116594691+OR+conversation_id%3A1262155721388498945+lang%3Aen&tweet_search_mode=live&count=100&query_source=spelling_expansion_revert_click&pc=1&spelling_corrections=1&ext=mediaStats%2ChighlightedLabel failed, giving up.

In [60]:
# Fetch the raw scraper items for `search` directly, without the post-processing
# done in get_page_covid_content.
# NOTE(review): this rebinds `df`, which an earlier cell uses for the tweets
# frame read from parquet.
df = pd.DataFrame(itertools.islice(sntwitter.TwitterSearchScraper(search).get_items(), None))

Error retrieving https://api.twitter.com/2/search/adaptive.json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweets=true&q=conversation_id%3A1387905838241697797&tweet_search_mode=live&count=100&query_source=spelling_expansion_revert_click&pc=1&spelling_corrections=1&ext=mediaStats%2ChighlightedLabel: non-200 status code
4 requests to https://api.twitter.com/2/search/adaptive.json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&s

ScraperException: 4 requests to https://api.twitter.com/2/search/adaptive.json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweets=true&q=conversation_id%3A1387905838241697797&tweet_search_mode=live&count=100&query_source=spelling_expansion_revert_click&pc=1&spelling_corrections=1&ext=mediaStats%2ChighlightedLabel failed, giving up.

In [48]:
# Display the current `df` frame.
df

In [None]:
# Step-by-step copy of the get_page_covid_content post-processing, applied to
# the `df` currently in the notebook namespace.
# A bare `return` is a SyntaxError at cell level, so the empty case is an
# explicit branch that yields the same two empty frames the function returns.
if df.shape[0] == 0:
    df, users_df = pd.DataFrame(), pd.DataFrame()
else:
    df = df.rename(columns={'id':'tweetId'})
    users_df = pd.DataFrame(list(df.user.values))
    users_df = users_df.rename(columns={'id':'userId'})
    df['userId'] = users_df['userId']
    users_df = users_df.drop_duplicates('userId')
    users_df = users_df.drop('descriptionUrls', axis=1)
    # Pull each axis out of the coordinates value; null entries pass through unchanged.
    df['longitude'] = df.coordinates.apply(lambda x: x['longitude'] if(np.all(pd.notnull(x))) else x)
    df['latitude'] = df.coordinates.apply(lambda x: x['latitude'] if(np.all(pd.notnull(x))) else x)
    df = df[[
        'tweetId', 'conversationId', 'userId', 'date', 'content', 'lang', 'sourceLabel',
        'replyCount', 'retweetCount', 'likeCount', 'quoteCount', 'longitude', 'latitude', 'place'
    ]]

In [None]:
# Scrape the current query and write both frames to parquet, with file names
# taken from the first conversation id of `batch`.
comments, users = get_page_covid_content(search)
batch_key = str(batch[0])
comments.to_parquet('./data/raw/tmp/comments/skynews/c_' + batch_key + '.parquet', index=False)
users = users.drop_duplicates(subset='userId')
users.to_parquet('./data/raw/tmp/users/u_' + batch_key + '.parquet', index=False)

In [23]:
# Run the scrape-and-save helper on the first search group; it returns True
# only after a KeyboardInterrupt.
simpleton_scrapper(search_groups[0])

Error retrieving https://api.twitter.com/2/search/adaptive.json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweets=true&q=conversation_id%3A1372935842654736387+OR+conversation_id%3A1243175145755357186+OR+conversation_id%3A1311791626168291330+OR+conversation_id%3A1336467265949229057+OR+conversation_id%3A1382328427655659524+OR+conversation_id%3A1309240770973634564+OR+conversation_id%3A1249624736923082752+OR+conversation_id%3A1233679505828524035+OR+conversation_id%3A1248186716848087046+OR+conversation_id%3A1276067020916867072+OR+conversation_id%3A1284871802607501

Problem with batch 1372935842654736387


False

In [None]:
# Keep scraping until every search group has a comments file on disk.
# simpleton_scrapper reports failures instead of raising, so a pass can finish
# without producing any new file. Give up after a few such passes in a row
# rather than retrying forever.
MAX_STALLED_PASSES = 3
stalled_passes = 0
previous_pending = None
while len(search_groups) > 0:
    # Recover each batch id from the file name itself (c_<id>.parquet). The
    # former fixed slice x[26:-8] started inside the 'skynews/' directory name.
    downloaded = [
        int(os.path.splitext(os.path.basename(path))[0][len('c_'):])
        for path in glob.glob('./data/raw/tmp/comments/skynews/*.parquet')
    ]
    finished = set(downloaded)  # O(1) membership checks for the filter below
    search_groups = [batch for batch in search_groups if batch[0] not in finished]
    if not search_groups:
        break

    if previous_pending is not None and len(search_groups) >= previous_pending:
        stalled_passes += 1
        if stalled_passes >= MAX_STALLED_PASSES:
            print('Stopping:', len(search_groups), 'search groups still pending after',
                  stalled_passes, 'passes without progress')
            break
    else:
        stalled_passes = 0
    previous_pending = len(search_groups)

    # At least one batch: with fewer than 64 groups the count used to be 0 and
    # split() raised ZeroDivisionError.
    search_batches = list(split(search_groups, max(1, len(search_groups) // 64)))
    Parallel(n_jobs=-1)(delayed(wrapper_scrapper)(batches) for batches in search_batches)