In [11]:
import pandas as pd
import numpy as np
from cassandra.cluster import Cluster
from datetime import datetime, timedelta
from scipy.signal import find_peaks
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from sklearn.model_selection import train_test_split

In [12]:
# 1. Connect to Cassandra
cluster = Cluster(['127.0.0.1'], port=9042)  # adjust if needed
session = cluster.connect('stock_data')

In [13]:
# 2. Load candlestick data for a stock (last 150 days)
stock = "AOT"
start_date = datetime.now() - timedelta(days=150)
rows = session.execute(f"""
    SELECT * FROM candlestick_data 
    WHERE symbol = '{stock}'
""")
data = pd.DataFrame(rows)

# 3. Preprocess
if data.empty:
    raise ValueError("No data found")
data['time'] = pd.to_datetime(data['time'])
data = data[data['time'] >= start_date]
data = data.sort_values('time')

In [14]:
# 4. Head and Shoulders detection function (rule-based for labeling)
def detect_head_and_shoulders(prices, distance=3, tolerance=0.05):
    peaks, _ = find_peaks(prices, distance=distance)
    if len(peaks) < 3:
        return False
    for i in range(len(peaks) - 2):
        ls, head, rs = peaks[i], peaks[i+1], peaks[i+2]
        if head > rs or ls > head: continue
        ls_val, head_val, rs_val = prices[ls], prices[head], prices[rs]
        if head_val > ls_val and head_val > rs_val:
            if abs(ls_val - rs_val) / max(ls_val, rs_val) < tolerance:
                return True
    return False

In [15]:
# 5. Generate labeled dataset 
window_size = 40
X, y = [], []
close_prices = data['close_price'].values

for i in range(len(close_prices) - window_size):
    window = close_prices[i:i+window_size]
    label = 1 if detect_head_and_shoulders(window) else 0
    X.append(window)
    y.append(label)

X = np.array(X)
y = np.array(y)
X = X.reshape(-1, window_size, 1, 1)  # For CNN

In [16]:
# 6. Train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [17]:
# 7. Build CNN model
model = Sequential([
    Conv2D(32, (3, 1), activation='relu', input_shape=(window_size, 1, 1)),
    MaxPooling2D((2, 1)),
    Flatten(),
    Dense(64, activation='relu'),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()


Do not pass an `input_shape`/`input_dim` argument to a layer. When using Sequential models, prefer using an `Input(shape)` object as the first layer in the model instead.



In [18]:
# 8. Train model
model.fit(X_train, y_train, epochs=20, batch_size=32, validation_split=0.2)

Epoch 1/20
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 491ms/step - accuracy: 0.2692 - loss: 4.1469 - val_accuracy: 0.8571 - val_loss: 0.3728
Epoch 2/20
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step - accuracy: 0.7308 - loss: 0.5730 - val_accuracy: 0.8571 - val_loss: 0.7041
Epoch 3/20
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step - accuracy: 0.7308 - loss: 1.3869 - val_accuracy: 0.8571 - val_loss: 0.8247
Epoch 4/20
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step - accuracy: 0.7308 - loss: 1.6227 - val_accuracy: 0.8571 - val_loss: 0.7395
Epoch 5/20
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step - accuracy: 0.7308 - loss: 1.4592 - val_accuracy: 0.8571 - val_loss: 0.5570
Epoch 6/20
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step - accuracy: 0.7308 - loss: 1.0800 - val_accuracy: 0.8571 - val_loss: 0.3879
Epoch 7/20
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━

<keras.src.callbacks.history.History at 0x17eaa0e10>

In [19]:
# 9. Evaluate model
loss, acc = model.evaluate(X_test, y_test)
print(f"\n✅ Test Accuracy: {acc:.2f}")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step - accuracy: 0.4444 - loss: 0.6587

✅ Test Accuracy: 0.44


In [None]:
import plotly.graph_objects as go
import numpy as np
import pandas as pd

def mark_head_and_shoulders(close, tolerance=0.05):
    from scipy.signal import find_peaks
    peaks, _ = find_peaks(close, distance=3)
    if len(peaks) < 3:
        return None
    for i in range(len(peaks) - 2):
        ls, head, rs = peaks[i], peaks[i + 1], peaks[i + 2]
        ls_val, head_val, rs_val = close[ls], close[head], close[rs]
        if head_val > ls_val and head_val > rs_val:
            if abs(ls_val - rs_val) / max(ls_val, rs_val) < tolerance:
                return ls, head, rs
    return None

num_samples_to_plot = 3
indices = np.random.choice(len(X_test), num_samples_to_plot, replace=False)

for idx in indices:
    sample = X_test[idx].squeeze()  # Shape: (window,)
    true_label = y_test[idx]
    pred = model.predict(np.expand_dims(X_test[idx], axis=0))[0][0]
    pred_label = 1 if pred >= 0.5 else 0

    # Prepare time index
    time = pd.date_range(end=pd.Timestamp.today(), periods=len(sample))

    # Detect Head and Shoulders
    hs_points = mark_head_and_shoulders(sample)

    # Plot line chart for close price
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=time,
        y=sample,
        mode='lines+markers',
        name='Close'
    ))

    if hs_points:
        ls, head, rs = hs_points
        fig.add_trace(go.Scatter(
            x=[time[ls]], y=[sample[ls]],
            mode='markers+text',
            marker=dict(color='yellow', size=10),
            text=["LS"], name="Left Shoulder", textposition="top center"
        ))
        fig.add_trace(go.Scatter(
            x=[time[head]], y=[sample[head]],
            mode='markers+text',
            marker=dict(color='red', size=10),
            text=["Head"], name="Head", textposition="top center"
        ))
        fig.add_trace(go.Scatter(
            x=[time[rs]], y=[sample[rs]],
            mode='markers+text',
            marker=dict(color='green', size=10),
            text=["RS"], name="Right Shoulder", textposition="top center"
        ))
        # Neckline
        fig.add_trace(go.Scatter(
            x=[time[ls], time[rs]],
            y=[sample[ls], sample[rs]],
            mode='lines',
            line=dict(color='blue', dash='dot'),
            name='Neckline'
        ))

    fig.update_layout(
        title=f"📊 Head & Shoulders Detection\nTrue: {true_label} | Predicted: {pred_label} ({pred:.2f})",
        title_x=0.5,
        xaxis_title="⏰ Time",
        yaxis_title="💵 Price",
        template="plotly_white",
        legend=dict(x=0.01, y=0.99),
        margin=dict(l=40, r=40, t=80, b=40),
)
    fig.show()

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
