In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Define the path to the CSV file
ticker = 'APPLE INC'
path = r"C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\AAPL.csv"

# Load the CSV file into a pandas DataFrame
data = pd.read_csv(path)

# Data columns
print(data.columns)

In [None]:
# Check the missing values
print(data.isna().sum(), "missing values in Adjusted column")

In [None]:
# Ensure the index is a DateTime index (assuming the CSV has a 'Date' column)
data['Date'] = pd.to_datetime(data['Date'])
data.set_index('Date', inplace=True)

In [None]:
# Display the first few rows of the data
data.tail()

In [None]:
# Filter the dates
data = data.loc['2023-01-01':'2023-05-30']

In [None]:
# Line Chart
plt.figure(figsize=(10, 5))
plt.plot(data['Adjusted'], label='Adj Close Price')
plt.title(f'{ticker} Line Chart')
plt.xlabel('Date')
plt.xticks(rotation=90, ha='left')
plt.ylabel('Adj Close Price')
plt.legend()
plt.show()

In [None]:
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt

# Resample data to monthly close prices
monthly_data = data['Adjusted'].resample('M').last()

# Plot Monthly Bar Chart
plt.figure(figsize=(14, 7))
plt.bar(monthly_data.index, monthly_data.values)
plt.title(f'{ticker} Monthly Bar Chart')
plt.xlabel('Date')
plt.ylabel('Adj Close Price')

# Format the x-axis labels to show only year and month
plt.gca().xaxis.set_major_formatter(plt.matplotlib.dates.DateFormatter('%Y-%m'))
plt.xticks(rotation=45, ha='right')

plt.tight_layout()
plt.show()

In [None]:
from mplfinance.original_flavor import candlestick_ohlc
import matplotlib.dates as mdates

# Prepare data for candlestick chart
# data_ohlc = data['Adjusted'].resample('2D').ohlc()
data_ohlc = data[['Open', 'High', 'Low', 'Close']].resample('1D').agg({
    'Open': 'first',
    'High': 'max',
    'Low': 'min',
    'Close': 'last'
}).dropna()
data_ohlc.reset_index(inplace=True)
data_ohlc['Date'] = data_ohlc['Date'].map(mdates.date2num)

# Candlestick Chart
fig, ax = plt.subplots(figsize=(10, 5))
candlestick_ohlc(ax, data_ohlc.values, width=1, colorup='g', colordown='r')
ax.xaxis_date()
ax.set_title(f'{ticker} Candlestick Chart')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
plt.show()

In [None]:
# Candlestickpatterns with TradeSignals

import pandas as pd
import numpy as np

# Helper functions for pattern recognition
def is_bullish(candle):
    return candle['Close'] > candle['Open']

def is_bearish(candle):
    return candle['Close'] < candle['Open']

def calculate_mid(candle):
    return (candle['Open'] + candle['Close']) / 2

def calculate_body_size(candle):
    return abs(candle['Close'] - candle['Open'])

def calculate_upper_wick(candle):
    return candle['High'] - max(candle['Open'], candle['Close'])

def calculate_lower_wick(candle):
    return min(candle['Open'], candle['Close']) - candle['Low']

def moving_average(data, window):
    return data['Close'].rolling(window=window).mean()

def determine_trend(data, period_short=5, period_long=20):
    short_ma = moving_average(data, period_short)
    long_ma = moving_average(data, period_long)
    return np.where(short_ma > long_ma, 'uptrend', 'downtrend')

# Pattern-specific functions
def is_doji(candle, tolerance=0.001):
    return calculate_body_size(candle) <= tolerance * (candle['High'] - candle['Low'])

def is_hammer(candle):
    body_size = calculate_body_size(candle)
    lower_wick = calculate_lower_wick(candle)
    upper_wick = calculate_upper_wick(candle)
    return lower_wick > 2 * body_size and upper_wick < body_size

def is_inverted_hammer(candle):
    body_size = calculate_body_size(candle)
    lower_wick = calculate_lower_wick(candle)
    upper_wick = calculate_upper_wick(candle)
    return upper_wick > 2 * body_size and lower_wick < body_size

def is_shooting_star(candle, previous_trend='uptrend'):
    body_size = calculate_body_size(candle)
    upper_wick = calculate_upper_wick(candle)
    lower_wick = calculate_lower_wick(candle)
    return previous_trend == 'uptrend' and upper_wick > 2 * body_size and lower_wick < body_size and is_bearish(candle)

def is_spinning_top(candle):
    body_size = calculate_body_size(candle)
    upper_wick = calculate_upper_wick(candle)
    lower_wick = calculate_lower_wick(candle)
    return upper_wick > 1.5 * body_size and lower_wick > 1.5 * body_size

def is_marubozu(candle, tolerance=0.001):
    range_size = candle['High'] - candle['Low']
    return (abs(candle['Open'] - candle['Low']) <= tolerance * range_size and 
            abs(candle['Close'] - candle['High']) <= tolerance * range_size)

def is_bullish_engulfing(c1, c2):
    return is_bearish(c1) and is_bullish(c2) and c2['Close'] > c1['Open'] and c2['Open'] < c1['Close']

def is_bearish_engulfing(c1, c2):
    return is_bullish(c1) and is_bearish(c2) and c2['Close'] < c1['Open'] and c2['Open'] > c1['Close']

def is_piercing_line(c1, c2):
    if is_bearish(c1) and is_bullish(c2):
        return c2['Open'] < c1['Low'] and c2['Close'] > calculate_mid(c1)
    return False

def is_dark_cloud_cover(c1, c2):
    if is_bullish(c1) and is_bearish(c2):
        return c2['Open'] > c1['High'] and c2['Close'] < calculate_mid(c1)
    return False

def is_bullish_harami(c1, c2):
    return is_bearish(c1) and is_bullish(c2) and c2['Open'] < c1['Open'] and c2['Close'] > c1['Close']

def is_bearish_harami(c1, c2):
    return is_bullish(c1) and is_bearish(c2) and c2['Open'] > c1['Open'] and c2['Close'] < c1['Close']

def is_harami_cross(c1, c2):
    return (is_bearish(c1) or is_bullish(c1)) and is_doji(c2) and c2['Open'] < c1['Open'] and c2['Close'] > c1['Close']

def is_morning_star(c1, c2, c3):
    return is_bearish(c1) and is_doji(c2) and is_bullish(c3) and c3['Close'] > calculate_mid(c1)

def is_evening_star(c1, c2, c3):
    return is_bullish(c1) and is_doji(c2) and is_bearish(c3) and c3['Close'] < calculate_mid(c1)

def is_three_white_soldiers(c1, c2, c3):
    return is_bullish(c1) and is_bullish(c2) and is_bullish(c3) and \
           c2['Open'] > c1['Close'] and c3['Open'] > c2['Close']

def is_three_black_crows(c1, c2, c3):
    return is_bearish(c1) and is_bearish(c2) and is_bearish(c3) and \
           c2['Open'] < c1['Close'] and c3['Open'] < c2['Close']

def is_three_inside_up(c1, c2, c3):
    return is_bearish(c1) and is_bullish(c2) and c2['Open'] < c1['Open'] and c2['Close'] > c1['Close'] and \
           is_bullish(c3) and c3['Close'] > c2['Close']

def is_three_inside_down(c1, c2, c3):
    return is_bullish(c1) and is_bearish(c2) and c2['Open'] > c1['Open'] and c2['Close'] < c1['Close'] and \
           is_bearish(c3) and c3['Close'] < c2['Close']

def is_three_outside_up(c1, c2, c3):
    return is_bearish(c1) and is_bullish(c2) and c2['Close'] > c1['Open'] and c2['Open'] < c1['Close'] and \
           is_bullish(c3) and c3['Close'] > c2['Close']

def is_three_outside_down(c1, c2, c3):
    return is_bullish(c1) and is_bearish(c2) and c2['Close'] < c1['Open'] and c2['Open'] > c1['Close'] and \
           is_bearish(c3) and c3['Close'] < c2['Close']

# Main function to identify candlestick patterns and generate trade signals
def identify_candlestick_patterns_and_signals(data):
    # Ensure data is not empty and has required columns
    required_columns = ['Open', 'High', 'Low', 'Close']
    if data.empty or not all(col in data.columns for col in required_columns):
        raise ValueError(f"DataFrame is empty or missing required columns: {required_columns}. Available columns: {data.columns}")
    
    # Check for non-numeric data
    if not all(data[required_columns].dtypes.apply(lambda x: np.issubdtype(x, np.number))):
        raise ValueError("OHLC columns must contain numeric data.")
    
    # Handle missing data
    if data[required_columns].isna().any().any():
        print("Warning: Missing values detected in OHLC columns. Dropping rows with NaN.")
        data = data.dropna(subset=required_columns)

    # Check if data is daily to avoid unnecessary resampling
    if not isinstance(data.index, pd.DatetimeIndex):
        raise ValueError("DataFrame index must be a DateTimeIndex.")
    date_diffs = data.index.to_series().diff().dropna()
    is_daily = (date_diffs == pd.Timedelta('1 days')).all()
    if not is_daily:
        print("Warning: Data is not daily. Resampling to daily frequency.")
        data = data[required_columns].resample('1D').agg({
            'Open': 'first',
            'High': 'max',
            'Low': 'min',
            'Close': 'last'
        }).dropna()

    # Ensure enough data for moving averages (at least 20 days for long MA)
    if len(data) < 20:
        raise ValueError("Insufficient data for trend detection (need at least 20 days).")

    data['Pattern'] = np.nan
    data['Pattern'] = data['Pattern'].astype(object)
    data['Trade_Signal'] = 'Hold'

    # Track detected patterns for logging
    detected_patterns = set()

    for i in range(3, len(data)):  # Start at 3 to allow three-candle patterns
        candle = data.iloc[i]
        prev_candle_1 = data.iloc[i - 1]
        prev_candle_2 = data.iloc[i - 2]
        prev_candle_3 = data.iloc[i - 3]

        # Determine the trend using moving averages
        previous_trend = determine_trend(data.iloc[:i+1])[-1]

        # Identify Patterns and Generate Trade Signals
        if is_doji(candle):
            data.loc[data.index[i], 'Pattern'] = 'Doji'
            data.loc[data.index[i], 'Trade_Signal'] = 'Hold'
            detected_patterns.add('Doji')

        elif previous_trend == 'downtrend' and is_hammer(candle):
            data.loc[data.index[i], 'Pattern'] = 'Hammer'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Hammer')

        elif previous_trend == 'uptrend' and is_hammer(candle):
            data.loc[data.index[i], 'Pattern'] = 'Hanging Man'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Hanging Man')

        elif previous_trend == 'downtrend' and is_inverted_hammer(candle):
            data.loc[data.index[i], 'Pattern'] = 'Inverted Hammer'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Inverted Hammer')

        elif previous_trend == 'uptrend' and is_shooting_star(candle, previous_trend):
            data.loc[data.index[i], 'Pattern'] = 'Shooting Star'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Shooting Star')

        elif is_spinning_top(candle):
            data.loc[data.index[i], 'Pattern'] = 'Spinning Top'
            data.loc[data.index[i], 'Trade_Signal'] = 'Hold'
            detected_patterns.add('Spinning Top')

        elif is_marubozu(candle):
            if is_bullish(candle):
                data.loc[data.index[i], 'Pattern'] = 'Bullish Marubozu'
                data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
                detected_patterns.add('Bullish Marubozu')
            else:
                data.loc[data.index[i], 'Pattern'] = 'Bearish Marubozu'
                data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
                detected_patterns.add('Bearish Marubozu')

        elif previous_trend == 'downtrend' and is_piercing_line(prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Piercing Line'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Piercing Line')

        elif previous_trend == 'uptrend' and is_dark_cloud_cover(prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Dark Cloud Cover'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Dark Cloud Cover')

        elif is_bullish_engulfing(prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Bullish Engulfing'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Bullish Engulfing')

        elif is_bearish_engulfing(prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Bearish Engulfing'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Bearish Engulfing')

        elif is_bullish_harami(prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Bullish Harami'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Bullish Harami')

        elif is_bearish_harami(prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Bearish Harami'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Bearish Harami')

        elif is_harami_cross(prev_candle_1, candle):
            if is_bearish(prev_candle_1):
                data.loc[data.index[i], 'Pattern'] = 'Bullish Harami Cross'
                data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
                detected_patterns.add('Bullish Harami Cross')
            else:
                data.loc[data.index[i], 'Pattern'] = 'Bearish Harami Cross'
                data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
                detected_patterns.add('Bearish Harami Cross')

        elif is_morning_star(prev_candle_2, prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Morning Star'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Morning Star')

        elif is_evening_star(prev_candle_2, prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Evening Star'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Evening Star')

        elif is_three_white_soldiers(prev_candle_2, prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Three White Soldiers'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Three White Soldiers')

        elif is_three_black_crows(prev_candle_2, prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Three Black Crows'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Three Black Crows')

        elif is_three_inside_up(prev_candle_2, prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Three Inside Up'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Three Inside Up')

        elif is_three_inside_down(prev_candle_2, prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Three Inside Down'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Three Inside Down')

        elif is_three_outside_up(prev_candle_2, prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Three Outside Up'
            data.loc[data.index[i], 'Trade_Signal'] = 'Buy'
            detected_patterns.add('Three Outside Up')

        elif is_three_outside_down(prev_candle_2, prev_candle_1, candle):
            data.loc[data.index[i], 'Pattern'] = 'Three Outside Down'
            data.loc[data.index[i], 'Trade_Signal'] = 'Sell'
            detected_patterns.add('Three Outside Down')

    # Log detected patterns
    print("Detected patterns:", sorted(detected_patterns) if detected_patterns else "None")
    if not detected_patterns:
        print("Warning: No patterns detected. Check data or pattern logic.")

    return data

# Apply the function to your DataFrame
try:
    ticker = 'AAPL'  # Define ticker
    # Note: Load data in a separate cell or adjust path as needed
    # data = pd.read_csv(r'C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\AAPL.csv')
    # data['Date'] = pd.to_datetime(data['Date'])
    # data.set_index('Date', inplace=True)

    data = identify_candlestick_patterns_and_signals(data)
    print("Columns in DataFrame:", data.columns)

    # Save results to Excel
    output_path = r'C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\APPLE_Candlestickpatterns_with_TradeSignals.xlsx'
    data.to_excel(output_path, sheet_name='Candlestick Patterns')
    print("Excel file generated successfully!")

except Exception as e:
    print(f"Error in pattern recognition: {e}")

In [None]:
# Plotting the Candlestick Patterns

import pandas as pd
import matplotlib.pyplot as plt
from mplfinance.original_flavor import candlestick_ohlc
import matplotlib.dates as mdates

# Function to plot candlestick chart for a specific pattern
def plot_pattern(data, pattern_name, max_plots=3):
    # Ensure required columns for plotting
    required_columns = ['Open', 'High', 'Low', 'Close', 'Pattern']
    if not all(col in data.columns for col in required_columns):
        raise ValueError(f"Missing required columns for plotting: {required_columns}. Available columns: {data.columns}")

    # Ensure DateTime index
    if not isinstance(data.index, pd.DatetimeIndex):
        raise ValueError("DataFrame index must be a DateTimeIndex.")

    # Filter for the specific pattern, excluding NaN
    pattern_data = data[data['Pattern'] == pattern_name]
    
    if pattern_data.empty:
        print(f"No occurrences of {pattern_name} found.")
        return

    # Limit the number of plots to avoid overwhelming output
    plot_count = 0
    for i in pattern_data.index:
        if plot_count >= max_plots:
            break

        try:
            # Extract a small window of data around the pattern using Timedelta
            start_idx = i - pd.Timedelta(days=5)
            end_idx = i + pd.Timedelta(days=5)
            window = data.loc[start_idx:end_idx]

            if window.empty:
                print(f"Skipping plot for {pattern_name} on {i.date()}: Empty window.")
                continue

            data_ohlc = window[['Open', 'High', 'Low', 'Close']].copy()
            data_ohlc.reset_index(inplace=True)
            data_ohlc['Date'] = data_ohlc['Date'].map(mdates.date2num)

            # Plot the candlestick chart
            fig, ax = plt.subplots(figsize=(10, 5))
            candlestick_ohlc(ax, data_ohlc.values, width=0.6, colorup='g', colordown='r')
            ax.xaxis_date()
            ax.set_title(f'{pattern_name} Pattern on {i.date()}')
            ax.set_xlabel('Date')
            ax.set_ylabel('Price')

            # Rotate x-axis labels by 45 degrees for better readability
            ax.xaxis.set_tick_params(rotation=45)
            plt.tight_layout()
            plt.show()

            plot_count += 1

        except Exception as e:
            print(f"Error plotting {pattern_name} on {i.date()}: {e}")

# Plot patterns one by one
try:
    patterns_to_plot = data['Pattern'].dropna().unique()  # Exclude NaN
    for pattern in patterns_to_plot:
        plot_pattern(data, pattern, max_plots=3)

except Exception as e:
    print(f"Error in visualization: {e}")

In [None]:
# Effectiveness of the Trade Signals
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Calculate the effectiveness of the trade signals
def calculate_strategy_performance(df):
    # Ensure required columns
    required_columns = ['Trade_Signal']
    price_column = 'Adj Close' if 'Adj Close' in df.columns else 'Close'
    required_columns.append(price_column)
    if not all(col in df.columns for col in required_columns):
        raise ValueError(f"Missing required columns: {required_columns}. Available columns: {df.columns}")

    # Ensure DateTime index
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("DataFrame index must be a DateTimeIndex.")

    # Initialize columns
    df['Position'] = 0  # 1 for holding, 0 for not holding
    df['Strategy_Returns'] = 0.0  # Daily returns of the strategy
    df['Market_Returns'] = df[price_column].pct_change()  # Daily returns of buy-and-hold

    # Handle missing returns
    df['Market_Returns'] = df['Market_Returns'].fillna(0)

    # Initialize variables
    position = 0
    for i in range(1, len(df)):
        if df.loc[df.index[i], 'Trade_Signal'] == 'Buy':
            position = 1
        elif df.loc[df.index[i], 'Trade_Signal'] == 'Sell':
            position = 0
        # 'Hold' maintains the current position
        df.loc[df.index[i], 'Position'] = position
        df.loc[df.index[i], 'Strategy_Returns'] = position * df.loc[df.index[i], 'Market_Returns']

    # Calculate cumulative returns
    df['Cumulative_Strategy_Returns'] = (1 + df['Strategy_Returns']).cumprod() - 1
    df['Cumulative_Market_Returns'] = (1 + df['Market_Returns']).cumprod() - 1

    # Calculate performance metrics
    annual_trading_days = 252
    strategy_annualized_return = df['Strategy_Returns'].mean() * annual_trading_days
    strategy_volatility = df['Strategy_Returns'].std() * np.sqrt(annual_trading_days)
    sharpe_ratio_strategy = strategy_annualized_return / strategy_volatility if strategy_volatility != 0 else np.nan
    market_annualized_return = df['Market_Returns'].mean() * annual_trading_days
    market_volatility = df['Market_Returns'].std() * np.sqrt(annual_trading_days)
    sharpe_ratio_market = market_annualized_return / market_volatility if market_volatility != 0 else np.nan
    max_drawdown_strategy = (df['Cumulative_Strategy_Returns'].cummax() - df['Cumulative_Strategy_Returns']).max()
    max_drawdown_market = (df['Cumulative_Market_Returns'].cummax() - df['Cumulative_Market_Returns']).max()

    # Print performance metrics
    print(f"Strategy Performance Metrics:")
    print(f"  Annualized Return: {strategy_annualized_return:.4f}")
    print(f"  Volatility: {strategy_volatility:.4f}")
    print(f"  Sharpe Ratio: {sharpe_ratio_strategy:.4f}")
    print(f"  Max Drawdown: {max_drawdown_strategy:.4f}")
    print(f"Market (Buy-and-Hold) Performance Metrics:")
    print(f"  Annualized Return: {market_annualized_return:.4f}")
    print(f"  Volatility: {market_volatility:.4f}")
    print(f"  Sharpe Ratio: {sharpe_ratio_market:.4f}")
    print(f"  Max Drawdown: {max_drawdown_market:.4f}")

    return df

# Apply the function to your DataFrame
try:
    ticker = 'AAPL'  # Define ticker
    data = calculate_strategy_performance(data)
    print("Columns in DataFrame:", data.columns)

    # Save results to Excel
    output_path = r'C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\APPLE_Candlestickpatterns_Strategy.xlsx'
    data.to_excel(output_path, sheet_name='Strategy Performance')
    print("Excel file generated successfully!")

    # Plot cumulative returns
    plt.figure(figsize=(12, 6))
    plt.plot(data.index, data['Cumulative_Strategy_Returns'], label='Strategy Returns')
    plt.plot(data.index, data['Cumulative_Market_Returns'], label='Market Returns (Buy and Hold)')
    plt.title(f'{ticker} Strategy Performance vs. Market')
    plt.xlabel('Date')
    plt.ylabel('Cumulative Returns')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in performance evaluation: {e}")

In [None]:
# Indicators and Oscillators

import pandas as pd
import matplotlib.pyplot as plt

# Define the path to the CSV file
ticker = 'APPLE INC'
path = r"C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\AAPL.csv"

try:
    # Load the CSV file into a pandas DataFrame
    data = pd.read_csv(path)

    # Validate required columns
    required_columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Adjusted']
    if not all(col in data.columns for col in required_columns):
        raise ValueError(f"Missing required columns: {required_columns}. Available columns: {data.columns}")

    # Ensure the index is a DateTime index
    data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
    if data['Date'].isna().any():
        raise ValueError("Invalid date format in 'Date' column.")
    data.set_index('Date', inplace=True)

    # Verify numeric data
    if not all(data[['Open', 'High', 'Low', 'Close', 'Adjusted']].dtypes.apply(lambda x: np.issubdtype(x, np.number))):
        raise ValueError("OHLC and Adjusted columns must contain numeric data.")

    # Display the first few rows of the data
    print(data.head())

except FileNotFoundError:
    print(f"Error: File not found at {path}")
except Exception as e:
    print(f"Error in data loading: {e}")

In [None]:
# Simple Moving Average (SMA)

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 200:
        raise ValueError("Insufficient data for 200-day SMA (need at least 200 days).")

    # Simple Moving Average (SMA)
    data['SMA_50'] = data['Adjusted'].rolling(window=50).mean()
    data['SMA_200'] = data['Adjusted'].rolling(window=200).mean()

    # Plot
    plt.figure(figsize=(10, 5))
    plt.plot(data['Adjusted'], label='Adj Close Price')
    plt.plot(data['SMA_50'], label='50-day SMA')
    plt.plot(data['SMA_200'], label='200-day SMA')
    plt.title(f'{ticker} Moving Averages')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in SMA calculation: {e}")

In [None]:
# EMA 20 and 50 days

import pandas as pd
import matplotlib.pyplot as plt
from mplfinance.original_flavor import candlestick_ohlc
import matplotlib.dates as mdates

try:
    # Validate required columns
    required_columns = ['Open', 'High', 'Low', 'Close', 'Adjusted']
    if not all(col in data.columns for col in required_columns):
        raise ValueError(f"Missing required columns: {required_columns}. Available columns: {data.columns}")

    # Check for sufficient data
    if len(data) < 50:
        raise ValueError("Insufficient data for 50-day EMA (need at least 50 days).")

    # Calculate the 20-day and 50-day EMA
    data['EMA_20'] = data['Adjusted'].ewm(span=20, adjust=False).mean()
    data['EMA_50'] = data['Adjusted'].ewm(span=50, adjust=False).mean()

    # Prepare data for candlestick chart
    data_ohlc = data[['Open', 'High', 'Low', 'Close']].copy()
    data_ohlc.reset_index(inplace=True)
    data_ohlc['Date'] = data_ohlc['Date'].map(mdates.date2num)

    # Candlestick Chart with 20-day and 50-day EMA
    fig, ax = plt.subplots(figsize=(10, 5))
    candlestick_ohlc(ax, data_ohlc.values, width=0.6, colorup='g', colordown='r')

    # Plot the 20-day and 50-day EMA
    ax.plot(mdates.date2num(data.index), data['EMA_20'], label='20-day EMA', color='blue', linewidth=1.5)
    ax.plot(mdates.date2num(data.index), data['EMA_50'], label='50-day EMA', color='red', linewidth=1.5)

    # Format the plot
    ax.xaxis_date()
    ax.set_title(f'{ticker} Candlestick Chart with 20-day and 50-day EMA')
    ax.set_xlabel('Date')
    ax.set_ylabel('Price')
    ax.legend()
    ax.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in EMA candlestick plot: {e}")

In [None]:
# EMA on Line Chart

import pandas as pd
import matplotlib.pyplot as plt

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 50:
        raise ValueError("Insufficient data for 50-day EMA (need at least 50 days).")

    # Calculate the 20-day and 50-day EMA
    data['EMA_20'] = data['Adjusted'].ewm(span=20, adjust=False).mean()
    data['EMA_50'] = data['Adjusted'].ewm(span=50, adjust=False).mean()

    # Plot the Adjusted Close with 20-day and 50-day EMA
    plt.figure(figsize=(12, 6))
    plt.plot(data['Adjusted'], label='Adjusted Close', color='black', linewidth=2)
    plt.plot(data['EMA_20'], label='20-day EMA', color='blue', linewidth=1.5)
    plt.plot(data['EMA_50'], label='50-day EMA', color='red', linewidth=1.5)

    # Formatting the plot
    plt.title(f'{ticker} Adjusted Close with 20-day and 50-day EMA')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in EMA line chart: {e}")

In [None]:
# Multiple EMAs on Line Chart

import pandas as pd
import matplotlib.pyplot as plt

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 200:
        raise ValueError("Insufficient data for 200-day EMA (need at least 200 days).")

    # Calculate multiple EMAs
    ema_periods = [20, 50, 100, 200]
    for period in ema_periods:
        data[f'EMA_{period}'] = data['Adjusted'].ewm(span=period, adjust=False).mean()

    # Plot the Adjusted Close with all EMAs
    plt.figure(figsize=(14, 7))
    plt.plot(data['Adjusted'], label='Adjusted Close', color='black', linewidth=2)

    # Plot each EMA
    colors = ['blue', 'red', 'green', 'purple']
    for i, period in enumerate(ema_periods):
        plt.plot(data[f'EMA_{period}'], label=f'{period}-day EMA', color=colors[i], linewidth=1.5)

    # Formatting the plot
    plt.title(f'{ticker} Adjusted Close with EMAs')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in multiple EMAs plot: {e}")

In [None]:
# Relative Strength Index

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 14:
        raise ValueError("Insufficient data for 14-day RSI (need at least 14 days).")

    # Function to calculate the Relative Strength Index (RSI)
    def calculate_rsi(data, window):
        delta = data['Adjusted'].diff(1)
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    # Calculate RSI
    data['RSI'] = calculate_rsi(data, 14)

    # Calculate dynamic overbought and oversold thresholds
    mean_rsi = data['RSI'].mean()
    std_rsi = data['RSI'].std()
    overbought_dynamic = mean_rsi + std_rsi
    oversold_dynamic = mean_rsi - std_rsi

    # Plot
    plt.figure(figsize=(10, 5))
    plt.plot(data['RSI'], label='RSI', color='blue')
    plt.axhline(70, linestyle='--', alpha=0.5, color='red', label='Overbought (70)')
    plt.axhline(30, linestyle='--', alpha=0.5, color='green', label='Oversold (30)')
    plt.axhline(overbought_dynamic, linestyle=':', alpha=0.3, color='red', label=f'Dynamic Overbought ({overbought_dynamic:.2f})')
    plt.axhline(oversold_dynamic, linestyle=':', alpha=0.3, color='green', label=f'Dynamic Oversold ({oversold_dynamic:.2f})')
    plt.title(f'{ticker} Relative Strength Index (RSI)')
    plt.xlabel('Date')
    plt.ylabel('RSI')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in RSI calculation: {e}")

In [None]:
# MACD

import pandas as pd
import matplotlib.pyplot as plt

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 26:
        raise ValueError("Insufficient data for MACD (need at least 26 days).")

    # MACD
    exp1 = data['Adjusted'].ewm(span=12, adjust=False).mean()
    exp2 = data['Adjusted'].ewm(span=26, adjust=False).mean()
    macd = exp1 - exp2
    signal = macd.ewm(span=9, adjust=False).mean()

    # Plot
    plt.figure(figsize=(10, 5))
    plt.plot(data.index, macd, label='MACD', color='green')
    plt.plot(data.index, signal, label='Signal Line', color='red')
    plt.axhline(0, linestyle='--', alpha=0.5, color='black', label='Zero Line')
    plt.title(f'{ticker} MACD')
    plt.xlabel('Date')
    plt.ylabel('MACD')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in MACD calculation: {e}")

In [None]:
# Calculate the MACD Histogram
macd_histogram = macd - signal

# Plotting the MACD line, Signal line, and MACD Histogram
plt.figure(figsize=(14, 7))

# Plot the MACD Histogram
plt.bar(data.index, macd_histogram, label='MACD Histogram', color='blue', alpha=0.5)

plt.title(f'{ticker} MACD Histogram')
plt.xlabel('Date')
plt.ylabel('MACD')
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
# Bollinger Bands

import pandas as pd
import matplotlib.pyplot as plt

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 20:
        raise ValueError("Insufficient data for Bollinger Bands (need at least 20 days).")

    # Bollinger Bands
    data['Middle Band'] = data['Adjusted'].rolling(window=20).mean()
    data['Upper Band'] = data['Middle Band'] + 2 * data['Adjusted'].rolling(window=20).std()
    data['Lower Band'] = data['Middle Band'] - 2 * data['Adjusted'].rolling(window=20).std()

    # Plot
    plt.figure(figsize=(10, 5))
    plt.plot(data['Adjusted'], label='Adj Close Price')
    plt.plot(data['Middle Band'], label='Middle Band')
    plt.plot(data['Upper Band'], label='Upper Band')
    plt.plot(data['Lower Band'], label='Lower Band')
    plt.fill_between(data.index, data['Upper Band'], data['Lower Band'], alpha=0.1)
    plt.title(f'{ticker} Bollinger Bands')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in Bollinger Bands calculation: {e}")

In [None]:
# Stochastic Oscillator

import pandas as pd
import matplotlib.pyplot as plt

try:
    # Validate required columns
    required_columns = ['High', 'Low', 'Adjusted']
    if not all(col in data.columns for col in required_columns):
        raise ValueError(f"Missing required columns: {required_columns}. Available columns: {data.columns}")

    # Check for sufficient data
    if len(data) < 14:
        raise ValueError("Insufficient data for Stochastic Oscillator (need at least 14 days).")

    # Function to calculate the Stochastic Oscillator
    def stochastic_oscillator(data, window):
        low_min = data['Low'].rolling(window=window).min()
        high_max = data['High'].rolling(window=window).max()
        stoch = 100 * ((data['Adjusted'] - low_min) / (high_max - low_min))
        return stoch

    # Calculate %K and %D
    data['%K'] = stochastic_oscillator(data, 14)
    data['%D'] = data['%K'].rolling(window=3).mean()

    # Plot
    plt.figure(figsize=(10, 5))
    plt.plot(data.index, data['%K'], label='Stochastic %K (14)', color='blue')
    plt.plot(data.index, data['%D'], label='Stochastic %D (3)', color='red')
    plt.axhline(80, linestyle='--', alpha=0.5, color='red', label='Overbought (80)')
    plt.axhline(20, linestyle='--', alpha=0.5, color='green', label='Oversold (20)')
    plt.title(f'{ticker} Stochastic Oscillator')
    plt.xlabel('Date')
    plt.ylabel('Stochastic')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in Stochastic Oscillator calculation: {e}")

In [None]:
# Stochastic Oscillator Indication Using Dynamic Overbought and Oversold Thresholds

import pandas as pd
import matplotlib.pyplot as plt

try:
    # Validate required columns
    required_columns = ['High', 'Low', 'Adjusted']
    if not all(col in data.columns for col in required_columns):
        raise ValueError(f"Missing required columns: {required_columns}. Available columns: {data.columns}")

    # Check for sufficient data
    if len(data) < 14:
        raise ValueError("Insufficient data for Stochastic Oscillator (need at least 14 days).")

    # Function to calculate the Stochastic Oscillator
    def stochastic_oscillator(data, window):
        low_min = data['Low'].rolling(window=window).min()
        high_max = data['High'].rolling(window=window).max()
        stoch = 100 * ((data['Adjusted'] - low_min) / (high_max - low_min))
        return stoch

    # Calculate %K and %D
    data['%K'] = stochastic_oscillator(data, 14)
    data['%D'] = data['%K'].rolling(window=3).mean()

    # Calculate dynamic overbought and oversold thresholds
    mean_k = data['%K'].mean()
    std_k = data['%K'].std()
    overbought = mean_k + std_k
    oversold = mean_k - std_k

    # Plot
    plt.figure(figsize=(10, 5))
    plt.plot(data.index, data['%K'], label='Stochastic %K (14)', color='blue')
    plt.plot(data.index, data['%D'], label='Stochastic %D (3)', color='red')
    plt.axhline(80, linestyle='--', alpha=0.5, color='red', label='Overbought (80)')
    plt.axhline(20, linestyle='--', alpha=0.5, color='green', label='Oversold (20)')
    plt.axhline(overbought, linestyle=':', alpha=0.3, color='red', label=f'Dynamic Overbought ({overbought:.2f})')
    plt.axhline(oversold, linestyle=':', alpha=0.3, color='green', label=f'Dynamic Oversold ({oversold:.2f})')
    plt.title(f'{ticker} Stochastic Oscillator with Dynamic Thresholds')
    plt.xlabel('Date')
    plt.ylabel('Stochastic')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error in Stochastic Oscillator with dynamic thresholds: {e}")

In [None]:
# Rectified Head and Shoulder Patterns

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import find_peaks

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 50:
        raise ValueError("Insufficient data for Head and Shoulders (need at least 50 days).")

    # Define a function to identify Head and Shoulders pattern
    def identify_head_and_shoulders(data, window=14):
        peaks, _ = find_peaks(data['Adjusted'], distance=window)
        troughs, _ = find_peaks(-data['Adjusted'], distance=window)
        
        patterns = []
        for i in range(1, len(peaks) - 1):
            left_peak = peaks[i - 1]
            head_peak = peaks[i]
            right_peak = peaks[i + 1]
            
            left_trough_idx = np.where((troughs < head_peak) & (troughs > left_peak))[0]
            right_trough_idx = np.where((troughs > head_peak) & (troughs < right_peak))[0]

            if left_trough_idx.size > 0 and right_trough_idx.size > 0:
                left_trough = troughs[left_trough_idx[-1]]
                right_trough = troughs[right_trough_idx[0]]

                # Conditions to validate the Head and Shoulders pattern
                if (data['Adjusted'].iloc[head_peak] > data['Adjusted'].iloc[left_peak]) and \
                   (data['Adjusted'].iloc[head_peak] > data['Adjusted'].iloc[right_peak]) and \
                   (data['Adjusted'].iloc[head_peak] - data['Adjusted'].iloc[left_trough] > 
                    data['Adjusted'].iloc[left_peak] - data['Adjusted'].iloc[left_trough]) and \
                   (data['Adjusted'].iloc[head_peak] - data['Adjusted'].iloc[right_trough] > 
                    data['Adjusted'].iloc[right_peak] - data['Adjusted'].iloc[right_trough]):
                    
                    patterns.append((left_peak, head_peak, right_peak))

        return patterns, peaks, troughs

    # Identify Head and Shoulders patterns
    patterns, peaks, troughs = identify_head_and_shoulders(data)

    # Convert peaks and troughs to DataFrame for plotting
    peaks_df = pd.DataFrame(data['Adjusted'].iloc[peaks])
    troughs_df = pd.DataFrame(data['Adjusted'].iloc[troughs])

    # Plotting the identified patterns
    fig, ax = plt.subplots(figsize=(14, 7))
    ax.plot(data.index, data['Adjusted'], label='Adj Close Price')

    # Plot peaks and troughs
    ax.scatter(peaks_df.index, peaks_df['Adjusted'], marker='^', color='red', label='Peaks')
    ax.scatter(troughs_df.index, troughs_df['Adjusted'], marker='v', color='blue', label='Troughs')

    # Highlight identified Head and Shoulders patterns
    for i, (left_peak, head_peak, right_peak) in enumerate(patterns):
        label = 'Head & Shoulders' if i == 0 else None  # Avoid duplicate legend entries
        ax.plot(data.index[[left_peak, head_peak, right_peak]], 
                data['Adjusted'].iloc[[left_peak, head_peak, right_peak]], 
                color='green', linewidth=1, label=label)

    ax.set_title(f'{ticker} Head and Shoulders Patterns')
    ax.set_xlabel('Date')
    ax.set_ylabel('Price')
    ax.legend()
    ax.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    print(f"Detected {len(patterns)} Head and Shoulders patterns.")

except Exception as e:
    print(f"Error in Head and Shoulders detection: {e}")

In [None]:
# Trend Line
plt.figure(figsize=(10, 5))
plt.plot(data['Adjusted'], label='Adj Close Price')
plt.plot(data.index, data['Adjusted'].rolling(window=50).mean(), label='Trend Line', linestyle='--')
plt.title(f'{ticker} Trend Line')
plt.xlabel('Date')
plt.ylabel('Adj Close Price')
plt.legend()
plt.show()

In [None]:
# Support and Resistance Levels 

import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import argrelextrema
import numpy as np

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 30:
        raise ValueError("Insufficient data for support/resistance levels (need at least 30 days).")

    # Identify local minima and maxima
    n = 30  # Number of points to be checked before and after
    data['Min'] = data.iloc[argrelextrema(data['Adjusted'].values, np.less_equal, order=n)[0]]['Adjusted']
    data['Max'] = data.iloc[argrelextrema(data['Adjusted'].values, np.greater_equal, order=n)[0]]['Adjusted']

    # Filter out NaN values
    support_levels = data.dropna(subset=['Min'])
    resistance_levels = data.dropna(subset=['Max'])

    # Plotting the close price along with support and resistance levels
    plt.figure(figsize=(14, 7))
    plt.plot(data['Adjusted'], label='Adj Close Price')
    plt.scatter(support_levels.index, support_levels['Min'], label='Support Level', color='green', marker='^')
    plt.scatter(resistance_levels.index, resistance_levels['Max'], label='Resistance Level', color='red', marker='v')
    plt.title(f'{ticker} Support and Resistance Levels')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    print(f"Detected {len(support_levels)} support levels and {len(resistance_levels)} resistance levels.")

except Exception as e:
    print(f"Error in support/resistance calculation: {e}")

In [None]:
# Volume Analysis
plt.figure(figsize=(10, 5))
plt.bar(data.index, data['Volume'], label='Volume', color='gray')
plt.title(f'{ticker} Volume Analysis')
plt.xlabel('Date')
plt.ylabel('Volume')
plt.legend()
plt.show()

In [None]:
# Fibonacci Retracement Levels

import pandas as pd
import matplotlib.pyplot as plt

try:
    # Validate required column
    if 'Adjusted' not in data.columns:
        raise ValueError("Missing 'Adjusted' column.")

    # Check for sufficient data
    if len(data) < 2:
        raise ValueError("Insufficient data for Fibonacci retracement (need at least 2 days).")

    # Function to plot Fibonacci retracement levels
    def plot_fibonacci_retracement(data, lookback_days=None):
        if lookback_days:
            data = data.tail(lookback_days)
        max_price = data['Adjusted'].max()
        min_price = data['Adjusted'].min()
        difference = max_price - min_price
        first_level = max_price - difference * 0.236
        second_level = max_price - difference * 0.382
        third_level = max_price - difference * 0.618

        plt.figure(figsize=(10, 5))
        plt.plot(data['Adjusted'], label='Adj Close Price')
        plt.axhline(max_price, linestyle='--', alpha=0.5, color='red', label='0%')
        plt.axhline(first_level, linestyle='--', alpha=0.5, color='orange', label='23.6%')
        plt.axhline(second_level, linestyle='--', alpha=0.5, color='yellow', label='38.2%')
        plt.axhline(third_level, linestyle='--', alpha=0.5, color='green', label='61.8%')
        plt.axhline(min_price, linestyle='--', alpha=0.5, color='blue', label='100%')
        plt.title(f'{ticker} Fibonacci Retracement Levels')
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    plot_fibonacci_retracement(data, lookback_days=252)  # Last year of data

except Exception as e:
    print(f"Error in Fibonacci retracement: {e}")

In [None]:
# Indicators and Oscillators Based Trading Strategy

# Load Data
import pandas as pd
import numpy as np

# Define the path to the CSV file
ticker = 'APPLE INC'
path = r"C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\AAPL.csv"

try:
    # Load the CSV file into a pandas DataFrame
    data = pd.read_csv(path)

    # Validate required columns
    required_columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Adjusted']
    if not all(col in data.columns for col in required_columns):
        raise ValueError(f"Missing required columns: {required_columns}. Available columns: {data.columns}")

    # Ensure the index is a DateTime index
    data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
    if data['Date'].isna().any():
        raise ValueError("Invalid date format in 'Date' column.")
    data.set_index('Date', inplace=True)

    # Verify numeric data
    if not all(data[['Open', 'High', 'Low', 'Close', 'Adjusted']].dtypes.apply(lambda x: np.issubdtype(x, np.number))):
        raise ValueError("OHLC and Adjusted columns must contain numeric data.")

    # Display the first few rows and data frequency
    print("Data head:")
    print(data.head())
    print("Data frequency check:")
    print(data.index.to_series().diff().value_counts())

except FileNotFoundError:
    print(f"Error: File not found at {path}")
except Exception as e:
    print(f"Error in data loading: {e}")

In [None]:
data.tail()

In [None]:
# Filter the dates
data = data.loc['2023-01-01':'2023-06-29']

In [None]:
# Trade Signals with Candle Stick Patterns or Execute the Next Cell in Case Candle Stick Based Trading is Not Intended
import pandas as pd
import numpy as np

def generate_indicator_signals(data):
    try:
        # Validate required columns
        required_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
        if not all(col in data.columns for col in required_columns):
            raise ValueError(f"Missing required columns: {required_columns}. Available columns: {data.columns}")

        # Ensure DateTime index
        if not isinstance(data.index, pd.DatetimeIndex):
            data.index = pd.to_datetime(data.index)
            
        # Use Adjusted if available, else Close
        if 'Adjusted' not in data.columns:
            data['Adjusted'] = data['Close']
            
        # Check for sufficient data
        if len(data) < 50:
            raise ValueError("Insufficient data for indicators (need at least 50 days).")

        # Indicator Calculations
        data['SMA_20'] = data['Adjusted'].rolling(window=20, min_periods=20).mean()
        data['SMA_50'] = data['Adjusted'].rolling(window=50, min_periods=50).mean()
        data['EMA_20'] = data['Adjusted'].ewm(span=20, adjust=False).mean()
        data['Volume_MA'] = data['Volume'].rolling(window=20, min_periods=20).mean()

        def calculate_atr(data, window):
            hl = data['High'] - data['Low']
            hc = abs(data['High'] - data['Close'].shift(1))
            lc = abs(data['Low'] - data['Close'].shift(1))
            tr = pd.concat([hl, hc, lc], axis=1).max(axis=1)
            return tr.rolling(window, min_periods=1).mean()
        data['ATR'] = calculate_atr(data, 14)

        def calculate_rsi(data, window):
            delta = data['Adjusted'].diff()
            gain = delta.where(delta > 0, 0)
            loss = -delta.where(delta < 0, 0)
            avg_gain = gain.rolling(window, min_periods=window).mean()
            avg_loss = loss.rolling(window, min_periods=window).mean()
            rs = avg_gain / avg_loss
            rs = rs.replace([np.inf, -np.inf], np.nan).fillna(0)
            return 100 - (100 / (1 + rs))
        data['RSI'] = calculate_rsi(data, 14)

        exp1 = data['Adjusted'].ewm(span=12, adjust=False).mean()
        exp2 = data['Adjusted'].ewm(span=26, adjust=False).mean()
        data['MACD'] = exp1 - exp2
        data['MACD_Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()
        data['MACD_Hist'] = data['MACD'] - data['MACD_Signal']

        data['Middle Band'] = data['Adjusted'].rolling(window=20).mean()
        std = data['Adjusted'].rolling(window=20).std()
        data['Upper Band'] = data['Middle Band'] + 1.5 * std
        data['Lower Band'] = data['Middle Band'] - 1.5 * std

        def stochastic_oscillator(data, window):
            low_min = data['Low'].rolling(window=window).min()
            high_max = data['High'].rolling(window=window).max()
            stoch = 100 * ((data['Adjusted'] - low_min) / (high_max - low_min))
            return stoch
        data['%K'] = stochastic_oscillator(data, 14)
        data['%D'] = data['%K'].rolling(window=3).mean()

        n = 20
        data['Rolling_Low'] = data['Low'].rolling(n, min_periods=n).min()
        data['Rolling_High'] = data['High'].rolling(n, min_periods=n).max()
        data['Support'] = data['Rolling_Low'] + 0.2 * data['ATR']
        data['Resistance'] = data['Rolling_High'] - 0.2 * data['ATR']

        # Fibonacci Retracement (3-month rolling)
        data['Fib_23.6'] = np.nan
        data['Fib_38.2'] = np.nan
        data['Fib_61.8'] = np.nan
        for i in range(63, len(data)):
            window = data.iloc[i-63:i]
            max_price = window['Adjusted'].max()
            min_price = window['Adjusted'].min()
            difference = max_price - min_price
            data.loc[data.index[i], 'Fib_23.6'] = max_price - difference * 0.236
            data.loc[data.index[i], 'Fib_38.2'] = max_price - difference * 0.382
            data.loc[data.index[i], 'Fib_61.8'] = max_price - difference * 0.618

        # Candlestick Patterns
        data['Candlestick_Pattern'] = np.nan
        for i in range(3, len(data)):
            # Bullish Engulfing
            if (data['Close'].iloc[i-1] < data['Open'].iloc[i-1] and
                data['Close'].iloc[i] > data['Open'].iloc[i] and
                data['Close'].iloc[i] > data['Open'].iloc[i-1] and
                data['Open'].iloc[i] < data['Close'].iloc[i-1] and
                data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Bullish_Engulfing'
            # Bearish Engulfing
            elif (data['Close'].iloc[i-1] > data['Open'].iloc[i-1] and
                  data['Close'].iloc[i] < data['Open'].iloc[i] and
                  data['Close'].iloc[i] < data['Open'].iloc[i-1] and
                  data['Open'].iloc[i] > data['Close'].iloc[i-1] and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Bearish_Engulfing'
            # Morning Star
            elif (data['Close'].iloc[i-2] < data['Open'].iloc[i-2] and
                  abs(data['Close'].iloc[i-1] - data['Open'].iloc[i-1]) < data['ATR'].iloc[i-1] * 0.2 and
                  data['Close'].iloc[i] > data['Open'].iloc[i] and
                  data['Close'].iloc[i] > data['Middle Band'].iloc[i] and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Morning_Star'
            # Evening Star
            elif (data['Close'].iloc[i-2] > data['Open'].iloc[i-2] and
                  abs(data['Close'].iloc[i-1] - data['Open'].iloc[i-1]) < data['ATR'].iloc[i-1] * 0.2 and
                  data['Close'].iloc[i] < data['Open'].iloc[i] and
                  data['Close'].iloc[i] < data['Middle Band'].iloc[i] and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Evening_Star'
            # Doji
            elif (abs(data['Close'].iloc[i] - data['Open'].iloc[i]) < data['ATR'].iloc[i] * 0.15 and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Doji'
            # Hammer
            elif (data['Close'].iloc[i] >= data['Open'].iloc[i] and
                  (data['Open'].iloc[i] - data['Low'].iloc[i]) > 2 * abs(data['Close'].iloc[i] - data['Open'].iloc[i]) and
                  (data['High'].iloc[i] - data['Close'].iloc[i]) < 0.5 * abs(data['Close'].iloc[i] - data['Open'].iloc[i]) and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i] and
                  data['Adjusted'].iloc[i] <= data['Lower Band'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Hammer'
            # Shooting Star
            elif (data['Close'].iloc[i] <= data['Open'].iloc[i] and
                  (data['High'].iloc[i] - data['Open'].iloc[i]) > 2 * abs(data['Close'].iloc[i] - data['Open'].iloc[i]) and
                  (data['Close'].iloc[i] - data['Low'].iloc[i]) < 0.5 * abs(data['Close'].iloc[i] - data['Open'].iloc[i]) and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i] and
                  data['Adjusted'].iloc[i] >= data['Upper Band'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Shooting_Star'
            # Bullish Harami
            elif (data['Close'].iloc[i-1] < data['Open'].iloc[i-1] and
                  data['Close'].iloc[i] > data['Open'].iloc[i] and
                  data['Open'].iloc[i] > data['Close'].iloc[i-1] and
                  data['Close'].iloc[i] < data['Open'].iloc[i-1] and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Bullish_Harami'
            # Bearish Harami
            elif (data['Close'].iloc[i-1] > data['Open'].iloc[i-1] and
                  data['Close'].iloc[i] < data['Open'].iloc[i] and
                  data['Open'].iloc[i] < data['Close'].iloc[i-1] and
                  data['Close'].iloc[i] > data['Open'].iloc[i-1] and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Bearish_Harami'
            # Three White Soldiers
            elif (i > 2 and
                  data['Close'].iloc[i-2] > data['Open'].iloc[i-2] and
                  data['Close'].iloc[i-1] > data['Open'].iloc[i-1] and
                  data['Close'].iloc[i] > data['Open'].iloc[i] and
                  data['Close'].iloc[i] > data['Close'].iloc[i-1] > data['Close'].iloc[i-2] and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Three_White_Soldiers'
            # Three Black Crows
            elif (i > 2 and
                  data['Close'].iloc[i-2] < data['Open'].iloc[i-2] and
                  data['Close'].iloc[i-1] < data['Open'].iloc[i-1] and
                  data['Close'].iloc[i] < data['Open'].iloc[i] and
                  data['Close'].iloc[i] < data['Close'].iloc[i-1] < data['Close'].iloc[i-2] and
                  data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]):
                data.loc[data.index[i], 'Candlestick_Pattern'] = 'Three_Black_Crows'

        # Signal Generation
        data['Indicator_Signal'] = 'Hold'
        data['Signal_Source'] = np.nan
        signal_counts = {}
        state = {
            'in_position': False,
            'trailing_stop': None,
            'last_signal_date': None,
            'last_signal_type': None
        }
        cooldown_days_same = 5
        cooldown_days_diff = 2

        for i in range(50, len(data)):
            if pd.isna(data['SMA_50'].iloc[i]) or pd.isna(data['ATR'].iloc[i]):
                continue

            high_volume = data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]
            price_change = abs(data['Adjusted'].iloc[i] - data['Adjusted'].iloc[i-1])
            momentum = price_change > data['ATR'].iloc[i] * 0.05
            trend = data['Adjusted'].iloc[i] > data['EMA_20'].iloc[i] and data['EMA_20'].iloc[i] > data['EMA_20'].iloc[i-5]

            if state['last_signal_date'] is not None:
                days_since_last = (data.index[i] - state['last_signal_date']).days
                if state['last_signal_type'] in ['Buy', 'Sell'] and days_since_last < cooldown_days_same:
                    continue
                elif state['last_signal_type'] != data['Indicator_Signal'].iloc[i] and days_since_last < cooldown_days_diff:
                    continue

            # Indicator Signals
            signals = []
            sources = []
            # Candlestick Patterns
            if (data['Candlestick_Pattern'].iloc[i] in ['Bullish_Engulfing', 'Morning_Star', 'Hammer', 'Bullish_Harami', 'Three_White_Soldiers'] and
                high_volume and trend and data['Close'].iloc[i] > data['Open'].iloc[i]):
                signals.append('Buy')
                sources.append(data['Candlestick_Pattern'].iloc[i])
            elif (data['Candlestick_Pattern'].iloc[i] in ['Bearish_Engulfing', 'Evening_Star', 'Shooting_Star', 'Bearish_Harami', 'Three_Black_Crows'] and
                  high_volume and not trend and data['Close'].iloc[i] < data['Open'].iloc[i]):
                if state['in_position']:
                    signals.append('Sell')
                    sources.append(data['Candlestick_Pattern'].iloc[i])
            elif (data['Candlestick_Pattern'].iloc[i] == 'Doji' and
                  data['RSI'].iloc[i] > 65 and high_volume and not trend):
                if state['in_position']:
                    signals.append('Sell')
                    sources.append('Doji')
            # Non-Candlestick Indicators
            if (data['SMA_20'].iloc[i] > data['SMA_50'].iloc[i] and
                data['SMA_20'].iloc[i-1] <= data['SMA_50'].iloc[i-1] and
                high_volume and momentum and trend):
                signals.append('Buy')
                sources.append('SMA_Crossover')
            if (data['RSI'].iloc[i] < 35 and 
                data['RSI'].iloc[i-1] >= 35 and
                high_volume and data['RSI'].iloc[i] < data['RSI'].iloc[i-1] and
                data['Close'].iloc[i] > data['Open'].iloc[i] and trend):
                signals.append('Buy')
                sources.append('RSI_Oversold')
            if (data['Low'].iloc[i] <= data['Support'].iloc[i] and
                high_volume and momentum and
                data['Close'].iloc[i] > data['Open'].iloc[i] and trend):
                signals.append('Buy')
                sources.append('Support_Bounce')
            if (data['MACD'].iloc[i] > data['MACD_Signal'].iloc[i] and
                data['MACD'].iloc[i-1] <= data['MACD_Signal'].iloc[i-1] and
                data['MACD_Hist'].iloc[i] > 0 and
                high_volume and trend):
                signals.append('Buy')
                sources.append('MACD_Crossover')
            if (data['Adjusted'].iloc[i] <= data['Lower Band'].iloc[i] and
                data['Adjusted'].iloc[i-1] > data['Lower Band'].iloc[i-1] and
                high_volume and data['Close'].iloc[i] > data['Open'].iloc[i] and trend):
                signals.append('Buy')
                sources.append('Bollinger_Lower')
            if (data['Adjusted'].iloc[i] <= data['Fib_61.8'].iloc[i] and
                data['Adjusted'].iloc[i-1] > data['Fib_61.8'].iloc[i-1] and
                high_volume and momentum and trend):
                signals.append('Buy')
                sources.append('Fib_61.8')
            if (data['%K'].iloc[i] < 20 and data['%D'].iloc[i] < 20 and
                data['%K'].iloc[i] > data['%D'].iloc[i] and
                high_volume and trend):
                signals.append('Buy')
                sources.append('Stochastic_Oversold')
            if state['in_position'] and data['Low'].iloc[i] <= state['trailing_stop']:
                signals.append('Sell')
                sources.append('Trailing_Stop')
            if state['in_position'] and (data['SMA_20'].iloc[i] < data['SMA_50'].iloc[i] and
                                        data['SMA_20'].iloc[i-1] >= data['SMA_50'].iloc[i-1] and
                                        high_volume and not trend):
                signals.append('Sell')
                sources.append('SMA_Crossover')
            if state['in_position'] and (data['High'].iloc[i] >= data['Resistance'].iloc[i] and
                                        high_volume and
                                        data['Close'].iloc[i] < data['Open'].iloc[i] and not trend):
                signals.append('Sell')
                sources.append('Resistance_Rejection')
            if state['in_position'] and (data['MACD'].iloc[i] < data['MACD_Signal'].iloc[i] and
                                        data['MACD'].iloc[i-1] >= data['MACD_Signal'].iloc[i-1] and
                                        data['MACD_Hist'].iloc[i] < 0 and
                                        high_volume and not trend):
                signals.append('Sell')
                sources.append('MACD_Crossover')
            if state['in_position'] and (data['Adjusted'].iloc[i] >= data['Upper Band'].iloc[i] and
                                        data['Adjusted'].iloc[i-1] < data['Upper Band'].iloc[i-1] and
                                        high_volume and data['Close'].iloc[i] < data['Open'].iloc[i] and not trend):
                signals.append('Sell')
                sources.append('Bollinger_Upper')
            if state['in_position'] and (data['Adjusted'].iloc[i] >= data['Fib_23.6'].iloc[i] and
                                        data['Adjusted'].iloc[i-1] < data['Fib_23.6'].iloc[i-1] and
                                        high_volume and not trend):
                signals.append('Sell')
                sources.append('Fib_23.6')
            if state['in_position'] and (data['%K'].iloc[i] > 80 and data['%D'].iloc[i] > 80 and
                                        data['%K'].iloc[i] < data['%D'].iloc[i] and
                                        high_volume and not trend):
                signals.append('Sell')
                sources.append('Stochastic_Overbought')

            # Confirmation Logic
            if state['in_position'] and signals and signals[0] == 'Sell':
                data.loc[data.index[i], 'Indicator_Signal'] = 'Sell'
                data.loc[data.index[i], 'Signal_Source'] = sources[0]
                signal_counts[sources[0] + '_Sell'] = signal_counts.get(sources[0] + '_Sell', 0) + 1
                state['in_position'] = False
                state['trailing_stop'] = None
                state['last_signal_date'] = data.index[i]
                state['last_signal_type'] = 'Sell'
                print(f"Sell Signal at {data.index[i]}: {sources[0]}")
            elif not state['in_position'] and signals and signals[0] == 'Buy':
                data.loc[data.index[i], 'Indicator_Signal'] = 'Buy'
                data.loc[data.index[i], 'Signal_Source'] = sources[0]
                signal_counts[sources[0] + '_Buy'] = signal_counts.get(sources[0] + '_Buy', 0) + 1
                state['in_position'] = True
                state['trailing_stop'] = data['Adjusted'].iloc[i] - 3.5 * data['ATR'].iloc[i]
                state['last_signal_date'] = data.index[i]
                state['last_signal_type'] = 'Buy'
                print(f"Buy Signal at {data.index[i]}: {sources[0]}, trailing_stop={state['trailing_stop']}")
            if state['in_position'] and data['Adjusted'].iloc[i] > data['Adjusted'].iloc[i-1]:
                new_stop = data['Adjusted'].iloc[i] - 3.5 * data['ATR'].iloc[i]
                state['trailing_stop'] = max(state['trailing_stop'], new_stop)

        # Remove Confirmed_Signal logic to simplify
        data = data.drop(columns=['Confirmed_Signal', 'Confirmed_Signal_Source'], errors='ignore')

        print("Detected signals:", signal_counts if signal_counts else "None")
        if not signal_counts:
            print("Warning: No signals detected. Check indicator logic or data.")

        return data

    except Exception as e:
        print(f"Error in signal generation: {e}")
        import traceback
        traceback.print_exc()
        return data

# Apply the function to your DataFrame
try:
    ticker = 'APPLE INC'
    data = generate_indicator_signals(data)
    
    print("Columns in DataFrame:", data.columns.tolist())
    print("\nSignal Counts:")
    print(data['Indicator_Signal'].value_counts())
    print("\nSignal Source Distribution:")
    print(data['Signal_Source'].value_counts(dropna=False))
    
    signals = data[data['Indicator_Signal'] != 'Hold']
    if not signals.empty:
        print("\nSignal Details:")
        print(signals[['Indicator_Signal', 'Signal_Source']].groupby(['Indicator_Signal', 'Signal_Source']).size())
        
        buy_signals = signals[signals['Indicator_Signal'] == 'Buy']
        sell_signals = signals[signals['Indicator_Signal'] == 'Sell']
        if not buy_signals.empty and not sell_signals.empty:
            durations = []
            for buy_date in buy_signals.index:
                next_sell = sell_signals[sell_signals.index > buy_date]
                if not next_sell.empty:
                    sell_date = next_sell.index[0]
                    durations.append((sell_date - buy_date).days)
            
            if durations:
                print(f"\nAverage Position Duration: {np.mean(durations):.1f} days")
                print(f"Min Position Duration: {min(durations)} days")
                print(f"Max Position Duration: {max(durations)} days")

    output_path = r'C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\APPLE_IndicatorSignals.xlsx'
    data.to_excel(output_path, sheet_name='Indicator Signals')
    print(f"\nExcel file saved successfully to: {output_path}")

except Exception as e:
    print(f"Error in signal generation: {e}")
    import traceback
    traceback.print_exc()

In [None]:
# Trade Signals without Candle Sticks

import pandas as pd
import numpy as np

def generate_indicator_signals(data):
    try:
        # Validate required columns
        required_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
        if not all(col in data.columns for col in required_columns):
            raise ValueError(f"Missing required columns: {required_columns}")

        # Ensure DateTime index
        if not isinstance(data.index, pd.DatetimeIndex):
            data.index = pd.to_datetime(data.index)
            
        # Use Adjusted if available, else Close
        if 'Adjusted' not in data.columns:
            data['Adjusted'] = data['Close']
            
        # Check for sufficient data
        if len(data) < 50:
            raise ValueError("Insufficient data for indicators (need at least 50 days).")

        # Indicator Calculations
        data['SMA_20'] = data['Adjusted'].rolling(window=20, min_periods=20).mean()
        data['SMA_50'] = data['Adjusted'].rolling(window=50, min_periods=50).mean()
        data['EMA_20'] = data['Adjusted'].ewm(span=20, adjust=False).mean()
        data['Volume_MA'] = data['Volume'].rolling(window=20, min_periods=20).mean()

        def calculate_atr(data, window):
            hl = data['High'] - data['Low']
            hc = abs(data['High'] - data['Close'].shift(1))
            lc = abs(data['Low'] - data['Close'].shift(1))
            tr = pd.concat([hl, hc, lc], axis=1).max(axis=1)
            return tr.rolling(window, min_periods=1).mean()
        data['ATR'] = calculate_atr(data, 14)

        def calculate_rsi(data, window):
            delta = data['Adjusted'].diff()
            gain = delta.where(delta > 0, 0)
            loss = -delta.where(delta < 0, 0)
            avg_gain = gain.rolling(window, min_periods=window).mean()
            avg_loss = loss.rolling(window, min_periods=window).mean()
            rs = avg_gain / avg_loss
            rs = rs.replace([np.inf, -np.inf], np.nan).fillna(0)
            return 100 - (100 / (1 + rs))
        data['RSI'] = calculate_rsi(data, 14)

        exp1 = data['Adjusted'].ewm(span=12, adjust=False).mean()
        exp2 = data['Adjusted'].ewm(span=26, adjust=False).mean()
        data['MACD'] = exp1 - exp2
        data['MACD_Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()
        data['MACD_Hist'] = data['MACD'] - data['MACD_Signal']

        data['Middle Band'] = data['Adjusted'].rolling(window=20).mean()
        std = data['Adjusted'].rolling(window=20).std()
        data['Upper Band'] = data['Middle Band'] + 1.5 * std
        data['Lower Band'] = data['Middle Band'] - 1.5 * std

        n = 20
        data['Rolling_Low'] = data['Low'].rolling(n, min_periods=n).min()
        data['Rolling_High'] = data['High'].rolling(n, min_periods=n).max()
        data['Support'] = data['Rolling_Low'] + 0.2 * data['ATR']
        data['Resistance'] = data['Rolling_High'] - 0.2 * data['ATR']

        # Signal Generation
        data['Indicator_Signal'] = 'Hold'
        data['Signal_Source'] = np.nan
        signal_counts = {}
        state = {
            'in_position': False,
            'trailing_stop': None,
            'last_signal_date': None,
            'last_signal_type': None
        }
        cooldown_days_same = 5
        cooldown_days_diff = 2

        for i in range(50, len(data)):
            if pd.isna(data['SMA_50'].iloc[i]) or pd.isna(data['ATR'].iloc[i]):
                continue

            high_volume = data['Volume'].iloc[i] > 0.65 * data['Volume_MA'].iloc[i]
            price_change = abs(data['Adjusted'].iloc[i] - data['Adjusted'].iloc[i-1])
            momentum = price_change > data['ATR'].iloc[i] * 0.05
            trend = data['Adjusted'].iloc[i] > data['EMA_20'].iloc[i] and data['EMA_20'].iloc[i] > data['EMA_20'].iloc[i-5]

            if state['last_signal_date'] is not None:
                days_since_last = (data.index[i] - state['last_signal_date']).days
                if state['last_signal_type'] in ['Buy', 'Sell'] and days_since_last < cooldown_days_same:
                    continue
                elif state['last_signal_type'] != data['Indicator_Signal'].iloc[i] and days_since_last < cooldown_days_diff:
                    continue

            # Indicator Signals
            signals = []
            sources = []
            if (data['SMA_20'].iloc[i] > data['SMA_50'].iloc[i] and
                data['SMA_20'].iloc[i-1] <= data['SMA_50'].iloc[i-1] and
                high_volume and momentum and trend):
                signals.append('Buy')
                sources.append('SMA_Crossover')
            if (data['RSI'].iloc[i] < 35 and 
                data['RSI'].iloc[i-1] >= 35 and
                high_volume and data['RSI'].iloc[i] < data['RSI'].iloc[i-1] and
                data['Close'].iloc[i] > data['Open'].iloc[i]):
                signals.append('Buy')
                sources.append('RSI_Oversold')
            if (data['Low'].iloc[i] <= data['Support'].iloc[i] and
                high_volume and momentum and
                data['Close'].iloc[i] > data['Open'].iloc[i] and trend):
                signals.append('Buy')
                sources.append('Support_Bounce')
            if (data['MACD'].iloc[i] > data['MACD_Signal'].iloc[i] and
                data['MACD'].iloc[i-1] <= data['MACD_Signal'].iloc[i-1] and
                data['MACD_Hist'].iloc[i] > 0 and
                high_volume and trend):
                signals.append('Buy')
                sources.append('MACD_Crossover')
            if (data['Adjusted'].iloc[i] <= data['Lower Band'].iloc[i] and
                data['Adjusted'].iloc[i-1] > data['Lower Band'].iloc[i-1] and
                high_volume and data['Close'].iloc[i] > data['Open'].iloc[i] and trend):
                signals.append('Buy')
                sources.append('Bollinger_Lower')
            if state['in_position'] and data['Low'].iloc[i] <= state['trailing_stop']:
                signals.append('Sell')
                sources.append('Trailing_Stop')
            if state['in_position'] and (data['SMA_20'].iloc[i] < data['SMA_50'].iloc[i] and
                                        data['SMA_20'].iloc[i-1] >= data['SMA_50'].iloc[i-1] and
                                        high_volume and not trend):
                signals.append('Sell')
                sources.append('SMA_Crossover')
            if state['in_position'] and (data['High'].iloc[i] >= data['Resistance'].iloc[i] and
                                        high_volume and
                                        data['Close'].iloc[i] < data['Open'].iloc[i] and not trend):
                signals.append('Sell')
                sources.append('Resistance_Rejection')
            if state['in_position'] and (data['MACD'].iloc[i] < data['MACD_Signal'].iloc[i] and
                                        data['MACD'].iloc[i-1] >= data['MACD_Signal'].iloc[i-1] and
                                        data['MACD_Hist'].iloc[i] < 0 and
                                        high_volume and not trend):
                signals.append('Sell')
                sources.append('MACD_Crossover')
            if state['in_position'] and (data['Adjusted'].iloc[i] >= data['Upper Band'].iloc[i] and
                                        data['Adjusted'].iloc[i-1] < data['Upper Band'].iloc[i-1] and
                                        high_volume and data['Close'].iloc[i] < data['Open'].iloc[i] and not trend):
                signals.append('Sell')
                sources.append('Bollinger_Upper')

            # Confirmation Logic
            if state['in_position'] and signals and signals[0] == 'Sell':
                data.loc[data.index[i], 'Indicator_Signal'] = 'Sell'
                data.loc[data.index[i], 'Signal_Source'] = sources[0]
                signal_counts[sources[0] + '_Sell'] = signal_counts.get(sources[0] + '_Sell', 0) + 1
                state['in_position'] = False
                state['trailing_stop'] = None
                state['last_signal_date'] = data.index[i]
                state['last_signal_type'] = 'Sell'
                print(f"Sell Signal at {data.index[i]}: {sources[0]}")
            elif not state['in_position'] and signals and signals[0] == 'Buy':
                data.loc[data.index[i], 'Indicator_Signal'] = 'Buy'
                data.loc[data.index[i], 'Signal_Source'] = sources[0]
                signal_counts[sources[0] + '_Buy'] = signal_counts.get(sources[0] + '_Buy', 0) + 1
                state['in_position'] = True
                state['trailing_stop'] = data['Adjusted'].iloc[i] - 3.5 * data['ATR'].iloc[i]
                state['last_signal_date'] = data.index[i]
                state['last_signal_type'] = 'Buy'
                print(f"Buy Signal at {data.index[i]}: {sources[0]}, trailing_stop={state['trailing_stop']}")
            if state['in_position'] and data['Adjusted'].iloc[i] > data['Adjusted'].iloc[i-1]:
                new_stop = data['Adjusted'].iloc[i] - 3.5 * data['ATR'].iloc[i]
                state['trailing_stop'] = max(state['trailing_stop'], new_stop)

        print("Detected signals:", signal_counts if signal_counts else "None")
        if not signal_counts:
            print("Warning: No signals detected. Check indicator logic or data.")

        return data

    except Exception as e:
        print(f"Error in signal generation: {e}")
        import traceback
        traceback.print_exc()
        return data

# Apply the function to your DataFrame
try:
    ticker = 'APPLE INC'
    data = generate_indicator_signals(data)
    
    print("Columns in DataFrame:", data.columns.tolist())
    print("\nSignal Counts:")
    print(data['Indicator_Signal'].value_counts())
    print("\nSignal Source Distribution:")
    print(data['Signal_Source'].value_counts(dropna=False))
    
    signals = data[data['Indicator_Signal'] != 'Hold']
    if not signals.empty:
        print("\nSignal Details:")
        print(signals[['Indicator_Signal', 'Signal_Source']].groupby(['Indicator_Signal', 'Signal_Source']).size())
        
        buy_signals = signals[signals['Indicator_Signal'] == 'Buy']
        sell_signals = signals[signals['Indicator_Signal'] == 'Sell']
        if not buy_signals.empty and not sell_signals.empty:
            durations = []
            for buy_date in buy_signals.index:
                next_sell = sell_signals[sell_signals.index > buy_date]
                if not next_sell.empty:
                    sell_date = next_sell.index[0]
                    durations.append((sell_date - buy_date).days)
            
            if durations:
                print(f"\nAverage Position Duration: {np.mean(durations):.1f} days")
                print(f"Min Position Duration: {min(durations)} days")
                print(f"Max Position Duration: {max(durations)} days")

    output_path = r'C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\APPLE_IndicatorSignals.xlsx'
    data.to_excel(output_path, sheet_name='Indicator Signals')
    print(f"\nExcel file saved successfully to: {output_path}")

except Exception as e:
    print(f"Error in signal generation: {e}")
    import traceback
    traceback.print_exc()

In [None]:
# Visualization

import pandas as pd
import matplotlib.pyplot as plt
from mplfinance.original_flavor import candlestick_ohlc
import matplotlib.dates as mdates

def plot_signal(data, signal_type, max_plots=3):
    try:
        required_columns = ['Open', 'High', 'Low', 'Close', 'Indicator_Signal']
        if not all(col in data.columns for col in required_columns):
            raise ValueError(f"Missing required columns: {required_columns}. Available columns: {data.columns}")

        if not isinstance(data.index, pd.DatetimeIndex):
            raise ValueError("DataFrame index must be a DateTimeIndex.")
    
        signal_data = data[data['Indicator_Signal'] == signal_type]
        if signal_data.empty:
            print(f"No occurrences of {signal_type} signals found.")
            return 0

        plot_count = 0
        skipped_dates = []
        for date in signal_data.index:
            if plot_count >= max_plots:
                break

            try:
                idx = data.index.get_loc(date)
                start_idx = max(0, idx - 3)
                end_idx = min(len(data), idx + 4)
                window = data.iloc[start_idx:end_idx].copy()
                window = window.dropna(subset=['Open', 'High', 'Low', 'Close'])

                if len(window) < 2:
                    skipped_dates.append(date.date())
                    print(f"Skipped {signal_type} plot for {date.date()}: Insufficient data points ({len(window)})")
                    continue

                data_ohlc = window[['Open', 'High', 'Low', 'Close']].copy()
                data_ohlc.reset_index(inplace=True)
                data_ohlc['Date'] = data_ohlc['Date'].map(mdates.date2num)

                fig, ax = plt.subplots(figsize=(10, 5))
                candlestick_ohlc(ax, data_ohlc.values, width=0.6, colorup='g', colordown='r')

                signal_date_num = mdates.date2num([date])[0]
                signal_source = data.loc[date, 'Signal_Source']
                if signal_type == 'Buy':
                    ax.scatter([signal_date_num], [window['Low'].min() * 0.98], marker='^', color='blue', s=100, label=f'Buy Signal ({signal_source})')
                elif signal_type == 'Sell':
                    ax.scatter([signal_date_num], [window['High'].max() * 1.02], marker='v', color='red', s=100, label=f'Sell Signal ({signal_source})')

                ax.xaxis_date()
                ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
                ax.set_title(f'{signal_type} Signal on {date.date()} ({signal_source})')
                ax.set_xlabel('Date')
                ax.set_ylabel('Price')
                ax.legend()
                ax.grid(True)
                plt.xticks(rotation=45)
                plt.tight_layout()
                plt.show()

                plot_count += 1

            except Exception as e:
                skipped_dates.append(date.date())
                print(f"Error plotting {signal_type} on {date.date()}: {e}")

        print(f"Generated {plot_count} {signal_type} plots.")
        if skipped_dates:
            print(f"Skipped {signal_type} plots for dates: {skipped_dates}")
        return plot_count

    except Exception as e:
        print(f"Error in visualization: {e}")
        return 0

try:
    total_plots = 0
    for signal in ['Buy', 'Sell']:
        total_plots += plot_signal(data, signal, max_plots=3)
    print(f"Total plots generated: {total_plots}")

except Exception as e:
    print(f"Error in signal visualization: {e}")

In [None]:
# Strategy Performance

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

def calculate_strategy_performance(df):
    try:
        required_columns = ['Indicator_Signal', 'Adjusted']
        if not all(col in df.columns for col in required_columns):
            raise ValueError(f"Missing required columns: {required_columns}. Available columns: {df.columns}")

        if not isinstance(df.index, pd.DatetimeIndex):
            raise ValueError("DataFrame index must be a DateTimeIndex.")

        df['Position'] = 0
        df['Strategy_Returns'] = 0.0
        df['Market_Returns'] = df['Adjusted'].pct_change().fillna(0)
        transaction_cost = 0.001

        position = 0
        trade_details = []
        buy_price = None
        buy_date = None
        buy_source = None
        for i in range(1, len(df)):
            if df.loc[df.index[i], 'Indicator_Signal'] == 'Buy' and position == 0:
                position = 1
                buy_price = df['Adjusted'].iloc[i]
                buy_date = df.index[i]
                buy_source = df['Signal_Source'].iloc[i]
            elif df.loc[df.index[i], 'Indicator_Signal'] == 'Sell' and position == 1:
                position = 0
                sell_price = df['Adjusted'].iloc[i]
                sell_date = df.index[i]
                sell_source = df['Signal_Source'].iloc[i]
                if buy_price is not None:
                    trade_return = (sell_price / buy_price - 1) - 2 * transaction_cost
                    trade_details.append({
                        'Buy_Date': buy_date,
                        'Sell_Date': sell_date,
                        'Buy_Source': buy_source,
                        'Sell_Source': sell_source,
                        'Return': trade_return,
                        'Holding_Period': (sell_date - buy_date).days
                    })
            df.loc[df.index[i], 'Position'] = position
            df.loc[df.index[i], 'Strategy_Returns'] = position * df.loc[df.index[i], 'Market_Returns'] - transaction_cost * abs(df['Position'].iloc[i] - df['Position'].iloc[i-1])

        df['Cumulative_Strategy_Returns'] = (1 + df['Strategy_Returns']).cumprod() - 1
        df['Cumulative_Market_Returns'] = (1 + df['Market_Returns']).cumprod() - 1

        days = (df.index[-1] - df.index[0]).days
        years = days / 365.25
        annual_trading_days = 252
        strategy_total_return = df['Cumulative_Strategy_Returns'].iloc[-1]
        market_total_return = df['Cumulative_Market_Returns'].iloc[-1]
        strategy_annualized_return = (1 + strategy_total_return) ** (1 / years) - 1
        market_annualized_return = (1 + market_total_return) ** (1 / years) - 1
        strategy_volatility = df['Strategy_Returns'].std() * np.sqrt(annual_trading_days)
        market_volatility = df['Market_Returns'].std() * np.sqrt(annual_trading_days)
        downside_vol_strategy = df[df['Strategy_Returns'] < 0]['Strategy_Returns'].std() * np.sqrt(annual_trading_days)
        downside_vol_market = df[df['Market_Returns'] < 0]['Market_Returns'].std() * np.sqrt(annual_trading_days)
        sharpe_ratio_strategy = strategy_annualized_return / strategy_volatility if strategy_volatility != 0 else np.nan
        sharpe_ratio_market = market_annualized_return / market_volatility if market_volatility != 0 else np.nan
        sortino_ratio_strategy = strategy_annualized_return / downside_vol_strategy if downside_vol_strategy != 0 else np.nan
        sortino_ratio_market = market_annualized_return / downside_vol_market if downside_vol_market != 0 else np.nan
        max_drawdown_strategy = (df['Cumulative_Strategy_Returns'].cummax() - df['Cumulative_Strategy_Returns']).max()
        max_drawdown_market = (df['Cumulative_Market_Returns'].cummax() - df['Cumulative_Market_Returns']).max()
        trades = len(trade_details)
        win_rate = len([t for t in trade_details if t['Return'] > 0]) / trades if trades > 0 else 0
        avg_holding_period = np.mean([t['Holding_Period'] for t in trade_details]) if trade_details else 0

        # Signal Quality Metrics
        signal_hit_ratios = {}
        for source in df['Signal_Source'].dropna().unique():
            source_trades = [t for t in trade_details if t['Buy_Source'] == source or t['Sell_Source'] == source]
            if source_trades:
                source_win_rate = len([t for t in source_trades if t['Return'] > 0]) / len(source_trades)
                signal_hit_ratios[source] = source_win_rate

        print(f"Indicator Strategy Performance Metrics:")
        print(f"  Annualized Return: {strategy_annualized_return:.4f}")
        print(f"  Volatility: {strategy_volatility:.4f}")
        print(f"  Sharpe Ratio: {sharpe_ratio_strategy:.4f}")
        print(f"  Sortino Ratio: {sortino_ratio_strategy:.4f}")
        print(f"  Max Drawdown: {max_drawdown_strategy:.4f}")
        print(f"  Number of Trades: {trades:.0f}")
        print(f"  Win Rate: {win_rate:.4f}")
        print(f"  Average Holding Period: {avg_holding_period:.1f} days")
        print(f"Market (Buy-and-Hold) Performance Metrics:")
        print(f"  Annualized Return: {market_annualized_return:.4f}")
        print(f"  Volatility: {market_volatility:.4f}")
        print(f"  Sharpe Ratio: {sharpe_ratio_market:.4f}")
        print(f"  Sortino Ratio: {sortino_ratio_market:.4f}")
        print(f"  Max Drawdown: {max_drawdown_market:.4f}")
        print("Signal Source Distribution:")
        print(df['Signal_Source'].value_counts(dropna=False))
        print("Signal Counts:")
        print(df['Indicator_Signal'].value_counts())
        print("Trade Dates:")
        trade_dates = df[df['Position'].diff().abs() > 0][['Indicator_Signal', 'Signal_Source', 'Adjusted']]
        print(trade_dates)
        print("Signal Profitability by Source:")
        print(df[df['Strategy_Returns'] != 0].groupby('Signal_Source')['Strategy_Returns'].mean())
        print("Individual Trade Details:")
        for t in trade_details:
            print(f"Buy: {t['Buy_Date'].date()} ({t['Buy_Source']}) -> Sell: {t['Sell_Date'].date()} ({t['Sell_Source']}), Return: {t['Return']:.4f}, Holding: {t['Holding_Period']} days")
        print("Signal Hit Ratios by Source:")
        for source, hit_ratio in signal_hit_ratios.items():
            print(f"{source}: {hit_ratio:.4f}")

        plt.figure(figsize=(12, 6))
        plt.plot(df.index, df['Cumulative_Strategy_Returns'], label='Indicator Strategy Returns')
        plt.plot(df.index, df['Cumulative_Market_Returns'], label='Market Returns (Buy and Hold)')
        plt.title(f'APPLE INC Indicator Strategy Performance vs. Market')
        plt.xlabel('Date')
        plt.ylabel('Cumulative Returns')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        return df

    except Exception as e:
        print(f"Error in performance evaluation: {e}")
        return df

try:
    ticker = 'APPLE INC'
    data = calculate_strategy_performance(data)
    print("Columns in DataFrame:", data.columns.tolist())

    output_path = r'C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\APPLE_IndicatorStrategy.xlsx'
    data.to_excel(output_path, sheet_name='Strategy Performance')
    print("Excel file generated successfully!")

except Exception as e:
    print(f"Error in strategy performance: {e}")

In [None]:
# Eliot Wave Theory Demonstration

import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import find_peaks

# Function to identify Elliott Waves
def identify_elliott_waves(data, prominence=20):
    peaks, _ = find_peaks(data['Adjusted'], prominence=prominence)
    troughs, _ = find_peaks(-data['Adjusted'], prominence=prominence)
    return peaks, troughs

# Function to validate impulse wave rules
def is_valid_impulse_wave(data, wave_points):
    # Wave 2 should not retrace more than 100% of Wave 1
    if data['Adjusted'].iloc[wave_points[1]] >= data['Adjusted'].iloc[wave_points[0]]:
        return False
    # Wave 3 should not be the shortest and should surpass Wave 1
    if (data['Adjusted'].iloc[wave_points[2]] <= data['Adjusted'].iloc[wave_points[0]]) or \
       (data['Adjusted'].iloc[wave_points[2]] <= data['Adjusted'].iloc[wave_points[4]]):
        return False
    # Wave 4 should not overlap with the price range of Wave 1
    if data['Adjusted'].iloc[wave_points[3]] >= data['Adjusted'].iloc[wave_points[1]]:
        return False
    return True

# Identify potential Elliott Waves
peaks, troughs = identify_elliott_waves(data, prominence=20)

# Debugging: Print identified peaks and troughs
print("Peaks identified at:", peaks)
print("Troughs identified at:", troughs)

# Looking for impulse wave (5-wave pattern)
impulse_wave = []
for i in range(len(peaks) - 4):
    potential_wave = peaks[i:i+5]
    if is_valid_impulse_wave(data, potential_wave):
        impulse_wave = potential_wave
        print("Valid impulse wave found:", impulse_wave)
        break

# Plotting the data with peaks and troughs
fig, ax = plt.subplots(figsize=(14, 7))
ax.plot(data.index, data['Adjusted'], label='Adj Close Price')

# Plot peaks and troughs
ax.scatter(data.index[peaks], data['Adjusted'].iloc[peaks], marker='^', color='red', label='Peaks')
ax.scatter(data.index[troughs], data['Adjusted'].iloc[troughs], marker='v', color='blue', label='Troughs')

# Annotate Impulse Waves if valid impulse wave is found
if len(impulse_wave) > 0:
    wave_labels = ['Wave 1', 'Wave 2', 'Wave 3', 'Wave 4', 'Wave 5']
    for i, peak in enumerate(impulse_wave):
        ax.annotate(wave_labels[i], (data.index[peak], data['Adjusted'].iloc[peak]), 
                    textcoords="offset points", xytext=(0,10), ha='center', fontsize=12, color='black')

ax.set_title(f'{ticker} - Elliott Wave Impulse Identification')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
ax.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import find_peaks

# Use adjusted close price for analysis
price = data['Adjusted']

# Identify peaks and troughs with adjusted parameters
peaks, _ = find_peaks(price, distance=30, prominence=5)
troughs, _ = find_peaks(-price, distance=30, prominence=5)

# Plot the price data with peaks and troughs to verify detection
plt.figure(figsize=(14, 7))
plt.plot(price, label='Adj Close Price')

# Plot the peaks and troughs for verification
plt.plot(price.index[peaks], price.iloc[peaks], 'r^', label='Peaks')
plt.plot(price.index[troughs], price.iloc[troughs], 'bv', label='Troughs')

plt.title(f'{ticker} - Peaks and Troughs Identification')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(False)
plt.show()

# Function to label Elliott Waves (impulse)
def label_elliott_waves(peaks, troughs, price, flexibility=0.15):
    wave_labels = {}
    wave_count = 0
    i = 0

    while i < len(peaks) - 4 and i < len(troughs) - 4:
        # Identify candidate points for the waves
        wave_1 = troughs[i]
        wave_2 = peaks[i + 1]
        wave_3 = troughs[i + 2]
        wave_4 = peaks[i + 3]
        wave_5 = troughs[i + 4]

        # Validate wave sequence (prioritize correct order)
        if wave_1 < wave_2 and wave_2 < wave_3 and wave_3 < wave_4 and wave_4 < wave_5:
            # Conditions to identify a valid impulse wave pattern, with flexibility
            if (price.iloc[wave_1] < price.iloc[wave_3] < price.iloc[wave_5] + price.iloc[wave_3] * flexibility) and \
               (price.iloc[wave_2] > price.iloc[wave_4] - price.iloc[wave_2] * flexibility) and \
               (price.iloc[wave_3] < price.iloc[wave_4]) and \
               (price.iloc[wave_3] < price.iloc[wave_5]):

                # Check for overlapping waves
                if not any(wave in wave_labels.values() for wave in [wave_1, wave_2, wave_3, wave_4, wave_5]):
                    wave_count += 1
                    wave_labels[f'Wave 1-{wave_count}'] = wave_1
                    wave_labels[f'Wave 2-{wave_count}'] = wave_2
                    wave_labels[f'Wave 3-{wave_count}'] = wave_3
                    wave_labels[f'Wave 4-{wave_count}'] = wave_4
                    wave_labels[f'Wave 5-{wave_count}'] = wave_5
                    i += 5  # Move forward by 5 to avoid overlapping wave detection
                else:
                    i += 1  # Move to the next potential wave set
            else:
                i += 1  # Move to the next potential wave set
        else:
            i += 1  # Move to the next potential wave set

    return wave_labels

# Apply the function to label the waves
wave_labels = label_elliott_waves(peaks, troughs, price)

# If no waves are found, print a warning message
if not wave_labels:
    print("No valid Elliott Waves found.")

# Plot the identified waves on top of the price data
plt.figure(figsize=(14, 7))
plt.plot(price, label='Adj Close Price')

# Plot the peaks and troughs for verification
plt.plot(price.index[peaks], price.iloc[peaks], 'r^', label='Peaks')
plt.plot(price.index[troughs], price.iloc[troughs], 'bv', label='Troughs')

# Annotate the identified waves if any
for wave, index in wave_labels.items():
    plt.annotate(wave, (price.index[index], price.iloc[index]),
                 textcoords="offset points", xytext=(-10, 10), ha='center')

plt.title(f'{ticker} - Elliott Wave Impulse Identification')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(False)
plt.show()

In [None]:
# Ichimoku Cloud

import numpy as np

# Ichimoku Cloud
data['Tenkan-sen'] = (data['High'].rolling(window=9).max() + data['Low'].rolling(window=9).min()) / 2
data['Kijun-sen'] = (data['High'].rolling(window=26).max() + data['Low'].rolling(window=26).min()) / 2
data['Senkou Span A'] = ((data['Tenkan-sen'] + data['Kijun-sen']) / 2).shift(26)
data['Senkou Span B'] = ((data['High'].rolling(window=52).max() + data['Low'].rolling(window=52).min()) / 2).shift(26)
data['Chikou Span'] = data['Adjusted'].shift(-26)

# Plot Ichimoku Cloud
plt.figure(figsize=(10, 5))
plt.plot(data['Adjusted'], label='Adj Close Price')
plt.plot(data['Tenkan-sen'], label='Tenkan-sen')
plt.plot(data['Kijun-sen'], label='Kijun-sen')
plt.fill_between(data.index, data['Senkou Span A'], data['Senkou Span B'], where=(data['Senkou Span A'] >= data['Senkou Span B']), color='lightgreen', alpha=0.5)
plt.fill_between(data.index, data['Senkou Span A'], data['Senkou Span B'], where=(data['Senkou Span A'] < data['Senkou Span B']), color='lightcoral', alpha=0.5)
plt.plot(data['Chikou Span'], label='Chikou Span')
plt.title(f'{ticker} Ichimoku Cloud')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.show()

### Predictive Analytics

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Define the path to the CSV file
ticker = 'APPLE INC'
path = r"C:\Users\IMI\Documents\Courses\SAPM\AY 2025-26\R-Exercises\AAPL.csv"

# Load the CSV file into a pandas DataFrame
data = pd.read_csv(path)

# Ensure the index is a DateTime index (assuming the CSV has a 'Date' column)
data['Date'] = pd.to_datetime(data['Date'])
data.set_index('Date', inplace=True)

# Display the first few rows of the data
data.head()

In [None]:
# ARIMA

import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import adfuller
import warnings
warnings.filterwarnings("ignore")

# Check for stationarity
result = adfuller(data['Close'])
print('ADF Statistic:', result[0])
print('p-value:', result[1])
for key, value in result[4].items():
    print('Critial Values:')
    print(f'   {key}, {value}')

# Plot ACF and PACF
plt.figure(figsize=(12, 6))
plt.subplot(211)
plot_acf(data['Close'].diff().dropna(), ax=plt.gca())
plt.subplot(212)
plot_pacf(data['Close'].diff().dropna(), ax=plt.gca())
plt.show()

In [None]:
#Differencing the series for stationarity

data['Close_diff'] = data['Close'].diff().dropna()

In [None]:
# Fitting an ARIMA Model

from statsmodels.tsa.arima.model import ARIMA

# Fit ARIMA model
model = ARIMA(data['Close'], order=(1, 1, 1))  # Use the values from ACF and PACF plots
model_fit = model.fit()
print(model_fit.summary())

In [None]:
# Checking the Residuals

residuals = model_fit.resid
plt.figure(figsize=(12, 6))
plt.subplot(211)
plt.plot(residuals)
plt.subplot(212)
plot_acf(residuals, ax=plt.gca())
plt.show()

In [None]:
# Forecast using the model
forecast_steps = 30
forecast = model_fit.forecast(steps=forecast_steps)
forecast_index = pd.date_range(start=data.index[-1], periods=forecast_steps, freq='D')

# Plot the actual and forecasted values
plt.figure(figsize=(12, 6))
plt.plot(data.index, data['Close'], label='Actual')
plt.plot(forecast_index, forecast, label='Forecast')
plt.legend()
plt.title(f'{ticker} ARIMA Forecast')
plt.xlabel('Date')
plt.ylabel('Price')
plt.show()

In [None]:
# Dynamic ARIMA Model

import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
import warnings
warnings.filterwarnings("ignore")

# Function to find the best ARIMA model based on AIC
def find_best_arima_model(data, max_p, max_d, max_q):
    best_aic = float("inf")
    best_order = None
    best_model = None
    
    for p in range(max_p + 1):
        for d in range(max_d + 1):
            for q in range(max_q + 1):
                try:
                    model = ARIMA(data['Close'], order=(p, d, q))
                    model_fit = model.fit()
                    aic = model_fit.aic
                    if aic < best_aic:
                        best_aic = aic
                        best_order = (p, d, q)
                        best_model = model_fit
                except:
                    continue
    
    return best_order, best_model

# Set the range for p, d, q
max_p = 5
max_d = 2
max_q = 5

# Find the best ARIMA model
best_order, best_model = find_best_arima_model(data, max_p, max_d, max_q)

# Print the best model summary
print(f"Best ARIMA order: {best_order}")
print(best_model.summary())

# Forecast using the best model
forecast = best_model.forecast(steps=30)
forecast_index = pd.date_range(start=data.index[-1], periods=30, freq='D')

# Plot the actual and forecasted values
plt.figure(figsize=(12, 6))
plt.plot(data.index, data['Close'], label='Actual')
plt.plot(forecast_index, forecast, label='Forecast')
plt.legend()
plt.title(f'{ticker} ARIMA Forecast')
plt.xlabel('Date')
plt.ylabel('Price')
plt.show()

In [None]:
# Exponential Smoothing Model

import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import warnings
warnings.filterwarnings("ignore")

# Fit ETS model
model = ExponentialSmoothing(data['Close'], trend='add', seasonal='add', seasonal_periods=12)
model_fit = model.fit()
forecast_steps = 30
forecast = model_fit.forecast(steps=forecast_steps)

# Print the forecast to verify
#print(forecast)

# Generate forecast index
forecast_index = pd.date_range(start=data.index[-1] + pd.Timedelta(days=1), periods=forecast_steps, freq='D')

# Print the forecast index to verify
#print(forecast_index)

# Create a DataFrame for the forecast to ensure proper indexing
forecast_series = pd.Series(forecast.values, index=forecast_index)

# Print the forecast series to verify
print(forecast_series)

# Plot the actual and forecasted values
plt.figure(figsize=(12, 6))
plt.plot(data.index, data['Close'], label='Actual')
plt.plot(forecast_series.index, forecast_series, label='Forecast', color='orange')
plt.legend()
plt.title(f'{ticker} Exponential Smoothing Forecast')
plt.xlabel('Date')
plt.ylabel('Price')
plt.show()