In [None]:
import os, asyncio, urllib.parse
import numpy as np
import pandas as pd
import asyncpg
from dotenv import load_dotenv
import math

In [2]:
# Load variables from a local .env file into the process environment so the
# os.getenv() lookups in the following cells can see them.
load_dotenv()

True

In [3]:
# Runtime tunables. Each one can be overridden through an environment variable
# of the same name; the second argument is the default used when it is unset.
HEAL_BUCKETS = int(os.environ.get("HEAL_BUCKETS", "2"))  # extra recent buckets re-written each pass
K_LEN = int(os.environ.get("K_LEN", "9"))                # stochastic %K look-back window
D_LEN = int(os.environ.get("D_LEN", "3"))                # EMA span used for Slow %D
CCI_SHORT = int(os.environ.get("CCI_SHORT", "3"))        # short CCI period
CCI_LONG = int(os.environ.get("CCI_LONG", "9"))          # long CCI period
SLEEP_SEC = float(os.environ.get("SLEEP_SEC", "5"))      # pause between service loop iterations

In [4]:
def build_dsn() -> str:
    """Build the PostgreSQL connection string (DSN) from environment variables.

    A non-empty ``DB_DSN`` variable is returned verbatim. Otherwise the DSN is
    assembled from ``host``, ``port`` (default ``5432``), ``user``, ``pass``
    and ``db``.

    Returns
    -------
    str (DSN of the form ``postgresql://user:password@host:port/db``.)

    Raises
    ------
    RuntimeError (When ``DB_DSN`` is unset and any of host/user/pass/db is missing or empty.)
    """
    dsn = os.getenv("DB_DSN")
    if dsn:
        return dsn

    host = os.getenv("host")
    port = os.getenv("port", "5432")
    user = os.getenv("user")
    pwd = os.getenv("pass")
    db = os.getenv("db")

    if not all([host, user, pwd, db]):
        raise RuntimeError("DB credentials missing: set DB_DSN or host/user/pass/db in .env")

    # Percent-encode every reserved character. quote_plus() is meant for query
    # strings and turns a space into '+', which is NOT decoded back to a space
    # in the userinfo part of a URI, so credentials containing spaces would
    # silently change. quote(..., safe="") emits '%20' instead.
    user_q = urllib.parse.quote(user, safe="")
    pwd_q = urllib.parse.quote(pwd, safe="")
    return f"postgresql://{user_q}:{pwd_q}@{host}:{port}/{db}"

# Resolve the connection string once, when this cell runs; main() passes it to
# asyncpg.connect(). Raises RuntimeError here if the credentials are missing.
DB_DSN = build_dsn()

In [5]:
# Show where we are connecting WITHOUT echoing the password: the DSN embeds
# credentials, and cell outputs are saved (and shared) with the notebook.
_dsn_parts = urllib.parse.urlsplit(DB_DSN)
_userinfo, _at, _hostinfo = _dsn_parts.netloc.rpartition("@")
if _at:
    _dsn_user, _colon, _ = _userinfo.partition(":")
    _masked_user = f"{_dsn_user}:***" if _colon else _dsn_user
    _dsn_parts = _dsn_parts._replace(netloc=f"{_masked_user}@{_hostinfo}")
# Drop the query string too: DSN options such as password=... may appear there.
print(urllib.parse.urlunsplit(_dsn_parts._replace(query="")))

postgresql://postgres:***@localhost:5433/live_prices


In [None]:
# Every entry of ASSETS is a 6-tuple, unpacked by main() as
#   (schema, interval_label, ca_table, ind_table, bucket_sql, bucket_col)
# Table names follow a fixed pattern, so the list is generated from the two
# small tables below: one row per instrument, one row per timeframe.

# (schema, pair suffix used in table names)
_SYMBOLS = (
    ("gold", "xau_usd"),
    ("silver", "xag_usd"),
    ("platinum", "xpt_usd"),
    ("sgd", "usd_sgd"),
    ("myr", "usd_myr"),
)

# (interval_label, tag used in table names, SQL interval text, bucket column)
_TIMEFRAMES = (
    ("6h", "6hr", "6 hours", "bucket_6h"),
    ("1d", "daily", "1 day", "bucket_daily"),
)

ASSETS = [
    (
        schema,
        interval_label,
        f"ohlc_data_{tag}_bid_{pair}",
        f"indicators_{tag}_bid_{pair}",
        bucket_sql,
        bucket_col,
    )
    for schema, pair in _SYMBOLS
    for interval_label, tag, bucket_sql, bucket_col in _TIMEFRAMES
]

In [7]:
def calculate_slowD(df: pd.DataFrame, k_period: int = 9, d_period: int = 3) -> pd.DataFrame:
    """Add stochastic oscillator columns (Fast %K, Fast %D, Slow %D) to ``df``.

    The columns ``fastk``, ``fastd`` and ``slowd`` are written onto the frame
    that was passed in (it is modified in place) and the same object is
    returned. No other column of ``df`` is touched.

    Parameters
    ----------
    df: pd.DataFrame (Input dataframe with 'high', 'low' and 'close' columns.)
    k_period: int, optional (Rolling window used for Fast %K <default is 9>.)
    d_period: int, optional (EMA span used for Slow %D <default is 3>.)

    Returns
    -------
    pd.DataFrame (The input frame with 'fastk', 'fastd' and 'slowd' added.)
    """
    # Rolling extremes are kept as local Series. Writing them to the frame as
    # scratch columns and dropping them afterwards would silently delete any
    # 'highest_high' / 'lowest_low' columns the caller already had.
    highest_high = df['high'].rolling(window=k_period).max()
    lowest_low = df['low'].rolling(window=k_period).min()

    # Fast %K: position of the close inside the k_period high/low range, in
    # percent. The first k_period - 1 rows are NaN (incomplete window), and so
    # is any row whose range is zero (0 / 0).
    df['fastk'] = ((df['close'] - lowest_low) / (highest_high - lowest_low)) * 100

    # Fast %D is an EMA of Fast %K with period 1, i.e. Fast %K itself.
    df['fastd'] = df['fastk']

    # Slow %D: recursive EMA (adjust=False) of Fast %D with span d_period.
    df['slowd'] = df['fastd'].ewm(span=d_period, adjust=False).mean()

    return df


In [8]:
def calculate_cci(df: pd.DataFrame, period: int) -> pd.DataFrame:
    """Add the Commodity Channel Index for the given look-back to ``df``.

    Two columns are written onto the frame that was passed in (it is modified
    in place): 'typical_price' and 'CCI<period>'. The same object is returned.

    Parameters
    ----------
    df: pd.DataFrame (Input dataframe with 'high', 'low' and 'close' columns.)
    period: int (Look-back window, in rows.)

    Returns
    -------
    pd.DataFrame (The input frame with 'typical_price' and 'CCI<period>' added.)
    """

    def _mean_abs_deviation(window):
        # Average absolute distance of the window's values from their own mean.
        return np.abs(window - window.mean()).mean()

    # Typical price = plain average of high, low and close.
    df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3

    # Both statistics share the same rolling window over the typical price.
    tp_window = df['typical_price'].rolling(window=period)
    moving_average = tp_window.mean()
    mean_deviation = tp_window.apply(_mean_abs_deviation, raw=True)

    # CCI = (TP - SMA) / (0.015 * mean deviation); rows with an incomplete
    # window come out as NaN.
    column_name = f'CCI{period}'
    df[column_name] = (df['typical_price'] - moving_average) / (0.015 * mean_deviation)

    return df

In [9]:
# ------------ DB helpers -------------
# async def fetch_ca_window(conn, schema: str, 
#                           ca_table: str, 
#                           bucket_sql: str, 
#                           lookback_buckets: int, 
#                           bucket_col: str) -> pd.DataFrame:
#     """
#     Pull a compact window from the CA, aliasing bucket_* to 'date_time'.
#     bucket_sql is '6 hours' or '1 day'.
#     """
#     sql = f"""
#         SELECT {bucket_col} AS date_time,
#                open AS "Open", high AS "High", low AS "Low", close AS "Close"
#         FROM {schema}.{ca_table}
#         WHERE {bucket_col} >= time_bucket($1::interval, now()) - ($2::int + 5) * $1::interval
#         ORDER BY {bucket_col}
#     """
#     rows = await conn.fetch(sql, bucket_sql, lookback_buckets)
#     if not rows:
#         return pd.DataFrame(columns=["date_time","Open","High","Low","Close"])
#     # asyncpg Record -> dict list -> DataFrame
#     return pd.DataFrame([dict(r) for r in rows])

async def fetch_ca_window(conn, schema, ca_table, bucket_sql, lookback_buckets, bucket_col):
    """Fetch the most recent OHLC buckets of one table as a DataFrame.

    Rows are selected from ``{schema}.{ca_table}`` starting
    ``lookback_buckets + 5`` intervals before the current time bucket, ordered
    by ``bucket_col``. The bucket column is aliased to 'date_time' and the
    price columns are cast to double precision.

    ``schema``, ``ca_table`` and ``bucket_col`` are interpolated into the SQL
    text as identifiers, so they must come from trusted configuration.
    ``bucket_sql`` (interval text such as '6 hours') and ``lookback_buckets``
    are sent as bind parameters.

    Returns
    -------
    pd.DataFrame (Columns date_time, open, high, low, close; empty, with those
    columns, when the query returns no rows.)
    """
    query = f"""
        SELECT {bucket_col} AS date_time,
               (open)::double precision  AS open,
               (high)::double precision  AS high,
               (low)::double precision   AS low,
               (close)::double precision AS close
        FROM {schema}.{ca_table}
        WHERE {bucket_col} >= time_bucket(($1)::text::interval, now())
                              - ($2::int + 5) * (($1)::text::interval)
        ORDER BY {bucket_col}
    """
    records = await conn.fetch(query, bucket_sql, lookback_buckets)
    if records:
        # Each record is converted to a plain dict before building the frame.
        return pd.DataFrame(list(map(dict, records)))
    return pd.DataFrame(columns=["date_time","open","high","low","close"])


In [10]:
async def upsert_indicator_row(conn, schema: str, ind_table: str, dt, slowd, cci3, cci9):
    """Insert one indicator row, or overwrite it if ``date_time`` already exists.

    The row is written to ``{schema}.{ind_table}``. On a ``date_time`` conflict
    the stored slowd / cci3 / cci9 values are replaced with the new ones, and
    ``updated_at`` is set to now() in both cases.

    ``schema`` and ``ind_table`` are interpolated into the SQL text as
    identifiers and must come from trusted configuration; the values are sent
    as bind parameters.
    """
    statement = f"""
        INSERT INTO {schema}.{ind_table} (date_time, slowd, cci3, cci9, updated_at)
        VALUES ($1, $2, $3, $4, now())
        ON CONFLICT (date_time) DO UPDATE SET
            slowd = EXCLUDED.slowd,
            cci3  = EXCLUDED.cci3,
            cci9  = EXCLUDED.cci9,
            updated_at = now();
    """
    # Positional parameters, in $1..$4 order.
    values = (dt, slowd, cci3, cci9)
    await conn.execute(statement, *values)

In [11]:
def ema_extra_warmup(d_period: int, tol: float = 1e-3) -> int:
    """Number of extra rows an EMA needs before its seed value stops mattering.

    For ``ewm(span=d_period, adjust=False)`` the smoothing factor is
    ``alpha = 2 / (d_period + 1)`` and the weight of the seed after ``m``
    steps is ``(1 - alpha) ** m``. This returns the smallest non-negative
    integer ``m`` for which that weight is ``<= tol``.

    Parameters
    ----------
    d_period: int (EMA span; must be >= 1.)
    tol: float, optional (Largest acceptable residual seed weight; must be > 0 <default is 1e-3>.)

    Returns
    -------
    int (Warm-up length in rows; 0 when no warm-up is needed.)

    Raises
    ------
    ValueError (When d_period < 1 or tol <= 0.)
    """
    if d_period < 1:
        raise ValueError(f"d_period must be >= 1, got {d_period!r}")
    if tol <= 0:
        raise ValueError(f"tol must be > 0, got {tol!r}")

    # EMA alpha for pandas ewm(span=d, adjust=False)
    alpha = 2 / (d_period + 1)
    decay = 1 - alpha

    # d_period == 1 gives alpha == 1: the EMA equals its input and the seed is
    # forgotten immediately. Without this guard log(0) raises "math domain
    # error". tol >= 1 is already met with zero extra rows.
    if decay <= 0 or tol >= 1:
        return 0

    # m such that (1 - alpha)^m <= tol
    m = math.log(tol) / math.log(decay)
    return max(0, math.ceil(m))

In [12]:
# ------------ Core per-table work -------------
async def process_one_table(conn, schema: str,
                            interval_label: str,
                            ca_table: str,
                            ind_table: str,
                            bucket_sql: str,
                            bucket_col: str):
    """Recompute the indicators for one OHLC table and upsert the newest rows.

    Steps: fetch a recent OHLC window, compute Slow %D plus the short and long
    CCI on copies of it, join the three result columns back onto the window,
    and upsert the last ``HEAL_BUCKETS + 1`` rows into ``{schema}.{ind_table}``.
    Returns without writing anything when the window is empty.

    ``interval_label`` is accepted but not used by this function.

    Raises
    ------
    KeyError (When an indicator function did not produce the expected column.)
    """

    def _find_column(frame, wanted_lower, error_message):
        # First column whose lower-cased name equals wanted_lower.
        for column in frame.columns:
            if column.lower() == wanted_lower:
                return column
        raise KeyError(error_message)

    # Rows needed before the newest values are trustworthy: the %K window plus
    # the D seed, plus the rows the EMA needs to forget its seed
    # (e.g., ~10 for d=3, ~35 for d=10), or the long CCI window if larger.
    ema_rows = ema_extra_warmup(D_LEN, tol=1e-3)
    stoch_rows = K_LEN + (D_LEN - 1)
    lookback = max(stoch_rows + ema_rows, CCI_LONG) + 5  # +5 cushion

    ohlc = await fetch_ca_window(conn, schema, ca_table, bucket_sql, lookback, bucket_col)
    if ohlc.empty:
        return

    # Use time as index
    ohlc = ohlc.set_index("date_time")

    # The indicator functions write onto the frame they receive, so each one
    # gets its own copy.
    stoch_frame = calculate_slowD(ohlc.copy(), k_period=K_LEN, d_period=D_LEN)
    cci_short_frame = calculate_cci(ohlc.copy(), period=CCI_SHORT)
    cci_long_frame = calculate_cci(ohlc.copy(), period=CCI_LONG)

    # Locate the produced columns case-insensitively.
    slowd_src = _find_column(
        stoch_frame, "slowd",
        "Stochastic function did not produce a 'SlowD' column",
    )
    want_cci3 = f"cci{CCI_SHORT}"
    want_cci9 = f"cci{CCI_LONG}"
    cci3_src = _find_column(
        cci_short_frame, want_cci3,
        f"calculate_cci(period={CCI_SHORT}) did not produce '{want_cci3}' / 'CCI{CCI_SHORT}'",
    )
    cci9_src = _find_column(
        cci_long_frame, want_cci9,
        f"calculate_cci(period={CCI_LONG}) did not produce '{want_cci9}' / 'CCI{CCI_LONG}'",
    )

    # Attach each indicator under the name of the column it is stored in.
    merged = ohlc
    for frame, source, target in (
        (stoch_frame, slowd_src, "slowd"),
        (cci_short_frame, cci3_src, "cci3"),
        (cci_long_frame, cci9_src, "cci9"),
    ):
        merged = merged.join(frame[[source]].rename(columns={source: target}), how="left")

    # Forming bucket + small heal window. Slicing the index with [-keep:]
    # already yields the whole index when fewer than `keep` rows exist.
    keep = HEAL_BUCKETS + 1
    recent = merged.loc[merged.index[-keep:], ["slowd", "cci3", "cci9"]].reset_index()  # includes date_time

    # NOTE(review): rows whose indicator is NaN are written as-is; confirm the
    # target columns are meant to hold NaN rather than NULL.
    for _, record in recent.iterrows():
        await upsert_indicator_row(
            conn, schema, ind_table,
            record["date_time"], record["slowd"], record["cci3"], record["cci9"]
        )


In [13]:
# ------------ Main loop -------------
async def main():
    """Run the indicator refresh loop forever on a single database connection.

    Each iteration:
      1. tries to take a session-level advisory lock (non-blocking), so only
         one instance of the service works at a time;
      2. if the lock was obtained, processes every entry of ASSETS inside one
         transaction, so either all tables are updated or none;
      3. releases the advisory locks;
      4. sleeps SLEEP_SEC seconds.

    An exception raised while processing is printed and the loop continues.
    The connection is closed when the coroutine exits for any reason.
    """
    conn = await asyncpg.connect(dsn=DB_DSN)
    try:
        while True:
            got = await conn.fetchval(
                "SELECT pg_try_advisory_lock( hashtextextended('indicators_service_async', 202) )"
            )
            if got:
                try:
                    # The context manager commits on success and rolls back on
                    # any exception, including cancellation/interrupt, which a
                    # hand-written `except Exception` rollback would miss.
                    async with conn.transaction():
                        for schema, interval_label, ca_table, ind_table, bucket_sql, bucket_col in ASSETS:
                            await process_one_table(
                                conn, schema, interval_label, ca_table, ind_table, bucket_sql, bucket_col
                            )
                except Exception as e:
                    print("ERROR in indicators loop:", e)
                finally:
                    # Single release point for both the success and error paths.
                    await conn.execute("SELECT pg_advisory_unlock_all();")
            # Pause whether or not the lock was obtained this round.
            await asyncio.sleep(SLEEP_SEC)
    finally:
        await conn.close()


In [14]:
# if __name__ == "__main__":
#     asyncio.run(main())

In [None]:
# Top-level await: starts the service loop from this notebook cell.
await main()  # runs forever; interrupt the kernel to stop it
