# COMP7705 Project: (Re-)Imag(in)ing Price Trends

This jupyter notebook is composed of 5 parts.

1. Data processing
2. Baseline model (including model construction, training and testing)
3. Sensitivity analysis
4. Grad-CAM
5. Regression model


In [1]:
#裁剪图片
import numpy as np
import os
from PIL import Image

image_dir = "C:/Users/YANGJ/Downloads/Project小组/datatest/datatest"
output_dir = "C:/Users/YANGJ/Downloads/Project小组/datatest/result"

if not os.path.exists(output_dir):
    os.makedirs(output_dir)

file_list = os.listdir(image_dir)
image_list = [i for i in file_list if i.endswith('.png')]
image_list.sort()

for image_file in image_list:
    image_path = os.path.join(image_dir, image_file)
    output_path = os.path.join(output_dir, image_file)
    
    img = Image.open(image_path)
    cropped_img = img.crop((208, 71, 1037, 474))
    cropped_img.save(output_path)

print("Image cropping complete!")

FileNotFoundError: [WinError 3] 系统找不到指定的路径。: 'C:/Users/YANGJ/Downloads/Project小组/datatest/datatest'

In [None]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

image_dir = "C:/Users/YANGJ/Downloads/Project小组/datatest/result"
csv_file = "C:/Users/YANGJ/Downloads/Project小组/datatest/result/labels.csv"

# Load the CSV data
data_df = pd.read_csv(csv_file)

# Create empty lists to store the image data and labels
image_data = []
labels = []

# Iterate through the CSV rows
for index, row in data_df.iterrows():
    stock_code = row["Stock Code"]
    year = row["Year"]
    month = row["Month"]

    # Construct the image filename
    image_filename = f"chart_{stock_code}_{year}_{month}.png"
    image_path = os.path.join(image_dir, image_filename)

    # Open and convert the image to binary format
    image = Image.open(image_path)
    image = image.resize((207, 101))
    image = image.convert('L')
    image_array = np.array(image)

    # Append the image data and label to the lists
    image_data.append(image_array)
    labels.append(row["Rise"])

# Convert the lists to NumPy arrays
image_data = np.array(image_data)
labels = np.array(labels)

# Split the data into training and testing sets (80:20 ratio)
split_ratio = 0.8
split_index = int(len(image_data) * split_ratio)

image_train_val_data = image_data[:split_index]
label_train_val_data = labels[:split_index]
image_test_data = image_data[split_index:]
label_test_data = labels[split_index:]

# Print the shape of the training and testing sets
print("Training Images:", image_train_val_data.shape)
print("Training Labels:", label_train_val_data.shape)
print("Testing Images:", image_test_data.shape)
print("Testing Labels:", label_test_data.shape)

Training Images: (102, 101, 207)
Training Labels: (102,)
Testing Images: (26, 101, 207)
Testing Labels: (26,)


In [None]:
cold_start = True

if cold_start:
  #image_train_val_data = np.concatenate(image_train_val_arr, 0)
  image_train_val_data[image_train_val_data==255]=1
  #label_train_val_data = np.concatenate(label_train_val_arr, 0)

  image_train_data, image_val_data = image_train_val_data[:int(0.7*len(image_train_val_data))], image_train_val_data[int(0.7*len(image_train_val_data)):]
  label_train_data, label_val_data = label_train_val_data[:int(0.7*len(label_train_val_data))], label_train_val_data[int(0.7*len(label_train_val_data)):]

  np.save('train_x.npy', image_train_data)
  np.save('train_y.npy', label_train_data)
  np.save('val_x.npy', image_val_data)
  np.save('val_y.npy', label_val_data)

  #image_test_data = np.concatenate(image_test_arr, 0)
  image_test_data[image_test_data==255]=1
  #label_test_data = np.concatenate(label_test_arr, 0)

  np.save('test_x.npy', image_test_data)
  np.save('test_y.npy', label_test_data)

else:
  image_train_data = np.load("train_x.npy")
  image_val_data = np.load("val_x.npy")
  label_train_data = np.load("train_y.npy")
  label_val_data = np.load("val_y.npy")
  image_test_data = np.load("test_x.npy")
  label_test_data = np.load("test_y.npy")

In [None]:
print("The size of training image is " + str(image_train_data.shape))
print("The size of training label is " + str(label_train_data.shape))
print("The size of validation image is " + str(image_val_data.shape))
print("The size of validation label is " + str(label_val_data.shape))
print("The size of testing image is " + str(image_test_data.shape))
print("The size of testing label is " + str(label_test_data.shape))

The size of training image is (71, 101, 207)
The size of training label is (71,)
The size of validation image is (31, 101, 207)
The size of validation label is (31,)
The size of testing image is (26, 101, 207)
The size of testing label is (26,)


In [None]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch

class ImageDataset(Dataset):
    def __init__(self, data_file_path, label_file_path, binary=True):
        self.data = np.load(data_file_path)
        self.label = np.load(label_file_path)
        self.binary = binary
    def __getitem__(self, index):
        x = self.data[index]
        y = self.label[index]
        print(x,y)
        x = torch.from_numpy(x).float()
        x = x.unsqueeze(0)
        if self.binary:
            y = np.where(y > 0, 1, 0)
        y = torch.from_numpy(y).float()
        return x, y
    
    def __len__(self):
        return len(self.data)

class ImgDataset(Dataset):
    def __init__(self, data_file, label_file, binary=True):
        self.data = data_file
        self.label = label_file
        self.binary = binary
    def __getitem__(self, index):
        x = self.data[index]
        y = self.label[index]
        print(x,y)
        x = torch.from_numpy(x).float()
        x = x.unsqueeze(0)
        if self.binary:
            y = np.where(y > 0, 1, 0)
        y = torch.from_numpy(y).float()
        return x,y
    
    def __len__(self):
        return len(self.data)

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
class ConvNet(nn.Module):
    """Encoder for feature embedding"""
    def __init__(self):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
                        nn.Conv2d(1, 64, kernel_size=(5,3), padding=(0, 1), stride=(1,1) ,dilation=(4,1)),
                        nn.BatchNorm2d(64),
                        nn.LeakyReLU(0.01),
                        nn.MaxPool2d(kernel_size  = (2, 1), stride=(2,1)))
        nn.init.xavier_uniform_(self.layer1[0].weight)
        self.layer2 = nn.Sequential(
                        nn.Conv2d(64,128,kernel_size=(5,3),padding=(0,1), stride=(1,1), dilation=(1,1)),
                        nn.BatchNorm2d(128),
                        nn.LeakyReLU(0.01),
                        nn.MaxPool2d(kernel_size  = (2, 1), stride=(2,1)),)
        nn.init.xavier_uniform_(self.layer2[0].weight)
        self.layer3 = nn.Sequential(
                        nn.Conv2d(128,256,kernel_size=(5,3),padding=(0,1), stride=(1,1), dilation=(1,1)),
                        nn.BatchNorm2d(256),
                        nn.LeakyReLU(0.01),
                        nn.MaxPool2d(kernel_size  = (2, 1), stride=(2,1)),)
        nn.init.xavier_uniform_(self.layer3[0].weight)
        self.layer4 = nn.Sequential(
                        nn.Conv2d(256,512,kernel_size=(5,3),padding=(0,1), stride=(1,1), dilation=(1,1)),
                        nn.BatchNorm2d(512),
                        nn.LeakyReLU(0.01),
                        nn.MaxPool2d(kernel_size  = (2, 1), stride=(2,1)),)
        nn.init.xavier_uniform_(self.layer4[0].weight)
        self.fc1 = nn.Sequential(
            nn.Linear(512*1*207, 2),  
            nn.Dropout(p=0.5),  
        )
        self.softmax = nn.Softmax(dim=1)

    def forward(self,x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = out.view(out.shape[0],-1)
        result = self.fc1(out)
        result = self.softmax(result)

        return result

def conv3():
    return ConvNet()

model = conv3()
print(model)

ConvNet(
  (layer1): Sequential(
    (0): Conv2d(1, 64, kernel_size=(5, 3), stride=(1, 1), padding=(0, 1), dilation=(4, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01)
    (3): MaxPool2d(kernel_size=(2, 1), stride=(2, 1), padding=0, dilation=1, ceil_mode=False)
  )
  (layer2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(5, 3), stride=(1, 1), padding=(0, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01)
    (3): MaxPool2d(kernel_size=(2, 1), stride=(2, 1), padding=0, dilation=1, ceil_mode=False)
  )
  (layer3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(5, 3), stride=(1, 1), padding=(0, 1))
    (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01)
    (3): MaxPool2d(kernel_size=(2, 1), stride=(2, 1), padding=0, dilation=1, ceil_mode=Fal

In [None]:
from torchsummary import summary
device = 'cuda'
print(device)
summary(model.to('cuda'), input_size=(1, 101, 207), device="cuda" )

cuda
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 64, 85, 207]           1,024
       BatchNorm2d-2          [-1, 64, 85, 207]             128
         LeakyReLU-3          [-1, 64, 85, 207]               0
         MaxPool2d-4          [-1, 64, 42, 207]               0
            Conv2d-5         [-1, 128, 38, 207]         123,008
       BatchNorm2d-6         [-1, 128, 38, 207]             256
         LeakyReLU-7         [-1, 128, 38, 207]               0
         MaxPool2d-8         [-1, 128, 19, 207]               0
            Conv2d-9         [-1, 256, 15, 207]         491,776
      BatchNorm2d-10         [-1, 256, 15, 207]             512
        LeakyReLU-11         [-1, 256, 15, 207]               0
        MaxPool2d-12          [-1, 256, 7, 207]               0
           Conv2d-13          [-1, 512, 3, 207]       1,966,592
      BatchNorm2d-14          [-1,

In [None]:
import torch
import numpy as np
import random
import os
from copy import deepcopy
import math
import torch.nn as nn
import torch.nn.functional as F


class Averager():

    def __init__(self):
        self.n = 0
        self.v = 0

    def add(self, x):
        self.v = (self.v * self.n + x) / (self.n + 1)
        self.n += 1

    def item(self):
        return self.v


def pretrain(model, loader, optimizer, device):
    model.train()
    ## training with ce
    loss_avg = Averager() 
    for batch_idx, batch in enumerate(loader):
        model.zero_grad()
        optimizer.zero_grad()
        train_inputs, train_targets = batch[0], batch[1]
        train_targets = train_targets.long()
        train_inputs = train_inputs.to(device=device)
        train_targets = train_targets.to(device=device)
        train_logits = model(train_inputs)
        loss = nn.CrossEntropyLoss()(train_logits, train_targets)
        loss_avg.add(loss.item())
        loss.backward()
        optimizer.step()
    print("Train Loss %.4f" % (loss_avg.item()))
    return loss_avg.item()

def evaluate_batch(model, data_loader, device):
    model.eval()
    correct = num = 0
    for iter, pack in enumerate(data_loader):
        data, target = pack[0].to(device), pack[1].to(device)
        targets = target.long()
        logits = model(data)
        _, pred = logits.max(1)
        correct += pred.eq(target).sum().item()
        num += data.shape[0]  
    torch.cuda.empty_cache()
    model.train()
    return correct/num

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import  os 



trainset = ImageDataset("train_x.npy","train_y.npy")
valset = ImageDataset("val_x.npy","val_y.npy")

batch_size = 1
epoch = 15
lr = 1e-3
wd =  5e-4
model_name = 'weight.pth'
train_loader = DataLoader(dataset=trainset, batch_size=batch_size, shuffle=True, num_workers=0,
                pin_memory=True, drop_last=True)
val_loader = DataLoader(dataset=valset, batch_size=batch_size, shuffle=False, num_workers=0,
                pin_memory=True, drop_last=True)




device = 'cuda'
model = conv3().cuda()
optimizer=torch.optim.AdamW(model.parameters(), lr, weight_decay=wd)
evaluate_batch(model, val_loader, device)
best_acc = 0
count = 0

for i in range(1, epoch+1):
    print('Epoch : ', i)
    pretrain(model, train_loader, optimizer, device)
    val_acc = evaluate_batch(model, val_loader, device)
    print('Val Acc : ', val_acc)
    if  best_acc < val_acc:
        count = 0
        best_acc = val_acc
        torch.save(model.state_dict(), model_name)
    else:
        count += 1
    if count >= 2:
        break


[[237 242 241 ...   0   0   0]
 [238 251 252 ...   0   0   0]
 [124 136 145 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 1.0
[[141 156 166 ...   0   0   0]
 [203 199 181 ...   0   0   0]
 [254 254 252 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 0.0
[[126 126 126 ...   0   0   0]
 [119 120 120 ...   0   0   0]
 [213 182 189 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 0.0
[[250 250 251 ...   0   0   0]
 [250 250 251 ...   0   0   0]
 [252 254 253 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 0.0
[[235 226 215 ...   0   0   0]
 [103 102 101 ...   0   0   0]
 [206 216 226 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 1.0
[[ 99 100 115 ...   

In [None]:
torch.cuda.empty_cache()

In [None]:
testset = ImageDataset("test_x.npy","test_y.npy")
test_loader = DataLoader(dataset=testset, batch_size=batch_size, shuffle=False, num_workers=0,
                pin_memory=True, drop_last=True)
test_acc = evaluate_batch(model, test_loader, device)
print('Test Acc : ', test_acc)

[[254 212 229 ...   0   0   0]
 [  1 215 233 ...   0   0   0]
 [213 183 195 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 0.0
[[ 99 118 162 ...   0   0   0]
 [209 170 130 ...   0   0   0]
 [254   1 251 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 0.0
[[210 128 121 ...   0   0   0]
 [122 122 122 ...   0   0   0]
 [224 224 224 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 0.0
[[250 250 250 ...   0   0   0]
 [250 250 250 ...   0   0   0]
 [250 250 250 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 0.0
[[250 250 250 ...   0   0   0]
 [250 250 250 ...   0   0   0]
 [251 251 251 ...   0   0   0]
 ...
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]] 0.0
[[119 120 120 ...   