Skip to content

Commit

Permalink
Add twitter-tweet scraper for retrieving tweets by ID, including scro…
Browse files Browse the repository at this point in the history
…ll and recursion modes

Closes #51, closes #137
  • Loading branch information
JustAnotherArchivist committed May 23, 2021
1 parent 98b798b commit 45d1fa2
Showing 1 changed file with 145 additions and 19 deletions.
164 changes: 145 additions & 19 deletions snscrape/modules/twitter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import bs4
import collections
import dataclasses
import datetime
import email.utils
import enum
import itertools
import json
import random
Expand Down Expand Up @@ -138,6 +140,12 @@ def __str__(self):
return self.url


class ScrollDirection(enum.Enum):
TOP = enum.auto()
BOTTOM = enum.auto()
BOTH = enum.auto()


class TwitterAPIScraper(snscrape.base.Scraper):
def __init__(self, baseUrl, **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -193,15 +201,25 @@ def _get_api_data(self, endpoint, params):
raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e
return obj

def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None):
def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None, direction = ScrollDirection.BOTTOM):
# Iterate over endpoint with params/paginationParams, optionally starting from a cursor
# Handles guest token extraction using the baseUrl passed to __init__ etc.
# Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten).
# direction controls in which direction it should scroll from the initial response. BOTH equals TOP followed by BOTTOM.

# Logic for dual scrolling: direction is set to top, but if the bottom cursor is found, bottomCursorAndStop is set accordingly.
# Once the top pagination is exhausted, the bottomCursorAndStop is used and reset to None; it isn't set anymore after because the first entry condition will always be true for the bottom cursor.

if cursor is None:
reqParams = params
else:
reqParams = paginationParams.copy()
reqParams['cursor'] = cursor
bottomCursorAndStop = None
if direction is ScrollDirection.TOP or direction is ScrollDirection.BOTH:
dir = 'top'
else:
dir = 'bottom'
stopOnEmptyResponse = False
while True:
logger.info(f'Retrieving scroll page {cursor}')
Expand All @@ -210,6 +228,8 @@ def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = Non

# No data format test, just a hard and loud crash if anything's wrong :-)
newCursor = None
promptCursor = None
newBottomCursorAndStop = None
for instruction in obj['timeline']['instructions']:
if 'addEntries' in instruction:
entries = instruction['addEntries']['entries']
Expand All @@ -218,13 +238,26 @@ def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = Non
else:
continue
for entry in entries:
if entry['entryId'] == 'sq-cursor-bottom' or entry['entryId'].startswith('cursor-bottom-'):
if entry['entryId'] == f'sq-cursor-{dir}' or entry['entryId'].startswith(f'cursor-{dir}-'):
newCursor = entry['content']['operation']['cursor']['value']
if 'stopOnEmptyResponse' in entry['content']['operation']['cursor']:
stopOnEmptyResponse = entry['content']['operation']['cursor']['stopOnEmptyResponse']
elif entry['entryId'].startswith('cursor-showMoreThreadsPrompt-'): # E.g. 'offensive' replies button
promptCursor = entry['content']['operation']['cursor']['value']
elif direction is ScrollDirection.BOTH and bottomCursorAndStop is None and (entry['entryId'] == f'sq-cursor-bottom' or entry['entryId'].startswith('cursor-bottom-')):
newBottomCursorAndStop = (entry['content']['operation']['cursor']['value'], entry['content']['operation']['cursor'].get('stopOnEmptyResponse', False))
if bottomCursorAndStop is None and newBottomCursorAndStop is not None:
bottomCursorAndStop = newBottomCursorAndStop
if not newCursor or newCursor == cursor or (stopOnEmptyResponse and self._count_tweets(obj) == 0):
# End of pagination
break
if promptCursor is not None:
newCursor = promptCursor
elif direction is ScrollDirection.BOTH and bottomCursorAndStop is not None:
dir = 'bottom'
newCursor, stopOnEmptyResponse = bottomCursorAndStop
bottomCursorAndStop = None
else:
break
cursor = newCursor
reqParams = paginationParams.copy()
reqParams['cursor'] = cursor
Expand All @@ -243,7 +276,7 @@ def _count_tweets(self, obj):
count += 1
return count

def _instructions_to_tweets(self, obj):
def _instructions_to_tweets(self, obj, includeConversationThreads = False):
# No data format test, just a hard and loud crash if anything's wrong :-)
for instruction in obj['timeline']['instructions']:
if 'addEntries' in instruction:
Expand All @@ -254,21 +287,30 @@ def _instructions_to_tweets(self, obj):
continue
for entry in entries:
if entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-'):
if 'tweet' in entry['content']['item']['content']:
if 'promotedMetadata' in entry['content']['item']['content']['tweet']: # Promoted tweet aka ads
continue
if entry['content']['item']['content']['tweet']['id'] not in obj['globalObjects']['tweets']:
logger.warning(f'Skipping tweet {entry["content"]["item"]["content"]["tweet"]["id"]} which is not in globalObjects')
continue
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
elif 'tombstone' in entry['content']['item']['content'] and 'tweet' in entry['content']['item']['content']['tombstone']:
if entry['content']['item']['content']['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']:
logger.warning(f'Skipping tweet {entry["content"]["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects')
continue
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tombstone']['tweet']['id']]
else:
raise snscrape.base.ScraperException(f'Unable to handle entry {entry["entryId"]!r}')
yield self._tweet_to_tweet(tweet, obj)
yield from self._instruction_tweet_entry_to_tweet(entry['entryId'], entry['content'], obj)
elif includeConversationThreads and entry['entryId'].startswith('conversationThread-') and not entry['entryId'].endswith('-show_more_cursor'):
for item in entry['content']['timelineModule']['items']:
if item['entryId'].startswith('tweet-'):
yield from self._instruction_tweet_entry_to_tweet(item['entryId'], item, obj)

def _instruction_tweet_entry_to_tweet(self, entryId, entry, obj):
if 'tweet' in entry['item']['content']:
if 'promotedMetadata' in entry['item']['content']['tweet']: # Promoted tweet aka ads
return
if entry['item']['content']['tweet']['id'] not in obj['globalObjects']['tweets']:
logger.warning(f'Skipping tweet {entry["item"]["content"]["tweet"]["id"]} which is not in globalObjects')
return
tweet = obj['globalObjects']['tweets'][entry['item']['content']['tweet']['id']]
elif 'tombstone' in entry['item']['content']:
if 'tweet' not in entry['item']['content']['tombstone']: # E.g. deleted reply
return
if entry['item']['content']['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']:
logger.warning(f'Skipping tweet {entry["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects')
return
tweet = obj['globalObjects']['tweets'][entry['item']['content']['tombstone']['tweet']['id']]
else:
raise snscrape.base.ScraperException(f'Unable to handle entry {entryId!r}')
yield self._tweet_to_tweet(tweet, obj)

def _tweet_to_tweet(self, tweet, obj):
# Transforms a Twitter API tweet object into a Tweet
Expand Down Expand Up @@ -586,6 +628,90 @@ def from_args(cls, args):
return cls(args.hashtag, retries = args.retries)


class TwitterTweetScraperMode(enum.Enum):
SINGLE = 'single'
SCROLL = 'scroll'
RECURSE = 'recurse'

@classmethod
def from_args(cls, args):
if args.scroll:
return cls.SCROLL
if args.recurse:
return cls.RECURSE
return cls.SINGLE


class TwitterTweetScraper(TwitterAPIScraper):
name = 'twitter-tweet'

def __init__(self, tweetId, mode, **kwargs):
self._tweetId = tweetId
self._mode = mode
super().__init__(f'https://twitter.com/i/web/{self._tweetId}', **kwargs)

def get_items(self):
paginationParams = {
'include_profile_interstitial_type': '1',
'include_blocking': '1',
'include_blocked_by': '1',
'include_followed_by': '1',
'include_want_retweets': '1',
'include_mute_edge': '1',
'include_can_dm': '1',
'include_can_media_tag': '1',
'skip_status': '1',
'cards_platform': 'Web-12',
'include_cards': '1',
'include_ext_alt_text': 'true',
'include_quote_count': 'true',
'include_reply_count': '1',
'tweet_mode': 'extended',
'include_entities': 'true',
'include_user_entities': 'true',
'include_ext_media_color': 'true',
'include_ext_media_availability': 'true',
'send_error_codes': 'true',
'simple_quoted_tweet': 'true',
'count': '20',
'cursor': None,
'include_ext_has_birdwatch_notes': 'false',
'ext': 'mediaStats%2ChighlightedLabel',
}
params = paginationParams.copy()
del params['cursor']
if self._mode is TwitterTweetScraperMode.SINGLE:
obj = self._get_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params)
yield self._tweet_to_tweet(obj['globalObjects']['tweets'][str(self._tweetId)], obj)
elif self._mode is TwitterTweetScraperMode.SCROLL:
for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH):
yield from self._instructions_to_tweets(obj, includeConversationThreads = True)
elif self._mode is TwitterTweetScraperMode.RECURSE:
seenTweets = set()
queue = collections.deque()
queue.append(self._tweetId)
while queue:
tweetId = queue.popleft()
for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH):
for tweet in self._instructions_to_tweets(obj, includeConversationThreads = True):
if tweet.id not in seenTweets:
yield tweet
seenTweets.add(tweet.id)
if tweet.replyCount:
queue.append(tweet.id)

@classmethod
def setup_parser(cls, subparser):
group = subparser.add_mutually_exclusive_group(required = False)
group.add_argument('--scroll', action = 'store_true', default = False, help = 'Enable scrolling in both directions')
group.add_argument('--recurse', '--recursive', action = 'store_true', default = False, help = 'Enable recursion through all tweets encountered (warning: slow, potentially memory-intensive!)')
subparser.add_argument('tweetId', type = int, help = 'A tweet ID')

@classmethod
def from_args(cls, args):
return cls(args.tweetId, TwitterTweetScraperMode.from_args(args), retries = args.retries)


class TwitterListPostsScraper(TwitterSearchScraper):
name = 'twitter-list-posts'

Expand Down

0 comments on commit 45d1fa2

Please sign in to comment.