In [1]:
# %pip install git+https://github.com/vladkens/twscrape.git
# %pip install langdetect twscrape parsel tqdm pandas
# !git clone https://github.com/AI4Finance-Foundation/FinNLP.git

Functions to clean the contents of a tweet and to detect spam

In [2]:
import re
import html
import pandas as pd
from difflib import SequenceMatcher

# Code points that are stripped as emoji / pictographic symbols.
# Only symbol blocks are listed. The previous catch-all range
# U+24C2..U+1F251 also covered CJK, Hangul, kana and fullwidth text, so
# ordinary non-Latin words were deleted together with the emoji.
_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
    "\U0001F700-\U0001F77F"
    "\U0001F780-\U0001F7FF"
    "\U0001F800-\U0001F8FF"
    "\U0001F900-\U0001F9FF"
    "\U0001FA00-\U0001FA6F"
    "\U0001FA70-\U0001FAFF"
    "\U0001F000-\U0001F251"  # tiles, cards, enclosed alphanumerics/ideographs
    "\U000024C2"             # circled M
    "\U000025A0-\U000025FF"  # geometric shapes
    "\U00002600-\U000026FF"  # miscellaneous symbols
    "\U00002700-\U000027BF"  # dingbats
    "\U00002B00-\U00002BFF"  # miscellaneous symbols and arrows
    "\U00003030\U0000303D\U00003297\U00003299"  # emoji located in CJK symbol blocks
    "\U0000FE00-\U0000FE0F"  # variation selectors
    "]+",
    flags=re.UNICODE,
)


def clean_tweet_content(text):
    """Return ``text`` stripped of tweet markup and emoji.

    HTML entities are unescaped first. Then @mentions, http(s) URLs, #hashtags
    and $cashtags are removed, newlines become spaces, emoji are dropped and
    runs of whitespace are collapsed to a single space. Letters of any script
    are kept.

    Args:
        text: Raw tweet text.

    Returns:
        The cleaned text without leading or trailing whitespace.
    """
    text = html.unescape(text)
    text = re.sub(r'@\w+', '', text)  # Remove mentions
    text = re.sub(r'https?://\S+', '', text)  # Remove URLs
    text = re.sub(r'\n+', ' ', text)  # Replace newlines with space
    text = re.sub(r'#\w+', '', text)  # Remove hashtags
    text = re.sub(r'\$\w+', '', text)  # Remove cashtags
    text = _EMOJI_PATTERN.sub('', text)  # Remove emoji

    # Removing tokens leaves gaps behind; collapse them to single spaces.
    text = re.sub(r'\s{2,}', ' ', text)

    return text.strip()

# Phrases whose presence (matched case-insensitively) marks a tweet as promotional.
_PROMOTIONAL_PHRASES = [
    "join us",
    "find out more",
    "Sign up now",
    "become a member",
    "Subscribe for updates",
    "Limited time offer",
    "Exclusive access",
    "Free trial",
    "Don't miss out",
    "Last chance",
    "Early bird discount",
    "Save big",
    "Register today",
    "Unlock benefits",
    "Special promotion",
    "Available for a limited time",
    "Claim your offer",
    "Act now",
    "Exclusive deals",
    "Get started",
    "Offer ends soon",
    "While supplies last",
    "Money-back guarantee",
    "Check out now",
    "Check out now...",
    "Check out now..",
    "Check out now.",
    "Check out",
    "Pay a visit",
    "Most profitable trading community.",
    "Join the",
    "Best trading community",
    "Free access",
    "trading community",
    "our ChatRoom",
    "join here",
    "Join our",
    "Check us out",
    "Daily ALERTS",
    "trading alerts",
    "Free chatroom",
    "Learn To Trade",
    "limited time",
    "CHATROOM",
    "We provide",
    "with us",
    "OUR FREE",
    "Learn how",
    'chat room',
    "Top analyst",
    "price target"
]

# Words that mark a tweet as promotional once two or more occurrences are found.
_PROMOTIONAL_WORDS = [
    'trading',
    'community',
    'alerts'
]

# Compiled once at import time instead of on every call.
_PROMO_PHRASE_RE = re.compile('|'.join(map(re.escape, _PROMOTIONAL_PHRASES)), re.IGNORECASE)
# Entries may contain several words; whitespace between them is optional.
_PROMO_WORD_RE = re.compile(
    '|'.join('\\s*'.join(map(re.escape, entry.split())) for entry in _PROMOTIONAL_WORDS),
    re.IGNORECASE,
)
_CASHTAG_RE = re.compile(r'\$[A-Za-z]+')
_LETTER_RE = re.compile("[a-zA-Z]")


def _recent_tweets_by_user(tweet, df):
    """Return the author's rows of ``df`` from the 24 hours up to ``tweet``, or None without history."""
    if not isinstance(df, pd.DataFrame) or df.empty:
        return None

    user_id = tweet['user']['id']
    # The tweet stores its timestamp under the key that names the history index.
    # utc=True makes tz-naive values comparable with the UTC-normalised index.
    timestamp = pd.to_datetime(tweet[df.index.name], utc=True)
    window_start = timestamp - pd.Timedelta(days=1)

    posted_at = pd.to_datetime(df.index, utc=True)  # parsed once, used for both bounds
    in_window = (df['user_id'] == user_id) & (posted_at > window_start) & (posted_at <= timestamp)
    return df[in_window]


def is_spam(tweet, df):
    """Decide whether ``tweet`` should be discarded as spam.

    History checks run first and record why they fired in ``tweet['reason']``:
    more than 15 tweets by the same user within the preceding 24 hours, or a
    tweet by that user in the same window whose text is more than 98% similar.
    They are skipped when ``df`` is not a non-empty DataFrame.

    Content checks follow (no reason is recorded): a promotional phrase, two or
    more promotional words, more than four cashtags, or fewer than five ASCII
    letters.

    Args:
        tweet: Dict with 'rawContent'. When history is available it must also
            hold ['user']['id'] and a timestamp under the key ``df.index.name``.
        df: Previously stored tweets with 'user_id' and 'rawContent' columns,
            indexed by timestamp.

    Returns:
        True if the tweet is considered spam, otherwise False.
    """
    text = tweet['rawContent']

    recent_tweets = _recent_tweets_by_user(tweet, df)
    if recent_tweets is not None:
        if len(recent_tweets) > 15:
            tweet['reason'] = 'Spam detected due to excessive tweeting.'
            return True
        # Compared only after the cheap count check; non-text cells are skipped.
        is_duplicate = any(
            isinstance(previous, str) and SequenceMatcher(None, text, previous).ratio() > 0.98
            for previous in recent_tweets['rawContent']
        )
        if is_duplicate:
            tweet['reason'] = 'Spam detected due to duplicate text.'
            return True

    contains_promotion = _PROMO_PHRASE_RE.search(text) is not None
    contains_two_or_more_promotion_words = len(_PROMO_WORD_RE.findall(text)) >= 2
    contains_more_than_four_cashtags = len(_CASHTAG_RE.findall(text)) > 4
    # Fewer than five ASCII letters leaves too little text to judge.
    is_too_short = len(_LETTER_RE.findall(text)) < 5

    return contains_promotion or contains_two_or_more_promotion_words or contains_more_than_four_cashtags or is_too_short

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


Get Tweets from Twitter

In [3]:
# Query passed to the Twitter search and reused when filtering the results.
st_search_term = "Tesla OR TSLA"

# Ticker symbol; used in the output file names and for the StockTwits download.
ticker = "TSLA"

# Value passed as ``limit`` to the Twitter search call.
tweets_limit = 30

In [4]:
import asyncio
import datetime
import json
import os
import re
import time

import pytz
from langdetect import detect, LangDetectException
from twscrape import API, gather
from twscrape.logger import set_log_level


# ####################################################################################
# Gather the tweets
# ####################################################################################
# Environment variables holding the scraping account, in the positional order
# of the add_account call: username, password, e-mail, e-mail password.
_TWSCRAPE_CREDENTIAL_VARS = (
    "TWSCRAPE_USERNAME",
    "TWSCRAPE_PASSWORD",
    "TWSCRAPE_EMAIL",
    "TWSCRAPE_EMAIL_PASSWORD",
)


def _user_to_dict(user):
    """Copy the stored attributes of a scraped user into a JSON-serialisable dict."""
    return {
        'id': user.id,
        'id_str': user.id_str,
        'url': user.url,
        'username': user.username,
        'displayname': user.displayname,
        'rawDescription': user.rawDescription,
        'created': user.created.isoformat() if user.created else None,
    }


def _tweet_to_dict(tweet):
    """Copy the stored attributes of a scraped tweet into a JSON-serialisable dict."""
    return {
        'id': tweet.id,
        'date': tweet.date.isoformat(),
        'rawContent': tweet.rawContent,
        'cleanContent': clean_tweet_content(tweet.rawContent),
        'url': tweet.url,
        'user': _user_to_dict(tweet.user),
        'lang': tweet.lang,
        'replyCount': tweet.replyCount,
        'retweetCount': tweet.retweetCount,
        'likeCount': tweet.likeCount,
        'quoteCount': tweet.quoteCount
    }


# ####################################################################################
# Gather the tweets
# ####################################################################################
async def main():
    """Search Twitter for ``st_search_term`` and return the hits as dicts.

    The account is read from the environment variables listed in
    ``_TWSCRAPE_CREDENTIAL_VARS`` and is never stored in the notebook. If any
    of them is unset, no account is added and the search relies on accounts
    already present in the twscrape account database.

    Returns:
        A list with one dict per tweet (see ``_tweet_to_dict``), limited to
        ``tweets_limit`` results by the search call.
    """
    import os  # local import keeps this function independent of the import cell

    api = API()  # or API("path-to.db") - default is `accounts.db`

    # SECURITY: credentials were previously hard-coded here. Anything that was
    # committed with the notebook should be considered leaked and rotated.
    credentials = [os.environ.get(name) for name in _TWSCRAPE_CREDENTIAL_VARS]
    if all(credentials):
        await api.pool.add_account(*credentials)
    await api.pool.login_all()

    # search (latest tab)
    tweets = await gather(api.search(st_search_term, limit=tweets_limit))  # list[Tweet]

    return [_tweet_to_dict(tweet) for tweet in tweets]
    
def write_tweets_to_file(tweets, file_path):
    """Append tweets to a JSON-lines file, skipping ids that are already stored.

    Args:
        tweets: Iterable of dicts, each with an 'id' key.
        file_path: Target file; created on the first write if missing.

    Returns:
        The number of tweets appended by this call.
    """
    seen_ids = set()
    try:
        with open(file_path, 'r', encoding='utf-8') as stored:
            for line in stored:
                seen_ids.add(json.loads(line)['id'])
    except FileNotFoundError:
        # Nothing has been stored yet, so there is nothing to de-duplicate against.
        pass

    appended = 0
    with open(file_path, 'a', encoding='utf-8') as out:
        for tweet in tweets:
            tweet_id = tweet['id']
            if tweet_id in seen_ids:
                continue
            out.write(json.dumps(tweet, ensure_ascii=False) + '\n')
            seen_ids.add(tweet_id)  # also guards against repeats within `tweets`
            appended += 1

    return appended

def does_tweet_match_criteria(tweet_content, search_terms):
    """Check whether ``tweet_content`` satisfies an 'AND'/'OR' search expression.

    ``search_terms`` is split into alternatives on the upper-case word ``OR``;
    each alternative is split into required terms on the upper-case word
    ``AND``. A term is used as a regular expression and must match as a whole
    word, case-insensitively. The tweet matches when every term of at least
    one alternative is found.

    The keywords are only recognised as standalone words, so terms such as
    ``ORCL`` or ``BRAND`` are left intact. Empty terms (for example from a
    trailing ``OR``) are ignored. An expression without any term places no
    constraint and matches everything.

    Args:
        tweet_content: Text to search.
        search_terms: Expression such as ``"Tesla OR TSLA"``.

    Returns:
        True if the tweet matches, otherwise False.
    """
    alternatives = []
    for group in re.split(r'\bOR\b', search_terms):
        required_terms = [term.strip() for term in re.split(r'\bAND\b', group)]
        required_terms = [term for term in required_terms if term]
        if required_terms:
            alternatives.append(required_terms)

    if not alternatives:
        return True

    return any(
        all(re.search(r'\b' + term + r'\b', tweet_content, re.IGNORECASE) for term in required_terms)
        for required_terms in alternatives
    )

def filter_tweets(tweets, search_term, language="en", df=None):
    """Split ``tweets`` into those to keep and those to discard.

    A tweet is kept when its raw text matches ``search_term``, the detected
    language of its cleaned text equals ``language`` and ``is_spam`` does not
    flag it. Everything else is discarded, including tweets whose language
    cannot be detected.

    Args:
        tweets: Iterable of dicts with 'rawContent' and 'cleanContent'.
        search_term: Expression understood by ``does_tweet_match_criteria``.
        language: Language code the detector must return for a tweet to be kept.
        df: Stored tweet history handed to ``is_spam`` unchanged.

    Returns:
        Tuple ``(kept_tweets, filtered_tweets)`` of two lists.
    """
    kept_tweets = []
    filtered_tweets = []  # Tweets filtered out
    for tweet in tweets:
        if not does_tweet_match_criteria(tweet['rawContent'], search_term):
            filtered_tweets.append(tweet)
            continue

        try:
            in_requested_language = detect(tweet['cleanContent']) == language
        except LangDetectException:
            # The language is unknown, so the tweet is discarded rather than kept.
            filtered_tweets.append(tweet)
            continue

        if in_requested_language and not is_spam(tweet, df):
            kept_tweets.append(tweet)
        else:
            filtered_tweets.append(tweet)
    return kept_tweets, filtered_tweets

def append_df(array, df, unique_col='id'):
    """Merge new records into ``df`` and report how many rows were gained.

    The index of ``df`` is moved back into a column, ``array`` is appended as
    additional rows, and for each value of ``unique_col`` only the first row
    is kept, so existing rows win over incoming ones.

    Args:
        array: Records accepted by ``pd.DataFrame`` (e.g. a list of dicts).
        df: Existing frame.
        unique_col: Column that identifies a record.

    Returns:
        Tuple ``(new_tweets_count, updated_df)`` where the count is
        ``len(updated_df) - len(df)``.
    """
    existing_rows = df.reset_index()
    incoming_rows = pd.DataFrame(array)

    combined = pd.concat([existing_rows, incoming_rows])
    updated_df = combined.drop_duplicates(subset=[unique_col], keep='first')

    return len(updated_df) - len(df), updated_df

async def main_tweets(df):
    """Fetch one batch of tweets, filter it, persist it and report statistics.

    Kept tweets are appended to ``{ticker}_tweets.jsonl`` and discarded ones to
    ``trash_{ticker}_tweets.jsonl``; both files are de-duplicated by tweet id.

    Args:
        df: Stored tweet history, used only for the spam checks.

    Returns:
        Dict with 'new_tweets' (rows appended to either file) and
        'total_tweets' (tweets processed in this batch).
    """
    tweets = await main()
    english_tweets, filtered_tweets = filter_tweets(tweets, st_search_term, df=df)

    # Append instead of rewriting the file from `df`. The caller's frame is
    # never updated, so rewriting would drop every tweet saved by an earlier
    # call as soon as it is missing from the current batch.
    new_english_count = write_tweets_to_file(english_tweets, f'{ticker}_tweets.jsonl')
    filtered_count = write_tweets_to_file(filtered_tweets, f'trash_{ticker}_tweets.jsonl')

    total_processed = len(english_tweets) + len(filtered_tweets)

    if total_processed > 0:
        english_percentage = (new_english_count / total_processed) * 100
        filtered_percentage = (filtered_count / total_processed) * 100
        print(f"Processed {total_processed} tweets. New Tweets/New Percentage: {new_english_count + filtered_count}:{(english_percentage + filtered_percentage):.2f}% New English: {english_percentage:.2f}%. Filtered: {filtered_percentage:.2f}%.")
    else:
        print("No tweets to process.")

    return {"new_tweets": new_english_count + filtered_count, "total_tweets": total_processed}

Get Tweets from StockTwits

In [5]:
# Stocktwits
import sys
import os

# Make the FinNLP checkout in the working directory importable.
# os.path.join picks the right separator on every platform and avoids the
# invalid "\F" escape sequence that a literal backslash path contains.
sys.path.append(os.path.join(os.getcwd(), 'FinNLP'))
from finnlp.data_sources.social_media.stocktwits_streaming import Stocktwits_Streaming

# Kept separate from `st_search_term`: the Twitter functions read that global
# when they are called, so reassigning it here would change their query too.
stocktwits_search_term = f"{ticker}"
file_path = f'./{ticker}_stocktwits.jsonl'


pages = 1
config = {
    "max_retry": 5
}

async def main_stocktwits(df):
    """Download one batch of StockTwits messages, filter it and persist it.

    Kept messages are appended to ``{ticker}_stocktweets.jsonl`` and discarded
    ones to ``trash_{ticker}_stocktweets.jsonl``; both files are de-duplicated
    by message id.

    Args:
        df: Stored message history, used only for the spam checks.

    Returns:
        Dict with 'new_tweets' (rows appended to either file) and
        'total_tweets' (messages processed in this batch).
    """
    downloader = Stocktwits_Streaming(config)
    # NOTE(review): this looks like a blocking call inside a coroutine - confirm.
    downloader.download_streaming_stock(ticker, pages)

    # The message text arrives in 'body'; rename it to the key the shared
    # filtering helpers read.
    stocktweets = downloader.dataframe.rename(columns={'body': 'rawContent'}).to_dict('records')
    stocktweets = [{**tweet, 'cleanContent': clean_tweet_content(tweet['rawContent'])} for tweet in stocktweets]

    english_tweets, filtered_tweets = filter_tweets(stocktweets, stocktwits_search_term, df=df)

    # Append instead of rewriting the file from `df`. The caller's frame is
    # never updated, so rewriting would drop every message saved by an earlier
    # call as soon as it is missing from the current batch.
    new_english_count = write_tweets_to_file(english_tweets, f'{ticker}_stocktweets.jsonl')
    filtered_count = write_tweets_to_file(filtered_tweets, f'trash_{ticker}_stocktweets.jsonl')

    total_processed = len(english_tweets) + len(filtered_tweets)

    if total_processed > 0:
        english_percentage = (new_english_count / total_processed) * 100
        filtered_percentage = (filtered_count / total_processed) * 100
        print(f"Processed {total_processed} stocktwits. New Tweets/New Percentage: {new_english_count + filtered_count}:{(english_percentage + filtered_percentage):.2f}% New English: {english_percentage:.2f}%. Filtered: {filtered_percentage:.2f}%.")
    else:
        print("No tweets to process.")

    return {"new_tweets": new_english_count + filtered_count, "total_tweets": total_processed}



Run both scrapers one after the other, repeating indefinitely (the loop sleeps 110 seconds in total between rounds)

In [6]:
from datetime import datetime
import pandas  as pd

async def save_to_jsonl(data, file_path='tweets_stats.jsonl'):
    """Append ``data`` as one JSON document on its own line of ``file_path``.

    Args:
        data: JSON-serialisable object.
        file_path: JSON-lines file to append to; created if it does not exist.
    """
    with open(file_path, mode='a', encoding='utf-8') as stats_file:
        stats_file.write(f"{json.dumps(data)}\n")

# Stored Twitter history: index the rows by their timestamp column
# ('created_at' when present, otherwise 'date') and expose the author's id
# as a flat 'user_id' column.
tweets_df = pd.read_json(f'{ticker}_tweets.jsonl', lines=True)
date_column = 'date' if 'created_at' not in tweets_df.columns else 'created_at'
tweets_df = tweets_df.set_index(date_column)
tweets_df['user_id'] = tweets_df['user'].apply(lambda author: author['id'] if 'id' in author else None)

# Stored StockTwits history, prepared the same way.
stocktweets_df = pd.read_json(f'{ticker}_stocktweets.jsonl', lines=True)
date_column = 'date' if 'created_at' not in stocktweets_df.columns else 'created_at'
stocktweets_df = stocktweets_df.set_index(date_column)
stocktweets_df['user_id'] = stocktweets_df['user'].apply(lambda author: author['id'] if 'id' in author else None)

while True:
    tweet_data = await main_tweets(tweets_df)
    stocktwit_data = await main_stocktwits(stocktweets_df)
    
    combined_data = {
        "twitter": tweet_data,
        "stocktwits": stocktwit_data,
        "timestamp": datetime.now(pytz.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
    }
    
    await save_to_jsonl(combined_data)
    
    time.sleep(60)
    time.sleep(10)
    time.sleep(10)  
    time.sleep(10)
    time.sleep(10)
    time.sleep(10)



Processed 20 tweets. New Tweets/New Percentage: 15:75.00% New English: 20.00%. Filtered: 55.00%.


100%|██████████| 1/1 [00:01<00:00,  1.33s/it]


Processed 30 stocktwits. New Tweets/New Percentage: 22:73.33% New English: 0.00%. Filtered: 73.33%.




Processed 17 tweets. New Tweets/New Percentage: 8:47.06% New English: 11.76%. Filtered: 35.29%.


100%|██████████| 1/1 [00:01<00:00,  1.45s/it]


Processed 30 stocktwits. New Tweets/New Percentage: 0:0.00% New English: 0.00%. Filtered: 0.00%.
