In [10]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.graphics.tsaplots import plot_acf
import warnings
import os
import base64
from io import BytesIO
from google.colab import files
warnings.filterwarnings("ignore")

# Initialize a list to store output for HTML report
html_output = []
plot_files = []

# Function to upload files in Colab
def upload_files():
    uploaded = files.upload()
    return list(uploaded.keys())

# Function to check if files exist in Colab environment
def check_files_exist(expected_files):
    missing_files = []
    for file in expected_files:
        if not os.path.exists(file):
            missing_files.append(file)
    return missing_files

# Function to capture print output
def capture_output(text):
    print(text)
    html_output.append(text.replace('\n', '<br>'))

# Function to save plot and get base64 string for HTML embedding
def save_and_embed_plot(fig, filename):
    fig.savefig(filename)
    with open(filename, "rb") as image_file:
        encoded_string = base64.b64encode(image_file.read()).decode()
    plot_files.append(filename)
    plt.close(fig)
    return f'<img src="data:image/png;base64,{encoded_string}" alt="{filename}" style="max-width:100%;">'

# Function to perform ADF test with interpretation
def adf_test(series, title):
    result = adfuller(series, autolag='AIC')
    output = f'\n{title} ADF Test:<br>ADF Statistic: {result[0]:.4f}<br>p-value: {result[1]:.4f}<br>'
    interpretation = 'The series is stationary' if result[1] < 0.05 else 'The series is not stationary'
    output += f'Interpretation: {interpretation}'
    capture_output(output)
    return result[1]

# Function to perform cointegration test with interpretation and buy recommendation
def coint_test(series1, series2, name1, name2, interval):
    score, p_value, _ = coint(series1, series2)
    output = f'\nCointegration Test: {name1} vs {name2} ({interval})<br>T-statistic: {score:.4f}<br>p-value: {p_value:.4f}<br>'
    interpretation = 'Series are cointegrated' if p_value < 0.05 else 'Series are not cointegrated'
    output += f'Interpretation: {interpretation}<br>'
    if p_value < 0.05:
        output += f'Recommendation: The stocks are cointegrated, suggesting a potential pairs trading opportunity. It may make sense to consider buying {name1} or {name2} as part of a pairs trading strategy.'
    else:
        output += f'Recommendation: The stocks are not cointegrated. There is no clear long-term relationship, so it may not be advantageous to buy {name1} or {name2} based on this analysis.'
    capture_output(output)

# Function to check if residuals resemble white noise
def check_residuals_acf(residuals, ticker, interval):
    from statsmodels.tsa.stattools import acf
    acf_values, confint = acf(residuals, nlags=20, alpha=0.05)
    significant_lags = sum(1 for i, val in enumerate(acf_values[1:], 1) if abs(val) > confint[i, 1] / np.sqrt(len(residuals)))
    interpretation = 'Residuals appear to be white noise (no significant ACF spikes)' if significant_lags == 0 else f'Residuals show {significant_lags} significant ACF spikes, suggesting non-white noise'
    capture_output(f'Interpretation: {interpretation}')
    return interpretation

# Function to analyze single stock data
def analyze_stock_data(df, ticker, interval):
    # Extract Close column (second column)
    series = df.iloc[:, 1]
    series.name = f'{ticker}_{interval}'

    # Plot original series
    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(series, label='Close')
    ax.set_title(f'{ticker} Stock Price ({interval})')
    ax.legend()
    html_output.append(save_and_embed_plot(fig, f'{ticker}_{interval}_series.png'))

    # ADF tests
    adf_test(series, f'{ticker} Original ({interval})')
    diff_series = series.diff().dropna()
    adf_test(diff_series, f'{ticker} Differenced ({interval})')

    # Fit ARMA(1,1) model on differenced series
    model = ARIMA(diff_series, order=(1, 0, 1))
    results = model.fit()
    capture_output(f'\n{ticker} ({interval}) ARMA(1,1) Model Summary:')
    capture_output(str(results.summary().tables[1]))

    # Plot residuals
    residuals = results.resid
    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(residuals, label='Residuals')
    ax.set_title(f'{ticker} ARMA(1,1) Residuals ({interval})')
    ax.legend()
    html_output.append(save_and_embed_plot(fig, f'{ticker}_{interval}_residuals.png'))

    # Residual histogram
    fig, ax = plt.subplots(figsize=(10, 4))
    pd.Series(residuals).hist(bins=30, ax=ax)
    ax.set_title(f'{ticker} Residuals Histogram ({interval})')
    html_output.append(save_and_embed_plot(fig, f'{ticker}_{interval}_residuals_hist.png'))

    # Residual ACF
    fig, ax = plt.subplots(figsize=(10, 4))
    plot_acf(residuals, lags=20, ax=ax)
    ax.set_title(f'{ticker} Residuals ACF ({interval})')
    html_output.append(save_and_embed_plot(fig, f'{ticker}_{interval}_residuals_acf.png'))

    # Check residuals for white noise
    check_residuals_acf(residuals, ticker, interval)

    return series

# Function to get user choice for cointegration test
def get_user_coint_choice():
    capture_output("\nAvailable stock pairs for cointegration test:<br>1. TM vs F<br>2. GM vs F<br>3. GM vs TM")
    while True:
        choice = input("Enter the number of the pair to test (1-3): ").strip()
        if choice in ['1', '2', '3']:
            break
        capture_output("Invalid choice. Please enter 1, 2, or 3.")

    capture_output("\nAvailable intervals:<br>1. 1-minute<br>2. 3-minute")
    while True:
        interval_choice = input("Enter the number of the interval (1-2): ").strip()
        if interval_choice in ['1', '2']:
            break
        capture_output("Invalid choice. Please enter 1 or 2.")

    # Map choices to data_dict keys and names
    pairs_map = {
        '1': ('TM', 'F'),
        '2': ('GM', 'F'),
        '3': ('GM', 'TM')
    }
    interval_map = {
        '1': '1min',
        '2': '3min'
    }

    ticker1, ticker2 = pairs_map[choice]
    interval = interval_map[interval_choice]
    return ticker1, ticker2, interval

# Function to ask if user wants another cointegration test
def another_coint_test():
    while True:
        choice = input("\nDo you want to perform another cointegration test? (yes/no): ").strip().lower()
        if choice in ['yes', 'no']:
            capture_output(f"\nDo you want to perform another cointegration test? (yes/no): {choice}")
            return choice == 'yes'
        capture_output("Invalid choice. Please enter 'yes' or 'no'.")

# Function to generate HTML report
def generate_html_report():
    html_content = """
    <html>
    <head>
        <title>Stock Time Series Analysis Report</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 20px; }
            h1, h2, h3 { color: #333; }
            img { max-width: 100%; margin: 10px 0; }
            pre { background: #f4f4f4; padding: 10px; border-radius: 5px; }
        </style>
    </head>
    <body>
        <h1>Stock Time Series Analysis Report</h1>
    """
    for output in html_output:
        if output.startswith('<img'):
            html_content += output
        else:
            html_content += f'<pre>{output}</pre>'
    html_content += """
    </body>
    </html>
    """
    with open('analysis_report.html', 'w') as f:
        f.write(html_content)
    capture_output("\nHTML report generated as 'analysis_report.html'.")

# Main execution
def main():
    # Expected file names
    expected_files = [
        'GM_1min.xlsx', 'GM_3min.xlsx',
        'F_1min.xlsx', 'F_3min.xlsx',
        'TM_1min.xlsx', 'TM_3min.xlsx'
    ]

    # Check if files exist
    missing_files = check_files_exist(expected_files)

    if missing_files:
        capture_output("The following files are missing. Please upload them:")
        for f in missing_files:
            capture_output(f"- {f}")
        uploaded_files = upload_files()
        # Verify all expected files are now present
        still_missing = [f for f in expected_files if f not in uploaded_files and not os.path.exists(f)]
        if still_missing:
            capture_output(f"Error: The following files are still missing: {still_missing}")
            return
    else:
        capture_output("All expected files are already present.")
        uploaded_files = expected_files

    data_dict = {}

    # Process each file
    for file in uploaded_files:
        if file in expected_files:
            try:
                df = pd.read_excel(file)
                ticker = file.split('_')[0]
                interval = file.split('_')[1].replace('.xlsx', '')
                capture_output(f'\nProcessing {file}')
                series = analyze_stock_data(df, ticker, interval)
                data_dict[f'{ticker}_{interval}'] = series
            except Exception as e:
                capture_output(f'Error processing {file}: {str(e)}')
        else:
            capture_output(f'Unexpected file: {file}')

    # Cointegration test loop
    while True:
        # Get user choice for cointegration test
        ticker1, ticker2, interval = get_user_coint_choice()
        s1 = f'{ticker1}_{interval}'
        s2 = f'{ticker2}_{interval}'

        # Perform cointegration test for selected pair and interval
        capture_output(f"\nPerforming Cointegration Test for {ticker1} vs {ticker2} ({interval})")
        if s1 in data_dict and s2 in data_dict:
            # Ensure same length by aligning series
            min_len = min(len(data_dict[s1]), len(data_dict[s2]))
            coint_test(data_dict[s1][:min_len], data_dict[s2][:min_len], ticker1, ticker2, interval)
        else:
            capture_output(f'Missing data for {s1} or {s2}')

        # Ask if user wants another test
        if not another_coint_test():
            capture_output("\nAnalysis complete. Program ending.")
            break

    # Generate HTML report
    generate_html_report()

if __name__ == "__main__":
    main()

All expected files are already present.

Processing GM_1min.xlsx

GM Original (1min) ADF Test:<br>ADF Statistic: -1.6617<br>p-value: 0.4509<br>Interpretation: The series is not stationary

GM Differenced (1min) ADF Test:<br>ADF Statistic: -11.9695<br>p-value: 0.0000<br>Interpretation: The series is stationary

GM (1min) ARMA(1,1) Model Summary:
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
const         -0.0021      0.002     -1.025      0.305      -0.006       0.002
ar.L1          0.5132    200.433      0.003      0.998    -392.328     393.354
ma.L1         -0.5134    200.421     -0.003      0.998    -393.331     392.304
sigma2         0.0016   5.37e-05     30.349      0.000       0.002       0.002
Interpretation: Residuals show 20 significant ACF spikes, suggesting non-white noise

Processing GM_3min.xlsx

GM Original (3min) ADF Test:<br>ADF Statistic: -1.8216<br>p-value: 0