In [1]:
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import dotenv
import os
from pydantic import BaseModel, Json
from datetime import datetime

class StockTimeSeries(BaseModel):
    #TODO: decide which fields to keep
    timestamp: datetime
    ticker: str
    open: float
    high: float
    low: float
    close: float
    volume: int
    # dividendYield: float
    # splits: int

def connect_to_mongodb():
    # Load environment variables from .env file
    dotenv.load_dotenv()
    uri = "mongodb://127.0.0.1:27017"
    client = MongoClient(uri, server_api=ServerApi('1'))
    db = client['stock_data']
    collection = db['stocks']
    return client, collection

In [2]:
connect_to_mongodb()

(MongoClient(host=['127.0.0.1:27017'], document_class=dict, tz_aware=False, connect=True, server_api=<pymongo.server_api.ServerApi object at 0x000001D9EE8C30E0>),
 Collection(Database(MongoClient(host=['127.0.0.1:27017'], document_class=dict, tz_aware=False, connect=True, server_api=<pymongo.server_api.ServerApi object at 0x000001D9EE8C30E0>), 'stock_data'), 'stocks'))

In [15]:
import pandas as pd
import glob 
import os

In [5]:
client, collection = connect_to_mongodb()

In [12]:
def insert_into_mongodb(csv_path:str, ticker:str, collection):
    df = pd.read_csv(csv_path)
    df['ticker'] = ticker
    df.sort_values(by='date', inplace=True, ascending=True)
    df['time'] = pd.to_datetime(df['date'])
    df = df[["time", "ticker", "open", "high", "low", "close", "volume"]]

    result = collection.insert_many(df.to_dict(orient="records"))
    return 

test

In [10]:
dataset_path= r"C:\Users\USER\Desktop\AQA_data\data base\dataset\ACC.csv"
ticker = "ACC"
insert_into_mongodb(dataset_path, ticker)

In [13]:

doc = collection.find_one({"ticker": "ACC"})
if doc:
    print("Document found:", doc)
else:
    print("Document not found.")


Document found: {'_id': ObjectId('68ac219a572a4b2eb1bd51ca'), 'time': datetime.datetime(2015, 2, 2, 9, 15), 'ticker': 'ACC', 'open': 1554.9, 'high': 1556.7, 'low': 1544.8, 'close': 1549.6, 'volume': 1441}


In [18]:
base_dir = r"C:\Users\USER\Desktop\AQA_data\data base\dataset"
csv_list = glob.glob(os.path.join(base_dir, "*.csv"))
csv_list.sort()
write_record = pd.read_csv(r"C:\Users\USER\write_record.csv")

In [16]:
def seed_database(dataframe):
        for idx, row in dataframe.iterrows():
            if row['written'] == 1:
                ticker = os.path.splitext(os.path.basename(row['stocks']))[0]
                if ticker == 'ACC':
                    continue #already written
                insert_into_mongodb(row["stocks"], ticker, collection)
                dataframe.at[idx, 'mongo_written'] = 1
                dataframe.to_csv("write_record.csv", index=False)
                print(f"Inserted data for {ticker}")

In [19]:
seed_database(write_record)

Inserted data for ADANIENT
Inserted data for ADANIGREEN
Inserted data for ADANIPORTS
Inserted data for AMBUJACEM
Inserted data for APOLLOHOSP
Inserted data for ASIANPAINT
Inserted data for AUROPHARMA
Inserted data for AXISBANK
Inserted data for BAJAJ-AUTO
Inserted data for BAJAJHLDNG
Inserted data for BAJFINANCE
Inserted data for BANDHANBNK
Inserted data for BANKBARODA
Inserted data for BERGEPAINT
Inserted data for BHARTIARTL
Inserted data for BIOCON


In [3]:
import time
from concurrent.futures import ThreadPoolExecutor

def run_query(start_time: str, end_time: str):
    start = time.time()
    result = collection.find(
        {
            "time": {"$gt": start_time, "$lt": end_time},
            "ticker": "ACC"
        },
        {"_id": 0, "close": 1}
    )
    latency = time.time() - start
    return latency


def run_query_concurrent(start_time: str, end_time: str, concurrency: int):
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        latencies = list(executor.map(
            lambda _: run_query(start_time, end_time),
            range(concurrency)
        ))
    return sum(latencies) / len(latencies)

def benchmark_performance_by_returned_rows(start_time: str, end_time: str, concurrency: int = 1):
    row_count = collection.count_documents({
        "time": {"$gt": start_time, "$lt": end_time},
        "ticker": "ADANIENT"
    })
    print(f"Row count for the query: {row_count}")
    return run_query_concurrent(start_time, end_time, concurrency)

In [6]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2015-02-03 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_performance_by_returned_rows(start_time, end_time)

Row count for the query: 374


0.00011658668518066406

In [7]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2015-03-02 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_performance_by_returned_rows(start_time, end_time)

Row count for the query: 7499


0.00021791458129882812

In [8]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2016-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_performance_by_returned_rows(start_time, end_time)

Row count for the query: 92232


5.0067901611328125e-05

In [10]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2020-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_performance_by_returned_rows(start_time, end_time)

Row count for the query: 461577


2.7418136596679688e-05

In [9]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2025-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_performance_by_returned_rows(start_time, end_time)

Row count for the query: 830903


4.2438507080078125e-05

In [40]:


def run_agg_query(start_time: str, end_time: str):
    start = time.time()
    pipeline = [
        {"$match": {
            "time": {"$gt": start_time, "$lt": end_time},
            "ticker": "ACC"
        }},
        {"$group": {
            "_id": None,
            "maxClose": {"$max": "$close"}
        }}
    ]
    result = list(collection.aggregate(pipeline))
    latency = time.time() - start
    return latency

def run_agg_query_concurrent(start_time: str, end_time: str, concurrency: int):
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        latencies = list(executor.map(
            lambda _: run_agg_query(start_time, end_time),
            range(concurrency)
        ))
    return sum(latencies) / len(latencies)

def benchmark_agg_by_involved_rows(start_time: str, end_time: str, concurrency: int = 1):
    row_count = collection.count_documents({
        "time": {"$gt": start_time, "$lt": end_time},
        "ticker": "ACC"
    })
    print(f"Row count for the query: {row_count}")
    return run_agg_query_concurrent(start_time, end_time, concurrency)


In [41]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2015-02-03 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_agg_by_involved_rows(start_time, end_time)

Row count for the query: 374


4.769644021987915

In [42]:
start_time = datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time = datetime.strptime("2015-03-02 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_agg_by_involved_rows(start_time, end_time)

Row count for the query: 7499


4.693589448928833

In [43]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2016-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_agg_by_involved_rows(start_time, end_time)

Row count for the query: 92232


4.239988088607788

In [45]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2020-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_agg_by_involved_rows(start_time, end_time)

Row count for the query: 461635


4.384538650512695

In [46]:
start_time= datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
end_time= datetime.strptime("2025-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
benchmark_agg_by_involved_rows(start_time, end_time)

Row count for the query: 830959


4.3456196784973145

In [47]:
results =[]
for i in range(1, 31):
    start_time = datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
    end_time = datetime.strptime("2015-02-03 09:15:00", "%Y-%m-%d %H:%M:%S")
    results.append(benchmark_agg_by_involved_rows(start_time, end_time, concurrency=i))


Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374


In [48]:
results

[4.695425510406494,
 5.158757448196411,
 6.089565674463908,
 8.579655885696411,
 8.480598068237304,
 10.93638265132904,
 14.856348139899117,
 15.964705139398575,
 18.362651083204483,
 21.230886220932007,
 32.837878400629215,
 24.860192239284515,
 31.72158059707055,
 30.787520340510778,
 33.800696563720706,
 33.1061297506094,
 32.41942273869234,
 34.80714086691538,
 38.807517164631896,
 39.65705789327622,
 40.36796188354492,
 49.88952309435064,
 63.595354992410414,
 59.29600534836451,
 55.13662085533142,
 49.53926875958076,
 52.474648775877775,
 54.7389977148601,
 57.47686035879727,
 61.67925324440002]

In [None]:
results =[]
for i in range(1, 31):
    start_time = datetime.strptime("2015-02-02 09:15:00", "%Y-%m-%d %H:%M:%S")
    end_time = datetime.strptime("2015-02-03 09:15:00", "%Y-%m-%d %H:%M:%S")
    results.append(benchmark_performance_by_returned_rows(start_time, end_time, concurrency=i))


Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374
Row count for the query: 374


dataset_path= r"C:\Users\USER\Desktop\AQA_data\data base\dataset\ACC.csv"
