In [None]:
import datetime
from typing import Tuple, Union
import requests
import pandas as pd
from enum import Enum


class Constant(Enum):
    """Asset-type selectors accepted by ``FinViz`` as its ``asset_type`` argument."""

    # Makes FinViz query the stock quote endpoint.
    STOCK = 'stock'
    # FinViz does not test for this member explicitly; anything other than
    # STOCK ends up on the forex quote endpoint.
    CURRENCY = 'currency'



class FinViz:
    """Small client for the FinViz Elite quote endpoint.

    An instance is bound to one instrument family (stock or forex) at
    construction time. ``get_data`` fetches a series and caches its most
    recent sample (readable afterwards through ``get_time``, ``get_volume``
    and ``get_price``); ``get_all_data`` returns the complete series and
    leaves the cache untouched.

    Attributes:
        ticker_list: Example forex tickers. Informational only - tickers are
            not validated against it, so stock symbols can be requested too.
        time_frame_list: Time-frame codes accepted by the fetch methods.
        URL: Endpoint queried by this client (class default: forex).
    """

    ticker_list = ["EURUSD", "GBPUSD", "USDJPY", "USDCAD", "USDCHF", "AUDUSD", "NZDUSD", "EURGBP", "GBPJPY", "BTCUSD",
                   "TOP"]
    time_frame_list = ["i1", "i3", "i5", "h", "d", "w", "m"]
    URL = "https://elite.finviz.com/api/quote.ashx?instrument=forex&rev=356737"

    # Headers required to show that it is an actual computer
    _HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36'}

    def __init__(self, asset_type: Union[str, Enum], timeout: int = 10, time_frame: str = 'i1') -> None:
        """Create a client for one instrument family.

        Args:
            asset_type: ``Constant.STOCK`` or the plain string ``'stock'``
                selects the stock endpoint; every other value selects forex.
            timeout (int, optional): Specifies the timeout to server in seconds. Defaults to 10.
            time_frame (str, optional): Stored on the instance as
                ``time_frame``. The fetch methods take their own
                ``time_frame`` argument and do not read this attribute.
        """
        self.time_frame = time_frame
        self.asset_type = asset_type
        self._timeout = timeout
        # -1 / -1.0 mark "get_data() has not been called yet".
        self._time: int = -1
        self._volume: int = -1
        self._price: float = -1.0

        # Enum members expose their payload as ``.value``; plain strings are
        # used as-is, so both spellings of "stock" pick the same endpoint.
        kind = getattr(asset_type, "value", asset_type)
        instrument = "stock" if kind == "stock" else "forex"
        self.URL = f"https://elite.finviz.com/api/quote.ashx?instrument={instrument}&rev=356737"

    def _fetch(self, time_frame: str, ticker: str) -> dict:
        """Validate the time frame, query the endpoint and return the decoded JSON."""
        if time_frame not in self.time_frame_list:
            raise Exception("Incorrect time frame specified, please choose from the list", FinViz.time_frame_list)

        payload = {"ticker": ticker, "timeframe": time_frame, "type": "new"}
        response = requests.get(self.URL, params=payload, timeout=self._timeout, headers=self._HEADERS)

        if response.status_code != 200:
            raise Exception(
                f"No response from server (HTTP status {response.status_code}, asset type: {self.asset_type})")

        return response.json()

    def get_data(self, time_frame: str = "i1", ticker: str = "EURUSD") -> Tuple[int, int, float]:
        """Fetch a series and return (and cache) its most recent sample.

        Args:
            time_frame (str): Choose from time_frame_list.
            ticker (str): Ticker symbol, e.g. EURUSD. Not validated locally.

        Raises:
            Exception: If incorrect timeframe specified
            Exception: Non-200 response from server
            Exception: If the server returned an empty series

        Returns:
            int: time in POSIX format
            int: volume
            float: price (last close)
        """
        json_response = self._fetch(time_frame, ticker)
        dates = json_response["date"]
        volumes = json_response["volume"]
        closes = json_response["close"]
        if not (dates and volumes and closes):
            raise Exception(f"No data returned from server for ticker {ticker}")

        # Convert everything first so a bad value cannot leave the cache half-updated.
        latest = int(dates[-1]), int(volumes[-1]), float(closes[-1])
        self._time, self._volume, self._price = latest
        return latest

    def get_all_data(self, time_frame: str = "i1", ticker: str = "EURUSD") -> Tuple[list, list, list, list]:
        """Fetch the complete series for a ticker.

        Args:
            time_frame (str): Choose from time_frame_list.
            ticker (str): Ticker symbol, e.g. EURUSD. Not validated locally.

        Raises:
            Exception: If incorrect timeframe specified
            Exception: Non-200 response from server

        Returns:
            Four lists of floats, in this order: volumes, opens, closes and
            dates (the same date values ``get_data`` reports as POSIX time).
        """
        json_response = self._fetch(time_frame, ticker)

        all_volumes = [float(i) for i in json_response["volume"]]
        all_opens = [float(i) for i in json_response["open"]]
        all_closes = [float(i) for i in json_response["close"]]
        all_dates = [float(i) for i in json_response["date"]]

        return all_volumes, all_opens, all_closes, all_dates

    def get_time(self, human_readable: bool = True) -> Union[int, datetime.datetime]:
        """Returns the time of the sample cached by the last get_data() call

        Args:
            human_readable (bool, optional): Set to True if you want in datetime format. Defaults to True.

        Raises:
            Exception: If get_data() method not called for first initialization

        Returns:
            Union[int, datetime.datetime]: time in either POSIX or datetime format
        """
        if self._time == -1:
            raise Exception("Please run the API get_data() method before accessing the time")
        if human_readable:
            return datetime.datetime.fromtimestamp(self._time)
        return self._time

    def get_volume(self) -> int:
        """Returns the volume cached by the last get_data() call

        Raises:
            Exception: If get_data() method not called for first initialization
        """
        if self._volume == -1:
            raise Exception("Please run the API get_data() method before accessing this variable")
        return self._volume

    def get_price(self) -> float:
        """Returns the price cached by the last get_data() call

        Raises:
            Exception: If get_data() method not called for first initialization
        """
        if self._price == -1.0:
            raise Exception("Please run the API get_data() method before accessing this variable")
        return self._price


if __name__ == "__main__":
    # Demo run: pull the NFLX series from the stock endpoint, echo the
    # volumes and dump the whole series to a CSV file.
    test = FinViz(asset_type=Constant.STOCK)
    all_volumes, all_opens, all_closes, all_dates = test.get_all_data(ticker='NFLX')

    print(all_volumes)

    # Dates come back as numeric timestamps; turn them into datetime objects
    # before they go into the table.
    column_names = ('date-time', 'open', 'close', 'volume')
    column_values = (
        list(map(datetime.datetime.fromtimestamp, all_dates)),
        all_opens,
        all_closes,
        all_volumes,
    )
    data_dict = dict(zip(column_names, column_values))

    df = pd.DataFrame(data_dict)
    df.to_csv('./NFLX_OPT_Dataset.csv')


[1.0, 6.0, 30.0, 6.0, 35.0, 4.0, 63.0, 70.0, 10.0, 70.0, 33.0, 15.0, 5.0, 10.0, 31.0, 10.0, 5.0, 90895.0, 232.0, 2.0, 1.0, 1.0, 4.0, 5.0, 1.0, 10.0, 52.0, 4.0, 2.0, 3.0, 45.0, 109.0, 2.0, 348.0, 72.0, 11.0, 28.0, 8.0, 19.0, 206.0, 14.0, 10.0, 21.0, 118.0, 60.0, 2.0, 200.0, 2.0, 224.0, 1.0, 4.0, 6.0, 5.0, 6.0, 101.0, 56.0, 216.0, 4.0, 5.0, 29.0, 304.0, 3.0, 42.0, 38.0, 221.0, 209.0, 3.0, 4.0, 6.0, 2.0, 2.0, 222.0, 151.0, 63.0, 60.0, 15.0, 32.0, 3.0, 86.0, 28.0, 2.0, 19.0, 5.0, 11.0, 133.0, 160.0, 240.0, 85.0, 4.0, 4.0, 29.0, 32.0, 14.0, 120.0, 26.0, 77.0, 9.0, 31.0, 209.0, 1810.0, 50.0, 37.0, 49985.0, 7656.0, 3275.0, 3623.0, 6241.0, 5771.0, 7130.0, 5380.0, 3996.0, 10612.0, 10839.0, 8265.0, 4716.0, 6067.0, 7601.0, 3374.0, 4414.0, 5033.0, 2946.0, 4966.0, 2748.0, 10118.0, 6577.0, 5578.0, 5144.0, 2115.0, 4486.0, 2884.0, 4818.0, 9611.0, 4465.0, 4256.0, 5347.0, 2222.0, 3292.0, 2294.0, 2393.0, 2835.0, 4714.0, 2797.0, 1466.0, 4144.0, 5585.0, 2746.0, 1587.0, 3126.0, 3342.0, 1635.0, 5212.0, 6587.

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler

# List of tickers
tickers = ["AAPL", "GOOGL", "MSFT", "AMZN"]

# Fetch historical data for multiple tickers. One client is enough: it is
# configured per asset type, not per ticker.
all_data = {}
test = FinViz(asset_type=Constant.STOCK, timeout=10)
for ticker in tickers:
    try:
        all_volumes, all_opens, all_closes, all_dates = test.get_all_data(ticker=ticker, time_frame='d')
    except Exception as e:
        # Best effort: a ticker that cannot be fetched is skipped.
        print(f"Error fetching data for {ticker}: {e}")
        continue
    if not all_closes:
        # MinMaxScaler cannot be fitted on zero samples.
        print(f"Error fetching data for {ticker}: empty price history")
        continue
    all_data[ticker] = {'closes': all_closes}

# Without this guard np.concatenate([]) fails with an unrelated-looking
# "need at least one array to concatenate" error.
if not all_data:
    raise RuntimeError("No data available for processing")

# Preprocess the data: each ticker is scaled to [0, 1] on its own, then the
# scaled series are stacked end to end.
# NOTE(review): the scaler is fitted on each full series before the
# train/test split, and windows built later from the stacked array can span
# two tickers - confirm both are intended.
combined_data = []
for data in all_data.values():
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_closes = scaler.fit_transform(np.array(data['closes']).reshape(-1, 1))
    combined_data.append(scaled_closes)

combined_data = np.concatenate(combined_data, axis=0)

# Split the data into training and testing sets (first 80 % / last 20 %)
train_size = int(len(combined_data) * 0.8)
train_data = combined_data[:train_size]
test_data = combined_data[train_size:]

# Function to create input and output sequences
def create_sequences(data, look_back):
    """Cut *data* into overlapping windows paired with the item that follows each.

    Args:
        data: Sequence supporting ``len``, slicing and integer indexing.
        look_back: Number of consecutive items in every input window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays where ``X[k]`` is
        ``data[k:k + look_back]`` and ``y[k]`` is ``data[k + look_back]``.
        Both arrays are empty when *data* has at most ``look_back`` items.
    """
    window_count = len(data) - look_back
    windows = [data[start:start + look_back] for start in range(window_count)]
    targets = [data[start + look_back] for start in range(window_count)]
    return np.array(windows), np.array(targets)

# Window the two splits: every sample is `look_back` consecutive steps and the
# target is the step right after the window.
look_back = 60
X_train, y_train = create_sequences(train_data, look_back)
X_test, y_test = create_sequences(test_data, look_back)

# Make the trailing feature axis explicit: (samples, look_back, 1), matching
# the input_shape given to the LSTM layer below.
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

# One LSTM layer with 50 units feeding a single-output dense layer.
model = Sequential([
    LSTM(50, input_shape=(look_back, 1)),
    Dense(1),
])
model.compile(optimizer='adam', loss='mean_squared_error')

# Fit on the training windows.
model.fit(X_train, y_train, batch_size=64, epochs=100, verbose=1)

# Report the loss on the held-out windows.
score = model.evaluate(X_test, y_test, verbose=0)
print('Test MSE:', score)



Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler

# List of tickers
tickers = ["AAPL", "GOOGL", "MSFT", "AMZN"]

# Column order of the feature matrix. 'closes' must stay first: the sequence
# builder further down uses column 0 as the prediction target.
feature_order = ('closes', 'opens', 'highs', 'lows', 'avg_volumes', 'rel_volumes')

all_data = {}
# One client is enough: it is configured per asset type, not per ticker.
test = FinViz(asset_type=Constant.STOCK, timeout=10)
for ticker in tickers:
    try:
        data = test.get_all_data(ticker=ticker, time_frame='d')

        # Ensure we have all required data
        if len(data) == 4:  # Original return value
            all_volumes, all_opens, all_closes, all_dates = data
            # The 4-tuple carries no high/low/average/relative-volume series,
            # so stand-ins are used to keep the feature matrix six columns wide.
            all_highs = all_closes  # placeholder
            all_lows = all_opens    # placeholder
            all_avg_volumes = all_volumes  # placeholder
            all_rel_volumes = [1.0] * len(all_volumes)  # placeholder
        elif len(data) == 8:  # If method was updated to return all values
            all_volumes, all_opens, all_closes, all_dates, all_highs, all_lows, all_avg_volumes, all_rel_volumes = data
        else:
            raise ValueError("Unexpected number of return values from get_all_data")

        if not all_closes:
            # MinMaxScaler cannot be fitted on zero samples.
            raise ValueError("empty price history")

        all_data[ticker] = {
            'closes': all_closes,
            'opens': all_opens,
            'highs': all_highs,
            'lows': all_lows,
            'avg_volumes': all_avg_volumes,
            'rel_volumes': all_rel_volumes
        }
    except Exception as e:
        # Best effort: a ticker that cannot be fetched is skipped.
        print(f"Error fetching data for {ticker}: {e}")
        continue

# Preprocess the data. MinMaxScaler scales every column independently, so a
# single fit_transform on the stacked (n_samples, 6) matrix gives the same
# result as scaling each series separately.
# NOTE(review): the scaler is fitted on each full series before the
# train/test split, and windows built later from the stacked array can span
# two tickers - confirm both are intended.
combined_data = []
for data in all_data.values():
    raw_features = np.column_stack([data[name] for name in feature_order])
    scaler = MinMaxScaler(feature_range=(0, 1))
    combined_data.append(scaler.fit_transform(raw_features))

# Stop here when nothing was fetched; continuing with an empty list only
# fails later with an unrelated IndexError in the reshape step.
if not combined_data:
    raise RuntimeError("No data available for processing")
combined_data = np.concatenate(combined_data, axis=0)

# Split the data into training and testing sets (first 80 % / last 20 %)
train_size = int(len(combined_data) * 0.8)
train_data = combined_data[:train_size]
test_data = combined_data[train_size:]

def create_sequences(data, look_back):
    """Cut a 2-D feature array into windows with a next-step target from column 0.

    Args:
        data: Array indexable as ``data[row, col]`` and sliceable by row.
        look_back: Number of consecutive rows in every input window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays where ``X[k]`` is
        ``data[k:k + look_back]`` (all columns) and ``y[k]`` is
        ``data[k + look_back, 0]``, i.e. the first column of the row right
        after the window (the closing price in this notebook). Both arrays
        are empty when *data* has at most ``look_back`` rows.
    """
    window_count = len(data) - look_back
    windows = [data[start:start + look_back] for start in range(window_count)]
    targets = [data[start + look_back, 0] for start in range(window_count)]
    return np.array(windows), np.array(targets)

# Window the two splits: every sample is `look_back` consecutive rows and the
# target is the first column of the row right after the window.
look_back = 60
X_train, y_train = create_sequences(train_data, look_back)
X_test, y_test = create_sequences(test_data, look_back)

# Pin the layout to (samples, look_back, 6), matching the input_shape given
# to the LSTM layer below.
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 6)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 6)

# One LSTM layer with 50 units feeding a single-output dense layer.
model = Sequential([
    LSTM(50, input_shape=(look_back, 6)),
    Dense(1),
])
model.compile(optimizer='adam', loss='mean_squared_error')

# Fit on the training windows.
model.fit(X_train, y_train, batch_size=64, epochs=100, verbose=1)

# Report the loss on the held-out windows.
score = model.evaluate(X_test, y_test, verbose=0)
print('Test MSE:', score)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78