In [1]:
# Importing necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# To ignore warnings (optional)
import warnings
warnings.filterwarnings('ignore')


In [2]:
# Generate sample dummy futures data
import pandas as pd
import numpy as np

np.random.seed(42)

dates = pd.date_range(start="2023-01-01", periods=200)
price = np.cumsum(np.random.randn(200)) + 100  # Random walk starting around 100

df = pd.DataFrame({
    'Date': dates,
    'Open': price + np.random.randn(200),
    'High': price + np.random.rand(200) * 2,
    'Low': price - np.random.rand(200) * 2,
    'Close': price + np.random.randn(200),
    'Volume': np.random.randint(100, 1000, size=200)
})

df.set_index('Date', inplace=True)

# Check
df.head()


Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-01-01,100.854502,101.326353,100.217169,100.804516,830
2023-01-02,100.919234,100.905264,98.767915,98.648281,291
2023-01-03,102.08919,101.118889,100.602884,99.657953,116
2023-01-04,103.58297,104.258613,102.201856,103.272432,623
2023-01-05,100.917346,103.920817,101.966483,102.46588,500


In [3]:
import pandas as pd

# Create DataFrame manually
data = {
    'Date': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05'],
    'Open': [100.854502, 100.919234, 102.089190, 103.582970, 100.917346],
    'High': [101.326353, 100.905264, 101.118889, 104.258613, 103.920817],
    'Low': [100.217169, 98.767915, 100.602884, 102.201856, 101.966483],
    'Close': [100.804516, 98.648281, 99.657953, 103.272432, 102.465880],
    'Volume': [830, 291, 116, 623, 500]
}

df = pd.DataFrame(data)
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)

# Display the DataFrame
df.head()


Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-01-01,100.854502,101.326353,100.217169,100.804516,830
2023-01-02,100.919234,100.905264,98.767915,98.648281,291
2023-01-03,102.08919,101.118889,100.602884,99.657953,116
2023-01-04,103.58297,104.258613,102.201856,103.272432,623
2023-01-05,100.917346,103.920817,101.966483,102.46588,500


In [4]:
# Function to find fractal highs and lows
def find_fractals(df):
    df['Fractal_High'] = (df['High'].shift(1) < df['High']) & (df['High'].shift(-1) < df['High'])
    df['Fractal_Low'] = (df['Low'].shift(1) > df['Low']) & (df['Low'].shift(-1) > df['Low'])
    return df

# Apply the function
df = find_fractals(df)

# Show rows where fractals are detected
df[df['Fractal_High'] | df['Fractal_Low']]


Unnamed: 0_level_0,Open,High,Low,Close,Volume,Fractal_High,Fractal_Low
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2023-01-02,100.919234,100.905264,98.767915,98.648281,291,False,True
2023-01-04,103.58297,104.258613,102.201856,103.272432,623,True,False


In [5]:
def determine_bias(df):
    # Find last fractal low and high
    last_fractal_low = df[df['Fractal_Low']].iloc[-1]
    last_fractal_high = df[df['Fractal_High']].iloc[-1]
    
    if last_fractal_high['High'] > last_fractal_low['Low']:
        return "Uptrend Bias"
    else:
        return "Downtrend Bias"

# Check Bias
bias = determine_bias(df)
print(f"Directional Bias: {bias}")


Directional Bias: Uptrend Bias


In [6]:
def fibonacci_levels(low_price, high_price):
    """
    Calculate important fibonacci extension levels:
    -21%, 0%, 100%
    """
    range_ = high_price - low_price
    levels = {
        '-21%': high_price + (range_ * 0.21),  # Extension above High
        '0%': high_price,                     # High itself
        '100%': low_price                     # Low itself
    }
    return levels

# Pick last fractal low and high
last_low = df[df['Fractal_Low']].iloc[-1]
last_high = df[df['Fractal_High']].iloc[-1]

# Calculate Fibonacci levels
fib_levels = fibonacci_levels(low_price=last_low['Low'], high_price=last_high['High'])

# Display
print("Fibonacci Levels:")
for key, value in fib_levels.items():
    print(f"{key}: {value:.2f}")


Fibonacci Levels:
-21%: 105.41
0%: 104.26
100%: 98.77


In [7]:
# Step 1: Set the initial fib “–21%” trigger level
trigger_level = fib_levels['-21%']

# Step 2: Scan through the close prices to find where price crosses the trigger
entry_signals = []

for i in range(1, len(df)):
    prev_close = df['Close'].iat[i-1]
    curr_close = df['Close'].iat[i]
    
    # Check for a down‐move through the –21% level (bullish pullback entry)
    if prev_close > trigger_level and curr_close <= trigger_level:
        cross_idx   = df.index[i]
        cross_price = curr_close
        
        # Step 3: Find the next low AFTER the cross to anchor the new retracement
        subsequent_lows = df['Low'].iloc[i:]
        low_idx   = subsequent_lows.idxmin()
        low_price = subsequent_lows.min()
        
        # Step 4: Re‐compute fib levels from that low → cross price
        new_fibs = fibonacci_levels(low_price, cross_price)
        
        # Record all relevant points
        entry_signals.append({
            'cross_time':   cross_idx,
            'cross_price':  cross_price,
            'anchor_time':  low_idx,
            'anchor_price': low_price,
            'entry_price':  new_fibs['-21%'],
            'stop_loss':    new_fibs['0%'],
            'take_profit':  new_fibs['100%']
        })

# Show all detected entry setups
import pprint
pprint.pprint(entry_signals)


[]


In [8]:
# Step 1: Function to place OCO orders (Stoploss and Take Profit)
def place_oco_order(entry_signal):
    # Extract entry details
    entry_price = entry_signal['entry_price']
    stop_loss   = entry_signal['stop_loss']
    take_profit = entry_signal['take_profit']
    
    # Simulate the order placement (print orders for now)
    print(f"Placing OCO order for entry at {entry_price:.2f}")
    print(f"Stop Loss at {stop_loss:.2f}, Take Profit at {take_profit:.2f}")
    
    # Here you would interface with your broker's API to place actual orders.
    # For example, using QuantConnect's algorithm to place orders:
    # self.SetStopLoss(stop_loss)
    # self.SetTakeProfit(take_profit)
    # self.SetEntryPrice(entry_price)
    return {
        'entry_time': entry_signal['cross_time'],
        'entry_price': entry_price,
        'stop_loss': stop_loss,
        'take_profit': take_profit
    }

# Step 2: Loop through all entry signals and place OCO orders
oco_orders = []

for signal in entry_signals:
    order = place_oco_order(signal)
    oco_orders.append(order)

# Show the orders placed
import pprint
pprint.pprint(oco_orders)


[]


In [9]:
# Step 1: Debugging Cross detection and Fibonacci levels
def fibonacci_levels(low_price, high_price):
    """Recalculate Fibonacci Levels (–21%, 0%, 100%)"""
    range_ = high_price - low_price
    levels = {
        '-21%': high_price + (range_ * 0.21),
        '0%': high_price,
        '100%': low_price
    }
    return levels

# Step 2: Debug Entry Signal Generation
entry_signals = []

for i in range(1, len(df)):
    prev_close = df['Close'].iat[i-1]
    curr_close = df['Close'].iat[i]
    
    # Debugging Cross detection
    if prev_close > trigger_level and curr_close <= trigger_level:
        cross_idx = df.index[i]
        cross_price = curr_close
        
        # Debugging: Check if the cross is happening correctly
        print(f"Cross detected at {cross_idx} with price: {cross_price:.2f}")
        
        # Step 3: Find the next low after the cross
        subsequent_lows = df['Low'].iloc[i:]
        low_idx = subsequent_lows.idxmin()
        low_price = subsequent_lows.min()
        
        # Debugging: Print the lowest low after cross
        print(f"Lowest low after cross: {low_idx} with price: {low_price:.2f}")
        
        # Step 4: Recalculate Fibonacci levels based on this low and cross price
        new_fibs = fibonacci_levels(low_price, cross_price)
        
        # Store entry signals
        entry_signals.append({
            'cross_time': cross_idx,
            'cross_price': cross_price,
            'anchor_time': low_idx,
            'anchor_price': low_price,
            'entry_price': new_fibs['-21%'],
            'stop_loss': new_fibs['0%'],
            'take_profit': new_fibs['100%']
        })

# Check output
import pprint
pprint.pprint(entry_signals)


[]


In [10]:
# 1️⃣ Show exactly which bars were flagged as fractals
print("=== Fractal Points Detected ===")
fractals = df[df['Fractal_High'] | df['Fractal_Low']]
if fractals.empty:
    print("⚠️  No fractal highs or lows found. Try adding more data or widening the fractal window.")
else:
    display(fractals[['High','Low','Fractal_High','Fractal_Low']])

# 2️⃣ If we do have fractals, define our fib levels and trigger
if not fractals.empty:
    last_low  = df[df['Fractal_Low']].iloc[-1]
    last_high = df[df['Fractal_High']].iloc[-1]
    
    fib_levels = fibonacci_levels(low_price=last_low['Low'],
                                  high_price=last_high['High'])
    trigger_level = fib_levels['-21%']
    print(f"\nTrigger level (-21%): {trigger_level:.2f}")
    print(f"0% (stop loss):    {fib_levels['0%']:.2f}")
    print(f"100% (take profit):{fib_levels['100%']:.2f}\n")
    
    # 3️⃣ Now re‐run the entry scan with debug prints
    entry_signals = []
    for i in range(1, len(df)):
        prev_close = df['Close'].iat[i-1]
        curr_close = df['Close'].iat[i]
        
        if prev_close > trigger_level and curr_close <= trigger_level:
            cross_idx   = df.index[i]
            cross_price = curr_close
            print(f"→ Cross at {cross_idx.date()} | price={cross_price:.2f}")
            
            subsequent = df['Low'].iloc[i:]
            low_idx   = subsequent.idxmin()
            low_price = subsequent.min()
            print(f"   Next low at {low_idx.date()} | price={low_price:.2f}")
            
            new_fibs = fibonacci_levels(low_price, cross_price)
            entry_signals.append({
                'cross_time':   cross_idx,
                'cross_price':  cross_price,
                'anchor_time':  low_idx,
                'anchor_price': low_price,
                'entry_price':  new_fibs['-21%'],
                'stop_loss':    new_fibs['0%'],
                'take_profit':  new_fibs['100%']
            })
    
    print("\n=== Entry Signals ===")
    if entry_signals:
        from pprint import pprint
        pprint(entry_signals)
    else:
        print("⚠️  No entries detected. Check that your trigger level sits within the price range of your data.")


=== Fractal Points Detected ===


Unnamed: 0_level_0,High,Low,Fractal_High,Fractal_Low
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-01-02,100.905264,98.767915,False,True
2023-01-04,104.258613,102.201856,True,False



Trigger level (-21%): 105.41
0% (stop loss):    104.26
100% (take profit):98.77


=== Entry Signals ===
⚠️  No entries detected. Check that your trigger level sits within the price range of your data.


In [12]:
# Simulate a brief swing above then below the trigger
test_rows = pd.DataFrame([
    {'Open': 104.0, 'High': 106.0, 'Low': 104.0, 'Close': 106.0, 'Volume': 400},  # crosses above
    {'Open': 106.0, 'High': 106.0, 'Low': 105.0, 'Close': 105.0, 'Volume': 350}   # dips below
], index=pd.to_datetime(['2023-01-06', '2023-01-07']))

df_test = pd.concat([df, test_rows])

# Re-run the entry-scan cell on df_test instead of df


In [13]:
# Function to calculate Fibonacci Levels (using low and high price)
def fibonacci_levels(low_price, high_price):
    """Recalculate Fibonacci Levels (–21%, 0%, 100%)"""
    range_ = high_price - low_price
    levels = {
        '-21%': high_price + (range_ * 0.21),  # Negative Fibonacci retracement
        '0%': high_price,
        '100%': low_price
    }
    return levels

# Store entry signals (new list)
entry_signals = []

# Trigger for Fibonacci retracement (change –21% level to suit range)
trigger_level = 105.41  # Example trigger level that must be crossed

# Loop through data to detect entry points
for i in range(1, len(df)):
    prev_close = df['Close'].iat[i-1]
    curr_close = df['Close'].iat[i]
    
    # Check if price crosses down below the Fibonacci trigger level
    if prev_close > trigger_level and curr_close <= trigger_level:
        cross_idx = df.index[i]
        cross_price = curr_close
        
        # Identify the lowest low after crossing
        subsequent_lows = df['Low'].iloc[i:]
        low_idx = subsequent_lows.idxmin()
        low_price = subsequent_lows.min()
        
        # Recalculate Fibonacci levels based on the lowest price after crossing
        new_fibs = fibonacci_levels(low_price, cross_price)
        
        # Store the entry signal with stop loss and take profit
        entry_signals.append({
            'cross_time': cross_idx,
            'cross_price': cross_price,
            'anchor_time': low_idx,
            'anchor_price': low_price,
            'entry_price': new_fibs['-21%'],
            'stop_loss': new_fibs['0%'],
            'take_profit': new_fibs['100%']
        })

# Debug: Output entry signals
from pprint import pprint
pprint(entry_signals)


[]


In [15]:
def calculatePositionSize(self):
    risk_amount = self.Portfolio.Cash * 0.01
    distance    = abs(self.entryPrice - self.stopLoss)
    return risk_amount / distance
