## STEP 1: Setup & Configuration

In [None]:
# =========================
# STEP 1: SETUP & CONFIG
# =========================
import subprocess
import sys
import importlib
import logging
from pathlib import Path

# Working folders used by later steps (no error when they already exist)
for _folder in ("advanced_ta", "news_sentiment", "cache"):
    Path(_folder).mkdir(exist_ok=True)

# Root logging setup; basicConfig does nothing if the root logger already has handlers
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Function to check & install dependencies
def install_package(package):
    """Ensure the pip requirement *package* (e.g. ``"ta"`` or ``"numpy==1.26.4"``) is installed.

    The distribution name is mapped to its import name where the two differ
    (``newsapi-python`` -> ``newsapi``, ``python-dotenv`` -> ``dotenv``). Without
    this mapping, an installed package would never be recognised. For an ``==``
    pin, the installed version is compared with the pin, and pip runs when they
    differ. A plain "is it importable" check would leave a different version in place.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    # Import the submodule explicitly: `import importlib` alone does not guarantee
    # that `importlib.util` is loaded.
    import importlib.util
    from importlib import metadata

    dist_name, _, pinned_version = package.partition('==')
    dist_name = dist_name.strip()
    pinned_version = pinned_version.strip()
    import_aliases = {'newsapi-python': 'newsapi', 'python-dotenv': 'dotenv'}
    module_name = import_aliases.get(dist_name, dist_name.replace('-', '_'))

    needs_install = importlib.util.find_spec(module_name) is None
    if not needs_install and pinned_version:
        try:
            needs_install = metadata.version(dist_name) != pinned_version
        except metadata.PackageNotFoundError:
            needs_install = True

    if needs_install:
        logger.info(f"Installing {package} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        if pinned_version:
            # An already-imported module keeps the old version until the runtime restarts.
            logger.info("If %s was already imported, restart the runtime to load %s.", dist_name, package)
    else:
        logger.info(f"{package} already installed.")

# Required packages (pip requirement specs; only numpy is version-pinned)
packages = [
    'requests', 'pandas', 'numpy==1.26.4', 'ta', 'ccxt',
    'vectorbt', 'nltk', 'praw', 'gnews', 'newsapi-python',
    'optuna', 'transformers', 'torch', 'tqdm', 'python-dotenv',
]
for requirement in packages:
    install_package(requirement)

# Fetch NLTK's 'vader_lexicon' data resource
import nltk
nltk.download('vader_lexicon')

logger.info("Step 1 complete: Dependencies installed and folders ready.")


# Step 2: Market Data



In [None]:
# =========================
# STEP 2: MARKET DATA FETCH (TOP 100)
# =========================
import ccxt
import pandas as pd
from pathlib import Path

CACHE_FILE = Path("cache") / "top_pairs.csv"  # Step 2 output; Step 3 reads the same path

def get_pairs(exchange, quote_currency="USDT", min_volume=0):
    """Fetch active spot pairs quoted in *quote_currency* from a ccxt exchange.

    A market qualifies when:

    - its unified ``quote`` equals *quote_currency* (this is an exact match, not a
      substring match, so e.g. ``USDT/EUR`` is not treated as a USDT-quoted pair);
    - it is a spot market;
    - it is not explicitly inactive. ccxt may report ``active=None`` when the
      status is unknown, so only an explicit ``False`` excludes a market.

    Volume is the ticker's quote volume. When that is missing, it is estimated as
    ``baseVolume * last`` (or ``close``), so every row is in quote-currency units
    and the rows can be ranked against each other. Unparseable or non-finite
    values count as 0. Rows below *min_volume* are dropped.

    Returns:
        DataFrame with columns ``symbol``, ``volume``, ``exchange``. The columns
        are present even when the result is empty or an error occurred; errors
        are logged.
    """
    import math

    columns = ["symbol", "volume", "exchange"]

    def as_float(value):
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        return number if math.isfinite(number) else 0.0

    def quote_volume(ticker):
        volume = as_float(ticker.get('quoteVolume'))
        if volume:
            return volume
        price = as_float(ticker.get('last') or ticker.get('close'))
        return as_float(ticker.get('baseVolume')) * price

    try:
        markets = exchange.load_markets()
        tickers = exchange.fetch_tickers()
        data = []

        for symbol, ticker in tickers.items():
            market = markets.get(symbol)
            if market is None:
                continue
            quote = market.get('quote') or symbol.partition('/')[2].split(':')[0]
            if quote != quote_currency:
                continue
            if market.get('active') is False:
                continue
            if market.get('type', 'spot') != 'spot':
                continue

            vol = quote_volume(ticker)
            if vol < min_volume:
                continue

            data.append({
                'symbol': symbol,
                'volume': vol,
                'exchange': exchange.id
            })

        return pd.DataFrame(data, columns=columns)

    except Exception as e:
        logger.error(f"Error fetching pairs from {exchange.id}: {e}")
        return pd.DataFrame(columns=columns)


def fetch_top_pairs(top_n=100):
    """Rank USDT spot pairs from Bitget and KuCoin by volume and cache the top *top_n*.

    A symbol listed on both exchanges is kept once, on the exchange where it has
    the higher volume. Step 3 names its per-symbol candle caches only by symbol
    and timeframe, so two exchanges with the same symbol would otherwise write
    into the same file.

    Args:
        top_n: number of pairs to keep after ranking.

    Returns:
        DataFrame with ``symbol``, ``volume`` and ``exchange`` columns, sorted by
        descending volume. The same frame is also written to ``CACHE_FILE``.
    """
    logger.info("Fetching pairs from Bitget & KuCoin...")

    bitget = ccxt.bitget({'enableRateLimit': True})
    kucoin = ccxt.kucoin({'enableRateLimit': True})

    df_bitget = get_pairs(bitget)
    df_kucoin = get_pairs(kucoin)

    # Combine, then enforce the expected columns so an all-empty result still sorts
    df_all = pd.concat([df_bitget, df_kucoin], ignore_index=True)
    df_all = df_all.reindex(columns=["symbol", "volume", "exchange"])

    # Highest volume first; a stable sort keeps ties in fetch order before de-duplicating
    df_all = (
        df_all.sort_values(by="volume", ascending=False, kind="mergesort")
        .drop_duplicates(subset="symbol", keep="first")
        .reset_index(drop=True)
    )

    # Take top N
    df_top = df_all.head(top_n)

    # Cache to CSV
    CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    df_top.to_csv(CACHE_FILE, index=False)
    logger.info(f"Saved top {len(df_top)} pairs to {CACHE_FILE}")
    return df_top

# Execute Step 2, then preview the highest-volume pairs
top_pairs_df = fetch_top_pairs(100)
top_pairs_df.head(5)


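# Technical Indicator Analysis
This step will:

Fetch historical OHLCV (candlestick) data for the top pairs from Bitget and KuCoin on the 1h, 4h and 1d timeframes. The data is cached locally, so later runs only request newer candles.

Calculate these key indicators:

✅ RSI (Relative Strength Index)

✅ MACD (Moving Average Convergence Divergence)

✅ EMA (Exponential Moving Averages — 50 and 200)

✅ Bollinger Bands

It also computes ATR, the Stochastic oscillator, OBV, ADX, the Ichimoku components and Fibonacci retracement levels. The latest values per timeframe are then collected into `advanced_ta/master_multi_tf.csv`.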


In [None]:
# === Step 3 (v2) ===
# Multi-timeframe TA with smart throttling + progressive caching (Colab-ready)
import os
import time
import math
import ccxt
import pandas as pd
import numpy as np
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import logging
from datetime import datetime, timezone

# CONFIG
TIMEFRAMES = ['1h', '4h', '1d']
OHLCV_LIMIT_FULL = 1000       # candles requested when no cache file exists yet
OHLCV_LIMIT_INCREMENT = 1000  # max candles per incremental ('since') request; exchanges often cap this
MAX_WORKERS = 6               # overall thread budget, divided between exchanges
RETRY_ATTEMPTS = 3
RETRY_BACKOFF = 2             # exponential backoff base: wait = RETRY_BACKOFF ** (attempt - 1) seconds
ADV_TA_DIR = Path("advanced_ta")
ADV_TA_DIR.mkdir(exist_ok=True, parents=True)
MASTER_CSV = ADV_TA_DIR / "master_multi_tf.csv"
TOP_PAIRS_CSV = Path("cache/top_pairs.csv")  # written by Step 2
# Per-exchange pause (seconds) used by the workers to avoid HTTP 429s; raise these if you still hit limits
EXCHANGE_DELAY = {
    "bitget": 0.35,
    "kucoin": 0.25
}
# Conservative per-exchange concurrency: at most 2 worker threads per exchange
# (further limited by MAX_WORKERS // number_of_exchanges) to reduce the risk of 429s.
PER_EXCHANGE_MAX_WORKERS = 2

# Logging (basicConfig is a no-op if the root logger was already configured earlier)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("step3_v2")

# TA-Lib is optional. If it cannot be imported, compute_indicators uses its
# pandas implementations for ATR, RSI, MACD and ADX.
try:
    import talib
    TALIB_AVAILABLE = True
    logger.info("TA-Lib available.")
except Exception:
    TALIB_AVAILABLE = False
    logger.info("TA-Lib not available; using pandas fallbacks where needed.")

# Initialize CCXT clients with rate limit enabled
EXCHANGE_CLIENTS = {
    "bitget": ccxt.bitget({'enableRateLimit': True}),
    "kucoin": ccxt.kucoin({'enableRateLimit': True}),
}


def get_exchange_client(exchange_id):
    """Return the shared, rate-limited ccxt client for *exchange_id*.

    An exchange not pre-registered in ``EXCHANGE_CLIENTS`` is instantiated on
    first use and stored there, so every later call (including calls from other
    worker threads) reuses that one client object. Previously each call created
    a brand-new client. The clients are created with ``enableRateLimit`` on, and
    a fresh client per call would presumably not share throttling state with the
    others (ccxt throttles per client instance).

    Raises:
        AttributeError: if ccxt has no exchange class named *exchange_id*.
    """
    client = EXCHANGE_CLIENTS.get(exchange_id)
    if client is None:
        # setdefault keeps the first client stored if two threads race here
        client = EXCHANGE_CLIENTS.setdefault(
            exchange_id, getattr(ccxt, exchange_id)({'enableRateLimit': True})
        )
    return client

# utility: convert a ccxt-style timeframe string to milliseconds (ccxt helper with a safe fallback)
def timeframe_to_ms(tf):
    """Return the length of timeframe *tf* (e.g. ``'15m'``, ``'4h'``, ``'1d'``) in milliseconds.

    It uses ccxt's parser when that works. Otherwise it falls back to a local table
    covering seconds, minutes, hours, days, weeks, months (30 days) and years
    (365 days). The previous fallback only knew hours and days and silently
    returned one minute for everything else. Unrecognised strings still map to
    one minute.
    """
    try:
        return ccxt.Exchange().parse_timeframe(tf) * 1000
    except Exception:
        unit_seconds = {
            's': 1,
            'm': 60,
            'h': 60 * 60,
            'd': 24 * 60 * 60,
            'w': 7 * 24 * 60 * 60,
            'M': 30 * 24 * 60 * 60,
            'y': 365 * 24 * 60 * 60,
        }
        amount, unit = tf[:-1], tf[-1:]
        if unit in unit_seconds and amount.isdigit():
            return int(amount) * unit_seconds[unit] * 1000
        return 60 * 1000

# progressive OHLCV fetch: if cached, re-fetch from the last cached candle onward, else fetch the latest window
def _ohlcv_to_frame(ohlcv):
    """Convert a ccxt OHLCV list ``[[ms, o, h, l, c, v], ...]`` into a timestamp-indexed numeric frame."""
    cols = ['open', 'high', 'low', 'close', 'volume']
    if not ohlcv:
        # Some exchanges return an empty list when nothing newer exists; that's OK
        return pd.DataFrame(columns=cols)
    frame = pd.DataFrame(ohlcv, columns=['timestamp'] + cols)
    frame['timestamp'] = pd.to_datetime(frame['timestamp'], unit='ms')
    frame = frame.set_index('timestamp')
    frame[cols] = frame[cols].apply(pd.to_numeric, errors='coerce')
    return frame


def _merge_ohlcv(existing_df, new_df):
    """Combine cached and freshly fetched candles, sorted by time; on a shared timestamp the fresh row wins."""
    if existing_df is None or existing_df.empty:
        return new_df
    if new_df is None or new_df.empty:
        return existing_df
    merged = pd.concat([existing_df, new_df], axis=0)
    return merged[~merged.index.duplicated(keep='last')].sort_index()


def fetch_ohlcv_incremental(exchange_id, symbol, timeframe, limit_full=OHLCV_LIMIT_FULL, limit_since=OHLCV_LIMIT_INCREMENT):
    """Return cached plus newly fetched OHLCV candles for one symbol/timeframe.

    If ``ADV_TA_DIR/<SYMBOL>_<timeframe>.csv`` exists it is read (never written
    here). It then requests up to *limit_since* candles starting AT the last
    cached timestamp, not one millisecond after it. That candle may still have
    been forming when it was cached, and re-fetching it lets the fresh values
    replace the stale ones. Without a usable cache it requests the latest
    *limit_full* candles.

    ccxt errors are retried up to RETRY_ATTEMPTS times, sleeping
    ``RETRY_BACKOFF ** (attempt - 1)`` seconds between attempts but not after the
    last one. Any other error aborts immediately.

    Returns:
        The merged frame. Extra cached columns such as indicators are kept, but
        rows replaced by fresh candles carry only OHLCV values. When every attempt
        fails, returns the cached frame instead, which may be None.
    """
    exchange = get_exchange_client(exchange_id)
    out_path = ADV_TA_DIR / f"{symbol.replace('/', '_')}_{timeframe}.csv"
    existing_df = None
    since = None

    # If a cache exists, start from its last timestamp (ms, UTC)
    if out_path.exists():
        try:
            existing_df = pd.read_csv(out_path, parse_dates=['timestamp']).set_index('timestamp')
            if len(existing_df) > 0:
                last_ts = existing_df.index[-1]
                # Naive pandas timestamps are interpreted as UTC by .timestamp()
                since = int(last_ts.tz_localize(None).timestamp() * 1000)
        except Exception as e:
            logger.warning(f"Couldn't read cache {out_path}: {e}")
            existing_df = None
            since = None

    for attempt in range(1, RETRY_ATTEMPTS + 1):
        try:
            if since is not None:
                ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=limit_since)
            else:
                ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit_full)
            return _merge_ohlcv(existing_df, _ohlcv_to_frame(ohlcv))
        except ccxt.BaseError as e:
            if attempt == RETRY_ATTEMPTS:
                logger.warning(f"{exchange_id} fetch error for {symbol} {timeframe} attempt {attempt}/{RETRY_ATTEMPTS}: {e}. Giving up")
                break
            wait = RETRY_BACKOFF ** (attempt - 1)
            logger.warning(f"{exchange_id} fetch error for {symbol} {timeframe} attempt {attempt}/{RETRY_ATTEMPTS}: {e}. Backing off {wait}s")
            time.sleep(wait)
        except Exception as e:
            logger.error(f"Unexpected fetch error {exchange_id} {symbol} {timeframe}: {e}")
            break
    # All attempts failed: fall back to whatever was cached (could be None)
    return existing_df

# indicator computations
def compute_indicators(df, use_talib=None):
    """Return a copy of *df* with technical-indicator columns appended.

    Reads the ``high``, ``low``, ``close`` and ``volume`` columns and keeps the
    index. It adds these columns: ema50, ema200, atr, rsi, macd, macd_signal,
    macd_hist, stoch_k, stoch_d, bb_middle, bb_upper, bb_lower, obv, adx,
    ichimoku_conv, ichimoku_base, ichimoku_span_a, ichimoku_span_b.

    Args:
        df: OHLCV frame.
        use_talib: controls how ATR, RSI, MACD and ADX are computed.
            True uses TA-Lib; it raises NameError if TA-Lib was not importable.
            False forces the pandas implementations.
            None (the default) follows TALIB_AVAILABLE.

    Returns:
        The enriched copy, or None when *df* is None or empty.
    """
    if df is None or df.empty:
        return None
    if use_talib is None:
        use_talib = TALIB_AVAILABLE
    df = df.copy()
    close = df['close']
    high = df['high']
    low = df['low']
    vol = df['volume']

    # EMA
    df['ema50'] = close.ewm(span=50, adjust=False).mean()
    df['ema200'] = close.ewm(span=200, adjust=False).mean()

    # True range, shared by the pandas ATR and ADX implementations
    prev_close = close.shift()
    tr = pd.concat([high - low, (high - prev_close).abs(), (low - prev_close).abs()], axis=1).max(axis=1)

    # ATR
    if use_talib:
        df['atr'] = talib.ATR(high.values, low.values, close.values, timeperiod=14)
    else:
        df['atr'] = tr.rolling(14).mean()

    # RSI
    if use_talib:
        df['rsi'] = talib.RSI(close.values, timeperiod=14)
    else:
        delta = close.diff()
        up = delta.clip(lower=0)
        down = -1 * delta.clip(upper=0)
        ma_up = up.ewm(com=13, adjust=False).mean()
        ma_down = down.ewm(com=13, adjust=False).mean()
        rs = ma_up / ma_down
        df['rsi'] = 100 - (100 / (1 + rs))

    # MACD
    if use_talib:
        macd, macdsig, macdhist = talib.MACD(close.values, fastperiod=12, slowperiod=26, signalperiod=9)
        df['macd'] = macd
        df['macd_signal'] = macdsig
        df['macd_hist'] = macdhist
    else:
        ema12 = close.ewm(span=12, adjust=False).mean()
        ema26 = close.ewm(span=26, adjust=False).mean()
        macd = ema12 - ema26
        signal = macd.ewm(span=9, adjust=False).mean()
        df['macd'] = macd
        df['macd_signal'] = signal
        df['macd_hist'] = macd - signal

    # Stochastic
    low_min = low.rolling(14).min()
    high_max = high.rolling(14).max()
    df['stoch_k'] = ((close - low_min) / (high_max - low_min) * 100).rolling(3).mean()
    df['stoch_d'] = df['stoch_k'].rolling(3).mean()

    # Bollinger Bands
    ma20 = close.rolling(20).mean()
    std20 = close.rolling(20).std()
    df['bb_middle'] = ma20
    df['bb_upper'] = ma20 + 2 * std20
    df['bb_lower'] = ma20 - 2 * std20

    # OBV: add the bar's volume on an up-close, subtract it on a down-close and
    # carry the previous value otherwise. Missing volume or close moves count as 0.
    direction = np.sign(close.diff()).fillna(0)
    df['obv'] = (direction * vol.fillna(0)).cumsum()

    # ADX
    if use_talib:
        df['adx'] = talib.ADX(high.values, low.values, close.values, timeperiod=14)
    else:
        up_move = high.diff()
        down_move = -low.diff()
        # Build the DM series on df's index. A bare pd.Series(ndarray) gets a
        # RangeIndex, and dividing it by the ATR series aligns on labels; with a
        # datetime index the labels never match, so the result is all NaN.
        plus_dm = pd.Series(np.where((up_move > down_move) & (up_move > 0), up_move, 0.0), index=df.index)
        minus_dm = pd.Series(np.where((down_move > up_move) & (down_move > 0), down_move, 0.0), index=df.index)
        atr14 = tr.rolling(14).mean()
        plus_di = 100 * (plus_dm.rolling(14).mean() / atr14)
        minus_di = 100 * (minus_dm.rolling(14).mean() / atr14)
        dx = ((plus_di - minus_di).abs() / (plus_di + minus_di)) * 100
        df['adx'] = dx.rolling(14).mean()

    # Ichimoku components (spans shifted 26 periods forward)
    df['ichimoku_conv'] = ((high.rolling(9).max() + low.rolling(9).min()) / 2)
    df['ichimoku_base'] = ((high.rolling(26).max() + low.rolling(26).min()) / 2)
    df['ichimoku_span_a'] = ((df['ichimoku_conv'] + df['ichimoku_base']) / 2).shift(26)
    df['ichimoku_span_b'] = ((high.rolling(52).max() + low.rolling(52).min()) / 2).shift(26)

    return df

# Fibonacci retracement levels over the most recent `lookback` candles
def fib_levels_from_df(df, lookback=100):
    """Return Fibonacci retracement levels for the last *lookback* rows of *df*.

    Levels are measured down from the window's highest ``high`` (``fib_0``) to
    its lowest ``low`` (``fib_100``). Returns ``{}`` for a missing or empty
    frame, or when the window's range is zero or NaN.
    """
    if df is None or df.empty:
        return {}
    window = df.iloc[-lookback:]
    top = window['high'].max()
    bottom = window['low'].min()
    span = top - bottom
    if span == 0 or math.isnan(span):
        return {}
    ratios = (('fib_0236', 0.236), ('fib_0382', 0.382), ('fib_05', 0.5), ('fib_0618', 0.618))
    levels = {'fib_0': top}
    levels.update({name: top - ratio * span for name, ratio in ratios})
    levels['fib_100'] = bottom
    return levels

# worker for a single symbol/timeframe: fetch incrementally, compute indicators, save
def process_symbol_tf(row, timeframe, per_exchange_delay=0.3):
    """Refresh the cached OHLCV + indicator CSV for one (symbol, timeframe) pair.

    Steps:

    1. Fetch candles incrementally; the fetch merges them with the on-disk cache.
    2. Pause for the exchange's throttle delay. This happens after every fetch,
       whether or not anything is saved afterwards, because every fetch hits the
       API.
    3. Unless the OHLCV data is identical to the saved file, recompute the
       indicators and Fibonacci levels and overwrite
       ``ADV_TA_DIR/<SYMBOL>_<timeframe>.csv``.

    Args:
        row: mapping with ``symbol`` and optionally ``exchange``. A missing,
            empty or non-string exchange (e.g. NaN read from CSV) falls back to
            ``'bitget'``.
        timeframe: ccxt timeframe string such as ``'1h'``.
        per_exchange_delay: pause in seconds, used only when the exchange has no
            entry in ``EXCHANGE_DELAY``.

    Returns:
        dict with ``symbol``, ``timeframe`` and ``status``. The status is one of
        ``'no_data'``, ``'cached'``, ``'saved'`` or ``'save_error'``. A ``path``
        key is included for cached and saved results.
    """
    symbol = row['symbol']
    exchange_id = row.get('exchange', 'bitget')
    if not isinstance(exchange_id, str) or not exchange_id:
        exchange_id = 'bitget'
    out_path = ADV_TA_DIR / f"{symbol.replace('/', '_')}_{timeframe}.csv"

    # Fetch incremental OHLCV (merged with the existing cache), then always throttle
    try:
        merged = fetch_ohlcv_incremental(exchange_id, symbol, timeframe)
    finally:
        time.sleep(EXCHANGE_DELAY.get(exchange_id, per_exchange_delay))

    if merged is None or merged.empty:
        return {'symbol': symbol, 'timeframe': timeframe, 'status': 'no_data'}

    # Load the previously saved file so unchanged data can skip recomputation
    try:
        if out_path.exists():
            old_df = pd.read_csv(out_path, parse_dates=['timestamp']).set_index('timestamp')
        else:
            old_df = None
    except Exception:
        old_df = None

    # Skip only when the candle data itself is unchanged. Comparing lengths alone
    # would miss a refreshed (previously still-forming) last candle.
    ohlcv_cols = ['open', 'high', 'low', 'close', 'volume']
    if old_df is not None and len(merged) == len(old_df):
        try:
            unchanged = merged[ohlcv_cols].equals(old_df[ohlcv_cols])
        except KeyError:
            unchanged = False
        if unchanged:
            return {'symbol': symbol, 'timeframe': timeframe, 'status': 'cached', 'path': str(out_path)}

    # Compute indicators over the full merged history to keep them continuous
    df_with_ind = compute_indicators(merged)

    # Store the latest Fibonacci levels as constant columns so each per-file row carries them
    fib = fib_levels_from_df(merged, lookback=100)
    for k, v in fib.items():
        df_with_ind[k] = v

    # Save the merged indicators back to CSV (overwrite)
    try:
        df_with_ind.reset_index().rename(columns={'index': 'timestamp'}).to_csv(out_path, index=False)
    except Exception as e:
        logger.error(f"Error saving {out_path}: {e}")
        return {'symbol': symbol, 'timeframe': timeframe, 'status': 'save_error'}

    return {'symbol': symbol, 'timeframe': timeframe, 'status': 'saved', 'path': str(out_path)}

# orchestrator: group by exchange, process per-exchange in batches to respect delays
def step3_process_all(top_pairs_df, timeframes=TIMEFRAMES, max_workers=MAX_WORKERS):
    """Run process_symbol_tf for every (pair, timeframe), one exchange at a time.

    A missing exchange id (a NaN cell, or no ``exchange`` column at all) is
    treated as ``'bitget'``. That filled value is used both for grouping and in
    the row dicts passed to the workers. Filtering on the raw column would
    silently drop NaN rows.

    Each exchange gets its own thread pool with
    ``min(PER_EXCHANGE_MAX_WORKERS, max(1, max_workers // n_exchanges))`` workers.

    Returns:
        list of result dicts from process_symbol_tf. Worker exceptions are
        logged and left out of the list.
    """
    results = []
    if 'exchange' in top_pairs_df.columns:
        exchange_col = top_pairs_df['exchange'].fillna('bitget')
    else:
        exchange_col = pd.Series('bitget', index=top_pairs_df.index)
    pairs = top_pairs_df.assign(exchange=exchange_col)

    exchanges = pairs['exchange'].unique().tolist()
    for exchange_id in exchanges:
        logger.info(f"Processing exchange {exchange_id} ...")
        rows = pairs[pairs['exchange'] == exchange_id].to_dict('records')
        if not rows:
            continue

        # Use a small thread pool per exchange to avoid overwhelming it
        per_exchange_workers = min(PER_EXCHANGE_MAX_WORKERS, max(1, max_workers // len(exchanges)))
        delay = EXCHANGE_DELAY.get(exchange_id, 0.3)
        with ThreadPoolExecutor(max_workers=per_exchange_workers) as ex:
            futures = [ex.submit(process_symbol_tf, row, tf, delay) for row in rows for tf in timeframes]

            for fut in tqdm(as_completed(futures), total=len(futures), desc=f"{exchange_id} tasks"):
                try:
                    results.append(fut.result())
                except Exception as e:
                    logger.error(f"Worker exception: {e}")
    return results

# Build master CSV by taking the latest row per timeframe and prefixing its columns
def build_master_csv(top_pairs_df, timeframes=TIMEFRAMES):
    """Write one row per pair to MASTER_CSV, combining the latest candle from each timeframe file.

    Every column of the last row in ``ADV_TA_DIR/<SYMBOL>_<tf>.csv`` is copied as
    ``<tf>_<column>`` (for example ``1d_rsi``). Missing or empty files are
    skipped. Unreadable files (bad CSV, no ``timestamp`` column, I/O errors) are
    logged and skipped, instead of aborting the whole build.

    Returns:
        The master DataFrame that was written.
    """
    rows = []
    for _, row in top_pairs_df.iterrows():
        symbol = row['symbol']
        combined = {'symbol': symbol, 'exchange': row.get('exchange', '')}
        for tf in timeframes:
            path = ADV_TA_DIR / f"{symbol.replace('/', '_')}_{tf}.csv"
            if not path.exists():
                continue
            try:
                df = pd.read_csv(path, parse_dates=['timestamp']).set_index('timestamp')
            except (OSError, ValueError, KeyError) as e:
                logger.warning(f"Skipping unreadable TA file {path}: {e}")
                continue
            if df.empty:
                continue
            combined.update({f"{tf}_{k}": v for k, v in df.iloc[-1].items()})
        rows.append(combined)
    master_df = pd.DataFrame(rows)
    master_df.to_csv(MASTER_CSV, index=False)
    logger.info(f"Master CSV written to {MASTER_CSV} ({len(master_df)} symbols).")
    return master_df

# === RUN ===
# Step 2 must have produced the ranked pair list first
if not TOP_PAIRS_CSV.exists():
    raise FileNotFoundError(f"Expected {TOP_PAIRS_CSV} produced by Step 2. Run that first.")

top_pairs_df = pd.read_csv(TOP_PAIRS_CSV).head(100).reset_index(drop=True)

logger.info(f"Starting smart Step 3: {len(top_pairs_df)} symbols x {len(TIMEFRAMES)} timeframes.")
results = step3_process_all(top_pairs_df, timeframes=TIMEFRAMES, max_workers=MAX_WORKERS)
logger.info(f"Step 3 tasks completed: {len(results)} results.")

# Summarise the latest per-timeframe values into the master file (for LLM consumption)
master_df = build_master_csv(top_pairs_df, timeframes=TIMEFRAMES)
logger.info("Step 3 (smart) complete.")


# News

In [None]:
# Install snscrape. NOTE(review): no cell in this part of the notebook imports snscrape — confirm it is still needed.
!pip install snscrape


# News Analysis

In [None]:
import os
from getpass import getpass

# SECURITY: API credentials must not be hard-coded in the notebook. The literal
# values that used to be written here were stored in plain text and should be
# treated as exposed; revoke and rotate them with each provider.
# Values already in the environment (e.g. loaded from a .env file or Colab
# secrets) are kept. Anything missing is prompted for with hidden input.
_NEWS_ENV_DEFAULTS = {'REDDIT_USER_AGENT': 'crypto_bot/1.0'}
for _var in ('REDDIT_CLIENT_ID', 'REDDIT_CLIENT_SECRET', 'REDDIT_USER_AGENT',
             'NEWSAPI_KEY', 'CRYPTOPANIC_TOKEN'):
    if not os.environ.get(_var):
        os.environ[_var] = _NEWS_ENV_DEFAULTS.get(_var) or getpass(f"{_var}: ")
print("Env variables set in runtime.")


In [None]:
import os
import requests
import pandas as pd
from transformers import pipeline
import praw

# ====== ENV / KEYS ======
# Credentials are read from the environment. Set them before running this cell,
# e.g. with the env-setup cell above, a .env file, or Colab secrets. The literals
# previously hard-coded here were stored in plain text; treat them as exposed
# and rotate them.
REDDIT_CLIENT_ID = os.environ.get('REDDIT_CLIENT_ID', '')
REDDIT_CLIENT_SECRET = os.environ.get('REDDIT_CLIENT_SECRET', '')
REDDIT_USER_AGENT = os.environ.get('REDDIT_USER_AGENT', 'crypto_bot/1.0')
NEWSAPI_KEY = os.environ.get('NEWSAPI_KEY', '')
# The environment variable is CRYPTOPANIC_TOKEN; the fetchers below use the name CRYPTOPANIC_KEY
CRYPTOPANIC_KEY = os.environ.get('CRYPTOPANIC_TOKEN', '')

_missing_keys = [name for name, value in (
    ('REDDIT_CLIENT_ID', REDDIT_CLIENT_ID),
    ('REDDIT_CLIENT_SECRET', REDDIT_CLIENT_SECRET),
    ('NEWSAPI_KEY', NEWSAPI_KEY),
    ('CRYPTOPANIC_TOKEN', CRYPTOPANIC_KEY),
) if not value]
if _missing_keys:
    print("Warning: missing credentials in environment:", ", ".join(_missing_keys))

CACHE_DIR = "news_sentiment"
os.makedirs(CACHE_DIR, exist_ok=True)

# ====== Load FinBERT ======
# Financial-news sentiment classifier from the Hugging Face hub (downloaded on first use)
print("Loading FinBERT sentiment model...")
finbert = pipeline(task="sentiment-analysis", model="ProsusAI/finbert")

# ====== CryptoPanic ======
def fetch_cryptopanic():
    """Fetch public CryptoPanic news posts and label each title with FinBERT.

    The request has a 30 s timeout. Network errors, non-200 responses, invalid
    JSON and empty result lists are printed, and an empty DataFrame is returned.
    Posts without a title are skipped. On success the rows are cached to
    ``{CACHE_DIR}/cryptopanic_all.csv``.

    Returns:
        DataFrame with columns source, title, url, publishedAt, sentiment
        (the FinBERT label).
    """
    url = "https://cryptopanic.com/api/v1/posts/"
    params = {"auth_token": CRYPTOPANIC_KEY, "kind": "news", "public": "true"}
    try:
        r = requests.get(url, params=params, timeout=30)
    except requests.RequestException as e:
        print("CryptoPanic request failed:", e)
        return pd.DataFrame()
    if r.status_code != 200:
        print("CryptoPanic request failed:", r.text)
        return pd.DataFrame()

    try:
        results = r.json().get("results", [])
    except ValueError as e:
        print("CryptoPanic returned invalid JSON:", e)
        return pd.DataFrame()
    posts = [post for post in results if post.get("title")]
    if not posts:
        print("CryptoPanic returned no results.")
        return pd.DataFrame()

    # One batched model call instead of one call per title
    labels = [out["label"] for out in finbert([post["title"] for post in posts])]
    rows = [
        {
            "source": "CryptoPanic",
            "title": post["title"],
            "url": post.get("url"),
            "publishedAt": post.get("published_at"),
            "sentiment": label,
        }
        for post, label in zip(posts, labels)
    ]

    df = pd.DataFrame(rows)
    df.to_csv(f"{CACHE_DIR}/cryptopanic_all.csv", index=False)
    print(f"CryptoPanic: {len(df)} articles saved.")
    return df

# ====== NewsAPI ======
def fetch_newsapi():
    """Fetch recent English "crypto" articles from NewsAPI and label each title with FinBERT.

    The key is sent in the ``X-Api-Key`` header, so it stays out of URLs and
    error messages. The request has a 30 s timeout. Network errors, non-200
    responses and invalid JSON are printed, and an empty DataFrame is returned.
    Articles with no title, or with the placeholder title "[Removed]", are
    skipped. NOTE(review): confirm that placeholder matches current API output.
    The rows are cached to ``{CACHE_DIR}/newsapi_all.csv``.

    Returns:
        DataFrame with columns source, title, url, publishedAt, sentiment.
    """
    url = "https://newsapi.org/v2/everything"
    params = {"q": "crypto", "language": "en", "sortBy": "publishedAt"}
    try:
        r = requests.get(url, params=params, headers={"X-Api-Key": NEWSAPI_KEY}, timeout=30)
    except requests.RequestException as e:
        print("NewsAPI request failed:", e)
        return pd.DataFrame()
    if r.status_code != 200:
        print("NewsAPI request failed:", r.text)
        return pd.DataFrame()

    try:
        articles = r.json().get("articles", [])
    except ValueError as e:
        print("NewsAPI returned invalid JSON:", e)
        return pd.DataFrame()
    articles = [art for art in articles if art.get("title") and art["title"] != "[Removed]"]

    # One batched model call instead of one call per title
    labels = [out["label"] for out in finbert([art["title"] for art in articles])] if articles else []
    rows = [
        {
            "source": "NewsAPI",
            "title": art["title"],
            "url": art.get("url"),
            "publishedAt": art.get("publishedAt"),
            "sentiment": label,
        }
        for art, label in zip(articles, labels)
    ]

    df = pd.DataFrame(rows, columns=["source", "title", "url", "publishedAt", "sentiment"])
    df.to_csv(f"{CACHE_DIR}/newsapi_all.csv", index=False)
    print(f"NewsAPI: {len(df)} articles saved.")
    return df

# ====== Reddit ======
def fetch_reddit():
    """Fetch the 50 newest posts across four crypto subreddits and label each title with FinBERT.

    ``publishedAt`` is stored as an ISO-8601 UTC string converted from praw's
    ``created_utc``, rather than as a raw epoch number, so that the merged
    all-sources file holds text timestamps in that column for every source.
    NOTE(review): this assumes the other APIs also return ISO-8601 strings.
    The rows are cached to ``{CACHE_DIR}/reddit_all.csv``.

    Returns:
        DataFrame with columns source, title, url, publishedAt, sentiment.
    """
    from datetime import datetime, timezone

    reddit = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT
    )
    subreddit = reddit.subreddit("CryptoCurrency+Bitcoin+Ethereum+CryptoMarkets")
    posts = list(subreddit.new(limit=50))

    # One batched model call instead of one call per title
    titles = [post.title for post in posts]
    labels = [out["label"] for out in finbert(titles)] if titles else []
    rows = [
        {
            "source": "Reddit",
            "title": post.title,
            "url": f"https://reddit.com{post.permalink}",
            "publishedAt": datetime.fromtimestamp(post.created_utc, tz=timezone.utc).isoformat(),
            "sentiment": label,
        }
        for post, label in zip(posts, labels)
    ]

    df = pd.DataFrame(rows, columns=["source", "title", "url", "publishedAt", "sentiment"])
    df.to_csv(f"{CACHE_DIR}/reddit_all.csv", index=False)
    print(f"Reddit: {len(df)} posts saved.")
    return df

# ====== Run All ======
if __name__ == "__main__":
    # Fetch each source in turn (CryptoPanic, then NewsAPI, then Reddit)
    df_cp, df_na, df_rd = fetch_cryptopanic(), fetch_newsapi(), fetch_reddit()

    # Stack every source into one combined cache file
    all_df = pd.concat([df_cp, df_na, df_rd], ignore_index=True)
    all_df.to_csv(f"{CACHE_DIR}/all_sources.csv", index=False)
    print(f"\nTOTAL ARTICLES SAVED: {len(all_df)}")


# Additional News

In [None]:
import feedparser
import pandas as pd
from datetime import datetime

def fetch_rss_news(feed_url, score_func=None, source_name="Unknown"):
    """Parse an RSS/Atom feed into a DataFrame of headlines.

    Args:
        feed_url: URL of the feed.
        score_func: optional callable applied to each title. Its result goes into
            ``score``; the column is None when no scorer is given.
        source_name: label written to the ``source`` column.

    Returns:
        DataFrame with columns source, title, link, published, score. The
        columns exist even when the feed yields no usable entries, so results
        can always be concatenated and saved the same way. Entries without a
        title are skipped, and fields that are missing or None become "".
    """
    columns = ["source", "title", "link", "published", "score"]
    feed = feedparser.parse(feed_url)
    if getattr(feed, "bozo", False) and not feed.entries:
        print(f"Warning: could not parse feed {feed_url}: {getattr(feed, 'bozo_exception', 'unknown error')}")

    news_list = []
    for entry in feed.entries:
        title = (entry.get("title") or "").strip()
        if not title:  # skip empty titles
            continue
        news_list.append({
            "source": source_name,
            "title": title,
            "link": (entry.get("link") or "").strip(),
            "published": (entry.get("published") or "").strip(),
            "score": score_func(title) if score_func else None,
        })

    return pd.DataFrame(news_list, columns=columns)


def fetch_coindesk_rss(score_func=None):
    """Return CoinDesk's RSS headlines as a DataFrame, optionally scored by *score_func*."""
    feed_url = "https://www.coindesk.com/arc/outboundfeeds/rss/"
    return fetch_rss_news(feed_url, score_func=score_func, source_name="Coindesk")

def fetch_cointelegraph_rss(score_func=None):
    """Return Cointelegraph's RSS headlines as a DataFrame, optionally scored by *score_func*."""
    feed_url = "https://cointelegraph.com/rss"
    return fetch_rss_news(feed_url, score_func=score_func, source_name="Cointelegraph")


from pathlib import Path

# Example: Fetch news from both feeds (unscored: no score_func is supplied)
cd_df = fetch_coindesk_rss(score_func=None)
ct_df = fetch_cointelegraph_rss(score_func=None)

# Combine
all_news_df = pd.concat([cd_df, ct_df], ignore_index=True)

# Save to CSV in the relative news_sentiment/ folder used by the other cells.
# The previous hard-coded /content/... path only existed in Colab. Assuming the
# notebook runs from Colab's default /content working directory, the file still
# lands in the same place there.
news_dir = Path("news_sentiment")
news_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
csv_path = news_dir / f"crypto_news_{timestamp}.csv"
all_news_df.to_csv(csv_path, index=False, encoding="utf-8")

print(f"✅ Saved {len(all_news_df)} news articles to {csv_path}")


# DeepSeek LLM Implementation

In [None]:
# Install the Groq client SDK and import its client class (presumably used by later LLM cells).
!pip install groq
from groq import Groq


In [None]:
# Provide the Groq / Telegram credentials for later cells without hard-coding them.
# SECURITY: the literal values previously written here (via %env) were stored in
# plain text and should be treated as exposed; revoke and rotate them.
# Values already present in the environment are kept. Missing ones are prompted
# for with hidden input.
import os
from getpass import getpass

for _var in ("GROQ_API_KEY", "TELEGRAM_BOT_TOKEN", "TELEGRAM_CHAT_ID"):
    if not os.environ.get(_var):
        os.environ[_var] = getpass(f"{_var}: ")


In [None]:
# Cell 1: Data load + selection
import os, json, math, time, hashlib, logging
from pathlib import Path
from datetime import datetime, timedelta
import pandas as pd
import numpy as np

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("signal_cell1")

# CONFIG (paths). These are relative to the working directory, matching where
# Step 3 (advanced_ta/) and the news cells (news_sentiment/) write. The old
# absolute /content/... paths only existed in Colab; from Colab's default
# /content working directory these resolve to the same files.
MASTER_TA_PATH = Path("advanced_ta") / "master_multi_tf.csv"
NEWS_DIR = Path("news_sentiment")
DEBUG_OUT = Path("selection_debug.csv")

# Tunables (can adjust)
MIN_INDICATORS = 3               # minimum number of non-zero indicator votes for a candidate
REQUIRE_NEWS_AGREEMENT = False   # if True, news of at least NEWS_ABS_MIN that opposes the TA direction vetoes a candidate
NEWS_ABS_MIN = 0.12              # minimum absolute news score before news is taken into account
TOP_K = 20                       # how many top candidates to present to the LLM
WEIGHT_SENTIMENT = 0.45          # composite = WEIGHT_SENTIMENT * sentiment + WEIGHT_TA * ta_score
WEIGHT_TA = 0.55

# helpers
def load_master(master_path=MASTER_TA_PATH):
    """Load the master multi-timeframe TA table, reading every column as text.

    Column names are stripped of surrounding whitespace, and the ``symbol``
    column is normalised to stripped upper case.

    Raises:
        FileNotFoundError: when *master_path* does not exist.
    """
    if not master_path.exists():
        raise FileNotFoundError(f"Master TA file not found: {master_path}")
    table = pd.read_csv(master_path, dtype=str)
    table.columns = table.columns.str.strip()
    table['symbol'] = table['symbol'].astype(str).str.strip().str.upper()
    logger.info("Loaded master TA: %d rows, %d symbols", len(table), table['symbol'].nunique())
    return table

def load_sentiment_map(news_dir=NEWS_DIR, symbols_list=None):
    """Average per-article sentiment scores from every CSV under *news_dir* (recursively).

    File-to-symbol attribution: a file belongs to the first symbol in
    *symbols_list* that appears as a whole token of the file name. Either the
    base asset (``BTC`` from ``BTC/USDT``) or the compact pair (``BTCUSDT``)
    counts; tokens are the alphanumeric runs of the file stem. Exact token
    matching is needed because substring matching let short bases (``S``
    matches ``.CSV``) or an empty base (``USDT/...``) claim every file. Files
    that match no symbol go to the ``'UNKNOWN'`` bucket.

    Score extraction: the first column found among score, sentiment_score,
    compound, sentiment, avg_score is used.
    - Text labels containing "pos" or "neg" map to +1 or -1.
    - "12%" maps to 0.12, and other numeric text is parsed.
    - Any other text (e.g. "neutral") counts as 0.0.
    - Missing values (NaN or empty), and files without a score column, are
      skipped instead of being counted as neutral, so unscored feeds do not
      pull averages toward zero.

    Returns:
        dict mapping symbol -> mean score. Every entry of *symbols_list* is
        present (0.0 when it has no rows), plus 'UNKNOWN' and any other symbol
        that has rows.
    """
    score_cols = ['score', 'sentiment_score', 'compound', 'sentiment', 'avg_score']

    def _name_tokens(stem):
        # Alphanumeric runs of the upper-cased file stem, e.g. "BTC_NEWS-2024" -> {"BTC", "NEWS", "2024"}
        return set(''.join(ch if ch.isalnum() else ' ' for ch in stem.upper()).split())

    def _coerce_score(raw):
        # Numeric score for one cell, or None when the value is missing
        if raw is None:
            return None
        if isinstance(raw, str):
            text = raw.strip().lower()
            if not text:
                return None
            if 'pos' in text:
                return 1.0
            if 'neg' in text:
                return -1.0
            is_percent = '%' in text
            try:
                value = float(text.replace('%', ''))
            except ValueError:
                return 0.0
            if is_percent:
                value /= 100.0
        else:
            try:
                value = float(raw)
            except (TypeError, ValueError):
                return None
        return None if math.isnan(value) else value

    rows = []
    for f in news_dir.rglob("*.csv"):
        try:
            df = pd.read_csv(f)
        except Exception as e:
            logger.warning("Skipping unreadable news file %s: %s", f, e)
            continue

        inferred = None
        if symbols_list:
            tokens = _name_tokens(f.stem)
            for s in symbols_list:
                base = s.split("/")[0].replace("USDT", "")
                compact = ''.join(ch for ch in s.upper() if ch.isalnum())
                if (base and base in tokens) or (compact and compact in tokens):
                    inferred = s
                    break

        sc = next((c for c in score_cols if c in df.columns), None)
        if sc is None:
            continue  # no sentiment information in this file
        bucket = inferred or "UNKNOWN"
        for raw in df[sc]:
            value = _coerce_score(raw)
            if value is not None:
                rows.append({"symbol": bucket, "score": value})

    if not rows:
        logger.warning("No scored news rows found in %s", news_dir)
        return {s: 0.0 for s in (symbols_list or [])}

    grouped = pd.DataFrame(rows).groupby('symbol')['score'].mean().to_dict()
    # Full map for all requested symbols, then the UNKNOWN bucket, then any other symbols found
    sentiment_map = {s: float(grouped.get(s, 0.0)) for s in (symbols_list or [])}
    if 'UNKNOWN' in grouped:
        sentiment_map['UNKNOWN'] = float(grouped['UNKNOWN'])
    for k, v in grouped.items():
        sentiment_map.setdefault(k, float(v))
    logger.info("Built sentiment_map for %d symbols (includes UNKNOWN=%s)", len(sentiment_map), sentiment_map.get('UNKNOWN', None))
    return sentiment_map

# indicator helpers (adapted from earlier system)
def _get_first_numeric(row, candidates):
    for c in candidates:
        if c in row and row[c] not in (None,"","nan","NaN"):
            try: return float(row[c])
            except: pass
    return None

def indicator_signs_from_row(row):
    """Turn one master-TA row into per-indicator votes of +1, -1 or 0.

    Each input is read from the first timeframe that has a numeric value, in
    the order 1d, 4h, 1h. The votes, in insertion order:

    - ema: EMA50 vs EMA200 (+1 when EMA50 is higher)
    - macd: MACD vs its signal line
    - macd_hist: sign of the histogram
    - rsi: +1 below 35, -1 above 65
    - stoch: %K vs %D
    - bb: +1 when close is under the lower band, -1 when close is over the upper band
    - adx: when ADX >= 30, repeats the ema vote (or the macd vote if ema is 0)
    - ichimoku: span A vs span B
    - fib: +1 when close is above the 0.618 level; otherwise -1 when close is
      below the 0.236 level

    A vote is 0 whenever a value it needs is unavailable.
    """
    def latest(field):
        # Daily value first, then 4h, then 1h
        return _get_first_numeric(row, ['1d_' + field, '4h_' + field, '1h_' + field])

    def compare(left, right):
        # +1 / -1 for left above / below right; 0 when equal, missing or not comparable
        if left is None or right is None:
            return 0
        if left > right:
            return 1
        if left < right:
            return -1
        return 0

    close = latest('close')
    rsi = latest('rsi')
    adx = latest('adx')
    bb_upper, bb_lower = latest('bb_upper'), latest('bb_lower')
    fib0618, fib0236 = latest('fib_0618'), latest('fib_0236')

    votes = {
        'ema': compare(latest('ema50'), latest('ema200')),
        'macd': compare(latest('macd'), latest('macd_signal')),
        'macd_hist': compare(latest('macd_hist'), 0),
    }

    if rsi is None:
        votes['rsi'] = 0
    elif rsi < 35:
        votes['rsi'] = 1
    elif rsi > 65:
        votes['rsi'] = -1
    else:
        votes['rsi'] = 0

    votes['stoch'] = compare(latest('stoch_k'), latest('stoch_d'))

    if close is not None and bb_lower is not None and close < bb_lower:
        votes['bb'] = 1
    elif close is not None and bb_upper is not None and close > bb_upper:
        votes['bb'] = -1
    else:
        votes['bb'] = 0

    votes['adx'] = (votes['ema'] or votes['macd']) if (adx is not None and adx >= 30) else 0
    votes['ichimoku'] = compare(latest('ichimoku_span_a'), latest('ichimoku_span_b'))

    if close is not None and fib0618 is not None and close > fib0618:
        votes['fib'] = 1
    elif close is not None and fib0236 is not None and close < fib0236:
        votes['fib'] = -1
    else:
        votes['fib'] = 0
    return votes

def ta_score_from_row(row):
    """Return the weighted mean of the row's indicator votes (a float in [-1, 1])."""
    votes = indicator_signs_from_row(row)
    weights = {
        'ema': 1.0, 'macd': 1.0, 'macd_hist': 0.6, 'rsi': 0.9, 'stoch': 0.6,
        'bb': 0.6, 'adx': 0.5, 'ichimoku': 0.7, 'fib': 0.4,
    }
    total_weight = sum(weights.values())
    if not total_weight:
        return 0.0
    weighted_sum = sum(votes.get(name, 0) * weight for name, weight in weights.items())
    return float(weighted_sum / total_weight)

def select_candidates_by_rule(master_df, sentiment_map, min_indicators=MIN_INDICATORS, news_abs_min=NEWS_ABS_MIN, require_news_agreement=REQUIRE_NEWS_AGREEMENT, top_k=TOP_K):
    """Rank symbols by a sentiment + TA composite and return the strongest ones.

    For each row of ``master_df``:

    - Count the non-zero indicator votes. Rows with fewer than ``min_indicators`` are skipped.
    - Take the TA direction from the unweighted vote balance.
    - Look up the symbol's sentiment, falling back to the "UNKNOWN" bucket and
      then to 0.0. NaN/None counts as 0.0. The value is clipped to [-1, 1].
    - Combine ``WEIGHT_SENTIMENT * sentiment + WEIGHT_TA * ta_score``, then
      scale by ``1 + min(indicator_count, 10) / 40``.

    When ``require_news_agreement`` is True, a candidate is dropped if its news is
    strong enough to matter (``abs(sentiment) >= news_abs_min``) and points against
    the TA direction. Weaker news is treated as noise.

    All ranked candidates are also written to DEBUG_OUT (best effort).

    Returns:
        Up to ``top_k`` dicts (symbol, indicator_count, direction, sentiment, ta,
        composite, signs, abs_composite), sorted by |composite| then
        indicator_count, both descending. Empty list if nothing qualifies.
    """
    rows = []
    unknown_sentiment = sentiment_map.get("UNKNOWN", 0.0)
    for _, r in master_df.iterrows():
        sym = r['symbol']
        signs = indicator_signs_from_row(r)
        indicator_count = sum(1 for v in signs.values() if v != 0)
        if indicator_count < min_indicators:
            continue
        vote_total = sum(signs.values())
        direction = 1 if vote_total > 0 else (-1 if vote_total < 0 else 0)
        raw_sent = sentiment_map.get(sym, unknown_sentiment)
        # A symbol whose news rows carried no usable score averages to NaN; treat it as neutral.
        sent = 0.0 if pd.isna(raw_sent) else float(np.clip(raw_sent, -1, 1))
        news_sign = 1 if sent > 0 else (-1 if sent < 0 else 0)
        if (require_news_agreement and news_sign != 0 and news_sign != direction
                and abs(sent) >= news_abs_min):
            continue
        ta_s = ta_score_from_row(r)
        composite = WEIGHT_SENTIMENT * sent + WEIGHT_TA * ta_s
        # small boost for indicator_count (diminishing, at most x1.25)
        composite_adj = composite * (1 + min(indicator_count, 10) / 40.0)
        rows.append({"symbol": sym, "indicator_count": indicator_count, "direction": direction,
                     "sentiment": sent, "ta": ta_s, "composite": composite_adj, "signs": signs})
    if not rows:
        return []
    dfc = pd.DataFrame(rows)
    dfc['abs_composite'] = dfc['composite'].abs()
    dfc = dfc.sort_values(['abs_composite', 'indicator_count'], ascending=[False, False])
    out = dfc.head(top_k).to_dict(orient='records')
    # Debug save: write the already-sorted frame. The raw `rows` have no
    # 'abs_composite' column, so sorting them raised KeyError.
    try:
        dfc.to_csv(DEBUG_OUT, index=False)
        logger.info("Wrote %s for inspection", DEBUG_OUT)
    except Exception as e:  # best effort: never let the debug dump break selection
        logger.warning("Could not write debug candidates file: %s", e)
    return out

# Run the rule-based selection immediately. The module-level names below
# (master, symbols, sentiment_map, candidates) stay in the notebook namespace
# for the following cell.
master = load_master()
# Sorted, de-duplicated symbols from the master TA table, passed to the sentiment loader.
symbols = sorted(master['symbol'].unique().tolist())
sentiment_map = load_sentiment_map(NEWS_DIR, symbols_list=symbols)
# Uses the function's default thresholds (min_indicators, top_k, ...).
candidates = select_candidates_by_rule(master, sentiment_map)
logger.info("Candidates selected: %d", len(candidates))
# Show top summary: print at most the first 20 candidates, one line each.
for c in candidates[:20]:
    print(f"{c['symbol']:12} indicators={c['indicator_count']:2d} composite={c['composite']:.3f} sent={c['sentiment']:.3f} ta={c['ta']:.3f}")
# Keep 'candidates' in notebook namespace for cell 2



# LLM and Prompt

In [None]:
# ===== Ready-to-paste: Groq-hybrid signaler (sanitizes cache + strict JSON prompt) =====
# BEFORE running: either set environment variables or paste your keys in the placeholders below.
# Recommended (Colab): run
#   %env GROQ_API_KEY=your_key_here
#   %env TELEGRAM_BOT_TOKEN=your_bot_token_here
#   %env TELEGRAM_CHAT_ID=@your_channel_or_id
#
# If you prefer to hardcode (not recommended), set the values in the CONFIG section.

import os, time, json, re, random, logging, hashlib
from pathlib import Path
from datetime import datetime
import requests
import pandas as pd
import numpy as np

# basicConfig does nothing if the root logger already has handlers (e.g. from an
# earlier cell); this cell logs through its own named logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("groq_hybrid_ready")

# ---------------- CONFIG ----------------
# All paths are rooted at /content (the Colab working directory); change ROOT when running elsewhere.
ROOT = Path("/content")
# Multi-timeframe TA table read by load_master().
MASTER_TA_PATH = ROOT / "advanced_ta" / "master_multi_tf.csv"
# Searched recursively for *.csv news/sentiment files by load_sentiment_map().
NEWS_DIR = ROOT / "news_sentiment"
# JSON object mapping a hash key to the parsed list of LLM signals.
LLM_CACHE_PATH = NEWS_DIR / "llm_cache_groq.json"
NEWS_DIR.mkdir(parents=True, exist_ok=True)
# Seed an empty cache so later reads never hit a missing file.
if not LLM_CACHE_PATH.exists():
    LLM_CACHE_PATH.write_text(json.dumps({}))

# Credentials are read from environment variables; do not embed real keys in the notebook.
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")          # set via %env GROQ_API_KEY=...
GROQ_MODEL = os.getenv("GROQ_MODEL", "deepseek-r1-distill-llama-70b")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")  # set via %env TELEGRAM_BOT_TOKEN=...
TELEGRAM_CHAT_ID   = os.getenv("TELEGRAM_CHAT_ID", "")    # set via %env TELEGRAM_CHAT_ID=@channel_or_id

# Tunables
# Minimum number of non-zero indicator votes a symbol needs to become a candidate.
MIN_INDICATORS = 3
# Sentiment-magnitude threshold used by the news-agreement filter in select_candidates_by_rule.
NEWS_ABS_MIN = 0.12
# Enables that news-vs-TA agreement filter.
REQUIRE_NEWS_AGREEMENT = False
# Number of rule-selected candidates handed to the LLM.
TOP_K = 8
# composite = WEIGHT_SENTIMENT * sentiment + WEIGHT_TA * ta_score
WEIGHT_SENTIMENT = 0.45
WEIGHT_TA = 0.55
# Signals below this confidence are not sent; also quoted in the prompt as the HOLD threshold.
MIN_CONFIDENCE_TO_SEND = 45
# When True, Telegram messages are only logged.
DRY_RUN = True

# LLM retry/backoff: sleep LLM_BASE_BACKOFF ** attempt + up to LLM_JITTER seconds between attempts.
LLM_MAX_ATTEMPTS = 4
LLM_BASE_BACKOFF = 1.6
LLM_JITTER = 0.6

# ---------------- Utilities ----------------
def sanitize_llm_cache(path: Path):
    """Keep only JSON arrays in cache values (store parsed lists). Backup old cache."""
    if not path.exists():
        path.write_text(json.dumps({}))
        return
    try:
        raw = path.read_text()
        cache = json.loads(raw)
    except Exception as e:
        logger.warning("Cache not valid JSON, backing up and resetting: %s", e)
        bak = path.with_suffix(".bak.json")
        path.rename(bak)
        path.write_text(json.dumps({}))
        return

    cleaned = {}
    dropped = []
    for k, v in cache.items():
        if isinstance(v, list):
            cleaned[k] = v
            continue
        if not isinstance(v, str):
            dropped.append(k); continue
        s = v.strip()
        # Try parse directly
        try:
            parsed = json.loads(s)
            if isinstance(parsed, list):
                cleaned[k] = parsed
                continue
        except Exception:
            pass
        # attempt to extract first JSON array
        m = re.search(r'(\[\s*\{.*\}\s*\])', s, flags=re.DOTALL)
        if m:
            try:
                parsed = json.loads(m.group(1))
                if isinstance(parsed, list):
                    cleaned[k] = parsed; continue
            except Exception:
                pass
        # fallback: substring from first [ to last ]
        try:
            i = s.index('[')
            j = s.rindex(']')
            candidate = s[i:j+1]
            parsed = json.loads(candidate)
            if isinstance(parsed, list):
                cleaned[k] = parsed; continue
        except Exception:
            dropped.append(k)
            continue
    # backup and write cleaned
    bak = path.with_suffix(".bak.json")
    path.rename(bak)
    path.write_text(json.dumps(cleaned, indent=2))
    logger.info("Sanitized LLM cache. Kept %d entries, dropped %d. Backup at %s", len(cleaned), len(dropped), bak)

def _load_llm_cache():
    try:
        return json.loads(LLM_CACHE_PATH.read_text())
    except Exception:
        return {}
def _save_llm_cache(c):
    try:
        LLM_CACHE_PATH.write_text(json.dumps(c))
    except Exception as e:
        logger.warning("Failed to write llm cache: %s", e)

# ---------------- Data loaders / TA scoring (adapted from your logic) ----------------
def load_master(master_path=MASTER_TA_PATH):
    """Load the multi-timeframe TA table with every column read as text.

    The ``symbol`` column is normalised to stripped upper-case strings.

    Raises:
        FileNotFoundError: if ``master_path`` does not exist.
    """
    if master_path.exists():
        frame = pd.read_csv(master_path, dtype=str)
        frame['symbol'] = frame['symbol'].astype(str).str.strip().str.upper()
        return frame
    raise FileNotFoundError(f"Master TA file not found: {master_path}")

def load_sentiment_map(news_dir=NEWS_DIR, symbols_list=None):
    """Average the news sentiment per symbol across every CSV under ``news_dir``.

    Each CSV is attributed to one symbol. A symbol matches when its "/"-to-"_"
    or "/"-removed form occurs in the upper-cased file name. The LONGEST
    matching symbol wins, so e.g. "WBTC_USDT.csv" goes to WBTC/USDT rather
    than BTC/USDT. Rows from unmatched files are pooled under "UNKNOWN".

    The score comes from the first present column among score / sentiment /
    compound / sentiment_score. Unparseable values count as 0.0, and so does
    every row of a file with no such column. NaN values are skipped by the mean.
    Unreadable CSVs are skipped.

    Returns:
        ``{symbol: mean_score}`` for every symbol in ``symbols_list`` (0.0 when
        no news matched), plus "UNKNOWN" when unmatched files exist. Without
        ``symbols_list``, one entry per group found.
    """
    def _to_score(value):
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    def _match_symbol(file_name):
        # Matching depends only on the file name, so it is done once per file, not per row.
        if not symbols_list:
            return None
        upper = file_name.upper()
        hits = [sym for sym in symbols_list
                if sym.replace('/', '_') in upper or sym.replace('/', '') in upper]
        return max(hits, key=len) if hits else None

    pairs = []
    for path in Path(news_dir).rglob("*.csv"):
        try:
            df = pd.read_csv(path)
        except (OSError, ValueError) as e:  # pandas EmptyDataError/ParserError subclass ValueError
            logger.debug("Skipping unreadable news file %s: %s", path, e)
            continue
        score_col = next((c for c in ('score', 'sentiment', 'compound', 'sentiment_score')
                          if c in df.columns), None)
        symbol = _match_symbol(path.name) or "UNKNOWN"
        if score_col is None:
            scores = [0.0] * len(df)
        else:
            scores = [_to_score(v) for v in df[score_col]]
        pairs.extend((symbol, s) for s in scores)

    if not pairs:
        return {s: 0.0 for s in (symbols_list or [])}
    grouped = pd.DataFrame(pairs, columns=['symbol', 'score']).groupby('symbol')['score'].mean().to_dict()
    sentiment_map = {sym: float(grouped.get(sym, 0.0)) for sym in (symbols_list or grouped.keys())}
    if 'UNKNOWN' in grouped:
        sentiment_map['UNKNOWN'] = float(grouped['UNKNOWN'])
    return sentiment_map

def _get_first_numeric(row, candidates):
    for c in candidates:
        if c in row and row[c] not in (None,"","nan","NaN"):
            try: return float(row[c])
            except: pass
    return None

def indicator_signs_from_row(row):
    """Cast a +1 (bullish) / -1 (bearish) / 0 (neutral or missing) vote per indicator.

    Every input is read from the first timeframe (1d, then 4h, then 1h) with a
    usable number. Any missing input yields a 0 vote for that indicator.

    Returns:
        Dict with keys, in this order: ema, macd, macd_hist, rsi, stoch, bb,
        adx, ichimoku, fib.
    """
    timeframes = ('1d', '4h', '1h')

    def pick(name):
        return _get_first_numeric(row, [f"{tf}_{name}" for tf in timeframes])

    def versus(a, b):
        # +1 if a > b, -1 if a < b, otherwise 0 (also when either side is missing).
        if a is None or b is None:
            return 0
        if a > b:
            return 1
        if a < b:
            return -1
        return 0

    ema50, ema200 = pick('ema50'), pick('ema200')
    macd, macd_signal = pick('macd'), pick('macd_signal')
    macd_hist = pick('macd_hist')
    rsi = pick('rsi')
    stoch_k, stoch_d = pick('stoch_k'), pick('stoch_d')
    close = pick('close')
    bb_upper, bb_lower = pick('bb_upper'), pick('bb_lower')
    adx = pick('adx')
    span_a, span_b = pick('ichimoku_span_a'), pick('ichimoku_span_b')
    fib_0618, fib_0236 = pick('fib_0618'), pick('fib_0236')

    votes = {}
    votes['ema'] = versus(ema50, ema200)
    votes['macd'] = versus(macd, macd_signal)
    votes['macd_hist'] = versus(macd_hist, 0)

    if rsi is None:
        votes['rsi'] = 0
    elif rsi < 35:
        votes['rsi'] = 1    # oversold
    elif rsi > 65:
        votes['rsi'] = -1   # overbought
    else:
        votes['rsi'] = 0

    votes['stoch'] = versus(stoch_k, stoch_d)

    bb_vote = 0
    if close is not None:
        if bb_lower is not None and close < bb_lower:
            bb_vote = 1
        elif bb_upper is not None and close > bb_upper:
            bb_vote = -1
    votes['bb'] = bb_vote

    # A strong trend (ADX >= 30) amplifies the EMA vote, or the MACD vote when EMA is neutral.
    votes['adx'] = (votes['ema'] or votes['macd']) if adx is not None and adx >= 30 else 0
    votes['ichimoku'] = versus(span_a, span_b)

    fib_vote = 0
    if close is not None:
        if fib_0618 is not None and close > fib_0618:
            fib_vote = 1
        elif fib_0236 is not None and close < fib_0236:
            fib_vote = -1
    votes['fib'] = fib_vote
    return votes

def ta_score_from_row(row):
    """Return the weight-normalised sum of the indicator votes for ``row``.

    Votes are +1/-1/0 and weights are positive, so the score lies in [-1, 1].
    """
    weight_by_indicator = {'ema': 1.0, 'macd': 1.0, 'macd_hist': 0.6, 'rsi': 0.9, 'stoch': 0.6,
                           'bb': 0.6, 'adx': 0.5, 'ichimoku': 0.7, 'fib': 0.4}
    votes = indicator_signs_from_row(row)
    weight_total = sum(weight_by_indicator.values())
    if not weight_total:
        return 0.0
    weighted = sum(votes.get(name, 0) * weight for name, weight in weight_by_indicator.items())
    return float(weighted / weight_total)

def select_candidates_by_rule(master_df, sentiment_map, min_indicators=MIN_INDICATORS, news_abs_min=NEWS_ABS_MIN, require_news_agreement=REQUIRE_NEWS_AGREEMENT, top_k=TOP_K):
    """Rank symbols by a sentiment + TA composite and return the strongest ones.

    For each row of ``master_df``:

    - Count the non-zero indicator votes. Rows with fewer than ``min_indicators`` are skipped.
    - Take the TA direction from the unweighted vote balance.
    - Look up the symbol's sentiment, defaulting to 0.0. NaN/None counts as 0.0.
      The value is clipped to [-1, 1].
    - Compute ``composite = WEIGHT_SENTIMENT * sentiment + WEIGHT_TA * ta_score``.

    When ``require_news_agreement`` is True, a candidate is dropped if its news is
    strong enough to matter (``abs(sentiment) >= news_abs_min``) and points against
    the TA direction. Weaker news is treated as noise.

    Returns:
        Up to ``top_k`` dicts (symbol, indicator_count, direction, sentiment, ta,
        composite, signs, abs_composite), sorted by |composite| then
        indicator_count, both descending. Empty list if nothing qualifies.
    """
    rows = []
    for _, r in master_df.iterrows():
        sym = r['symbol']
        signs = indicator_signs_from_row(r)
        indicator_count = sum(1 for v in signs.values() if v != 0)
        if indicator_count < min_indicators:
            continue
        vote_total = sum(signs.values())
        direction = 1 if vote_total > 0 else (-1 if vote_total < 0 else 0)
        raw_sent = sentiment_map.get(sym, 0.0)
        # A symbol whose news rows carried no usable score averages to NaN; treat it as neutral.
        sent = 0.0 if pd.isna(raw_sent) else float(np.clip(raw_sent, -1, 1))
        news_sign = 1 if sent > 0 else (-1 if sent < 0 else 0)
        if (require_news_agreement and news_sign != 0 and news_sign != direction
                and abs(sent) >= news_abs_min):
            continue
        ta_s = ta_score_from_row(r)
        composite = WEIGHT_SENTIMENT * sent + WEIGHT_TA * ta_s
        rows.append({"symbol": sym, "indicator_count": indicator_count, "direction": direction,
                     "sentiment": sent, "ta": ta_s, "composite": composite, "signs": signs})
    if not rows:
        return []
    dfc = pd.DataFrame(rows)
    dfc['abs_composite'] = dfc['composite'].abs()
    dfc = dfc.sort_values(['abs_composite', 'indicator_count'], ascending=[False, False])
    return dfc.head(top_k).to_dict(orient='records')

# ---------------- LLM helpers ----------------
# Reasoning models (such as the default deepseek-r1 distill) prefix their answer with a
# <think>...</think> block that can itself contain JSON-looking arrays.
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", flags=re.DOTALL | re.IGNORECASE)


def extract_json_array_from_model(content):
    """Pull the JSON array out of an LLM reply.

    Reasoning blocks are removed first. The remaining text is then tried in order:
    1. the whole text, if it starts with '[';
    2. every '[{' position, decoded with the real JSON parser (so nested arrays
       and markdown fences around the array are handled);
    3. the span from the first '[' to the last ']'.

    Returns:
        The parsed list (possibly empty), or None if no JSON array is found.
    """
    if not content:
        return None
    text = _THINK_BLOCK_RE.sub("", content).strip()
    if not text:
        return None

    if text.startswith('['):
        try:
            parsed = json.loads(text)
        except ValueError:
            parsed = None
        if isinstance(parsed, list):
            return parsed

    decoder = json.JSONDecoder()
    for match in re.finditer(r'\[\s*\{', text):
        try:
            parsed, _end = decoder.raw_decode(text, match.start())
        except ValueError:
            continue
        if isinstance(parsed, list):
            return parsed

    start, end = text.find('['), text.rfind(']')
    if start != -1 and end > start:
        try:
            parsed = json.loads(text[start:end + 1])
        except ValueError:
            return None
        if isinstance(parsed, list):
            return parsed
    return None

def call_groq(prompt, model=GROQ_MODEL, max_attempts=LLM_MAX_ATTEMPTS):
    """Send ``prompt`` to Groq chat completions and return the reply text.

    The official ``groq`` Python client is tried first. If it cannot be imported
    or initialised, or all of its attempts fail, the OpenAI-compatible HTTP
    endpoint is used instead (this requires GROQ_API_KEY). Each path makes up to
    ``max_attempts`` attempts. Between attempts it sleeps
    ``LLM_BASE_BACKOFF ** attempt`` plus up to ``LLM_JITTER`` seconds; there is
    no sleep after the final attempt. HTTP 4xx responses other than 408/429
    are not retried.

    Returns:
        The assistant message content, or None if every attempt failed.
    """
    key = GROQ_API_KEY
    messages = [{"role": "user", "content": prompt}]

    def _backoff_or_give_up(label, attempt, err):
        # Sleeping after the last attempt would only delay the failure.
        if attempt >= max_attempts:
            logger.warning("Groq %s attempt %d failed: %s. No retries left.", label, attempt, err)
            return
        backoff = (LLM_BASE_BACKOFF ** attempt) + random.random() * LLM_JITTER
        logger.warning("Groq %s attempt %d failed: %s. Backoff %.1fs", label, attempt, err, backoff)
        time.sleep(backoff)

    # 1) official python client
    client = None
    try:
        from groq import Groq
        client = Groq(api_key=key) if key else Groq()
    except Exception as e:  # ImportError, or the client refusing to start without a key
        logger.info("groq python client not available or failed to init (%s). Falling back to HTTP", e)
    if client is not None:
        for attempt in range(1, max_attempts + 1):
            try:
                completion = client.chat.completions.create(model=model, messages=messages, timeout=30)
                return completion.choices[0].message.content
            except Exception as e:  # the client's own error types are not imported here
                _backoff_or_give_up("client", attempt, e)
        logger.error("Groq client failed after %d attempts", max_attempts)

    # 2) raw HTTP fallback
    if not key:
        logger.warning("No GROQ_API_KEY set; skipping LLM call.")
        return None
    url = "https://api.groq.com/openai/v1/chat/completions"
    # NOTE(review): with a reasoning model the <think> text appears to count toward
    # max_tokens, so 512 may truncate the reply before the JSON array — confirm.
    payload = {"model": model, "messages": messages, "temperature": 0.0, "max_tokens": 512}
    for attempt in range(1, max_attempts + 1):
        try:
            r = requests.post(url, headers={"Authorization": f"Bearer {key}"}, json=payload, timeout=30)
            r.raise_for_status()
            data = r.json()
            return data.get("choices", [{}])[0].get("message", {}).get("content")
        except requests.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            if status is not None and 400 <= status < 500 and status not in (408, 429):
                # Bad key / model / payload: retrying cannot succeed.
                logger.error("Groq HTTP request rejected (status %s): %s", status, e)
                return None
            _backoff_or_give_up("HTTP", attempt, e)
        except (requests.RequestException, ValueError, LookupError, AttributeError, TypeError) as e:
            _backoff_or_give_up("HTTP", attempt, e)
    logger.error("Groq HTTP failed after %d attempts.", max_attempts)
    return None

def deterministic_fallback(selected_candidates, top_n=1):
    """Turn rule-selected candidates into signals without consulting the LLM.

    Candidates are ranked by |composite| and then indicator_count, both
    descending. Each of the first ``top_n`` becomes a signal:

    - action is BUY, SELL or HOLD according to the composite's sign;
    - confidence is |composite| * 100, rounded and clamped to 5..99.
    """
    if not selected_candidates:
        return []

    def strength(cand):
        return abs(cand['composite']), cand['indicator_count']

    def to_signal(cand):
        score = cand['composite']
        if score > 0:
            action = "BUY"
        elif score < 0:
            action = "SELL"
        else:
            action = "HOLD"
        confidence = int(round(abs(score) * 100))
        confidence = max(5, min(99, confidence))
        reason = f"Rule fallback: {cand['indicator_count']} indicators; composite {cand['composite']:.2f}; sentiment {cand['sentiment']:.2f}."
        return {"symbol": cand['symbol'], "action": action, "confidence": confidence, "reason": reason}

    ranked = sorted(selected_candidates, key=strength, reverse=True)
    return [to_signal(cand) for cand in ranked[:top_n]]

def _redact_token(text):
    """Return ``text`` as a string with the Telegram bot token masked.

    ``requests`` error messages include the request URL, and the Bot API URL
    embeds the token, so raw exceptions must not be logged verbatim.
    """
    text = str(text)
    return text.replace(TELEGRAM_BOT_TOKEN, "***") if TELEGRAM_BOT_TOKEN else text


def send_telegram_message(text, dry_run=None):
    """Send ``text`` to TELEGRAM_CHAT_ID via the Bot API.

    Args:
        text: Plain-text message body.
        dry_run: When True, the message is only logged. ``None`` (the default)
            uses the module-level DRY_RUN flag.

    Returns:
        True if the message was sent (or dry-run logged), False otherwise.
    """
    if dry_run is None:
        dry_run = DRY_RUN
    if dry_run:
        logger.info("(dry) Would send Telegram message:\n%s", text)
        return True
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        logger.warning("Telegram not configured. Not sending.")
        return False
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    try:
        r = requests.post(url, json={"chat_id": TELEGRAM_CHAT_ID, "text": text}, timeout=15)
        r.raise_for_status()
    except requests.RequestException as e:
        logger.warning("Telegram send failed: %s", _redact_token(e))
        return False
    return True


def _build_signal_prompt(candidates, top_n, min_conf):
    """Build the strict-JSON analyst prompt: instructions followed by the candidates as JSON.

    Entry / take-profit / stop-loss are requested as JSON fields, so the
    output-format rules never contradict each other.
    """
    lines = [
        "SYSTEM:",
        "You are a conservative, factual, professional crypto trading analyst with 20 years of "
        "experience and expertise in technical and fundamental analysis.",
        "You MUST OUTPUT EXACTLY one JSON ARRAY and NOTHING ELSE: no commentary, no markdown, "
        "no explanation outside the JSON.",
        "",
        "USER:",
        "Reason carefully about the task, but your final answer must be only the JSON array.",
        f"You are given candidate trades as a JSON list. Choose up to {top_n} of the best trades "
        "to act on now (24-72h horizon).",
        "Each candidate's last_close is its latest close price; base entry, take_profit and "
        "stop_loss on it.",
        "Output format EXACTLY (example):",
        '[{"symbol":"BTC/USDT","action":"BUY","confidence":72,"entry":27500,"take_profit":28200,'
        '"stop_loss":27200,"reason":"one- or two-sentence rationale based on trend, indicators or news"}]',
        "",
        "Field rules:",
        "- action is one of BUY, SELL, HOLD.",
        "- confidence is an integer from 1 to 99.",
        "- entry is the ideal buy/sell level, take_profit a realistic profit target, stop_loss the "
        "risk-protection level; numbers in the pair's quote currency, or null if last_close is null.",
        "- For BUY: stop_loss < entry < take_profit. For SELL: take_profit < entry < stop_loss.",
        "",
        "Confidence heuristics:",
        "- base_conf = round(abs(composite) * 100)",
        "- +10 if indicator_count >= 6",
        "- +10 if there are >= 2 supporting headlines (if available)",
        "- -30 if sentiment and TA direction conflict and abs(sentiment) > 0.2",
        "- -20 if indicator_count < 3",
        f"Cap confidence to 1..99. If confidence < {min_conf}, prefer HOLD.",
        "",
        "Candidates:",
    ]
    return "\n".join(lines) + "\n" + json.dumps(candidates, indent=2, default=str)


def _latest_close_by_symbol(master_df, wanted_symbols):
    """Map each wanted symbol to its close from the finest timeframe present (1h, then 4h, then 1d).

    Only the symbol's first master row is used. The value is None when no usable number exists.
    """
    closes = {}
    subset = master_df[master_df['symbol'].isin(list(wanted_symbols))]
    for _, row in subset.iterrows():
        sym = row['symbol']
        if sym in closes:
            continue
        value = _get_first_numeric(row, ['1h_close', '4h_close', '1d_close'])
        # `value == value` is False only for NaN, which is not valid JSON.
        closes[sym] = value if value is not None and value == value else None
    return closes


def _parse_confidence(value):
    """Convert an LLM confidence (number, "72" or "72%") to a rounded int, or None if impossible."""
    if isinstance(value, str):
        value = value.strip().rstrip('%')
    try:
        return int(round(float(value)))
    except (TypeError, ValueError, OverflowError):
        return None


def _format_signal_message(sym, action, conf, reason, sig):
    """Render the Telegram text. Entry/TP/SL lines are added only when the LLM supplied them."""
    lines = ["🚀 Top Signal:", f"Pair: {sym}", f"Signal: {action}", f"Confidence: {conf}%"]
    for label, field in (("Entry", "entry"), ("TP", "take_profit"), ("SL", "stop_loss")):
        level = sig.get(field)
        if level not in (None, ""):
            lines.append(f"{label}: {level}")
    lines.append(f"Reason: {reason}")
    return "\n".join(lines)


def _append_decision_log(selected, refined, sent):
    """Append one row (UTC timestamp + JSON of selected/refined/sent) to ROOT/decisions/decisions_log.csv.

    Best effort: failures are logged and never raised.
    """
    from datetime import timezone

    try:
        decisions_dir = ROOT / "decisions"
        decisions_dir.mkdir(parents=True, exist_ok=True)
        log_path = decisions_dir / "decisions_log.csv"
        row = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "selected": json.dumps(selected, default=str),
            "refined": json.dumps(refined, default=str),
            "signals": json.dumps(sent, default=str),
        }
        pd.DataFrame([row]).to_csv(log_path, index=False, mode='a', header=not log_path.exists())
    except (OSError, TypeError, ValueError) as e:
        logger.warning("Failed to write decisions log: %s", e)


# ---------------- Orchestration ----------------
def run_once(dry_run=DRY_RUN, top_n=3):
    """Run one cycle: rule-based selection, then LLM refinement, then Telegram.

    Steps:
    1. Sanitize the LLM cache.
    2. Load the TA master table and news sentiment, and pick rule-based candidates.
    3. Ask the LLM for up to ``top_n`` signals. The answer is cached by model +
       full prompt, so the cache only hits for identical inputs including prices.
    4. Send the signals that name a known symbol, carry a valid action and reach
       MIN_CONFIDENCE_TO_SEND.

    If the LLM yields no parseable answer, a deterministic fallback is used. The
    fallback is NOT cached, so the next run retries the LLM.

    Args:
        dry_run: When True, messages are only logged instead of sent.
        top_n: Maximum number of signals to request and to send.

    Returns:
        List of dicts (symbol, action, confidence, reason, sent) for each signal
        that passed validation.
    """
    sanitize_llm_cache(LLM_CACHE_PATH)

    master = load_master()
    symbols = sorted(master['symbol'].unique().tolist())
    known_symbols = set(symbols)
    sentiment_map = load_sentiment_map(NEWS_DIR, symbols_list=symbols)
    selected = select_candidates_by_rule(master, sentiment_map, top_k=TOP_K)
    logger.info("Selected %d candidates by rule", len(selected))
    if not selected:
        logger.info("No candidates found.")
        return []

    # Give the model a price anchor; without it entry/TP/SL levels would be invented.
    closes = _latest_close_by_symbol(master, {c['symbol'] for c in selected})
    prompt_candidates = [{**c, "last_close": closes.get(c['symbol'])} for c in selected]
    prompt = _build_signal_prompt(prompt_candidates, top_n, MIN_CONFIDENCE_TO_SEND)

    # Key on model + exact prompt so a change of model, top_n, threshold or data misses the cache.
    key_json = json.dumps({"model": GROQ_MODEL, "prompt": prompt}, sort_keys=True)
    key_hash = hashlib.sha256(key_json.encode()).hexdigest()
    cache = _load_llm_cache()
    if not isinstance(cache, dict):
        cache = {}
    if key_hash in cache:
        logger.info("LLM cache hit.")
        refined = cache[key_hash]
    else:
        content = call_groq(prompt, model=GROQ_MODEL)
        parsed = extract_json_array_from_model(content) if content else None
        if parsed is None:
            logger.info("LLM produced no parseable JSON; using deterministic fallback (not cached).")
            refined = deterministic_fallback(selected, top_n=1)
        else:
            cache[key_hash] = parsed
            _save_llm_cache(cache)
            refined = parsed

    if not isinstance(refined, list):
        logger.warning("Refined result not a list; applying fallback.")
        refined = deterministic_fallback(selected, top_n=1)

    sent = []
    for sig in refined[:top_n]:
        if not isinstance(sig, dict):
            continue
        sym = sig.get('symbol')
        if isinstance(sym, str):
            sym = sym.strip().upper()  # master symbols are upper-cased by load_master
        action = str(sig.get('action') or '').strip().upper()
        raw_conf = sig.get('confidence', 0)
        conf = _parse_confidence(raw_conf)
        reason = sig.get('reason', '')
        if not isinstance(sym, str) or sym not in known_symbols:
            logger.warning("Unknown symbol suggested: %s", sym)
            continue
        if conf is None:
            logger.warning("Unparseable confidence %r for %s", raw_conf, sym)
            continue
        if conf < MIN_CONFIDENCE_TO_SEND:
            logger.info("Skipping %s due to low confidence %d", sym, conf)
            continue
        if action not in ("BUY", "SELL", "HOLD"):
            logger.warning("Invalid action %s for %s", action, sym)
            continue
        msg = _format_signal_message(sym, action, conf, reason, sig)
        ok = send_telegram_message(msg, dry_run=dry_run)
        sent.append({"symbol": sym, "action": action, "confidence": conf, "reason": reason, "sent": ok})

    _append_decision_log(selected, refined, sent)
    logger.info("Run complete. Sent signals: %d", len(sent))
    return sent

# ---------------- Quick run (dry-run by default) ----------------
if __name__ == "__main__":
    # Echo the safety-relevant settings first so a live (non-dry) run is obvious in the output.
    print("DRY_RUN =", DRY_RUN, "MIN_CONFIDENCE_TO_SEND =", MIN_CONFIDENCE_TO_SEND)
    try:
        results = run_once(dry_run=DRY_RUN, top_n=3)
        print("Done. Signals:", results)
    except Exception as err:
        # Record the traceback in the log, then re-raise so the cell is marked as failed.
        logger.exception("Error during run: %s", err)
        raise

# Telegram Setup

In [None]:
# === Send top cached LLM signals to Telegram (or dry-run) ===
import os, json, requests
from pathlib import Path
from urllib.parse import quote_plus

# CONFIG - credentials are read ONLY from environment variables (set via %env in Colab).
NEWS_DIR = Path("/content/news_sentiment")
LLM_CACHE_PATH = NEWS_DIR / "llm_cache_groq.json"

# Put your keys in the Colab environment with:
#   %env TELEGRAM_BOT_TOKEN=<your_bot_token>
#   %env TELEGRAM_CHAT_ID=@YourChannelOrId
# SECURITY: never hard-code a bot token here. A token that has ever been committed
# to a notebook must be treated as leaked: revoke it via @BotFather and use a new one.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")

# Tunables
DRY_RUN = False        # True = only print what WOULD be sent. Set False to actually send.
TOP_N = 3              # how many top signals to send (by confidence)
MIN_CONF = 25          # minimum confidence to actually send (or show)
VALID_ACTIONS = ("BUY", "SELL", "HOLD")
PRICE_FIELDS = (("Entry", "entry"), ("TP", "take_profit"), ("SL", "stop_loss"))

# NOTE(review): every cached LLM answer is considered, including answers from earlier
# runs, so an old high-confidence signal can be re-sent as the current "Top Signal".
# Clear or prune the cache if only fresh signals should go out.


def _redact(text):
    """Mask the bot token in ``text`` (requests errors embed the Bot API URL, which contains it)."""
    text = str(text)
    return text.replace(TELEGRAM_BOT_TOKEN, "***") if TELEGRAM_BOT_TOKEN else text


def _iter_cached_lists(cache):
    """Yield ``(cache_key, entries)`` for every cache value that is, or JSON-decodes to, a list."""
    for key, value in cache.items():
        if isinstance(value, str):
            try:
                value = json.loads(value)
            except ValueError:
                continue
        if isinstance(value, list):
            yield key, value


def _normalize_signal(entry, source_key):
    """Map one cached entry to a signal dict, or return None if it is unusable.

    The symbol is taken from symbol/pair/ticker and the action from action/signal.
    The action is upper-cased and must be BUY/SELL/HOLD. An unparseable confidence
    becomes 0. Entry/TP/SL fields are kept when present.
    """
    if not isinstance(entry, dict):
        return None
    sym = entry.get("symbol") or entry.get("pair") or entry.get("ticker")
    action = str(entry.get("action") or entry.get("signal") or "").strip().upper()
    raw_conf = entry.get("confidence")
    if isinstance(raw_conf, str):
        raw_conf = raw_conf.strip().rstrip("%")
    try:
        conf = int(round(float(raw_conf)))
    except (TypeError, ValueError, OverflowError):
        conf = 0
    if not isinstance(sym, str) or not sym or action not in VALID_ACTIONS:
        return None
    sig = {"symbol": sym, "action": action, "confidence": conf,
           "reason": entry.get("reason", ""), "source_key": source_key}
    for _label, field in PRICE_FIELDS:
        if entry.get(field) not in (None, ""):
            sig[field] = entry[field]
    return sig


def _format_message(sig):
    """Render the Telegram text for one signal (Entry/TP/SL lines only when present)."""
    lines = [
        "🚀 Top Signal:",
        f"Pair: {sig['symbol']}",
        f"Signal: {sig['action']}",
        f"Confidence: {sig['confidence']}%",
    ]
    for label, field in PRICE_FIELDS:
        if field in sig:
            lines.append(f"{label}: {sig[field]}")
    lines.append(f"Reason: {sig['reason']}")
    return "\n".join(lines)


def send_via_telegram(text):
    """Send ``text`` to TELEGRAM_CHAT_ID, or only print it when DRY_RUN; return True on success."""
    if DRY_RUN:
        print("(dry) would send:", text)
        return True
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("Telegram credentials missing. Set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID as environment variables.")
        return False
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    try:
        r = requests.post(url, json={"chat_id": TELEGRAM_CHAT_ID, "text": text}, timeout=15)
        r.raise_for_status()
    except requests.RequestException as e:
        print("Telegram send failed:", _redact(e))
        return False
    return True


# Safety check - cache exists?
if not LLM_CACHE_PATH.exists():
    raise FileNotFoundError(f"LLM cache not found at: {LLM_CACHE_PATH}")

# Load cache
raw = LLM_CACHE_PATH.read_text()
try:
    cache = json.loads(raw)
except ValueError as e:
    raise RuntimeError(f"Failed to parse JSON cache: {e}") from e
if not isinstance(cache, dict):
    raise RuntimeError(f"LLM cache must be a JSON object, got {type(cache).__name__}")

# Flatten all cached lists into a single list of normalized signals
signals = []
for key, entries in _iter_cached_lists(cache):
    for entry in entries:
        sig = _normalize_signal(entry, key)
        if sig is not None:
            signals.append(sig)

if not signals:
    print("No valid signals found in cache.")
else:
    # dedupe by symbol (keep highest confidence per symbol; the first one seen wins ties)
    by_symbol = {}
    for s in signals:
        best = by_symbol.get(s['symbol'])
        if best is None or s['confidence'] > best['confidence']:
            by_symbol[s['symbol']] = s
    # sort by confidence desc (stable, so ties keep cache order)
    unique_signals = sorted(by_symbol.values(), key=lambda x: x['confidence'], reverse=True)
    filtered = [s for s in unique_signals if s['confidence'] >= MIN_CONF]
    top = filtered[:TOP_N]

    if not top:
        print(f"No signals meet MIN_CONF = {MIN_CONF}. Available candidates (top 10 shown):")
        for s in unique_signals[:10]:
            print(s)
    else:
        print(f"Top {len(top)} signals to send (DRY_RUN={DRY_RUN}):")
        for s in top:
            print(s)

        results = []
        for s in top:
            ok = send_via_telegram(_format_message(s))
            results.append({**s, "sent": ok})

        print("Send results:", results)