<a href="https://colab.research.google.com/github/JadHay8/Computer-Vision-Market-Predictor/blob/main/model_dev.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Get Data

In [None]:
# Placeholder dataset. The literal `...` entries stand in for the remaining
# items and have to be replaced with real data before this is used.
# Presumably labels[i] belongs to images[i] -- confirm when filling these in.
images = [
    "path/to/image1.png",
    "path/to/image2.png",
    ...,
]
# Label encoding: 0 for decrease, 1 for increase.
labels = [
    0,
    1,
    ...,
]


# Preprocess Images

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Every pixel value is multiplied by 1/255 when batches are produced.
datagen = ImageDataGenerator(rescale=1./255)

# Both splits are read with identical settings; only the source folder differs.
# Assumes data/train and data/validation each contain one sub-folder per class.
flow_options = dict(
    target_size=(150, 150),  # images are resized to 150x150
    batch_size=32,
    class_mode='binary',
)

train_generator = datagen.flow_from_directory('data/train', **flow_options)
validation_generator = datagen.flow_from_directory('data/validation', **flow_options)

# Design the Neural Network Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

# Three convolution + pooling stages, then a dense head that ends in a single
# sigmoid unit for the binary decision.
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(150, 150, 3)))
model.add(MaxPooling2D(2, 2))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(2, 2))
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D(2, 2))
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dense(1, activation='sigmoid'))  # Binary classification

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)


# Train the Model

In [None]:
# Train for 15 epochs. The step counts are taken from the generators
# (len() of a directory iterator is its number of batches) instead of the
# previous hard-coded 100 / 50, so each epoch covers each split exactly once
# regardless of how many images the folders contain.
history = model.fit(
    train_generator,
    steps_per_epoch=len(train_generator),
    epochs=15,
    validation_data=validation_generator,
    validation_steps=len(validation_generator))


# Evaluate and Use the Model

In [None]:
import numpy as np
from tensorflow.keras.preprocessing import image

# Image to classify.
new_image_path = 'path/to/new/image.png'

# Load at the network's 150x150 input size, add a leading batch axis, then
# scale by 1/255 -- the same rescaling the training generator applies.
img = image.load_img(new_image_path, target_size=(150, 150))
img_tensor = np.expand_dims(image.img_to_array(img), axis=0)
img_tensor /= 255.

# Predict: a sigmoid score above 0.5 is reported as "Increase".
prediction = model.predict(img_tensor)
print("Increase" if prediction[0] > 0.5 else "Decrease")


# Code Test without Data

# Upload Image

In [None]:
from google.colab import files

# Open Colab's upload dialog; the result is keyed by uploaded file name.
uploaded = files.upload()

# Report the name and size of every uploaded file.
for filename, content in uploaded.items():
    print(f"User uploaded file {filename} with length {len(content)} bytes")

In [None]:
from tensorflow.keras.preprocessing import image
import numpy as np

# Load the image. Prefer the file that was actually uploaded in the previous
# cell (files.upload() stores it in the working directory under its own name);
# only fall back to the old hard-coded path when no upload is available.
uploaded_names = list(globals().get('uploaded', {}))
img_path = uploaded_names[0] if uploaded_names else '/content/candlestick_chart.png'
img = image.load_img(img_path, target_size=(150, 150))  # Resize image to match model expected input

# Convert the image to a numpy array, add a batch axis and normalize
img_array = image.img_to_array(img)
img_array = np.expand_dims(img_array, axis=0) / 255.  # Normalize to 0-1 range


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

# Minimal CNN used only to exercise the prediction path.
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(150, 150, 3)))
model.add(MaxPooling2D(2, 2))
model.add(Flatten())
model.add(Dense(64, activation='relu'))
model.add(Dense(1, activation='sigmoid'))  # Assuming binary classification (up/down)

# No fit() call follows, so the weights keep their random initial values.


In [None]:
# Forward pass through the untrained network; the score is effectively random.
prediction = model.predict(img_array)
mock_score = prediction[0]
print(f"Mock Prediction (Untrained): {mock_score}")


# Testing Process with Image Generation Rather Than Upload

# Create and Save a Chart

In [None]:
# !pip install mplfinance
import matplotlib.pyplot as plt
import yfinance as yf
import mplfinance as mpf

# Fetch price history for one ticker (Apple) between the two dates below.
stock_symbol = "AAPL"
stock_data = yf.download(stock_symbol, start="2020-01-01", end="2022-01-01")

# Output settings handed to mplfinance through its `savefig` option.
save_params = {
    'fname': 'apple_candlestick_chart.png',
    'dpi': 100,
    'pad_inches': 0.25,
}

# The same chart is drawn twice: first written to disk, then rendered inline.
candle_options = dict(type='candle', style='charles', volume=True)
mpf.plot(stock_data, savefig=save_params, **candle_options)
mpf.plot(stock_data, **candle_options)


# Test the Image Just Created

In [None]:
from tensorflow.keras.preprocessing import image
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

# Load the chart written by the chart-generation cell. That cell saves to the
# relative path 'apple_candlestick_chart.png', so read the same relative path
# here; the previous absolute '/content/...' path only resolved when the
# working directory happened to be /content (i.e. on Colab).
img_path = 'apple_candlestick_chart.png'
img = image.load_img(img_path, target_size=(150, 150))  # Resize image to match model expected input

# Convert the image to a numpy array, add a batch axis and normalize
img_array = image.img_to_array(img)
img_array = np.expand_dims(img_array, axis=0) / 255.  # Normalize to 0-1 range

# Define a simple model
model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(150, 150, 3)),
    MaxPooling2D(2, 2),
    Flatten(),
    Dense(64, activation='relu'),
    Dense(1, activation='sigmoid')  # Assuming binary classification (up/down)
])

# Since we're not training the model, the weights are random

prediction = model.predict(img_array)
print(f"Mock Prediction (Untrained): {prediction[0]}")
# This will just give a random output

# Try With Generation of Multiple Charts for Multiple Tickers

# Ignore This For Now

In [None]:
!pip install mplfinance
import matplotlib.pyplot as plt
import yfinance as yf
import mplfinance as mpf

# Tickers for which candlestick charts are downloaded and rendered.
stock_symbols = [
    "AAPL",
    "GOOGL",
    "MSFT",
    "AMZN",
    "META",
    "TSLA",
    "NFLX",
    "NVDA",
    "BABA",
    "JPM",
]


def download_and_generate_charts(stock_symbols, window_size=5):
    """Save one candlestick chart per sliding window and label it.

    For each symbol, price data between 2020-01-01 and 2020-03-01 is downloaded
    and cut into consecutive windows of ``window_size`` rows. Every window is
    saved as ``{symbol}_{i}.png`` in the working directory and labelled by
    comparing the open of the row right after the window with the window's
    last close.

    Args:
        stock_symbols: Iterable of ticker symbols.
        window_size: Number of rows shown in each chart.

    Returns:
        dict: Maps each chart filename to 1 (next open above last close)
        or 0 (otherwise).
    """
    labels = {}  # filename -> label

    for symbol in stock_symbols:
        # Download historical data
        data = yf.download(symbol, start="2020-01-01", end="2020-03-01")

        # The range stops window_size rows before the end, so the row at
        # i + window_size always exists. The former `except IndexError`
        # fallback could therefore never run and has been removed.
        for i in range(len(data) - window_size):
            data_window = data.iloc[i:i+window_size]

            filename = f"{symbol}_{i}.png"

            # Plot and save the candlestick chart
            mpf.plot(data_window, type='candle', style='charles', volume=True,
                     savefig=dict(fname=filename, dpi=100, pad_inches=0.25))

            # Label from the move between the window's last close and the
            # following row's open: 1 for up, 0 for down or unchanged.
            last_day_close = data_window['Close'].iloc[-1]
            next_day_open = data.iloc[i + window_size]['Open']
            labels[filename] = 1 if next_day_open > last_day_close else 0

    return labels

# Generate the chart images and collect the filename -> label mapping
# (1 = next open above the window's last close, 0 = otherwise).
labels = download_and_generate_charts(stock_symbols)
# print(labels)  # uncomment to inspect the generated labels



# mpf.plot(stock_data, type='candle', style='charles', volume=True)
# plt.show()


# My Method

# Generate Images and Labels

In [None]:
!pip install mplfinance
import matplotlib.pyplot as plt
import yfinance as yf
import mplfinance as mpf
from datetime import datetime, timedelta

# Tickers for which candlestick charts are downloaded and rendered.
stock_symbols = [
    "AAPL",
    "GOOGL",
    "MSFT",
    "AMZN",
    "META",
    "TSLA",
    "NFLX",
    "NVDA",
    "BABA",
    "JPM",
]


def download_and_generate_charts(stock_symbols, start_date="2020-01-01",
                                 end_date="2020-03-01", lookahead_days=3):
    """Save one candlestick chart per symbol and label its follow-up move.

    For each symbol the data between ``start_date`` and ``end_date`` is plotted
    to ``{symbol}.png``. The label compares the last close of that data with
    the last close found in the ``lookahead_days`` calendar days that start at
    ``end_date``.

    Args:
        stock_symbols: Iterable of ticker symbols.
        start_date: Start of the charted period, formatted ``YYYY-MM-DD``.
        end_date: End of the charted period (passed as the download's ``end``)
            and start of the follow-up period, formatted ``YYYY-MM-DD``.
        lookahead_days: Calendar days of follow-up data to download.

    Returns:
        dict: Maps each chart filename to 2 (increase), 0 (decrease),
        1 (unchanged) or None when a close price could not be read.
    """
    labels = {}  # filename -> label

    # End of the follow-up download window.
    follow_up_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=lookahead_days)
    follow_up_end_date = follow_up_end.strftime("%Y-%m-%d")

    for symbol in stock_symbols:
        # Charted period
        data = yf.download(symbol, start=start_date, end=end_date)

        # Follow-up period used to decide whether the price went up or not
        subseq_data = yf.download(symbol, start=end_date, end=follow_up_end_date)

        filename = f"{symbol}.png"

        # Plot and save the candlestick chart
        mpf.plot(data, type='candle', style='charles', volume=True,
                 savefig=dict(fname=filename, dpi=100, pad_inches=0.25))

        try:
            last_day_close = data['Close'].iloc[-1]
            # Last close inside the follow-up window (not necessarily the
            # very next trading day, and not an opening price).
            later_close = subseq_data.iloc[-1]['Close']
        except IndexError:
            # One of the downloads returned no rows, so no label can be derived.
            labels[filename] = None
            continue

        # 2 for increase, 0 for decrease, 1 for neither
        if later_close > last_day_close:
            label = 2
        elif later_close < last_day_close:
            label = 0
        else:
            label = 1

        labels[filename] = label

    return labels

# Build one chart per symbol and collect the filename -> label mapping
# (2 = increase, 0 = decrease, 1 = neither, None = no label could be derived).
labels = download_and_generate_charts(stock_symbols)
# Show the full mapping.
print(labels)



# mpf.plot(stock_data, type='candle', style='charles', volume=True)
# plt.show()


# Create, Train, and Evaluate Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import numpy as np

# Define image dimensions
img_width, img_height = 150, 150

# Model definition for 3-class classification with integer labels
model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(img_width, img_height, 3)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(3, activation='softmax')  # Output layer for 3 classes
])

# Integer class labels (0/1/2) pair with sparse_categorical_crossentropy
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# (filename, accuracy) for every image that was processed successfully
model_metrics = []

# Loop through each image and its label. Note that a single model instance is
# fitted on one image after another; it is not re-created per image.
for filename, label in labels.items():
    # Charts without follow-up data carry a None label. Skip them explicitly
    # instead of letting fit() fail and reporting a generic error.
    if label is None:
        print(f"Skipping {filename}: no label available")
        continue

    try:
        # Load image and preprocess
        img = load_img(filename, target_size=(img_width, img_height))
        img_array = img_to_array(img) / 255.0  # Normalize pixel values
        img_array = np.expand_dims(img_array, axis=0)  # Add the batch axis

        # Use the same data for training and validation for simplicity
        X_train, X_val = img_array, img_array
        y_train, y_val = np.array([label]), np.array([label])

        # Train the model
        model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_val, y_val), verbose=0)

        # Evaluate the model
        _, accuracy = model.evaluate(X_val, y_val, verbose=0)
        model_metrics.append((filename, accuracy))

        print(f"Model for {filename}: Accuracy = {accuracy}")
    except Exception as e:
        # Best effort: report the failing file and continue with the next one.
        print(f"Error processing {filename}: {str(e)}")


# Notes

This code still uses the same image for both training and validation, which is not ideal for a realistic scenario because the reported accuracy says nothing about how the model generalizes. We'll want a more robust dataset, and potentially data augmentation, to improve our model's generalization capability.


---


Also, for each stock symbol, we're essentially training a model on a single instance, which is highly insufficient. A more effective approach would involve ***collecting a large dataset of candlestick charts across various time frames and conditions for each stock, then splitting this dataset into training, validation, and test sets***. This way, the model can learn general patterns associated with price movements rather than memorizing the specific details of a few charts.

# Trying with one stock symbol over many date ranges

In [None]:
!pip install mplfinance
import matplotlib.pyplot as plt
import yfinance as yf
import mplfinance as mpf
from datetime import datetime, timedelta
import pandas as pd
import random
!python --version

help(mpf.plot)


# List of stock symbols to download and generate candlestick charts for
stock_symbols = ["SPY"]
# Module-level flag read and cleared through `global p` inside
# download_and_generate_charts: while it is True, the next generated chart is
# also displayed inline, after which the function sets it to False.
p = True

def download_and_generate_charts(stock_symbols, num_samples_per_stock=100, tolerance=0.01):
    """Render randomly dated hourly candlestick charts and label each one.

    For every symbol, ``num_samples_per_stock`` date windows are drawn with
    ``random_date``. The 60-minute candles of each window are saved as
    ``{symbol}_{start}_{end}.png`` in the working directory. The label compares
    the window's last close (from the default ``yf.download`` data) with the
    last close found in the three calendar days starting at the window's end.

    Args:
        stock_symbols: Iterable of ticker symbols.
        num_samples_per_stock: Number of random windows drawn per symbol. Two
            draws can yield the same window and therefore the same filename,
            so the result may contain fewer entries than draws.
        tolerance: Relative price change (fraction of the last close) that
            must be reached for a move to count as an increase or decrease.
            Defaults to 0.01, i.e. 1%.

    Returns:
        dict: Maps each chart filename to 2 (increase), 0 (decrease),
        1 (change within the tolerance) or None when a close price could not
        be read.

    Side effects:
        Writes PNG files, prints progress information and, while the
        module-level flag ``p`` is True, shows the first chart inline and
        then sets ``p`` to False.
    """
    global p
    labels = {}  # filename -> label

    # Date range the random windows are drawn from (adjust as needed)
    start_date_range = "2022-01-01"
    end_date_range = "2023-12-31"

    for symbol in stock_symbols:
        for _ in range(num_samples_per_stock):
            # Random window inside the configured range
            start_date, end_date = random_date(start_date_range, end_date_range)

            # Default-interval data for the label, hourly candles for the chart
            data = yf.download(symbol, start=start_date, end=end_date)
            hourly_data = yf.Ticker(symbol).history(interval='60m', start=start_date, end=end_date)

            # mplfinance cannot plot an empty frame, so skip such windows
            # instead of aborting the whole run.
            # NOTE(review): Yahoo reportedly only serves intraday data for a
            # limited look-back period -- confirm the date range above is
            # still covered.
            if hourly_data.empty:
                print(f"Skipping {symbol} {start_date} to {end_date}: no hourly data returned")
                continue

            # Make sure the index is a DatetimeIndex for mplfinance
            hourly_data.index = pd.to_datetime(hourly_data.index)

            # Follow-up data used to decide whether the price went up or not.
            # Three calendar days are requested; the window may fall on
            # non-trading days, in which case no label can be derived below.
            new_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=3)
            new_end_date = new_end.strftime("%Y-%m-%d")
            subseq_data = yf.download(symbol, start=end_date, end=new_end_date)

            # Define filename for the chart image
            filename = f"{symbol}_{start_date}_{end_date}.png"

            # Plot and save the candlestick chart
            mpf.plot(hourly_data, type='candle', style='charles', volume=False,    # Disable volume
                    ylabel='',      # Remove y-axis label
                    xrotation=90,    # Rotate x-axis labels by 90 degrees
                    show_nontrading=False,   # Hide non-trading periods
                    savefig=dict(fname=filename, dpi=100, pad_inches=0.25))

            # Determine the label based on price movement
            try:
                last_day_close = data['Close'].iloc[-1]
                # Last close inside the follow-up window
                later_close = subseq_data.iloc[-1]['Close']

                # 2 for increase, 0 for decrease, 1 for neither
                difference = later_close - last_day_close

                # Moves smaller than `tolerance` (1% by default) count as "neither"
                if difference >= tolerance*last_day_close:
                    label = 2
                elif difference <= -tolerance*last_day_close:
                    label = 0
                else:
                    label = 1

                labels[filename] = label
            except IndexError:
                # One of the downloads returned no rows, so there is nothing to compare.
                labels[filename] = None

            # Show the first generated chart inline as a visual sanity check
            if p:
                mpf.plot(hourly_data, type='candle', style='charles', volume=False, ylabel='', xrotation=0, show_nontrading=False)
                plt.show()
                print(f"file: {filename} \t label: {labels[filename]}")
                p = False

    print(f"length of labels dict: {len(labels)}")
    print(f"labels: {labels}")
    return labels

def random_date(start, end, window_days=10):
    """Pick a random date window of fixed length inside [start, end].

    Args:
        start: Earliest allowed window start, formatted ``YYYY-MM-DD``.
        end: Latest allowed window end, formatted ``YYYY-MM-DD``.
        window_days: Length of the window in calendar days (default 10).

    Returns:
        tuple[str, str]: ``(window_start, window_end)`` as ``YYYY-MM-DD``
        strings, with ``window_end`` exactly ``window_days`` days after
        ``window_start``.

    Raises:
        ValueError: If the range between ``start`` and ``end`` is shorter than
            ``window_days`` days.
    """
    window = timedelta(days=window_days)
    range_start = datetime.strptime(start, "%Y-%m-%d")
    range_end = datetime.strptime(end, "%Y-%m-%d")

    # The range must be able to hold at least one full window
    if range_end - range_start < window:
        raise ValueError(f"End date must be at least {window_days} days after start date.")

    # Latest start that still lets the window finish on or before `end`
    latest_start = range_end - window

    # randint is inclusive on both ends, so every whole-day offset is possible
    offset_days = random.randint(0, (latest_start - range_start).days)
    window_start = range_start + timedelta(days=offset_days)
    window_end = window_start + window

    return window_start.strftime("%Y-%m-%d"), window_end.strftime("%Y-%m-%d")

def next_business_day(date):
    """Return ``date`` unchanged on Monday-Friday, else the following Monday.

    Only weekends are considered; public holidays are not taken into account.
    """
    days_past_friday = date.weekday() - 4  # 1 on Saturday, 2 on Sunday
    if days_past_friday > 0:
        # Saturday needs two more days to reach Monday, Sunday needs one.
        date += timedelta(days=3 - days_past_friday)
    return date


# Generate 100 randomly dated chart samples per symbol and collect their labels.
# Note: no image augmentation happens here; the variety comes only from the
# random date windows.
labels = download_and_generate_charts(stock_symbols, num_samples_per_stock=100)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import numpy as np

# Define image dimensions
img_width, img_height = 150, 150

# Model definition for 3-class classification with integer labels
model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(img_width, img_height, 3)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(3, activation='softmax')  # Output layer for 3 classes
])

# Integer class labels (0/1/2) pair with sparse_categorical_crossentropy
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# (filename, accuracy) for every image that was processed successfully
model_metrics = []

# Loop through each image and its label. Note that a single model instance is
# fitted on one image after another; it is not re-created per image.
for filename, label in labels.items():
    # Charts without follow-up data carry a None label. Skip them explicitly
    # instead of letting fit() fail and reporting a generic error.
    if label is None:
        print(f"Skipping {filename}: no label available")
        continue

    try:
        # Load image and preprocess
        img = load_img(filename, target_size=(img_width, img_height))
        img_array = img_to_array(img) / 255.0  # Normalize pixel values
        img_array = np.expand_dims(img_array, axis=0)  # Add the batch axis

        # Use the same data for training and validation for simplicity
        X_train, X_val = img_array, img_array
        y_train, y_val = np.array([label]), np.array([label])

        # Train the model
        model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_val, y_val), verbose=0)

        # Evaluate the model
        _, accuracy = model.evaluate(X_val, y_val, verbose=0)
        model_metrics.append((filename, accuracy))

        print(f"Model for {filename}: Accuracy = {accuracy}")
    except Exception as e:
        # Best effort: report the failing file and continue with the next one.
        print(f"Error processing {filename}: {str(e)}")


# Average over the images that were processed; guard against the case where
# none succeeded, which previously raised ZeroDivisionError.
if model_metrics:
    avg_accuracy = sum(metric[1] for metric in model_metrics)/len(model_metrics)
    print(f"average accuracy: {avg_accuracy}")
else:
    avg_accuracy = None
    print("average accuracy: n/a (no image was processed successfully)")

**TODO:** Use shorter date ranges, split the data into separate training and testing sets, and consider adding a tolerance for price changes so that very small moves are not counted as increases or decreases.

# Using Model with Separate Training and Validation Datasets

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import numpy as np

# Define image dimensions
img_width, img_height = 150, 150


def _load_image_batch(filenames):
    """Load the given image files, resize and scale them, and stack them into one array."""
    return np.array([
        img_to_array(load_img(name, target_size=(img_width, img_height))) / 255.0  # Normalize pixel values
        for name in filenames
    ])


# Model definition for 3-class classification with integer labels
model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(img_width, img_height, 3)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(3, activation='softmax')  # Output layer for 3 classes
])

# Integer class labels (0/1/2) pair with sparse_categorical_crossentropy
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Reset the metrics list used by the earlier training cells (not filled here)
model_metrics = []

# Convert the labels dictionary to (filename, label) pairs, keeping only charts
# that actually received a label. None labels (no follow-up data) would
# otherwise end up in y_train / y_val and make fit() fail.
file_label_pairs = [
    (filename, label)
    for filename, label in labels.items()
    if label is not None
]

# Split the dataset into training and validation sets (fixed seed for repeatability)
train_pairs, val_pairs = train_test_split(file_label_pairs, test_size=0.2, random_state=42)

# Extract filenames and labels for training and validation sets
train_filenames, train_labels = zip(*train_pairs)
val_filenames, val_labels = zip(*val_pairs)

# Load and preprocess the images of both splits
X_train = _load_image_batch(train_filenames)
X_val = _load_image_batch(val_filenames)
y_train = np.array(train_labels)
y_val = np.array(val_labels)

# Train the model
model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_val, y_val))

# Evaluate the model on the validation set
val_loss, val_accuracy = model.evaluate(X_val, y_val)
print(f"Validation accuracy: {val_accuracy}")
