In [2]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.io as pio
from plotly.subplots import make_subplots
from datetime import timedelta
from dateutil.relativedelta import relativedelta

In [3]:
# ---- Load Data (Ensure You Replace This with Actual PostgreSQL Query) ----
origin_df = pd.read_csv('../Gold Futures Historical Data.csv')
origin_df.dropna(inplace=True)

df = origin_df[['Date', 'Price','Open','High' ,'Low']]
df = df.rename(columns={
    'Date': 'date_time',
    'Price': 'close',
    'Open': 'open',
    'High': 'high',
    'Low': 'low'
})

# Convert to datetime format
df['date_time'] = pd.to_datetime(df['date_time'])
# Remove commas and convert to float
for col in ['open', 'high', 'low', 'close']:
    df[col] = df[col].astype(str).str.replace(',', '').astype(float)

df = df.sort_values('date_time', ascending=True).reset_index(drop=True)

# Set CCI window size
n = 20

# Compute Typical Price (TP)
df['TP'] = (df['high'] + df['low'] + df['close']) / 3

# Compute Simple Moving Average (SMA) of TP
df['SMA_TP'] = df['TP'].rolling(window=n).mean()

# Compute Mean Absolute Deviation (MAD)
df['MAD'] = df['TP'].rolling(window=n).apply(lambda x: np.mean(np.abs(x - np.mean(x))), raw=True)

# Compute CCI
df['cci'] = (df['TP'] - df['SMA_TP']) / (0.015 * df['MAD'])

# Compute ADX Indicator
adx_period = 14
df['TR'] = np.maximum(df['high'] - df['low'], np.maximum(abs(df['high'] - df['close'].shift(1)), abs(df['low'] - df['close'].shift(1))))
df['+DM'] = np.where((df['high'] - df['high'].shift(1)) > (df['low'].shift(1) - df['low']), df['high'] - df['high'].shift(1), 0)
df['-DM'] = np.where((df['low'].shift(1) - df['low']) > (df['high'] - df['high'].shift(1)), df['low'].shift(1) - df['low'], 0)
df['TR_smooth'] = df['TR'].rolling(window=adx_period).mean()
df['+DM_smooth'] = df['+DM'].rolling(window=adx_period).mean()
df['-DM_smooth'] = df['-DM'].rolling(window=adx_period).mean()
df['+DI'] = 100 * (df['+DM_smooth'] / df['TR_smooth'])
df['-DI'] = 100 * (df['-DM_smooth'] / df['TR_smooth'])
df['DX'] = 100 * abs(df['+DI'] - df['-DI']) / (df['+DI'] + df['-DI'])
df['ADX'] = df['DX'].rolling(window=adx_period).mean()

stoch_period = 14  # Lookback period
smoothing_period = 3  # %D moving average period

# Compute Stochastic %K
df['stoch_k'] = 100 * (df['close'] - df['low'].rolling(window=stoch_period).min()) / \
                (df['high'].rolling(window=stoch_period).max() - df['low'].rolling(window=stoch_period).min())

# Compute Stochastic %D (3-period SMA of %K)
df['stoch_d'] = df['stoch_k'].rolling(window=smoothing_period).mean()

# Compute ATR for volatility measurement
df['ATR'] = df['high'].rolling(window=n).mean() - df['low'].rolling(window=n).mean()

# Define baseline ATR for normalization (e.g., median ATR value)
atr_baseline = df['ATR'].median()

# Initialize signal columns
df['bullish_divergence'] = 0
df['bearish_divergence'] = 0
df['bullish_convergence'] = 0
df['bearish_convergence'] = 0

for i in range(2, len(df)):
    price_now, price_prev = df['close'].iloc[i], df['close'].iloc[i-1]
    cci_now, cci_prev = df['cci'].iloc[i], df['cci'].iloc[i-1]

    # Compute dynamic cooldown based on volatility
    dynamic_cooldown = max(3, int(df['ATR'].fillna(0).iloc[i] / atr_baseline * 8))  # Adjust multiplier (5) as needed

    # Bullish Divergence
    if price_now < price_prev and cci_now > cci_prev and df['cci'][i] < -100:
        if df['bullish_divergence'].shift(dynamic_cooldown).iloc[i] == 0:
            df.loc[i, 'bullish_divergence'] = 1

    # Bearish Divergence
    if price_now > price_prev and cci_now < cci_prev and df['cci'][i] > 100:
        if df['bearish_divergence'].shift(dynamic_cooldown).iloc[i] == 0:
            df.loc[i, 'bearish_divergence'] = 1

    # Bullish Convergence
    if price_now > price_prev and cci_now > cci_prev:
        if df['bullish_convergence'].shift(dynamic_cooldown).iloc[i] == 0:
            df.loc[i, 'bullish_convergence'] = 1

    # Bearish Convergence
    if price_now < price_prev and cci_now < cci_prev:
        if df['bearish_convergence'].shift(dynamic_cooldown).iloc[i] == 0:
            df.loc[i, 'bearish_convergence'] = 1

# ---- Set Time Window for Visualization ----
end_date = df['date_time'].max()
start_date = end_date - relativedelta(weeks=8)

filtered_df_6h = df[(df['date_time'] >= start_date) & (df['date_time'] <= end_date)].copy()
filtered_df_6h = filtered_df_6h.sort_values('date_time')  # Sort for correct plotting order

# Convert timestamps to strings to enforce categorical axis
filtered_df_6h['date_time'] = filtered_df_6h['date_time'].astype(str)

# ---- Identify Divergence & Convergence Points in the Filtered Data ----
bullish_points = filtered_df_6h[filtered_df_6h['bullish_divergence'] == 1]
bearish_points = filtered_df_6h[filtered_df_6h['bearish_divergence'] == 1]
bullish_convergence = filtered_df_6h[filtered_df_6h['bullish_convergence'] == 1]
bearish_convergence = filtered_df_6h[filtered_df_6h['bearish_convergence'] == 1]

# ---- Update Subplot Titles ----
fig = make_subplots(rows=4, cols=1, shared_xaxes=True, 
                    vertical_spacing=0.02, subplot_titles=('Candlestick Chart', 'CCI', 'ADX', 'Stochastic Oscillator'), 
                    row_width=[0.2, 0.2, 0.2, 0.4])


# ---- Candlestick Chart ----
fig.add_trace(go.Candlestick(x=filtered_df_6h['date_time'],
                             open=filtered_df_6h['open'], high=filtered_df_6h['high'],
                             low=filtered_df_6h['low'], close=filtered_df_6h['close'], 
                             name='Candles'),
              row=1, col=1)

# ---- Divergence Markers ----
# fig.add_trace(go.Scatter(x=bullish_points['date_time'], y=bullish_points['low'],
#                          mode='markers', marker=dict(color='green', symbol='triangle-up', size=10),
#                          name='Bullish Divergence'),
#               row=1, col=1)

# fig.add_trace(go.Scatter(x=bearish_points['date_time'], y=bearish_points['high'],
#                          mode='markers', marker=dict(color='red', symbol='triangle-down', size=10),
#                          name='Bearish Divergence'),
#               row=1, col=1)

# ---- CCI Oscillator ----
fig.add_trace(go.Scatter(x=filtered_df_6h['date_time'], y=filtered_df_6h['cci'], 
                         mode='lines', name='CCI', 
                         line=dict(color='blue')),
              row=2, col=1)

# ---- ADX Indicator ----
fig.add_trace(go.Scatter(x=filtered_df_6h['date_time'], y=filtered_df_6h['ADX'], 
                         mode='lines', name='ADX', 
                         line=dict(color='purple')),
              row=3, col=1)

# ---- Add Stochastic Oscillator (%K and %D) ----
fig.add_trace(go.Scatter(x=filtered_df_6h['date_time'], y=filtered_df_6h['stoch_k'],
                         mode='lines', name='%K', line=dict(color='blue')),
              row=4, col=1)

fig.add_trace(go.Scatter(x=filtered_df_6h['date_time'], y=filtered_df_6h['stoch_d'],
                         mode='lines', name='%D', line=dict(color='orange')),
              row=4, col=1)

# ---- Overbought and Oversold Levels for Stochastic ----
fig.add_shape(type="line", x0=filtered_df_6h['date_time'].min(), x1=filtered_df_6h['date_time'].max(),
              y0=80, y1=80, line=dict(color="red", width=2, dash="dash"), row=4, col=1, yref="y4")

fig.add_shape(type="line", x0=filtered_df_6h['date_time'].min(), x1=filtered_df_6h['date_time'].max(),
              y0=20, y1=20, line=dict(color="green", width=2, dash="dash"), row=4, col=1, yref="y4")

# ---- Divergence Points on CCI ----
fig.add_trace(go.Scatter(x=bullish_points['date_time'], y=bullish_points['cci'],
                         mode='markers', marker=dict(color='green', symbol='triangle-up', size=10),
                         name='Bullish Divergence on CCI'),
              row=2, col=1)

fig.add_trace(go.Scatter(x=bearish_points['date_time'], y=bearish_points['cci'],
                         mode='markers', marker=dict(color='red', symbol='triangle-down', size=10),
                         name='Bearish Divergence on CCI'),
              row=2, col=1)

# ---- Convergence Points ----
fig.add_trace(go.Scatter(x=bullish_convergence['date_time'], y=bullish_convergence['close'],
                         mode='markers', marker=dict(color='blue', symbol='triangle-up', size=10),
                         name='Bullish Convergence'),
              row=1, col=1)

fig.add_trace(go.Scatter(x=bearish_convergence['date_time'], y=bearish_convergence['close'],
                         mode='markers', marker=dict(color='orange', symbol='triangle-down', size=10),
                         name='Bearish Convergence'),
              row=1, col=1)

# ---- Overbought, Oversold, and Zero Lines ----
fig.add_shape(type="line", x0=filtered_df_6h['date_time'].min(), x1=filtered_df_6h['date_time'].max(),
              y0=100, y1=100, line=dict(color="green", width=2, dash="dash"), row=2, col=1, yref="y2")

fig.add_shape(type="line", x0=filtered_df_6h['date_time'].min(), x1=filtered_df_6h['date_time'].max(),
              y0=-100, y1=-100, line=dict(color="purple", width=2, dash="dash"), row=2, col=1, yref="y2")

fig.add_shape(type="line", x0=filtered_df_6h['date_time'].min(), x1=filtered_df_6h['date_time'].max(),
              y0=0, y1=0, line=dict(color="black", width=1, dash="dot"), row=2, col=1, yref="y2")

# ---- Final Layout Formatting ----
fig.update_layout(title='Candlestick Chart with CCI, Divergence, and Convergence ,ADX and Stochastic (Daily) Dynamic', 
                  yaxis_title='Price',
                  xaxis_title='Time',
                  yaxis2_title='CCI',
                  showlegend=True,
                  template="plotly_white",
                  height=800, width=1200)

fig.update_xaxes(rangeslider_visible=False,type='category',tickformat="%b %d %H:%M",  # Format similar to Trading Economics
        dtick="D1",showticklabels=False,  # Show labels at daily intervals (adjust as needed)
        tickangle=90)

# Show Figure
fig.show()

fig3_path = "candlestick_6h.png"
# pio.write_image(fig, fig3_path, format='png')