In [None]:

from telethon import TelegramClient
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, MessageMediaWebPage
import pandas as pd
import re
import os

# ------------------------------
# SETUP TELETHON CLIENT
# ------------------------------
api_id =     # your API ID
api_hash = "API_HASH"

client = TelegramClient("sebi_scraper_session", api_id, api_hash)

# ------------------------------
# SCRAPER FUNCTION
# ------------------------------
async def scrape_group_messages(group_username, limit=200):
    messages_data = []
    entity = await client.get_entity(group_username)

    async for msg in client.iter_messages(entity, limit=limit):
        sender = await msg.get_sender()
        
        # Detect media type
        if msg.media:
            if isinstance(msg.media, MessageMediaPhoto):
                media_type = "photo"
            elif isinstance(msg.media, MessageMediaDocument):
                media_type = "document"
            elif isinstance(msg.media, MessageMediaWebPage):
                media_type = "webpage"
            else:
                media_type = "other"
        else:
            media_type = None

        has_hyperlink = bool(re.search(r"http[s]?://", msg.text or ""))

        messages_data.append({
            "message_id": msg.id,
            "date": msg.date,
            "chat_id": entity.id,
            "sender_id": sender.id if sender else None,
            "sender_username": getattr(sender, "username", None),
            "sender_first_name": getattr(sender, "first_name", None),
            "sender_last_name": getattr(sender, "last_name", None),
            "sender_phone": getattr(sender, "phone", None),
            "text": msg.text,
            "views": getattr(msg, "views", None),
            "forwards": getattr(msg, "forwards", None),
            "reply_to_msg_id": getattr(msg, "reply_to_msg_id", None),
            "media_type": media_type,
            "has_hyperlink": has_hyperlink,
        })

    return pd.DataFrame(messages_data)

# ------------------------------
# MAIN SCRAPER LOOP
# ------------------------------
async def main():
    group_usernames = [
        "abhayvarn", "eqwires", "VGSTOCKRESEARCH", "EverydayProfits", "StockPro_Online",
        "ChaseAlpha", "Equity99", "SharesNservices", "DeltaTrading1", "PatelWealth",
        "STOCKGAINERSS", "PATELWEALTH", "Intradat", "GREEN_TRADERS_SEBI", "AngelOneAdvisory",
        "GapUp", "Centrum", "MotilalOswal", "LIVELONGWEALTH", "StockPhoenix",
        "Vision_Optiontrading", "OptionsGurukul", "DarkHorseOfStockMarket",
        "IntradayMatchSebiRegistered", "Sharekhan_Official", "CAJagdeesh",
        "TheOriginalBull", "TheFinberg", "PowerOfStocks", "LiveTradingTricks"
    ]

    all_data = []
    for group in group_usernames:
        try:
            df_group = await scrape_group_messages(group, limit=200)
            df_group["group"] = group
            all_data.append(df_group)
            print(f"✅ Scraped {group}, messages: {len(df_group)}")
        except Exception as e:
            print(f"❌ Failed to fetch {group}: {e}")

    if all_data:
        df_all = pd.concat(all_data, ignore_index=True)

        # Save path
        save_path = r"D:\Darryl\Coding\s_p\data\raw"
        os.makedirs(save_path, exist_ok=True)
        csv_file = os.path.join(save_path, "sebi_groups_messages.csv")

        df_all.to_csv(csv_file, index=False)
        print(f"💾 Saved to {csv_file}")

        return df_all
    else:
        return pd.DataFrame()

# ------------------------------
# RUN
# ------------------------------
await client.start()
df_all = await main()
df_all.head()


✅ Scraped abhayvarn, messages: 200
✅ Scraped eqwires, messages: 200
✅ Scraped VGSTOCKRESEARCH, messages: 200
✅ Scraped EverydayProfits, messages: 200
✅ Scraped StockPro_Online, messages: 200
✅ Scraped ChaseAlpha, messages: 200
✅ Scraped Equity99, messages: 200
✅ Scraped SharesNservices, messages: 200
✅ Scraped DeltaTrading1, messages: 200
✅ Scraped PatelWealth, messages: 200
✅ Scraped STOCKGAINERSS, messages: 200
✅ Scraped PATELWEALTH, messages: 200
✅ Scraped Intradat, messages: 200
✅ Scraped GREEN_TRADERS_SEBI, messages: 200
✅ Scraped AngelOneAdvisory, messages: 200
✅ Scraped GapUp, messages: 200
✅ Scraped Centrum, messages: 0
✅ Scraped MotilalOswal, messages: 38
✅ Scraped LIVELONGWEALTH, messages: 200
✅ Scraped StockPhoenix, messages: 200
✅ Scraped Vision_Optiontrading, messages: 200
✅ Scraped OptionsGurukul, messages: 4
✅ Scraped DarkHorseOfStockMarket, messages: 200
✅ Scraped IntradayMatchSebiRegistered, messages: 200
✅ Scraped Sharekhan_Official, messages: 38
✅ Scraped CAJagdeesh,

  df_all = pd.concat(all_data, ignore_index=True)


Unnamed: 0,message_id,date,chat_id,sender_id,sender_username,sender_first_name,sender_last_name,sender_phone,text,views,forwards,reply_to_msg_id,media_type,has_hyperlink,group
0,19552.0,2025-09-02 04:54:46+00:00,1246175000.0,1246175000.0,abhayvarn,,,,47+ 🥳😍💹🤩😍\nFirst Big Target done 🥳😍💹\nMore tha...,3959.0,0.0,19550.0,,False,abhayvarn
1,19551.0,2025-09-02 04:31:22+00:00,1246175000.0,1246175000.0,abhayvarn,,,,33+ 🥳😍💹🤩😍\nMore than 40% Returns coming now ✌️...,4406.0,0.0,19550.0,,False,abhayvarn
2,19550.0,2025-09-02 04:24:35+00:00,1246175000.0,1246175000.0,abhayvarn,,,,Hero Zero \n\nBuy Nifty 24750 CE @ 23 - 28\n\n...,4656.0,3.0,,,False,abhayvarn
3,19548.0,2025-09-02 04:07:17+00:00,1246175000.0,1246175000.0,abhayvarn,,,,https://www.youtube.com/live/SUodqXwmxMs?si=Ho...,4830.0,0.0,,webpage,True,abhayvarn
4,19547.0,2025-09-02 02:27:26+00:00,1246175000.0,1246175000.0,abhayvarn,,,,Good Newzz before Nifty Weekly Expiry 🤩💥🔥\n\nO...,4974.0,0.0,,webpage,True,abhayvarn


Server closed the connection: [WinError 10054] An existing connection was forcibly closed by the remote host
Error executing high-level request after reconnect: <class 'sqlite3.OperationalError'>: database is locked


In [15]:
# Paths
INPUT_PATH = r"D:\Darryl\Coding\s_p\data\raw\sebi_groups_messages.csv"
OUTPUT_CSV = r"D:\Darryl\Coding\s_p\data\processed\sebi_groups_messages_preprocessed.csv"


In [16]:
# Imports
import re, html, unicodedata as ud
from datetime import datetime
from zoneinfo import ZoneInfo

import pandas as pd
import numpy as np

# Display options
pd.set_option("display.max_colwidth", 200)
pd.set_option("display.width", 160)


In [17]:
# ---- CSV loader with encoding fallbacks ----
def load_csv_safely(path):
    encodings = ["utf-8", "utf-8-sig", "latin1"]
    errors = []
    for enc in encodings:
        try:
            return pd.read_csv(path, encoding=enc, low_memory=False)
        except Exception as e:
            errors.append((enc, str(e)))
    try:
        return pd.read_csv(path, engine="python", low_memory=False)
    except Exception as e:
        raise RuntimeError(f"Failed to read CSV with tried encodings: {errors} and python engine: {e}")

df = load_csv_safely(INPUT_PATH)
print(df.shape)
df.head(3)

(5080, 15)


Unnamed: 0,message_id,date,chat_id,sender_id,sender_username,sender_first_name,sender_last_name,sender_phone,text,views,forwards,reply_to_msg_id,media_type,has_hyperlink,group
0,19552.0,2025-09-02 04:54:46+00:00,1246175000.0,1246175000.0,abhayvarn,,,,47+ 🥳😍💹🤩😍\nFirst Big Target done 🥳😍💹\nMore than 90% Returns coming now ✌️🥳💥🔥\nStart Booking profits now and Trail SL STRICTLY ✌️🤩💥,3959.0,0.0,19550.0,,False,abhayvarn
1,19551.0,2025-09-02 04:31:22+00:00,1246175000.0,1246175000.0,abhayvarn,,,,33+ 🥳😍💹🤩😍\nMore than 40% Returns coming now ✌️🥳💥🔥\nStart Booking profits now and Trail SL STRICTLY ✌️🤩💥,4406.0,0.0,19550.0,,False,abhayvarn
2,19550.0,2025-09-02 04:24:35+00:00,1246175000.0,1246175000.0,abhayvarn,,,,Hero Zero \n\nBuy Nifty 24750 CE @ 23 - 28\n\nTARGET - 48 - 68 - 84 - 98\n\nSTOP LOSS - 0,4656.0,3.0,,,False,abhayvarn


In [18]:
# ---- Guess likely column names ----
def choose_col(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    return None

text_col = choose_col(df, ["text", "message", "content", "body"])
date_col = choose_col(df, ["date", "datetime", "timestamp", "time"])
sender_col = choose_col(df, ["from", "from_name", "sender", "author", "user", "username", "name"])
message_id_col = choose_col(df, ["id", "message_id", "msg_id"])
reply_to_col = choose_col(df, ["reply_to_message_id", "reply_to", "in_reply_to"])
forward_from_col = choose_col(df, ["forwarded_from", "forward_from", "fwd_from"])
views_col = choose_col(df, ["views", "view_count"])
reactions_col = choose_col(df, ["reactions", "reaction_count"])
channel_col = choose_col(df, ["chat", "chat_name", "channel", "group", "group_name", "channel_title"])

guessed = {
    "text_col": text_col,
    "date_col": date_col,
    "sender_col": sender_col,
    "message_id_col": message_id_col,
    "reply_to_col": reply_to_col,
    "forward_from_col": forward_from_col,
    "views_col": views_col,
    "reactions_col": reactions_col,
    "channel_col": channel_col,
}
guessed

{'text_col': 'text',
 'date_col': 'date',
 'sender_col': None,
 'message_id_col': 'message_id',
 'reply_to_col': None,
 'forward_from_col': None,
 'views_col': 'views',
 'reactions_col': None,
 'channel_col': 'group'}

In [19]:
# ---- Regexes & utils ----
URL_RE = re.compile(r"(https?://\S+|www\.\S+)", flags=re.IGNORECASE)
MENTION_RE = re.compile(r"@[\w_]{3,50}")
HASHTAG_RE = re.compile(r"(?:^|\s)#(\w{2,})")
CASHTAG_RE = re.compile(r"(?:^|\s)\$([A-Za-z]{1,10})")
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
PHONE_RE = re.compile(r"(?:\+?\d{1,3}[-.\s]?)?(?:\(?\d{2,4}\)?[-.\s]?){2,4}\d{3,4}")
EMOJI_RE = re.compile("["
    "\U0001F1E0-\U0001F1FF"
    "\U0001F300-\U0001F5FF"
    "\U0001F600-\U0001F64F"
    "\U0001F680-\U0001F6FF"
    "\U0001F700-\U0001F77F"
    "\U0001F780-\U0001F7FF"
    "\U0001F800-\U0001F8FF"
    "\U0001F900-\U0001F9FF"
    "\U0001FA00-\U0001FA6F"
    "\U0001FA70-\U0001FAFF"
    "\u2700-\u27BF"
    "\u2600-\u26FF"
"]+", flags=re.UNICODE)

ZW_RE = re.compile(r"[\u200B-\u200D\uFEFF]")
CTRL_RE = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F]")

def to_nfc(s: str) -> str:
    return ud.normalize("NFC", s)

def normalize_text(s: str) -> str:
    if not isinstance(s, str):
        s = "" if pd.isna(s) else str(s)
    s = html.unescape(s)
    s = to_nfc(s)
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = ZW_RE.sub("", s)
    s = CTRL_RE.sub("", s)
    s = re.sub(r"\s+", " ", s)
    return s.strip()

def extract_list(pattern, s: str):
    if not isinstance(s, str):
        return []
    return [m.group(0) for m in pattern.finditer(s)]

def extract_group(pattern, s: str, group_idx=1):
    if not isinstance(s, str):
        return []
    return [m.group(group_idx) for m in pattern.finditer(s)]

SYSTEM_LIKE = re.compile(
    r"^(joined the group|left the group|pinned a message|changed the group|added .* to the group)",
    flags=re.IGNORECASE,
)

def looks_like_system_msg(s: str) -> bool:
    if not isinstance(s, str):
        return False
    return bool(SYSTEM_LIKE.search(s.strip()))

def tokenise_basic(s: str) -> list:
    if not isinstance(s, str):
        return []
    return re.findall(r"[^\W_][\w’'-]*", s, flags=re.UNICODE)

def clean_for_nlp(s: str, remove_urls=True, remove_emojis=True):
    s = normalize_text(s)
    if remove_urls:
        s = URL_RE.sub(" ", s)
    if remove_emojis:
        s = EMOJI_RE.sub(" ", s)
    s = re.sub(r"[^\w\s@#\$’'-]", " ", s, flags=re.UNICODE)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def safe_to_datetime(series, assumed_tz="UTC", target_tz="Asia/Kolkata"):
    dt = pd.to_datetime(series, errors="coerce", utc=True)
    try:
        dt = dt.dt.tz_convert(ZoneInfo(target_tz))
    except Exception:
        pass
    return dt


In [20]:
# ---- Apply preprocessing ----
df_proc = df.copy()

if text_col is not None:
    df_proc["text_raw"] = df_proc[text_col].astype(str)
    df_proc["text_norm"] = df_proc["text_raw"].map(normalize_text)
    df_proc["text_clean"] = df_proc["text_norm"].map(lambda s: clean_for_nlp(s, remove_urls=False, remove_emojis=False))
    df_proc["urls"] = df_proc["text_norm"].map(lambda s: extract_list(URL_RE, s))
    df_proc["mentions"] = df_proc["text_norm"].map(lambda s: extract_list(MENTION_RE, s))
    df_proc["hashtags"] = df_proc["text_norm"].map(lambda s: extract_group(HASHTAG_RE, s))
    df_proc["cashtags"] = df_proc["text_norm"].map(lambda s: extract_group(CASHTAG_RE, s))
    df_proc["emails"] = df_proc["text_norm"].map(lambda s: extract_list(EMAIL_RE, s))
    df_proc["phones_maybe"] = df_proc["text_norm"].map(lambda s: extract_list(PHONE_RE, s))
    df_proc["emojis"] = df_proc["text_norm"].map(lambda s: extract_list(EMOJI_RE, s))

    df_proc["is_system_like"] = df_proc["text_norm"].map(looks_like_system_msg)
    df_proc["char_count"] = df_proc["text_norm"].map(lambda s: len(s) if isinstance(s, str) else 0)
    df_proc["word_count"] = df_proc["text_norm"].map(lambda s: len(tokenise_basic(s)))
    df_proc["url_count"] = df_proc["urls"].map(len)
    df_proc["mention_count"] = df_proc["mentions"].map(len)
    df_proc["hashtag_count"] = df_proc["hashtags"].map(len)
    df_proc["cashtag_count"] = df_proc["cashtags"].map(len)
    df_proc["email_count"] = df_proc["emails"].map(len)
    df_proc["emoji_count"] = df_proc["emojis"].map(len)

if sender_col is not None:
    df_proc["sender_raw"] = df_proc[sender_col].astype(str)
    df_proc["sender_norm"] = df_proc["sender_raw"].map(normalize_text).replace({"nan": np.nan})

if message_id_col is not None:
    df_proc["message_id_norm"] = df_proc[message_id_col]

if reply_to_col is not None:
    df_proc["reply_to_norm"] = df_proc[reply_to_col]

if forward_from_col is not None:
    df_proc["forward_from_norm"] = df_proc[forward_from_col].astype(str).replace({"nan": np.nan})

if channel_col is not None:
    df_proc["channel_norm"] = df_proc[channel_col].astype(str).map(normalize_text).replace({"nan": np.nan})

if views_col is not None and views_col in df_proc.columns:
    df_proc["views_norm"] = pd.to_numeric(df_proc[views_col], errors="coerce")

if reactions_col is not None and reactions_col in df_proc.columns:
    df_proc["reactions_norm"] = pd.to_numeric(df_proc[reactions_col], errors="coerce")

if date_col is not None:
    df_proc["date_parsed_ist"] = safe_to_datetime(df_proc[date_col], target_tz="Asia/Kolkata")
    dt = df_proc["date_parsed_ist"]
    df_proc["date"] = dt.dt.date
    df_proc["time"] = dt.dt.time
    df_proc["hour"] = dt.dt.hour
    df_proc["dow"] = dt.dt.dayofweek
    df_proc["month"] = dt.dt.month
    df_proc["year"] = dt.dt.year

# De-duplication
dedupe_keys = [c for c in ["channel_norm", "sender_norm", "text_norm", "date_parsed_ist"] if c in df_proc.columns]
if dedupe_keys:
    before = len(df_proc)
    df_proc = df_proc.drop_duplicates(subset=dedupe_keys, keep="first").reset_index(drop=True)
    print(f"Dropped duplicates: {before - len(df_proc)} based on keys: {dedupe_keys}")

df_proc.to_csv(OUTPUT_CSV, index=False)
df_proc.head(5)

Dropped duplicates: 296 based on keys: ['channel_norm', 'text_norm', 'date_parsed_ist']


Unnamed: 0,message_id,date,chat_id,sender_id,sender_username,sender_first_name,sender_last_name,sender_phone,text,views,...,emoji_count,message_id_norm,channel_norm,views_norm,date_parsed_ist,time,hour,dow,month,year
0,19552.0,2025-09-02,1246175000.0,1246175000.0,abhayvarn,,,,47+ 🥳😍💹🤩😍\nFirst Big Target done 🥳😍💹\nMore than 90% Returns coming now ✌️🥳💥🔥\nStart Booking profits now and Trail SL STRICTLY ✌️🤩💥,3959.0,...,6,19552.0,abhayvarn,3959.0,2025-09-02 10:24:46+05:30,10:24:46,10,1,9,2025
1,19551.0,2025-09-02,1246175000.0,1246175000.0,abhayvarn,,,,33+ 🥳😍💹🤩😍\nMore than 40% Returns coming now ✌️🥳💥🔥\nStart Booking profits now and Trail SL STRICTLY ✌️🤩💥,4406.0,...,5,19551.0,abhayvarn,4406.0,2025-09-02 10:01:22+05:30,10:01:22,10,1,9,2025
2,19550.0,2025-09-02,1246175000.0,1246175000.0,abhayvarn,,,,Hero Zero \n\nBuy Nifty 24750 CE @ 23 - 28\n\nTARGET - 48 - 68 - 84 - 98\n\nSTOP LOSS - 0,4656.0,...,0,19550.0,abhayvarn,4656.0,2025-09-02 09:54:35+05:30,09:54:35,9,1,9,2025
3,19548.0,2025-09-02,1246175000.0,1246175000.0,abhayvarn,,,,https://www.youtube.com/live/SUodqXwmxMs?si=Ho4TiQ9RXBsdxxgN,4830.0,...,0,19548.0,abhayvarn,4830.0,2025-09-02 09:37:17+05:30,09:37:17,9,1,9,2025
4,19547.0,2025-09-02,1246175000.0,1246175000.0,abhayvarn,,,,Good Newzz before Nifty Weekly Expiry 🤩💥🔥\n\nOption Trading Premium Channel Joining Link is active for 10 Members to get the feedback ✅️😉\n\nhttps://tinyurl.com/22usq7jn,4974.0,...,3,19547.0,abhayvarn,4974.0,2025-09-02 07:57:26+05:30,07:57:26,7,1,9,2025


In [21]:
# === Paths ===
INPUT_PATH = r"D:\Darryl\Coding\s_p\data\raw\sebi_groups_messages.csv"
OUTPUT_CSV = r"D:\Darryl\Coding\s_p\data\processed\sebi_groups_messages_preprocessed_final.csv"
PROMO_SAMPLE_CSV = r"D:\Darryl\Coding\s_p\data\processed\promo_sample_for_labeling.csv"

# === Imports ===
import re, html, unicodedata as ud
from datetime import datetime
from zoneinfo import ZoneInfo
import pandas as pd
import numpy as np

# sklearn only needed if fuzzy dedupe is enabled
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel

# Display options
pd.set_option("display.max_colwidth", 200)
pd.set_option("display.width", 160)

# ---- CSV loader with encoding fallbacks ----
def load_csv_safely(path):
    encodings = ["utf-8", "utf-8-sig", "latin1"]
    errors = []
    for enc in encodings:
        try:
            return pd.read_csv(path, encoding=enc, low_memory=False)
        except Exception as e:
            errors.append((enc, str(e)))
    try:
        return pd.read_csv(path, engine="python", low_memory=False)
    except Exception as e:
        raise RuntimeError(f"Failed to read CSV with tried encodings: {errors} and python engine: {e}")

df = load_csv_safely(INPUT_PATH)
print("Loaded shape:", df.shape)

# ---- Guess likely column names ----
def choose_col(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    return None

text_col = choose_col(df, ["text", "message", "content", "body"])
date_col = choose_col(df, ["date", "datetime", "timestamp", "time"])
sender_col = choose_col(df, ["from", "from_name", "sender", "author", "user", "username", "name"])
message_id_col = choose_col(df, ["id", "message_id", "msg_id"])
reply_to_col = choose_col(df, ["reply_to_message_id", "reply_to", "in_reply_to"])
forward_from_col = choose_col(df, ["forwarded_from", "forward_from", "fwd_from"])
views_col = choose_col(df, ["views", "view_count"])
reactions_col = choose_col(df, ["reactions", "reaction_count"])
channel_col = choose_col(df, ["chat", "chat_name", "channel", "group", "group_name", "channel_title"])

# ---- Regexes & utils ----
URL_RE = re.compile(r"(https?://\S+|www\.\S+)", flags=re.IGNORECASE)
MENTION_RE = re.compile(r"@[\w_]{3,50}")
HASHTAG_RE = re.compile(r"(?:^|\s)#(\w{2,})")
CASHTAG_RE = re.compile(r"(?:^|\s)\$([A-Za-z]{1,10})")
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
PHONE_RE = re.compile(r"(?:\+?\d{1,3}[-.\s]?)?(?:\(?\d{2,4}\)?[-.\s]?){2,4}\d{3,4}")
EMOJI_RE = re.compile("["
    "\U0001F1E0-\U0001F1FF"
    "\U0001F300-\U0001F5FF"
    "\U0001F600-\U0001F64F"
    "\U0001F680-\U0001F6FF"
    "\U0001F700-\U0001F77F"
    "\U0001F780-\U0001F7FF"
    "\U0001F800-\U0001F8FF"
    "\U0001F900-\U0001F9FF"
    "\U0001FA00-\U0001FA6F"
    "\U0001FA70-\U0001FAFF"
    "\u2700-\u27BF"
    "\u2600-\u26FF"
"]+", flags=re.UNICODE)

ZW_RE = re.compile(r"[\u200B-\u200D\uFEFF]")
CTRL_RE = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F]")

SYSTEM_LIKE = re.compile(
    r"^(joined the group|left the group|pinned a message|changed the group|added .* to the group)",
    flags=re.IGNORECASE,
)

def to_nfc(s: str) -> str:
    return ud.normalize("NFC", s)

def normalize_text(s: str) -> str:
    if not isinstance(s, str):
        s = "" if pd.isna(s) else str(s)
    s = html.unescape(s)
    s = to_nfc(s)
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = ZW_RE.sub("", s)
    s = CTRL_RE.sub("", s)
    s = re.sub(r"\s+", " ", s)
    return s.strip()

def extract_list(pattern, s: str):
    if not isinstance(s, str):
        return []
    return [m.group(0) for m in pattern.finditer(s)]

def extract_group(pattern, s: str, group_idx=1):
    if not isinstance(s, str):
        return []
    return [m.group(group_idx) for m in pattern.finditer(s)]

def looks_like_system_msg(s: str) -> bool:
    return bool(SYSTEM_LIKE.search(s.strip())) if isinstance(s, str) else False

def tokenise_basic(s: str) -> list:
    return re.findall(r"[^\W_][\w’'-]*", s, flags=re.UNICODE) if isinstance(s, str) else []

def clean_for_nlp(s: str, remove_urls=True, remove_emojis=True):
    s = normalize_text(s)
    if remove_urls:
        s = URL_RE.sub(" ", s)
    if remove_emojis:
        s = EMOJI_RE.sub(" ", s)
    s = re.sub(r"[^\w\s@#\$’'-]", " ", s, flags=re.UNICODE)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def safe_to_datetime(series, target_tz="Asia/Kolkata"):
    dt = pd.to_datetime(series, errors="coerce", utc=True)
    try:
        dt = dt.dt.tz_convert(ZoneInfo(target_tz))
    except Exception:
        pass
    return dt

# PII redaction
def redact_text(s):
    if not isinstance(s, str): return s
    s = EMAIL_RE.sub("<EMAIL>", s)
    s = PHONE_RE.sub("<PHONE>", s)
    return s

def mask_username(u):
    if not isinstance(u, str) or u.strip()=="":
        return u
    u = u.strip()
    if len(u) <= 2: return u[0] + "*"
    return u[0] + "*"*(min(6,len(u)-2)) + u[-1]

# ---- Apply preprocessing ----
df_proc = df.copy()

if text_col:
    df_proc["text_raw"] = df_proc[text_col].astype(str)
    df_proc["text_norm"] = df_proc["text_raw"].map(normalize_text)
    df_proc["text_clean"] = df_proc["text_norm"].map(lambda s: clean_for_nlp(s, remove_urls=False, remove_emojis=False))
    df_proc["text_redacted"] = df_proc["text_norm"].map(redact_text)
    df_proc["text_for_model"] = df_proc["text_redacted"].map(lambda s: clean_for_nlp(s, remove_urls=True, remove_emojis=True).lower())
    df_proc["urls"] = df_proc["text_norm"].map(lambda s: extract_list(URL_RE, s))
    df_proc["mentions"] = df_proc["text_norm"].map(lambda s: extract_list(MENTION_RE, s))
    df_proc["hashtags"] = df_proc["text_norm"].map(lambda s: extract_group(HASHTAG_RE, s))
    df_proc["cashtags"] = df_proc["text_norm"].map(lambda s: extract_group(CASHTAG_RE, s))
    df_proc["emails"] = df_proc["text_norm"].map(lambda s: extract_list(EMAIL_RE, s))
    df_proc["phones_maybe"] = df_proc["text_norm"].map(lambda s: extract_list(PHONE_RE, s))
    df_proc["emojis"] = df_proc["text_norm"].map(lambda s: extract_list(EMOJI_RE, s))
    df_proc["is_system_like"] = df_proc["text_norm"].map(looks_like_system_msg)
    df_proc["char_count"] = df_proc["text_norm"].map(lambda s: len(s) if isinstance(s, str) else 0)
    df_proc["word_count"] = df_proc["text_norm"].map(lambda s: len(tokenise_basic(s)))
    df_proc["emoji_count"] = df_proc["emojis"].map(len)
    # promo-like flag
    PROMO_RE = re.compile(r"(join|subscribe|offer|http|www\.|t\.me|channel|dm|discount)", flags=re.I)
    df_proc["is_promo"] = df_proc["text_norm"].map(lambda s: bool(PROMO_RE.search(s)) if isinstance(s,str) else False)

if sender_col:
    df_proc["sender_raw"] = df_proc[sender_col].astype(str)
    df_proc["sender_norm"] = df_proc["sender_raw"].map(normalize_text).replace({"nan": np.nan})
    df_proc["sender_username_masked"] = df_proc["sender_norm"].map(mask_username)

if message_id_col:
    df_proc["message_id_norm"] = df_proc[message_id_col]

if reply_to_col:
    df_proc["reply_to_norm"] = df_proc[reply_to_col]

if forward_from_col:
    df_proc["forward_from_norm"] = df_proc[forward_from_col].astype(str).replace({"nan": np.nan})

if channel_col:
    df_proc["channel_norm"] = df_proc[channel_col].astype(str).map(normalize_text).replace({"nan": np.nan})

if views_col and views_col in df_proc.columns:
    df_proc["views_norm"] = pd.to_numeric(df_proc[views_col], errors="coerce")
    df_proc["views_bucket"] = pd.cut(df_proc["views_norm"], bins=[-1,0,10,100,1000,10000,1e9],
                                     labels=["0","1-10","11-100","101-1k","1k-10k",">10k"])

if reactions_col and reactions_col in df_proc.columns:
    df_proc["reactions_norm"] = pd.to_numeric(df_proc[reactions_col], errors="coerce")

if date_col:
    df_proc["date_parsed_ist"] = safe_to_datetime(df_proc[date_col])
    dt = df_proc["date_parsed_ist"]
    df_proc["date"] = dt.dt.date
    df_proc["hour"] = dt.dt.hour
    df_proc["dow"] = dt.dt.dayofweek
    df_proc["month"] = dt.dt.month
    df_proc["year"] = dt.dt.year

# --- Trading signal extraction ---
ACTION_RE = re.compile(r"\b(buy|sell|long|short)\b", flags=re.I)
TARGETS_RE = re.compile(r"target[s]?\s*[-:]\s*([\d\s\-\–,]+)", flags=re.I)
SL_RE = re.compile(r"stop\s*loss\s*[-:]\s*([\d\s\-\–,]+)", flags=re.I)
STRIKE_RE = re.compile(r"\b(\d{3,6})\s*(?:CE|PE)?\b", flags=re.I)

def extract_trade_info(text):
    data = {"action": None, "strikes": [], "targets": [], "stoploss": []}
    if not isinstance(text, str): return data
    m = ACTION_RE.search(text)
    if m: data["action"] = m.group(1).lower()
    data["strikes"] = STRIKE_RE.findall(text)
    t = TARGETS_RE.search(text)
    if t: data["targets"] = re.findall(r"\d{2,6}", t.group(1))
    s = SL_RE.search(text)
    if s: data["stoploss"] = re.findall(r"\d{1,6}", s.group(1))
    return data

trade_info = df_proc["text_norm"].map(extract_trade_info)
df_proc["trade_action"] = trade_info.map(lambda d: d["action"])
df_proc["trade_strikes"] = trade_info.map(lambda d: ",".join(d["strikes"]))
df_proc["trade_targets"] = trade_info.map(lambda d: ",".join(d["targets"]))
df_proc["trade_stoploss"] = trade_info.map(lambda d: ",".join(d["stoploss"]))

# --- De-duplication ---
dedupe_keys = [c for c in ["channel_norm","sender_norm","text_norm","date_parsed_ist"] if c in df_proc.columns]
if dedupe_keys:
    before = len(df_proc)
    df_proc = df_proc.drop_duplicates(subset=dedupe_keys, keep="first").reset_index(drop=True)
    print(f"Dropped duplicates: {before - len(df_proc)} based on {dedupe_keys}")

# --- (Optional) fuzzy dedupe by similarity within channel+date ---
def dedupe_group_by_similarity(df_group, text_col="text_for_model", threshold=0.88):
    docs = df_group[text_col].fillna("").astype(str).tolist()
    idxs = df_group.index.to_list()
    n = len(docs)
    if n <= 1: return df_group
    vec = TfidfVectorizer(ngram_range=(1,2), max_features=5000, stop_words="english")
    X = vec.fit_transform(docs)
    sim = linear_kernel(X, X)
    to_drop = set()
    for i in range(n):
        if idxs[i] in to_drop: continue
        dup_j = np.where(sim[i] > threshold)[0]
        for j in dup_j:
            if j > i: to_drop.add(idxs[j])
    return df_group.drop(index=list(to_drop)) if to_drop else df_group

result_frames = []
max_group = 500
if "channel_norm" in df_proc.columns and "date" in df_proc.columns:
    for (chan,date), grp in df_proc.groupby(["channel_norm","date"]):
        if len(grp) <= 1:
            result_frames.append(grp); continue
        if len(grp) > max_group:
            result_frames.append(grp); continue
        cleaned = dedupe_group_by_similarity(grp)
        result_frames.append(cleaned)
    df_proc = pd.concat(result_frames, ignore_index=False)
    print("After fuzzy dedupe rows:", len(df_proc))

# --- Save final ---
df_proc.to_csv(OUTPUT_CSV, index=False, encoding="utf-8")
print("Saved final preprocessed:", OUTPUT_CSV)

# --- Save promo sample for manual labeling ---
promo_sample = df_proc[df_proc["is_promo"]].sample(n=min(1000, df_proc[df_proc["is_promo"]].shape[0]), random_state=42)
promo_sample.to_csv(PROMO_SAMPLE_CSV, index=False, encoding="utf-8")
print("Saved promo sample:", PROMO_SAMPLE_CSV)

df_proc.head(5)


Loaded shape: (5080, 15)
Dropped duplicates: 296 based on ['channel_norm', 'text_norm', 'date_parsed_ist']
After fuzzy dedupe rows: 4098
Saved final preprocessed: D:\Darryl\Coding\s_p\data\processed\sebi_groups_messages_preprocessed_final.csv
Saved promo sample: D:\Darryl\Coding\s_p\data\processed\promo_sample_for_labeling.csv


Unnamed: 0,message_id,date,chat_id,sender_id,sender_username,sender_first_name,sender_last_name,sender_phone,text,views,...,views_bucket,date_parsed_ist,hour,dow,month,year,trade_action,trade_strikes,trade_targets,trade_stoploss
2850,29017.0,2025-06-23,1354171000.0,1354171000.0,AngelOneAdvisory,,,,🟢 **BUY** POONAWALLA **1** shares at **435.70**.\n\n__Message : SL 422 TGT 457 Modify Qty/ Lot as per your discretion__\n\n__Created Date & Time\n03:06 PM__\n__23/06/25__\n\n__Disclaimer : __[__ w...,32312.0,...,>10k,2025-06-23 15:06:38+05:30,15,0,6,2025,buy,435422457,,
2851,29016.0,2025-06-23,1354171000.0,1354171000.0,AngelOneAdvisory,,,,🟢 **BUY** DMART **1** shares at **4337.00**.\n\n__Message : SL 4260 TGT 4460 Modify Qty/ Lot as per your discretion__\n\n__Created Date & Time\n02:48 PM__\n__23/06/25__\n\n__Disclaimer : __[__ www...,29776.0,...,>10k,2025-06-23 14:48:37+05:30,14,0,6,2025,buy,433742604460,,
2852,29015.0,2025-06-23,1354171000.0,1354171000.0,AngelOneAdvisory,,,,BOOK PROFIT IN NIFTY 24900 PE@100.2\n\n__Created Date & Time__\n__02:31 PM__\n__23/06/25__,29378.0,...,>10k,2025-06-23 14:31:42+05:30,14,0,6,2025,,24900100,,
2853,29014.0,2025-06-23,1354171000.0,1354171000.0,AngelOneAdvisory,,,,EXIT M&M 3200CE AT 12.20\n\n__Created Date & Time__\n__02:30 PM__\n__23/06/25__,27925.0,...,>10k,2025-06-23 14:30:33+05:30,14,0,6,2025,,3200,,
2854,29013.0,2025-06-23,1354171000.0,1354171000.0,AngelOneAdvisory,,,,BOOK PROFIT IN EICHERMOT @ 5552\n\n__Created Date & Time__\n__02:22 PM__\n__23/06/25__,26807.0,...,>10k,2025-06-23 14:22:09+05:30,14,0,6,2025,,5552,,


In [24]:

import pandas as pd
import re, unicodedata, math, os
from datetime import datetime
# use display helper if available in your environment (optional)
try:
    from caas_jupyter_tools import display_dataframe_to_user
    _HAS_DISPLAY = True
except Exception:
    _HAS_DISPLAY = False

# **INPUT**: replace these if your files are in another path
ia_in = r"D:\Darryl\Coding\s_p\data\processed\sebi_investment_advisors_cleaned.csv"
ra_in = r"D:\Darryl\Coding\s_p\data\processed\sebi_research_analysts_cleaned.csv"

# **OUTPUT** (cleaned files written here)
ia_out = r"D:\Darryl\Coding\s_p\data\processed\sebi_investment_advisors_cleaned_v2.csv"
ra_out = r"D:\Darryl\Coding\s_p\data\processed\sebi_research_analysts_cleaned_v2.csv"

# Helper functions for normalization
def normalize_text_for_matching(s: str) -> str:
    """Lowercase, remove most punctuation (keep Devanagari letters), collapse spaces."""
    if s is None or (isinstance(s, float) and math.isnan(s)):
        return ""
    if not isinstance(s, str):
        s = str(s)
    s = s.strip()
    s = unicodedata.normalize("NFKC", s)
    s = re.sub(r"[^\w\s\u0900-\u097F]", " ", s, flags=re.UNICODE)
    s = re.sub(r"\s+", " ", s)
    return s.lower()

def simplify_org_suffixes(s: str) -> str:
    """Remove common company suffixes to make name matching simpler."""
    if not s:
        return s
    s2 = re.sub(r"\b(pvt|pvt\.|ltd|ltd\.|private|limited|llp|inc|corp|co|company|pvtltd|pvtltd)\b", " ", s, flags=re.I)
    s2 = re.sub(r"\s+", " ", s2).strip()
    return s2

def clean_registry(path: str, name_col: str = "name") -> pd.DataFrame:
    """Load CSV as strings, normalize fields, parse dates, return cleaned DataFrame."""
    df = pd.read_csv(path, dtype=str, keep_default_na=False, na_values=["", "nan", "NaN"])
    if name_col not in df.columns:
        name_col = df.columns[0]
    # canonical columns
    df["name_original"] = df[name_col].astype(str).fillna("")
    # find registration column heuristically
    reg_col = None
    for c in df.columns:
        if any(k in c.lower() for k in ("registration_no","reg_no","registration","registration_no","regno")):
            reg_col = c
            break
    if reg_col is not None:
        df["registration_no"] = df[reg_col].astype(str).str.strip().str.upper().replace({"nan": ""})
    else:
        df["registration_no"] = ""
    # ensure phone columns are strings (avoid scientific notation)
    phone_cols = [c for c in df.columns if any(k in c.lower() for k in ("phone","telephone","mobile"))]
    for c in phone_cols:
        df[c] = df[c].astype(str).replace({"nan": ""}).str.strip()
    # name normalizations (two flavors)
    df["name_norm"] = df["name_original"].apply(normalize_text_for_matching)
    df["name_norm_simple"] = df["name_norm"].apply(simplify_org_suffixes)
    df["sebi_key"] = df["registration_no"].fillna("") + "||" + df["name_norm_simple"].fillna("")

    # robust date parsing (parse common date columns, make tz-safe)
    for date_col in ("reg_date", "registration_date", "expiry_date", "expirty_date", "to"):
        if date_col in df.columns:
            try:
                # parse via UTC then drop tz to get consistent naive datetimes
                df[date_col + "_parsed"] = pd.to_datetime(df[date_col].astype(str), errors="coerce", utc=True).dt.tz_convert(None)
            except Exception:
                try:
                    df[date_col + "_parsed"] = pd.to_datetime(df[date_col].astype(str), errors="coerce")
                except Exception:
                    df[date_col + "_parsed"] = pd.NaT

    # compute days to expiry (if expiry parsed)
    if "expiry_date_parsed" in df.columns:
        try:
            today = pd.Timestamp.utcnow().normalize()
            df["days_to_expiry_v2"] = (df["expiry_date_parsed"] - today).dt.days
        except Exception:
            df["days_to_expiry_v2"] = pd.NA
    else:
        df["days_to_expiry_v2"] = pd.NA

    return df

# Run cleaning
print("Cleaning IA registry:", ia_in)
df_ia_clean = clean_registry(ia_in, name_col="name")
print("IA rows ->", len(df_ia_clean))

print("Cleaning RA registry:", ra_in)
df_ra_clean = clean_registry(ra_in, name_col="name")
print("RA rows ->", len(df_ra_clean))

# Save outputs
df_ia_clean.to_csv(ia_out, index=False)
df_ra_clean.to_csv(ra_out, index=False)
print("Saved cleaned IA ->", ia_out)
print("Saved cleaned RA ->", ra_out)

# Show quick sample + stats (display helper used if available)
def show_sample(df, title):
    cols = ["name_original","registration_no","name_norm","name_norm_simple","sebi_key"]
    cols = [c for c in cols if c in df.columns]
    sample = df.head(20)[cols]
    if _HAS_DISPLAY:
        display_dataframe_to_user(title, sample)
    else:
        print(title)
        print(sample.to_string(index=False))

show_sample(df_ia_clean, "IA sample cleaned (first 20 rows)")
show_sample(df_ra_clean, "RA sample cleaned (first 20 rows)")

print("\nQuick stats:")
print("IA rows:", len(df_ia_clean), "unique name_norm_simple:", df_ia_clean["name_norm_simple"].nunique())
print("RA rows:", len(df_ra_clean), "unique name_norm_simple:", df_ra_clean["name_norm_simple"].nunique())

print("\nFiles written to /mnt/data:")
for p in [ia_out, ra_out]:
    print(" -", p, "(exists:", os.path.exists(p), ", size:", os.path.getsize(p) if os.path.exists(p) else 'N/A', ")")


Cleaning IA registry: D:\Darryl\Coding\s_p\data\processed\sebi_investment_advisors_cleaned.csv
IA rows -> 969
Cleaning RA registry: D:\Darryl\Coding\s_p\data\processed\sebi_research_analysts_cleaned.csv
RA rows -> 1707
Saved cleaned IA -> D:\Darryl\Coding\s_p\data\processed\sebi_investment_advisors_cleaned_v2.csv
Saved cleaned RA -> D:\Darryl\Coding\s_p\data\processed\sebi_research_analysts_cleaned_v2.csv
IA sample cleaned (first 20 rows)
                                                          name_original registration_no                                                           name_norm                                    name_norm_simple                                                          sebi_key
                                                          KAVITHA MENON    INA000000037                                                       kavitha menon                                       kavitha menon                                       INA000000037||kavitha menon
         

  df[date_col + "_parsed"] = pd.to_datetime(df[date_col].astype(str), errors="coerce", utc=True).dt.tz_convert(None)
