In [1]:
# --- Price Level Alert System (Live Check) ---
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict
import pytz
import smtplib
import os
import json

ZONE_STATE_FILE = "reports/last_known_zone.json"

# Load or initialize
if os.path.exists(ZONE_STATE_FILE):
    with open(ZONE_STATE_FILE, "r") as f:
        last_known_zone = json.load(f)
else:
    last_known_zone = {}
# === CONFIG ===
TICKER_LIST = [
    'USDCAD=X', 'USDGBP=X', 'USDNOK=X', 'USDPLN=X', 'USDAUD=X', 'USDSGD=X',
    'USDJPY=X', 'USDZAR=X', 'USDBRL=X', 'EURUSD=X', 'EURGBP=X', 'EURCHF=X',
    'EURPLN=X', 'EURCZK=X', 'EURNZD=X', 'EURSEK=X', 'EURZAR=X', 'EURSGD=X',
    'GBPNOK=X', 'GBPJPY=X', 'GBPAUD=X', 'GBPCAD=X', 'SEKNOK=X', 'SEKJPY=X',
    'CHFNOK=X', 'CADNOK=X', 'AUDNZD=X', 'AUDJPY=X', 'AUDSEK=X', 'AUDCAD=X',
    'NZDSGD=X', 'NZDCHF=X', 'NZDNOK=X', 'SGDJPY=X', 'SGDHKD=X', 'EURCAD=X',
    'USDCHF=X', 'GBPCHF=X', 'EURNOK=X'
]

KEY_LEVELS_FILE = Path(r"C:\Users\T460\Documents\Quant_trading_research\Data Packs & Scripts\Dev_scripts\FX_1D\FX_1D_KEY.xlsx")
PIP_RANGE = 0.001
LOOKBACK_HOURS = 24
since = datetime.utcnow() - timedelta(hours=LOOKBACK_HOURS)
# === Load Key Levels ===
# === ZONE DEFINITIONS ===
ZONE_DEFINITIONS = [
    ("Premium+", float("inf"), "Purple upper"),
    ("Premium", "Purple upper", "Red  Upper"),
    ("Plus+", "Red  Upper", "Yellow Upper"),
    ("Fair", "Yellow Upper", "Green"),
    ("Budget", "Green", "Yellow Lower"),
    ("Discount", "Yellow Lower", "Red Lower"),
    ("Clearance", "Red Lower", "Purple lower"),
    ("Reset", "Purple lower", float("-inf")),
]

def load_key_levels(filepath):
    df = pd.read_excel(filepath)
    df.set_index("Ticker", inplace=True)
    return df




# === Check if price touched a key level (with Level Name) ===
def check_proximity(level_dict, high, low):
    matches = []
    for level_name, level_value in level_dict.items():
        if pd.isna(level_value):
            continue
        if (low <= level_value + PIP_RANGE) and (high >= level_value - PIP_RANGE):
            matches.append({
                "Level": round(level_value, 5),
                "Level Name": level_name
            })
    return matches

# === Summarize Touch Events ===
def summarize_touch_events(touches):
    from collections import defaultdict
    stats_dict = defaultdict(lambda: {"count": 0, "last_touch": None})

    for touch in touches:
        key = (touch["Ticker"], touch["Level"], touch["Level Name"])
        stats_dict[key]["count"] += 1
        stats_dict[key]["last_touch"] = touch["Time"]

    rows = []
    for (ticker, level, level_name), stats in stats_dict.items():
        sast_time = stats["last_touch"] + timedelta(hours=2)
        rows.append({
            "Ticker": ticker,
            "Level Name": level_name,
            "Level": level,
            "Touches (24h)": stats["count"],
            "Most Recent Touch (SAST)": sast_time.strftime("%Y-%m-%d %H:%M")
        })

    return pd.DataFrame(rows)

# === Main Alert Generator ===
def generate_alert_report():
    global since
    if 'since' not in globals():
        from datetime import datetime, timedelta
        LOOKBACK_HOURS = 24
        since = datetime.utcnow() - timedelta(hours=LOOKBACK_HOURS)
    
    key_levels_df = load_key_levels(KEY_LEVELS_FILE)
    touches = []

    for ticker in TICKER_LIST:
        print(f"[→] Checking {ticker}...")
        try:
            # Disable yfinance progress bar → no more extra printing
            data = yf.download(ticker, start=since.strftime('%Y-%m-%d'), interval="1h", progress=False)
            if data.empty:
                print(f"[⚠️] No data for {ticker}")
                continue
            data = data.dropna()

            # Map ticker to short version (index row in key_levels_df)
            short = ticker.split("=")[0] + "=X" if "=X" in ticker else ticker

            # Prepare level dict with Level Name → Level Value
            levels_series = key_levels_df.loc[short].dropna()
            level_dict = dict(levels_series)

            # Process candles
            for ts, row in data.iterrows():
                matches = check_proximity(level_dict, row.High.item(), row.Low.item())  # <== FINAL safe version!
                if matches:
                    for match in matches:
                        touches.append({
                            "Ticker": ticker,
                            "Level": match["Level"],
                            "Level Name": match["Level Name"],
                            "Time": ts
                        })

        except Exception as e:
            print(f"[❌] Failed for {ticker}: {e}")

    if not touches:
        print("[✅] No key levels touched in past 24 hours.")
    else:
        df_summary = summarize_touch_events(touches)
        df_summary = df_summary.sort_values(by=["Ticker", "Level Name"])
        print("[✅] Summary of Key Level Touches:\n")
        print(df_summary.to_string(index=False))

    return

# === Compute which zone a price is in ===
def compute_current_zone(price, zone_definitions, level_dict):
    # === Prepare level name → value lookup
    levels = {}
    for _, upper_bound, lower_bound in zone_definitions:
        if isinstance(upper_bound, str):
            levels[upper_bound] = float(level_dict.get(upper_bound, float("inf")))
        if isinstance(lower_bound, str):
            levels[lower_bound] = float(level_dict.get(lower_bound, float("-inf")))

    # Force boundary defaults
    levels["Purple upper"] = float(level_dict.get("Purple upper", float("inf")))
    levels["Purple lower"] = float(level_dict.get("Purple lower", float("-inf")))

    # === Check zones
    for zone_name, upper_bound, lower_bound in zone_definitions:
        # Resolve boundaries
        if isinstance(upper_bound, str):
            upper_value = levels.get(upper_bound, float("inf"))
        else:
            upper_value = upper_bound
        if isinstance(lower_bound, str):
            lower_value = levels.get(lower_bound, float("-inf"))
        else:
            lower_value = lower_bound

        # Is price in this zone?
        if lower_value < price <= upper_value:
            return zone_name

    return "Unknown"

def generate_current_zone_snapshot():
    global since
    
    # === SAFETY CHECK ===
    if 'ZONE_DEFINITIONS' not in globals():
        print("[⚠️] ZONE_DEFINITIONS not defined — please run the ZONE_DEFINITIONS cell first.")
        return
    
    if 'since' not in globals():
        from datetime import datetime, timedelta
        LOOKBACK_HOURS = 24
        since = datetime.utcnow() - timedelta(hours=LOOKBACK_HOURS)
    
    key_levels_df = load_key_levels(KEY_LEVELS_FILE)
    current_zone_results = []

    for ticker in TICKER_LIST:
        print(f"[→] Checking {ticker} current zone...")
        try:
            # Load latest 1H price or last daily close
            data = yf.download(ticker, period="1d", interval="1h", progress=False)
            if data.empty:
                print(f"[⚠️] No data for {ticker}")
                continue

            # === FINAL FIX → use .item() → no warning! ===
            latest_close = data["Close"].iloc[-1].item()
            
            # Map ticker to short version for your key levels
            short = ticker.split("=")[0] + "=X" if "=X" in ticker else ticker
            
            # Prepare level dict — force all floats
            levels_series = key_levels_df.loc[short].dropna()
            level_dict = {k: float(v) for k, v in levels_series.items()}
            
            # Compute zone
            zone = compute_current_zone(latest_close, ZONE_DEFINITIONS, level_dict)
            
            # --- Update zone_history.csv ---
            history_row = {
                "Date": pd.Timestamp.utcnow().strftime("%Y-%m-%d"),
                "Ticker": ticker,
                "Zone": zone
            }
            history_file = "reports/zone_history.csv"
            pd.DataFrame([history_row]).to_csv(history_file, mode="a", index=False, header=not os.path.exists(history_file))
            previous_zone = last_known_zone.get(ticker, None)
            if previous_zone != zone and previous_zone is not None:
                transition_row = {
                    "Date": pd.Timestamp.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
                    "Ticker": ticker,
                    "From Zone": previous_zone,
                    "To Zone": zone
                }
                transition_file = "reports/zone_transition_log.csv"
                pd.DataFrame([transition_row]).to_csv(transition_file, mode="a", index=False, header=not os.path.exists(transition_file))
            #update memory 
            last_known_zone[ticker] = zone
            
            print(f"[✓] {ticker} → Zone: {zone} (Price: {latest_close:.4f})")  # Progress print
            
            current_zone_results.append({
                "Ticker": ticker,
                "Current Zone": zone,
                "Current Price": latest_close
            })
        except Exception as e:
            print(f"[❌] Failed for {ticker}: {e}")

    # Display current zones
    with open(ZONE_STATE_FILE, "w") as f:
        json.dump(last_known_zone, f)
    df_current_zones = pd.DataFrame(current_zone_results)
    df_current_zones = df_current_zones.sort_values(by="Current Zone")
    print("[✅] Current Zone Snapshot:\n")
    print(df_current_zones.to_string(index=False))
    
    # Safe export — ensure folder exists
    #import os
    os.makedirs("reports", exist_ok=True)

    outpath = "reports/current_zone_snapshot.xlsx"
    df_current_zones.to_excel(outpath, index=False)
    print(f"[💾] Exported current zone snapshot to: {outpath}")
    
    return df_current_zones

def export_current_zone_heatmap(df_current_zones, output_path="reports/current_zone_snapshot_heatmap.xlsx"):
    if df_current_zones.empty:
        print("[⚠️] No current zones to export.")
        return

    print("[🎨] Exporting color heatmap version...")
    with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer:
        df_current_zones.to_excel(writer, sheet_name="Current Zones", index=False)

        workbook = writer.book
        worksheet = writer.sheets["Current Zones"]

        # Define format rules
        format_premium = workbook.add_format({"bg_color": "#FFD700", "bold": True})  # Gold
        format_fair = workbook.add_format({"bg_color": "#90EE90"})  # LightGreen
        format_budget = workbook.add_format({"bg_color": "#ADD8E6"})  # LightBlue
        format_discount = workbook.add_format({"bg_color": "#FF9999"})  # LightRed

        # Apply conditional formats to Current Zone column
        zone_col = df_current_zones.columns.get_loc("Current Zone")
        zone_range = f"${chr(65 + zone_col)}2:${chr(65 + zone_col)}{len(df_current_zones)+1}"

        worksheet.conditional_format(zone_range, {"type": "text", "criteria": "containing", "value": "Premium", "format": format_premium})
        worksheet.conditional_format(zone_range, {"type": "text", "criteria": "containing", "value": "Fair", "format": format_fair})
        worksheet.conditional_format(zone_range, {"type": "text", "criteria": "containing", "value": "Budget", "format": format_budget})
        worksheet.conditional_format(zone_range, {"type": "text", "criteria": "containing", "value": "Discount", "format": format_discount})

    print(f"[💾] Exported color heatmap to: {output_path}")

#generate_current_zone_snapshot()

if __name__ == "__main__":
    #generate_alert_report()
    #generate_current_zone_snapshot()
    df_current_zones = generate_current_zone_snapshot()
    export_current_zone_heatmap(df_current_zones)


[→] Checking USDCAD=X current zone...
YF.download() has changed argument auto_adjust default to True
[❌] Failed for USDCAD=X: Cannot save file into a non-existent directory: 'reports'
[→] Checking USDGBP=X current zone...
[❌] Failed for USDGBP=X: Cannot save file into a non-existent directory: 'reports'
[→] Checking USDNOK=X current zone...
[❌] Failed for USDNOK=X: Cannot save file into a non-existent directory: 'reports'
[→] Checking USDPLN=X current zone...
[❌] Failed for USDPLN=X: Cannot save file into a non-existent directory: 'reports'
[→] Checking USDAUD=X current zone...
[❌] Failed for USDAUD=X: Cannot save file into a non-existent directory: 'reports'
[→] Checking USDSGD=X current zone...
[❌] Failed for USDSGD=X: Cannot save file into a non-existent directory: 'reports'
[→] Checking USDJPY=X current zone...
[❌] Failed for USDJPY=X: Cannot save file into a non-existent directory: 'reports'
[→] Checking USDZAR=X current zone...
[❌] Failed for USDZAR=X: Cannot save file into a non

KeyboardInterrupt: 