In [None]:
import os
import re
import time
import logging
import requests
import numpy as np
import pandas as pd
import spacy
import openai
from textblob import TextBlob
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pmaw import PushshiftAPI
from pandas.tseries.offsets import BDay

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class HistoricalCryptoAnalyzer:
    """Build a per-business-day sentiment feature table for a fixed list of tickers.

    For every business day in the requested window and every ticker in
    ``self.cryptos`` the analyzer fetches Reddit submissions (Pushshift via
    PMAW), tweets (Twitter full-archive search) and one Santiment metric,
    cleans the combined text, and derives an OpenAI embedding plus VADER and
    TextBlob sentiment scores.

    Credentials are read from the environment only: ``SANTIMENT_KEY``,
    ``TWITTER_BEARER`` and ``OPENAI_API_KEY``.
    """

    # Seconds to wait on a single HTTP call before giving up.
    REQUEST_TIMEOUT = 30
    # Maximum number of times one Twitter request is retried after HTTP 429.
    MAX_TWITTER_RETRIES = 3

    def __init__(self, start_date, end_date, use_gpt_cleaning=True):
        """Validate the window and set up API configuration and NLP tooling.

        Args:
            start_date: First day of the backtest (anything ``pd.to_datetime`` accepts).
            end_date: Last day of the backtest (anything ``pd.to_datetime`` accepts).
            use_gpt_cleaning: When True, text is cleaned through the OpenAI chat
                API; otherwise the local regex/spaCy cleaner is used.

        Raises:
            ValueError: If the date window is rejected by ``_validate_dates``.
        """
        # Date validation and business day calculation
        self.start_date, self.end_date = self._validate_dates(start_date, end_date)
        self.business_days = pd.date_range(start=self.start_date, end=self.end_date, freq=BDay())

        # A missing variable would otherwise only surface later as an opaque 401.
        for env_name in ("SANTIMENT_KEY", "TWITTER_BEARER"):
            if not os.getenv(env_name):
                logging.warning(f"{env_name} is not set; requests to that API will be unauthorized")

        # API configuration
        self.api_config = {
            "santiment": {
                "endpoint": "https://api.santiment.net/graphql",
                # NOTE(review): Santiment's docs describe an "Apikey <key>" scheme;
                # confirm that "Bearer" is accepted.
                "headers": {"Authorization": f"Bearer {os.getenv('SANTIMENT_KEY')}"},
                "query": """
                    query ($slug: String!, $from: DateTime!, $to: DateTime!) {
                        getMetric(metric: "sentiment_volume_consumed") {
                            timeseriesData(
                                slug: $slug
                                from: $from
                                to: $to
                                interval: "1d"
                            ) { datetime, value }
                        }
                    }
                """
            },
            "twitter": {
                "endpoint": "https://api.twitter.com/2/tweets/search/all",
                "headers": {"Authorization": f"Bearer {os.getenv('TWITTER_BEARER')}"},
                "params_template": {
                    "max_results": 500,
                    "tweet.fields": "created_at,public_metrics,lang",
                    "expansions": "author_id"
                }
            }
        }

        self.cryptos = ["BTC", "ETH", "BNB", "XRP", "ADA", "DOGE"]
        self.vader = SentimentIntensityAnalyzer()
        self.nlp = spacy.load("en_core_web_sm")
        self.rate_limiter = RateLimiter()
        self.use_gpt_cleaning = use_gpt_cleaning

        # The key comes from the environment only; never embed a secret as a
        # fallback default in source code.
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            logging.warning("OPENAI_API_KEY is not set; GPT cleaning and embeddings will fail")
        openai.api_key = api_key

    def _validate_dates(self, start, end):
        """Enforce Santiment free tier limitations.

        The window is clamped to at most 730 days ending at ``end``, and the
        (possibly adjusted) start must not be more than 730 days before today.

        Args:
            start: Requested start of the window.
            end: Requested end of the window.

        Returns:
            Tuple ``(start_date, end_date)`` of pandas timestamps.

        Raises:
            ValueError: If ``start`` is after ``end``, or the start date is
                older than the 2-year limit.
        """
        start_date = pd.to_datetime(start)
        end_date = pd.to_datetime(end)

        if start_date > end_date:
            raise ValueError(f"start_date {start_date} is after end_date {end_date}")

        max_window = pd.Timedelta(days=730)  # 2 years
        if end_date - start_date > max_window:
            new_start = end_date - max_window
            logging.warning(f"Adjusting start date to {new_start} for Santiment free tier")
            start_date = new_start

        # Compare at day granularity: with the current time of day included, a
        # start date clamped from "today at midnight" would always be rejected.
        min_allowed_date = pd.Timestamp.now().normalize() - max_window
        if start_date < min_allowed_date:
            raise ValueError("Free tier only allows data up to 2 years old")

        return start_date, end_date

    def _fetch_santiment(self, crypto, date):
        """Fetch the Santiment metric series for the business day ending at ``date``.

        Args:
            crypto: Ticker symbol; it is lower-cased and sent as the ``slug``.
            date: Timestamp marking the end of the one-business-day window.

        Returns:
            The ``timeseriesData`` list from the response, or ``[]`` on any
            HTTP, GraphQL or transport error.
        """
        self.rate_limiter.wait('santiment')
        config = self.api_config["santiment"]
        try:
            response = requests.post(
                config["endpoint"],
                headers=config["headers"],
                json={
                    "query": config["query"],
                    "variables": {
                        # NOTE(review): this sends the ticker (e.g. "btc") as the
                        # slug; confirm the API accepts tickers here.
                        "slug": crypto.lower(),
                        "from": (date - BDay(1)).isoformat(),
                        "to": date.isoformat()
                    }
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                # Always hand the caller a list, never an implicit None.
                logging.error(f"Santiment API error {response.status_code}: {response.text}")
                return []

            data = response.json()
            if 'errors' in data:
                logging.warning(f"Santiment restrictions: {data['errors'][0]['message']}")
                return []
            return data['data']['getMetric']['timeseriesData'] or []
        except Exception as e:
            logging.error(f"Santiment error: {e}")
            return []

    def _fetch_reddit(self, crypto, date):
        """Fetch Reddit submissions mentioning ``crypto`` through Pushshift (PMAW).

        Args:
            crypto: Search term passed as ``q``.
            date: Timestamp marking the end of the one-business-day window.

        Returns:
            One ``"<title> <selftext>"`` string per submission, or ``[]`` on error.
        """
        self.rate_limiter.wait('pushshift')
        try:
            api = PushshiftAPI(
                num_workers=10,
                rate_limit=60,
                limit_type='backoff',
                jitter='decorr'
            )

            start_ts = int((date - BDay(1)).timestamp())
            end_ts = int(date.timestamp())

            posts = api.search_submissions(
                q=crypto,
                after=start_ts,
                before=end_ts,
                subreddit="CryptoCurrency,Bitcoin,ethereum",
                filter=['title', 'selftext', 'created_utc'],
                mem_safe=True,
                safe_exit=True
            )
            # A missing or null field on one post must not discard the whole batch.
            return [f"{p.get('title') or ''} {p.get('selftext') or ''}" for p in posts]
        except Exception as e:
            logging.error(f"Pushshift error: {e}")
            return []

    def _fetch_twitter(self, crypto, date):
        """Fetch English, non-retweet tweets tagged ``#crypto`` for one business day.

        HTTP 429 responses are retried up to ``MAX_TWITTER_RETRIES`` times after
        the delay applied by ``handle_api_errors``; other error statuses are
        logged and yield an empty result.

        Args:
            crypto: Ticker used as the hashtag in the search query.
            date: Timestamp marking the end of the one-business-day window.

        Returns:
            List of tweet texts, or ``[]`` on error or when nothing matched.
        """
        config = self.api_config["twitter"]
        try:
            params = {
                **config["params_template"],
                "query": f"#{crypto} lang:en -is:retweet",
                "start_time": (date - BDay(1)).strftime('%Y-%m-%dT%H:%M:%SZ'),
                "end_time": date.strftime('%Y-%m-%dT%H:%M:%SZ')
            }

            for _ in range(self.MAX_TWITTER_RETRIES + 1):
                self.rate_limiter.wait('twitter')
                response = requests.get(
                    config["endpoint"],
                    headers=config["headers"],
                    params=params,
                    timeout=self.REQUEST_TIMEOUT,
                )

                # Branch on the status code directly so a successful response
                # is never mistaken for a retry signal.
                if response.status_code == 429:
                    handle_api_errors(response, 'twitter')  # logs and sleeps
                    continue
                if response.status_code >= 400:
                    handle_api_errors(response, 'twitter')  # logs the failure
                    return []
                return [tweet['text'] for tweet in response.json().get('data', [])]

            logging.error(f"Twitter still rate limited after {self.MAX_TWITTER_RETRIES} retries")
            return []
        except Exception as e:
            logging.error(f"Twitter error: {e}")
            return []

    def _process_day(self, date):
        """Process data for a single business day.

        Args:
            date: Timestamp of the business day to process.

        Returns:
            One dict per ticker that was processed successfully, holding the
            date, ticker, Santiment value (NaN when unavailable), embedding and
            sentiment scores. Tickers that raise are logged and skipped.
        """
        daily_data = []

        for crypto in self.cryptos:
            try:
                # Data collection
                reddit = self._fetch_reddit(crypto, date)
                twitter = self._fetch_twitter(crypto, date)
                santiment = self._fetch_santiment(crypto, date)

                # Text processing
                raw_text = " ".join(reddit + twitter)
                cleaned_text = self._clean_text(raw_text)

                # Feature generation
                embedding = self._generate_embedding(cleaned_text)
                sentiment = self._analyze_sentiment(cleaned_text)

                daily_data.append({
                    "date": date.date(),
                    "crypto": crypto,
                    "santiment_score": santiment[0]['value'] if santiment else np.nan,
                    "embedding": embedding,
                    **sentiment
                })

            except Exception as e:
                logging.error(f"Failed processing {crypto} on {date}: {e}")

        return daily_data

    def _clean_text(self, text):
        """Clean ``text`` with GPT or the local cleaner, per ``use_gpt_cleaning``.

        Blank input returns ``""`` without calling any cleaner. Sending an
        empty prompt to the chat model yields a filler reply that would then be
        embedded and scored as if it were market chatter.
        """
        if not text or not text.strip():
            return ""
        if self.use_gpt_cleaning:
            return self._gpt_clean(text)
        return self._basic_clean(text)

    def _gpt_clean(self, text):
        """Clean ``text`` through the OpenAI chat API, falling back to ``_basic_clean`` on failure.

        Returns:
            The model's reply, lower-cased and stripped.
        """
        try:
            response = openai.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{
                    "role": "user",
                    "content": f"Clean this crypto text, remove noise, preserve sentiment:\n{text}"
                }],
                temperature=0.1
            )
            return response.choices[0].message.content.lower().strip()
        except Exception as e:
            logging.warning(f"GPT cleaning failed: {e}")
            return self._basic_clean(text)

    def _basic_clean(self, text):
        """Strip URLs, @mentions/#hashtags and punctuation, then lemmatize with spaCy.

        Stop words, punctuation, number-like tokens and tokens of two
        characters or fewer are dropped.

        Returns:
            The surviving lemmas joined by single spaces.
        """
        text = re.sub(r'http\S+', '', text)
        text = re.sub(r'[@#]\w+', '', text)
        text = re.sub(r'[^\w\s]', '', text)
        doc = self.nlp(text.lower())
        tokens = [
            token.lemma_ for token in doc
            if not token.is_stop
            and not token.is_punct
            and not token.like_num
            and len(token.text) > 2
        ]
        return " ".join(tokens)

    def _generate_embedding(self, text):
        """Embed ``text`` with OpenAI ``text-embedding-3-large`` at 1536 dimensions.

        Returns:
            A float32 numpy array, or ``np.nan`` when ``text`` is blank or the
            API call fails.
        """
        if not text or not text.strip():
            return np.nan
        try:
            response = openai.embeddings.create(
                model="text-embedding-3-large",
                input=text,
                dimensions=1536
            )
            return np.array(response.data[0].embedding, dtype=np.float32)
        except Exception as e:
            logging.error(f"Embedding failed: {e}")
            return np.nan

    def _analyze_sentiment(self, text):
        """Score ``text`` with VADER and TextBlob.

        Returns:
            Dict with the VADER compound score (``vader``), the TextBlob
            polarity (``textblob``) and the whitespace token count (``word_count``).
        """
        return {
            "vader": self.vader.polarity_scores(text)['compound'],
            "textblob": TextBlob(text).sentiment.polarity,
            "word_count": len(text.split())
        }

    def execute_backtest(self):
        """Run ``_process_day`` over every business day and assemble the feature table.

        A parquet checkpoint of everything collected so far is written whenever
        the day's position in ``self.business_days`` is a multiple of five.

        Returns:
            DataFrame indexed by ``(date, crypto)``. When nothing was collected,
            an empty frame carrying that same index is returned.
        """
        # Collect plain dicts and build the frame once; re-concatenating the
        # whole frame every day is quadratic in the number of days.
        rows = []

        for idx, date in enumerate(self.business_days):
            if date.weekday() >= 5:  # Ensure business day
                continue
            rows.extend(self._process_day(date))

            # Save checkpoint every week
            if idx % 5 == 0 and rows:
                pd.DataFrame(rows).to_parquet(f"backtest_checkpoint_{date.date()}.parquet")

        if not rows:
            # Without any row there is no 'date' column to convert or index on.
            logging.warning("Backtest produced no rows; returning an empty frame")
            return pd.DataFrame(columns=['date', 'crypto']).set_index(['date', 'crypto'])

        # Final processing and return
        master_df = pd.DataFrame(rows)
        master_df['date'] = pd.to_datetime(master_df['date'])
        master_df.set_index(['date', 'crypto'], inplace=True)
        return master_df

class RateLimiter:
    """Per-service request throttle holding one token bucket per API."""

    def __init__(self):
        # (capacity, refill period in seconds) for each supported service
        limits = {
            'santiment': (60, 60),   # 60 requests/minute
            'pushshift': (60, 60),
            'twitter': (300, 900),   # 300 requests/15 minutes
        }
        self.buckets = {
            name: TokenBucket(capacity, period)
            for name, (capacity, period) in limits.items()
        }

    def wait(self, service):
        """Block, polling every 0.1 s, until one token for ``service`` is consumed.

        Raises:
            KeyError: If ``service`` has no configured bucket.
        """
        while True:
            if self.buckets[service].consume(1):
                return
            time.sleep(0.1)

class TokenBucket:
    """Token allowance that is restored to full capacity once the refill period has passed.

    ``capacity`` tokens are available initially. When a ``consume`` call finds
    that more than ``refill_period`` seconds have elapsed since the last
    refill, the count is reset to ``capacity`` before the request is evaluated.
    """

    def __init__(self, capacity, refill_period):
        self.capacity = capacity
        self.refill_period = refill_period
        self.tokens = capacity
        self.last_refill = time.time()

    def consume(self, tokens=1):
        """Try to take ``tokens`` from the bucket.

        Returns:
            True when enough tokens were available (and were deducted),
            False otherwise; the count is left untouched on failure.
        """
        current = time.time()
        if current - self.last_refill > self.refill_period:
            # Period is over: restore the full allowance and restart the clock.
            self.tokens, self.last_refill = self.capacity, current

        granted = self.tokens >= tokens
        if granted:
            self.tokens -= tokens
        return granted

def handle_api_errors(response, service):
    """Inspect an HTTP response and tell the caller whether to retry the request.

    On HTTP 429 the function logs, sleeps for the number of seconds given by
    the ``Retry-After`` header (60 when the header is absent or not an
    integer), and asks for a retry. Any other status of 400 or above is
    logged as an error.

    Args:
        response: Object exposing ``status_code``, ``headers`` and ``text``.
        service: Service name used in log messages.

    Returns:
        True only when the request was rate limited and should be retried.
        False for successful responses and for non-retryable errors.
    """
    if response.status_code == 429:
        try:
            # Retry-After may also be an HTTP-date; fall back to the default
            # then. Clamp at zero because time.sleep rejects negative delays.
            retry_after = max(0, int(response.headers.get('Retry-After', 60)))
        except (TypeError, ValueError):
            retry_after = 60
        logging.warning(f"Rate limited on {service}. Retrying in {retry_after}s")
        time.sleep(retry_after)
        return True
    if response.status_code >= 400:
        logging.error(f"{service} API error {response.status_code}: {response.text}")
        return False
    # Success: nothing to retry. Returning True here made callers that treat
    # the result as "retry" re-issue every successful request.
    return False

# Usage Example
if __name__ == "__main__":
    # Run a one-year backtest with GPT-based cleaning and persist the result.
    analyzer = HistoricalCryptoAnalyzer("2024-02-03", "2025-02-03", use_gpt_cleaning=True)

    features_df = analyzer.execute_backtest()
    features_df.to_parquet("crypto_sentiment_backtest.parquet")

2025-02-04 23:57:43,601 - INFO - Response cache key: a97ef3c62acdd2f30ea811f4a759655b
2025-02-04 23:57:43,603 - INFO - No previous requests to load
2025-02-04 23:57:45,708 - INFO - 0 result(s) available in Pushshift
2025-02-04 23:57:46,312 - ERROR - twitter API error 401: {
  "title": "Unauthorized",
  "type": "about:blank",
  "status": 401,
  "detail": "Unauthorized"
}
2025-02-04 23:57:47,997 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-02-04 23:57:48,963 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-02-04 23:57:49,050 - INFO - Response cache key: 0218a8e39c6310784b9947fc023a9ca1
2025-02-04 23:57:49,052 - INFO - No previous requests to load
2025-02-04 23:57:50,837 - INFO - 0 result(s) available in Pushshift
2025-02-04 23:57:51,450 - ERROR - twitter API error 401: {
  "title": "Unauthorized",
  "type": "about:blank",
  "status": 401,
  "detail": "Unauthorized"
}
2025-02-04 23:57:52,669 - INFO - H