In [None]:
class UpbitDataFetcher:
    def __init__(self, batch_size=30, delay=2):
        self.base_url = "https://api.upbit.com/v1/candles/minutes/1"
        self.headers = {"accept": "application/json"}
        self.shit_list = [
            'KRW-GRS', 'KRW-SBD', 'KRW-BOUNTY', 'KRW-MOC', 'KRW-TT', 'KRW-GAME2',
            'KRW-MED', 'KRW-MLK', 'KRW-DKA', 'KRW-BORA', 'KRW-AHT', 'KRW-MVL',
            'KRW-HUNT', 'KRW-CRO', 'KRW-FCT2', 'KRW-HPO', 'KRW-CBK', 'KRW-AQT',
            'KRW-STRIKE', 'KRW-META', 'KRW-CTC', 'KRW-MNT', 'KRW-TAIKO', 'KRW-SOL',
            'KRW-BLAST', 'KRW-ATH', 'KRW-CARV', 'KRW-BTC', 'KRW-ETC', 'KRW-ETH', 
        ]
        self.filtered_pairs = []
        self.all_candles_data = []
        self.batch_size = batch_size
        self.delay = delay

    def fetch_market_pairs(self):
        """Получает список всех пар с Upbit и фильтрует их."""
        url = "https://api.upbit.com/v1/market/all?isDetails=true"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            markets_data = response.json()

            df = pd.DataFrame(markets_data)
            df = df.rename(columns={
                'market': 'Market Pair',
                'korean_name': 'Korean Name',
                'english_name': 'English Name'
            })

            # Фильтруем пары
            self.filtered_pairs = df[
                df['Market Pair'].str.startswith('KRW-') & 
                ~df['Market Pair'].isin(self.shit_list)
            ]['Market Pair'].tolist()

            print(f"Filtered market pairs: {self.filtered_pairs[:5]} ...")
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")

    async def fetch_candle_data(self, pair, session):
        """Асинхронный метод для получения свечей по одной паре."""
        to_date = (datetime.utcnow() - timedelta(minutes=1)).strftime('%Y-%m-%dT%H:%M:%S')
        params = {
            'market': pair,
            'count': 1,
            'to': to_date
        }
        try:
            async with session.get(self.base_url, params=params, headers=self.headers) as response:
                if response.status == 429:
                    print(f"Rate limit exceeded for {pair}. Retrying after 1 second.")
                    await asyncio.sleep(1)
                    return await self.fetch_candle_data(pair, session)  # Повторить запрос
                response.raise_for_status()
                candles = await response.json()

                if candles:
                    candles[0]['market'] = pair
                    self.all_candles_data.append(candles[0])
                    print(f"Successfully fetched data for {pair}")
                else:
                    print(f"No data returned for {pair}")
        except aiohttp.ClientError as e:
            print(f"Failed to fetch data for {pair}: {e}")

    async def fetch_candles_batch(self, pairs_batch, session):
        """Обрабатывает пакет пар."""
        tasks = [self.fetch_candle_data(pair, session) for pair in pairs_batch]
        await asyncio.gather(*tasks)

    async def fetch_all_candles(self):
        """Главный метод для получения данных по пакетам."""
        async with aiohttp.ClientSession() as session:
            for i in range(0, len(self.filtered_pairs), self.batch_size):
                batch = self.filtered_pairs[i:i + self.batch_size]
                print(f"Fetching batch {i // self.batch_size + 1} of {len(self.filtered_pairs) // self.batch_size + 1}")
                await self.fetch_candles_batch(batch, session)
                await asyncio.sleep(self.delay)  # Задержка между пакетами

    def save_to_csv(self):
        """Сохраняет собранные данные в CSV."""
        if self.all_candles_data:
            df = pd.DataFrame(self.all_candles_data)
            df['candle_date_time_utc'] = pd.to_datetime(df['candle_date_time_utc'])
            df.to_csv("upbit_minute_candles.csv", index=False)
            print("Сохранены свечи:", df.head())

    def run(self):
        """Запуск сбора данных."""
        print("Fetching Upbit market pairs...")
        self.fetch_market_pairs()
        print("Fetching candles...")
        asyncio.run(self.fetch_all_candles())
        self.save_to_csv()


# Пример использования:
fetcher = UpbitDataFetcher(batch_size=30, delay=2)
fetcher.run()

Fetching Upbit market pairs...
Filtered market pairs: ['KRW-NEO', 'KRW-MTL', 'KRW-XRP', 'KRW-SNT', 'KRW-WAVES'] ...
Fetching candles...
Fetching batch 1 of 4
Successfully fetched data for KRW-POLYX
Successfully fetched data for KRW-XEM
Successfully fetched data for KRW-LOOM
Successfully fetched data for KRW-ARK
Successfully fetched data for KRW-STORJ
Successfully fetched data for KRW-IOST
Successfully fetched data for KRW-ADA
Successfully fetched data for KRW-EOS
Successfully fetched data for KRW-STEEM
Rate limit exceeded for KRW-NEO. Retrying after 1 second.
Successfully fetched data for KRW-ZRX
Rate limit exceeded for KRW-SC. Retrying after 1 second.
Rate limit exceeded for KRW-IQ. Retrying after 1 second.
Rate limit exceeded for KRW-MTL. Retrying after 1 second.
Rate limit exceeded for KRW-WAVES. Retrying after 1 second.
Rate limit exceeded for KRW-SNT. Retrying after 1 second.
Rate limit exceeded for KRW-POWR. Retrying after 1 second.
Rate limit exceeded for KRW-ZIL. Retrying after

get candles for binance

In [None]:
import nest_asyncio

nest_asyncio.apply()  # Позволяет запускать asyncio.run() в существующем loop

class BinanceDataFetcher:
    def __init__(self):
        self.base_url_spot = "https://api.binance.com/api/v3/klines"
        self.base_url_perp = "https://fapi.binance.com/fapi/v1/klines"
        self.all_candles_data = []

    async def fetch_candle(self, session, base_url, pair, interval="1m"):
        """Получает одну минутную свечу для указанной пары с Binance."""
        params = {
            "symbol": pair.replace("/", ""),
            "interval": interval,
            "limit": 1,
            "endTime": int((datetime.utcnow() - timedelta(minutes=1)).timestamp() * 1000)
        }
        async with session.get(base_url, params=params) as response:
            if response.status == 200:
                data = await response.json()
                if data:
                    return {
                        "market": pair,
                        "candle_date_time_utc": datetime.utcfromtimestamp(data[0][0] / 1000).strftime("%Y-%m-%d %H:%M:%S"),
                        "opening_price": float(data[0][1]),
                        "high_price": float(data[0][2]),
                        "low_price": float(data[0][3]),
                        "trade_price": float(data[0][4]),
                        "candle_acc_trade_price": float(data[0][7]),
                        "candle_acc_trade_volume": float(data[0][5]),
                    }
            return None

    async def fetch_all_binance_candles(self, pairs):
        """Получает свечи для всех пар из списка, включая спот и perp."""
        async with aiohttp.ClientSession() as session:
            tasks = []
            for pair in pairs:
                tasks.append(self.fetch_candle(session, self.base_url_spot, pair))
                tasks.append(self.fetch_candle(session, self.base_url_perp, pair))

            results = await asyncio.gather(*tasks, return_exceptions=True)

            for i in range(0, len(results), 2):
                spot_data = results[i]
                perp_data = results[i + 1]
                pair = pairs[i // 2]

                if spot_data:
                    self.all_candles_data.append(spot_data)
                elif perp_data:
                    perp_data["market"] += " (perp)"
                    self.all_candles_data.append(perp_data)

    def save_to_csv(self):
        """Сохраняет полученные данные в CSV."""
        if self.all_candles_data:
            df = pd.DataFrame(self.all_candles_data)
            df.to_csv("binance_minute_candles.csv", index=False)
            print("Сохранены свечи:", df.head())

    def run(self, pairs):
        """Запускает сбор данных по свечам."""
        print("Fetching Binance candles...")
        asyncio.run(self.fetch_all_binance_candles(pairs))
        self.save_to_csv()


# Пример использования:
pairs_to_check = [
    "BTC/USDT", "ETH/USDT", "XRP/USDT", "BNB/USDT", "SOL/USDT", "DOGE/USDT",  # Добавьте сюда нужные пары
    "GRS/USDT", "SBD/USDT"  # Пары, которые могут быть не найдены
]

fetcher = BinanceDataFetcher()
fetcher.run(pairs_to_check)

Fetching Binance candles...
Сохранены свечи:      market candle_date_time_utc  opening_price  high_price   low_price  \
0  BTC/USDT  2024-11-28 11:13:00     94952.2900  95000.0000  94947.7500   
1  ETH/USDT  2024-11-28 11:13:00      3598.2500   3602.3000   3597.7200   
2  XRP/USDT  2024-11-28 11:13:00         1.4497      1.4526      1.4493   
3  BNB/USDT  2024-11-28 11:13:00       656.5100    657.4000    656.3400   
4  SOL/USDT  2024-11-28 11:13:00       235.6000    235.8500    235.5600   

   trade_price  candle_acc_trade_price  candle_acc_trade_volume  
0   94999.9900           772837.759326                  8.13646  
1    3602.2900           502370.398107                139.53740  
2       1.4526           307995.935800             212225.00000  
3     657.1400           409393.670890                623.07800  
4     235.8400           240847.936020               1021.73900  


In [102]:
import nest_asyncio
import asyncio
import aiohttp
import requests
import pandas as pd
from datetime import datetime, timedelta

nest_asyncio.apply()  # Позволяет запускать asyncio.run() в существующем loop

# Класс для получения данных с Upbit
class UpbitDataFetcher:
    def __init__(self, batch_size=30, delay=2):
        self.base_url = "https://api.upbit.com/v1/candles/minutes/1"
        self.headers = {"accept": "application/json"}
        # Убираем пару KRW-USDT из shit_list
        self.shit_list = [
            'KRW-GRS', 'KRW-SBD', 'KRW-BOUNTY', 'KRW-MOC', 'KRW-TT', 'KRW-GAME2',
            'KRW-MED', 'KRW-MLK', 'KRW-DKA', 'KRW-BORA', 'KRW-AHT', 'KRW-MVL',
            'KRW-HUNT', 'KRW-CRO', 'KRW-FCT2', 'KRW-HPO', 'KRW-CBK', 'KRW-AQT',
            'KRW-STRIKE', 'KRW-META', 'KRW-CTC', 'KRW-MNT', 'KRW-TAIKO', 'KRW-SOL',
            'KRW-BLAST', 'KRW-ATH', 'KRW-CARV', 'KRW-BTC', 'KRW-ETC', 'KRW-ETH', 
        ]
        self.filtered_pairs = []
        self.all_candles_data = []
        self.batch_size = batch_size
        self.delay = delay
        self.krw_usdt_rate = None  # Это будет хранить курс KRW-USDT

    def fetch_market_pairs(self):
        """Получает список всех пар с Upbit и фильтрует их."""
        url = "https://api.upbit.com/v1/market/all?isDetails=true"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            markets_data = response.json()

            df = pd.DataFrame(markets_data)
            df = df.rename(columns={
                'market': 'Market Pair',
                'korean_name': 'Korean Name',
                'english_name': 'English Name'
            })

            # Фильтруем пары
            self.filtered_pairs = df[
                df['Market Pair'].str.startswith('KRW-') & 
                ~df['Market Pair'].isin(self.shit_list)
            ]['Market Pair'].tolist()

            print(f"Filtered market pairs: {self.filtered_pairs[:5]} ...")
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")

    async def fetch_krw_usdt_rate(self, session):
        """Получает курс KRW-USDT для конвертации."""
        url = "https://api.upbit.com/v1/ticker"
        params = {"markets": "KRW-USDT"}
        try:
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data:
                        self.krw_usdt_rate = data[0]["trade_price"]
                        print(f"KRW/USDT rate: {self.krw_usdt_rate}")
        except aiohttp.ClientError as e:
            print(f"Failed to fetch KRW-USDT rate: {e}")

    async def fetch_candle_data(self, pair, session):
        """Асинхронный метод для получения свечей по одной паре."""
        to_date = (datetime.utcnow() - timedelta(minutes=1)).strftime('%Y-%m-%dT%H:%M:%S')
        params = {
            'market': pair,
            'count': 1,
            'to': to_date
        }
        try:
            async with session.get(self.base_url, params=params, headers=self.headers) as response:
                if response.status == 429:
                    print(f"Rate limit exceeded for {pair}. Retrying after 1 second.")
                    await asyncio.sleep(1)
                    return await self.fetch_candle_data(pair, session)  # Повторить запрос
                response.raise_for_status()
                candles = await response.json()

                if candles:
                    # Конвертируем цены в доллары, если пара начинается с 'KRW-'
                    if pair.startswith("KRW-") and self.krw_usdt_rate:
                        for candle in candles:
                            candle['opening_price'] /= self.krw_usdt_rate
                            candle['high_price'] /= self.krw_usdt_rate
                            candle['low_price'] /= self.krw_usdt_rate
                            candle['trade_price'] /= self.krw_usdt_rate
                            candle['candle_acc_trade_price'] /= self.krw_usdt_rate
                            candle['candle_acc_trade_volume'] /= self.krw_usdt_rate

                    candles[0]['market'] = pair
                    self.all_candles_data.append(candles[0])
                    print(f"Successfully fetched data for {pair}")
                else:
                    print(f"No data returned for {pair}")
        except aiohttp.ClientError as e:
            print(f"Failed to fetch data for {pair}: {e}")

    async def fetch_candles_batch(self, pairs_batch, session):
        """Обрабатывает пакет пар."""
        tasks = [self.fetch_candle_data(pair, session) for pair in pairs_batch]
        await asyncio.gather(*tasks)

    async def fetch_all_candles(self):
        """Главный метод для получения данных по пакетам."""
        async with aiohttp.ClientSession() as session:
            await self.fetch_krw_usdt_rate(session)  # Получаем курс KRW-USDT
            for i in range(0, len(self.filtered_pairs), self.batch_size):
                batch = self.filtered_pairs[i:i + self.batch_size]
                print(f"Fetching batch {i // self.batch_size + 1} of {len(self.filtered_pairs) // self.batch_size + 1}")
                await self.fetch_candles_batch(batch, session)
                await asyncio.sleep(self.delay)  # Задержка между пакетами

    def save_to_csv(self):
        """Сохраняет собранные данные в CSV."""
        if self.all_candles_data:
            df = pd.DataFrame(self.all_candles_data)
            df['candle_date_time_utc'] = pd.to_datetime(df['candle_date_time_utc'])
            df.to_csv("upbit_minute_candles.csv", index=False)
            print("Сохранены свечи:", df.head())

    def run(self):
        """Запуск сбора данных."""
        print("Fetching Upbit market pairs...")
        self.fetch_market_pairs()
        print("Fetching candles...")
        asyncio.run(self.fetch_all_candles())
        self.save_to_csv()


# Класс для получения данных с Binance
class BinanceDataFetcher:
    def __init__(self):
        self.base_url_spot = "https://api.binance.com/api/v3/klines"
        self.base_url_perp = "https://fapi.binance.com/fapi/v1/klines"
        self.all_candles_data = []

    async def fetch_candle(self, session, base_url, pair, interval="1m"):
        """Получает одну минутную свечу для указанной пары с Binance."""
        params = {
            "symbol": pair.replace("/", ""),
            "interval": interval,
            "limit": 1,
            "endTime": int((datetime.utcnow() - timedelta(minutes=1)).timestamp() * 1000)
        }
        async with session.get(base_url, params=params) as response:
            if response.status == 200:
                data = await response.json()
                if data:
                    return {
                        "market": pair,
                        "candle_date_time_utc": datetime.utcfromtimestamp(data[0][0] / 1000).strftime("%Y-%m-%d %H:%M:%S"),
                        "opening_price": float(data[0][1]),
                        "high_price": float(data[0][2]),
                        "low_price": float(data[0][3]),
                        "trade_price": float(data[0][4]),
                        "candle_acc_trade_price": float(data[0][7]),
                        "candle_acc_trade_volume": float(data[0][5]),
                    }
            return None

    async def fetch_all_binance_candles(self, pairs):
        """Получает свечи для всех пар из списка, включая спот и perp."""
        async with aiohttp.ClientSession() as session:
            tasks = []
            for pair in pairs:
                tasks.append(self.fetch_candle(session, self.base_url_spot, pair))
                tasks.append(self.fetch_candle(session, self.base_url_perp, pair))

            results = await asyncio.gather(*tasks, return_exceptions=True)

            for i in range(0, len(results), 2):
                spot_data = results[i]
                perp_data = results[i + 1]
                pair = pairs[i // 2]

                if spot_data:
                    self.all_candles_data.append(spot_data)
                if perp_data:
                    self.all_candles_data.append(perp_data)

    def get_binance_candles_df(self):
        """Возвращает данные с Binance в виде DataFrame."""
        return pd.DataFrame(self.all_candles_data)


# Основной процесс
def main():
    upbit_fetcher = UpbitDataFetcher()
    binance_fetcher = BinanceDataFetcher()

    # Получаем данные с Upbit и Binance
    upbit_fetcher.run()

    # Получаем данные с Binance
    binance_fetcher.fetch_all_binance_candles(upbit_fetcher.filtered_pairs)

    # Создаём единую таблицу
    upbit_data_df = pd.read_csv("upbit_minute_candles.csv")
    binance_data_df = binance_fetcher.get_binance_candles_df()

    # Объединяем данные в одну таблицу по ключу 'market'
    combined_df = pd.concat([upbit_data_df, binance_data_df]).reset_index(drop=True)

    print("Объединённые данные:")
    print(combined_df.head())

if __name__ == "__main__":
    main()

Fetching Upbit market pairs...
Filtered market pairs: ['KRW-NEO', 'KRW-MTL', 'KRW-XRP', 'KRW-SNT', 'KRW-WAVES'] ...
Fetching candles...
KRW/USDT rate: 1402.0
Fetching batch 1 of 4
Successfully fetched data for KRW-NEO
Successfully fetched data for KRW-LSK
Successfully fetched data for KRW-BCH
Successfully fetched data for KRW-STORJ
Successfully fetched data for KRW-IOST
Successfully fetched data for KRW-ZIL
Successfully fetched data for KRW-SNT
Successfully fetched data for KRW-EOS
Successfully fetched data for KRW-ZRX
Rate limit exceeded for KRW-SC. Retrying after 1 second.
Rate limit exceeded for KRW-XRP. Retrying after 1 second.
Rate limit exceeded for KRW-MTL. Retrying after 1 second.
Rate limit exceeded for KRW-ONT. Retrying after 1 second.
Successfully fetched data for KRW-ADA
Rate limit exceeded for KRW-ICX. Retrying after 1 second.
Successfully fetched data for KRW-LOOM
Rate limit exceeded for KRW-XLM. Retrying after 1 second.
Rate limit exceeded for KRW-BTG. Retrying after 1 s

  binance_fetcher.fetch_all_binance_candles(upbit_fetcher.filtered_pairs)


In [104]:
import nest_asyncio
import asyncio
import aiohttp
import requests
import pandas as pd
from datetime import datetime, timedelta

nest_asyncio.apply()  # Позволяет запускать asyncio.run() в существующем loop

# Класс для получения данных с Upbit
class UpbitDataFetcher:
    def __init__(self, batch_size=30, delay=2):
        self.base_url = "https://api.upbit.com/v1/candles/minutes/1"
        self.headers = {"accept": "application/json"}
        # Убираем пару KRW-USDT из shit_list
        self.shit_list = [
            'KRW-GRS', 'KRW-SBD', 'KRW-BOUNTY', 'KRW-MOC', 'KRW-TT', 'KRW-GAME2',
            'KRW-MED', 'KRW-MLK', 'KRW-DKA', 'KRW-BORA', 'KRW-AHT', 'KRW-MVL',
            'KRW-HUNT', 'KRW-CRO', 'KRW-FCT2', 'KRW-HPO', 'KRW-CBK', 'KRW-AQT',
            'KRW-STRIKE', 'KRW-META', 'KRW-CTC', 'KRW-MNT', 'KRW-TAIKO', 'KRW-SOL',
            'KRW-BLAST', 'KRW-ATH', 'KRW-CARV', 'KRW-BTC', 'KRW-ETC', 'KRW-ETH', 
        ]
        self.filtered_pairs = []
        self.all_candles_data = []
        self.batch_size = batch_size
        self.delay = delay
        self.krw_usdt_rate = None  # Это будет хранить курс KRW-USDT

    def fetch_market_pairs(self):
        """Получает список всех пар с Upbit и фильтрует их."""
        url = "https://api.upbit.com/v1/market/all?isDetails=true"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            markets_data = response.json()

            df = pd.DataFrame(markets_data)
            df = df.rename(columns={
                'market': 'Market Pair',
                'korean_name': 'Korean Name',
                'english_name': 'English Name'
            })

            # Фильтруем пары
            self.filtered_pairs = df[
                df['Market Pair'].str.startswith('KRW-') & 
                ~df['Market Pair'].isin(self.shit_list)
            ]['Market Pair'].tolist()

            print(f"Filtered market pairs: {self.filtered_pairs[:5]} ...")
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")

    async def fetch_krw_usdt_rate(self, session):
        """Получает курс KRW-USDT для конвертации."""
        url = "https://api.upbit.com/v1/ticker"
        params = {"markets": "KRW-USDT"}
        try:
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data:
                        self.krw_usdt_rate = data[0]["trade_price"]
                        print(f"KRW/USDT rate: {self.krw_usdt_rate}")
        except aiohttp.ClientError as e:
            print(f"Failed to fetch KRW-USDT rate: {e}")

    async def fetch_candle_data(self, pair, session):
        """Асинхронный метод для получения свечей по одной паре."""
        to_date = (datetime.utcnow() - timedelta(minutes=1)).strftime('%Y-%m-%dT%H:%M:%S')
        params = {
            'market': pair,
            'count': 1,
            'to': to_date
        }
        try:
            async with session.get(self.base_url, params=params, headers=self.headers) as response:
                if response.status == 429:
                    print(f"Rate limit exceeded for {pair}. Retrying after 1 second.")
                    await asyncio.sleep(1)
                    return await self.fetch_candle_data(pair, session)  # Повторить запрос
                response.raise_for_status()
                candles = await response.json()

                if candles:
                    # Конвертируем цены в доллары, если пара начинается с 'KRW-'
                    if pair.startswith("KRW-") and self.krw_usdt_rate:
                        for candle in candles:
                            candle['opening_price'] /= self.krw_usdt_rate
                            candle['high_price'] /= self.krw_usdt_rate
                            candle['low_price'] /= self.krw_usdt_rate
                            candle['trade_price'] /= self.krw_usdt_rate
                            candle['candle_acc_trade_price'] /= self.krw_usdt_rate
                            candle['candle_acc_trade_volume'] /= self.krw_usdt_rate

                    # Преобразуем KRW- в TOKEN-USDT
                    token = pair.replace("KRW-", "")
                    pair_in_usdt = f"{token}-USDT"
                    candle_data = candles[0]
                    candle_data['market'] = pair_in_usdt  # Меняем на стиль Binance
                    self.all_candles_data.append(candle_data)
                    print(f"Successfully fetched data for {pair}")
                else:
                    print(f"No data returned for {pair}")
        except aiohttp.ClientError as e:
            print(f"Failed to fetch data for {pair}: {e}")

    async def fetch_candles_batch(self, pairs_batch, session):
        """Обрабатывает пакет пар."""
        tasks = [self.fetch_candle_data(pair, session) for pair in pairs_batch]
        await asyncio.gather(*tasks)

    async def fetch_all_candles(self):
        """Главный метод для получения данных по пакетам."""
        async with aiohttp.ClientSession() as session:
            await self.fetch_krw_usdt_rate(session)  # Получаем курс KRW-USDT
            for i in range(0, len(self.filtered_pairs), self.batch_size):
                batch = self.filtered_pairs[i:i + self.batch_size]
                print(f"Fetching batch {i // self.batch_size + 1} of {len(self.filtered_pairs) // self.batch_size + 1}")
                await self.fetch_candles_batch(batch, session)
                await asyncio.sleep(self.delay)  # Задержка между пакетами

    def save_to_csv(self):
        """Сохраняет собранные данные в CSV."""
        if self.all_candles_data:
            df = pd.DataFrame(self.all_candles_data)
            df['candle_date_time_utc'] = pd.to_datetime(df['candle_date_time_utc'])
            df.to_csv("upbit_minute_candles_converted.csv", index=False)
            print("Сохранены свечи:", df.head())

    def run(self):
        """Запуск сбора данных."""
        print("Fetching Upbit market pairs...")
        self.fetch_market_pairs()
        print("Fetching candles...")
        asyncio.run(self.fetch_all_candles())
        self.save_to_csv()


# Класс для получения данных с Binance
class BinanceDataFetcher:
    def __init__(self):
        self.base_url_spot = "https://api.binance.com/api/v3/klines"
        self.base_url_perp = "https://fapi.binance.com/fapi/v1/klines"
        self.all_candles_data = []

    async def fetch_candle(self, session, base_url, pair, interval="1m"):
        """Получает одну минутную свечу для указанной пары с Binance."""
        params = {
            "symbol": pair.replace("/", ""),
            "interval": interval,
            "limit": 1,
            "endTime": int((datetime.utcnow() - timedelta(minutes=1)).timestamp() * 1000)
        }
        async with session.get(base_url, params=params) as response:
            if response.status == 200:
                data = await response.json()
                if data:
                    return {
                        "market": pair,
                        "candle_date_time_utc": datetime.utcfromtimestamp(data[0][0] / 1000).strftime("%Y-%m-%d %H:%M:%S"),
                        "opening_price": float(data[0][1]),
                        "high_price": float(data[0][2]),
                        "low_price": float(data[0][3]),
                        "trade_price": float(data[0][4]),
                        "candle_acc_trade_price": float(data[0][7]),
                        "candle_acc_trade_volume": float(data[0][5])
                    }
            return None

    async def fetch_candles_batch(self, pairs_batch, session):
        """Получает свечи по batch."""
        tasks = [
            self.fetch_candle(session, self.base_url_spot, pair) for pair in pairs_batch
        ]
        return await asyncio.gather(*tasks)

    def get_binance_candles_df(self, table):
        """Возвращает DF c Binance."""
        return pd.DataFrame(table)

    async def fetch_binance_data(self, pairs):
        results = []
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_candle(session, self.base_url_spot, pair) for pair in pairs]
            results = await asyncio.gather(*tasks)
        return [x for x in results if x]

    def run(self, upbit_symbols):
        loop = asyncio.get_event_loop()
        results = loop.run_until_complete(self.fetch_binance_data(upbit_symbols))
        self.all_candles_data = results
        print(f'Fetched {len(results)} pairs')

    def save_to_csv(self):
        """Сохраняет собранные данные в CSV."""
        if self.all_candles_data:
            df = pd.DataFrame(self.all_candles_data)
            df['candle_date_time_utc'] = pd.to_datetime(df['candle_date_time_utc'])
            df.to_csv("binance_candles.csv", index=False)
            print("Сохранены свечи Binance:", df.head())

    def run(self, upbit_symbols):
        loop = asyncio.get_event_loop()
        results = loop.run_until_complete(self.fetch_binance_data(upbit_symbols))
        self.all_candles_data = results
        print(f'Fetched {len(results)} pairs')

def main():
    upbit_fetcher = UpbitDataFetcher()
    binance_fetcher = BinanceDataFetcher()
    upbit_fetcher.run()
    converted_upbit = pd.read_csv("upbit_minute_candles_converted.csv")
    unique_binance_markets = converted_upbit['market'].unique().tolist()
    binance_fetcher.run(unique_binance_markets)
    binance_df = binance_fetcher.get_binance_candles_df(binance_fetcher.all_candles_data)
    result = pd.concat([converted_upbit, binance_df])
    result.to_csv("all_candles_merged.csv")
    print("Готово. См. файл: all_candles_merged.csv")

if __name__ == "__main__":
    main()

Fetching Upbit market pairs...
Filtered market pairs: ['KRW-NEO', 'KRW-MTL', 'KRW-XRP', 'KRW-SNT', 'KRW-WAVES'] ...
Fetching candles...
KRW/USDT rate: 1390.0
Fetching batch 1 of 4
Successfully fetched data for KRW-NEO
Successfully fetched data for KRW-ONT
Successfully fetched data for KRW-ARDR
Successfully fetched data for KRW-XLM
Successfully fetched data for KRW-SNT
Successfully fetched data for KRW-IOST
Successfully fetched data for KRW-STORJ
Successfully fetched data for KRW-IQ
Successfully fetched data for KRW-CVC
Successfully fetched data for KRW-ARK
Rate limit exceeded for KRW-QTUM. Retrying after 1 second.
Rate limit exceeded for KRW-BAT. Retrying after 1 second.
Rate limit exceeded for KRW-ZRX. Retrying after 1 second.
Rate limit exceeded for KRW-LSK. Retrying after 1 second.
Rate limit exceeded for KRW-EOS. Retrying after 1 second.
Successfully fetched data for KRW-SC
Rate limit exceeded for KRW-BTG. Retrying after 1 second.
Rate limit exceeded for KRW-ADA. Retrying after 1 s

In [156]:
import requests
import aiohttp
import asyncio
import pandas as pd
from datetime import datetime, timedelta

class UpbitDataFetcher:
    def __init__(self, batch_size=30, delay=2):
        self.base_url = "https://api.upbit.com/v1/candles/minutes/1"
        self.headers = {"accept": "application/json"}
        self.shit_list = ['KRW-USDT']
        self.filtered_pairs = []
        self.all_candles_data = []
        self.batch_size = batch_size
        self.delay = delay
        self.krw_usdt_rate = None  # Курс KRW/USDT

    def fetch_market_pairs(self):
        """Получает список всех пар с Upbit и фильтрует их."""
        url = "https://api.upbit.com/v1/market/all?isDetails=true"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            markets_data = response.json()

            df = pd.DataFrame(markets_data)
            self.filtered_pairs = df[
                df['market'].str.startswith('KRW-') & 
                ~df['market'].isin(self.shit_list)
            ]['market'].tolist()

            print(f"Filtered market pairs: {self.filtered_pairs[:5]} ...")
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")

    async def fetch_krw_usdt_rate(self, session):
        """Получает курс KRW/USDT."""
        url = "https://api.upbit.com/v1/ticker"
        params = {"markets": "KRW-USDT"}
        try:
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data:
                        self.krw_usdt_rate = data[0]["trade_price"]
                        print(f"KRW/USDT rate: {self.krw_usdt_rate}")
        except aiohttp.ClientError as e:
            print(f"Failed to fetch KRW-USDT rate: {e}")

    async def fetch_candle_data(self, pair, session):
        """Асинхронный метод для получения свечей по одной паре."""
        to_date = (datetime.utcnow() - timedelta(minutes=5)).strftime('%Y-%m-%dT%H:%M:%S')
        params = {'market': pair, 'count': 1, 'to': to_date}
        try:
            async with session.get(self.base_url, params=params, headers=self.headers) as response:
                response.raise_for_status()
                candles = await response.json()

                if candles:
                    # Конвертация цен в доллары
                    for candle in candles:
                        for key in ['opening_price', 'high_price', 'low_price', 'trade_price']:
                            candle[key] /= self.krw_usdt_rate
                        # Преобразуем пару в формат "SYMBOL/USDT" (если это Upbit)
                        if 'KRW-' in pair:
                            candle['market'] = f"{pair.replace('KRW-', '')}/USDT"
                        else:
                            candle['market'] = pair  # Для других бирж пары уже в нужном формате
                        candle['source'] = 'Upbit'
                    self.all_candles_data.extend(candles)
                    print(f"Fetched candles for {pair}")
                else:
                    print(f"No data for {pair}")
        except aiohttp.ClientError as e:
            print(f"Failed to fetch candles for {pair}: {e}")

    async def fetch_candles_batch(self, pairs_batch, session):
        tasks = [self.fetch_candle_data(pair, session) for pair in pairs_batch]
        await asyncio.gather(*tasks)

    async def fetch_all_candles(self):
        async with aiohttp.ClientSession() as session:
            await self.fetch_krw_usdt_rate(session)
            for i in range(0, len(self.filtered_pairs), self.batch_size):
                batch = self.filtered_pairs[i:i + self.batch_size]
                print(f"Fetching batch {i // self.batch_size + 1}")
                await self.fetch_candles_batch(batch, session)
                await asyncio.sleep(self.delay)

    def save_to_csv(self):
        """Сохраняет свечи в CSV с подписью источника."""
        if self.all_candles_data:
            df = pd.DataFrame(self.all_candles_data)
            df.to_csv("upbit_candles_usdt.csv", index=False)
            print("Saved to upbit_candles_usdt.csv")

    def run(self):
        """Запуск сбора данных."""
        self.fetch_market_pairs()
        asyncio.run(self.fetch_all_candles())
        self.save_to_csv()

In [None]:
class BinanceDataFetcher:
    def __init__(self):
        self.base_url_spot = "https://api.binance.com/api/v3/klines"
        self.base_url_perp = "https://fapi.binance.com/fapi/v1/klines"
        self.all_candles_data = {}

    async def fetch_candle(self, session, base_url, pair, interval="5m"):
        """Получает пятиминутную свечу для указанной пары с Binance."""
        # Преобразуем пары, как это нужно для Binance (например, 'NEO/USDT' -> 'NEOUSDT')
        pair_for_binance = pair.replace("/", "").replace("-", "")
        
        params = {
            "symbol": pair_for_binance,  # Формат пары, без "/"
            "interval": interval,
            "limit": 1,
            "endTime": int((datetime.utcnow() - timedelta(minutes=5)).timestamp() * 1000)
        }
        try:
            async with session.get(base_url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data:
                        return {
                            "market": pair,
                            "source": "Binance",
                            "candle_date_time_utc": datetime.utcfromtimestamp(data[0][0] / 1000).strftime("%Y-%m-%d %H:%M:%S"),
                            "opening_price": float(data[0][1]),
                            "high_price": float(data[0][2]),
                            "low_price": float(data[0][3]),
                            "trade_price": float(data[0][4]),
                            "candle_acc_trade_price": float(data[0][7]),
                            "candle_acc_trade_volume": float(data[0][5])
                        }
        except aiohttp.ClientError as e:
            print(f"Error fetching candle for {pair} from {base_url}: {e}")
        return None

    async def fetch_candles_batch(self, pairs_batch, session):
        """Получает свечи для группы пар с Binance."""
        tasks = []
        for pair in pairs_batch:
            tasks.append(self.fetch_candle(session, self.base_url_spot, pair))
            tasks.append(self.fetch_candle(session, self.base_url_perp, pair))
        results = await asyncio.gather(*tasks)
        return results

    async def fetch_binance_data(self, upbit_pairs):
        """Основной метод для получения данных с Binance."""
        async with aiohttp.ClientSession() as session:
            batch_size = 10  # Число пар в одном пакете
            for i in range(0, len(upbit_pairs), batch_size):
                pairs_batch = upbit_pairs[i:i + batch_size]
                print(f"Fetching batch {i // batch_size + 1}")
                candles = await self.fetch_candles_batch(pairs_batch, session)
                for pair in pairs_batch:
                    pair_data = [c for c in candles if c and c['market'] == pair]
                    if not pair_data:
                        # Если нет данных с Binance, добавляем пустые строки для этой пары
                        self.all_candles_data[pair] = {
                            "market": pair,
                            "source": "Binance",
                            "candle_date_time_utc": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
                            "opening_price": 0.0,
                            "high_price": 0.0,
                            "low_price": 0.0,
                            "trade_price": 0.0,
                            "candle_acc_trade_price": 0.0,
                            "candle_acc_trade_volume": 0.0
                        }
                    else:
                        # Если данные есть, сохраняем
                        for data in pair_data:
                            self.all_candles_data[pair] = data

    def combine_data(self, upbit_df):
        """Объединяет свечи Upbit и Binance в одну таблицу."""
        binance_df = pd.DataFrame(self.all_candles_data.values())  # Конвертируем в DataFrame

        # Переименовываем столбцы для данных Upbit
        upbit_df = upbit_df.rename(columns={
            "opening_price": "opening_price_upbit",
            "high_price": "high_price_upbit",
            "low_price": "low_price_upbit",
            "trade_price": "trade_price_upbit",
            "candle_acc_trade_price": "candle_acc_trade_price_upbit",
            "candle_acc_trade_volume": "candle_acc_trade_volume_upbit",
            "unit": "unit_upbit",
            "source": "source_upbit"
        })

        # Переименовываем столбцы для данных Binance
        binance_df = binance_df.rename(columns={
            "opening_price": "opening_price_binance",
            "high_price": "high_price_binance",
            "low_price": "low_price_binance",
            "trade_price": "trade_price_binance",
            "candle_acc_trade_price": "candle_acc_trade_price_binance",
            "candle_acc_trade_volume": "candle_acc_trade_volume_binance",
            "source": "source_binance"
        })

        # Объединяем данные по паре
        combined_df = pd.merge(
            upbit_df, binance_df, on="market", how="left"
        )

        self.combined_data = combined_df

    def save_combined_data(self):
        """Сохраняет объединенные данные в CSV."""
        if not self.combined_data.empty:
            self.combined_data.to_csv("combined_candles.csv", index=False)
            print("Сохранены объединенные данные:", self.combined_data.head())
        else:
            print("Нет данных для сохранения")

    def run(self, upbit_df):
        """Запуск сбора и обработки данных."""
        upbit_pairs = upbit_df["market"].unique().tolist()  # Извлекаем уникальные пары из DataFrame
        asyncio.run(self.fetch_binance_data(upbit_pairs))
        self.combine_data(upbit_df)
        self.save_combined_data()

In [158]:
# Создаем экземпляр класса для Upbit
upbit_fetcher = UpbitDataFetcher(batch_size=30, delay=2)

# Запуск сбора данных
upbit_fetcher.run()

# Считываем полученные свечи с Upbit
upbit_df = pd.read_csv("upbit_minute_candles_converted.csv")
print("Upbit Data:")
print(upbit_df.head())

Filtered market pairs: ['KRW-BTC', 'KRW-ETH', 'KRW-NEO', 'KRW-MTL', 'KRW-XRP'] ...
KRW/USDT rate: 1390.0
Fetching batch 1
Fetched candles for KRW-BTC
Fetched candles for KRW-XRP
Fetched candles for KRW-ETH
Fetched candles for KRW-XLM
Fetched candles for KRW-LSK
Fetched candles for KRW-MTL
Fetched candles for KRW-STEEM
Fetched candles for KRW-ADA
Fetched candles for KRW-ONT
Fetched candles for KRW-LOOM
Failed to fetch candles for KRW-POWR: 429, message='Too Many Requests', url='https://api.upbit.com/v1/candles/minutes/1?market=KRW-POWR&count=1&to=2024-11-29T11:46:42'
Fetched candles for KRW-SBD
Failed to fetch candles for KRW-ETC: 429, message='Too Many Requests', url='https://api.upbit.com/v1/candles/minutes/1?market=KRW-ETC&count=1&to=2024-11-29T11:46:42'
Failed to fetch candles for KRW-QTUM: 429, message='Too Many Requests', url='https://api.upbit.com/v1/candles/minutes/1?market=KRW-QTUM&count=1&to=2024-11-29T11:46:42'
Failed to fetch candles for KRW-SC: 429, message='Too Many Reques

In [159]:
# Предположим, что у вас уже есть DataFrame `upbit_df`
binance_fetcher = BinanceDataFetcher()
binance_fetcher.run(upbit_df)

Fetching batch 1
Fetching batch 2
Fetching batch 3
Fetching batch 4
Fetching batch 5
Fetching batch 6
Fetching batch 7
Fetching batch 8
Fetching batch 9
Fetching batch 10
Fetching batch 11
Fetching batch 12
Сохранены объединенные данные:      market candle_date_time_utc_x candle_date_time_kst  opening_price_upbit  \
0  NEO-USDT    2024-11-29 10:43:00  2024-11-29T19:43:00            14.783862   
1  XRP-USDT    2024-11-29 10:43:00  2024-11-29T19:43:00             1.672190   
2  BTG-USDT    2024-11-29 10:43:00  2024-11-29T19:43:00            34.661383   
3  ICX-USDT    2024-11-29 10:42:00  2024-11-29T19:42:00             0.213256   
4  ZRX-USDT    2024-11-29 10:43:00  2024-11-29T19:43:00             0.555403   

   high_price_upbit  low_price_upbit  trade_price_upbit      timestamp  \
0         14.783862        14.783862          14.783862  1732877030672   
1          1.676513         1.670749           1.676513  1732877039886   
2         34.661383        34.632565          34.639769  17

In [None]:
import pandas as pd

# Загрузка данных
input_path = "combined_candles.csv"  # Замените на путь к вашему файлу
combined_candles = pd.read_csv(input_path)

# Переименование колонок для удобства
combined_candles.rename(columns={
    'candle_date_time_utc_x': 'candle_date_time_utc',
    'trade_price_x': 'trade_price_upbit',
    'candle_acc_trade_price_x': 'candle_acc_trade_price_upbit',
    'candle_acc_trade_volume_x': 'candle_acc_trade_volume_upbit',
    'candle_date_time_utc_y': 'candle_date_time_utc_binance',
    'trade_price_y': 'trade_price_binance',
    'candle_acc_trade_price_y': 'candle_acc_trade_price_binance',
    'candle_acc_trade_volume_y': 'candle_acc_trade_volume_binance',
}, inplace=True)

# Выбор и упорядочивание необходимых столбцов
columns_order = [
    'market', 
    'candle_date_time_utc_upbit', 
    'candle_date_time_kst', 
    'trade_price_upbit', 
    'timestamp', 
    'candle_acc_trade_price_upbit', 
    'candle_acc_trade_volume_upbit', 
    'unit_upbit', 
    'source_binance', 
    'candle_date_time_utc_binance', 
    'opening_price_binance', 
    'high_price_binance', 
    'low_price_binance', 
    'trade_price_binance', 
    'candle_acc_trade_price_binance', 
    'candle_acc_trade_volume_binance'
]

formatted_table = combined_candles[columns_order]

# Сохранение результата в CSV
output_path = "formatted_combined_candles.csv"
formatted_table.to_csv(output_path, index=False)

print(f"Форматированная таблица успешно сохранена в {output_path}")

Форматированная таблица успешно сохранена в formatted_combined_candles.csv


In [5]:
import sqlite3

# Открытие соединения с базой данных
conn = sqlite3.connect('/Users/user/Desktop/upbit vs binance/crypto_prices.db')
cursor = conn.cursor()

# Выполнение запроса к таблице crypto_prices
cursor.execute("SELECT * FROM crypto_prices LIMIT 10;")
for row in cursor.fetchall():
    print(row)

# Закрытие соединения
conn.close()