In [61]:
# Pump‑stats API workflow  ────────────────────────────────────────────────
# 1. очистка таблицы `pump_event_stats`
# 2. получение последних свечей от API
# 3. фиксация новых памп‑ивентов в БД
# 4. просмотр записей и сводной статистики
#
# Все функции определены прямо здесь: ничего импортировать не нужно
import time, requests, psycopg2, logging
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple, Dict, List, Callable


In [105]:

# ————————————————————————————————————————————————
# 1) Логирование
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)
logger = logging.getLogger(__name__)

# ————————————————————————————————————————————————
# 2) Параметры подключения к Postgres
DB: Dict[str, str | int] = dict(
    dbname='mexc_statistics_case',
    user='postgres',
    password='1374fjsney831',
    host='localhost',
    port=5432,
)
SYMBOL: str   = 'ALPACA_USDT'
PUMP_THR: float = 1.001        # +7 % к цене 3 ч назад
CHECK_INTSEC   = 10           # опрос каждую минуту


In [106]:

# ———————————————————————————————————————————————— DB helpers
def _pg():
    conn = psycopg2.connect(**DB)
    conn.autocommit = True
    return conn

def init_table() -> None:
    ddl = """
    CREATE TABLE IF NOT EXISTS pump_event_stats (
        id            SERIAL PRIMARY KEY,
        coin_symbol   TEXT NOT NULL REFERENCES coins(coin) ON DELETE CASCADE,
        pump_start_ts BIGINT NOT NULL,
        pump_end_ts   BIGINT,
        duration_sec  INTEGER,
        diff_percent  NUMERIC(8,3),
        bucket        VARCHAR(20),
        is_profit     BOOLEAN,
        UNIQUE (coin_symbol, pump_start_ts)
    );"""
    with _pg() as conn, conn.cursor() as cur:
        cur.execute(ddl)

def clear_table(symbol: str) -> None:
    with _pg() as conn, conn.cursor() as cur:
        cur.execute('DELETE FROM pump_event_stats WHERE coin_symbol=%s', (symbol,))

def upsert_stat(symbol: str, ts_start: int, ts_end: Optional[int],
                diff_pct: Optional[float], bucket: str, is_profit: Optional[bool]) -> None:
    duration = ts_end - ts_start if ts_end else None
    sql = """
    INSERT INTO pump_event_stats
          (coin_symbol, pump_start_ts, pump_end_ts,
           duration_sec, diff_percent, bucket, is_profit)
    VALUES (%s,%s,%s,%s,%s,%s,%s)
    ON CONFLICT (coin_symbol, pump_start_ts)
        DO UPDATE SET
            pump_end_ts   = EXCLUDED.pump_end_ts,
            duration_sec  = EXCLUDED.duration_sec,
            diff_percent  = EXCLUDED.diff_percent,
            bucket        = EXCLUDED.bucket,
            is_profit     = EXCLUDED.is_profit;"""
    with _pg() as conn, conn.cursor() as cur:
        cur.execute(sql, (symbol, ts_start, ts_end, duration,
                          diff_pct, bucket, is_profit))

def stats_last72h(symbol: str) -> Dict[str, int | float]:
    t0 = int((datetime.now(timezone.utc) - timedelta(days=3)).timestamp())
    sql = """
    WITH s AS (
        SELECT * FROM pump_event_stats
        WHERE coin_symbol=%s AND pump_start_ts >= %s
    )
    SELECT
        SUM((bucket='fifteen_minutes')::int) AS fifteen,
        SUM((bucket='three_hours')::int)     AS three_h,
        SUM((bucket='one_day')::int)         AS one_d,
        SUM((bucket='more_one_day')::int)    AS more_d,
        SUM((bucket='in_deal')::int)         AS in_deal,
        COUNT(*)                             AS total,
        SUM((is_profit IS TRUE)::int)        AS wins,
        SUM((is_profit IS FALSE)::int)       AS losses,
        ROUND(COALESCE(SUM(diff_percent)
              FILTER (WHERE is_profit),0),2) AS sum_profit_pct,
        ROUND(COALESCE(SUM(diff_percent)
              FILTER (WHERE is_profit IS FALSE),0),2) AS sum_loss_pct
    FROM s;"""
    with _pg() as conn, conn.cursor() as cur:
        cur.execute(sql, (symbol, t0))
        return dict(zip([d.name for d in cur.description], cur.fetchone()))


In [107]:

# ———————————————————————————————————————————————— API helpers
def last_candle(symbol: str,
                session: Callable[..., requests.Response] = requests.get,
                interval: str = 'Min1') -> Optional[Tuple[int, float]]:
    url = f'https://contract.mexc.com/api/v1/contract/kline/{symbol}'
    params = {'interval': interval}
    try:
        r = session(url, params=params, timeout=10)
        r.raise_for_status()
        j = r.json()
        if not j.get('success', True):
            logger.warning('MEXC error: %s', j.get('msg', j.get('code')))
            return None
        data = j.get('data', [])
        if not data:
            logger.warning('empty candles list')
            return None

        if isinstance(data, list):
            ts_sec  = int(data[-1][0]) // 1000
            price_o = float(data[-1][1])
        else:                                         # dict
            ts_sec  = int(data['time'][-1]) // 1000
            price_o = float(data['open'][-1])

        return ts_sec, price_o

    except Exception as exc:
        logger.warning('network/json error: %s', exc)
        return None



In [108]:
def history_last_hours(symbol: str, hours: int = 3,
                       session=requests.get) -> List[Tuple[int, float]]:
    """Вернёт [(ts_sec, open_price)] за N часов (Min1)."""
    url = f'https://contract.mexc.com/api/v1/contract/kline/{symbol}'
    params = {
        'interval': 'Min1',
        'start'   : int(time.time()*1000) - hours*3600*1000,
    }
    r = session(url, params=params, timeout=10)
    r.raise_for_status()
    data = r.json()['data']

    result: list[tuple[int, float]] = []

    if isinstance(data, list):                          # формат: [[ts_ms, open, ...], ...]
        for row in data:
            ts_sec = int(row[0]) // 1000
            price  = float(row[1])
            result.append((ts_sec, price))

    elif isinstance(data, dict):                        # формат: {'time': [...], 'open': [...] }
        for ts_ms, op in zip(data['time'], data['open']):
            ts_sec = int(ts_ms) // 1000
            price  = float(op)
            result.append((ts_sec, price))

    else:
        logger.warning('unexpected kline format: %s', type(data))

    return result


In [109]:

# ———————————————————————————————————————————————— PumpWatcher
class PumpWatcher:
    """Detect pump starts/ends and log to DB (stateless external)."""

    def __init__(self, symbol: str,
                 pump_thr: float = PUMP_THR,
                 api_fn: Callable[[str], Optional[Tuple[int, float]]] = last_candle):
        self.symbol = symbol
        self.pump_thr = pump_thr
        self.api_fn = api_fn
        # FIX №2: сразу загружаем 3‑часовую историю
        self.history: List[Tuple[int, float]] = history_last_hours(symbol)
        self.open_pumps: Dict[int, float] = {}

    # ───────── helpers ─────────
    @staticmethod
    def _bucket(age: int) -> str:
        if age < 900:
            return 'fifteen_minutes'
        if age < 3 * 3600:
            return 'three_hours'
        if age < 24 * 3600:
            return 'one_day'
        return 'more_one_day'

    # ───────── main step ─────────
    def step(self) -> None:
        candle = self.api_fn(self.symbol)
        if candle is None:
            return
        ts, price = candle
        self.history.append((ts, price))

        # 1) open pump?
        ts_3h = ts - 3 * 3600
        base_price = next((p for (t, p) in reversed(self.history) if t <= ts_3h), None)
        if base_price and price >= base_price * self.pump_thr:
            if ts not in self.open_pumps:
                self.open_pumps[ts] = price
                logger.info('🚀 START %s price=%.4f', datetime.utcfromtimestamp(ts), price)
                upsert_stat(self.symbol, ts, None, None, 'in_deal', None)

        # 2) close pumps?
        to_close = []
        for ts_start, p_start in self.open_pumps.items():
            age = ts - ts_start
            if price <= p_start * 0.93:  # profit
                bucket = self._bucket(age)
                diff = round((price - p_start) / p_start * 100, 3)
                upsert_stat(self.symbol, ts_start, ts, diff, bucket, True)
                logger.info('✅ PROFIT close %dm diff=%s%%', age // 60, diff)
                to_close.append(ts_start)
            elif age > 24 * 3600:        # loss (time)
                diff = round((price - p_start) / p_start * 100, 3)
                upsert_stat(self.symbol, ts_start, ts, diff, 'more_one_day', False)
                logger.info('❌ LOSS close >24h diff=%s%%', diff)
                to_close.append(ts_start)
        for t in to_close:
            del self.open_pumps[t]

print('Функции и класс PumpWatcher готовы!')


Функции и класс PumpWatcher готовы!


In [110]:

# ───────────────────────────────────────────────────────────────────────────
# Шаг 1 — подготовить базу
init_table()
clear_table(SYMBOL)
print(f"Таблица 'pump_event_stats' очищена для {SYMBOL} ✨")


Таблица 'pump_event_stats' очищена для ALPACA_USDT ✨


In [111]:

# ───────────────────────────────────────────────────────────────────────────
# Шаг 2 — один ручной step
watcher = PumpWatcher(SYMBOL)
watcher.step()


In [112]:

# ───────────────────────────────────────────────────────────────────────────
# Шаг 3 — последние записи в БД
import pandas as pd
df = pd.read_sql_query(
    '''
    SELECT id, to_timestamp(pump_start_ts) AS start_utc,
           bucket, diff_percent, is_profit
    FROM pump_event_stats
    WHERE coin_symbol=%s
    ORDER BY id DESC
    LIMIT 10
    ''', _pg(), params=(SYMBOL,)
)
display(df)


  df = pd.read_sql_query(


Unnamed: 0,id,start_utc,bucket,diff_percent,is_profit


In [113]:

# ───────────────────────────────────────────────────────────────────────────
# Шаг 4 — статистика за 72 ч
for k, v in stats_last72h(SYMBOL).items():
    print(f'{k:15} → {v}')


fifteen         → None
three_h         → None
one_d           → None
more_d          → None
in_deal         → None
total           → 0
wins            → None
losses          → None
sum_profit_pct  → 0.00
sum_loss_pct    → 0.00


In [114]:

# ───────────────────────────────────────────────────────────────────────────
# Шаг 5 — непрерывный мониторинг (остановить ⏹️)
import itertools
try:
    for _ in itertools.count():
        watcher.step()
        time.sleep(CHECK_INTSEC)
except KeyboardInterrupt:
    print('⏹️  Остановлено пользователем')


⏹️  Остановлено пользователем


In [115]:
import pandas as pd
pd.read_sql_query(
    "SELECT id, to_timestamp(pump_start_ts) AS start_utc, bucket "
    "FROM pump_event_stats WHERE coin_symbol=%s ORDER BY id DESC LIMIT 5",
    _pg(), params=(SYMBOL,)
)

  pd.read_sql_query(


Unnamed: 0,id,start_utc,bucket
