In [2]:
from msedge.selenium_tools import Edge, EdgeOptions
from selenium.webdriver.common.keys import Keys
from time import sleep
import sys
import re
import pandas as pd
import datetime as dt
from datetime import timedelta
from hashlib import sha256
import os.path
from src import DATA_DIR

import warnings
warnings.filterwarnings("ignore")

In [9]:
class Twitter_scraper:
    """Selenium-driven scraper for the "Latest" tab of Twitter's search page.

    Creating an instance launches Microsoft Edge through ``msedgedriver`` and
    opens ``https://twitter.com/explore``. ``get_tweet_data`` then types a
    search query into the page, scrolls through the results and appends the
    tweets it can parse to a CSV file under ``DATA_DIR / 'data'``.
    """

    # Column order of every row written to the CSV file.
    CSV_COLUMNS = ['Tweet_ID', 'Username',
                   'User_ID', 'DateTime',
                   'Text', 'Reply_to',
                   'Hashtag', 'Reply',
                   'Retweet', 'Like',
                   ]
    # Rows are flushed to disk once this many are buffered.
    BATCH_SIZE = 100
    # Only the most recently rendered articles are re-inspected after a scroll;
    # older ones were already handled on a previous pass.
    RECENT_WINDOW = 24
    # Consecutive scrolls that may fail to move the page before giving up.
    MAX_SCROLL_ATTEMPTS = 4
    # NOTE(review): this selector relies on generated CSS class names and will
    # break whenever Twitter re-deploys its stylesheet.
    REPLY_TO_XPATH = './/div[@class="css-901oao r-1bwzh9t r-37j5jr r-a023e6 r-16dba41 r-rjixqe r-bcqeeo r-qvutc0"]'

    def __init__(self, driver_path='/mnt/f/msedgedriver.exe', utc_offset_hours=4.5):
        """Start the Edge browser and open Twitter's explore page.

        :param driver_path: (str) Location of the ``msedgedriver`` executable.
        :param utc_offset_hours: (float) Offset added to the UTC timestamp of
            each tweet before it is stored.
        """
        options = EdgeOptions()
        # ``use_chromium`` is the switch documented by msedge-selenium-tools;
        # the previous ``getchromium`` attribute was never read by the library.
        options.use_chromium = True
        self.driver = Edge(driver_path, options=options)
        self.utc_offset = timedelta(hours=utc_offset_hours)

        sleep(2)
        self.driver.get('https://twitter.com/explore')
        sleep(3)

    @staticmethod
    def build_search_text(search_query):
        """Translate a ``search_query`` dict into Twitter search syntax.

        Every filter that is present contributes one space-separated term.
        Groups holding several alternatives are wrapped in parentheses so that
        ``OR`` cannot bind to a neighbouring filter.

        :param search_query: (dict) See ``get_tweet_data`` for the keys.
        :return: (str) Text to type into the search box.
        :raises KeyError: if ``search_query`` has no ``'include'`` key.
        """
        def any_of(terms):
            joined = ' OR '.join(terms)
            return f'({joined})' if len(terms) > 1 else joined

        parts = []

        if search_query.get('from'):
            parts.append(any_of(['from:' + account for account in search_query['from']]))

        if search_query.get('to'):
            parts.append(any_of(['to:' + account for account in search_query['to']]))

        if 'since' in search_query:
            parts.append(f"since:{search_query['since']}")

        if 'until' in search_query:
            parts.append(f"until:{search_query['until']}")

        include = list(search_query['include'])
        if str(search_query.get('include_operator', 'AND')).upper() == 'OR':
            parts.append(any_of(include))
        else:
            parts.append(' '.join(include))

        parts.extend('-' + word for word in search_query.get('not_include', []))
        parts.extend('@' + account for account in search_query.get('mentioning', []))

        return ' '.join(part for part in parts if part)

    def get_tweet_data(self, search_query:dict, max_tweet=None):

        '''
        *** Extract tweets ***

                :param search_query: (dict) Search query. The dictionary can have the following keys:
                    - include: (list, required) Words that must be in the tweet text
                    - include_operator: ('AND', 'OR' - default='AND') How the include words are combined
                    - not_include: (list) Words that must not be in the tweet text

                    - since: (str, e.g. "2015-12-21") Only tweets sent since this date
                    - until: (str, e.g. "2022-12-21") Only tweets sent until this date
                    - from: (list) Only tweets sent from these Twitter accounts
                    - to: (list) Only tweets sent in reply to these Twitter accounts
                    - mentioning: (list) Only tweets mentioning these Twitter accounts
                    - question: accepted for backward compatibility but currently ignored

                :param max_tweet: (int or None) Stop after this many unique tweets were saved;
                    None keeps scrolling until the page stops moving

                Rows are appended to DATA_DIR / 'data/<include words joined by "_">.csv'.
                Nothing is returned.

        '''
        export_file_name = '_'.join(search_query['include'])
        # assumes DATA_DIR supports the ``/`` operator (e.g. pathlib.Path) - it is imported from src
        csv_path = DATA_DIR / f'data/{export_file_name}.csv'
        search_text = self.build_search_text(search_query)

        sleep(6)
        search = self.driver.find_element('xpath', '//label[@data-testid="SearchBox_Search_Input_label"]')
        # Clear whatever a previous search left in the box before typing.
        search.send_keys(Keys.CONTROL + "a")
        search.send_keys(Keys.DELETE)
        search.send_keys(search_text)
        search.send_keys(Keys.RETURN)

        sleep(2)
        self.driver.find_element('link text', 'Latest').click()

        seen_tweet_ids = set()
        pending_rows = []
        collected = 0
        last_position = self.driver.execute_script('return window.pageYOffset;')

        while True:

            sleep(2)
            tweets = self.driver.find_elements('xpath', '//article[@data-testid="tweet"]')
            limit_reached = False

            for tweet in tweets[-self.RECENT_WINDOW:]:
                try:
                    row = self._parse_tweet(tweet)
                except Exception:
                    # Best effort: articles that cannot be parsed (missing text
                    # node, element gone stale, unexpected header layout) are
                    # skipped. KeyboardInterrupt is deliberately not caught.
                    continue

                tweet_id = row[0]
                if tweet_id in seen_tweet_ids:
                    continue

                seen_tweet_ids.add(tweet_id)
                pending_rows.append(row)
                collected += 1

                sys.stdout.write('\r')
                sys.stdout.write("[] %d" % collected)
                sys.stdout.flush()

                limit_reached = max_tweet is not None and collected >= max_tweet

                # Written outside the try block so that I/O errors surface
                # instead of silently discarding the batch.
                if limit_reached or len(pending_rows) >= self.BATCH_SIZE:
                    self._append_to_csv(pending_rows, csv_path)
                    pending_rows = []

                if limit_reached:
                    break

            if limit_reached:
                break

            last_position = self._scroll_down(last_position)
            if last_position is None:
                # The page no longer moves: save what is left and stop.
                if pending_rows:
                    self._append_to_csv(pending_rows, csv_path)
                break

    def _parse_tweet(self, tweet):
        """Extract one CSV row (ordered like ``CSV_COLUMNS``) from a tweet article element."""
        # Empty strings (not empty lists) keep the CSV cells blank when absent.
        reply_to, hashtag = str(), str()
        text = tweet.find_element('xpath', './/div[@data-testid="tweetText"]').text

        if 'Replying to \n@' in tweet.text:
            handles = tweet.find_element('xpath', self.REPLY_TO_XPATH).text.split()
            reply_to = [handle for handle in handles if handle.startswith('@')]

        if '#' in text:
            hashtag = [word for word in text.split() if word.startswith('#')]

        reply = tweet.find_element('xpath', './/div[@data-testid="reply"]').text
        retweet = tweet.find_element('xpath', './/div[@data-testid="retweet"]').text
        like = tweet.find_element('xpath', './/div[@data-testid="like"]').text

        header = tweet.find_element('xpath', './/div[@data-testid="User-Names"]')
        timestamp = header.find_element('xpath', './/time').get_attribute('datetime')
        sent_at = dt.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.000Z') + self.utc_offset

        # The header text holds the display name followed by the @handle.
        info = header.text.split('@')
        username = info[0].strip('\n')
        user_id = '@' + info[1].split()[0]

        # The page exposes no id here, so a stable one is derived from the content.
        tweet_id = sha256("".join([user_id, str(sent_at), text]).encode()).hexdigest()

        return [tweet_id, username, user_id, sent_at, text, reply_to, hashtag, reply, retweet, like]

    def _append_to_csv(self, rows, csv_path):
        """Append ``rows`` to ``csv_path``, writing the header only when the file is new."""
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)
        df = pd.DataFrame(rows, columns=self.CSV_COLUMNS)
        is_new_file = not os.path.isfile(csv_path)
        df.to_csv(csv_path, encoding="utf-8-sig", mode='a', header=is_new_file)

    def _scroll_down(self, last_position):
        """Scroll to the page bottom; return the new offset, or None if the page stopped moving."""
        for attempt in range(1, self.MAX_SCROLL_ATTEMPTS + 1):
            self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')
            sleep(4)
            current_position = self.driver.execute_script('return window.pageYOffset;')

            if current_position != last_position:
                return current_position

            if attempt < self.MAX_SCROLL_ATTEMPTS:
                # Give the page some extra time to load more results.
                sleep(3)

        return None

## Get Data

In [10]:
# Launches Edge and opens https://twitter.com/explore (see Twitter_scraper.__init__).
# Despite its name, `tweet` is the scraper instance reused by the cells below.
tweet = Twitter_scraper()

##### 01_simple: keyword search limited by an end date

In [11]:
# Both keywords must appear; only tweets sent until 2022-07-06 are requested.
# max_tweet=None places no cap on the number of tweets.
tweet.get_tweet_data(
    search_query={
        'include': ['خلاقیت', 'بادکوبه'],
        'include_operator': 'AND',
        'until': '2022-07-06',
    },
    max_tweet=None,
)

[] 12

##### 02_complete version: every supported filter combined

In [86]:
# Combines keyword, exclusion, date-range, author, reply-target and mention filters.
tweet.get_tweet_data(
    search_query={
        'include': ['space', 'earth'],
        'include_operator': 'AND',
        'not_include': ['satellite'],
        'since': '2018-01-01',
        'until': '2022-01-01',
        'from': ['tonyandthesun'],
        'to': ['NASA', 'SpaceX'],
        'mentioning': ['SpaceX'],
    }
)

[] 1