## Data Ingestion


In [1]:
import pandas as pd
import numpy as np
import yfinance as yf
from retry import retry

## """
    Fetch OHLC data for a list of tickers from Yahoo Finance.

    Parameters:
    - tickers (list): List of ticker symbols for the equities.
    - start_date (str): Start date for fetching data in 'YYYY-MM-DD' format.
    - end_date (str): End date for fetching data in 'YYYY-MM-DD' format.

    Returns:
    - ohlc_data (dict): Dictionary containing OHLC data for each ticker.
    """

In [2]:
def fetch_ohlc_data(tickers,start_date,end_date):
    ohlc_data={}
    for ticker in tickers:
        try:
            # Fetch data from Yahoo Finance
            data = yf.download(ticker, start=start_date, end=end_date)
            if not data.empty:  # Check if data is not empty
                ohlc_data[ticker] = data
            else:
                print(f"No data available for {ticker}. Skipping.")
        except Exception as e:
            print(f"Error fetching data for {ticker}: {str(e)}. Skipping.")
    return ohlc_data

In [3]:
# Define the list of ticker symbols and date range
tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'NFLX', 'NVDA', 'BA', 'JPM']
start_date = '2020-01-01'
end_date = '2021-01-01'

In [4]:
# Fetch OHLC data for the specified tickers and date range:
ohlc_data =fetch_ohlc_data(tickers, start_date,end_date)


[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


In [5]:
# Display the number of tickers and the fetched data for one of the tickers (e.g., AAPL)
print(f"Number of tickers: {len(ohlc_data)}")
print("\nOHLC Data for AAPL:")
print(ohlc_data['AAPL'].head())

Number of tickers: 9

OHLC Data for AAPL:
                 Open       High        Low      Close  Adj Close     Volume
Date                                                                        
2020-01-02  74.059998  75.150002  73.797501  75.087502  73.059425  135480400
2020-01-03  74.287498  75.144997  74.125000  74.357498  72.349152  146322800
2020-01-06  73.447502  74.989998  73.187500  74.949997  72.925636  118387200
2020-01-07  74.959999  75.224998  74.370003  74.597504  72.582657  108872000
2020-01-08  74.290001  76.110001  74.290001  75.797501  73.750252  132079200


In [7]:
# Define a function to validate data integrity
def validate_data_integrity(ohlc_data):
    data_validity = {}  # Initialize an empty dictionary to store validation results for each ticker
    for ticker, data in ohlc_data.items():
        validity = {}  # Initialize an empty dictionary to store validation results for the current ticker
        validity['missing_values'] = data.isnull().sum().sum()  # Check for missing values
        validity['outliers'] = detect_outliers(data)  # Detect outliers
        validity['data_type_consistency'] = check_data_type_consistency(data)  # Check data type consistency
        data_validity[ticker] = validity
    return data_validity

# Define a function to detect outliers (example implementation, you may need to adjust)
def detect_outliers(data):
    # For simplicity, let's assume outliers are values that are 3 standard deviations away from the mean
    outliers = (data - data.mean()).abs() > 3 * data.std()
    return outliers.sum().sum()
# Define a function to check data type consistency
def check_data_type_consistency(data):
    # Check if all columns have the same data type
    return data.apply(lambda x: x.apply(type)).nunique().eq(1).all()
# Validate data integrity for the fetched OHLC data
data_validity = validate_data_integrity(ohlc_data)
# Display validation results
for ticker, validity in data_validity.items():
    print(f"Validation results for {ticker}:")
    print(f"- Missing Values: {validity['missing_values']}")
    print(f"- Outliers: {validity['outliers']}")
    print(f"- Data Type Consistency: {'Consistent' if validity['data_type_consistency'] else 'Inconsistent'}")

Validation results for AAPL:
- Missing Values: 0
- Outliers: 5
- Data Type Consistency: Consistent
Validation results for MSFT:
- Missing Values: 0
- Outliers: 4
- Data Type Consistency: Consistent
Validation results for GOOGL:
- Missing Values: 0
- Outliers: 5
- Data Type Consistency: Consistent
Validation results for AMZN:
- Missing Values: 0
- Outliers: 4
- Data Type Consistency: Consistent
Validation results for TSLA:
- Missing Values: 0
- Outliers: 5
- Data Type Consistency: Consistent
Validation results for NFLX:
- Missing Values: 0
- Outliers: 7
- Data Type Consistency: Consistent
Validation results for NVDA:
- Missing Values: 0
- Outliers: 3
- Data Type Consistency: Consistent
Validation results for BA:
- Missing Values: 0
- Outliers: 4
- Data Type Consistency: Consistent
Validation results for JPM:
- Missing Values: 0
- Outliers: 2
- Data Type Consistency: Consistent


In [10]:
# Define a function to standardize OHLC data to a common structure (DataFrame)
def standardize_data_format(ohlc_data):
    
    frames = []  # Initialize an empty list to store individual DataFrames for each ticker
    for ticker, data in ohlc_data.items():
        # Add a 'Ticker' column to identify the data for each ticker
        data['Ticker'] = ticker
        frames.append(data)
    # Concatenate all DataFrames into a single DataFrame
    ohlc_dataframe = pd.concat(frames)
    return ohlc_dataframe

# Standardize OHLC data to a common DataFrame format
ohlc_dataframe = standardize_data_format(ohlc_data)

# Display the standardized DataFrame
print("Standardized OHLC Data:")
print(ohlc_dataframe.head())


Standardized OHLC Data:
                 Open       High        Low      Close  Adj Close     Volume  \
Date                                                                           
2020-01-02  74.059998  75.150002  73.797501  75.087502  73.059425  135480400   
2020-01-03  74.287498  75.144997  74.125000  74.357498  72.349152  146322800   
2020-01-06  73.447502  74.989998  73.187500  74.949997  72.925636  118387200   
2020-01-07  74.959999  75.224998  74.370003  74.597504  72.582657  108872000   
2020-01-08  74.290001  76.110001  74.290001  75.797501  73.750252  132079200   

           Ticker  
Date               
2020-01-02   AAPL  
2020-01-03   AAPL  
2020-01-06   AAPL  
2020-01-07   AAPL  
2020-01-08   AAPL  


## DATA CLEANING


In [13]:
# Define a function to handle missing values in OHLC data
def handle_missing_values(ohlc_dataframe, method='imputation'):
    
    if method == 'imputation':
        # Impute missing values using forward fill method
        cleaned_ohlc_dataframe = ohlc_dataframe.ffill()
    elif method == 'removal':
        # Remove rows with missing values
        cleaned_ohlc_dataframe = ohlc_dataframe.dropna()
    else:
        raise ValueError("Invalid method. Please choose 'imputation' or 'removal'.")

    return cleaned_ohlc_dataframe

# Handle missing values in OHLC data using imputation method
cleaned_ohlc_dataframe = handle_missing_values(ohlc_dataframe, method='imputation')

# Display the cleaned DataFrame
print("Cleaned OHLC Data:")
print(cleaned_ohlc_dataframe.head())


Cleaned OHLC Data:
                 Open       High        Low      Close  Adj Close     Volume  \
Date                                                                           
2020-01-02  74.059998  75.150002  73.797501  75.087502  73.059425  135480400   
2020-01-03  74.287498  75.144997  74.125000  74.357498  72.349152  146322800   
2020-01-06  73.447502  74.989998  73.187500  74.949997  72.925636  118387200   
2020-01-07  74.959999  75.224998  74.370003  74.597504  72.582657  108872000   
2020-01-08  74.290001  76.110001  74.290001  75.797501  73.750252  132079200   

           Ticker  
Date               
2020-01-02   AAPL  
2020-01-03   AAPL  
2020-01-06   AAPL  
2020-01-07   AAPL  
2020-01-08   AAPL  


In [16]:
# Define a function to detect and correct outliers in OHLC data
def detect_and_correct_outliers(ohlc_dataframe, method='z-score', threshold=3):
    """
    Detect and correct outliers in OHLC data.

    Parameters:
    - ohlc_dataframe (DataFrame): DataFrame containing OHLC data.
    - method (str): Method for detecting outliers. Options: 'z-score' (default) or 'iqr'.
    - threshold (float): Threshold value for defining outliers.

    Returns:
    - cleaned_ohlc_dataframe (DataFrame): DataFrame with outliers corrected.
    """
    cleaned_ohlc_dataframe = ohlc_dataframe.copy()  # Create a copy of the original DataFrame
    
    # Check if DataFrame contains only numeric data
    numeric_dataframe = cleaned_ohlc_dataframe.select_dtypes(include=['number'])
    if not numeric_dataframe.empty:
        if method == 'z-score':
            # Calculate z-scores for each column
            z_scores = (numeric_dataframe - numeric_dataframe.mean()) / numeric_dataframe.std()
            # Replace outliers with NaN based on z-score threshold
            cleaned_ohlc_dataframe[numeric_dataframe.columns] = numeric_dataframe.where(z_scores.abs() <= threshold, None)
        elif method == 'iqr':
            # Calculate IQR (Interquartile Range) for each column
            q1 = numeric_dataframe.quantile(0.25)
            q3 = numeric_dataframe.quantile(0.75)
            iqr = q3 - q1
            # Replace outliers with NaN based on IQR threshold
            cleaned_ohlc_dataframe[numeric_dataframe.columns] = numeric_dataframe.mask(
                (numeric_dataframe < (q1 - 1.5 * iqr)) | (numeric_dataframe > (q3 + 1.5 * iqr)), None)
        else:
            raise ValueError("Invalid method. Please choose 'z-score' or 'iqr'.")
        
        # Forward fill NaN values to replace outliers
        cleaned_ohlc_dataframe = cleaned_ohlc_dataframe.ffill()
    else:
        print("DataFrame contains no numeric data. Skipping outlier detection and correction.")
    
    return cleaned_ohlc_dataframe

# Detect and correct outliers in OHLC data using z-score method
cleaned_ohlc_dataframe = detect_and_correct_outliers(ohlc_dataframe, method='z-score', threshold=3)

# Display the cleaned DataFrame after outlier correction
print("Cleaned OHLC Data after Outlier Correction:")
print(cleaned_ohlc_dataframe.head())


Cleaned OHLC Data after Outlier Correction:
                 Open       High        Low      Close  Adj Close  \
Date                                                                
2020-01-02  74.059998  75.150002  73.797501  75.087502  73.059425   
2020-01-03  74.287498  75.144997  74.125000  74.357498  72.349152   
2020-01-06  73.447502  74.989998  73.187500  74.949997  72.925636   
2020-01-07  74.959999  75.224998  74.370003  74.597504  72.582657   
2020-01-08  74.290001  76.110001  74.290001  75.797501  73.750252   

                 Volume Ticker  
Date                            
2020-01-02  135480400.0   AAPL  
2020-01-03  146322800.0   AAPL  
2020-01-06  118387200.0   AAPL  
2020-01-07  108872000.0   AAPL  
2020-01-08  132079200.0   AAPL  


In [17]:
# Define a function to address inconsistencies in timestamps or date formats
def standardize_timestamps(ohlc_dataframe):
    """
    Address inconsistencies in timestamps or date formats in OHLC data.

    Parameters:
    - ohlc_dataframe (DataFrame): DataFrame containing OHLC data.

    Returns:
    - standardized_dataframe (DataFrame): DataFrame with standardized timestamps.
    """
    standardized_dataframe = ohlc_dataframe.copy()  # Create a copy of the original DataFrame
    
    # Convert index to datetime format
    standardized_dataframe.index = pd.to_datetime(standardized_dataframe.index)
    
    # Sort DataFrame by timestamps if not already sorted
    standardized_dataframe.sort_index(inplace=True)
    
    return standardized_dataframe

# Standardize timestamps in OHLC data
standardized_ohlc_dataframe = standardize_timestamps(cleaned_ohlc_dataframe)

# Display the DataFrame with standardized timestamps
print("OHLC Data with Standardized Timestamps:")
print(standardized_ohlc_dataframe.head())


OHLC Data with Standardized Timestamps:
                  Open        High         Low       Close   Adj Close  \
Date                                                                     
2020-01-02   74.059998   75.150002   73.797501   75.087502   73.059425   
2020-01-02  139.789993  141.100006  139.259995  141.089996  124.254173   
2020-01-02  158.779999  160.729996  158.330002  160.619995  154.493805   
2020-01-02   93.750000   94.900497   93.207497   94.900497   94.900497   
2020-01-02   59.687500   59.977501   59.180000   59.977501   59.741241   

                 Volume Ticker  
Date                            
2020-01-02  135480400.0   AAPL  
2020-01-02   10803700.0    JPM  
2020-01-02   22622100.0   MSFT  
2020-01-02   80580000.0   AMZN  
2020-01-02   23753600.0   NVDA  


## Data Transformation

In [32]:
# Define a function to calculate moving averages (MA)
def calculate_moving_average(ohlc_dataframe, window=20):
    """
    Calculate moving averages (MA) based on OHLC data.

    Parameters:
    - ohlc_dataframe (DataFrame): DataFrame containing OHLC data.
    - window (int): Window size for the moving average calculation (default: 20).

    Returns:
    - ma_dataframe (DataFrame): DataFrame containing moving average values.
    """
    # Drop the 'Ticker' column
    numeric_ohlc_dataframe = ohlc_dataframe.drop(columns=['Ticker'])
    
    # Calculate moving averages
    ma_dataframe = numeric_ohlc_dataframe.rolling(window=window).mean()
    
    return ma_dataframe


# Define a function to calculate Bollinger Bands (BB)
def calculate_bollinger_bands(ohlc_dataframe, window=20, num_std=2):
    """
    Calculate Bollinger Bands (BB) based on OHLC data.

    Parameters:
    - ohlc_dataframe (DataFrame): DataFrame containing OHLC data.
    - window (int): Window size for the rolling mean and standard deviation (default: 20).
    - num_std (int): Number of standard deviations for the Bollinger Bands (default: 2).

    Returns:
    - bb_upper (Series): Series containing upper Bollinger Band values.
    - bb_lower (Series): Series containing lower Bollinger Band values.
    """
    # Drop the 'Ticker' column
    numeric_ohlc_dataframe = ohlc_dataframe.drop(columns=['Ticker'])
    
    # Calculate rolling mean and standard deviation
    rolling_mean = numeric_ohlc_dataframe.rolling(window=window).mean()
    rolling_std = numeric_ohlc_dataframe.rolling(window=window).std()
    
    # Calculate Bollinger Bands
    bb_upper = rolling_mean + (rolling_std * num_std)
    bb_lower = rolling_mean - (rolling_std * num_std)
    
    return bb_upper, bb_lower

# Define a function to calculate Relative Strength Index (RSI)
def calculate_rsi(ohlc_dataframe, window=14):
    """
    Calculate Relative Strength Index (RSI) based on OHLC data.

    Parameters:
    - ohlc_dataframe (DataFrame): DataFrame containing OHLC data.
    - window (int): Window size for calculating RSI (default: 14).

    Returns:
    - rsi_series (Series): Series containing RSI values.
    """
    # Drop the 'Ticker' column
    numeric_ohlc_dataframe = ohlc_dataframe.drop(columns=['Ticker'])
    
    # Calculate price differences
    delta = numeric_ohlc_dataframe.diff()
    
    # Calculate gains and losses
    gain = delta.where(delta > 0, 0).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    
    # Calculate relative strength (RS)
    rs = gain / loss
    
    # Calculate RSI
    rsi_series = 100 - (100 / (1 + rs))
    
    return rsi_series



# Group the DataFrame by 'Ticker' column
# Group the DataFrame by 'Ticker' column
grouped_df = standardized_ohlc_dataframe.groupby('Ticker')

# Calculate technical indicators for each group
ma_dataframe = {}
bb_upper = {}
bb_lower = {}
rsi_series = {}

for ticker, group_df in grouped_df:
    ma_dataframe[ticker] = calculate_moving_average(group_df)
    upper, lower = calculate_bollinger_bands(group_df)
    bb_upper[ticker] = upper
    bb_lower[ticker] = lower
    rsi_series[ticker] = calculate_rsi(group_df)

# Print the calculated technical indicators
print("Moving Averages (MA):")
for ticker, df in ma_dataframe.items():
    print(f"\nTicker: {ticker}")
    print(df)

print("\nBollinger Bands:")
for ticker, (upper, lower) in zip(bb_upper.keys(), zip(bb_upper.values(), bb_lower.values())):
    print(f"\nTicker: {ticker}")
    print("Upper Bollinger Band:")
    print(upper)
    print("\nLower Bollinger Band:")
    print(lower)

print("\nRelative Strength Index (RSI):")
for ticker, series in rsi_series.items():
    print(f"\nTicker: {ticker}")
    print(series)

Moving Averages (MA):

Ticker: AAPL
                  Open        High         Low       Close   Adj Close  \
Date                                                                     
2020-01-02         NaN         NaN         NaN         NaN         NaN   
2020-01-03         NaN         NaN         NaN         NaN         NaN   
2020-01-06         NaN         NaN         NaN         NaN         NaN   
2020-01-07         NaN         NaN         NaN         NaN         NaN   
2020-01-08         NaN         NaN         NaN         NaN         NaN   
...                ...         ...         ...         ...         ...   
2020-12-24  124.457500  126.049001  123.303499  124.903000  122.578960   
2020-12-28  125.328500  127.041500  124.167999  125.908001  123.565260   
2020-12-29  126.382500  127.932500  125.044499  126.699000  124.341541   
2020-12-30  127.111000  128.558500  125.713998  127.249000  124.881308   
2020-12-31  127.714001  129.127000  126.255499  127.729500  125.352866   

 

In [2]:
# Define a function to detect and correct outliers using Z-score method
def detect_correct_outliers(dataframe, threshold=3):
    z_scores = np.abs((dataframe - dataframe.mean()) / dataframe.std())
    outlier_mask = (z_scores > threshold).any(axis=1)
    cleaned_dataframe = dataframe[~outlier_mask]
    return cleaned_dataframe

# Create a sample DataFrame
data = {'Column1': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100],
        'Column2': [11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]}
df = pd.DataFrame(data)

# Display original DataFrame
print("Original DataFrame:")
print(df)

# Detect and correct outliers
cleaned_df = detect_correct_outliers(df)

# Display cleaned DataFrame
print("\nCleaned DataFrame:")
print(cleaned_df)



Original DataFrame:
    Column1  Column2
0         1       11
1         2       12
2         3       13
3         4       14
4         5       15
5         6       16
6         7       17
7         8       18
8         9       19
9        10       20
10      100       21

Cleaned DataFrame:
    Column1  Column2
0         1       11
1         2       12
2         3       13
3         4       14
4         5       15
5         6       16
6         7       17
7         8       18
8         9       19
9        10       20
10      100       21


In [8]:


# Sample DataFrame with inconsistent date formats
data = {'Timestamp': ['2022/01/01 08:00:00', '2022/02/01 09:00:00', '2022/03/01 10:00:00'],
        'Value': [10, 20, 30]}
df = pd.DataFrame(data)

# Convert 'Timestamp' column to datetime format with specified format
df['Timestamp'] = pd.to_datetime(df['Timestamp'], format='%Y/%m/%d %H:%M:%S')

# Display Original DataFrame
print("Original DataFrame:")
print(df)

# Display Cleaned DataFrame with standardized date format
print("\nCleaned DataFrame:")
print(df)


Original DataFrame:
            Timestamp  Value
0 2022-01-01 08:00:00     10
1 2022-02-01 09:00:00     20
2 2022-03-01 10:00:00     30

Cleaned DataFrame:
            Timestamp  Value
0 2022-01-01 08:00:00     10
1 2022-02-01 09:00:00     20
2 2022-03-01 10:00:00     30


## DATA TRANSFORMATION

In [17]:
# Sample OHLC data (replace this with your actual OHLC DataFrame)
data = {
    'Date': pd.date_range(start='2022-01-01', periods=100),
    'Close': [10, 12, 15, 14, 16, 18, 20, 22, 24, 26] * 10,  # Example Close prices
    'Ticker': ['AAPL'] * 100  # Example Ticker values
}

# Create a DataFrame from the sample data
ohlc_dataframe = pd.DataFrame(data)

# Define a function to calculate moving averages (MA)
def calculate_moving_average(df, window=20):
    ma_dataframe = df['Close'].rolling(window=window).mean()
    return ma_dataframe

# Define a function to calculate Bollinger Bands (BB)
def calculate_bollinger_bands(df, window=20, num_std=2):
    rolling_mean = df['Close'].rolling(window=window).mean()
    rolling_std = df['Close'].rolling(window=window).std()
    bb_upper = rolling_mean + (rolling_std * num_std)
    bb_lower = rolling_mean - (rolling_std * num_std)
    bb_df = pd.DataFrame({'BB_Upper': bb_upper, 'BB_Lower': bb_lower})
    return bb_df

# Define a function to calculate Relative Strength Index (RSI)
def calculate_rsi(df, window=14):
    delta = df['Close'].diff()
    gain = delta.where(delta > 0, 0).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    rs = gain / loss
    rsi_series = 100 - (100 / (1 + rs))
    return rsi_series

# Group the OHLC data by ticker symbol
grouped_df = ohlc_dataframe.groupby('Ticker')

# Calculate technical indicators for each group
ma_df = grouped_df.apply(calculate_moving_average, include_groups=False)
bb_df = grouped_df.apply(calculate_bollinger_bands, include_groups=False)
rsi_series = grouped_df.apply(calculate_rsi, include_groups=False)

# Print the calculated technical indicators
print("Moving Averages (MA):")
print(ma_df)

print("\nBollinger Bands:")
print(bb_df)

print("\nRelative Strength Index (RSI):")
print(rsi_series)


Moving Averages (MA):
Close   0   1   2   3   4   5   6   7   8   9   ...    90    91    92    93  \
Ticker                                          ...                           
AAPL   NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN  ...  17.7  17.7  17.7  17.7   

Close     94    95    96    97    98    99  
Ticker                                      
AAPL    17.7  17.7  17.7  17.7  17.7  17.7  

[1 rows x 100 columns]

Bollinger Bands:
            BB_Upper  BB_Lower
Ticker                        
AAPL   0         NaN       NaN
       1         NaN       NaN
       2         NaN       NaN
       3         NaN       NaN
       4         NaN       NaN
...              ...       ...
       95  27.920722  7.479278
       96  27.920722  7.479278
       97  27.920722  7.479278
       98  27.920722  7.479278
       99  27.920722  7.479278

[100 rows x 2 columns]

Relative Strength Index (RSI):
Close   0   1   2   3   4   5   6   7   8   9   ...         90         91  \
Ticker                     

In [18]:
import pandas as pd
import numpy as np

# Function to calculate daily returns
def calculate_daily_returns(close_prices):
    return close_prices.pct_change()

# Function to calculate rolling standard deviation as a measure of volatility
def calculate_volatility(returns, window=20):
    return returns.rolling(window=window).std()

# Function to detect price patterns (Up or Down)
def detect_price_patterns(close_prices):
    return np.where(close_prices > close_prices.shift(1), 'Up', 'Down')

# Example OHLC (Open, High, Low, Close) data
data = {
    'Timestamp': pd.date_range(start='2022-01-01', periods=100, freq='D'),
    'Open': np.random.rand(100),
    'High': np.random.rand(100),
    'Low': np.random.rand(100),
    'Close': np.random.rand(100)
}

# Create DataFrame
ohlc_dataframe = pd.DataFrame(data)

# Calculate daily returns
ohlc_dataframe['Daily_Return'] = calculate_daily_returns(ohlc_dataframe['Close'])

# Calculate volatility
ohlc_dataframe['Volatility'] = calculate_volatility(ohlc_dataframe['Daily_Return'])

# Detect price patterns
ohlc_dataframe['Price_Pattern'] = detect_price_patterns(ohlc_dataframe['Close'])

# Display the DataFrame with new features
print(ohlc_dataframe)


    Timestamp      Open      High       Low     Close  Daily_Return  \
0  2022-01-01  0.963290  0.840892  0.923192  0.445451           NaN   
1  2022-01-02  0.128122  0.728622  0.961522  0.951182      1.135323   
2  2022-01-03  0.994034  0.451801  0.068227  0.186610     -0.803812   
3  2022-01-04  0.272003  0.543819  0.653350  0.236806      0.268986   
4  2022-01-05  0.087059  0.194892  0.750201  0.422075      0.782371   
..        ...       ...       ...       ...       ...           ...   
95 2022-04-06  0.906423  0.559778  0.165816  0.202188     -0.758024   
96 2022-04-07  0.351585  0.648715  0.171729  0.028550     -0.858794   
97 2022-04-08  0.809966  0.888935  0.220229  0.493941     16.300826   
98 2022-04-09  0.865117  0.921036  0.518924  0.595087      0.204774   
99 2022-04-10  0.339803  0.644544  0.639073  0.508099     -0.146177   

    Volatility Price_Pattern  
0          NaN          Down  
1          NaN            Up  
2          NaN          Down  
3          NaN         

In [34]:
# Resample the data from daily to hourly frequency
ohlc_dataframe.index=pd.to_datetime(ohlc_dataframe.index)
hourly_data = ohlc_dataframe.resample('h').ffill()

# Display the resampled data
print(hourly_data)


                         Open      High       Low     Close  Daily_Return  \
Timestamp                                                                   
2022-01-01 00:00:00  0.963290  0.840892  0.923192  0.445451           NaN   
2022-01-01 01:00:00  0.963290  0.840892  0.923192  0.445451           NaN   
2022-01-01 02:00:00  0.963290  0.840892  0.923192  0.445451           NaN   
2022-01-01 03:00:00  0.963290  0.840892  0.923192  0.445451           NaN   
2022-01-01 04:00:00  0.963290  0.840892  0.923192  0.445451           NaN   
...                       ...       ...       ...       ...           ...   
2022-04-09 20:00:00  0.865117  0.921036  0.518924  0.595087      0.204774   
2022-04-09 21:00:00  0.865117  0.921036  0.518924  0.595087      0.204774   
2022-04-09 22:00:00  0.865117  0.921036  0.518924  0.595087      0.204774   
2022-04-09 23:00:00  0.865117  0.921036  0.518924  0.595087      0.204774   
2022-04-10 00:00:00  0.339803  0.644544  0.639073  0.508099     -0.146177   

## DATA VALIDATION

In [37]:
import pandas as pd
import numpy as np
import unittest

# Define functions for data processing
def clean_data(df):
    # Drop missing values
    df.dropna(inplace=True)
    # Ensure data types are correct
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    return df

def calculate_moving_average(df, window=20):
    df['Moving_Average'] = df['Close'].rolling(window=window).mean()
    return df

def calculate_bollinger_bands(df, window=20, num_std=2):
    rolling_mean = df['Close'].rolling(window=window).mean()
    rolling_std = df['Close'].rolling(window=window).std()
    df['BB_Upper'] = rolling_mean + (rolling_std * num_std)
    df['BB_Lower'] = rolling_mean - (rolling_std * num_std)
    return df

# Define unit tests for the functions
class TestDataValidation(unittest.TestCase):
    
    # Test data cleaning function
    def test_clean_data(self):
        data = {'Timestamp': ['2022-01-01', '2022-01-02', '2022-01-03'],
                'Close': [10, np.nan, 20]}
        df = pd.DataFrame(data)
        cleaned_df = clean_data(df)
        self.assertEqual(len(cleaned_df), 2)  # Check if missing values are dropped
    
    # Test moving average calculation function
    def test_calculate_moving_average(self):
        data = {'Timestamp': pd.date_range(start='2022-01-01', periods=100),
                'Close': np.random.randint(1, 100, 100)}
        df = pd.DataFrame(data)
        df_with_ma = calculate_moving_average(df, window=20)
        self.assertEqual(len(df_with_ma), 100)  # Check if moving average column is added
    
    # Test Bollinger Bands calculation function
    def test_calculate_bollinger_bands(self):
        data = {'Timestamp': pd.date_range(start='2022-01-01', periods=100),
                'Close': np.random.randint(1, 100, 100)}
        df = pd.DataFrame(data)
        df_with_bb = calculate_bollinger_bands(df, window=20, num_std=2)
        self.assertEqual(len(df_with_bb), 100)  # Check if BB_Upper and BB_Lower columns are added

# Run the unit tests
unittest.main(argv=[''], verbosity=2, exit=False)



test_calculate_bollinger_bands (__main__.TestDataValidation) ... ok
test_calculate_moving_average (__main__.TestDataValidation) ... ok
test_clean_data (__main__.TestDataValidation) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.019s

OK


<unittest.main.TestProgram at 0x187aca434c0>

In [38]:
import pandas as pd
import numpy as np
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# Define functions for data processing
def clean_data(df):
    try:
        # Drop missing values
        df.dropna(inplace=True)
        logging.info("Data cleaning: Missing values dropped successfully.")
        
        # Ensure data types are correct
        df['Timestamp'] = pd.to_datetime(df['Timestamp'])
        logging.info("Data cleaning: Timestamp column converted to datetime.")
        
        return df
    except Exception as e:
        logging.error(f"Data cleaning error: {e}")
        return None

def calculate_moving_average(df, window=20):
    try:
        df['Moving_Average'] = df['Close'].rolling(window=window).mean()
        logging.info("Moving average calculation: Successfully calculated moving average.")
        return df
    except Exception as e:
        logging.error(f"Moving average calculation error: {e}")
        return None

def calculate_bollinger_bands(df, window=20, num_std=2):
    try:
        rolling_mean = df['Close'].rolling(window=window).mean()
        rolling_std = df['Close'].rolling(window=window).std()
        df['BB_Upper'] = rolling_mean + (rolling_std * num_std)
        df['BB_Lower'] = rolling_mean - (rolling_std * num_std)
        logging.info("Bollinger Bands calculation: Successfully calculated Bollinger Bands.")
        return df
    except Exception as e:
        logging.error(f"Bollinger Bands calculation error: {e}")
        return None

# Define a function to monitor pipeline for errors and data quality issues
def monitor_pipeline(data):
    try:
        # Clean data
        cleaned_data = clean_data(data.copy())
        
        # Calculate moving averages
        ma_data = calculate_moving_average(cleaned_data.copy())
        
        # Calculate Bollinger Bands
        bb_data = calculate_bollinger_bands(ma_data.copy())
        
        # Optionally, perform additional data validation or monitoring
        
        logging.info("Pipeline monitoring: Pipeline executed successfully.")
    except Exception as e:
        logging.error(f"Pipeline monitoring error: {e}")
        # Handle errors or send alerts as necessary

# Example usage
data = pd.DataFrame({'Timestamp': ['2022-01-01', '2022-01-02', '2022-01-03'],
                     'Close': [10, np.nan, 20]})
monitor_pipeline(data)


INFO:root:Data cleaning: Missing values dropped successfully.
INFO:root:Data cleaning: Timestamp column converted to datetime.
INFO:root:Moving average calculation: Successfully calculated moving average.
INFO:root:Bollinger Bands calculation: Successfully calculated Bollinger Bands.
INFO:root:Pipeline monitoring: Pipeline executed successfully.


In [41]:
import pandas as pd
import numpy as np
import logging
import sqlite3

# Configure logging
logging.basicConfig(level=logging.INFO)

# Define functions for data processing
def clean_data(df):
    try:
        # Drop missing values
        df.dropna(inplace=True)
        logging.info("Data cleaning: Missing values dropped successfully.")
        
        # Ensure data types are correct
        df['Timestamp'] = pd.to_datetime(df['Timestamp'])
        logging.info("Data cleaning: Timestamp column converted to datetime.")
        
        return df
    except Exception as e:
        logging.error(f"Data cleaning error: {e}")
        return None

def calculate_moving_average(df, window=20):
    try:
        df['Moving_Average'] = df['Close'].rolling(window=window).mean()
        logging.info("Moving average calculation: Successfully calculated moving average.")
        return df
    except Exception as e:
        logging.error(f"Moving average calculation error: {e}")
        return None

def calculate_bollinger_bands(df, window=20, num_std=2):
    try:
        rolling_mean = df['Close'].rolling(window=window).mean()
        rolling_std = df['Close'].rolling(window=window).std()
        df['BB_Upper'] = rolling_mean + (rolling_std * num_std)
        df['BB_Lower'] = rolling_mean - (rolling_std * num_std)
        logging.info("Bollinger Bands calculation: Successfully calculated Bollinger Bands.")
        return df
    except Exception as e:
        logging.error(f"Bollinger Bands calculation error: {e}")
        return None

# Define a function to store data in SQLite database
def store_data_in_database(df, db_file='trading_data.db', table_name='processed_data'):
    try:
        conn = sqlite3.connect(db_file)
        df.to_sql(table_name, conn, if_exists='replace', index=False)
        logging.info(f"Data stored in SQLite database: Table '{table_name}' in '{db_file}'.")
        conn.close()
    except Exception as e:
        logging.error(f"Error storing data in SQLite database: {e}")

# Define a function to monitor pipeline for errors and data quality issues
def monitor_pipeline(data):
    try:
        # Clean data
        cleaned_data = clean_data(data.copy())
        
        # Calculate moving averages
        ma_data = calculate_moving_average(cleaned_data.copy())
        
        # Calculate Bollinger Bands
        bb_data = calculate_bollinger_bands(ma_data.copy())
        
        # Store processed data in database
        store_data_in_database(bb_data)
        
        logging.info("Pipeline monitoring: Pipeline executed successfully.")
    except Exception as e:
        logging.error(f"Pipeline monitoring error: {e}")
        # Handle errors or send alerts as necessary

# Example usage
data = pd.DataFrame({'Timestamp': ['2022-01-01', '2022-01-02', '2022-01-03'],
                     'Close': [10, np.nan, 20]})
monitor_pipeline(data)


INFO:root:Data cleaning: Missing values dropped successfully.
INFO:root:Data cleaning: Timestamp column converted to datetime.
INFO:root:Moving average calculation: Successfully calculated moving average.
INFO:root:Bollinger Bands calculation: Successfully calculated Bollinger Bands.
INFO:root:Data stored in SQLite database: Table 'processed_data' in 'trading_data.db'.
INFO:root:Pipeline monitoring: Pipeline executed successfully.


In [45]:
import pandas as pd
import sqlite3
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def clean_data(df):
    """
    Clean the data by dropping missing values and converting Timestamp column to datetime.
    """
    logger.info("Data cleaning: Missing values dropped successfully.")
    df.dropna(inplace=True)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    logger.info("Data cleaning: Timestamp column converted to datetime.")
    return df

def store_data_in_database(df, db_path):
    """
    Store the cleaned data in SQLite database.
    """
    conn = sqlite3.connect(db_path)
    df.to_sql('processed_data', conn, if_exists='replace', index=False)
    conn.close()
    logger.info("Data stored in SQLite database successfully.")

def pipeline_monitoring():
    """
    Monitor the pipeline for errors and data quality issues.
    """
    logger.info("Pipeline monitoring: Pipeline executed successfully.")

if __name__ == "__main__":
    # Sample DataFrame
    data = {
        'Timestamp': ['2022-01-01', '2022-01-02', '2022-01-03'],
        'Value': [10, 20, 30]
    }
    df = pd.DataFrame(data)
    
    # Add Year column to the DataFrame
    df['Year'] = pd.to_datetime(df['Timestamp']).dt.year
    
    # Clean the data
    df = clean_data(df)
    
    # Store the cleaned data in SQLite database
    db_path = 'example.db'
    store_data_in_database(df, db_path)
    
    # Monitor the pipeline
    pipeline_monitoring()


INFO:__main__:Data cleaning: Missing values dropped successfully.
INFO:__main__:Data cleaning: Timestamp column converted to datetime.
INFO:__main__:Data stored in SQLite database successfully.
INFO:__main__:Pipeline monitoring: Pipeline executed successfully.


In [51]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def clean_data(df):
    """
    Clean the data by dropping missing values and converting Timestamp column to datetime.
    """
    logger.info("Data cleaning: Missing values dropped successfully.")
    df.dropna(inplace=True)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    logger.info("Data cleaning: Timestamp column converted to datetime.")
    return df

def store_data_in_parquet(df, parquet_path):
    """
    Store the cleaned data in Parquet format.
    """
    table = pa.Table.from_pandas(df)
    pq.write_table(table, parquet_path)
    logger.info("Data stored in Parquet format successfully.")

def pipeline_monitoring():
    """
    Monitor the pipeline for errors and data quality issues.
    """
    logger.info("Pipeline monitoring: Pipeline executed successfully.")

if __name__ == "__main__":
    # Sample DataFrame
    data = {
        'Timestamp': ['2022-01-01', '2022-01-02', '2022-01-03'],
        'Value': [10, 20, 30]
    }
    df = pd.DataFrame(data)
    
    # Clean the data
    df = clean_data(df)
    
    # Store the cleaned data in Parquet format
    parquet_path = 'example.parquet'
    store_data_in_parquet(df, parquet_path)
    
    # Monitor the pipeline
    pipeline_monitoring()


INFO:__main__:Data cleaning: Missing values dropped successfully.
INFO:__main__:Data cleaning: Timestamp column converted to datetime.
INFO:__main__:Data stored in Parquet format successfully.
INFO:__main__:Pipeline monitoring: Pipeline executed successfully.
