## Работа с API

In [86]:
!pip install confluent-kafka
!pip install requests
!pip install numpy
!pip install pandas

Collecting pandas
  Downloading pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2024.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m308.6 kB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0m
[?25hDownloading pytz-2024.1-py2.py3-none-any.whl (505 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m505.5/505.5 kB[0m [31m180.0 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading tzdata-2024.1-py2.py3-none-any.whl (345 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m345.4/345.4 kB[0m [31m246.6 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: p

In [91]:
import requests
import json
import time
import socket
import numpy as np
import pandas as pd
import os

from confluent_kafka import Producer, Consumer

In [5]:
def get_exchanges() -> dict:
    coins_url = (
        'https://api.coingecko.com/api/v3/exchanges'
    )
    content = requests.get(coins_url)
    return json.loads(content.text)

def get_exchange_volume_chart(id: str, days: int) -> dict:
    coins_url = (
        f'https://api.coingecko.com/api/v3/exchanges/{id}/volume_chart?days={days}'
    )
    content = requests.get(coins_url)
    return json.loads(content.text)

In [6]:
all_exchanges = get_exchanges()

In [7]:
all_exchanges[0]

{'id': 'binance',
 'name': 'Binance',
 'year_established': 2017,
 'country': 'Cayman Islands',
 'description': '',
 'url': 'https://www.binance.com/',
 'image': 'https://assets.coingecko.com/markets/images/52/small/binance.jpg?1706864274',
 'has_trading_incentive': False,
 'trust_score': 10,
 'trust_score_rank': 1,
 'trade_volume_24h_btc': 418057.62553189904,
 'trade_volume_24h_btc_normalized': 226153.870854533}

In [8]:
exchange_volume_chart_binance_last_day = get_exchange_volume_chart(
    id=all_exchanges[0]['id'],
    days=1
)

In [9]:
exchange_volume_chart_binance_last_day

[[1713347400000.0, '365791.1777479017548486'],
 [1713348000000.0, '365471.9121705432740441'],
 [1713348600000.0, '365628.16084551856028'],
 [1713349200000.0, '366669.4935562056702937'],
 [1713349800000.0, '365661.6695329617283305'],
 [1713350400000.0, '358543.7867133761490612'],
 [1713351000000.0, '355192.9208073091993576'],
 [1713351600000.0, '352122.1787715368356311'],
 [1713352200000.0, '350013.8866300359482958'],
 [1713352800000.0, '346827.057331568580111'],
 [1713353400000.0, '345378.4748691548617859'],
 [1713354000000.0, '345039.9956128462333246'],
 [1713354600000.0, '344066.4769697557836026'],
 [1713355200000.0, '344898.6909261810859386'],
 [1713355800000.0, '346113.7722950568755148'],
 [1713356400000.0, '347084.2139567983716556'],
 [1713357000000.0, '348809.2412024612614132'],
 [1713357600000.0, '350889.3622555192670582'],
 [1713358200000.0, '353166.3062269148456086'],
 [1713358800000.0, '354350.5863314428587543'],
 [1713359400000.0, '354430.9726963398504743'],
 [1713360000000.

In [219]:
def iter_all_exc_volume_charts(delay=10.1, auto_retry=True, retry_delay=31, limit=None):
  for exc in all_exchanges:

    while True:
      volume_one_day_data = get_exchange_volume_chart(
        id=exc['id'],
        days=1
      )

      if auto_retry:
        try:
          volume_one_day_data['status']['error_code']
          print(f'Error! Retrying after {retry_delay} s.')
          time.sleep(retry_delay)
          print('Retrying...')
        except TypeError:
          break
      else:
        break

    time.sleep(delay)
    
    message = {
        exc['name']: volume_one_day_data,
    }

    yield str.encode(json.dumps(message))

    if limit is None:
      pass
    elif limit == 1:
      return
    else:
      limit -= 1

## Работа с Kafka

### Отправка сообщений

In [36]:
topic = 'coingecko'

producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': socket.gethostname()
}

producer = Producer(producer_config)

In [38]:
iter_limit = 10

for message_bytes in iter_all_exc_volume_charts(limit=iter_limit):
    producer.produce(
        topic=topic,
        value=message_bytes,
        partition=0,
    )
    print(f'MSG: {message_bytes}')
    
producer.flush()

MSG: b'{"Binance": [[1713349800000.0, "365661.6695329617283305"], [1713350400000.0, "358543.7867133761490612"], [1713351000000.0, "355192.9208073091993576"], [1713351600000.0, "352122.1787715368356311"], [1713352200000.0, "350013.8866300359482958"], [1713352800000.0, "346827.057331568580111"], [1713353400000.0, "345378.4748691548617859"], [1713354000000.0, "345039.9956128462333246"], [1713354600000.0, "344066.4769697557836026"], [1713355200000.0, "344898.6909261810859386"], [1713355800000.0, "346113.7722950568755148"], [1713356400000.0, "347084.2139567983716556"], [1713357000000.0, "348809.2412024612614132"], [1713357600000.0, "350889.3622555192670582"], [1713358200000.0, "353166.3062269148456086"], [1713358800000.0, "354350.5863314428587543"], [1713359400000.0, "354430.9726963398504743"], [1713360000000.0, "354572.7077218807996656"], [1713360600000.0, "354361.3786142015211013"], [1713361200000.0, "353028.7777693933851107"], [1713361800000.0, "349218.4332708278785144"], [1713362400000.

0

### Получение сообщений (проверка)

In [39]:
topic = 'coingecko'

consumer_config = {
    'bootstrap.servers': 'localhost:9092', 
    'group.id': 'coingecko_consumer', 
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(consumer_config)

consumer.subscribe([topic])

In [40]:
collected_data = []

while True:
    msg = consumer.poll(1.0)
    
    if msg is None:
        continue
    if msg.error():
        print(f'Consumer error: {msg.error()}')
        continue
    
    data = json.loads(msg.value().decode('utf-8'))
    collected_data.append(data)
    print(f"Received data: {data}")

consumer.close()

Received data: {'Binance': [[1713349800000.0, '365661.6695329617283305'], [1713350400000.0, '358543.7867133761490612'], [1713351000000.0, '355192.9208073091993576'], [1713351600000.0, '352122.1787715368356311'], [1713352200000.0, '350013.8866300359482958'], [1713352800000.0, '346827.057331568580111'], [1713353400000.0, '345378.4748691548617859'], [1713354000000.0, '345039.9956128462333246'], [1713354600000.0, '344066.4769697557836026'], [1713355200000.0, '344898.6909261810859386'], [1713355800000.0, '346113.7722950568755148'], [1713356400000.0, '347084.2139567983716556'], [1713357000000.0, '348809.2412024612614132'], [1713357600000.0, '350889.3622555192670582'], [1713358200000.0, '353166.3062269148456086'], [1713358800000.0, '354350.5863314428587543'], [1713359400000.0, '354430.9726963398504743'], [1713360000000.0, '354572.7077218807996656'], [1713360600000.0, '354361.3786142015211013'], [1713361200000.0, '353028.7777693933851107'], [1713361800000.0, '349218.4332708278785144'], [171336

KeyboardInterrupt: 

In [41]:
collected_data

[{'Binance': [[1713349800000.0, '365661.6695329617283305'],
   [1713350400000.0, '358543.7867133761490612'],
   [1713351000000.0, '355192.9208073091993576'],
   [1713351600000.0, '352122.1787715368356311'],
   [1713352200000.0, '350013.8866300359482958'],
   [1713352800000.0, '346827.057331568580111'],
   [1713353400000.0, '345378.4748691548617859'],
   [1713354000000.0, '345039.9956128462333246'],
   [1713354600000.0, '344066.4769697557836026'],
   [1713355200000.0, '344898.6909261810859386'],
   [1713355800000.0, '346113.7722950568755148'],
   [1713356400000.0, '347084.2139567983716556'],
   [1713357000000.0, '348809.2412024612614132'],
   [1713357600000.0, '350889.3622555192670582'],
   [1713358200000.0, '353166.3062269148456086'],
   [1713358800000.0, '354350.5863314428587543'],
   [1713359400000.0, '354430.9726963398504743'],
   [1713360000000.0, '354572.7077218807996656'],
   [1713360600000.0, '354361.3786142015211013'],
   [1713361200000.0, '353028.7777693933851107'],
   [171336

%4|1713436506.939|MAXPOLL|rdkafka#consumer-7| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 214ms (adjust max.poll.interval.ms for long-running message processing): leaving group


### Сохранение данных

In [213]:
def process_and_save(exc_name, exc_volume_chart, file_name):

    n_batches = 6
    batch_size = len(exc_volume_chart)//n_batches

    if os.path.exists(file_name):
        df = pd.read_csv(file_name, index_col='Unnamed: 0')
    else:
        df = pd.DataFrame(columns=['exchange'] + [f'volume_{v+1}' for v in range(6)])

    new_exc_volume_chart = []

    for ib in range(n_batches):
        batch = exc_volume_chart[ib*batch_size:(ib+1)*batch_size]
        new_exc_volume_chart.append(np.mean([float(i) for _, i in batch]))

    itm = {'exchange': exc_name}
    itm.update({f'volume_{i+1}': v for i, v in enumerate(new_exc_volume_chart)})

    if df[df.exchange == exc_name].empty:
        df = df._append(itm, ignore_index=True)
    else:
        df[df.exchange == exc_name] = itm.values()

    df.to_csv(file_name)

process_and_save(
    exc_name='test',
    exc_volume_chart=collected_data[0]['Binance'],
    file_name='out_test.csv'
)

  df = df._append(itm, ignore_index=True)


In [214]:
os.remove('out_test.csv')

In [215]:
file_name = 'out_test.csv'

for item in collected_data:
    exc_name, exc_volume_chart = list(item.items())[0]

    process_and_save(
        exc_name=exc_name,
        exc_volume_chart=exc_volume_chart,
        file_name=file_name
    )

  df = df._append(itm, ignore_index=True)


In [216]:
pd.read_csv('out_test.csv', index_col='Unnamed: 0')

Unnamed: 0,exchange,volume_1,volume_2,volume_3,volume_4,volume_5,volume_6
0,Binance,350771.649494,394268.082415,410599.956595,413703.33774,410450.781169,414245.880433
1,Bybit,73441.661024,82634.054282,85585.175603,86667.729861,88884.626454,92833.133659
2,OKX,51526.573701,59809.947618,63559.000896,63049.751216,63350.733081,63949.013579
3,Coinbase Exchange,44971.789918,51720.087533,55387.958585,54648.754048,54654.525617,53849.205097
4,Bitget,50165.748676,47713.837835,45935.749658,45698.741624,45077.333684,44901.003718
5,HTX,38655.01865,39490.053483,39126.732057,39383.082477,39692.3171,40813.204661
6,Gate.io,46874.373605,52470.477821,55761.083291,58040.31843,59606.695941,61668.554863
7,Kraken,18612.319066,21193.772542,22035.568282,21250.147074,21289.747074,21778.341772
8,KuCoin,12696.797462,14256.991334,15698.305747,15903.152408,18720.661909,20351.679074
9,Crypto.com Exchange,21020.601491,21755.344628,22010.923407,21778.370538,22153.531407,22399.17501
