In [15]:
"""
Santiment API Client
Retrieve social volume, weighted sentiment, exchange flows, addresses, MVRV, and NPL data
"""

import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union
import logging
import pytz
from pydantic import BaseModel
from google.colab import userdata

# API key for Santiment, read through google.colab's `userdata` under the
# name 'SANTIMENT_API_KEY'.
API_KEY = userdata.get('SANTIMENT_API_KEY')

# Configure logging: INFO level via basicConfig, plus the module-level logger
# that the client below writes its status and error messages to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SantimentData(BaseModel):
    """One timestamped observation of a single metric for a single asset."""
    # Asset slug the observation belongs to (e.g. 'bitcoin').
    slug: str
    # Metric name as passed in by the caller (the client-side key).
    metric: str
    # Timestamp of the observation.
    # NOTE(review): the field name is the same as the imported `datetime`
    # class — confirm the shadowing inside this class body is intentional.
    datetime: datetime
    # Observed value; the type also admits None.
    value: Union[float, int, None]
    # Extra information about the observation; the client stores the keys
    # 'santiment_metric' and 'interval' here. The type also admits None.
    # NOTE(review): mutable default — presumably safe because pydantic copies
    # field defaults per instance; verify for the pydantic version in use.
    metadata: Optional[Dict] = {}

class SantimentClient:
    """Client for Santiment API"""

    def __init__(self, api_key: str):
        """
        Prepare an authenticated HTTP session and the metric lookup table.

        Args:
            api_key: Santiment API key; sent as ``Apikey <key>`` in the
                Authorization header of the session.
        """
        request_headers = {
            'Authorization': f'Apikey {api_key}',
            'Content-Type': 'application/json',
        }

        self.api_key = api_key
        self.base_url = "https://api.santiment.net/graphql"
        self.session = requests.Session()
        self.session.headers.update(request_headers)

        # Client-side metric key -> metric identifier sent to the API.
        # get_metric_data sends any key that is missing from this table
        # to the API unchanged.
        self.metrics = dict(
            daily_active_addresses='daily_active_addresses',
            circulation='circulation',
            mvrv_usd='mvrv_usd_30d',
            whale_transaction_count='whale_transaction_count_1m_usd_to_inf',
            transaction_volume_in_profit_or_loss='transaction_volume_in_profit',
            mean_dollar_invested_age='mean_dollar_invested_age',
            npl='network_profit_loss',
            supply_on_exchanges='supply_on_exchanges',
            exchange_flow_balance='exchange_balance',
            social_volume='social_volume_total',
            social_dominance='social_dominance_total',
            weighted_sentiment='sentiment_balance_total',
            market_cap_usd='marketcap_usd',
        )

    def query_graphql(self, query: str) -> Dict:
        """
        POST a GraphQL document to the API and return the decoded JSON body.

        Args:
            query: GraphQL query text, sent as the 'query' field of the body.

        Returns:
            The parsed JSON response.

        Raises:
            requests.exceptions.RequestException: when the request fails or
                the response has an error status; the failure is logged and
                then re-raised unchanged.
        """
        payload = {'query': query}
        try:
            reply = self.session.post(self.base_url, json=payload, timeout=30)
            reply.raise_for_status()
            body = reply.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"GraphQL query failed: {e}")
            raise
        return body

    def get_metric_data(
        self,
        slug: str,
        metric: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the time series of one metric for one asset.

        Args:
            slug: Asset slug (e.g., 'bitcoin', 'ethereum')
            metric: Key from self.metrics; a name that is not in the table is
                sent to the API unchanged
            from_date: Start of the range. Timezone-aware values are converted
                to UTC; naive values are taken to already be in UTC.
            to_date: End of the range, handled like from_date
            interval: Time interval ('1h', '1d', '7d')

        Returns:
            One SantimentData per returned point whose value is not null.
            An empty list when the API reports errors, returns no data, or
            the request fails (the failure is logged, not raised).
        """
        # Local import: the file's own imports only bring in datetime/timedelta.
        from datetime import timezone

        def _as_utc_string(moment: datetime) -> str:
            # The format appends a literal 'Z', so an aware value in another
            # zone must be shifted to UTC first or the timestamp would be wrong.
            if moment.tzinfo is not None and moment.utcoffset() is not None:
                moment = moment.astimezone(timezone.utc)
            return moment.strftime('%Y-%m-%dT%H:%M:%SZ')

        # Convert metric name to Santiment metric
        santiment_metric = self.metrics.get(metric, metric)

        from_str = _as_utc_string(from_date)
        to_str = _as_utc_string(to_date)

        query = f"""
        {{
            getMetric(metric: "{santiment_metric}") {{
                timeseriesData(
                    slug: "{slug}"
                    from: "{from_str}"
                    to: "{to_str}"
                    interval: "{interval}"
                ) {{
                    datetime
                    value
                }}
            }}
        }}
        """

        try:
            result = self.query_graphql(query)

            if 'errors' in result:
                logger.error(f"GraphQL errors: {result['errors']}")
                return []

            # Any level of the response may be JSON null; `or` substitutes an
            # empty container so the lookups below cannot fail on None.
            payload = result.get('data') or {}
            metric_node = payload.get('getMetric') or {}
            data_points = metric_node.get('timeseriesData') or []

            metadata = {'santiment_metric': santiment_metric, 'interval': interval}
            santiment_data = [
                SantimentData(
                    slug=slug,
                    metric=metric,
                    # fromisoformat on older Pythons rejects a trailing 'Z'
                    datetime=datetime.fromisoformat(point['datetime'].replace('Z', '+00:00')),
                    value=point['value'],
                    metadata=dict(metadata)
                )
                for point in data_points
                if point['value'] is not None
            ]

            logger.info(f"Retrieved {len(santiment_data)} data points for {slug} {metric}")
            return santiment_data

        except Exception as e:
            # Deliberate best-effort: callers get an empty list instead of an exception.
            logger.error(f"Error retrieving {metric} for {slug}: {e}")
            return []

    def get_social_volume(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the social volume series for one asset.

        Forwards to get_metric_data with the 'social_volume' key
        (mapped to 'social_volume_total' in self.metrics).
        """
        return self.get_metric_data(
            slug=slug,
            metric='social_volume',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_weighted_sentiment(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the series registered under the 'weighted_sentiment' key.

        Forwards to get_metric_data with that key.
        NOTE(review): self.metrics maps this key to 'sentiment_balance_total';
        confirm that is the series intended as "weighted sentiment".
        """
        return self.get_metric_data(
            slug=slug,
            metric='weighted_sentiment',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_exchange_flows(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> Dict[str, List[SantimentData]]:
        """
        Fetch exchange inflow and outflow series for one asset.

        Issues two get_metric_data calls (inflow first, then outflow) and
        returns them under the keys 'inflow' and 'outflow'. Neither metric
        name has an entry in self.metrics, so both are sent to the API as-is.
        NOTE(review): confirm the API accepts 'exchange_inflow' and
        'exchange_outflow' as metric names.
        """
        metric_by_direction = {
            'inflow': 'exchange_inflow',
            'outflow': 'exchange_outflow',
        }
        return {
            direction: self.get_metric_data(slug, metric_name, from_date, to_date, interval)
            for direction, metric_name in metric_by_direction.items()
        }

    def get_address_metrics(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> Dict[str, List[SantimentData]]:
        """
        Fetch the 'active_addresses' and 'new_addresses' series for one asset.

        Issues one get_metric_data call per metric (active first, then new)
        and returns the results keyed by metric name. Neither name has an
        entry in self.metrics, so both are sent to the API as-is.
        NOTE(review): confirm the API accepts these two metric names.
        """
        address_metrics = ('active_addresses', 'new_addresses')
        return {
            name: self.get_metric_data(slug, name, from_date, to_date, interval)
            for name in address_metrics
        }

    def get_mvrv(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch MVRV (Market Value to Realized Value) data for one asset.

        Forwards to get_metric_data with the 'mvrv_usd' key
        (mapped to 'mvrv_usd_30d' in self.metrics).
        """
        return self.get_metric_data(
            slug=slug,
            metric='mvrv_usd',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_npl(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch Network Profit/Loss data for one asset.

        Forwards to get_metric_data with the 'npl' key
        (mapped to 'network_profit_loss' in self.metrics).
        """
        return self.get_metric_data(
            slug=slug,
            metric='npl',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_daily_active_addresses(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the daily active addresses series for one asset.

        Forwards to get_metric_data with the 'daily_active_addresses' key.
        """
        return self.get_metric_data(
            slug=slug,
            metric='daily_active_addresses',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_circulation(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the circulation series for one asset.

        Forwards to get_metric_data with the 'circulation' key.
        """
        return self.get_metric_data(
            slug=slug,
            metric='circulation',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_funding_rate(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the funding rate series for one asset.

        Forwards to get_metric_data with 'funding_rate'. That name has no
        entry in self.metrics, so it is sent to the API as-is.
        NOTE(review): confirm the API accepts 'funding_rate' as a metric name.
        """
        return self.get_metric_data(
            slug=slug,
            metric='funding_rate',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_whale_transaction_count(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the whale transaction count series for one asset.

        Forwards to get_metric_data with the 'whale_transaction_count' key
        (mapped to 'whale_transaction_count_1m_usd_to_inf' in self.metrics).
        """
        return self.get_metric_data(
            slug=slug,
            metric='whale_transaction_count',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_supply_distribution(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the supply distribution series for one asset.

        Forwards to get_metric_data with 'supply_distribution'. That name has
        no entry in self.metrics, so it is sent to the API as-is.
        NOTE(review): confirm the API accepts 'supply_distribution' as a
        metric name.
        """
        return self.get_metric_data(
            slug=slug,
            metric='supply_distribution',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_transaction_volume_in_profit_or_loss(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the series registered under 'transaction_volume_in_profit_or_loss'.

        Forwards to get_metric_data with that key.
        NOTE(review): self.metrics maps this key to
        'transaction_volume_in_profit', which by its name looks like the
        in-profit side only — confirm this is the intended series.
        """
        return self.get_metric_data(
            slug=slug,
            metric='transaction_volume_in_profit_or_loss',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_mean_dollar_invested_age(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the mean dollar invested age series for one asset.

        Forwards to get_metric_data with the 'mean_dollar_invested_age' key.
        """
        return self.get_metric_data(
            slug=slug,
            metric='mean_dollar_invested_age',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_supply_on_exchanges(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the supply-on-exchanges series for one asset.

        Forwards to get_metric_data with the 'supply_on_exchanges' key.
        """
        return self.get_metric_data(
            slug=slug,
            metric='supply_on_exchanges',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_exchange_flow_balance(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the exchange flow balance series for one asset.

        Forwards to get_metric_data with the 'exchange_flow_balance' key
        (mapped to 'exchange_balance' in self.metrics).
        """
        return self.get_metric_data(
            slug=slug,
            metric='exchange_flow_balance',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_social_dominance(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the social dominance series for one asset.

        Forwards to get_metric_data with the 'social_dominance' key
        (mapped to 'social_dominance_total' in self.metrics).
        """
        return self.get_metric_data(
            slug=slug,
            metric='social_dominance',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_market_cap_usd(
        self,
        slug: str,
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> List[SantimentData]:
        """
        Fetch the USD market capitalisation series for one asset.

        Forwards to get_metric_data with the 'market_cap_usd' key
        (mapped to 'marketcap_usd' in self.metrics).
        """
        return self.get_metric_data(
            slug=slug,
            metric='market_cap_usd',
            from_date=from_date,
            to_date=to_date,
            interval=interval,
        )

    def get_all_metrics(
        self,
        slugs: List[str],
        from_date: datetime,
        to_date: datetime,
        interval: str = '1d'
    ) -> Dict[str, Dict[str, List[SantimentData]]]:
        """
        Get every metric in self.metrics for multiple assets using a single
        GraphQL query.

        Each (slug, metric) pair becomes one aliased ``getMetric`` field.
        Aliases are positional (``s<slug index>_m<metric index>``) rather than
        built from the slug, because GraphQL names may only contain letters,
        digits and underscores and may not start with a digit; slugs such as
        'bitcoin-cash' would otherwise make the whole document invalid.

        Args:
            slugs: List of asset slugs (e.g., ['bitcoin', 'ethereum'])
            from_date: Start of the range. Timezone-aware values are converted
                to UTC; naive values are taken to already be in UTC.
            to_date: End of the range, handled like from_date
            interval: Time interval ('1h', '1d', '7d')

        Returns:
            Dictionary with structure: {slug: {metric: [SantimentData]}}.
            Points whose value is null are skipped. When the API reports
            errors but still returns data, the errors are logged and the
            available series are returned (failed ones are empty lists).
            An empty dict is returned when `slugs` is empty, when the API
            returns errors without data, or when the request fails.
        """
        # Local import: the file's own imports only bring in datetime/timedelta.
        from datetime import timezone

        logger.info(f"Fetching all metrics for {len(slugs)} assets from {from_date} to {to_date}")

        if not slugs:
            # An empty selection set is not a valid GraphQL document.
            return {}

        def _as_utc_string(moment: datetime) -> str:
            # The format appends a literal 'Z', so an aware value in another
            # zone must be shifted to UTC first or the timestamp would be wrong.
            if moment.tzinfo is not None and moment.utcoffset() is not None:
                moment = moment.astimezone(timezone.utc)
            return moment.strftime('%Y-%m-%dT%H:%M:%SZ')

        from_str = _as_utc_string(from_date)
        to_str = _as_utc_string(to_date)

        # Build single GraphQL query for all slugs and all metrics
        aliases = {}
        query_parts = []
        for slug_idx, slug in enumerate(slugs):
            for metric_idx, (metric_key, santiment_metric) in enumerate(self.metrics.items()):
                alias = f"s{slug_idx}_m{metric_idx}"
                aliases[(slug, metric_key)] = alias
                query_parts.append(f"""
                {alias}: getMetric(metric: "{santiment_metric}") {{
                    timeseriesData(
                        slug: "{slug}"
                        from: "{from_str}"
                        to: "{to_str}"
                        interval: "{interval}"
                    ) {{
                        datetime
                        value
                    }}
                }}""")

        query = f"{{{''.join(query_parts)}}}"

        try:
            result = self.query_graphql(query)

            # `data` may be JSON null when the whole request failed.
            payload = result.get('data') or {}

            if 'errors' in result:
                logger.error(f"GraphQL errors: {result['errors']}")
                if not payload:
                    return {}
                # Otherwise keep going: one failing field should not discard
                # the series that did come back.

            # Parse results into structured format
            all_data = {}
            for slug in slugs:
                per_metric = {}
                for metric_key, santiment_metric in self.metrics.items():
                    # A failed field comes back as null; treat it as "no points".
                    node = payload.get(aliases[(slug, metric_key)]) or {}
                    data_points = node.get('timeseriesData') or []

                    per_metric[metric_key] = [
                        SantimentData(
                            slug=slug,
                            metric=metric_key,
                            datetime=datetime.fromisoformat(point['datetime'].replace('Z', '+00:00')),
                            value=point['value'],
                            metadata={'santiment_metric': santiment_metric, 'interval': interval}
                        )
                        for point in data_points
                        if point['value'] is not None
                    ]
                all_data[slug] = per_metric

            # Summary
            total_points = sum(
                len(series)
                for slug_data in all_data.values()
                for series in slug_data.values()
            )
            logger.info(f"Retrieved {total_points} total data points across {len(slugs)} assets and {len(self.metrics)} metrics")

            return all_data

        except Exception as e:
            # Deliberate best-effort: callers get an empty dict instead of an exception.
            logger.error(f"Error retrieving all metrics: {e}")
            return {}

    def to_dataframe(self, data: List[SantimentData]) -> pd.DataFrame:
        """
        Convert a list of SantimentData into a DataFrame indexed by datetime.

        Args:
            data: Observations to convert.

        Returns:
            An empty DataFrame (no columns) when `data` is empty. Otherwise a
            frame indexed by 'datetime' with the columns 'slug', 'metric',
            'value', 'santiment_metric' and 'interval'; the last two are read
            from each item's metadata and are missing values when the
            metadata is None or lacks the key.
        """
        if not data:
            return pd.DataFrame()

        rows = []
        for item in data:
            # metadata is Optional on the model, so None must not break .get()
            meta = item.metadata or {}
            rows.append({
                'slug': item.slug,
                'metric': item.metric,
                'datetime': item.datetime,
                'value': item.value,
                'santiment_metric': meta.get('santiment_metric'),
                'interval': meta.get('interval')
            })

        df = pd.DataFrame(rows)
        df['datetime'] = pd.to_datetime(df['datetime'])
        return df.set_index('datetime')

    def get_available_assets(self) -> List[Dict]:
        """
        List the projects known to the API.

        Returns:
            One dict per project with the keys 'slug', 'name' and 'ticker'
            (note: dicts, not bare slug strings). An empty list when the API
            returns no projects or the request fails; failures are logged,
            not raised.
        """
        query = """
        {
            allProjects {
                slug
                name
                ticker
            }
        }
        """

        try:
            result = self.query_graphql(query)

            if 'errors' in result:
                # Surface API-side problems instead of silently returning nothing.
                logger.error(f"GraphQL errors: {result['errors']}")

            # `data` / `allProjects` may be JSON null; fall back to empty containers.
            payload = result.get('data') or {}
            projects = payload.get('allProjects') or []

            asset_list = [
                {
                    'slug': project['slug'],
                    'name': project['name'],
                    'ticker': project['ticker']
                }
                for project in projects
            ]

            logger.info(f"Found {len(asset_list)} available assets")
            return asset_list

        except Exception as e:
            # Deliberate best-effort: callers get an empty list instead of an exception.
            logger.error(f"Error retrieving available assets: {e}")
            return []

    def get_metric_intervals(self, slug: str) -> pd.DataFrame:
        """
        Brute-force all metrics in self.metrics against a fixed list of
        intervals to see which combinations return data for a given asset.

        Each combination is one get_metric_data call over a 7-day window that
        ends 30 days before now (UTC); a combination counts as available when
        that call returns at least one data point.

        Args:
            slug: Asset slug to probe.

        Returns:
            DataFrame with metrics as rows (index 'metric'), intervals as
            columns, and '✅' / '❌' as cell values.
        """
        # Local import: the file's own imports only bring in datetime/timedelta.
        from datetime import timezone

        intervals_to_test = ['1m', '5m', '15m', '30m', '1h', '6h', '12h', '1d', '7d']

        # Use a short date range to stay within trial limits. An aware UTC
        # "now" is used because the request timestamps are labelled as UTC.
        to_date = datetime.now(timezone.utc) - timedelta(days=30)
        from_date = to_date - timedelta(days=7)

        results = []
        for metric in self.metrics:
            row = {'metric': metric}
            for interval in intervals_to_test:
                try:
                    data = self.get_metric_data(slug, metric, from_date, to_date, interval)
                    row[interval] = '✅' if data else '❌'
                except Exception:
                    # Any failure simply marks the combination as unavailable.
                    row[interval] = '❌'
            results.append(row)

        return pd.DataFrame(results).set_index('metric')


In [16]:
import os
import pandas as pd
from datetime import datetime, timedelta, timezone


# Initialize client
client = SantimentClient(API_KEY)

# Time range. "Now" is evaluated once so both bounds share the same reference
# instant, and as an aware UTC value (datetime.utcnow() is deprecated).
now_utc = datetime.now(timezone.utc)
to_date = now_utc - timedelta(days=60)     # ~2 months ago
from_date = now_utc - timedelta(days=300)  # ~10 months ago

slug = "bitcoin"

print("📡 Fetching Bitcoin metrics...")

# --- Fetch price (via market_cap_usd) ---
# NOTE: market capitalisation is used as a stand-in for price here; the
# resulting column is nevertheless named "price".
price_data = client.get_market_cap_usd(slug, from_date, to_date, interval="1d")
price_df = client.to_dataframe(price_data)
if price_df.empty:
    # to_dataframe returns a frame without columns when nothing came back;
    # fail with a clear message instead of a KeyError on "value".
    raise RuntimeError(f"No market_cap_usd data returned for {slug}; cannot build the price column")
price_df = price_df[["value"]].rename(columns={"value": "price"})

metric_dfs = {"price": price_df}

# --- Fetch all other metrics defined in the client ---
for metric in client.metrics:
    if metric == "market_cap_usd":
        continue  # Already used as price

    print(f" → Fetching {metric} ...")
    data = client.get_metric_data(slug, metric, from_date, to_date, interval="1d")
    df = client.to_dataframe(data)

    if df.empty:
        print(f"   ⚠️ No data returned for {metric}")
        continue

    metric_dfs[metric] = df[["value"]].rename(columns={"value": metric})

# Merge metrics; the inner join keeps only timestamps present in every series
print("\n🔄 Merging all time series...")
merged_df = pd.concat(metric_dfs.values(), axis=1, join="inner")

print(f"📊 Final dataset shape: {merged_df.shape}")

# Compute correlation table: each metric against the price column, highest first
corr_table = merged_df.corr()[["price"]].sort_values("price", ascending=False)

# Display correlation table (the bare expression is rendered by the notebook)
print("\n===== 📈 Correlation Table vs Bitcoin Price =====\n")
corr_table


  to_date = datetime.utcnow() - timedelta(days=60)     # 2 months ago
  from_date = datetime.utcnow() - timedelta(days=240)  # 8 months ago


📡 Fetching Bitcoin metrics...
 → Fetching daily_active_addresses ...
 → Fetching circulation ...
 → Fetching mvrv_usd ...
 → Fetching whale_transaction_count ...
 → Fetching transaction_volume_in_profit_or_loss ...
 → Fetching mean_dollar_invested_age ...
 → Fetching npl ...
 → Fetching supply_on_exchanges ...
 → Fetching exchange_flow_balance ...
 → Fetching social_volume ...
 → Fetching social_dominance ...
 → Fetching weighted_sentiment ...

🔄 Merging all time series...
📊 Final dataset shape: (181, 13)

===== 📈 Correlation Table vs Bitcoin Price =====



Unnamed: 0,price
price,1.0
circulation,0.806687
whale_transaction_count,0.383953
social_volume,0.145193
social_dominance,0.138264
transaction_volume_in_profit_or_loss,0.126933
npl,0.110158
exchange_flow_balance,0.108536
daily_active_addresses,0.071565
weighted_sentiment,-0.058867
