Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solved outstanding issues mentioned in snscrape#413 #8

Open
wants to merge 8 commits into
base: more-tg-info
Choose a base branch
from
191 changes: 138 additions & 53 deletions snscrape/modules/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import snscrape.base
import typing
import urllib.parse
import unittest
import threading

_logger = logging.getLogger(__name__)
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
Expand Down Expand Up @@ -152,7 +154,7 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
continue

if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
Expand All @@ -161,48 +163,23 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue
john-osullivan marked this conversation as resolved.
Show resolved Hide resolved

if link.text.startswith('@'):
mentions.append(link.text.strip('@'))
continue

if link.text.startswith('#'):
hashtags.append(link.text.strip('#'))
continue

if 'tgme_widget_message_voice_player' in link.get('class', []):
media.append(_parse_voice_message(link))
john-osullivan marked this conversation as resolved.
Show resolved Hide resolved

if 'tgme_widget_message_video_player' in link.get('class', []):
media.append(_parse_video_message(link))

href = urllib.parse.urljoin(pageUrl, link['href'])
if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
outlinks.append(href)

for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
audioUrl = voicePlayer.find('audio')['src']
durationStr = voicePlayer.find('time').text
duration = _durationStrToSeconds(durationStr)
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]

media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))

for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
iTag = videoPlayer.find('i')
if iTag is None:
videoUrl = None
videoThumbnailUrl = None
else:
style = iTag['style']
videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
videoTag = videoPlayer.find('video')
videoUrl = None if videoTag is None else videoTag['src']
mKwargs = {
'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl,
}
timeTag = videoPlayer.find('time')
if timeTag is None:
cls = Gif
else:
cls = Video
durationStr = videoPlayer.find('time').text
mKwargs['duration'] = _durationStrToSeconds(durationStr)
media.append(cls(**mKwargs))

linkPreview = None
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
kwargs = {}
Expand All @@ -219,8 +196,6 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False):
else:
_logger.warning(f'Could not process link preview image on {url}')
linkPreview = LinkPreview(**kwargs)
if kwargs['href'] in outlinks:
outlinks.remove(kwargs['href'])
john-osullivan marked this conversation as resolved.
Show resolved Hide resolved

viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
views = None if viewsSpan is None else _parse_num(viewsSpan.text)
Expand All @@ -239,24 +214,20 @@ def get_items(self):
return
nextPageUrl = ''
while True:
if soup.find("div", class_ = "tme_no_messages_found"):
break
yield from self._soup_to_items(soup, r.url)
try:
if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1':
dateElt = soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)
if dateElt and 'href' in dateElt.attrs:
john-osullivan marked this conversation as resolved.
Show resolved Hide resolved
urlPieces = dateElt['href'].split('/')
if urlPieces and urlPieces[-1] == '1':
# if message 1 is the first message in the page, terminate scraping
break
except:
pass
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
if not pageLink:
# some pages are missing a "tme_messages_more" tag, causing early termination
if '=' not in nextPageUrl:
nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href']
nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20
if nextPostIndex > 20:
pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'}
else:
break
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
if pageLink := soup.find('link', attrs = {'rel': 'prev'}, href = True):
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
else:
nextPostIndex = int(soup.find('div', attrs = {'class': 'tgme_widget_message', 'data-post': True})["data-post"].split("/")[-1])
nextPageUrl = urllib.parse.urljoin(r.url, r.url.split('?')[0] + f'?before={nextPostIndex}')
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = _telegramResponseOkCallback)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
Expand Down Expand Up @@ -333,4 +304,118 @@ def _telegramResponseOkCallback(r):
if r.status_code == 200:
return (True, None)
return (False, f'{r.status_code=}')


def _parse_voice_message(voicePlayer):
	'''Build a VoiceMessage from a `tgme_widget_message_voice_player` anchor tag.'''
	audioTag = voicePlayer.find('audio')
	timeTag = voicePlayer.find('time')
	waveformDiv = voicePlayer.find('div', {'class': 'bar'})
	# Each <s> element's inline style encodes one waveform bar height as a percentage.
	heights = []
	for sTag in waveformDiv.find_all('s'):
		heights.append(float(sTag['style'].split(':')[-1].strip(';%')))
	return VoiceMessage(url = audioTag['src'], duration = _durationStrToSeconds(timeTag.text), bars = heights)

def _parse_video_message(videoPlayer):
	'''Build a Video or Gif media object from a `tgme_widget_message_video_player` anchor tag.

	Returns a Gif when the player has no <time> tag (GIFs carry no duration),
	otherwise a Video with its duration parsed from the <time> text.
	'''
	iTag = videoPlayer.find('i')
	if iTag is None:
		videoThumbnailUrl = None
	else:
		# The thumbnail URL is embedded in the <i> tag's background-image style.
		style = iTag['style']
		videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
	videoTag = videoPlayer.find('video')
	videoUrl = None if videoTag is None else videoTag['src']
	mKwargs = {
		'thumbnailUrl': videoThumbnailUrl,
		'url': videoUrl,
	}
	timeTag = videoPlayer.find('time')
	if timeTag is None:
		# Example of duration-less GIF: https://t.me/thisisatestchannel19451923/3
		cls = Gif
	else:
		cls = Video
		# Reuse timeTag instead of re-running videoPlayer.find('time').
		mKwargs['duration'] = _durationStrToSeconds(timeTag.text)
	return cls(**mKwargs)


class TestTelegramChannelScraper(unittest.TestCase):
	"""Integration tests for TelegramChannelScraper. Run suite by directly running this file.

	NOTE(review): these tests hit live t.me pages, so they require network
	access and depend on the referenced channels staying available.
	"""

	@staticmethod
	def execute_with_timeout(func, timeout = 10):
		"""
		Executes a function in a separate thread and enforces a timeout.
		If the provided function throws, the original exception is re-raised
		in the main thread with its traceback intact.
		Used to detect infinite loops in finite time, works cross-platform.

		:param func: The function to execute. This function should accept no arguments.
		:param timeout: The timeout in seconds.
		:raises TimeoutError: if func did not finish within timeout seconds.
		"""
		exceptions = []

		def func_passing_exceptions():
			try:
				func()
			except Exception as e:
				# The exception object already carries __traceback__; no need to unpack it.
				exceptions.append(e)

		# Daemon thread: if func hangs past the timeout, it must not keep the interpreter alive.
		thread = threading.Thread(target = func_passing_exceptions, daemon = True)
		thread.start()
		thread.join(timeout = timeout)

		if exceptions:
			exc = exceptions[0]
			# Re-raise the original exception unchanged. The previous
			# `raise exc_class(exc_instance)` wrapped the instance as the new
			# exception's message, mangling its args.
			raise exc.with_traceback(exc.__traceback__)

		if thread.is_alive():
			raise TimeoutError(f"Function didn't complete within {timeout} seconds")

	def test_scraping_termination_missing_prev(self):
		"""Test scraping always terminates, even if the page's prev link is missing."""

		def scrape_two_pages():
			scraper = TelegramChannelScraper('WLM_USA_TEXAS?before=3766')
			items = []
			num_items_on_page = 20
			for item in scraper.get_items():
				items.append(item)
				if len(items) > 2 * num_items_on_page:
					break

		self.execute_with_timeout(scrape_two_pages)

	def test_scraping_termination_small_post_count(self):
		"""Test scraping always terminates, even with small number of posts. This channel's highest ID is 28."""

		def scrape_small_channel():
			scraper = TelegramChannelScraper('AKCPB')
			return list(scraper.get_items())

		self.execute_with_timeout(scrape_small_channel)

	def test_scraping_termination_pages_without_posts(self):
		"""Test scraping gracefully handles pages without any posts."""

		def scrape_empty_page():
			scraper = TelegramChannelScraper('BREAKDCODE?before=3')
			for _ in scraper.get_items():
				pass

		self.execute_with_timeout(scrape_empty_page)

	def test_media_order_preservation(self):
		"""Test scraped media appears in the same order as in the post."""
		scraper = TelegramChannelScraper('nexta_live?before=43103')
		item = next(scraper.get_items(), None)
		self.assertIsNotNone(item, "Failed to scrape any posts.")

		# This particular post is known to include media [Video, Photo, Video]
		self.assertEqual(item.url, "https://t.me/s/nexta_live/43102")

		expected_types = [Video, Photo, Video]
		actual_types = [type(media) for media in item.media] if item.media else []

		self.assertEqual(actual_types, expected_types, "Media did not appear in the expected order.")


# Allow running the integration test suite directly: `python telegram.py`.
if __name__ == '__main__':
	unittest.main()