diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 4e97765..dbf0f9b 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -9,6 +9,8 @@ import snscrape.base import typing import urllib.parse +import unittest +import threading _logger = logging.getLogger(__name__) _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') @@ -152,7 +154,7 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False): imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) if len(imageUrls) == 1: media.append(Photo(url = imageUrls[0])) - continue + if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): style = link.attrs.get('style', '') imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) @@ -161,48 +163,23 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False): # resp = self._get(image[0]) # encoded_string = base64.b64encode(resp.content) # Individual photo or video link - continue + if link.text.startswith('@'): mentions.append(link.text.strip('@')) - continue + if link.text.startswith('#'): hashtags.append(link.text.strip('#')) - continue + + if 'tgme_widget_message_voice_player' in link.get('class', []): + media.append(_parse_voice_message(link)) + + if 'tgme_widget_message_video_player' in link.get('class', []): + media.append(_parse_video_message(link)) + href = urllib.parse.urljoin(pageUrl, link['href']) if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl): outlinks.append(href) - for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): - audioUrl = voicePlayer.find('audio')['src'] - durationStr = voicePlayer.find('time').text - duration = _durationStrToSeconds(durationStr) - barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')] - - media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)) - - for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): - iTag = videoPlayer.find('i') - if iTag is None: - videoUrl = None - videoThumbnailUrl = None - else: - style = iTag['style'] - videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0] - videoTag = videoPlayer.find('video') - videoUrl = None if videoTag is None else videoTag['src'] - mKwargs = { - 'thumbnailUrl': videoThumbnailUrl, - 'url': videoUrl, - } - timeTag = videoPlayer.find('time') - if timeTag is None: - cls = Gif - else: - cls = Video - durationStr = videoPlayer.find('time').text - mKwargs['duration'] = _durationStrToSeconds(durationStr) - media.append(cls(**mKwargs)) - linkPreview = None if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): kwargs = {} @@ -219,8 +196,6 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False): else: _logger.warning(f'Could not process link preview image on {url}') linkPreview = LinkPreview(**kwargs) - if kwargs['href'] in outlinks: - outlinks.remove(kwargs['href']) viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') views = None if viewsSpan is None else _parse_num(viewsSpan.text) @@ -239,24 +214,20 @@ def get_items(self): return nextPageUrl = '' while True: + if soup.find("div", class_ = "tme_no_messages_found"): + break yield from self._soup_to_items(soup, r.url) - try: - if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1': + dateElt = soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True) + if dateElt and 'href' in dateElt.attrs: + urlPieces = dateElt['href'].split('/') + if urlPieces and urlPieces[-1] == '1': # if message 1 is the first message in the page, terminate scraping break - except: - pass - pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True}) - if not pageLink: - # some pages are missing a "tme_messages_more" tag, causing early termination - if '=' not in nextPageUrl: - nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href'] - nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20 - if nextPostIndex > 20: - pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'} - else: - break - nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) + if pageLink := soup.find('link', attrs = {'rel': 'prev'}, href = True): + nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) + else: + nextPostIndex = int(soup.find('div', attrs = {'class': 'tgme_widget_message', 'data-post': True})["data-post"].split("/")[-1]) + nextPageUrl = urllib.parse.urljoin(r.url, r.url.split('?')[0] + f'?before={nextPostIndex}') r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = _telegramResponseOkCallback) if r.status_code != 200: raise snscrape.base.ScraperException(f'Got status code {r.status_code}') @@ -333,4 +304,118 @@ def _telegramResponseOkCallback(r): if r.status_code == 200: return (True, None) return (False, f'{r.status_code=}') - \ No newline at end of file + +def _parse_voice_message(voicePlayer): + audioUrl = voicePlayer.find('audio')['src'] + durationStr = voicePlayer.find('time').text + duration = _durationStrToSeconds(durationStr) + barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')] + return VoiceMessage(url = audioUrl, duration = duration, bars = barHeights) + +def _parse_video_message(videoPlayer): + iTag = videoPlayer.find('i') + if iTag is None: + videoUrl = None + videoThumbnailUrl = None + else: + style = iTag['style'] + videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0] + videoTag = videoPlayer.find('video') + videoUrl = None if videoTag is None else videoTag['src'] + mKwargs = { + 'thumbnailUrl': videoThumbnailUrl, + 'url': videoUrl, + } + timeTag = videoPlayer.find('time') + if timeTag is None: + # Example of duration-less GIF: https://t.me/thisisatestchannel19451923/3 + cls = Gif + else: + cls = Video + durationStr = videoPlayer.find('time').text + mKwargs['duration'] = _durationStrToSeconds(durationStr) + return cls(**mKwargs) + + +class TestTelegramChannelScraper(unittest.TestCase): + """Run suite by directly running this file.""" + + @staticmethod + def execute_with_timeout(func, timeout=10): + """ + Executes a function in a separate thread and enforces a timeout. + If provided function throws an error, it's re-raised in main thread. + Used to detect infinite loops in finite time, works cross-platform. + + :param func: The function to execute. This function should accept no arguments. + :param timeout: The timeout in seconds. + """ + exceptions=[] + def func_passing_exceptions(): + try: + func() + except Exception as e: + exceptions.append((e.__class__, e, e.__traceback__)) + + thread = threading.Thread(target=func_passing_exceptions) + thread.start() + thread.join(timeout=timeout) + + if exceptions: + exc_class, exc_instance, traceback = exceptions[0] + raise exc_class(exc_instance).with_traceback(traceback) + + if thread.is_alive(): + raise TimeoutError(f"Function didn't complete within {timeout} seconds") + + def test_scraping_termination_missing_prev(self): + """Test scraping always terminates, even if the page's prev link is missing.""" + + def scrape_two_pages(): + scraper = TelegramChannelScraper('WLM_USA_TEXAS?before=3766') + items = list() + num_items_on_page = 20 + for item in scraper.get_items(): + items.append(item) + if len(items) > 2 * num_items_on_page: + break + + self.execute_with_timeout(scrape_two_pages) + + def test_scraping_termination_small_post_count(self): + """Test scraping always terminates, even with small number of posts. This channel's highest ID is 28.""" + + def scrape_small_channel(): + scraper = TelegramChannelScraper('AKCPB') + items = list(scraper.get_items()) + return items + + self.execute_with_timeout(scrape_small_channel) + + def test_scraping_termination_pages_without_posts(self): + """Test scraping gracefully handles pages without any posts.""" + + def scrape_empty_page(): + scraper = TelegramChannelScraper('BREAKDCODE?before=3') + for _ in scraper.get_items(): + pass + + self.execute_with_timeout(scrape_empty_page) + + def test_media_order_preservation(self): + """Test scraped media appears in the same order as in the post.""" + scraper = TelegramChannelScraper('nexta_live?before=43103') + item = next(scraper.get_items(), None) + self.assertIsNotNone(item, "Failed to scrape any posts.") + + # This particular post is known to include media [Video, Photo, Video] + self.assertEqual(item.url, "https://t.me/s/nexta_live/43102") + + expected_types = [Video, Photo, Video] + actual_types = [type(media) for media in item.media] if item.media else [] + + self.assertEqual(actual_types, expected_types, "Media did not appear in the expected order.") + + +if __name__ == '__main__': + unittest.main()