In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd
import time
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import os
import multiprocessing as mp
import numpy as np
import itertools


In [41]:
directory = '/content/drive/My Drive/MarketData/ohlc'

In [8]:

def decide_start_end_ts():
    # 各銘柄の最初と最後のタイムスタンプを保存するための辞書
    timestamps = {}

    cleansed_files = [f for f in os.listdir(directory) if f.startswith('cleansed_') and f.endswith('.csv')]

    # Delete all cleansed files
    for f in cleansed_files:
        os.remove(os.path.join(directory, f))

    for filename in os.listdir(directory):
        if filename.endswith(".csv"):
            file_path = os.path.join(directory, filename)
            df = pd.read_csv(file_path)
            # タイムスタンプの最小値と最大値を取得
            min_ts, max_ts = df['timestamp'].min(), df['timestamp'].max()
            # 銘柄名（ファイル名から拡張子を除去）と対応するタイムスタンプを保存
            timestamps[filename.rsplit('.', 1)[0]] = (min_ts, max_ts)

    # 全銘柄が持つ共通の最小と最大のタイムスタンプを取得
    common_min_ts = max(t[0] for t in timestamps.values())
    common_max_ts = min(t[1] for t in timestamps.values())

    # タイムスタンプをISOフォーマットの日付に変換
    common_min_date = datetime.utcfromtimestamp(common_min_ts/1000).isoformat()
    common_max_date = datetime.utcfromtimestamp(common_max_ts/1000).isoformat()

    # 対象期間の長さを計算
    period_length = (common_max_ts - common_min_ts) / (1000 * 60 * 60 * 24)  # ミリ秒を日数に変換

    print(f'Common start timestamp: {common_min_ts} ({common_min_date})')
    print(f'Common end timestamp: {common_max_ts} ({common_max_date})')
    print(f'The length of the period: {period_length} days')
    return common_min_ts, common_max_ts


common_min_ts, common_max_ts = decide_start_end_ts()

Common start timestamp: 1686220560000 (2023-06-08T10:36:00)
Common end timestamp: 1686811260000 (2023-06-15T06:41:00)
The length of the period: 6.836805555555555 days


In [9]:

def determine_timestamps(directory: str, x: float):
    # 各銘柄の最初と最後のタイムスタンプを保存するための辞書
    timestamps = {}

    for filename in os.listdir(directory):
        if filename.endswith(".csv"):
            file_path = os.path.join(directory, filename)
            df = pd.read_csv(file_path)
            # タイムスタンプの最小値と最大値を取得
            min_ts, max_ts = df['timestamp'].min(), df['timestamp'].max()
            # データ範囲を計算
            range_ts = max_ts - min_ts
            # 銘柄名（ファイル名から拡張子を除去）と対応するタイムスタンプを保存
            timestamps[filename.rsplit('.', 1)[0]] = (min_ts, max_ts, range_ts)

    # データ範囲で銘柄をソート
    sorted_symbols = sorted(timestamps.items(), key=lambda item: item[1][2], reverse=True)

    # 上位x%の銘柄の最初と最後のタイムスタンプを取得
    num_symbols = int(len(sorted_symbols) * x)
    selected_symbols = sorted_symbols[:num_symbols]
    common_min_ts = max(t[1][0] for t in selected_symbols)
    common_max_ts = min(t[1][1] for t in selected_symbols)

    # その期間のデータが存在する全ての銘柄を選択
    final_symbols = [symbol for symbol, ts in timestamps.items() if ts[0] <= common_min_ts and ts[1] >= common_max_ts]

    # タイムスタンプをISOフォーマットの日付に変換
    common_min_date = datetime.utcfromtimestamp(common_min_ts/1000).isoformat()
    common_max_date = datetime.utcfromtimestamp(common_max_ts/1000).isoformat()

    # 対象期間の長さを計算
    period_length = (common_max_ts - common_min_ts) / (1000 * 60 * 60 * 24)  # ミリ秒を日数に変換
    print(f'Common start timestamp: {common_min_ts} ({common_min_date})')
    print(f'Common end timestamp: {common_max_ts} ({common_max_date})')
    print(f'The length of the period: {period_length} days')
    print(f'Num Selected symbols: {len(final_symbols)}')
    return common_min_ts, common_max_ts, final_symbols

# 関数を実行

common_min_ts, common_max_ts, final_symbols = determine_timestamps(directory, 0.5)  # 例として全体の10%の銘柄を最初に選択します

Common start timestamp: 1684219380000 (2023-05-16T06:43:00)
Common end timestamp: 1686811320000 (2023-06-15T06:42:00)
The length of the period: 29.999305555555555 days
Num Selected symbols: 367


In [23]:

def cleanse_data(directory: str, final_symbols: list, common_min_ts: int, common_max_ts: int, interval: int):
    cleansed_data = {}
    for filename in final_symbols:
        file_path = os.path.join(directory, filename+'.csv')
        df = pd.read_csv(file_path)

        # クレンジング前のデータの長さ
        original_length = len(df)

        # 開始・終了のタイムスタンプを基準値に合わせてそれ以前・以降のデータを削除
        df = df[(df['timestamp'] >= common_min_ts) & (df['timestamp'] <= common_max_ts)]
        df = df.drop_duplicates(subset='timestamp')  # 重複を削除

        # タイムスタンプをインデックスに設定
        df.set_index('timestamp', inplace=True)
        # タイムスタンプの間隔が一定でない部分を補間
        df = df.reindex(range(common_min_ts, common_max_ts + 1, interval))

        # 線形補間
        df.interpolate(method='linear', inplace=True)

        # クレンジング後のデータを新しいCSVファイルとして保存
        df.reset_index().to_csv(directory+'/cleansed_'+filename+'.csv', index=False)

        # クレンジング後のデータの長さとの差を取得
        cleansed_length = len(df)
        #print(f'File: {filename}\nOriginal Length: {original_length}\nCleansed Length: {cleansed_length}\nDifference: {cleansed_length - original_length}\n')

        # クレンジング後のデータフレームを辞書に保存
        cleansed_data[filename] = df
    return cleansed_data


# 関数を実行
interval = 60000  # 1分間隔（ミリ秒単位）
cleansed_data = cleanse_data(directory, final_symbols, common_min_ts, common_max_ts, interval)

for k in list(cleansed_data.keys()):
    print(len(cleansed_data[k]), cleansed_data[k].index)
print('Num all target cleansed file=', len(cleansed_data))

File: apexpro-BTC-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-ETH-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-AVAX-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-ARB-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-XRP-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-ATOM-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-DOGE-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-MATIC-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-OP-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-SOL-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-BNB-USDC
Original Length: 43200
Cleansed Length: 43200
Difference: 0

File: apexpro-LTC-USDC
Original Length: 43200
Cleansed Length: 43200
Dif

In [49]:

# 並列化のための関数定義
def calculate_corr(args):
    pair, period = args
    df1 = pd.read_csv(pair[0], index_col='timestamp')
    df2 = pd.read_csv(pair[1], index_col='timestamp')

    # 指定したタイムスタンプ以前の指定した期間（period分）のデータを取得
    timestamp = df1.index.max()  # 最新のタイムスタンプを取得
    start_timestamp = timestamp - period * 60 * 1000  # 開始タイムスタンプを計算（ミリ秒単位）

    df1 = df1[df1.index >= start_timestamp]
    df2 = df2[df2.index >= start_timestamp]

    return (pair, df1['close'].corr(df2['close']))


def estimate_processing_time(num_files, period):
    # cleansedという文字から始まるファイルを全て読み込む
    cleansed_files = [os.path.join(directory, f) for f in os.listdir(directory) if f.startswith('cleansed_') and f.endswith('.csv')]

    # ファイルを指定した数に限定
    cleansed_files = cleansed_files[:num_files]

    # 銘柄ペアのリストを作成
    symbol_pairs = list(itertools.combinations(cleansed_files, 2))

    # periodを含む引数リストを作成
    args = [(pair, period) for pair in symbol_pairs]

    # 処理開始時間を記録
    start_time = time.time()

    # multiprocessing Poolを使って並列化
    with mp.Pool(mp.cpu_count()) as pool:
        results = pool.map(calculate_corr, args)

    # 処理終了時間を記録
    end_time = time.time()

    # 結果を表示
    for result in results:
        print(f"Correlation between {result[0][0]} and {result[0][1]}: {result[1]}")


    # 処理時間を表示
    print(f"Processing time for {num_files} files: {end_time - start_time} seconds")

    # 全体の処理時間を推測
    total_files = len([f for f in os.listdir(directory) if f.startswith('cleansed_') and f.endswith('.csv')])
    estimated_time = (end_time - start_time) * ((total_files * (total_files - 1)) / (num_files * (num_files - 1)))
    print(f"Estimated processing time for all files: {estimated_time} seconds")

    return results

# 関数を呼び出す
results = estimate_processing_time(num_files=10, period=1440)

Correlation between /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-BTC-USDC.csv and /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-ETH-USDC.csv: 0.9942618477352133
Correlation between /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-BTC-USDC.csv and /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-AVAX-USDC.csv: 0.9721537798938183
Correlation between /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-BTC-USDC.csv and /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-ARB-USDC.csv: 0.9938126142395962
Correlation between /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-BTC-USDC.csv and /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-XRP-USDC.csv: 0.9741347141126504
Correlation between /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-BTC-USDC.csv and /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-ATOM-USDC.csv: 0.7966198902161407
Correlation between /content/drive/My Drive/MarketData/ohlc/cleansed_apexpro-B

In [38]:
estimated_time = (end_time - start_time) * ((total_files * (total_files - 1)) / (100 * (100 - 1)))

In [39]:
estimated_time

4065.7320813663077

In [34]:
mp.cpu_count()

2

In [35]:
!cat /proc/cpuinfo | grep 'model name' | uniq
!cat /proc/cpuinfo | grep 'processor' | uniq

model name	: Intel(R) Xeon(R) CPU @ 2.20GHz
processor	: 0
processor	: 1
