In [4]:
import yfinance as yf

def fetch_stock_data(tickers, start_date, end_date):
    data = {}
    for ticker in tickers:
        try:
            stock_data = yf.download(ticker, start=start_date, end=end_date)
            data[ticker] = stock_data
        except Exception as e:
            print(f"Error fetching data for {ticker}: {e}")
    return data

tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'FB']
start_date = '2023-01-01'
end_date = '2024-01-01'

stock_data = fetch_stock_data(tickers, start_date, end_date)
print(stock_data)

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['FB']: Exception('%ticker%: No timezone found, symbol may be delisted')


{'AAPL':                   Open        High         Low       Close   Adj Close  \
Date                                                                     
2023-01-03  130.279999  130.899994  124.169998  125.070000  124.216301   
2023-01-04  126.889999  128.660004  125.080002  126.360001  125.497505   
2023-01-05  127.129997  127.769997  124.760002  125.019997  124.166641   
2023-01-06  126.010002  130.289993  124.889999  129.619995  128.735229   
2023-01-09  130.470001  133.410004  129.889999  130.149994  129.261627   
...                ...         ...         ...         ...         ...   
2023-12-22  195.179993  195.410004  192.970001  193.600006  193.353287   
2023-12-26  193.610001  193.889999  192.830002  193.050003  192.803986   
2023-12-27  192.490005  193.500000  191.089996  193.149994  192.903839   
2023-12-28  194.139999  194.660004  193.169998  193.580002  193.333298   
2023-12-29  193.899994  194.399994  191.729996  192.529999  192.284637   

               Volume  
Date

In [12]:
import pandas as pd
import json
import numpy as np

def ingest_ohlc_data(file_path, file_format):
    if file_format.lower() == 'csv':
        df = pd.read_csv(file_path)
    elif file_format.lower() == 'json':
        with open(file_path, 'r') as f:
            data = json.load(f)
        df = pd.DataFrame(data)
    else:
        raise ValueError("Unsupported file format. Supported formats are CSV and JSON.")

    # Validate data integrity
    if df.isnull().values.any():
        print("Warning: Missing values found in the data.")
        # Replace missing values with NaN
        df.replace('', np.nan, inplace=True)

    # Check for outliers
    numeric_columns = df.select_dtypes(include=np.number).columns
    z_scores = np.abs((df[numeric_columns] - df[numeric_columns].mean()) / df[numeric_columns].std())
    outliers = df[(z_scores > 3).any(axis=1)]  # Adjust threshold as needed (e.g., z-score > 3 for extreme outliers)

    if not outliers.empty:
        print("Outliers found:")
        print(outliers)
        # Correct outliers using domain knowledge or statistical methods
        df[numeric_columns] = df[numeric_columns].mask(z_scores > 3, np.nan)  # Replace outliers with NaN
        df.fillna(df.median(), inplace=True)
    # Address timestamp inconsistencies
    if 'Date' in df.columns:
        df['Date'] = pd.to_datetime(df['Date'])  # Convert Date column to datetime if not already
        df.set_index('Date', inplace=True)  # Set Date as index

    return df

# Example usage:
file_path = 'RELIANCE.csv'
file_format = 'csv'

ohlc_data = ingest_ohlc_data(file_path, file_format)
print(ohlc_data.head())



Outliers found:
            Date    Symbol Series  Prev Close     Open     High      Low  \
2     2000-01-05  RELIANCE     EQ      271.85   256.65   287.90   256.65   
4     2000-01-07  RELIANCE     EQ      294.35   295.00   317.90   293.00   
61    2000-03-31  RELIANCE     EQ      293.00   297.00   316.45   280.10   
62    2000-04-03  RELIANCE     EQ      316.45   327.65   333.70   318.10   
64    2000-04-05  RELIANCE     EQ      302.35   303.40   312.00   278.25   
...          ...       ...    ...         ...      ...      ...      ...   
5070  2020-05-22  RELIANCE     EQ     1441.25  1451.80  1458.00  1426.50   
5071  2020-05-26  RELIANCE     EQ     1431.55  1448.15  1449.70  1416.30   
5072  2020-05-27  RELIANCE     EQ     1424.05  1431.00  1454.00  1412.00   
5073  2020-05-28  RELIANCE     EQ     1445.55  1455.00  1479.75  1449.00   
5074  2020-05-29  RELIANCE     EQ     1472.25  1468.00  1472.00  1452.65   

         Last    Close     VWAP    Volume      Turnover    Trades  \
2 

  df.fillna(df.median(), inplace=True)


In [16]:
def calculate_technical_indicators(df):
    # Calculate moving averages (e.g., 50-day and 200-day)
    df['MA50'] = df['Close'].rolling(window=50).mean()
    df['MA200'] = df['Close'].rolling(window=200).mean()

    # Calculate Bollinger Bands
    window = 20
    df['MA20'] = df['Close'].rolling(window=window).mean()
    df['std_dev'] = df['Close'].rolling(window=window).std()
    df['Upper_Band'] = df['MA20'] + (df['std_dev'] * 2)
    df['Lower_Band'] = df['MA20'] - (df['std_dev'] * 2)

    # Calculate Relative Strength Index (RSI)
    delta = df['Close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['RSI'] = 100 - (100 / (1 + rs))

    return df
ohlc_data_with_indicators = calculate_technical_indicators(ohlc_data)
print(ohlc_data_with_indicators)

              Symbol Series  Prev Close     Open     High      Low     Last  \
Date                                                                          
2000-01-03  RELIANCE     EQ      233.05   237.50   251.70   237.50   251.70   
2000-01-04  RELIANCE     EQ      251.70   258.40   271.85   251.30   271.85   
2000-01-05  RELIANCE     EQ      271.85   256.65   287.90   256.65   286.75   
2000-01-06  RELIANCE     EQ      282.50   289.00   300.70   289.00   293.50   
2000-01-07  RELIANCE     EQ      294.35   295.00   317.90   293.00   314.50   
...              ...    ...         ...      ...      ...      ...      ...   
2020-05-22  RELIANCE     EQ     1441.25  1451.80  1458.00  1426.50  1433.00   
2020-05-26  RELIANCE     EQ     1431.55  1448.15  1449.70  1416.30  1426.00   
2020-05-27  RELIANCE     EQ     1424.05  1431.00  1454.00  1412.00  1449.85   
2020-05-28  RELIANCE     EQ     1445.55  1455.00  1479.75  1449.00  1471.05   
2020-05-29  RELIANCE     EQ     1472.25  1468.00  14

In [23]:
def apply_feature_engineering(df):
    # Price Range
    df['Price_Range'] = df['High'] - df['Low']

    # Volatility (Average True Range)
    df['Volatility'] = df['High'].rolling(window=14).max() - df['Low'].rolling(window=14).min()

    # Price Pattern
    df['Price_Pattern'] = np.where(df['Close'] > df['Open'], 1, 0)

    return df
ohlc_data_with_indicators = calculate_technical_indicators(ohlc_data)
print(ohlc_data_with_indicators)

              Symbol Series  Prev Close     Open     High      Low     Last  \
Date                                                                          
2000-01-03  RELIANCE     EQ      233.05   237.50   251.70   237.50   251.70   
2000-01-04  RELIANCE     EQ      251.70   258.40   271.85   251.30   271.85   
2000-01-05  RELIANCE     EQ      271.85   256.65   287.90   256.65   286.75   
2000-01-06  RELIANCE     EQ      282.50   289.00   300.70   289.00   293.50   
2000-01-07  RELIANCE     EQ      294.35   295.00   317.90   293.00   314.50   
...              ...    ...         ...      ...      ...      ...      ...   
2020-05-22  RELIANCE     EQ     1441.25  1451.80  1458.00  1426.50  1433.00   
2020-05-26  RELIANCE     EQ     1431.55  1448.15  1449.70  1416.30  1426.00   
2020-05-27  RELIANCE     EQ     1424.05  1431.00  1454.00  1412.00  1449.85   
2020-05-28  RELIANCE     EQ     1445.55  1455.00  1479.75  1449.00  1471.05   
2020-05-29  RELIANCE     EQ     1472.25  1468.00  14

In [40]:
import logging
import pandas as pd
import json

def ingest_ohlc_data(file_path, file_format, resample_frequency=None):
    try:
        if file_format.lower() == 'csv':
            df = pd.read_csv(file_path)
        elif file_format.lower() == 'json':
            with open(file_path, 'r') as f:
                data = json.load(f)
            df = pd.DataFrame(data)
        else:
            raise ValueError("Unsupported file format. Supported formats are CSV and JSON.")
    except Exception as e:
        # Log error message and raise exception
        logging.error(f"Error during data ingestion: {e}")
        raise
    else:
        # Log successful ingestion
        logging.info("Data ingestion completed successfully.")
        return df

if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(filename='data_pipeline.log', level=logging.DEBUG)


    # Call the data ingestion function
    try:
        df = ingest_ohlc_data('RELIANCE.csv', 'csv', 'H')
        # Add further processing here if needed
    except Exception as e:
        # Send alert/notification for error
        logging.error(f"Data pipeline encountered an error: {e}")
        # Implement alerting mechanism (e.g., send email, trigger notification)


In [59]:
import sqlite3
import pandas as pd
import numpy as np

def create_database(db_file, csv_file):
    df = pd.read_csv(csv_file, nrows=1)  # Read only the first row to get column names and types

    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()

    # Construct the CREATE TABLE statement based on CSV columns and their types
    create_table_sql = "CREATE TABLE IF NOT EXISTS ohlc_data ("
    for column_name, dtype in zip(df.columns, df.dtypes):
        # Sanitize column name
        column_name_sanitized = column_name.replace(' ', '_').replace('%', 'pct')
        # Enclose column name in double quotes if it contains special characters or spaces
        if any(c in column_name for c in ' !"#$%&\'()*+,-./:;<=>?@[\\]^`{|}~'):
            column_name_sanitized = f'"{column_name_sanitized}"'
        sqlite_type = 'REAL' if dtype == np.float64 else 'INTEGER'  # Map numpy types to SQLite types
        create_table_sql += f"{column_name_sanitized} {sqlite_type}, "
    create_table_sql += "Symbol TEXT" + ")"  # Add Symbol column
    cursor.execute(create_table_sql)

    conn.commit()
    conn.close()

def ingest_ohlc_data_to_db(file_path, db_file):
    df = pd.read_csv(file_path)

    # Drop 'Symbol' column if present in the DataFrame
    if 'Symbol' in df.columns:
        df.drop(columns=['Symbol'], inplace=True)
    if 'Series' in df.columns:
        df.drop(columns=['Series'], inplace=True)
    if 'Prev Close' in df.columns:
        df.drop(columns=['Prev Close'], inplace=True)
    if 'Last' in df.columns:
        df.drop(columns=['Last'], inplace=True)
    if 'VWAP' in df.columns:
        df.drop(columns=['VWAP'], inplace=True)
    if 'Turnover' in df.columns:
        df.drop(columns=['Turnover'], inplace=True)
    if 'Trades' in df.columns:
        df.drop(columns=['Trades'], inplace=True)
    if 'Deliverable Volume' in df.columns:
        df.drop(columns=['Deliverable Volume'], inplace=True)
    if '%Deliverble' in df.columns:
        df.drop(columns=['%Deliverble'], inplace=True)


    conn = sqlite3.connect(db_file)
    df.to_sql('ohlc_data', conn, if_exists='append', index=False)
    conn.close()

def partition_data_by_year_month(db_file):
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()

    cursor.execute('''CREATE TABLE IF NOT EXISTS ohlc_data_partitioned (
                        Year INTEGER,
                        Month INTEGER,
                        Date TEXT,
                        Open REAL,
                        High REAL,
                        Low REAL,
                        Close REAL,
                        Volume INTEGER,
                        PRIMARY KEY (Year, Month, Date)
                        )''')

    cursor.execute('''INSERT INTO ohlc_data_partitioned
                    SELECT strftime('%Y', Date) AS Year,
                           strftime('%m', Date) AS Month,
                           *
                    FROM ohlc_data''')

    conn.commit()
    conn.close()

if __name__ == "__main__":
    db_file = 'ohlc_data.db'
    csv_file = 'RELIANCE.csv'

    # Step 1: Create SQLite database and define table structure
    create_database(db_file, csv_file)

    # Step 2: Ingest OHLC data from CSV file to SQLite database
    ingest_ohlc_data_to_db(csv_file, db_file)

    # Step 3: Partition the data by year and month
    partition_data_by_year_month(db_file)
