In [1]:
import numpy as np
import pandas as pd
import yfinance as yf  # for ingesting data from Yahoo Finance
import sqlite3  # for ingesting data from SQL

def ingest_ohlc_data(data_source):
    if data_source == "CSV":
        df = pd.read_csv(r"C:\Users\Hp\OneDrive\Desktop\Assignment\GOOGL_ohlc_data.csv")

    elif data_source == "JSON":
        # Get symbol OHLC data
        df = yf.download("CL=F")

    elif data_source == "SQL":
        engine = sqlite3.connect("stock_data.db")
        df = pd.read_sql("SELECT * FROM ohlc_data", engine)

    data_check(df)

    return df

def data_check(data):
    
    assert not data.empty, "DataFrame is empty. Please check the data source."

    num_missing_values = data.isna().sum().sum()
    assert num_missing_values == 0, f"There are {num_missing_values} missing values in the data."

    assert data["Open"].dtype == np.float64, "Data type for Open needs to be float64."
    assert data["High"].dtype == np.float64, "Data type for High needs to be float64."
    assert data["Low"].dtype == np.float64, "Data type for Low needs to be float64."
    assert data["Close"].dtype == np.float64, "Data type for Close needs to be float64."

    print("Data integrity is valid. No missing values or type inconsistencies detected.")
    print("\n")

df = ingest_ohlc_data("CSV")
print(df)

Data integrity is valid. No missing values or type inconsistencies detected.


           Date        Open        High         Low       Close   Adj Close  \
0    2023-01-03   89.589996   91.050003   88.519997   89.120003   89.120003   
1    2023-01-04   90.349998   90.650002   87.269997   88.080002   88.080002   
2    2023-01-05   87.470001   87.570000   85.900002   86.199997   86.199997   
3    2023-01-06   86.790001   87.690002   84.860001   87.339996   87.339996   
4    2023-01-09   88.360001   90.050003   87.860001   88.019997   88.019997   
..          ...         ...         ...         ...         ...         ...   
245  2023-12-22  140.770004  141.990005  140.710007  141.490005  141.490005   
246  2023-12-26  141.589996  142.679993  141.190002  141.520004  141.520004   
247  2023-12-27  141.589996  142.080002  139.889999  140.369995  140.369995   
248  2023-12-28  140.779999  141.139999  139.750000  140.229996  140.229996   
249  2023-12-29  139.630005  140.360001  138.779999 

In [5]:
import pandas as pd
import numpy as np

def outlier_handling(df, column_with_outliers):
    q1 = df[column_with_outliers].quantile(0.25)
    q3 = df[column_with_outliers].quantile(0.75)
    iqr = q3 - q1
    # Remove outliers
    df = df[(df[column_with_outliers] > (q1 - 1.5 * iqr)) & (df[column_with_outliers] < (q3 + 1.5 * iqr))]
    return df

def date_formatting(df, column_with_date):
    # format date column
    df[column_with_date] = pd.to_datetime(df[column_with_date], format='%Y-%m-%d')
    return df

def remove_missing_values(df):
    # Remove rows with missing values
    df.dropna(inplace=True)
    return df

def ohlc_data_cleaning_pipeline(df_path, column_with_outliers, column_with_date):
    df = pd.read_csv(df_path)
    print('Before cleaning:')
    print(df.head())
    
    df_no_outliers = outlier_handling(df, column_with_outliers)
    df_date_formatted = date_formatting(df_no_outliers, column_with_date)
    clean_df = remove_missing_values(df_date_formatted)
    
    print('\nAfter cleaning:')
    print(clean_df.head())
    
    return clean_df

clean_df = ohlc_data_cleaning_pipeline(r"C:\Users\Hp\OneDrive\Desktop\Assignment\GOOGL_ohlc_data.csv", 'Close', 'Date')


Before cleaning:
         Date       Open       High        Low      Close  Adj Close    Volume
0  2023-01-03  89.589996  91.050003  88.519997  89.120003  89.120003  28131200
1  2023-01-04  90.349998  90.650002  87.269997  88.080002  88.080002  34854800
2  2023-01-05  87.470001  87.570000  85.900002  86.199997  86.199997  27194400
3  2023-01-06  86.790001  87.690002  84.860001  87.339996  87.339996  41381500
4  2023-01-09  88.360001  90.050003  87.860001  88.019997  88.019997  29003900

After cleaning:
        Date       Open       High        Low      Close  Adj Close    Volume
0 2023-01-03  89.589996  91.050003  88.519997  89.120003  89.120003  28131200
1 2023-01-04  90.349998  90.650002  87.269997  88.080002  88.080002  34854800
2 2023-01-05  87.470001  87.570000  85.900002  86.199997  86.199997  27194400
3 2023-01-06  86.790001  87.690002  84.860001  87.339996  87.339996  41381500
4 2023-01-09  88.360001  90.050003  87.860001  88.019997  88.019997  29003900


In [6]:
import pandas as pd

def calculate_sma(data, window=20):
  
    return data['Close'].rolling(window=window).mean()

def calculate_bollinger_bands(data, window=20, num_std=2):
    
    sma = data['Close'].rolling(window=window).mean()
    std = data['Close'].rolling(window=window).std()
    upper_band = sma + (num_std * std)
    lower_band = sma - (num_std * std)
    return upper_band, lower_band

def calculate_rsi(data, window=14):
    
    delta = data['Close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi


# Calculate SMA with a window of 20 days
df['SMA_20'] = calculate_sma(df, window=20)

# Calculate Bollinger Bands with a window of 20 days and 2 standard deviations
upper_band, lower_band = calculate_bollinger_bands(df, window=20, num_std=2)
df['Upper_BB'] = upper_band
df['Lower_BB'] = lower_band

# Calculate RSI with a window of 14 days
df['RSI_14'] = calculate_rsi(df, window=14)

# Print DataFrame with calculated indicators
print(df)


           Date        Open        High         Low       Close   Adj Close  \
0    2023-01-03   89.589996   91.050003   88.519997   89.120003   89.120003   
1    2023-01-04   90.349998   90.650002   87.269997   88.080002   88.080002   
2    2023-01-05   87.470001   87.570000   85.900002   86.199997   86.199997   
3    2023-01-06   86.790001   87.690002   84.860001   87.339996   87.339996   
4    2023-01-09   88.360001   90.050003   87.860001   88.019997   88.019997   
..          ...         ...         ...         ...         ...         ...   
245  2023-12-22  140.770004  141.990005  140.710007  141.490005  141.490005   
246  2023-12-26  141.589996  142.679993  141.190002  141.520004  141.520004   
247  2023-12-27  141.589996  142.080002  139.889999  140.369995  140.369995   
248  2023-12-28  140.779999  141.139999  139.750000  140.229996  140.229996   
249  2023-12-29  139.630005  140.360001  138.779999  139.690002  139.690002   

       Volume      SMA_20    Upper_BB    Lower_BB  

In [7]:
import pandas as pd
import numpy as np

def calculate_volatility_measures(data, window=20):
    """
    Calculate volatility measures such as standard deviation and ATR.
    """
    # Standard Deviation
    data['Std_Dev'] = data['Close'].rolling(window=window).std()

    # Average True Range (ATR)
    high_low_range = data['High'] - data['Low']
    high_close_range = np.abs(data['High'] - data['Close'].shift())
    low_close_range = np.abs(data['Low'] - data['Close'].shift())
    true_range = pd.concat([high_low_range, high_close_range, low_close_range], axis=1).max(axis=1)
    data['ATR'] = true_range.rolling(window=window).mean()

    return data

def calculate_price_patterns(data, short_window=12, long_window=26, signal_window=9):
    """
    Calculate price patterns such as MACD and EMA.
    """
    # Moving Average Convergence Divergence (MACD)
    data['EMA_short'] = data['Close'].ewm(span=short_window, min_periods=1).mean()
    data['EMA_long'] = data['Close'].ewm(span=long_window, min_periods=1).mean()
    data['MACD'] = data['EMA_short'] - data['EMA_long']
    data['Signal_line'] = data['MACD'].ewm(span=signal_window, min_periods=1).mean()

    # Exponential Moving Average (EMA)
    data['EMA_9'] = data['Close'].ewm(span=9, min_periods=1).mean()

    return data

# Calculate volatility measures
df = calculate_volatility_measures(df)

# Calculate price patterns
df = calculate_price_patterns(df)

# Print DataFrame with calculated features
print(df)


           Date        Open        High         Low       Close   Adj Close  \
0    2023-01-03   89.589996   91.050003   88.519997   89.120003   89.120003   
1    2023-01-04   90.349998   90.650002   87.269997   88.080002   88.080002   
2    2023-01-05   87.470001   87.570000   85.900002   86.199997   86.199997   
3    2023-01-06   86.790001   87.690002   84.860001   87.339996   87.339996   
4    2023-01-09   88.360001   90.050003   87.860001   88.019997   88.019997   
..          ...         ...         ...         ...         ...         ...   
245  2023-12-22  140.770004  141.990005  140.710007  141.490005  141.490005   
246  2023-12-26  141.589996  142.679993  141.190002  141.520004  141.520004   
247  2023-12-27  141.589996  142.080002  139.889999  140.369995  140.369995   
248  2023-12-28  140.779999  141.139999  139.750000  140.229996  140.229996   
249  2023-12-29  139.630005  140.360001  138.779999  139.690002  139.690002   

       Volume      SMA_20    Upper_BB    Lower_BB  

In [8]:
import pandas as pd

def resample_ohlc_data(data, frequency='H'):
    """
    Resample OHLC data based on desired frequency.
    """
    # Set the 'Date' column as the index
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)

    # Resample OHLC data
    resampled_data = data.resample(frequency).agg({
        'Open': 'first',
        'High': 'max',
        'Low': 'min',
        'Close': 'last',
        'Volume': 'sum'
    })

    return resampled_data


# Resample OHLC data from daily to hourly frequency
hourly_data = resample_ohlc_data(df, frequency='H')

# Print resampled DataFrame
print(hourly_data)


                           Open        High         Low       Close    Volume
Date                                                                         
2023-01-03 00:00:00   89.589996   91.050003   88.519997   89.120003  28131200
2023-01-03 01:00:00         NaN         NaN         NaN         NaN         0
2023-01-03 02:00:00         NaN         NaN         NaN         NaN         0
2023-01-03 03:00:00         NaN         NaN         NaN         NaN         0
2023-01-03 04:00:00         NaN         NaN         NaN         NaN         0
...                         ...         ...         ...         ...       ...
2023-12-28 20:00:00         NaN         NaN         NaN         NaN         0
2023-12-28 21:00:00         NaN         NaN         NaN         NaN         0
2023-12-28 22:00:00         NaN         NaN         NaN         NaN         0
2023-12-28 23:00:00         NaN         NaN         NaN         NaN         0
2023-12-29 00:00:00  139.630005  140.360001  138.779999  139.690

In [11]:
import numpy as np
import pandas as pd
import yfinance as yf  # for ingesting data from Yahoo Finance
import sqlite3  # for ingesting data from SQL
import unittest

def ingest_ohlc_data(data_source):
    if data_source == "CSV":
        df = pd.read_csv(r"C:\Users\Hp\OneDrive\Desktop\Assignment\GOOGL_ohlc_data.csv")

    elif data_source == "JSON":
        # Get symbol OHLC data
        df = yf.download("CL=F")

    elif data_source == "SQL":
        engine = sqlite3.connect("stock_data.db")
        df = pd.read_sql("SELECT * FROM ohlc_data", engine)

    return df

class TestDataIngestion(unittest.TestCase):

    def test_csv_ingestion(self):
        df = ingest_ohlc_data("CSV")
        self.assertIsNotNone(df)
        self.assertIsInstance(df, pd.DataFrame)

    # Add more tests for JSON and SQL ingestion if needed

class TestDataCheck(unittest.TestCase):

    def setUp(self):
        self.df = pd.read_csv(r"C:\Users\Hp\OneDrive\Desktop\Assignment\GOOGL_ohlc_data.csv")

    def test_data_check_not_empty(self):
        data_check(self.df)
        self.assertFalse(self.df.empty)

    def test_data_check_no_missing_values(self):
        data_check(self.df)
        self.assertEqual(self.df.isna().sum().sum(), 0)

    def test_data_check_dtypes(self):
        data_check(self.df)
        self.assertEqual(self.df["Open"].dtype, np.float64)
        self.assertEqual(self.df["High"].dtype, np.float64)
        self.assertEqual(self.df["Low"].dtype, np.float64)
        self.assertEqual(self.df["Close"].dtype, np.float64)

def outlier_handling(df, column_with_outliers):
    q1 = df[column_with_outliers].quantile(0.25)
    q3 = df[column_with_outliers].quantile(0.75)
    iqr = q3 - q1
    # Remove outliers
    df = df[(df[column_with_outliers] > (q1 - 1.5 * iqr)) & (df[column_with_outliers] < (q3 + 1.5 * iqr))]
    return df

def date_formatting(df, column_with_date):
    # format date column
    df[column_with_date] = pd.to_datetime(df[column_with_date], format='%Y-%m-%d')
    return df

def remove_missing_values(df):
    # Remove rows with missing values
    df.dropna(inplace=True)
    return df

class TestDataCleaningPipeline(unittest.TestCase):

    def setUp(self):
        self.df = pd.read_csv(r"C:\Users\Hp\OneDrive\Desktop\Assignment\GOOGL_ohlc_data.csv")

    def test_outlier_handling(self):
        df_cleaned = outlier_handling(self.df, 'Close')
        self.assertEqual(len(self.df), len(df_cleaned))

    def test_date_formatting(self):
        df_formatted = date_formatting(self.df, 'Date')
        self.assertTrue(isinstance(df_formatted['Date'].iloc[0], pd.Timestamp))

    def test_remove_missing_values(self):
        df_no_missing = remove_missing_values(self.df)
        self.assertEqual(len(df_no_missing), len(self.df) - self.df.isna().sum().sum())

unittest.main(argv=[''], verbosity=2, exit=False)


test_data_check_dtypes (__main__.TestDataCheck) ... ok
test_data_check_no_missing_values (__main__.TestDataCheck) ... ok


Data integrity is valid. No missing values or type inconsistencies detected.


Data integrity is valid. No missing values or type inconsistencies detected.




test_data_check_not_empty (__main__.TestDataCheck) ... 

Data integrity is valid. No missing values or type inconsistencies detected.




ok
test_date_formatting (__main__.TestDataCleaningPipeline) ... ok
test_outlier_handling (__main__.TestDataCleaningPipeline) ... ok
test_remove_missing_values (__main__.TestDataCleaningPipeline) ... ok
test_csv_ingestion (__main__.TestDataIngestion) ... ok
test_correct_outliers (__main__.TestDataPipeline) ... ok
test_detect_outliers_iqr (__main__.TestDataPipeline) ... ok

----------------------------------------------------------------------
Ran 9 tests in 0.879s

OK


<unittest.main.TestProgram at 0x19a57ccc8b0>

In [12]:
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

def ingest_ohlc_data(data_source):
    try:
        if data_source == "CSV":
            df = pd.read_csv(r"C:\Users\Hp\OneDrive\Desktop\Assignment\GOOGL_ohlc_data.csv")
        elif data_source == "JSON":
            # Get symbol OHLC data
            df = yf.download("CL=F")
        elif data_source == "SQL":
            engine = sqlite3.connect("stock_data.db")
            df = pd.read_sql("SELECT * FROM ohlc_data", engine)

        data_check(df)
        logging.info("Data ingestion successful.")
        return df

    except Exception as e:
        logging.error("Error during data ingestion: %s", e)
        raise

def data_check(data):
    try:
        assert not data.empty, "DataFrame is empty. Please check the data source."
        num_missing_values = data.isna().sum().sum()
        assert num_missing_values == 0, f"There are {num_missing_values} missing values in the data."
        assert data["Open"].dtype == np.float64, "Data type for Open needs to be float64."
        assert data["High"].dtype == np.float64, "Data type for High needs to be float64."
        assert data["Low"].dtype == np.float64, "Data type for Low needs to be float64."
        assert data["Close"].dtype == np.float64, "Data type for Close needs to be float64."
        logging.info("Data integrity check passed.")

    except AssertionError as ae:
        logging.warning("Data quality issue: %s", ae)
        raise


try:
    df = ingest_ohlc_data("CSV")
except Exception as e:
    logging.error("Pipeline failed with error: %s", e)


INFO:root:Data integrity check passed.
INFO:root:Data ingestion successful.


In [13]:
import pandas as pd
import numpy as np
import yfinance as yf
import sqlite3

def ingest_ohlc_data(data_source, db_file):
    if data_source == "CSV":
        df = pd.read_csv(r"C:\Users\Hp\OneDrive\Desktop\Assignment\GOOGL_ohlc_data.csv")

    elif data_source == "JSON":
        # Get symbol OHLC data
        df = yf.download("CL=F")

    elif data_source == "SQL":
        engine = sqlite3.connect(db_file)
        df = pd.read_sql("SELECT * FROM ohlc_data", engine)

    data_check(df)

    return df

def data_check(data):
    assert not data.empty, "DataFrame is empty. Please check the data source."

    num_missing_values = data.isna().sum().sum()
    assert num_missing_values == 0, f"There are {num_missing_values} missing values in the data."

    assert data["Open"].dtype == np.float64, "Data type for Open needs to be float64."
    assert data["High"].dtype == np.float64, "Data type for High needs to be float64."
    assert data["Low"].dtype == np.float64, "Data type for Low needs to be float64."
    assert data["Close"].dtype == np.float64, "Data type for Close needs to be float64."

    print("Data integrity is valid. No missing values or type inconsistencies detected.")

def save_to_sqlite(data, db_file, table_name='ohlc_data'):
    engine = sqlite3.connect(db_file)
    data.to_sql(table_name, engine, if_exists='replace', index=False)

# Example usage
db_file = 'ohlc_data.db'
df = ingest_ohlc_data("CSV", db_file)
save_to_sqlite(df, db_file)


Data integrity is valid. No missing values or type inconsistencies detected.


In [14]:
import pandas as pd

def partition_data(data):
    
    # Convert the 'Date' column to datetime if it's not already in datetime format
    data['Date'] = pd.to_datetime(data['Date'])

    # Extract year and month from the 'Date' column
    data['Year'] = data['Date'].dt.year
    data['Month'] = data['Date'].dt.month

    # Group the data by year and month
    grouped_data = data.groupby(['Year', 'Month'])

    return grouped_data

# Partition the data
partitioned_data = partition_data(df)

# Iterate over each partition and print the first few rows
for (year, month), partition in partitioned_data:
    print(f"Year: {year}, Month: {month}")
    print(partition.head())
    print("\n")


Year: 2023, Month: 1
        Date       Open       High        Low      Close  Adj Close    Volume  \
0 2023-01-03  89.589996  91.050003  88.519997  89.120003  89.120003  28131200   
1 2023-01-04  90.349998  90.650002  87.269997  88.080002  88.080002  34854800   
2 2023-01-05  87.470001  87.570000  85.900002  86.199997  86.199997  27194400   
3 2023-01-06  86.790001  87.690002  84.860001  87.339996  87.339996  41381500   
4 2023-01-09  88.360001  90.050003  87.860001  88.019997  88.019997  29003900   

   Year  Month  
0  2023      1  
1  2023      1  
2  2023      1  
3  2023      1  
4  2023      1  


Year: 2023, Month: 2
         Date        Open        High         Low       Close   Adj Close  \
20 2023-02-01   98.709999  101.190002   97.580002  100.430000  100.430000   
21 2023-02-02  105.800003  107.849998  105.610001  107.739998  107.739998   
22 2023-02-03  102.930000  107.809998  102.580002  104.779999  104.779999   
23 2023-02-06  102.400002  104.360001  101.879997  102.9000

In [17]:
import pandas as pd

# Convert DataFrame to Parquet format
df.to_parquet('ohlc_data.parquet', compression='snappy')

# Read Parquet file
parquet_data = pd.read_parquet('ohlc_data.parquet')

# Print DataFrame from Parquet file
print(parquet_data)


          Date        Open        High         Low       Close   Adj Close  \
0   2023-01-03   89.589996   91.050003   88.519997   89.120003   89.120003   
1   2023-01-04   90.349998   90.650002   87.269997   88.080002   88.080002   
2   2023-01-05   87.470001   87.570000   85.900002   86.199997   86.199997   
3   2023-01-06   86.790001   87.690002   84.860001   87.339996   87.339996   
4   2023-01-09   88.360001   90.050003   87.860001   88.019997   88.019997   
..         ...         ...         ...         ...         ...         ...   
245 2023-12-22  140.770004  141.990005  140.710007  141.490005  141.490005   
246 2023-12-26  141.589996  142.679993  141.190002  141.520004  141.520004   
247 2023-12-27  141.589996  142.080002  139.889999  140.369995  140.369995   
248 2023-12-28  140.779999  141.139999  139.750000  140.229996  140.229996   
249 2023-12-29  139.630005  140.360001  138.779999  139.690002  139.690002   

       Volume  Year  Month  
0    28131200  2023      1  
1    