In [None]:
!pip install mplfinance

In [17]:
# import required pakcages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mplfinance as mpf


In [3]:
# define the logger
import logging.config

# set format for log
FORMAT = (
    "%(asctime)-15s %(threadName)s %(filename)s:%(lineno)d %(levelname)s %(message)s"
)
logging.basicConfig(level=logging.INFO, format=FORMAT)

logging.basicConfig(level=logging.INFO, format=FORMAT)
logging.getLogger("seleniumwire").setLevel(logging.WARNING)

logger = logging.getLogger("crawlLog")
logger.setLevel(logging.INFO)

In [8]:
# connect to the database


import sys
import pathlib
from urllib.parse import quote_plus


import pymongo
from pymongo.errors import BulkWriteError


# step1: get mongo name
DB_HOST = "127.0.0.1"
DB_PORT = 27017
DB_USER = ""
DB_PASSWORD = ""
DB_NAME = "stock"


def get_client():
    if DB_USER and DB_PASSWORD:
        return pymongo.MongoClient(
            f"mongodb://{DB_USER}:{quote_plus(DB_PASSWORD)}@{DB_HOST}:{DB_PORT}"
        )
    else:
        return pymongo.MongoClient(f"mongodb://{DB_HOST}:{DB_PORT}")


def get_db():
    return get_client()[DB_NAME]


def insert_docs(doc_list, col_name="test", db_name=DB_NAME):
    ret = -1
    try:
        curr_client = get_client()
        curr_db = curr_client[db_name]

        result = curr_db[col_name].insert_many(doc_list, ordered=False)
        ret = len(result.inserted_ids)
    except BulkWriteError as bwe:
        ret = len(doc_list) - len(bwe.details["writeErrors"])
        logger.error(
            f"Duplicate key error occurred. {ret} documents were successfully inserted."
        )
    except Exception as e:
        logger.error(f"Failed to insert documents into mongodb due to {e}")
    finally:
        curr_client.close()

    return ret


def insert_doc(doc_data, col_name="test", db_name=DB_NAME):
    ret = 0
    try:
        curr_client = get_client()
        curr_db = curr_client[db_name]
        # set ordered to False to ignore the duplicate key error

        result = curr_db[col_name].insert_one(doc_data)
        ret = result.inserted_id
    except Exception as e:
        logger.error("Failed to insert documents into mongodb due to {}".format(str(e)))
    finally:
        curr_client.close()

    return ret


def query_data(col_name="test", limit=10, offset=0, db_name=DB_NAME):
    data_list = list()
    try:
        curr_client = get_client()
        curr_db = curr_client[db_name]
        if limit > 0:
            result = curr_db[col_name].find().skip(offset).limit(limit)
        else:
            result = curr_db[col_name].find()

        for each in result:
            data_list.append(each)
    except Exception as e:
        logger.info("Failed to get data from mongodb due to {}".format(e))
    finally:
        curr_client.close()

    return data_list


def query_by_condition(
    condition, col_name="test", db_name=DB_NAME, limit=0, offset=0, projection=None
):
    data_list = list()
    try:
        curr_client = get_client()
        curr_db = curr_client[db_name]

        if limit > 0:
            result = (
                curr_db[col_name]
                .find(condition, projection=projection)
                .skip(offset)
                .limit(limit)
            )
        else:
            result = curr_db[col_name].find(condition, projection=projection)
        for each in result:
            data_list.append(each)
    except Exception as e:
        logger.info("Failed to get data from mongodb due to {}".format(e))
    finally:
        curr_client.close()

    return data_list


def query_by_condition_random(
    condition, col_name="test", db_name=DB_NAME, limit=0, offset=0, projection=None
):
    data_list = list()
    try:
        curr_client = get_client()
        curr_db = curr_client[db_name]

        pipeline = [
            {"$match": condition},
            {
                "$sample": {
                    "size": (
                        limit
                        if limit > 0
                        else curr_db[col_name].count_documents(condition)
                    )
                }
            },
        ]

        if projection:
            pipeline.insert(1, {"$project": projection})

        result = curr_db[col_name].aggregate(pipeline)

        for each in result:
            data_list.append(each)
    except Exception as e:
        logger.info("Failed to get data from mongodb due to {}".format(e))
    finally:
        curr_client.close()

    return data_list


# count the documemnts by condition
def count_by_condition(query_condition, col_name="test"):
    try:
        curr_client = get_client()
        curr_db = curr_client[DB_NAME]
        count = curr_db[col_name].count_documents(query_condition)
        return count
    except Exception as e:
        logger.error("Failed to count documents due to {}".format(e))
    finally:
        curr_client.close()


# update the document with condition
def update_by_condition(query_condition, new_values, col_name="test", db_name=DB_NAME):
    try:
        curr_client = get_client()
        curr_db = curr_client[db_name]
        result = curr_db[col_name].update_many(query_condition, new_values)
        return result.modified_count
    except Exception as e:
        logger.info("Failed to update data from mongodb due to {}".format(e))
    finally:
        curr_client.close()



In [None]:
# test get the data from stock
def get_stock_data():
    query = {"trade_date": "2025-01-02"}
    results = query_by_condition(query, col_name="qqq_1m", limit=10)
    print(len(results))
    print(results)
    print(results[0])

get_stock_data()

In [None]:
# get the data of given date, then plot the candlestick chart with 1min data

trade_date = "2025-01-03"
query = {"trade_date": trade_date}
results = query_by_condition(query, col_name="qqq_1m")

# plot the candlestick chart
# format the data
# {'_id': '6aeb962e6e545dc6a4023efa7215bc10', 'Datetime': '2025-01-02 09:30:00-0500', 'Open': 514.3599853515625, 'High': 514.3599853515625, 'Low': 513.7000122070312, 'Close': 514.1900024414062, 'Volume': 1735771, 'Dividends': 0.0, 'Stock Splits': 0.0, 'Capital Gains': 0.0, 'trade_date': '2025-01-02'}

# convert the data to dataframe
df = pd.DataFrame(results)

# convert the data to the format of candlestick chart

# Convert datetime string to datetime object
df['Datetime'] = pd.to_datetime(df['Datetime'])
df.set_index('Datetime', inplace=True)

# Sort by datetime
df = df.sort_index()

# Create the candlestick chart
mc = mpf.make_marketcolors(up='g', down='r',
                          edge='inherit',
                          wick='inherit',
                          volume='in')
style = mpf.make_mpf_style(marketcolors=mc,
                          gridstyle='--',
                          y_on_right=True)

mpf.plot(df, 
        type='candle',
        title=f'QQQ 1-Min Candlestick Chart ({trade_date})',
        volume=True,
        style=style,
        figsize=(15,8),
        panel_ratios=(3,1))
