In [3]:
import os

# Run from the project root so the relative paths used below ('config/.env',
# 'database/finstore') resolve. The root defaults to '/app' and can be
# overridden with the PROJECT_ROOT environment variable instead of editing code.
os.chdir(os.environ.get('PROJECT_ROOT', '/app'))

In [4]:
from dotenv import load_dotenv

# Pull project settings from the env file (path relative to the current working directory).
ENV_FILE = 'config/.env'
load_dotenv(dotenv_path=ENV_FILE)

database_path = os.environ.get('DATABASE_PATH')
print(database_path)

database/db/ohlcv_data.db


In [14]:
import duckdb
import os
import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def _read_parquet_df(conn, path):
    """Return every row of the Parquet file at ``path`` as a pandas DataFrame, read through ``conn``."""
    # The path is embedded as a SQL string literal; doubling any single quote
    # keeps a path containing an apostrophe from breaking (or altering) the query.
    escaped_path = path.replace("'", "''")
    return conn.execute(f"SELECT * FROM read_parquet('{escaped_path}')").fetchdf()


def _merge_indicators(ohlcv_df, indicators_df):
    """Pivot long-format indicator rows to one column per indicator and left-join them onto ``ohlcv_df`` by 'timestamp'."""
    indicators_wide = (
        indicators_df
        # pivot() raises on repeated (timestamp, indicator_name) pairs, so keep the first of each.
        .drop_duplicates(subset=['timestamp', 'indicator_name'])
        .pivot(index='timestamp', columns='indicator_name', values='indicator_value')
        .reset_index()
    )
    return ohlcv_df.merge(indicators_wide, on='timestamp', how='left')


def read_parquet_for_symbol(symbol, market_name='indian_equity', timeframe='1d', base_directory='database/finstore'):
    """
    Read a symbol's OHLCV Parquet file and join its technical indicators onto it.

    The technical-indicators file is optional. When it is missing, a warning is
    printed and the OHLCV data is returned without indicator columns.

    Parameters:
        symbol (str): The symbol to read data for.
        market_name (str): The market name (default: 'indian_equity').
        timeframe (str): The timeframe (default: '1d').
        base_directory (str): The base directory where data is stored (default: 'database/finstore').

    Returns:
        tuple: ``(symbol, DataFrame)``. The DataFrame holds the OHLCV rows,
        left-joined on 'timestamp' with one column per distinct
        'indicator_name' (values taken from 'indicator_value') when the
        indicators file exists.

    Raises:
        FileNotFoundError: If the OHLCV Parquet file does not exist.
    """
    symbol_directory = os.path.join(base_directory, f"market_name={market_name}", f"timeframe={timeframe}", symbol)
    file_path = os.path.join(symbol_directory, 'ohlcv_data.parquet')
    technical_indicators_path = os.path.join(symbol_directory, 'technical_indicators.parquet')

    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"Parquet file not found for symbol '{symbol}' at '{file_path}'")
    has_indicators = os.path.isfile(technical_indicators_path)
    if not has_indicators:
        # Indicators are optional: warn, then fall back to the bare OHLCV data below
        # instead of attempting (and failing) to read the missing file.
        print(f"Technical indicators file not found for symbol '{symbol}' at '{technical_indicators_path}'")

    # In-memory DuckDB connection; closed in `finally` even if a read raises.
    conn = duckdb.connect()
    try:
        conn.execute("PRAGMA threads=4")  # allow DuckDB up to 4 threads for the scans
        df = _read_parquet_df(conn, file_path)
        if not has_indicators:
            return symbol, df
        technical_indicators_df = _read_parquet_df(conn, technical_indicators_path)
    finally:
        conn.close()

    return symbol, _merge_indicators(df, technical_indicators_df)

def read_all_symbols(symbols, market_name='indian_equity', timeframe='1d', base_directory='database/finstore'):
    """
    Load data for many symbols concurrently using a pool of worker processes.

    Each symbol is passed to ``read_parquet_for_symbol``. A symbol whose load
    raises is reported with a printed message and left out of the result, so
    one bad symbol does not abort the batch.

    Parameters:
        symbols (list): Symbols to load.
        market_name (str): The market name (default: 'indian_equity').
        timeframe (str): The timeframe (default: '1d').
        base_directory (str): Root directory of the data store (default: 'database/finstore').

    Returns:
        dict: Maps each successfully loaded symbol to its DataFrame, in submission order.
    """
    loaded = {}
    with ProcessPoolExecutor() as pool:
        # Submit everything first so the workers run in parallel, then collect in order.
        pending = [
            (sym, pool.submit(read_parquet_for_symbol, sym, market_name, timeframe, base_directory))
            for sym in symbols
        ]
        for symbol, job in pending:
            try:
                returned_symbol, frame = job.result()
            except Exception as e:
                print(f"Error reading data for symbol {symbol}: {e}")
            else:
                loaded[returned_symbol] = frame
    return loaded

# Example usage:
# symbols_list = ['360ONE.NS', 'RELIANCE.NS', 'TCS.NS']  # NSE symbols to read
# data_dict = read_all_symbols(symbols_list, market_name='indian_equity', timeframe='1d')
# print(data_dict)


In [17]:
from data.fetch.indian_equity import (
    fetch_symbol_list_indian_equity,
)

# Universe of NSE equity symbols to load in the next cell.
symbol_list = fetch_symbol_list_indian_equity(
    index_name='nse_eq_symbols',
)

In [21]:
# Daily OHLCV + technical indicators for every symbol, keyed by symbol.
ohlcv_data = read_all_symbols(
    symbol_list,
    market_name='indian_equity',
    timeframe='1d',
)

In [22]:
# Spot-check one symbol's merged frame.
preview_symbol = '360ONE.NS'
ohlcv_data[preview_symbol]

Unnamed: 0,timestamp,open,high,low,close,volume,market_name,timeframe,average_volume_90,ema_100,ema_200,ema_500,gap_90_0.15,slope_r2_product_15,slope_r2_product_30,slope_r2_product_90,spike_90_0.15,supertrend_7_3
0,2019-09-19 00:00:00,302.500000,317.625000,302.500000,317.625000,262.504547,indian_equity,1d,,317.625000,317.625000,317.625000,0.0,0.000000,0.000000,0.000000,0.0,
1,2019-09-20 00:00:00,332.250000,333.500000,332.250000,333.500000,275.624542,indian_equity,1d,,317.939356,317.782960,317.688373,0.0,0.000000,0.000000,0.000000,0.0,
2,2019-09-23 00:00:00,350.174988,350.174988,350.174988,350.174988,289.405823,indian_equity,1d,,318.577686,318.105269,317.818060,0.0,0.000000,0.000000,0.000000,0.0,
3,2019-09-24 00:00:00,367.674988,367.674988,367.674988,367.674988,303.868866,indian_equity,1d,,319.549910,318.598500,318.017090,0.0,0.000000,0.000000,0.000000,0.0,
4,2019-09-25 00:00:00,386.049988,386.049988,349.299988,351.237488,290.283936,indian_equity,1d,,320.177386,318.923266,318.149706,0.0,0.000000,0.000000,0.000000,0.0,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1247,2024-10-04 00:00:00,962.900024,1025.000000,951.599976,1011.200012,1011.200012,indian_equity,1d,982724.641278,974.411552,860.971307,678.480825,0.0,-0.004555,-0.001104,0.002781,0.0,1120.858073
1248,2024-10-07 00:00:00,1007.599976,1021.049988,955.200012,963.799988,963.799988,indian_equity,1d,986026.040214,974.201422,861.994478,679.619824,0.0,-0.005979,-0.001601,0.002524,0.0,1120.858073
1249,2024-10-08 00:00:00,965.000000,984.450012,953.400024,976.549988,976.549988,indian_equity,1d,989529.806710,974.247929,863.134334,680.805173,0.0,-0.006335,-0.001936,0.002298,0.0,1120.858073
1250,2024-10-09 00:00:00,979.950012,1022.250000,979.000000,1010.000000,1010.000000,indian_equity,1d,994094.424200,974.955891,864.595684,682.119324,0.0,-0.005582,-0.002087,0.002109,0.0,1120.858073
