### Imports

In [1]:
import pandas as pd
import numpy as np
import plotly.express as px
from collections import deque
from datetime import datetime,timedelta
from typing import Callable
import requests
from enum import Enum, auto
import sqlite3
from scipy.stats import linregress as lr, median_abs_deviation as mad
import plotly.graph_objects as go
import calendar


### Columns

In [2]:
class Columns(Enum):
    CURRENCY=('currency', 'TEXT')
    SYMBOL=('symbol', 'TEXT')
    EXCHANGENAME=('exchangeName', 'TEXT')
    FULLEXCHANGENAME=('fullExchangeName', 'TEXT')
    INSTRUMENTTYPE=('instrumentType', 'TEXT')
    GMTOFFSET=('gmtoffset', 'INT')
    TIMEZONE=('timezone', 'TEXT')
    EXCHANGETIMEZONENAME=('exchangeTimezoneName', 'TEXT')
    FIFTYTWOWEEKHIGH=('fiftyTwoWeekHigh', 'REAL')
    FIFTYTWOWEEKLOW=('fiftyTwoWeekLow', 'REAL')
    SHORTNAME=('shortName', 'TEXT')
    CHARTPREVIOUSCLOSE=('chartPreviousClose', 'REAL')
    PRICEHINT=('priceHint', 'INT')
    START=('start', 'INT')
    END=('end', 'INT')
    TIMESTAMP=('timestamp', 'INT')
    DATE=('date', 'TEXT')
    CLOSE=('close', 'REAL')
    LOW=('low', 'REAL')
    OPEN=('open', 'REAL')
    VOLUME=('volume', 'INT')
    HIGH=('high', 'REAL')
    ADJCLOSE=('adjclose', 'REAL')
    ERROR=('error', 'TEXT')

### Helper Classes

In [3]:
class helper_datetime:
    @staticmethod
    def freq_map():
        return {"d":1,"m":30,"y":365}
    # Helper Functions
    @staticmethod
    def get_date_n_ago(date:datetime, period:int, freq:str="d") -> str:
        return date-timedelta(days=period*helper_datetime.freq_map()[freq])
            

    @staticmethod
    def get_today_date() -> str:
        return datetime.today()

    
    @staticmethod
    def to_timestamp(date:datetime) -> int:
        if date:
            return round(datetime.timestamp(date))
        else:
            return None

    @staticmethod
    def to_datetime_s(timestamp:int) -> int:
        return datetime.fromtimestamp(timestamp)

    @staticmethod
    def to_datetime(timestamps:list) -> int:
        to_datetime = lambda x: datetime.fromtimestamp(x)
        return [*map(to_datetime,timestamps)]

    @staticmethod
    def get_sequence(start_date:datetime, end_date:datetime, period:int, freq:str="d") -> list:
        
        seq:list = []
        while start_date < end_date:
            seq.append(start_date)
            start_date+=timedelta(days=period*helper_datetime.freq_map()[freq])
        #if seq[-1] > end_date:
        #    seq[-1] = end_date
        #elif seq[-1] != end_date:
        #    seq.append(end_date)
    
        return seq
    
    @staticmethod
    def get_last_weekday(date:datetime) -> datetime:
        yyyy_mm_dd = date.date()
        """Returns friday's date if weekend else returns same date"""
        week_day = calendar.weekday(*map(int,str(date.date()).split("-")))
        if week_day == calendar.SATURDAY:
            return yyyy_mm_dd - timedelta(days=1)
        elif week_day == calendar.SUNDAY:
            return yyyy_mm_dd - timedelta(days=2)
        else:
            return yyyy_mm_dd

    @staticmethod
    def get_month_from_num(num: int) -> str:
        months = {
            1: "January",
            2: "February",
            3: "March",
            4: "April",
            5: "May",
            6: "June",
            7: "July",
            8: "August",
            9: "September",
            10: "October",
            11: "November",
            12: "December"
        }
        return months.get(num)


class helper_dictionary:
    @staticmethod
    def unravel(d:dict):
        r:dict={}
        def unravel_inner(d:dict):
            for k, v in d.items():
                if isinstance(v,dict):
                    unravel_inner(v)
                    
                elif isinstance(v,list):
                    for obj in v:
                        if isinstance(obj,dict):
                            unravel_inner(obj)
                        else:
                            r[k] = v
                            
                else:
                    r[k] = v
        unravel_inner(d)
        return r

class helper_enum:
    @staticmethod
    def to_dict(e:Enum):
        return {i.value[0]:i.value[1] for i in e}
    

class helper_database:
    @staticmethod
    def _get_statement(get_statement:Callable):
        def wrapper(func:Callable):
            def inner(*args,statement:str=None,**kwargs) -> None:
                return func(*args,**kwargs,statement=get_statement(**kwargs))
            return inner
        return wrapper


    @staticmethod
    def _get_create_statement(table:str, columns:dict[str,str]):
        return f"CREATE TABLE IF NOT EXISTS {table} (" \
                            + ",".join([f"'{col_name}' {types}" for col_name,types in columns.items()])\
                            + ")"

    @staticmethod
    def _get_insert_statement(table:str, data:dict[str,object]):
        return (f"INSERT INTO {table} (" \
                            + ",".join([f"'{col_name}'" for col_name in data.keys()])\
                            + ") VALUES ("
                            + ",".join([f"'{val}'" for val in data.values()])\
                            + ")")


    @staticmethod
    def _get_delete_statement(table:str) -> str:
        return f"DROP TABLE IF EXISTS {table}"


    @staticmethod
    def _get_get_statement(table:str) -> str:
        return f"SELECT * FROM {table}"


    @staticmethod
    def _get_check_statement(table:str) -> str:
        return f"SELECT name FROM sqlite_master WHERE name = '{table}'"

    
    @staticmethod
    def _get_latest_date_statement(table:str) -> str:
        return f"SELECT MAX(date) AS max_date FROM {table}"

    
    @staticmethod
    def error_handle(table_check:Callable[str,bool],should_exist:bool):
        def wrapper(func):
            def inner(self,table:str):
                if table_check(table):
                    return func(self,table) if should_exist else None
                else:
                    return None if should_exist else func(self,table)
            return inner
        return wrapper


    @staticmethod
    def _log_delete(table:str):
        return f"UPDATE logs SET updated_at = '{helper.datetime.get_today_date().date()}', deleted_at = '{helper.datetime.get_today_date().date()}'  WHERE name = '{table}'; "


    @staticmethod
    def _log_create(table:str, columns:dict[str,object]):
        return f"INSERT INTO logs ('name','created_at', 'updated_at') VALUES ('{table}','{helper.datetime.get_today_date().date()}','{helper.datetime.get_today_date().date()}')"

    @staticmethod
    def _log_update(table:str):
        return f"UPDATE logs SET updated_at = '{helper.datetime.get_today_date().date()}' WHERE name = '{table}'; "

              
    @staticmethod
    def _log(log_statement:Callable):
        def wrapper(func:Callable):
            def inner(*args,**kwargs) -> None:
                args[0].Cursor.execute(log_statement(**kwargs))
                args[0].commit()    
                return func(*args,**kwargs)
            return inner
        return wrapper

    
    def dict_factory(cursor, row):
        fields = [column[0] for column in cursor.description]
        return {key: value for key, value in zip(fields, row)}

class helper_yfinance:

    @staticmethod
    def trim_response_json(response:dict) ->dict:
        trimmed_response:dict = {}
        for pair in Columns:
            trimmed_response[pair.value[0]] = response[pair.value[0]]
        return trimmed_response
            



class helper:
    database=helper_database
    datetime=helper_datetime
    dictionary=helper_dictionary
    enum=helper_enum
    yfinance = helper_yfinance

### Database

In [4]:
class DataBase(sqlite3.Connection):
    def __init__(self,*args, **kwargs):
        super().__init__(*args, **kwargs)
        self.row_factory = helper.database.dict_factory
        self.Cursor = self.cursor()
        if not self.check(table="logs"):
            self.Cursor.execute(helper.database._get_create_statement(table="logs", columns={"name":"TEXT", "created_at":"TEXT","deleted_at":"TEXT", "updated_at":"TEXT"}))
            self.commit()

    @helper.database._get_statement(helper.database._get_check_statement)
    def check(self, table:str,statement:str=None) -> None:
        return True if self.Cursor.execute(statement).fetchone() else False# returns empty if not found

    @helper.database._log(helper.database._log_create)
    @helper.database._get_statement(helper.database._get_create_statement)
    def create(self, table:str, columns:dict[str,str],statement:str=None):
        self.Cursor.execute(statement)
        self.commit()

    @helper.database._log(helper.database._log_delete)
    @helper.database._get_statement(helper.database._get_delete_statement)
    def delete(self, table:str,statement:str=None) -> None:
        self.Cursor.execute(statement)
        self.commit()
          
    
    @helper.database._get_statement(helper.database._get_insert_statement)
    def insert(self, table:str,data:dict[str,object],statement:str=None) ->None:
        self.Cursor.execute(statement)
        self.commit()

    
    @helper.database._get_statement(helper.database._get_get_statement)
    def get(self, table:str,statement:str=None) ->None:
        return self.Cursor.execute(statement).fetchall()

    @helper.database._get_statement(helper.database._get_latest_date_statement)
    def get_latest_date(self, table:str,statement:str=None) -> str:
        return self.Cursor.execute(statement).fetchall()[0]["max_date"]
        

### Yahoo Webscraper

In [5]:
class request_response:
    def __init__(self,symbol:str, start_date:datetime=None, end_date:datetime=None) -> None:
        self.symbol:str = symbol
        self.update(start_date, end_date)
        self.data:dict = {}

    def update(self, start_date:datetime, end_date:datetime) -> None:
        self.start_date:datetime = start_date
        self.end_date:datetime = end_date
        self.start_period:int = helper.datetime.to_timestamp(start_date)
        self.end_period:int = helper.datetime.to_timestamp(end_date)
        

    def clear(self) -> None:
        self.symbol = None
        self.start_date = None
        self.end_date = None
        self.start_period = None
        self.end_period = None

In [6]:
class ScrapeMethod(int,Enum):
    ALL=auto() # requires no dates scrapes everything
    FROM=auto() # only require first date d1
    TO=auto() # only requires first date d1
    FROMTO = auto() # requires both dates d1 and d2

In [7]:
class WebScraper:
    def __init__(self,database:DataBase):
        """Initialise response class, database and user agent to be used, response object will holds the data returned from the get request and written to the
        appropriate database table, response objest will be reused for subsequent calls"""
        self.db:DataBase = database
        self.response_info:response_info = response_info()
        self.user_agent={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:129.0) Gecko/20100101 Firefox/129.0'}

    def __call__(self, method:ScrapeMethod, d1:datetime, d2:datetime) -> None:
        """Decides which method to use
        Method:
            For a given stock.
            ALL: Scrapes all data.
            FROM: Scrapes all data from data specified to present day.
            TO: Scrapes all data from first record to data specified.
            FROMTO: Scrapes all data from first date specified to second date specified.
        """
    def scrape(self) -> None:
        """Will repeatedly use the get, transform and write members to sequentially scrape and write data to database"""
        pass 
        

    def get(self) -> None:
        """This function will use the request object to make the get request,
        and store the response as a dictionary within the response object"""
        pass
        

    def transform(self) -> bool:
        """Transformation steps for the response object will be executed returning true if response object is valid"""
        
        return True


    def write(self):
        """Writes response object to database"""
        pass
            

In [8]:
            


class yahoo_finance:
    def __init__(self,database:DataBase, db_symbol:str):
        self.db = database
        self.db_symbol = db_symbol
        
        self.user_agent={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:129.0) Gecko/20100101 Firefox/129.0'}
        

    def __call__(self,stock:str, start_date:datetime=None, end_date:datetime=None, method:ScrapeMethod=ScrapeMethod.ALL) -> bool:
        self.method = method
        self.request_info = request_response(symbol=stock)
        self.response_info = request_response(symbol=stock)   

        
        if self.method == ScrapeMethod.ALL:
            self.start_date:datetime=helper.datetime.get_date_n_ago(helper.datetime.get_today_date(),period=40,freq="y")
            self.end_date:datetime=helper.datetime.get_today_date()
        elif self.method == ScrapeMethod.FROM:
            self.start_date:datetime=start_date
            self.end_date:datetime=helper.datetime.get_today_date()
        elif self.method == ScrapeMethod.TO:
            self.start_date:datetime=None
            self.end_date:datetime=end_date
        elif self.method == ScrapeMethod.FROMTO:
            self.start_date:datetime=start_date
            self.end_date:datetime=end_date
        else:
            raise AttributeError(f"Selected method {self.method} does not exist")

        self.scrape()
            

    def scrape(self):
        self.response_info.data["code"] = ""
        self.request_info.update(helper.datetime.get_date_n_ago(self.end_date, period=6, freq="m"), self.end_date)
        while (self._gt_start_date() and self._lt_end_date() and not self._is_bad_request()):
            self.execute()

        if self.start_date:
            self.request_info.update(self.start_date, self.request_info.end_date)
            self.execute()

    def _gt_start_date(self) -> bool:
        return self.request_info.start_date > self.start_date if self.start_date else True

    def _lt_end_date(self) -> bool:
        return self.request_info.end_date <= self.end_date if self.end_date else True

    def _is_bad_request(self) ->bool:
        return self.response_info.data.get("code").lower() == "bad request" if self.response_info.data.get("code") else False

        
    def execute(self) -> None:
        self.get()
        self.transform()
        self.write()
    

    def get(self) -> None:
        self.response_info.response = requests.get(f"https://query1.finance.yahoo.com/v8/finance/chart/{self.request_info.symbol}?formatted=true&crumb=Gg8TRsb%2F.kh&lang=en-GB&region=GB&includeAdjustedClose=true&interval=1d&period1={self.request_info.start_period}&period2={self.request_info.end_period}&events=capitalGain%7Cdiv%7Csplit&useYfid=true&corsDomain=uk.finance.yahoo.com",
                      headers=self.user_agent)
        

    def transform(self):
        self.response_info.response.string = self.response_info.response.text.replace("true","True")
        self.response_info.response.string = self.response_info.response.string.replace("false","False")
        self.response_info.response.string = self.response_info.response.string.replace("null","None")
        self.response_info.JSON = eval(self.response_info.response.string)
        self.response_info.data = helper.dictionary.unravel(self.response_info.JSON)
        if self._is_bad_request():
            return

        if not self.response_info.data.get(Columns.TIMESTAMP.value[0]):
            self.response_info.data["code"] = "bad request"
            return
        self.response_info.data[Columns.DATE.value[0]] = \
        helper.datetime.to_datetime(self.response_info.data[Columns.TIMESTAMP.value[0]])
        self.response_info.update(start_date = min(self.response_info.data[Columns.DATE.value[0]]),
                                  end_date = max(self.response_info.data[Columns.DATE.value[0]]))
        self.response_info.data = helper.yfinance.trim_response_json(self.response_info.data)

        #New request
        self.request_info.update(start_date= helper.datetime.get_date_n_ago(self.response_info.start_date,period=6, freq="m"),
                                 end_date= helper.datetime.get_date_n_ago(self.response_info.start_date,period=1,freq="d"))


    def write(self):
        if self._is_bad_request():
            return
        for idx in range(len(self.response_info.data[Columns.OPEN.value[0]])):
            self.db.insert(table=self.db_symbol, 
                      data={key:value[idx] if isinstance(value,list) else value for key, value in self.response_info.data.items()})
            

            

In [9]:
# Main
db = DataBase("stock.db")

#fd = processor(db)
#for stk in ["SBUX","AAPL","MSFT","TSLA","CMCSA","AMZN"]:
#    fd(symbol=stk)



In [11]:
AMZN = pd.DataFrame(db.get(table="AMZN"))

In [18]:
AMZN.date.max()

'2024-08-30 21:00:01'