## Plutus
A simple classifier which aims to identify stocks currently on a "bullish" trend.

In [17]:


# the greek god of wealth (son of Iason and Demeter)
# dependencies

import os
import math
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# for loading data
from scipy.io import loadmat
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score
from random import randint

import alpaca_trade_api as tradeapi

# api keys
api = tradeapi.REST(
        'AKUGANQEC0256T5OKJJA',
        'LNPrnn2jpq8HTRb86xv7jeEfV4qPxJbJz18IozgD',
        'https://api.alpaca.markets',
        api_version = 'v2')


device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('device: ', device)

# nasdaq tickers
# can add from other exchanges
nasdaq = pd.read_csv('nasdaq.csv', sep=',', low_memory=False)
tickers = np.array(nasdaq['Symbol'])
print(tickers)

device:  cpu
['A' 'AA' 'AAC' ... 'ZYME' 'ZYNE' 'ZYXI']


## Purchase classifier
The aim of this section is to identify stocks which are viable for purchase, meaning that they are currently on a bullish trend.

In [29]:

# getData
# String -> [int]
# The purpose of getData is to build the predictors to train our
# classifier. It will return the above data for the given ticker.
def getData(ticker):
    predictor = []
    barset = api.get_barset(ticker, 'day', limit=10)
    bars = barset[ticker]
    if(len(bars) != 10):
        return None
    # append the last ten closing prices as features
    for x in range(10):
        predictor.append(bars[x].c/bars[0].c)
    predictor = predictor
    # append the last ten volumes as features
    for y in range(10):
        predictor.append(bars[y].v/bars[0].v)
    #toReturn = np.array(predictor)
    return predictor

# call to get the volume data

#classifyData
# [float] -> int 
# classifyData is passed a set of features (a predictor) 
# and classifies the data point as profitable (purchase)
# or not profitable. In essence, we want to confirm that 
# drawbacks in price (if a closing price is less than the previous day)
# are increasingly high (so, the stock is still on a positive trajectory) <-- is the sample size too small?
# and that the volume trend confirms the price trend (volume is generally increasing?)

# the thing is, we don't want the neural network to learn
# our basic, scuffed up function
# we want it to identify patterns of its own
def classifyData(predictor):
    # we begin with the assumption of profitability
    profitable = True
    # move through the closing prices
    # of the stock

    # we are going to go with higher lows (in terms of drawbacks)
    
    # so first, we identify the first drawback in price (if it exists in the 10-day span)
    for x in range(10):
        if(x > 0):
            if(predictor[x] < predictor[x - 1]):
                low = predictor[x]
                # exit the loop
                break
    # if we make it through the loop without a drawback
    # then, profitable does not become False
    for x in range(10):
        if(x > 0):
            if(predictor[x] < predictor[x - 1]):
                if(predictor[x] < low):
                    profitable = False
                    break
                else:
                    # else, we keep moving through the list, 
                    # with the new drawback set as the low
                    low = predictor[x]
    # now, the volume requirements:
    # if it has made it past this point, then the price is on a bullish trend
    # volume has to "confirm" the trend
    # "higher highs" and "lower lows"
    p = 10
    while(p < 20):
        if(p > 10):
            if(predictor[p] < predictor[p - 1]):
                volLow = predictor[x]
                # exit the loop
                break
        p+=1
    u = 10
    while(u < 20):
        if(x > 0):
            if(predictor[x] < predictor[x - 1]):
                # it means the volume is not hitting a "higher high"
                if(predictor[x] < volLow):
                    profitable = False
                    break
                else:
                    # else, we keep moving through the list, 
                    # with the new drawback set as the low
                    volLow = predictor[x]
        u+=1
    return profitable


# build the training data for the classifier
def buildData(tickers):
    x_data = []
    y_data = []
    for ticker in tickers:
        if(getData(ticker) != None):
            toAdd = getData(ticker)
            x_data.append(toAdd)
            if(classifyData(toAdd)): 
                # will be a profitable stock
                y_data.append(1)
            else:
                # won't be a profitable stock
                y_data.append(0)
        else:
            # we do nothing
            continue
    return x_data, y_data

In [30]:
# lets grab 100 random tickers
names = []
while(len(names) < 100):
    value = randint(0, len(tickers)  - 1)
    if(not(tickers[value] in names)):
       names.append(tickers[value])
    
# now, for test data, random batch of 30 names

In [31]:
x_train, y_train = buildData(names)
#x_test, y_test = builData(tickers[201:300])

In [34]:
test_names = []
while(len(test_names) < 30):
    value = randint(0, len(tickers) - 1)
    if(not(tickers[value] in names) and not(tickers[value] in test_names)):
        test_names.append(tickers[value])

In [35]:
x_test, y_test = buildData(test_names)

In [71]:
print(len(x_test))
print(len(x_test[0]))
print(len(y_test))

27
20
27


In [75]:
# check data
print(len(x_train))
print(len(y_train))
print(len(x_test))
print(len(y_test))

print(y_train)
# this shit imbalanced
print(y_test)

94
94
27
27
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


## Classifier
Here we start the classifier.

In [45]:
# Train the network
# (A simple feed foward network)

# network will use binary classification, either "habitable" or not
class NN(nn.Module):
    def __init__(self, n_layers, hidden_size, activations):
        super().__init__()
        assert len(hidden_size) > 0
        # valid activation functions to choose from
        # can the activations affect the classification in an imbalanced dataset
        act = {'sigmoid': nn.Sigmoid(), 'tanh': nn.Tanh(), 'relu': nn.ReLU(),'identity': nn.Identity()}
        self.layers = []
        
        for i in range(n_layers - 1):
            # hidden size i will be number of input neurons'
            # hidden size i + 1 will be number of neurons to send signals to
            self.layers.append(nn.Linear(hidden_size[i], hidden_size[i + 1]))
            
            # if activations[i] in act, meaning if its a valid activation function that
            # we are looking for
            if activations[i] in act:
                self.layers.append(act[activations[i]])
            else:
                assert activations in ['sigmoid', 'tanh', 'relu', 'identity']
                
            # the activation will be the last layer, the output layer on top of the
            # initial n-1 layers
            # that is why the ouput is "2"
            # should be able to have the features reduced into one neuron though, correct?
        
        self.layers = nn.ModuleList(self.layers)
        
    def forward(self, x):
        for idx in range(len(self.layers) - 1):
            x = self.layers[idx](x)
        return x

In [46]:
def train(model, optimizer, criterion, n_epoch, data, label):
    # signals floor division
    print_iteration = n_epoch//5
    
    # data will be the x_training data as a tensor
    data = torch.tensor(data, dtype=torch.float).to(device)
    
    # label will be the y_training data as a tensor
    label = torch.tensor(label, dtype=torch.long).squeeze().to(device)
    
    for epoch in range(n_epoch):
        predict = model(data)
        optimizer.zero_grad()
        
        # the loss function should be tuned
        loss = criterion(predict, label)
        
        loss.backward()
        
        optimizer.step()
        
    if epoch%print_iteration == 0:
        print('epoch: ', epoch, '\tloss: ', loss.item())
        
    print('epoch: ', epoch, '\tloss: ', loss.item())
    return model.named_parameters

In [64]:
def model_accuracy(data, label):
    data = torch.tensor(data, dtype=torch.float).to(device)
    predict = model(data)
    predict = torch.argmax(predict, dim=-1).cpu().detach().numpy()
    acc = accuracy_score(predict, label)
    return acc, predict

In [86]:
n_epoch = 1000
learning_rate = 0.1

# want to train the network using different assortments of settings
settings = {'hidden layer = 1, neuron = 11': {'hs': [20, 20, 2], 'act': ['relu','relu','sigmoid']}}

for setting in settings:
    print('---------------- Setting -------------------')
    print(setting) 
    n_layers = len(settings[setting]['hs'])
    hidden_size = settings[setting]['hs']
    activations = settings[setting]['act']
    model = NN(n_layers, hidden_size, activations).to(device)
    #change for imbalanced data 
    #optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    optimizer = optim.Adadelta(model.parameters())
    criterion = nn.CrossEntropyLoss()
    #criterion = nn.MSELoss()
    print('--------------- Training -------------------')
    param = train(model, optimizer, criterion, n_epoch, x_train, y_train)
    print('Train Accuracy: ', model_accuracy(x_train, y_train))
    print('--------------- Testing -------------------')
    accuracy, predict = model_accuracy(x_test, y_test)
    print('Test Accuracy: ', model_accuracy(x_test, y_test))



---------------- Setting -------------------
hidden layer = 1, neuron = 11
--------------- Training -------------------
epoch:  999 	loss:  0.016864916309714317
Train Accuracy:  (1.0, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0]))
--------------- Testing -------------------
Test Accuracy:  (0.9629629629629629, array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 1, 0, 0, 0]))


In [97]:
# Lets test for tesla


testa = []
testa.append('TSLA')

tsla_test_x, tsla_test_y = buildData(testa)


data = torch.tensor(tsla_test_x, dtype=torch.float).to(device)
#print(tsla_test_y)
prediction = model(data)

# what does this mean
print(prediction)

tensor([[ 3.7991, -4.6675]], grad_fn=<AddmmBackward0>)
