In [2]:
!pip install yfinance mplfinance opencv-python scikit-learn torch torchvision




In [3]:
import os
import cv2
import numpy as np
import pandas as pd
import yfinance as yf
import mplfinance as mpf
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader


In [4]:
stocks = ['AAPL', 'MSFT', 'GOOG', 'AMZN', 'TSLA']
start_date = '2023-01-01'
end_date = '2025-01-01'

data = {}

for stock in stocks:
    df = yf.download(stock, start=start_date, end=end_date)
    df = df[['Open', 'High', 'Low', 'Close', 'Volume']]
    data[stock] = df


  df = yf.download(stock, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed
  df = yf.download(stock, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed
  df = yf.download(stock, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed
  df = yf.download(stock, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed
  df = yf.download(stock, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed


In [5]:
os.makedirs("images", exist_ok=True)


In [6]:
df = yf.download(stock, start=start_date, end=end_date)

df.columns = df.columns.get_level_values(0)

df = df[['Open', 'High', 'Low', 'Close', 'Volume']]

df = df.apply(pd.to_numeric, errors='coerce')
df.dropna(inplace=True)


  df = yf.download(stock, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed


In [7]:
window = 20
img_count = 0

os.makedirs("images", exist_ok=True)

for stock, df in data.items():


    df.columns = df.columns.get_level_values(0)

    df = df[['Open', 'High', 'Low', 'Close', 'Volume']]

    df = df.apply(pd.to_numeric, errors='coerce')
    df.dropna(inplace=True)

    for i in range(len(df) - window):
        window_df = df.iloc[i:i+window]

        save_path = f"images/{stock}_{img_count}.png"

        mpf.plot(
            window_df,
            type='candle',
            style='charles',
            savefig=save_path
        )
        plt.close()
        img_count += 1

print("Total images created:", img_count)




Total images created: 2410


In [8]:
print(df.dtypes)



Price
Open      float64
High      float64
Low       float64
Close     float64
Volume      int64
dtype: object


In [9]:
def detect_doji(row):
    return abs(row['Open'] - row['Close']) <= (row['High'] - row['Low']) * 0.1

def detect_hammer(row):
    body = abs(row['Open'] - row['Close'])
    lower_wick = min(row['Open'], row['Close']) - row['Low']
    return lower_wick > body * 2


In [10]:
labels = []
image_names = []

window = 20
img_count = 0

os.makedirs("images", exist_ok=True)

for stock, df in data.items():


    df.columns = df.columns.get_level_values(0)
    df = df[['Open', 'High', 'Low', 'Close', 'Volume']]
    df = df.apply(pd.to_numeric, errors='coerce')
    df.dropna(inplace=True)

    for i in range(len(df) - window):
        window_df = df.iloc[i:i+window]
        last_candle = window_df.iloc[-1]

        label = None
        if detect_doji(last_candle):
            label = 0
        elif detect_hammer(last_candle):
            label = 1
        else:
            continue

        save_path = f"images/{stock}_{img_count}.png"

        mpf.plot(
            window_df,
            type='candle',
            style='charles',
            savefig=save_path
        )
        plt.close()

        labels.append(label)
        image_names.append(f"{stock}_{img_count}.png")
        img_count += 1

print("Total labeled images:", len(labels))



Total labeled images: 523


In [None]:
from sklearn.model_selection import train_test_split

X_train, X_temp, y_train, y_temp = train_test_split(
    image_names, labels, test_size=0.3, shuffle=True, stratify=labels
)

X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, shuffle=True, stratify=y_temp
)

print(len(X_train), len(X_val), len(X_test))



In [14]:
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader


In [13]:
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor()
])



In [15]:
class CandleDataset(Dataset):
    def __init__(self, image_files, labels, transform=None):
        self.image_files = image_files
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_path = os.path.join("images", self.image_files[idx])
        image = Image.open(img_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, self.labels[idx]


In [16]:
train_loader = DataLoader(
    CandleDataset(X_train, y_train, transform),
    batch_size=32, shuffle=True
)

val_loader = DataLoader(
    CandleDataset(X_val, y_val, transform),
    batch_size=32, shuffle=False
)

test_loader = DataLoader(
    CandleDataset(X_test, y_test, transform),
    batch_size=32, shuffle=False
)


### CNN MODEL

In [17]:
import torch
import torch.nn as nn


In [18]:
class CNN(nn.Module):
    def __init__(self, num_classes=3):
        super().__init__()

        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1))
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x


In [19]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = CNN(num_classes=2).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.003)


In [20]:
epochs = 25

for epoch in range(epochs):
    model.train()
    correct = 0
    total = 0
    running_loss = 0.0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    train_acc = 100 * correct / total
    avg_loss = running_loss / len(train_loader)

    print(f"Epoch [{epoch+1}/{epochs}] | Loss: {avg_loss:.4f} | Train Acc: {train_acc:.2f}%")


Epoch [1/25] | Loss: 0.7093 | Train Acc: 51.91%
Epoch [2/25] | Loss: 0.6945 | Train Acc: 47.81%
Epoch [3/25] | Loss: 0.6940 | Train Acc: 46.72%
Epoch [4/25] | Loss: 0.6928 | Train Acc: 52.19%
Epoch [5/25] | Loss: 0.6921 | Train Acc: 52.19%
Epoch [6/25] | Loss: 0.6919 | Train Acc: 52.19%
Epoch [7/25] | Loss: 0.6947 | Train Acc: 52.19%
Epoch [8/25] | Loss: 0.6917 | Train Acc: 52.19%
Epoch [9/25] | Loss: 0.6930 | Train Acc: 52.19%
Epoch [10/25] | Loss: 0.6928 | Train Acc: 52.19%
Epoch [11/25] | Loss: 0.6925 | Train Acc: 52.19%
Epoch [12/25] | Loss: 0.6913 | Train Acc: 52.19%
Epoch [13/25] | Loss: 0.6931 | Train Acc: 52.19%
Epoch [14/25] | Loss: 0.6920 | Train Acc: 52.19%
Epoch [15/25] | Loss: 0.6914 | Train Acc: 52.19%
Epoch [16/25] | Loss: 0.6928 | Train Acc: 52.19%
Epoch [17/25] | Loss: 0.6920 | Train Acc: 52.19%
Epoch [18/25] | Loss: 0.6930 | Train Acc: 52.19%
Epoch [19/25] | Loss: 0.6924 | Train Acc: 52.19%
Epoch [20/25] | Loss: 0.6919 | Train Acc: 52.19%
Epoch [21/25] | Loss: 0.6930 

# MODEL EVALUATION

In [21]:
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np


In [22]:
model.eval()
y_true, y_pred = [], []

with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        outputs = model(images)
        _, preds = torch.max(outputs, 1)

        y_true.extend(labels.numpy())
        y_pred.extend(preds.cpu().numpy())

print(confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred))


[[ 0 38]
 [ 0 41]]
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        38
           1       0.52      1.00      0.68        41

    accuracy                           0.52        79
   macro avg       0.26      0.50      0.34        79
weighted avg       0.27      0.52      0.35        79



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [23]:
capital = 10000
profit = 0

for pred in y_pred:
    if pred == 1:        # Hammer → Buy
        profit += 10
    elif pred == 0:      # Doji → Small profit
        profit += 5

print("Initial Capital:", capital)
print("Final Capital:", capital + profit)


Initial Capital: 10000
Final Capital: 10790
