In [3]:
# PRICE HISTORY SCRIPT
# This script is responsible for getting the price
# history of any symbol passed to the constructor
from src.urls import TDA_BASE
from config.secrets import TDA_APIKEY, PERIOD, PERIODTYPE, FREQUENCY, FREQUENCYTYPE
import requests
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from pandas import DataFrame
from models.mysql_db import create_pricehistory_engine, generate_symbols, _select_symbols
from loguru import logger
import os.path
import time

In [4]:
# CREATE THE LOGGER FOR THIS SCRIPT:
log_path = str(os.path.curdir) + '/logs/'
base_fmt = "[{time:YYYY-MM-DD at HH:mm:ss}]|[{name}]-[<lvl>{message}</lvl>]"
logger.add(log_path+"pricehistory.log", format=base_fmt, level="DEBUG", rotation="20 MB",
           colorize=True, enqueue=True, catch=True)

2

In [5]:
"""
=================================================================================
THE PRICE HISTORY CLASS|
-----------------------+
This will be responsible for getting the price history for every symbol
stored in the database. Using a generator function in the models.mysql_db
script, I will be able to generate one symbol at a time and then execute
the insert_price_data() method.

I am going to try two different approaches. First, I will try the map()
function to see if it can handle executing the function on that long of
a list.

If that doesn't work, I will try to utilize the generator function to
generate one symbol at a time and only execute the insert method one
stock at a time.
=================================================================================
"""



In [6]:
class ProcessPricehistory(ProcessPoolExecutor):

    def __init__(self, task, workers: int):
        super(ProcessPricehistory, self).__init__()
        self.task = task
        self.executor = ProcessPoolExecutor(max_workers=workers)

    def run(self):
        # THIS WILL EXECUTE THE MAIN METHOD IN QUOTE USING MAP AND THREADED POOL PROCESSES:
        logger.info(
            "Running Process Pool Executor on Pricehistory's execute_main method")
        self.executor.submit(self.task)


In [7]:
class PriceHistory:

    def __init__(self, stocks:list, **params):
        self.params = params  # params are set at bottom, imported from config.ini file
        self.table_name = self._set_table_name()
        self.stock_chunks = self.chunks(stocks)
        
        
    def chunks(self, l: list, n: int = 200):
        """
        :param l: takes in a list
        :param n: Lets you know how long you want each chunk to be
        """
        n = max(1, n)
        logger.info("[+] Stocks chunked into groups of 200..")
        return (l[i: i + n] for i in range(0, len(l), n))

    def _set_table_name(self):
        """
        Simple way to set the name of the table to save the price data to
        dynamically. This way no matter what the params are, the data will
        be saved to correct table.
        """
        if int(FREQUENCY) > 1:
            name = f"_{FREQUENCY}_{FREQUENCYTYPE}_data"
            logger.info("[+] Storing Pricehistory data to table: {}", name)
            return name
        else:
            name = f"one_{FREQUENCYTYPE}_data"
            logger.info("[+] Storing Pricehistory data to table: {}", name)
            return name

    def data(self, stock):
        """
        :param symbol: company symbol/ticker
        :Example: MSFT 10 day minute 10

        :returns:
        raw json data (Open, High, Low, close, Volume, and Time (epoch time))
        """
        url = TDA_BASE + f"marketdata/{stock}/pricehistory"

        params = {
            'period': self.params['params']['period'],
            'periodType': self.params['params']['periodType'],
            'frequency': self.params['params']['frequency'],
            'frequencyType': self.params['params']['frequencyType'],
        }

        # Other users will need their own TD Ameritrade API Key
        params.update({"apikey": TDA_APIKEY})

        # request price history data
        req = requests.get(url, params=params).json()

        candles = dict(req)  # turn candles into a dict() type
        extracted_candles_list = candles["candles"]
        symbol = candles["symbol"]  # symbol of the compan's price data

        # Create data frame from extracted data
        df = pd.DataFrame.from_dict(extracted_candles_list, orient="columns")
        df.rename(columns={"datetime": "unix"}, inplace=True)
        df["unix"] = [x for x in df["unix"] // 10 ** 3]

        # This is to insert the companies symbol into the data frame
        # in every row next to the unix_time so that I can identify
        # who the data belongs to.
        df["symbol"] = symbol

        return df
    
    def insert_price_data(self, data):
        try:
            table = self.table_name
            engine = create_pricehistory_engine()
            data.to_sql(name=table, con=engine,
                        if_exists='append', index=False)
            logger.info("{} inserted successfully!", data["symbol"][1])
        except Exception as e:
            logger.error("Error Caused Due to {}", e)
            
    def execute_data(self, symbols):
        # pool = multiprocessing.Pool(processes=4)
        try:
            logger.info("Multiprocessing Pool Starting to get price data")
            data = pd.concat([self.data(each)
                                           for each in self.stock_chunks])
            logger.info("Data Retrieved from ProcessPoolExecutor map method")
            return data
        except Exception as e:
            logger.error("Exception Raised: {}", e)

    def execute_main(self):
        symbols = _select_symbols()
        # symbol = generate_symbols()
        # pool = multiprocessing.Pool(processes=4)
        try:
            count = 0
            while True:
                try:
                    # stock = next(symbol)
                    # self.insert_price_data(stock)
                    # logger.info("[{}]<green>Price History Data Inserted Successfully</green>", stock)
                    logger.info("[<green>Price History Map Function Started</green>]")
                    map(self.insert_price_data, symbols)
                    count += 1
                except KeyError as ke:
                    logger.error("Failed to insert: Due to {}", ke)
                    continue
                except StopIteration as si:
                    logger.info("{} No More Stocks to Get Data for", si)
                    continue
        except ValueError as ve:
            logger.error("Error Caused Due to {}", ve)
            if ve:
                engine = self.pricehistory_engine
                stmt = "DROP TABLE IF EXISTS {table}".format(self.table_name)
                engine.execute(stmt)
                logger.info("SQL Statement {} Executed...", stmt)
        except Exception as e:
            logger.error("Error Caused Due to {}", e)
        finally:
            logger.info("[{}] Stocks Inserted Successfully!", count)

In [8]:
symbols = _select_symbols()

In [9]:
params = {
    'symbol': 'stock',
    'period': PERIOD,
    'periodType': PERIODTYPE,
    'frequency': FREQUENCY,
    'frequencyType': FREQUENCYTYPE,
}

In [19]:
price_history = PriceHistory(symbols, params=params)
logger.info(
    "PriceHistory Object Initialized: Table {} created", price_history.table_name)


2022-05-15 17:06:33.457 | INFO     | __main__:_set_table_name:30 - [+] Storing Pricehistory data to table: one_minute_data
2022-05-15 17:06:33.458 | INFO     | __main__:chunks:15 - [+] Stocks chunked into groups of 200..
2022-05-15 17:06:33.459 | INFO     | __main__:<cell line: 2>:2 - PriceHistory Object Initialized: Table one_minute_data created


In [41]:


price_history.execute_main()


2022-05-15 13:51:15.371 | INFO     | __main__:execute_data:74 - Multiprocessing Pool Starting to get price data


In [20]:
data = price_history.stock_chunks



In [21]:
print(next(data))

['A', 'AA', 'AAC', 'AAI', 'AAIN', 'AAM', 'AAN', 'AAP', 'AAQ', 'AAQC', 'AAT', 'AB', 'ABB', 'ABBV', 'ABC', 'ABEV', 'ABG', 'ABM', 'ABR', 'ABT', 'AC', 'ACA', 'ACAQ', 'ACC', 'ACCO', 'ACD', 'ACDI', 'ACEL', 'ACH', 'ACHR', 'ACI', 'ACII', 'ACM', 'ACN', 'ACP', 'ACR', 'ACRE', 'ACRO', 'ACV', 'ADC', 'ADCT', 'ADE', 'ADEX', 'ADM', 'ADNT', 'ADT', 'ADX', 'AEE', 'AEFC', 'AEG', 'AEL', 'AEM', 'AENZ', 'AEO', 'AER', 'AES', 'AESC', 'AEV', 'AEVA', 'AFB', 'AFG', 'AFGB', 'AFGE', 'AFI', 'AFL', 'AFT', 'AFTR', 'AG', 'AGA', 'AGAC', 'AGCB', 'AGCO', 'AGD', 'AGI', 'AGL', 'AGM', 'AGO', 'AGR', 'AGRO', 'AGS', 'AGTI', 'AGX', 'AHH', 'AHL', 'AHT', 'AI', 'AIC', 'AIF', 'AIG', 'AIN', 'AIO', 'AIR', 'AIRC', 'AIT', 'AIU', 'AIV', 'AIZ', 'AIZN', 'AJG', 'AJRD', 'AJX', 'AJXA', 'AKA', 'AKO', 'AKR', 'AL', 'ALB', 'ALC', 'ALCC', 'ALE', 'ALEX', 'ALG', 'ALI', 'ALIT', 'ALK', 'ALL', 'ALLE', 'ALLG', 'ALLY', 'ALP', 'ALSN', 'ALT', 'ALV', 'ALX', 'AM', 'AMAM', 'AMB', 'AMBC', 'AMBP', 'AMC', 'AMCR', 'AME', 'AMG', 'AMH', 'AMK', 'AMN', 'AMOV', 'AMP',

In [22]:
symbols = next(data)

In [23]:
d = map(price_history.data, symbols)


In [25]:
df = []

In [26]:
for i in d:
    df.append(i)
print(i)

KeyError: 'unix'