In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import numpy as np
import pandas as pd

import os
import gc
import pyarrow.parquet as pq
import time

# **1. 데이터 확인**

In [None]:
base_path = './parquet'

# 월별로 파일 읽어오기
for month in range(1, 13):
  file_name = f'BTCUSDT-trades-2023-{month:02}.parquet' # 파일명
  path = os.path.join(base_path, file_name) # 파일 경로

  df = pd.read_parquet(path)
  print("=== {} ===".format(month))
  print(df.isna().sum())
  print()

  # 데이터프레임을 메모리에서 삭제
  del df

  # 가비지 컬렉션 수동 실행
  gc.collect()

=== 1 ===
id                0
price             0
qty               0
quote_qty         0
time              0
is_buyer_maker    0
dtype: int64

=== 2 ===
id                0
price             0
qty               0
quote_qty         0
time              0
is_buyer_maker    0
dtype: int64

=== 3 ===
id                0
price             0
qty               0
quote_qty         0
time              0
is_buyer_maker    0
dtype: int64

=== 4 ===
id                0
price             0
qty               0
quote_qty         0
time              0
is_buyer_maker    0
dtype: int64

=== 5 ===
id                0
price             0
qty               0
quote_qty         0
time              0
is_buyer_maker    0
dtype: int64

=== 6 ===
id                0
price             0
qty               0
quote_qty         0
time              0
is_buyer_maker    0
dtype: int64

=== 7 ===
id                0
price             0
qty               0
quote_qty         0
time              0
is_buyer_maker    0
dtype:

- 원본 데이터 자체에는 결측치는 없음

# **2. OHLCV 데이터로 가공하기**

In [None]:
### OHLCV 데이터 가공 함수

def convert_tick_to_ohlcv(data):
    """
    주어진 Binance 틱 데이터를 1시간 간격의 OHLCV (Open, High, Low, Close, Volume) 데이터로 변환
    :param data: DataFrame with Tick data
    :return: DataFrame with the Open(시가), High(고가), Low(저가), Close(종가), Volume(거래량) values
    """

    ## datetime 형으로 변환
    data['time'] = pd.to_datetime(data['time'], unit='ms')

    ## 시간대별로 재정렬
    data = data.sort_values(by='time').reset_index(drop=True)

    ## ohlcv 데이터로 가공
    ohlcv = data.resample('1H', on='time').agg({
        'price': ['first', 'max', 'min', 'last'],
        'qty': 'sum',
    })
    ohlcv.columns = ['Open', 'High', 'Low', 'Close', 'Volume']
    # time열 추가
    ohlcv['Time'] = ohlcv.index
    # 열 순서 조정
    ohlcv = ohlcv[['Time', 'Open', 'High', 'Low', 'Close', 'Volume']]

    return ohlcv

In [None]:
### 일별 데이터로 전처리

def process_data(file_path):
    dfs = []
    parquet_file = pq.ParquetFile(file_path) # 월별로 파일 읽어오기
    num_row_groups = parquet_file.num_row_groups

    for i in range(num_row_groups):
        # 각 row group에 대한 데이터 읽어오기
        fragment = parquet_file.read_row_group(i)
        df = fragment.to_pandas()

        # 각 row group에 대한 전처리 수행
        processed_chunk = convert_tick_to_ohlcv(df)

        # 처리된 chunk를 리스트에 추가
        dfs.append(processed_chunk)

    # 리스트에 저장된 모든 DataFrame을 합치기
    result_df = pd.concat(dfs, ignore_index=True)
    return result_df

In [None]:
# 주어진 경로
base_path = './parquet'

# 결과를 저장할 빈 리스트 생성
final_dfs = []

# 월별로 파일을 읽어오고 전처리 수행
for month in range(1, 13):
    file_name = f'BTCUSDT-trades-2023-{month:02}.parquet'
    file_path = os.path.join(base_path, file_name)

    # 각 월별 데이터를 row group 단위로 읽어오고 전처리 수행
    result_df = process_data(file_path)

    # 최종 결과 리스트에 추가
    final_dfs.append(result_df)

    # 데이터프레임을 메모리에서 삭제
    del result_df

    # 가비지 컬렉션 수동 실행
    gc.collect()

    print("{}월 전처리 완료!".format(month))

# 리스트에 저장된 모든 DataFrame을 합치기
final_result_df = pd.concat(final_dfs, ignore_index = True)

1월 전처리 완료!
2월 전처리 완료!
3월 전처리 완료!
4월 전처리 완료!
5월 전처리 완료!
6월 전처리 완료!
7월 전처리 완료!
8월 전처리 완료!
9월 전처리 완료!
10월 전처리 완료!
11월 전처리 완료!
12월 전처리 완료!


In [None]:
# 시간에 따라 재정렬
final_result_df = final_result_df.sort_values(by='Time').reset_index(drop=True)

# index를 Time으로 설정
final_result_df.set_index('Time', inplace=True)

In [None]:
# 최종 결과 확인
final_result_df

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-01-01 00:00:00,16537.5,16540.9,16504.0,16527.0,5381.399
2023-01-01 01:00:00,16527.1,16554.3,16524.1,16550.4,3210.826
2023-01-01 02:00:00,16550.5,16557.1,16534.8,16542.4,2399.668
2023-01-01 03:00:00,16542.5,16542.5,16515.0,16529.3,3214.480
2023-01-01 04:00:00,16529.2,16530.4,16508.8,16517.8,3150.954
...,...,...,...,...,...
2023-12-31 19:00:00,42702.9,42741.9,42624.7,42659.9,3944.096
2023-12-31 20:00:00,42659.9,42724.5,42543.3,42599.1,4730.936
2023-12-31 21:00:00,42599.2,42717.0,42558.2,42558.9,3794.010
2023-12-31 22:00:00,42559.0,42629.5,42111.9,42294.8,11952.346


In [None]:
final_result_df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 11102 entries, 2023-01-01 00:00:00 to 2023-12-31 23:00:00
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Open    8761 non-null   float64
 1   High    8761 non-null   float64
 2   Low     8761 non-null   float64
 3   Close   8761 non-null   float64
 4   Volume  11102 non-null  float64
dtypes: float64(5)
memory usage: 520.4 KB


- 중간에 결측치가 있는 것 같음 -> 적절한 처리 필요

In [None]:
# 결측인 시간대 확인

final_result_df[final_result_df.isnull().any(axis=1)].to_csv('./null_time.csv')

- 데이터 확인 결과, 연속적으로 4~5일 이상 비는 경우들도 많이 존재하였음
- 비트코인 거래의 추세를 반영하기 위해 보간법을 활용하여 결측치를 처리하기로 결정
  - `interpolate(method = 'time')`

# **3. 결측치 처리**

In [None]:
# Open, High, Low, Close -> 보간
final_result_df[['Open', 'High', 'Low', 'Close']] = final_result_df[['Open', 'High', 'Low', 'Close']].interpolate(method = 'time')

# Volume -> 일단 0으로 처리
final_result_df['Volume'] = final_result_df['Volume'].fillna(0)

In [None]:
final_result_df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 11102 entries, 2023-01-01 00:00:00 to 2023-12-31 23:00:00
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Open    11102 non-null  float64
 1   High    11102 non-null  float64
 2   Low     11102 non-null  float64
 3   Close   11102 non-null  float64
 4   Volume  11102 non-null  float64
dtypes: float64(5)
memory usage: 520.4 KB


- 결측치가 제대로 처리되었다.

In [None]:
### 파일로 저장해두기

final_result_df.to_csv('./OHLCV.csv', encoding = 'utf-8')