In [1]:
import pandas as pd
# Cell 1: Import necessary libraries
import pandas as pd
import numpy as np

from StateMachines.TradingStateMachine import TradingStateMachine
from StrategyBuilders import getStrategyBuilder
from Strategies.AbstractTradingStrategy import AbstractTradingStrategy

In [2]:
# Where the raw candle file lives and where this notebook writes its artefacts.
folder = "./Data/gemini"
folder_lab1 = "./Data/lab1"
file = "gemini_1m.csv"
# Disabled wiring for the trading state machine, kept for reference:
# strategy_builder = getStrategyBuilder('MODEL')
# trader: TradingStateMachine = TradingStateMachine(strategy_builder("gemini_btcusd_1m"))

# Read the CSV, parse the timestamps, derive the calendar year and put the
# rows in chronological order, all in one pipeline.
df = (
    pd.read_csv(f"{folder}/{file}")
    .assign(timestamp=lambda candles: pd.to_datetime(candles['timestamp']))
    .assign(year_=lambda candles: candles['timestamp'].dt.year)
    .sort_values('timestamp', ascending=True)
)
print(df.head())  # Show the first few rows to verify data


            timestamp   symbol    open    high     low   close    volume  \
0 2017-01-01 00:00:00  BTC/USD  974.55  974.55  974.55  974.55  0.000000   
1 2017-01-01 00:01:00  BTC/USD  974.55  974.55  974.55  974.55  0.000000   
2 2017-01-01 00:02:00  BTC/USD  974.55  974.55  970.00  970.00  0.417679   
3 2017-01-01 00:03:00  BTC/USD  970.00  970.00  970.00  970.00  0.000514   
4 2017-01-01 00:04:00  BTC/USD  970.00  970.00  970.00  970.00  0.000000   

   year_  
0   2017  
1   2017  
2   2017  
3   2017  
4   2017  


In [8]:


# Cell 3: Define candle colors using single letters
def define_candle_color(row):
    """Classify a single candle by comparing its close with its open.

    Args:
        row: Mapping-like record exposing ``'open'`` and ``'close'`` entries.

    Returns:
        ``'G'`` (green) when the close is strictly above the open, otherwise
        ``'R'`` (red). A close equal to the open is therefore labelled red.
    """
    return 'G' if row['close'] > row['open'] else 'R'

# Label every candle exactly once. The column used to be built twice with a
# row-wise ``apply`` (one Python call per row, very slow on a large frame);
# a single column-wise comparison yields the same labels:
#   'G' when close > open, otherwise 'R'.
# Candles with close == open, and rows where either price is NaN, are 'R'
# because the strict comparison evaluates to False for them.
df['state'] = np.where(df['close'] > df['open'], 'G', 'R')

# Manually create combined states for the last 5 candles
def create_combined_states(states, window_size=5):
    """Concatenate every run of ``window_size`` consecutive states into one string.

    Args:
        states: pandas Series of per-row state labels. Missing values are
            treated as empty strings and every value is converted with ``str``.
        window_size: Number of consecutive states joined into one pattern.
            Must be at least 1.

    Returns:
        A Series of joined patterns. Each pattern carries the index label of
        the LAST row of its window, i.e. the result is indexed by
        ``states.index[window_size - 1:]``. The result is empty when
        ``states`` holds fewer than ``window_size`` values.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")

    # Materialise the labels once: slicing a plain list is far cheaper than
    # slicing the Series on every iteration of the loop.
    labels = states.fillna('').astype(str).tolist()
    window_count = len(labels) - window_size + 1  # <= 0 when input is too short

    combined_states = [
        ''.join(labels[start:start + window_size])
        for start in range(window_count)
    ]
    return pd.Series(combined_states, index=states.index[window_size - 1:])

# The returned Series is labelled from row position window_size-1 onward, so
# index alignment leaves 'combined_state' as NaN for the first window_size-1
# rows (4 rows with the default window of 5).
df['combined_state'] = create_combined_states(df['state'])

# Next: estimate the transition matrix between consecutive 5-candle patterns.

# Cell 5: Calculate the transition matrix
def calculate_efficient_transition_matrix(df):
    """Estimate first-order transition probabilities between combined states.

    Args:
        df: DataFrame with a ``'combined_state'`` column. Rows where that
            column is missing are ignored.

    Returns:
        A square DataFrame whose rows (previous state) and columns (current
        state) are the distinct combined states in order of first appearance.
        Each row holds the observed transition counts divided by the row
        total. A state with no outgoing transitions has a zero row total, so
        its row is NaN.
    """
    observed = df.dropna(subset=['combined_state'])['combined_state']
    # Keep this as an Index (not a bare array) so reindexing below yields the
    # same unnamed axes as before.
    state_labels = pd.Index(observed.unique())

    # Pair every state with the one on the preceding row; the first row has
    # no predecessor and is dropped.
    pairs = pd.DataFrame({'prev': observed.shift(1), 'curr': observed}).dropna()

    # Count each (prev, curr) pair and pivot into a prev-by-curr table.
    pair_counts = pairs.groupby(['prev', 'curr']).size().unstack(fill_value=0)

    # Make the table square so that every state has both a row and a column.
    pair_counts = pair_counts.reindex(
        index=state_labels, columns=state_labels, fill_value=0
    )

    # Turn counts into per-row probabilities.
    row_totals = pair_counts.sum(axis=1)
    return pair_counts.div(row_totals, axis=0)

# Estimate the pattern-to-pattern transition probabilities from the whole frame.
transition_matrix = calculate_efficient_transition_matrix(df)
print(transition_matrix.head())  # Print the top of the matrix to inspect it

# Cell 6: Function to get transition probabilities
def get_transition_probability(current_pattern, next_pattern, matrix):
    """Look up the probability of moving from one pattern to another.

    Args:
        current_pattern: Row label of the starting pattern.
        next_pattern: Column label of the candidate successor pattern.
        matrix: Transition matrix (DataFrame) indexed by starting pattern
            with successor patterns as columns.

    Returns:
        The stored matrix entry when both labels exist, otherwise ``0``
        (a pattern that was never observed is given zero probability).
    """
    is_known = current_pattern in matrix.index and next_pattern in matrix.columns
    if not is_known:
        return 0  # unseen pattern: zero probability
    return matrix.at[current_pattern, next_pattern]

# Cell 7: Test the model with an example
current_pattern = 'RGRGG'  # Colours of the last 5 candles, oldest first (one combined state)
next_pattern = 'GRGGR'    # Window after one more candle: oldest colour dropped, 'R' appended
# Returns 0 when either pattern is absent from the matrix.
probability = get_transition_probability(current_pattern, next_pattern, transition_matrix)
print(f"Probability of transitioning from {current_pattern} to {next_pattern}: {probability}")


          RRRRR     RRRRG     RRRGR     RRGRR     RGRRR     GRRRR     RRRGG  \
RRRRR  0.757738  0.242262  0.000000  0.000000  0.000000  0.000000  0.000000   
RRRRG  0.000000  0.000000  0.651082  0.000000  0.000000  0.000000  0.348918   
RRRGR  0.000000  0.000000  0.000000  0.651661  0.000000  0.000000  0.000000   
RRGRR  0.000000  0.000000  0.000000  0.000000  0.645628  0.000000  0.000000   
RGRRR  0.000000  0.000000  0.000000  0.000000  0.000000  0.644617  0.000000   

       RRGGR  RGGRR  GGRRR  ...  GGGRG  GGRGR  RGGGR  GGRRG  GRGRG  RGRGG  \
RRRRR    0.0    0.0    0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0   
RRRRG    0.0    0.0    0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0   
RRRGR    0.0    0.0    0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0   
RRGRR    0.0    0.0    0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0   
RGRRR    0.0    0.0    0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0   

       GRGGR  GGRGG  RGGRG  GRGGG  
RRRRR    0.0    0.0    0.0

In [12]:
current_pattern = 'RGRGG'  # Colours of the last 5 candles, oldest first (one combined state)
next_pattern = 'GRGGG'    # Window after one more candle: oldest colour dropped, 'G' appended
# Returns 0 when either pattern is absent from the matrix.
probability = get_transition_probability(current_pattern, next_pattern, transition_matrix)
print(f"Probability of transitioning from {current_pattern} to {next_pattern}: {probability}")

def bet(current_pattern, transition_matrix):
    """Choose the successor pattern with the largest transition probability.

    Args:
        current_pattern: Row label of the pattern currently observed.
        transition_matrix: DataFrame of transition probabilities indexed by
            starting pattern, with successor patterns as columns.

    Returns:
        The column label holding the maximum value in the row of
        ``current_pattern``.

    Raises:
        KeyError: If ``current_pattern`` is not a row label of the matrix
            (raised by the ``.loc`` lookup).
    """
    return transition_matrix.loc[current_pattern].idxmax()

# Most probable successor of the pattern 'RGGGG' according to the matrix.
print(bet('RGGGG', transition_matrix))


Probability of transitioning from RGRGG to GRGGG: 0.4426594540193735
GGGGR


In [13]:
# Persist the matrix so it can be reused outside this notebook.
# NOTE(review): assumes the folder_lab1 directory already exists -- confirm.
transition_matrix.to_csv(f"{folder_lab1}/transition_matrix.csv")

In [14]:
def predict_two_steps_ahead(current_pattern, transition_matrix, steps=2):
    """Greedily follow the most probable transition ``steps`` times.

    Starting at ``current_pattern``, each step moves to the column with the
    highest value in the current row of ``transition_matrix``.

    Args:
        current_pattern: Row label of the starting pattern.
        transition_matrix: DataFrame of transition probabilities indexed by
            starting pattern, with successor patterns as columns.
        steps: Number of greedy transitions to take (default 2). With
            ``steps <= 0`` the starting pattern is returned unchanged.

    Returns:
        The pattern reached after ``steps`` transitions, or one of two
        diagnostic strings:
        ``"Current pattern not found in the matrix"`` when the starting
        pattern has no row, and ``"Transition leads to an unknown pattern"``
        when a step cannot be continued (the row has no usable probabilities
        or the chosen successor has no row of its own).
    """
    # Check if the current pattern exists in the transition matrix
    if current_pattern not in transition_matrix.index:
        return "Current pattern not found in the matrix"

    next_pattern = current_pattern
    for _ in range(steps):
        outgoing = transition_matrix.loc[next_pattern]

        # A row can be entirely NaN (e.g. a pattern with no recorded outgoing
        # transitions, whose counts normalise as 0/0). idxmax() on such a row
        # returns NaN only on older pandas and raises on newer releases, so
        # handle it explicitly to get the same answer everywhere.
        if outgoing.isna().all():
            return "Transition leads to an unknown pattern"

        next_pattern = outgoing.idxmax()  # successor with the highest probability

        # The successor must itself have a row to continue from.
        if pd.isna(next_pattern) or next_pattern not in transition_matrix.index:
            return "Transition leads to an unknown pattern"

    return next_pattern

# Example use of the function
# Reuse the transition_matrix built earlier in the notebook; df has not
# changed since, so recomputing it here would only repeat the same work.
current_pattern = 'RRRGG'
resulting_pattern = predict_two_steps_ahead(current_pattern, transition_matrix)
print(f"The most probable pattern two candles after '{current_pattern}' is '{resulting_pattern}'.")


The most probable pattern two candles after 'RRRGG' is 'RGGRR'.


In [16]:
# Reuse the transition_matrix built earlier in the notebook; df has not
# changed since, so recomputing it here would only repeat the same work.
current_pattern = 'RRRGG'
resulting_pattern = predict_two_steps_ahead(current_pattern, transition_matrix, steps=2)
print(f"The most probable pattern two candles after '{current_pattern}' is '{resulting_pattern}'.")

The most probable pattern two candles after 'RRRGG' is 'RGGRR'.
