In [4]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn import linear_model
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import yfinance as yf
import numpy as np
from sklearn.preprocessing import StandardScaler

In [2]:
import logging
import pymongo
import pandas as pds
import expiringdict

import utils

# Module-level MongoDB client used by the database helpers in this cell. It is
# constructed with no arguments, so the driver's default connection settings apply.
client = pymongo.MongoClient()
# Obtain the logger through the logging registry rather than instantiating
# `logging.Logger` directly: a directly constructed Logger has no parent, so
# handlers and levels configured on the root logger (e.g. via
# `logging.basicConfig`) would never see its records.
logger = logging.getLogger(__name__)

# Lifetime, in seconds, of the cached DataFrame result (passed as
# `max_age_seconds` to the ExpiringDict cache defined further down).
RESULT_CACHE_EXPIRATION = 10


def upsert_stock(df):
    """
    Write every row of `df` into the `stock` collection of the `stock` database.

    Each row is converted to a dict and sent through `replace_one` with
    `upsert=True`, keyed on the row's 'Datetime' value: a document with the same
    'Datetime' is replaced, otherwise a new one is inserted. A summary of the
    total, replaced and inserted row counts is logged at INFO level.

    :param df: DataFrame whose rows contain a 'Datetime' field.
    :return: None
    """
    stock_collection = client.get_database("stock").get_collection("stock")
    total_rows = df.shape[0]
    replaced = 0
    for row in df.to_dict('records'):
        outcome = stock_collection.replace_one(
            {'Datetime': row['Datetime']},  # match on the timestamp
            row,                            # full document replacement
            upsert=True,                    # insert when nothing matched
        )
        # A positive matched_count means an existing document was overwritten.
        replaced += 1 if outcome.matched_count > 0 else 0
    inserted = total_rows - replaced
    logger.info("rows={}, update={}, ".format(total_rows, replaced) +
                "insert={}".format(inserted))


def fetch_all_stock():
    """
    Read every document from the `stock` collection of the `stock` database.

    The number of documents read is logged at INFO level.

    :return: list of the documents yielded by an unfiltered `find()`.
    """
    stock_collection = client.get_database("stock").get_collection("stock")
    # Materialise the cursor so the caller gets a plain list.
    documents = [document for document in stock_collection.find()]
    logger.info(str(len(documents)) + ' documents read from the db')
    return documents


# Single-slot cache for the result of `fetch_all_stock_as_df`; entries are kept
# for at most RESULT_CACHE_EXPIRATION seconds.
_fetch_all_stock_as_df_cache = expiringdict.ExpiringDict(
    max_len=1,
    max_age_seconds=RESULT_CACHE_EXPIRATION,
)


def fetch_all_stock_as_df(allow_cached=False):
    """Return all stock documents as a DataFrame with the `_id` column removed.

    The documents come from `fetch_all_stock`. When there are none, the result
    is None instead of a DataFrame.

    When `allow_cached` is true, the entry stored under 'cache' in
    `_fetch_all_stock_as_df_cache` is returned if the lookup succeeds. If the
    lookup raises KeyError (no usable entry), or `allow_cached` is false, the
    data is read again. Every fresh result, including None, is written back to
    the cache before being returned.

    :param allow_cached: whether a cached result may be returned.
    :return: DataFrame of the stored documents, or None when there are none.
    """
    if allow_cached:
        try:
            cached = _fetch_all_stock_as_df_cache['cache']
        except KeyError:
            # Nothing usable in the cache; fall through and reload.
            pass
        else:
            return cached

    records = fetch_all_stock()
    if records:
        # Drop Mongo's `_id` column so only the stored fields remain.
        fresh = pds.DataFrame.from_records(records).drop('_id', axis=1)
    else:
        fresh = None

    _fetch_all_stock_as_df_cache['cache'] = fresh
    return fresh


# Smoke check: when executed as the main module, load every stored document
# (bypassing the cache, since `allow_cached` defaults to False) and print the result.
if __name__ == '__main__':
    print(fetch_all_stock_as_df())


                Datetime         SPY       SBUX        AAPL  MSFT
0    2020-11-18 14:30:00  360.760010  98.510002  118.910004   NaN
1    2020-11-18 14:31:00  360.679993  98.565498  118.684998   NaN
2    2020-11-18 14:32:00  360.730011  98.669998  118.620003   NaN
3    2020-11-18 14:33:00  360.660004  98.705002  118.377701   NaN
4    2020-11-18 14:34:00  360.695007  98.644997  118.499901   NaN
...                  ...         ...        ...         ...   ...
1945 2020-11-24 20:55:00  363.364990  98.330002  115.170097   NaN
1946 2020-11-24 20:56:00  363.140015  98.300003  115.014999   NaN
1947 2020-11-24 20:57:00  363.359985  98.349998  115.149902   NaN
1948 2020-11-24 20:58:00  363.109985  98.300003  115.025002   NaN
1949 2020-11-24 20:59:00  363.179993  98.320000  115.169998   NaN

[1950 rows x 5 columns]
