Skip to content

Commit

Permalink
Refactor how links on Twitter are handled
Browse files Browse the repository at this point in the history
All links in text (tweets, profile descriptions, and profile links) are now represented by TextLink objects, which contain all relevant information: the displayed text (if available), the URL, the short t.co URL, and the indices in the text at which it appears.

Closes #478
  • Loading branch information
JustAnotherArchivist committed May 29, 2022
1 parent 01cf6a0 commit dc6bc9b
Showing 1 changed file with 75 additions and 39 deletions.
114 changes: 75 additions & 39 deletions snscrape/modules/twitter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__all__ = [
'Tweet', 'Medium', 'Photo', 'VideoVariant', 'Video', 'Gif', 'DescriptionURL', 'Coordinates', 'Place',
'Tweet', 'Medium', 'Photo', 'VideoVariant', 'Video', 'Gif', 'TextLink', 'Coordinates', 'Place',
'User', 'UserLabel',
'Trend',
'GuestTokenManager',
Expand Down Expand Up @@ -31,6 +31,18 @@
import time
import typing
import urllib.parse
import warnings


# DescriptionURL deprecation
_DEPRECATED_NAMES = {'DescriptionURL': 'TextLink'}
def __getattr__(name):
if name in _DEPRECATED_NAMES:
warnings.warn(f'{name} is deprecated, use {_DEPRECATED_NAMES[name]} instead', FutureWarning, stacklevel = 2)
return globals()[_DEPRECATED_NAMES[name]]
raise AttributeError(f'module {__name__!r} has no attribute {name!r}')
def __dir__():
return sorted(__all__ + list(_DEPRECATED_NAMES.keys()))


_logger = logging.getLogger(__name__)
Expand All @@ -56,8 +68,7 @@ class Tweet(snscrape.base.Item):
source: str
sourceUrl: typing.Optional[str] = None
sourceLabel: typing.Optional[str] = None
outlinks: typing.Optional[typing.List[str]] = None
tcooutlinks: typing.Optional[typing.List[str]] = None
links: typing.Optional[typing.List['TextLink']] = None
media: typing.Optional[typing.List['Medium']] = None
retweetedTweet: typing.Optional['Tweet'] = None
quotedTweet: typing.Optional['Tweet'] = None
Expand All @@ -71,13 +82,23 @@ class Tweet(snscrape.base.Item):
card: typing.Optional['Card'] = None

username = snscrape.base._DeprecatedProperty('username', lambda self: self.user.username, 'user.username')
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks) if self.outlinks else '', 'outlinks')
tcooutlinksss = snscrape.base._DeprecatedProperty('tcooutlinksss', lambda self: ' '.join(self.tcooutlinks) if self.tcooutlinks else '', 'tcooutlinks')
outlinks = snscrape.base._DeprecatedProperty('outlinks', lambda self: [x.url for x in self.links] if self.links else [], 'links (url attribute)')
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(x.url for x in self.links) if self.links else '', 'links (url attribute)')
tcooutlinks = snscrape.base._DeprecatedProperty('tcooutlinks', lambda self: [x.tcourl for x in self.links] if self.links else [], 'links (tcourl attribute)')
tcooutlinksss = snscrape.base._DeprecatedProperty('tcooutlinksss', lambda self: ' '.join(x.tcourl for x in self.links) if self.links else '', 'links (tcourl attribute)')

def __str__(self):
return self.url


@dataclasses.dataclass
class TextLink:
text: typing.Optional[str]
url: str
tcourl: str
indices: typing.Tuple[int, int]


class Medium:
pass

Expand Down Expand Up @@ -109,14 +130,6 @@ class Gif(Medium):
variants: typing.List[VideoVariant]


@dataclasses.dataclass
class DescriptionURL:
text: typing.Optional[str]
url: str
tcourl: str
indices: typing.Tuple[int, int]


@dataclasses.dataclass
class Coordinates:
longitude: float
Expand Down Expand Up @@ -447,7 +460,7 @@ class User(snscrape.base.Entity):
displayname: typing.Optional[str] = None
description: typing.Optional[str] = None # Description as it's displayed on the web interface with URLs replaced
rawDescription: typing.Optional[str] = None # Raw description with the URL(s) intact
descriptionUrls: typing.Optional[typing.List[DescriptionURL]] = None
descriptionLinks: typing.Optional[typing.List[TextLink]] = None
verified: typing.Optional[bool] = None
created: typing.Optional[datetime.datetime] = None
followersCount: typing.Optional[int] = None
Expand All @@ -458,12 +471,15 @@ class User(snscrape.base.Entity):
mediaCount: typing.Optional[int] = None
location: typing.Optional[str] = None
protected: typing.Optional[bool] = None
linkUrl: typing.Optional[str] = None
linkTcourl: typing.Optional[str] = None
link: typing.Optional[TextLink] = None
profileImageUrl: typing.Optional[str] = None
profileBannerUrl: typing.Optional[str] = None
label: typing.Optional['UserLabel'] = None

descriptionUrls = snscrape.base._DeprecatedProperty('descriptionUrls', lambda self: self.descriptionLinks, 'descriptionLinks')
linkUrl = snscrape.base._DeprecatedProperty('linkUrl', lambda self: self.link.url if self.link else None, 'link.url')
linkTcourl = snscrape.base._DeprecatedProperty('linkTcourl', lambda self: self.link.tcourl if self.link else None, 'link.tcourl')

@property
def url(self):
return f'https://twitter.com/{self.username}'
Expand Down Expand Up @@ -815,8 +831,12 @@ def _make_tweet(self, tweet, user, retweetedTweet = None, quotedTweet = None, ca
kwargs['user'] = user
kwargs['date'] = email.utils.parsedate_to_datetime(tweet['created_at'])
if tweet['entities'].get('urls'):
kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']]
kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']]
kwargs['links'] = [TextLink(
text = u.get('display_url'),
url = u['expanded_url'],
tcourl = u['url'],
indices = tuple(u['indices']),
) for u in tweet['entities']['urls']]
kwargs['url'] = f'https://twitter.com/{user.username}/status/{tweetId}'
kwargs['replyCount'] = tweet['reply_count']
kwargs['retweetCount'] = tweet['retweet_count']
Expand Down Expand Up @@ -877,10 +897,15 @@ def _make_tweet(self, tweet, user, retweetedTweet = None, quotedTweet = None, ca
if hasattr(card, 'url') and '//t.co/' in card.url:
# Try to convert the URL to the non-shortened/t.co one
# Retweets inherit the card but not the outlinks; try to get them from the retweeted tweet instead in that case.
if 'tcooutlinks' in kwargs and card.url in kwargs['tcooutlinks']:
card.url = kwargs['outlinks'][kwargs['tcooutlinks'].index(card.url)]
elif retweetedTweet and retweetedTweet.tcooutlinks and card.url in retweetedTweet.tcooutlinks:
card.url = retweetedTweet.outlinks[retweetedTweet.tcooutlinks.index(card.url)]
candidates = []
if 'links' in kwargs:
candidates.extend(kwargs['links'])
if retweetedTweet:
candidates.extend(retweetedTweet.links)
for u in candidates:
if u.tcourl == card.url:
card.url = u.url
break
else:
_logger.warning(f'Could not translate t.co card URL on tweet {tweetId}')
return Tweet(**kwargs)
Expand Down Expand Up @@ -1303,12 +1328,12 @@ def _user_to_user(self, user, id_ = None):
kwargs['description'] = self._render_text_with_urls(user['description'], user['entities']['description'].get('urls'))
kwargs['rawDescription'] = user['description']
if user['entities']['description'].get('urls'):
kwargs['descriptionUrls'] = [DescriptionURL(
text = x.get('display_url'),
url = x['expanded_url'],
tcourl = x['url'],
indices = tuple(x['indices']),
) for x in user['entities']['description']['urls']]
kwargs['descriptionLinks'] = [TextLink(
text = x.get('display_url'),
url = x['expanded_url'],
tcourl = x['url'],
indices = tuple(x['indices']),
) for x in user['entities']['description']['urls']]
kwargs['verified'] = user.get('verified')
kwargs['created'] = email.utils.parsedate_to_datetime(user['created_at'])
kwargs['followersCount'] = user['followers_count']
Expand All @@ -1319,9 +1344,13 @@ def _user_to_user(self, user, id_ = None):
kwargs['mediaCount'] = user['media_count']
kwargs['location'] = user['location']
kwargs['protected'] = user.get('protected')
if 'url' in user['entities']:
kwargs['linkUrl'] = (user['entities']['url']['urls'][0].get('expanded_url') or user.get('url'))
kwargs['linkTcourl'] = user.get('url')
if user.get('url'):
entity = user['entities'].get('url', {}).get('urls', [None])[0]
if not entity or entity['url'] != user['url']:
self.logger.warning(f'Link inconsistency on user {kwargs["id"]}')
if not entity:
entity = {'display_url': None, 'expanded_url': user['url'], 'indices': (0, len(user['url']))}
kwargs['link'] = TextLink(text = entity['display_url'], url = entity['expanded_url'], tcourl = user['url'], indices = tuple(entity['indices']))
kwargs['profileImageUrl'] = user['profile_image_url_https']
kwargs['profileBannerUrl'] = user.get('profile_banner_url')
if 'ext' in user and (label := user['ext']['highlightedLabel']['r']['ok'].get('label')):
Expand Down Expand Up @@ -1447,6 +1476,14 @@ def _get_entity(self):
user = obj['data']['user']['result']
rawDescription = user['legacy']['description']
description = self._render_text_with_urls(rawDescription, user['legacy']['entities']['description']['urls'])
link = None
if user['legacy'].get('url'):
entity = user['legacy']['entities'].get('url', {}).get('urls', [None])[0]
if not entity or entity['url'] != user['legacy']['url']:
self.logger.warning(f'Link inconsistency on user')
if not entity:
entity = {'display_url': None, 'expanded_url': user['legacy']['url'], 'indices': (0, len(user['legacy']['url']))}
link = TextLink(text = entity['display_url'], url = entity['expanded_url'], tcourl = user['legacy']['url'], indices = tuple(entity['indices']))
label = None
if (labelO := user['affiliates_highlighted_label'].get('label')):
label = self._user_label_to_user_label(labelO)
Expand All @@ -1456,12 +1493,12 @@ def _get_entity(self):
displayname = user['legacy']['name'],
description = description,
rawDescription = rawDescription,
descriptionUrls = [DescriptionURL(
text = x.get('display_url'),
url = x['expanded_url'],
tcourl = x['url'],
indices = tuple(x['indices']),
) for x in user['legacy']['entities']['description']['urls']],
descriptionLinks = [TextLink(
text = x.get('display_url'),
url = x['expanded_url'],
tcourl = x['url'],
indices = tuple(x['indices']),
) for x in user['legacy']['entities']['description']['urls']],
verified = user['legacy']['verified'],
created = email.utils.parsedate_to_datetime(user['legacy']['created_at']),
followersCount = user['legacy']['followers_count'],
Expand All @@ -1472,8 +1509,7 @@ def _get_entity(self):
mediaCount = user['legacy']['media_count'],
location = user['legacy']['location'],
protected = user['legacy']['protected'],
linkUrl = user['legacy']['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['legacy']['entities'] else None,
linkTcourl = user['legacy'].get('url'),
link = link,
profileImageUrl = user['legacy']['profile_image_url_https'],
profileBannerUrl = user['legacy'].get('profile_banner_url'),
label = label,
Expand Down

0 comments on commit dc6bc9b

Please sign in to comment.