In [None]:
import pandas as pd

# Load ./pendle.json (relative to the working directory) into a DataFrame; later cells
# read its `id`, `author`, `fullText`, `createdAt` and `lang` columns.
pendle_df = pd.read_json('./pendle.json')

In [None]:
# Preview the first rows to sanity-check the load.
pendle_df.head()

In [None]:
# Work on a copy so the columns added by later cells do not modify pendle_df.
prediction_only_tweets = pendle_df.copy()

In [None]:
# Load the per-tweet LLM analysis table; later cells read its `id`, `call_to_action`
# and `signal_classification` columns.
pendle_llm_analysis = pd.read_csv("./pendle_llm_analysis.csv")

In [None]:
# Preview the first rows of the LLM analysis table.
pendle_llm_analysis.head()

In [None]:
# Preview the working copy of the tweets before any columns are added.
prediction_only_tweets.head()

Fetching userName

In [None]:
import ast

# Function to extract username from dictionary
def extract_username(user_info):
    """Return the ``userName`` field of an author record, or ``None``.

    Parameters:
    user_info: Usually a dict-like author record. A string holding the repr of
               such a record (for example after a CSV round trip) is parsed
               with ``ast.literal_eval`` first.

    Returns:
    The value stored under ``"userName"``, or ``None`` when the record is
    missing (None/NaN), cannot be parsed, or has no such key.
    """
    if isinstance(user_info, str):
        try:
            user_info = ast.literal_eval(user_info)
        except (ValueError, SyntaxError):
            return None

    # None, NaN and other scalars have no ``.get``; the previous version raised
    # AttributeError for them because only ValueError/SyntaxError were caught.
    getter = getattr(user_info, "get", None)
    if not callable(getter):
        return None
    return getter("userName", None)

# Derive one username per tweet from its author record.
author_records = prediction_only_tweets["author"]
prediction_only_tweets["username"] = author_records.apply(extract_username)

# Show the new column next to the tweet id and text.
preview_columns = ["id", "fullText", "username"]
prediction_only_tweets[preview_columns].head()

In [None]:
# Load the 10x coin list; later cells read its `name`, `id`, `symbol`, `screen_name`
# and `ath_date` columns.
tenx_df = pd.read_csv("10x_coins.csv")

tenx_df.head()

In [None]:
# Load the non-10x coin list; later cells read its `name`, `id`, `symbol` and
# `screen_name` columns.
non_tenx_df = pd.read_csv("not_10x_coins.csv")

non_tenx_df.head()

In [None]:
import ahocorasick
import pandas as pd
import re
import jieba  # For Chinese text segmentation
import unicodedata

# Keep only coins that have every field used to build the lookup keys below.
required_coin_columns = ["name", "id", "symbol", "screen_name"]
tenx_df = tenx_df.dropna(subset=required_coin_columns)
non_tenx_df = non_tenx_df.dropna(subset=required_coin_columns)

# Function to build Aho-Corasick trie
def build_ac_trie(word_dict):
    """
    Build a compiled ``ahocorasick.Automaton`` from a pattern dictionary.

    Every key of `word_dict` is registered as a pattern with its value as the
    payload, after which `make_automaton()` is called so the automaton is
    ready for searching.

    Parameters:
    word_dict (dict): Maps each pattern string to the value stored with it.

    Returns:
    ahocorasick.Automaton: The finalized automaton holding all patterns.
    """
    automaton = ahocorasick.Automaton()
    for pattern in word_dict:
        automaton.add_word(pattern, word_dict[pattern])
    automaton.make_automaton()
    return automaton

# Lower-cased lookup tables (name / symbol / "@screen_name" -> coin id) for both coin lists.
def _lowercased_lookup(keys, ids, prefix=""):
    """Map ``prefix + key.lower()`` to ``id.lower()`` for each paired key and id."""
    return {prefix + key.lower(): coin_id.lower() for key, coin_id in zip(keys, ids)}


name_to_id_10x = _lowercased_lookup(tenx_df["name"], tenx_df["id"])
symbol_to_id_10x = _lowercased_lookup(tenx_df["symbol"], tenx_df["id"])
screen_name_to_id_10x = _lowercased_lookup(tenx_df["screen_name"], tenx_df["id"], prefix="@")

name_to_id_non10x = _lowercased_lookup(non_tenx_df["name"], non_tenx_df["id"])
symbol_to_id_non10x = _lowercased_lookup(non_tenx_df["symbol"], non_tenx_df["id"])
screen_name_to_id_non10x = _lowercased_lookup(non_tenx_df["screen_name"], non_tenx_df["id"], prefix="@")

# Combine the six tables; when a key appears in several, the table merged last wins.
all_coin_mappings = {}
for lookup in (
    name_to_id_10x,
    symbol_to_id_10x,
    screen_name_to_id_10x,
    name_to_id_non10x,
    symbol_to_id_non10x,
    screen_name_to_id_non10x,
):
    all_coin_mappings.update(lookup)

# Everyday English words that must never be treated as a coin key.
common_words = set(
    "about again all an and any are as at bad be big but by can "
    "different do early every for from good has high how if in is it "
    "just late like long low me more most much my new not now old on "
    "one only or other out over own short so that the this to under up "
    "way we well what when where why will with would you young your".split()
)

# Drop keys shorter than three characters and keys that are common words.
filtered_mappings = {
    term: coin_id
    for term, coin_id in all_coin_mappings.items()
    if len(term) >= 3 and term not in common_words
}

# Compile the surviving keys into an automaton.
# NOTE(review): the matching code visible in this notebook looks keys up in
# filtered_mappings directly and never reads ac_trie.
ac_trie = build_ac_trie(filtered_mappings)

# Define regex patterns; each one captures the run of characters following its marker.
hashtag_pattern = re.compile(r'#([A-Za-z0-9]+)')  # "#tag"  -> letters/digits after '#'
mention_pattern = re.compile(r'@([A-Za-z0-9_]+)')  # "@user" -> letters/digits/underscore after '@'
dollar_pattern = re.compile(r'\$([A-Za-z0-9]+)')  # "$tick" -> letters/digits after '$'

# Function to detect non-English text
def contains_non_english(text):
    """Return True when ``text`` contains at least one non-ASCII letter.

    The previous check flagged every character that was not a letter or digit,
    so any space or punctuation mark made ordinary English text count as
    non-English, while pure CJK text (Unicode category ``Lo``) was not flagged.
    Only letters outside ASCII are counted now; digits, whitespace, punctuation
    and symbols such as emoji never trigger the flag.

    Parameters:
    text (str): The text to inspect.

    Returns:
    bool: True if any character is a Unicode letter with a code point above 127.
    """
    return any(
        ord(char) > 127 and unicodedata.category(char).startswith('L')
        for char in text
    )

# Function to tokenize text
def tokenize_text(text):
    """Split ``text`` into tokens, choosing the tokenizer by script.

    Text that `contains_non_english` does not flag is split with a regex that
    keeps runs of ASCII letters, digits and hyphens; flagged text is segmented
    with ``jieba.lcut``.
    """
    if not contains_non_english(text):
        return re.findall(r'\b[a-zA-Z0-9-]+\b', text)
    return jieba.lcut(text)

# Function to extract coin IDs from text
def extract_coin_ids(text):
    """Return the coin ids referenced in ``text`` as a sorted list.

    The lower-cased text is scanned for hashtags, @mentions, $tickers and plain
    tokens; each candidate that is a key of `filtered_mappings` contributes its
    coin id. Matching is an exact dictionary lookup per candidate.

    Parameters:
    text (str): Raw tweet text.

    Returns:
    list: Unique coin ids in sorted order. Sorting makes the output
          reproducible; a list built straight from a set of strings can change
          order between interpreter runs.
    """
    text_lower = text.lower()

    candidates = []
    candidates.extend(hashtag_pattern.findall(text_lower))
    # Mention keys are stored with their leading "@", so put it back.
    candidates.extend("@" + handle for handle in mention_pattern.findall(text_lower))
    candidates.extend(dollar_pattern.findall(text_lower))
    candidates.extend(tokenize_text(text_lower))

    coin_ids = {
        filtered_mappings[candidate]
        for candidate in candidates
        if candidate in filtered_mappings
    }
    return sorted(coin_ids)

# Attach the extracted coin ids to every tweet.
tweet_texts = prediction_only_tweets["fullText"].astype(str)
prediction_only_tweets["coin_mentions"] = tweet_texts.apply(extract_coin_ids)

# Select tweets whose coin_mentions value is a list with no entries.
no_mentions_mask = prediction_only_tweets["coin_mentions"].apply(
    lambda mentions: isinstance(mentions, list) and len(mentions) == 0
)
tweets_without_mentions = prediction_only_tweets[no_mentions_mask]

# Show them with the original row labels kept as an `index` column.
tweets_without_mentions[["id", "fullText", "coin_mentions"]].reset_index()

In [None]:
# Spot-check the text of one tweet by position. NOTE(review): position 5345 is
# hard-coded, so this raises IndexError when the frame has fewer than 5346 rows.
prediction_only_tweets.iloc[5345]["fullText"]

In [None]:
# Coin ids extracted for the same hard-coded row position (5345).
prediction_only_tweets.iloc[5345]["coin_mentions"]

Calculating Prediction Ratio

In [None]:
# Parse both timestamp columns as timezone-aware UTC so they are always comparable.
# Comparing a tz-aware with a tz-naive timestamp raises TypeError, which the former
# catch-all handler silently reported as "0 predictions".
prediction_only_tweets['tweet_date'] = pd.to_datetime(
    prediction_only_tweets['createdAt'], errors='coerce', utc=True
)
tenx_df['all_time_high_date'] = pd.to_datetime(tenx_df['ath_date'], errors='coerce', utc=True)

# coin_mentions holds coin *ids* (the values of filtered_mappings), so the ATH lookup
# is keyed by id. Keying by name only matched coins whose id equals their lower-cased name.
coin_ath_dict = dict(zip(tenx_df['id'].str.lower(), tenx_df['all_time_high_date']))


def analyze_predictions(row):
    """Count the coins a tweet mentions and how many were mentioned before their ATH.

    Parameters:
    row: A row with `coin_mentions` (a list of coin ids, or the string repr of
         one) and `tweet_date` (a timestamp or NaT).

    Returns:
    pd.Series: ``[total_predictions, successful_predictions]``. A mention is
               successful when the coin has a known ATH date in `coin_ath_dict`
               and the tweet is strictly earlier than it. Unparseable mentions
               yield ``[0, 0]``.
    """
    coins = row['coin_mentions']
    if not isinstance(coins, list):
        try:
            coins = ast.literal_eval(coins)
        except (ValueError, SyntaxError, TypeError):
            # Best effort: a value that is not a list literal counts as no mentions.
            coins = []
    if not isinstance(coins, (list, tuple, set)):
        coins = []

    total_predictions = len(coins)

    successful_predictions = 0
    tweet_date = row['tweet_date']
    if pd.notna(tweet_date):
        for coin in coins:
            ath_date = coin_ath_dict.get(str(coin).lower(), pd.NaT)
            # Coins without a known ATH date (not in the 10x list) never count.
            if pd.notna(ath_date) and tweet_date < ath_date:
                successful_predictions += 1

    return pd.Series([total_predictions, successful_predictions])


# Apply the function to create new columns
prediction_only_tweets[['total_predictions', 'successful_predictions']] = prediction_only_tweets.apply(analyze_predictions, axis=1)

prediction_only_tweets[['id', 'coin_mentions', 'total_predictions', 'successful_predictions']].head()

In [None]:
# Preview tweets whose `lang` is not "en" together with the coin ids extracted from them.
prediction_only_tweets[prediction_only_tweets["lang"] != "en"][["fullText","coin_mentions"]].head()

Successful 10x prediction ratio per userName

In [None]:
# Per-user totals: predictions made, predictions that preceded the coin's ATH, tweet count.
user_stats = prediction_only_tweets.groupby('username').agg({
    'total_predictions': 'sum',
    'successful_predictions': 'sum',
    'id': 'count'  # Count of tweets per user
}).reset_index()

# Success ratio = successful / total, a fraction in [0, 1]. The previous expression
# (`* 2 / total * 5`) produced ten times that value. The min-max normalisation applied
# later is scale-invariant, so only the reported ratio changes.
user_stats['success_ratio'] = user_stats['successful_predictions'] / user_stats['total_predictions']
# Users with no predictions give 0 / 0 = NaN; report them as 0.
user_stats['successful_10x_predictions_ratio'] = user_stats['success_ratio'].fillna(0)

# Rename columns for clarity
user_stats = user_stats.rename(columns={'id': 'tweet_count'})

# Sort by success ratio in descending order
user_stats = user_stats.sort_values('successful_10x_predictions_ratio', ascending=False)

user_stats[["username","successful_10x_predictions_ratio"]].head()

In [None]:
# Summary statistics of the per-user success ratio.
user_stats["successful_10x_predictions_ratio"].describe()

Finding the longest tweets streak by each handle and average time span in days

In [None]:
# Distinct per-tweet success counts; the streak logic below treats 0 as a streak breaker.
prediction_only_tweets["successful_predictions"].unique()

In [None]:
# Work on a copy so prediction_only_tweets keeps its original createdAt values.
pred_tweets = prediction_only_tweets.copy()
pred_tweets['createdAt'] = pd.to_datetime(pred_tweets['createdAt'])

# Step 1: order each user's tweets chronologically
df = pred_tweets.sort_values(['username', 'createdAt'])

# Step 2: number the streaks. Within each user, the running count of tweets with zero
# successful predictions changes exactly when a streak is broken. groupby(...).cumsum()
# always returns a Series aligned to df's index; the earlier groupby(...).apply(...)
# returns a MultiIndex in pandas >= 2.0 and cannot be assigned as a column.
df['streak_id'] = (
    df['successful_predictions'].eq(0).astype(int)
    .groupby(df['username'])
    .cumsum()
)

# Drop the breaking tweets themselves and group the rest by (user, streak)
streaks = df[df['successful_predictions'] != 0].groupby(['username', 'streak_id'])

# Longest streak per user
streak_lengths = streaks['successful_predictions'].count().reset_index(name='streak_length')
max_streaks = streak_lengths.groupby('username')['streak_length'].max().reset_index(name='longest_tweets_streak')

# Streak duration in days (last tweet minus first tweet of the streak), averaged per user
time_spans = (streaks['createdAt'].max() - streaks['createdAt'].min()).dt.total_seconds() / (3600 * 24)
avg_time_spans = time_spans.groupby(level='username').mean().reset_index(name='avg_time_span_days')

# Merge results
result_df = pd.merge(max_streaks, avg_time_spans, on='username', how='left')
result_df = result_df.sort_values("longest_tweets_streak", ascending=False)

result_df.head()

Simplified Version of Streak IDs

In [None]:
import pandas as pd

# Copy the dataset and order each user's tweets chronologically
df = prediction_only_tweets.copy()
df['createdAt'] = pd.to_datetime(df['createdAt'])
df = df.sort_values(['username', 'createdAt']).reset_index(drop=True)

SECONDS_PER_DAY = 3600 * 24

longest_streaks = {}
avg_time_spans = {}

# groupby hands over each user's rows once instead of re-scanning the whole frame per
# user. Rows with a missing username are skipped (they previously produced an empty
# entry with a streak of 0).
for username, user_df in df.groupby('username', sort=False):
    longest_streak = 0
    current_streak = 0
    streak_start_time = None
    streak_end_time = None
    streak_durations = []

    for successes, created_at in zip(user_df['successful_predictions'], user_df['createdAt']):
        if successes != 0:
            if current_streak == 0:
                streak_start_time = created_at
            # Remember the latest successful tweet: a streak's duration runs from its
            # first to its last successful tweet, not to the tweet that breaks it.
            streak_end_time = created_at
            current_streak += 1
        elif current_streak > 0:
            longest_streak = max(longest_streak, current_streak)
            streak_durations.append(
                (streak_end_time - streak_start_time).total_seconds() / SECONDS_PER_DAY
            )
            current_streak = 0
            streak_start_time = None
            streak_end_time = None

    # A streak still open at the user's last tweet is closed here.
    if current_streak > 0:
        longest_streak = max(longest_streak, current_streak)
        streak_durations.append(
            (streak_end_time - streak_start_time).total_seconds() / SECONDS_PER_DAY
        )

    longest_streaks[username] = longest_streak
    avg_time_spans[username] = sum(streak_durations) / len(streak_durations) if streak_durations else 0

# Convert results to DataFrame
streak_df = pd.DataFrame(list(longest_streaks.items()), columns=['username', 'longest_tweets_streak'])
time_span_df = pd.DataFrame(list(avg_time_spans.items()), columns=['username', 'avg_time_span_days'])

# Merge and sort final results
one_result_df = pd.merge(streak_df, time_span_df, on='username', how='left')
one_result_df = one_result_df.sort_values('longest_tweets_streak', ascending=False)

one_result_df.head()

Condition Part-1 (call_to_action is written as buy but no before_ath_coin is mentioned)

In [None]:
# Independent copy of the LLM analysis table for the "buy" condition.
buy_tweets_analysis = pendle_llm_analysis.copy()

# Preview rows whose call_to_action is "buy". The filtered frame is only displayed,
# not assigned, so buy_tweets_analysis still holds every row after this cell.
buy_tweets_analysis[ (buy_tweets_analysis["call_to_action"] == "buy") ].head()

In [None]:
# Collect only the ids of tweets whose call_to_action is "buy". Taking the ids of the
# whole analysis table selected every analysed tweet, whatever its call to action.
is_buy_call = buy_tweets_analysis["call_to_action"] == "buy"
buy_tweets_ids = buy_tweets_analysis.loc[is_buy_call, "id"].to_list()

buy_tweets_df = prediction_only_tweets[prediction_only_tweets["id"].isin(buy_tweets_ids)]

In [None]:
# Distinct successful-prediction counts among the selected tweets.
buy_tweets_df["successful_predictions"].unique()

In [None]:
# Keep only the selected tweets that have zero successful predictions.
buy_tweets_df = buy_tweets_df[buy_tweets_df["successful_predictions"] == 0]

buy_tweets_df.head()

Condition Part-2 (tweet has a bullish signal, but no coin is mentioned)

In [None]:
# Independent copy of the LLM analysis table for the "bullish" condition.
bullish_tweets_analysis = pendle_llm_analysis.copy()

# Preview rows classified as "bullish". The filtered frame is only displayed, not
# assigned, so bullish_tweets_analysis still holds every row after this cell.
bullish_tweets_analysis[ (bullish_tweets_analysis["signal_classification"] == "bullish") ].head()

In [None]:
# Collect only the ids of tweets classified as "bullish". Taking the ids of the whole
# analysis table selected every analysed tweet, whatever its classification.
is_bullish = bullish_tweets_analysis["signal_classification"] == "bullish"
bullish_tweets_ids = bullish_tweets_analysis.loc[is_bullish, "id"].to_list()

bullish_tweets_df = prediction_only_tweets[prediction_only_tweets["id"].isin(bullish_tweets_ids)]

In [None]:
# Keep only the selected tweets whose coin_mentions list is empty.
bullish_tweets_df = bullish_tweets_df[bullish_tweets_df["coin_mentions"].apply(lambda x: len(x) == 0)]
bullish_tweets_df.head()

Concatenating DataFrames from both condition parts (Part1+Part2)

In [None]:
# Row counts of the two condition frames (bullish first, then buy).
len(bullish_tweets_df),len(buy_tweets_df)

In [None]:
# Stack both condition frames. A tweet can satisfy both conditions (zero successful
# predictions AND no coin mentioned), so keep one row per tweet id; otherwise that
# tweet is counted twice in the per-user aggregation. De-duplication uses `id` only
# because coin_mentions holds lists, which are unhashable.
incorrect_buy_df = pd.concat([buy_tweets_df, bullish_tweets_df]).drop_duplicates(subset="id")

incorrect_buy_df.head()

Calculating Prediction Success Rate

In [None]:
# Order both per-user tables alphabetically by username (row labels are kept as-is).
user_stats = user_stats.sort_values("username")
result_df = result_df.sort_values("username")

In [None]:
# Users present in user_stats but absent from the streak table.
users_with_streaks = set(result_df["username"])
missing_usernames = set(user_stats["username"]) - users_with_streaks
print(missing_usernames)

# Give those users a zero streak and a zero average time span.
missing_users_df = user_stats.loc[user_stats["username"].isin(missing_usernames)].assign(
    longest_tweets_streak=0,
    avg_time_span_days=0,
)

# Append them; ignore_index renumbers the combined rows from 0.
result_df = pd.concat([result_df, missing_users_df], ignore_index=True)

# Both tables should now list the same number of users.
(len(result_df["username"]), len(user_stats["username"]))

Normalizing

In [None]:
def _min_max_scale(values):
    """Rescale ``values`` linearly to [0, 1]; return 1.0 when min equals max."""
    lowest = values.min()
    highest = values.max()
    if lowest == highest:
        # A constant column would divide by zero; use a fixed 1.0 for every row.
        return 1.0
    return (values - lowest) / (highest - lowest)


user_stats["successful_10x_predictions_ratio_normalized"] = _min_max_scale(
    user_stats["successful_10x_predictions_ratio"]
)

result_df["longest_tweets_streak_normalized"] = _min_max_scale(
    result_df["longest_tweets_streak"]
)

In [None]:
# Combine the two normalised metrics per user. The frames are joined on `username`:
# user_stats and result_df carry different row labels for the same user, so adding
# their columns directly (which aligns on the index) mixed different users' values.
predicition_success = result_df[["username", "longest_tweets_streak_normalized"]].merge(
    user_stats[["username", "successful_10x_predictions_ratio_normalized"]],
    on="username",
    how="left",
)

# Weighted score: 70% success ratio, 30% longest streak.
predicition_success["prediction_success_score"] = (
    predicition_success["successful_10x_predictions_ratio_normalized"] * 0.7
    + predicition_success["longest_tweets_streak_normalized"] * 0.3
)

# Keep the same two columns the frame exposed before.
predicition_success = predicition_success[["username", "prediction_success_score"]].copy()

In [None]:
# Summary statistics of the combined prediction success score.
predicition_success["prediction_success_score"].describe()

False Prediction Rate

In [None]:
def _incorrect_flag(successful_count):
    """Return 1 when a tweet has no successful prediction, otherwise 0."""
    if successful_count == 0:
        return 1
    return 0


incorrect_buy_df["incorrect_buy_signal"] = incorrect_buy_df["successful_predictions"].apply(_incorrect_flag)

incorrect_buy_df.head()

In [None]:
# Per user: number of incorrect buy signals and number of rows in incorrect_buy_df.
false_prediction_df = (
    incorrect_buy_df
    .groupby('username')
    .agg({'incorrect_buy_signal': 'sum', 'id': 'count'})
    .reset_index()
)

# Inverse of the incorrect share, so a higher value is better; NaN (0 / 0) becomes 0.
# NOTE(review): as far as the visible cells show, incorrect_buy_df only contains tweets
# with zero successful predictions, so the share is 1 and the inverse 0 for every user
# listed here. Confirm whether the denominator should be the user's total tweet count.
incorrect_share = false_prediction_df['incorrect_buy_signal'] / false_prediction_df['id']
false_prediction_df['incorrect_buy_signal_inverse'] = (1 - incorrect_share).fillna(0)

# Give the count column a descriptive name and put the best users first.
false_prediction_df = (
    false_prediction_df
    .rename(columns={'id': 'tweet_count'})
    .sort_values('incorrect_buy_signal_inverse', ascending=False)
)

false_prediction_df[["username","incorrect_buy_signal_inverse"]].head()

Adding missed users

In [None]:
# Compare how many users each table lists before the missing ones are added.
len(false_prediction_df["username"]) , len(user_stats["username"])

In [None]:
# Users present in user_stats but without any row in false_prediction_df
missing_usernames = set(user_stats["username"]) - set(false_prediction_df["username"])

# Default rows for those users: no incorrect signals, so the inverse score is 1.
# The names are sorted because set iteration order can differ between runs, which
# would make the row order of the resulting frame non-reproducible.
missing_users_df = pd.DataFrame({
    "username": sorted(missing_usernames),
    "incorrect_buy_signal": 0,
    "tweet_count": 0,
    "incorrect_buy_signal_inverse": 1
})

# Append missing users to false_prediction_df
false_prediction_df = pd.concat([false_prediction_df, missing_users_df], ignore_index=True)

# Ensure correct column order
false_prediction_df = false_prediction_df[["username", "incorrect_buy_signal", "tweet_count", "incorrect_buy_signal_inverse"]]

# Verify results
false_prediction_df.head()

In [None]:
# Both tables should list the same number of users after the missing ones were added.
len(false_prediction_df["username"]) , len(user_stats["username"])

Historical Prediction Accuracy

In [None]:
# Join the two per-user scores on `username`. predicition_success and
# false_prediction_df list users in different row orders, so copying a column across
# by index label paired each user with some other user's false-prediction score.
hist_score = (
    predicition_success[["username", "prediction_success_score"]]
    .rename(columns={"prediction_success_score": "prediction_success_rate"})
    .merge(
        false_prediction_df[["username", "incorrect_buy_signal_inverse"]]
        .rename(columns={"incorrect_buy_signal_inverse": "false_prediction_rate"}),
        on="username",
        how="left",
    )
)

# Final weighting: 70% prediction success, 30% inverse false-prediction score.
hist_score["score"] = hist_score["prediction_success_rate"] * 0.7 + hist_score["false_prediction_rate"] * 0.3

In [None]:
# Summary statistics of the final historical accuracy score.
hist_score["score"].describe()