In [13]:
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint
from scipy import stats
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
from itertools import combinations
import os
import multiprocessing

In [14]:

# Function to read and preprocess CSV file
def read_stock_data(file_path):
    # Read the CSV file
    df = pd.read_csv(file_path)
    
    # Check for timestamp column
    timestamp_col = next((col for col in df.columns if col.lower() in ['ts', 'time', 'date', 'datetime']), None)
    if timestamp_col is None:
        raise ValueError(f"CSV file {file_path} must have a timestamp column (e.g., 'ts', 'time', 'date', 'datetime')")
    
    # Check for price column
    price_col = next((col for col in df.columns if col.lower() in ['close', 'price', 'adj close', 'adjusted close']), None)
    if price_col is None:
        raise ValueError(f"CSV file {file_path} must have a price column (e.g., 'close', 'price', 'adj close')")
    
    # Convert timestamp to datetime and set it as the index
    df[timestamp_col] = pd.to_datetime(df[timestamp_col])
    df.set_index(timestamp_col, inplace=True)
    
    # Sort the index to ensure chronological order
    df.sort_index(inplace=True)
    
    # Resample to 1-minute intervals if necessary
    if df.index.inferred_freq != '1T':
        df = df.resample('1T').last().ffill()
    
    # Return only the price column
    return df[price_col]


# Function to check for cointegration
def check_cointegration(y, x):
    _, p_value, _ = coint(y, x)
    return p_value

# Function to calculate the spread
def calculate_spread(y, x):
    reg = LinearRegression().fit(x.values.reshape(-1, 1), y)
    spread = y - reg.predict(x.values.reshape(-1, 1)).flatten()
    return spread, reg.coef_[0]

# Function to generate trading signals
def generate_signals(z_score, open_threshold=2, close_threshold=0.5):
    signals = np.zeros(len(z_score))
    position = 0
    for i in range(len(z_score)):
        if position == 0:
            if z_score[i] < -open_threshold:
                signals[i] = 1  # Go long
                position = 1
            elif z_score[i] > open_threshold:
                signals[i] = -1  # Go short
                position = -1
        elif position == 1:
            if z_score[i] > -close_threshold:
                signals[i] = -1  # Close long position
                position = 0
        elif position == -1:
            if z_score[i] < close_threshold:
                signals[i] = 1  # Close short position
                position = 0
    return signals

# Function to calculate returns
def calculate_returns(data, signals, transaction_cost=0.001):
    returns = data.pct_change()
    strategy_returns = signals.shift(1) * returns - np.abs(signals.diff()) * transaction_cost
    return strategy_returns

# Function to run pair trading strategy for a single pair
def run_pair_trading_strategy(data, stock1, stock2):
    # Check for cointegration
    p_value = check_cointegration(data[stock1], data[stock2])
    if p_value > 0.05:
        return None
    
    # Calculate spread
    spread, hedge_ratio = calculate_spread(data[stock1], data[stock2])
    
    # Calculate z-score of spread
    z_score = (spread - spread.mean()) / spread.std()
    
    # Generate trading signals
    signals = generate_signals(z_score)
    
    # Calculate returns
    returns = calculate_returns(data[[stock1, stock2]], pd.Series(signals, index=data.index))
    
    # Calculate performance metrics
    total_return = (1 + returns).prod() - 1
    annualized_return = (1 + total_return) ** (252 * 390 / len(returns)) - 1  # Assuming 252 trading days and 390 minutes per day
    sharpe_ratio = np.sqrt(252 * 390) * returns.mean() / returns.std()
    
    return {
        'pair': (stock1, stock2),
        'p_value': p_value,
        'total_return': total_return,
        'annualized_return': annualized_return,
        'sharpe_ratio': sharpe_ratio
    }

# Function to process all pairs for a given set of stocks
def process_all_pairs(data):
    stocks = data.columns
    results = []
    for stock1, stock2 in combinations(stocks, 2):
        result = run_pair_trading_strategy(data, stock1, stock2)
        if result:
            results.append(result)
    return results

# Main function to run the analysis
def run_multi_stock_analysis(data_folder):
    # Read all CSV files in the folder
    stock_data = {}
    for file in os.listdir(data_folder):
        if file.endswith('.csv'):
            stock_name = os.path.splitext(file)[0]
            stock_data[stock_name] = read_stock_data(os.path.join(data_folder, file))
    
    # Combine all stock data into a single DataFrame
    data = pd.DataFrame(stock_data)
    
    # Use multiprocessing to speed up the analysis
    with multiprocessing.Pool() as pool:
        chunk_size = len(data.columns) // (multiprocessing.cpu_count() * 2)
        data_chunks = [data[data.columns[i:i+chunk_size]] for i in range(0, len(data.columns), chunk_size)]
        results = pool.map(process_all_pairs, data_chunks)
    
    # Flatten the results list
    all_results = [item for sublist in results for item in sublist]
    
    # Convert results to DataFrame and sort by Sharpe ratio
    results_df = pd.DataFrame(all_results)
    results_df = results_df.sort_values('sharpe_ratio', ascending=False)
    
    # Display top 10 pairs
    print(results_df.head(10))
    
    # Save full results to CSV
    results_df.to_csv('pair_trading_results.csv', index=False)
    
    return results_df

# Example usage
data_folder = '/Users/mouyasushi/k_data/永豐'
results = run_multi_stock_analysis(data_folder)

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.12/3.12.6/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/python@3.12/3.12.6/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/python@3.12/3.12.6/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.6/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/queues.py", line 389, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'process_all_pairs' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
Process SpawnPoolWorker-2:
Trace

KeyboardInterrupt: 