In [None]:
# ======================================================
# 📦 Scarica e analizza holdings SPDR dai file ufficiali ZIP
# ======================================================

!pip install yfinance openpyxl pandas requests matplotlib --quiet

import pandas as pd
import requests
import zipfile
import io
import yfinance as yf
import matplotlib.pyplot as plt
import warnings
from pathlib import Path
import re

warnings.filterwarnings("ignore", category=FutureWarning)

# === CONFIG ===
ZIP_URL = "https://www.ssga.com/us/en/individual/library-content/products/fund-data/etfs/us/us_spdrallholdings.zip"
ETF_TARGETS = {"XLC","XLY","XLP","XLE","XLF","XLV","XLI","XLB","XLRE","XLK","XLU"}  # SPDR sector ETFs

print("📥 Scaricamento ZIP ufficiale SPDR holdings...")
# A timeout keeps the notebook from hanging forever if the server never answers.
r = requests.get(ZIP_URL, timeout=60)
r.raise_for_status()

zip_bytes = io.BytesIO(r.content)
holdings_list = []

with zipfile.ZipFile(zip_bytes, "r") as z:
    files = [f for f in z.namelist() if f.lower().endswith(".xlsx")]
    print(f"📂 Trovati {len(files)} file Excel nel pacchetto.")

    # The ETF symbol is taken from the workbook file name.
    regex_pattern = r'holdings-daily-us-en-(xl[a-z]{1,4})\.xlsx'

    for fname in files:
        etf_match = re.search(regex_pattern, fname, re.IGNORECASE)
        if not etf_match:
            continue
        etf = etf_match.group(1).upper()
        if etf not in ETF_TARGETS:
            print(f"   ℹ️ Skipping {fname} as ETF {etf} is not in targets.")
            continue

        print(f"   🔍 Elaboro {etf} ({fname}) ...")
        # Pull the member into memory once. Each parse below gets its own
        # BytesIO, so no archive handle is left open between the two reads.
        workbook_bytes = z.read(fname)
        raw = pd.read_excel(io.BytesIO(workbook_bytes), header=None, engine="openpyxl")

        # The sheet starts with free-form rows; the real header is the first
        # row that contains a cell mentioning "Identifier".
        header_row = None
        for i, row in raw.iterrows():
            if row.astype(str).str.contains("Identifier", case=False, na=False).any():
                header_row = i
                break
        if header_row is None:
            print(f"      ⚠ Nessuna intestazione trovata per {etf}, salto.")
            continue

        df = pd.read_excel(io.BytesIO(workbook_bytes), header=header_row, engine="openpyxl")
        cols = [c.strip() if isinstance(c, str) else c for c in df.columns]
        df.columns = cols

        # First column (in sheet order) whose name looks like a symbol column.
        ticker_col = None
        for c in cols:
            if re.search(r"(identifier|ticker|symbol)", str(c), re.I):
                ticker_col = c
                break
        if ticker_col is None:
            print(f"      ⚠ Nessuna colonna Ticker trovata in {etf}, salto.")
            continue

        keep = [ticker_col]
        for c in ["Name", "Weight", "Weight (%)", "Holding", "Security Name"]:
            if c in cols:
                keep.append(c)
        df = df[keep].copy()
        # Keep only rows whose symbol cell is a short upper-case token; this
        # discards the rows that are not holdings at all.
        df = df[df[ticker_col].astype(str).str.match(r"^[A-Z0-9\.\-]{1,10}$", na=False)]
        df.insert(0, "ETF", etf)

        holdings_list.append(df)

if not holdings_list:
    raise RuntimeError("❌ Nessun ETF valido trovato.")

all_holdings = pd.concat(holdings_list, ignore_index=True)

# === Ticker clean-up: Yahoo expects "BRK-B", not "BRK.B" ===
# Column 1 is the symbol column kept from each workbook (column 0 is "ETF").
all_holdings["Ticker"] = all_holdings.iloc[:, 1].astype(str).str.strip().str.replace(".", "-", regex=False)

# === Save the cleaned workbook and show a preview ===
out_file = "SPDR_holdings_cleaned.xlsx"
all_holdings.to_excel(out_file, index=False)
print(
    f"✅ File Excel pulito salvato come: {out_file}",
    f"Contiene {len(all_holdings)} righe totali.",
    sep="\n",
)
display(all_holdings.head(10))

# ======================================================
# 📈 Analisi: % titoli sopra SMA200 per ETF (giorno per giorno)
# ======================================================

# Map each ETF to the list of its holdings' symbols.
# Read the cleaned "Ticker" column (stripped, "." -> "-") rather than positional
# column 1: the two are the same data only when the source column happened to
# be called "Ticker"; otherwise column 1 still holds the uncleaned symbols.
etf_groups = {
    etf: g["Ticker"].dropna().astype(str).tolist()
    for etf, g in all_holdings.groupby("ETF")
}

# ETF symbol -> Series of daily "% of holdings above their SMA200"
sma200_daily_pct = {}

print("\n📈 Analisi daily % titoli sopra SMA200 per ETF...")
skipped_tickers_analysis = []  # tickers skipped during this analysis phase

# === gestione eccezioni ticker particolari e pulizia ticker non validi ===
def fix_ticker(t):
    """Return ticker *t* in the form used for Yahoo Finance downloads.

    Surrounding whitespace is removed and every "." is replaced with "-",
    so share-class symbols such as "BRK.B" or "BF.B" become "BRK-B" and
    "BF-B". This is the same rule the holdings clean-up step applies to the
    whole ticker column, now applied to any dotted symbol instead of only
    two hard-coded ones.

    Args:
        t: ticker symbol as a string.

    Returns:
        The normalised ticker string.
    """
    return t.strip().replace(".", "-")

# For every ETF: download each holding, flag the days it closed above its own
# 200-day simple moving average, and turn the flags into a daily percentage.
for etf, tickers in etf_groups.items():
    print(f"\n📊 Analisi ETF {etf} ({len(tickers)} titoli)")
    # Drop "-" placeholder rows and anything that does not look like a symbol
    # before touching the network.
    tickers_fixed = [fix_ticker(t) for t in tickers if t != '-' and re.match(r"^[A-Z0-9\.\-]{1,10}$", t)]

    if not tickers_fixed:
        print(f"   ⚠ Nessun ticker valido trovato per {etf}, salto.")
        sma200_daily_pct[etf] = pd.Series(dtype=float)
        continue

    # One 0.0/1.0 Series per usable ticker, indexed only by the dates on which
    # that ticker has an SMA200 (so a row present == "ticker is valid that day").
    above_flags = []
    etf_skipped_in_analysis = []

    # Tickers are downloaded one at a time so a single failure only skips
    # that ticker instead of the whole ETF.
    for t in tickers_fixed:
        try:
            df_ticker = yf.download(t, period="7y", interval="1d", progress=False, auto_adjust=False)

            if df_ticker.empty:
                print(f"      ⚠ Download per ticker {t} ha restituito dati vuoti, saltando.")
                etf_skipped_in_analysis.append(t)
                continue

            if 'Adj Close' not in df_ticker.columns:
                print(f"      ⚠ Colonna 'Adj Close' non trovata per ticker {t}, saltando.")
                etf_skipped_in_analysis.append(t)
                continue

            df_adj_close = df_ticker["Adj Close"]
            # Depending on the yfinance version this selection may be a
            # one-column DataFrame rather than a Series; reduce it to a Series.
            if isinstance(df_adj_close, pd.DataFrame):
                df_adj_close = df_adj_close.iloc[:, 0]
            df_adj_close = df_adj_close.dropna()

            # A 200-day average needs at least 200 observations.
            if len(df_adj_close) < 200:
                print(f"      ⚠ Dati insufficienti per ticker {t} (<200 giorni), saltando dall'analisi daily.")
                etf_skipped_in_analysis.append(t)
                continue

            sma200 = df_adj_close.rolling(window=200).mean()
            has_sma = sma200.notna()
            above_flags.append((df_adj_close[has_sma] > sma200[has_sma]).astype(float))

        except Exception as e:
            # Best effort: any download/processing error skips just this ticker.
            print(f"      ⚠ Errore elaborazione dati per {t}: {e}, saltando.")
            etf_skipped_in_analysis.append(t)
            continue

    if above_flags:
        # Stack all tickers in long form and aggregate per date:
        # sum = tickers above their SMA200, count = tickers with an SMA200.
        # Every date that appears has count >= 1, so no zero-division guard
        # is needed. groupby sorts the dates.
        per_day = pd.concat(above_flags).groupby(level=0)
        sma200_daily_pct[etf] = per_day.sum() / per_day.count() * 100
    else:
        sma200_daily_pct[etf] = pd.Series(dtype=float)

    # Record each skipped ticker once for this ETF.
    unique_skipped = sorted(set(etf_skipped_in_analysis))
    skipped_tickers_analysis.extend(unique_skipped)

    print(f"✅ Analisi daily completa per {etf}. Ticker saltati in questo ETF: {len(unique_skipped)}")

# Assemble the per-ETF daily series into one table (one column per ETF)
df_daily_sma200_pct = pd.DataFrame(sma200_daily_pct)
print("\nDaily % above SMA200 DataFrame:")
display(df_daily_sma200_pct.head())

# === Report the tickers that were skipped during the analysis ===
if not skipped_tickers_analysis:
    print("\n✅ Nessun ticker è stato saltato durante l'analisi daily.")
else:
    print("\n⚠ Ticker saltati a causa di errori di download o elaborazione durante l'analisi daily:")
    # De-duplicate and list alphabetically
    for ticker in sorted(set(skipped_tickers_analysis)):
        print(f"- {ticker}")

In [None]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt

# ==========================================
# PARAMETRI MODIFICABILI
# ==========================================

start = '2010-01-01'        # first date requested from the data download
holding_days = 60           # number of days a long position is held
buy_threshold_range = range(5, 55, 1) # BUY thresholds to test: 5% to 54% in steps of 1

# ==========================================
# DATA PREPARATION AND ANALYSIS FOR EACH ETF
# ==========================================
# This cell reuses names created by the first cell; stop early if any is missing.
if not ('etf_groups' in locals() and 'skipped_tickers_analysis' in locals() and 'ETF_TARGETS' in locals()):
    raise RuntimeError("Variabili 'etf_groups', 'skipped_tickers_analysis' o 'ETF_TARGETS' non trovate. Esegui prima la prima cella.")

# ETF symbol -> best (Buy, CAGR) row found by the threshold sweep
best_results = {}

# Loop through each target ETF
# ETF_TARGETS is a set; iterate it sorted so the ETFs are processed (and
# reported) in the same order on every run.
for TARGET_ETF in sorted(ETF_TARGETS):

    print(f"\n{'='*50}")
    print(f"🚀 Analisi Backtesting e Ottimizzazione per {TARGET_ETF}")
    print(f"{'='*50}")

    # Holdings of this ETF as collected by the first cell
    etf_tickers_from_holdings = etf_groups.get(TARGET_ETF, [])

    # Exclude the "-" placeholder rows (same filter the first cell applies)
    # and every ticker the daily analysis had to skip. A set makes each
    # membership test O(1) instead of scanning the list for every ticker.
    skipped_lookup = set(skipped_tickers_analysis)
    tickers = [
        t for t in etf_tickers_from_holdings
        if t != '-' and t not in skipped_lookup
    ]

    if not tickers:
        print(f"❌ Nessun ticker valido trovato per {TARGET_ETF} dopo aver escluso quelli saltati nell'analisi daily.")
        continue # Skip to the next ETF

    # The ETF itself is the instrument traded by the backtest
    index_ticker = TARGET_ETF

    # DATA DOWNLOAD
    # NOTE(review): both start= and period= are passed to yf.download; confirm
    # which one the installed yfinance version honours.
    try:
        # Download data for all tickers at once
        data = yf.download(tickers, start=start, period="max", interval="1d", progress=False, auto_adjust=False)['Adj Close']
    except Exception as e:
        print(f"❌ Errore durante il download dei dati dei ticker per {TARGET_ETF}: {e}")
        continue # Skip to the next ETF

    if data.empty:
        print(f"❌ Nessun dato valido scaricato per i ticker di {TARGET_ETF}.")
        continue # Skip to the next ETF

    # Download index data
    try:
        index = yf.download(index_ticker, start=start, period="max", interval="1d", progress=False, auto_adjust=False)['Adj Close']
    except Exception as e:
        print(f"❌ Errore durante il download dei dati dell'indice {index_ticker}: {e}")
        continue # Skip to the next ETF

    if index.empty:
        print(f"❌ Nessun dato valido scaricato per l'indice {index_ticker}.")
        continue # Skip to the next ETF

    # Keep only the dates present in both the holdings table and the ETF series
    data, index = data.align(index, join='inner', axis=0)

    # Drop tickers that have no data at all on the shared dates
    data = data.dropna(axis=1, how='all')
    valid_tickers = data.columns.tolist()

    if not valid_tickers:
        print(f"❌ Nessun ticker valido rimasto dopo l'allineamento dei dati per {TARGET_ETF}.")
        continue # Skip to the next ETF

    print(f"\n✅ Dati scaricati per {len(valid_tickers)} tickers validi per {TARGET_ETF}.")

    # ==========================================
    # INDICATOR (% of holdings above their MA200)
    # ==========================================
    ma200 = data.rolling(200).mean()
    # Divide by the number of tickers that actually HAVE a 200-day average on
    # each date, not by the total ticker count: tickers still in their first
    # 199 observations (or with no data yet) would otherwise count as "below"
    # and drag the percentage down. Dates with no MA200 at all become NaN.
    tickers_with_ma = ma200.notna().sum(axis=1)
    above200 = (data > ma200).sum(axis=1) / tickers_with_ma.where(tickers_with_ma > 0) * 100

    # ==========================================
    # FUNZIONE DI BACKTEST (uscita dopo X giorni - entrata su cross)
    # ==========================================
    def backtest_timed_exit(above200, index, buy_thr, hold_days):
        """Backtest a breadth-triggered long position with a fixed holding time.

        A position is opened on the day the breadth indicator crosses below
        ``buy_thr`` (today < threshold, previous day >= threshold) while
        flat, and is closed once it has been marked long for ``hold_days``
        days. No new entry is evaluated on the day a position is closed.

        The position decided on a given day is applied to the NEXT day's
        return: the indicator is computed from that day's closing prices,
        so earning the same day's return would be look-ahead.

        Args:
            above200: Series with the breadth indicator (% of holdings above
                their 200-day average). Expected to share its index with
                ``index``; the calling code aligns the two beforehand.
            index: prices of the traded instrument, either a Series or a
                DataFrame whose first column holds the prices.
            buy_thr: breadth threshold that triggers an entry when crossed
                downward.
            hold_days: number of days the position stays marked long.

        Returns:
            Tuple ``(cagr, strategy, signal)``:
            ``cagr`` is the annualised return assuming 252 rows per year;
            ``strategy`` is the cumulative log return of the strategy, so
            ``np.exp(strategy)`` is the equity curve starting at 1;
            ``signal`` is the 0/1 position decided at each date (unshifted).

        Raises:
            ValueError: if ``above200`` or ``index`` is empty.
        """
        # Accept both shapes the download can produce for a single symbol.
        prices = index.iloc[:, 0] if isinstance(index, pd.DataFrame) else index
        if len(above200) == 0 or len(prices) == 0:
            raise ValueError("backtest_timed_exit needs non-empty inputs")

        breadth = above200.to_numpy(dtype=float)
        positions = np.zeros(len(breadth), dtype=int)
        position = 0
        days_in_trade = 0

        for i in range(1, len(breadth)):
            if position == 0:
                # Entry only on a downward cross; NaN breadth never triggers
                # because comparisons with NaN are False.
                if breadth[i] < buy_thr and breadth[i - 1] >= buy_thr:
                    position = 1
                    days_in_trade = 0
            else:
                days_in_trade += 1
                if days_in_trade >= hold_days:
                    position = 0
            positions[i] = position

        signal = pd.Series(positions, index=above200.index)

        returns = prices.pct_change().fillna(0)
        # Yesterday's decision earns today's return.
        exposure = signal.shift(1).reindex(returns.index).fillna(0)
        strategy_returns = returns * exposure
        # Accumulate in log space so that exp() of the result compounds the
        # simple daily returns exactly.
        strategy = np.log1p(strategy_returns).cumsum()

        total_return = np.exp(strategy.iloc[-1]) - 1
        cagr = (1 + total_return) ** (252 / len(prices)) - 1

        return cagr, strategy, signal

    # ==========================================
    # TEST DI VARIE SOGLIE BUY
    # ==========================================
    # Sweep every candidate BUY threshold and record the CAGR of each backtest.
    results = []

    for buy in buy_threshold_range:
        try:
            cagr, _, _ = backtest_timed_exit(above200, index, buy, holding_days)
            results.append((buy, cagr))
        except Exception as e:
            print(f"⚠ Errore durante il backtest per {TARGET_ETF} con soglia BUY={buy}%: {e}")
            results.append((buy, np.nan)) # keep a NaN placeholder for the failed threshold

    df_res = pd.DataFrame(results, columns=['Buy','CAGR'])
    # Force CAGR to numeric; anything that cannot be converted becomes NaN
    df_res['CAGR'] = pd.to_numeric(df_res['CAGR'], errors='coerce')
    # Drop failed thresholds and rank the rest, best CAGR first
    df_res = df_res.dropna().sort_values('CAGR', ascending=False)

    if df_res.empty:
        print(f"\n❌ Nessun risultato di backtest valido trovato per {TARGET_ETF}. Impossibile determinare la miglior soglia BUY.")
    else:
        # First row after the descending sort is the best threshold
        best = df_res.iloc[0]
        best_results[TARGET_ETF] = best # remembered for the final summary
        print(f"\n⭐ Miglior soglia BUY trovata per {TARGET_ETF} (uscita dopo", holding_days, "giorno/i):")
        print(best)

        # ==========================================
        # EQUITY LINE FOR THE BEST THRESHOLD
        # ==========================================
        # Re-run the backtest with the best threshold to get its full series
        _, strat, signal = backtest_timed_exit(above200, index, best.Buy, holding_days)
        plt.figure(figsize=(12, 6))
        # Put both curves on the ETF's date index; the strategy series is
        # exponentiated to a growth factor, the ETF is rebased to its first value.
        aligned_strat = np.exp(strat).reindex(index.index).ffill()
        aligned_index = (index / index.iloc[0]).reindex(index.index).ffill()

        plt.plot(aligned_strat, label=f'Strategy (Buy<{best.Buy}%, hold {holding_days}d, cross)')
        plt.plot(aligned_index, label='Buy & Hold', alpha=0.5)
        plt.title(f"Equity line strategia breadth su {TARGET_ETF} (holding {holding_days} giorno/i, entrata su cross)")
        plt.xlabel("Date")
        plt.ylabel("Cumulative Return (Factor)")
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45) # tilt the date labels
        plt.tight_layout() # keep labels from overlapping
        plt.show()

        # ==========================================
        # INDICATOR CHART WITH SIGNALS
        # ==========================================
        plt.figure(figsize=(12, 6))
        plt.plot(above200, label='% sopra MA200')
        plt.axhline(best.Buy, color='red', linestyle='--', label=f'Soglia Buy {best.Buy}%')
        # Shade the dates on which the position signal is long
        aligned_signal = signal.reindex(above200.index).fillna(0)
        plt.fill_between(above200.index, 0, 100, where=aligned_signal > 0, color='green', alpha=0.1, label='Posizione Long')
        plt.title(f"Indicatore breadth con segnali di acquisto su {TARGET_ETF} (holding {holding_days} giorno/i, entrata su cross)")
        plt.xlabel("Date")
        plt.ylabel("% Above MA200")
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45) # tilt the date labels
        plt.tight_layout()
        plt.show()

        # ==========================================
        # CAGR PER BUY THRESHOLD
        # ==========================================
        plt.figure(figsize=(10, 6))
        plt.bar(df_res['Buy'], df_res['CAGR'])
        plt.xlabel(f'Soglia Buy (% titoli sotto MA200 per {TARGET_ETF})')
        plt.ylabel('CAGR')
        plt.title(f'Performance (CAGR) per diverse soglie Buy su {TARGET_ETF}')
        plt.xticks(df_res['Buy'], rotation=45) # one tick per tested threshold
        plt.grid(axis='y', linestyle='--', alpha=0.6)
        plt.tight_layout()
        plt.show()

# ==========================================
# FINAL SUMMARY
# ==========================================
print(f"\n{'='*50}")
print("📊 Riepilogo Migliori Soglie BUY e CAGR per ETF")
print(f"{'='*50}")

if not best_results:
    print("❌ Nessun risultato valido da riepilogare.")
else:
    # One row per ETF, built from the best (Buy, CAGR) row stored for each
    df_summary = pd.DataFrame.from_dict(best_results, orient='index')
    df_summary.index.name = 'ETF'
    df_summary.columns = ['Miglior Soglia BUY (%)', 'CAGR']
    # Show the threshold as a whole number
    threshold_as_int = df_summary['Miglior Soglia BUY (%)'].astype(int)
    df_summary['Miglior Soglia BUY (%)'] = threshold_as_int
    print(df_summary.to_string())