In [7]:
from constants import JQL, FIELDS, BASE_URL, JIRA_DOMAIN,EMAIL, MAX_RESULTS, MODULE_DEVS, VALID_STATUSES, MAIL_MAP, DAILY_HOURS,MIN_PROJECT_RATIO,PROJECT_MAP, DEFAULT_END_DATE, DEFAULT_END_DATE_with_timezone, DEFAULT_START_DATE,DEFAULT_START_DATE_with_timezone
from token_hidden import API_TOKEN
SPRINT_A_CALCULAR = "S20251027"

In [5]:
# -*- coding: utf-8 -*-
"""
Recolector de planificación (IT/BT) desde Google Sheets.

- Lee la hoja (ID + GID).
- Detecta el sprint actual (col 'Sprint' o escaneando SYYYYMMDD).
- Por desarrollador:
    * Inicio real de sprint: min('Fecha inicio sprint') o fallback por código de sprint.
    * Fin real de sprint: max('Fecha fin sprint') o fallback = inicio + 11 días (23:59:59).
    * Filtra tarjetas con 'Fecha Estimada' (columna AQ o header) ∈ [inicio, fin].
    * Separa en week1 (lun–vie primera) y week2 (lun segunda → fin real).
    * Suma tarjetas y horas planificadas y guarda detalle por issue.

Salida: bt_it_sprint_planning.json
"""

import os
import re
import json
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta

import pandas as pd

# =======================
# ===== CONFIG INICIO ===
# =======================

SHEET_ID = "1NW5uT16Req72uMfYZ6-eITvwk9uKvXncEYsqWou9-D0"
WORKSHEET_GID = 985465980
CREDS_PATH = "./service_account.json"
OUTPUT_JSON = "./bt_it_sprint_planning.json"

# Fecha de entrega:
DELIVERY_DATE_HEADER = "Fecha Estimada"
FORCE_COLUMN_LETTER = "AQ"   # forzar AQ como “Fecha Estimada”

# Columnas para horas planificadas (primera que exista)
ESTIMATE_CANDIDATES = [
    "Estimacion", "Estimación", "Estimación Personal", "Story point estimate"
]

SPRINT_COL = "Sprint"
DEV_COL_CANDIDATES = ["Desarrollador", "Persona asignada"]
SPRINT_START_HEADER = "Fecha inicio sprint"
CARD_START_HEADER   = "Fecha inicio tarjeta"
SPRINT_END_HEADER   = "Fecha fin sprint"
KEY_COL_CANDIDATES  = ["Clave", "ID"]  # “Clave” en tu Sheet
TYPE_COL            = "Tipo"

# =======================
# ===== CONFIG FIN ======
# =======================


# -------- Google Sheets helpers --------
def fetch_sheet_with_service_account_by_gid(
    sheet_id: str,
    worksheet_gid: int,
    creds_path: str,
    header_row: int = 3,      # header en fila 3
    first_data_row: int = 4,  # datos a partir de fila 4 (puede haber huecos)
) -> pd.DataFrame:
    import gspread
    from oauth2client.service_account import ServiceAccountCredentials

    scope = [
        "https://spreadsheets.google.com/feeds",
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive.readonly",
    ]
    creds = ServiceAccountCredentials.from_json_keyfile_name(creds_path, scope)
    client = gspread.authorize(creds)
    sh = client.open_by_key(sheet_id)
    ws = sh.get_worksheet_by_id(worksheet_gid)
    if ws is None:
        raise RuntimeError(f"No encontré worksheet con gid={worksheet_gid}")

    values = ws.get_all_values()
    if not values or len(values) < header_row:
        return pd.DataFrame()

    raw_headers = values[header_row - 1]
    rows = values[first_data_row - 1:]

    def norm(h):
        return (h or "").replace("\u00a0", " ").strip()

    headers, seen = [], {}
    for idx, h in enumerate(raw_headers, start=1):
        name = norm(h) or f"__blank_{idx}"
        base = name
        n = seen.get(base, 0)
        if n:
            name = f"{base}_{n+1}"
        seen[base] = n + 1
        headers.append(name)

    norm_rows = [
        r + [""] * (len(headers) - len(r)) if len(r) < len(headers) else r[:len(headers)]
        for r in rows
    ]
    df = pd.DataFrame(norm_rows, columns=headers).dropna(axis=1, how="all")
    df.columns = [c.strip() for c in df.columns]
    return df


def strip_sheet_gaps(df: pd.DataFrame) -> pd.DataFrame:
    df.columns = [c.strip() for c in df.columns]
    df = df.replace("\u00a0", " ", regex=True)
    df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

    empty_all = df.isna().all(axis=1) | df.apply(lambda r: all(str(x).strip() == "" for x in r), axis=1)
    df = df.loc[~empty_all].copy()

    if SPRINT_COL in df.columns:
        s = df[SPRINT_COL].astype(str).str.strip()
        if (s != "").any():
            first_idx = s.ne("").idxmax()
            df = df.loc[first_idx:].copy()

    empty_all = df.isna().all(axis=1) | df.apply(lambda r: all(str(x).strip() == "" for x in r), axis=1)
    df = df.loc[~empty_all].copy()
    return df.reset_index(drop=True)


# -------- General helpers --------
def column_letter_to_index(letter: str) -> int:
    letter = letter.strip().upper()
    result = 0
    for ch in letter:
        result = result * 26 + (ord(ch) - ord('A') + 1)
    return result - 1

def parse_date_series(series_like, dayfirst=True) -> pd.Series:
    s = pd.Series(series_like)
    return pd.to_datetime(s, errors="coerce", dayfirst=dayfirst)

def to_float_series(series_like) -> pd.Series:
    s = pd.Series(series_like)
    def _coerce(x):
        if x is None:
            return 0.0
        sx = str(x).strip().replace(",", ".")
        if sx == "":
            return 0.0
        try:
            return float(sx)
        except:
            return 0.0
    return s.map(_coerce)

def pick_dev_column(df: pd.DataFrame) -> str:
    for c in DEV_COL_CANDIDATES:
        if c in df.columns:
            return c
    for c in df.columns:
        if "asignad" in c.lower() or "desarrollador" in c.lower():
            return c
    raise RuntimeError("No se encontró columna de desarrollador.")

def pick_key_column(df: pd.DataFrame) -> Optional[str]:
    for c in KEY_COL_CANDIDATES:
        if c in df.columns:
            return c
    for c in df.columns:
        if c.strip().lower() in {"clave", "key", "issue", "ticket"}:
            return c
    return None

def detect_current_sprint(df: pd.DataFrame, sprint_col: str, today: datetime) -> Tuple[str, datetime]:
    def _pick_best(candidates: List[Tuple[str, datetime]]) -> Tuple[str, datetime]:
        le_today = [c for c in candidates if c[1] <= today]
        return max(le_today, key=lambda x: x[1]) if le_today else max(candidates, key=lambda x: x[1])

    candidates: List[Tuple[str, datetime]] = []

    if sprint_col in df.columns:
        for val in df[sprint_col].dropna().astype(str).unique():
            m = re.match(r"^S(\d{8})$", val.strip())
            if m:
                d = datetime.strptime(m.group(1), "%Y%m%d")
                candidates.append((val.strip(), d))
        if candidates:
            return _pick_best(candidates)

    for col in df.columns:
        for val in df[col].dropna().astype(str).unique():
            s = val.strip()
            m = re.match(r"^S(\d{8})$", s)
            if m:
                d = datetime.strptime(m.group(1), "%Y%m%d")
                candidates.append((s, d))
    if candidates:
        return _pick_best(candidates)

    # Derivar desde fecha de inicio (global)
    start_candidates = []
    if SPRINT_START_HEADER in df.columns:
        s = parse_date_series(df[SPRINT_START_HEADER]).dropna()
        if not s.empty:
            start_candidates.append(s.min())
    if not start_candidates and CARD_START_HEADER in df.columns:
        s = parse_date_series(df[CARD_START_HEADER]).dropna()
        if not s.empty:
            start_candidates.append(s.min())
    base = start_candidates[0] if start_candidates else today
    base = datetime(base.year, base.month, base.day)
    sprint_id = f"S{base.strftime('%Y%m%d')}"
    return sprint_id, base

def compute_dev_sprint_start(dev_df: pd.DataFrame, default_start: datetime) -> datetime:
    starts = []
    if SPRINT_START_HEADER in dev_df.columns:
        s = parse_date_series(dev_df[SPRINT_START_HEADER]).dropna()
        if not s.empty: starts.append(s.min())
    if not starts and CARD_START_HEADER in dev_df.columns:
        s = parse_date_series(dev_df[CARD_START_HEADER]).dropna()
        if not s.empty: starts.append(s.min())
    if not starts:
        return default_start
    dt = starts[0]
    return datetime(dt.year, dt.month, dt.day)

def compute_dev_sprint_end(dev_df: pd.DataFrame, default_start: datetime) -> datetime:
    if SPRINT_END_HEADER in dev_df.columns:
        s = parse_date_series(dev_df[SPRINT_END_HEADER]).dropna()
        if not s.empty:
            d = s.max().to_pydatetime()
            return datetime(d.year, d.month, d.day, 23, 59, 59)
    return default_start + timedelta(days=11, hours=23, minutes=59, seconds=59)

def choose_delivery_series(df: pd.DataFrame) -> pd.Series:
    if DELIVERY_DATE_HEADER in df.columns:
        return parse_date_series(df[DELIVERY_DATE_HEADER])
    if FORCE_COLUMN_LETTER:
        col_idx = column_letter_to_index(FORCE_COLUMN_LETTER)
        if col_idx < len(df.columns):
            s = df.iloc[:, col_idx]
            return parse_date_series(s)
        raise RuntimeError(f"FORCE_COLUMN_LETTER={FORCE_COLUMN_LETTER} excede el ancho de la hoja.")
    raise RuntimeError("No encontré la columna de fecha de entrega.")

def choose_estimate_series(df: pd.DataFrame) -> pd.Series:
    for c in ESTIMATE_CANDIDATES:
        if c in df.columns:
            return to_float_series(df[c])
    return pd.Series([0.0] * len(df), index=df.index)

def within(ts: pd.Timestamp, start: datetime, end: datetime) -> bool:
    if pd.isna(ts): return False
    d = ts.to_pydatetime()
    return start <= d <= end

def build_week_windows(sprint_start: datetime, sprint_end: datetime) -> Dict[str, Tuple[datetime, datetime]]:
    # Semana 1: lunes → viernes
    w1_end = sprint_start + timedelta(days=4, hours=23, minutes=59, seconds=59)
    # Semana 2: lunes siguiente → viernes siguiente
    w2_start = sprint_start + timedelta(days=7)
    w2_end = sprint_start + timedelta(days=11, hours=23, minutes=59, seconds=59)

    # Evitar pasarse del sprint_end real (por si termina antes)
    w1_end = min(w1_end, sprint_end)
    w2_end = min(w2_end, sprint_end)

    return {
        "all": (sprint_start, sprint_end),
        "week1": (w1_end - timedelta(days=4), w1_end),
        "week2": (w2_start, w2_end)
    }


# -------- Core --------
def compute_metrics_for_sprint(df: pd.DataFrame, today: datetime) -> Dict:
    sprint_id, sprint_code_date = detect_current_sprint(df, SPRINT_COL, today)
    current = df[df[SPRINT_COL].astype(str).str.strip() == sprint_id].copy()
    if current.empty:
        raise RuntimeError(f"No hay filas para el sprint {sprint_id}")

    dev_col = pick_dev_column(current)
    key_col = pick_key_column(current)
    delivery_series = choose_delivery_series(current)
    estimate_series = choose_estimate_series(current)
    tipo_col = TYPE_COL if TYPE_COL in current.columns else None

    out_by_dev: Dict[str, Dict] = {}
    totals_w1_cards = totals_w1_hours = 0.0
    totals_w2_cards = totals_w2_hours = 0.0
    global_starts, global_ends = [], []

    for dev, dev_df in current.groupby(dev_col):
        dev_df = dev_df.copy()
        dev_delivery = delivery_series.loc[dev_df.index]
        dev_est = estimate_series.loc[dev_df.index]

        sprint_start = compute_dev_sprint_start(dev_df, sprint_code_date)
        sprint_end   = compute_dev_sprint_end(dev_df, sprint_start)
        windows = build_week_windows(sprint_start, sprint_end)

        global_starts.append(sprint_start)
        global_ends.append(sprint_end)

        w1_mask = dev_delivery.apply(lambda x: within(x, *windows["week1"]))
        w2_mask = dev_delivery.apply(lambda x: within(x, *windows["week2"]))

        w1_cards = int(w1_mask.sum())
        w2_cards = int(w2_mask.sum())

        w1_hours = float(dev_est[w1_mask].sum())
        w2_hours = float(dev_est[w2_mask].sum())

        def build_details(mask):
            dets = []
            for idx in dev_df.loc[mask].index:
                dets.append({
                    "key": str(dev_df.loc[idx, key_col]) if key_col else None,
                    "planned_hours": float(estimate_series.loc[idx]),
                    "tipo": str(dev_df.loc[idx, tipo_col]) if tipo_col else None,
                    "due": (delivery_series.loc[idx].strftime("%Y-%m-%d") if pd.notna(delivery_series.loc[idx]) else None),
                })
            return dets

        # issues (solo keys) por compatibilidad con scripts previos
        w1_issues = []
        w2_issues = []
        if key_col:
            w1_issues = dev_df.loc[w1_mask, key_col].astype(str).tolist()
            w2_issues = dev_df.loc[w2_mask, key_col].astype(str).tolist()

        out_by_dev[str(dev)] = {
            "sprint_inicio": sprint_start.strftime("%Y-%m-%d"),
            "sprint_fin":    sprint_end.strftime("%Y-%m-%d"),
            "datos_semana_1": {
                "inicio": windows["week1"][0].strftime("%Y-%m-%d"),
                "fin":    windows["week1"][1].strftime("%Y-%m-%d"),
                "tarjetas_planificadas": w1_cards,
                "horas_planificadas": w1_hours,
                "issues": w1_issues,
                "issues_detail": build_details(w1_mask),
            },
            "datos_semana_2": {
                "inicio": windows["week2"][0].strftime("%Y-%m-%d"),
                "fin":    windows["week2"][1].strftime("%Y-%m-%d"),
                "tarjetas_planificadas": w2_cards,
                "horas_planificadas": w2_hours,
                "issues": w2_issues,
                "issues_detail": build_details(w2_mask),
            },
        }

        totals_w1_cards += w1_cards
        totals_w1_hours += w1_hours
        totals_w2_cards += w2_cards
        totals_w2_hours += w2_hours

    result = {
        "sprint_id": sprint_id,
        "generado_en": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "sprint_inicio_global": min(global_starts).strftime("%Y-%m-%d"),
        "sprint_fin_global":    max(global_ends).strftime("%Y-%m-%d"),
        "datos_semana_1": {
            "tarjetas_planificadas": int(totals_w1_cards),
            "horas_planificadas": float(totals_w1_hours),
        },
        "datos_semana_2": {
            "tarjetas_planificadas": int(totals_w2_cards),
            "horas_planificadas": float(totals_w2_hours),
        },
        "detalle_por_desarrollador": out_by_dev,
    }
    return result


def update_json_store(path: str, sprint_id: str, payload: Dict) -> None:
    store = {}
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                store = json.load(f)
        except Exception:
            store = {}
    store[sprint_id] = payload
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(store, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)


def main():
    today = datetime.now()
    df = fetch_sheet_with_service_account_by_gid(
        sheet_id=SHEET_ID,
        worksheet_gid=WORKSHEET_GID,
        creds_path=CREDS_PATH,
        header_row=3,
        first_data_row=4,
    )
    if df.empty:
        raise SystemExit("La hoja está vacía o no se pudo leer.")

    df = strip_sheet_gaps(df)

    # Normalizar nombre Sprint por si trae espacios
    for c in list(df.columns):
        if c.strip().lower() == "sprint" and c != SPRINT_COL:
            df.rename(columns={c: SPRINT_COL}, inplace=True)

    metrics = compute_metrics_for_sprint(df, today=today)
    update_json_store(OUTPUT_JSON, metrics["sprint_id"], metrics)

    print(f"[OK] Actualizado {OUTPUT_JSON} para sprint {metrics['sprint_id']}")
    print("Semana 1:", metrics["datos_semana_1"])
    print("Semana 2:", metrics["datos_semana_2"])


if __name__ == "__main__":
    main()


[OK] Actualizado ./bt_it_sprint_planning.json para sprint S20251027
Semana 1: {'tarjetas_planificadas': 14, 'horas_planificadas': 101.0}
Semana 2: {'tarjetas_planificadas': 20, 'horas_planificadas': 174.0}


  df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)


In [8]:
# -*- coding: utf-8 -*-
"""
Lee bt_it_sprint_planning.json, consulta Jira por estado actual y worklogs en la
ventana del sprint (global [min(inicio), max(fin)]) y guarda report_it.json.
"""

import os
import re
import json
import time
from typing import Dict, Any, List
from collections import defaultdict
from datetime import datetime, timedelta

import requests
from requests.auth import HTTPBasicAuth
import pytz

PLANNING_JSON = os.getenv("PLANNING_JSON", "bt_it_sprint_planning.json")
REPORT_JSON   = os.getenv("REPORT_JSON",   "report_it.json")


JIRA_API_ROOT = os.getenv("JIRA_API_ROOT", "https://team-1583163151751.atlassian.net/rest/api/3")
SEARCH_JQL_URL = f"{JIRA_API_ROOT}/search/jql"

REAL_HOURS_CF = "customfield_10046"
# Campos mínimos
FIELDS = [
    "summary", "status", "statuscategorychangedate", "assignee", "priority",
    "timetracking", "duedate", "updated", "resolutiondate" , REAL_HOURS_CF
]

MAX_RESULTS  = 100
CHUNK_ISSUES = 100
TIMEOUTS     = (5, 30)
TZ = pytz.timezone("America/Argentina/Buenos_Aires")

DEV_MAP = {
    # "Alan Mori - Carestino": "Alan Mori - Carestino",
    # Mapear si los displayName de Jira difieren de los del Sheet
}

def load_planning(path: str) -> Dict[str, Any]:
    if not os.path.exists(path):
        raise SystemExit(f"No existe {path}. Ejecutá primero collect_planned_metrics.py")
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def parse_sprint_date(sprint_id: str) -> datetime:
    m = re.match(r"^S(\d{8})$", sprint_id)
    if not m:
        raise ValueError(f"Sprint inválido: {sprint_id}")
    return datetime.strptime(m.group(1), "%Y%m%d")

def pick_latest_sprint_key(store: Dict[str, Any]) -> str:
    keys = [k for k in store.keys() if re.match(r"^S\d{8}$", k)]
    if not keys:
        raise SystemExit("No encontré sprints en planning JSON.")
    keys.sort(key=lambda k: parse_sprint_date(k))
    return keys[-1]

def extract_issue_keys(planning: Dict[str, Any], sprint_id: str) -> List[str]:
    bucket = planning.get(sprint_id, {})
    detail = bucket.get("detalle_por_desarrollador", {}) or {}
    keys = []
    for dev, dev_data in detail.items():
        for wlabel in ("datos_semana_1", "datos_semana_2"):
            w = dev_data.get(wlabel, {}) or {}
            # preferir issues_detail si está
            if "issues_detail" in w and isinstance(w["issues_detail"], list):
                for it in w["issues_detail"]:
                    k = (it or {}).get("key")
                    if k: keys.append(str(k).strip())
            else:
                for k in w.get("issues", []) or []:
                    keys.append(str(k).strip())
    # unique keep order
    seen, out = set(), []
    for k in keys:
        if k not in seen:
            seen.add(k); out.append(k)
    return out

def fetch_issues_status(issue_keys: List[str]) -> Dict[str, Dict[str, Any]]:
    if not EMAIL or not API_TOKEN:
        raise SystemExit("Faltan credenciales JIRA_EMAIL / JIRA_API_TOKEN.")

    auth = HTTPBasicAuth(EMAIL, API_TOKEN)
    headers = {"Accept": "application/json", "Content-Type": "application/json"}

    result: Dict[str, Dict[str, Any]] = {}
    for i in range(0, len(issue_keys), CHUNK_ISSUES):
        slice_keys = issue_keys[i:i+CHUNK_ISSUES]
        jql = f"issuekey in ({','.join(slice_keys)})"

        next_token = None
        while True:
            payload = {"jql": jql, "fields": FIELDS, "maxResults": MAX_RESULTS}
            if next_token: payload["nextPageToken"] = next_token

            r = requests.post(SEARCH_JQL_URL, auth=auth, headers=headers, json=payload, timeout=TIMEOUTS)
            r.raise_for_status()
            data = r.json()

            for issue in data.get("issues", []) or []:
                key = issue.get("key")
                f = issue.get("fields", {}) or {}
                status_obj = f.get("status") or {}
                assignee_obj = f.get("assignee") or {}
                result[key] = {
                    "summary": f.get("summary"),
                    "status": status_obj.get("name"),
                    "status_category": (status_obj.get("statusCategory") or {}).get("name"),
                    "assignee": assignee_obj.get("displayName"),
                    "priority": (f.get("priority") or {}).get("name"),
                    "duedate": f.get("duedate"),
                    "updated": f.get("updated"),
                    "resolutiondate": f.get("resolutiondate"),
                    REAL_HOURS_CF: f.get(REAL_HOURS_CF),
                }

            if data.get("isLast", False): break
            next_token = data.get("nextPageToken")
            if not next_token: break
            time.sleep(0.2)

    return result

def fetch_worklogs_window(keys: List[str], start_dt: datetime, end_dt: datetime,
                          jira_map: Dict[str, Dict[str, Any]] = None) -> Dict[str, Dict[str, Any]]:
    """
    Trae worklogs dentro de la ventana [start_dt, end_dt] únicamente del usuario asignado a la tarjeta.
    """
    auth = HTTPBasicAuth(EMAIL, API_TOKEN)
    headers = {"Accept": "application/json"}

    # Normalizar fechas con tz
    if start_dt.tzinfo is None:
        start_dt = TZ.localize(start_dt)
    else:
        start_dt = start_dt.astimezone(TZ)
    if end_dt.tzinfo is None:
        end_dt = TZ.localize(end_dt)
    else:
        end_dt = end_dt.astimezone(TZ)

    out = {}
    for key in keys:
        # Tomar el assignee si se tiene del mapa de issues
        assignee_name = None
        if jira_map and key in jira_map:
            assignee_name = jira_map[key].get("assignee")

        url = f"{JIRA_API_ROOT}/issue/{key}/worklog"
        startAt = 0
        total_sec = 0
        by_author = {}

        while True:
            r = requests.get(url, auth=auth, headers=headers, params={"startAt": startAt, "maxResults": 100}, timeout=TIMEOUTS)
            if r.status_code == 404:
                # Issue inexistente o sin permiso
                print(f"⚠️ No se pudo leer worklogs de {key} (404)")
                break
            r.raise_for_status()
            data = r.json()

            for wl in data.get("worklogs", []) or []:
                started = wl.get("started")
                if not started:
                    continue
                dt = datetime.strptime(started, "%Y-%m-%dT%H:%M:%S.%f%z").astimezone(TZ)
                if not (start_dt <= dt <= end_dt):
                    continue

                author = (wl.get("author") or {}).get("displayName") or "Desconocido"

                # Si hay un assignee definido, filtramos solo sus worklogs
                if assignee_name and author != assignee_name:
                    continue

                sec = int(wl.get("timeSpentSeconds") or 0)
                total_sec += sec
                by_author[author] = by_author.get(author, 0) + sec

            startAt += data.get("maxResults", 0)
            if startAt >= data.get("total", 0):
                break

        out[key] = {"total_sec": total_sec, "by_author": by_author}

    return out

def main(sprint_id: str = None):
    planning = load_planning(PLANNING_JSON)
    if sprint_id is None:
        sprint_id = pick_latest_sprint_key(planning)

    bucket = planning[sprint_id]
    s_start = datetime.strptime(bucket["sprint_inicio_global"], "%Y-%m-%d").replace(hour=0, minute=0, second=0)
    s_end   = datetime.strptime(bucket["sprint_fin_global"],    "%Y-%m-%d").replace(hour=23, minute=59, second=59)

    print(f"➡️  Sprint: {sprint_id} | Ventana: {s_start.date()} → {s_end.date()} (incl.)")

    keys = extract_issue_keys(planning, sprint_id)
    if not keys:
        raise SystemExit("No hay issues en planificación.")
    print(f"🧾 Issues a consultar: {len(keys)}")

    jira_map = fetch_issues_status(keys)
    valid_keys = [k for k in keys if isinstance(k, str) and "-" in k and not k.startswith("S")]
    worklogs = fetch_worklogs_window(valid_keys, s_start, s_end, jira_map=jira_map)


    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    issues_out = {}
    resumen_por_estado = defaultdict(int)
    resumen_por_dev    = defaultdict(int)

    # Aux: mapa dev por issue desde planning
    dev_map_issue = {}
    for dev, dev_data in bucket["detalle_por_desarrollador"].items():
        for w in ("datos_semana_1", "datos_semana_2"):
            detail = dev_data[w].get("issues_detail", []) or []
            for it in detail:
                if not it: continue
                k = it.get("key") 
                if not k: continue
                dev_map_issue[str(k)] = dev

            if not detail and dev_data[w].get("issues"):
                for k in dev_data[w]["issues"]:
                    dev_map_issue[str(k)] = dev

    for k in keys:
        j = jira_map.get(k, {})
        dev = dev_map_issue.get(k)
        status = j.get("status", "DESCONOCIDO")
        wl = worklogs.get(k, {"total_sec": 0, "by_author": {}})

        # NUEVO: horas reales por tarjeta desde el custom (entero = horas)
        raw_cf = j.get(REAL_HOURS_CF)
        hours_cf = 0.0
        if isinstance(raw_cf, (int, float)):
            hours_cf = float(raw_cf)
        elif isinstance(raw_cf, str) and raw_cf.strip().replace('.', '', 1).isdigit():
            hours_cf = float(raw_cf.strip())

        hours_wl = (wl.get("total_sec", 0) or 0) / 3600.0

        if hours_cf > 0:
            real_hours_issue = hours_cf
            real_source = REAL_HOURS_CF
        else:
            real_hours_issue = hours_wl
            real_source = "worklogs_window"

        DONE_STATUSES = {
            "Done",
            "FINISH",
            "TO DEPLOY",
            "PENDING REVIEW",
            "REVIEWING",
            "TO DEPLOY QA",
        }

        status_name = (j.get("status") or "").strip()
        status_cat = (j.get("status_category") or "").strip()
        issues_out[k] = {
            "developer": dev,
            "summary": j.get("summary"),
            "status": status,
            "status_category": j.get("status_category"),
            "assignee": j.get("assignee"),
            "priority": j.get("priority"),
            "duedate": j.get("duedate"),
            "updated": j.get("updated"),
            "resolutiondate": j.get("resolutiondate"),
            "worklogs": wl,
            "customfield_10046_raw": raw_cf,      # <-- NUEVO
            "real_hours_issue": real_hours_issue, # <-- NUEVO
            "real_hours_source": real_source,     # <-- NUEVO
            "done": (status_cat == "Done") or (status_name in DONE_STATUSES),
            "done_before_due": False,  # se setea abajo si aplica
            "capturado_en": now,
        }

        # done_before_due
        try:
            if j.get("resolutiondate") and j.get("duedate"):
                res = datetime.fromisoformat(j["resolutiondate"].replace("Z", "+00:00")).astimezone(TZ)
                due = datetime.strptime(j["duedate"], "%Y-%m-%d").replace(tzinfo=TZ, hour=23, minute=59, second=59)
                issues_out[k]["done_before_due"] = res <= due
        except Exception:
            pass

        resumen_por_estado[status] += 1
        if dev: resumen_por_dev[dev] += 1

    payload = {
        "generado_en": now,
        "sprint_id": sprint_id,
        "sprint_inicio_global": bucket["sprint_inicio_global"],
        "sprint_fin_global": bucket["sprint_fin_global"],
        "issues": issues_out,
        "resumen": {
            "por_estado": dict(resumen_por_estado),
            "por_desarrollador": dict(resumen_por_dev),
            "total": len(keys),
        },
    }

    store = {}
    if os.path.exists(REPORT_JSON):
        try:
            with open(REPORT_JSON, "r", encoding="utf-8") as f:
                store = json.load(f)
        except Exception:
            store = {}
    store[sprint_id] = payload

    tmp = REPORT_JSON + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(store, f, ensure_ascii=False, indent=2)
    os.replace(tmp, REPORT_JSON)

    print(f"[OK] Reporte escrito en {REPORT_JSON}")


if __name__ == "__main__":
    main(SPRINT_A_CALCULAR)


➡️  Sprint: S20251027 | Ventana: 2025-10-27 → 2025-11-14 (incl.)
🧾 Issues a consultar: 34
[OK] Reporte escrito en report_it.json


In [10]:
# -*- coding: utf-8 -*-
"""
Cruza bt_it_sprint_planning.json + report_it.json y genera KPIs:
- % tarjetas completadas
- % horas cumplidas
- Desviación promedio de estimación
- % tareas replanificadas
- Carga planificada por dev
- Predictibilidad (% terminadas antes del due date)
Salida: kpis_it.json
"""

import os
import re
import json
from collections import defaultdict
from datetime import datetime
from typing import Dict, Any, List

PLANNING="bt_it_sprint_planning.json"
REPORT="report_it.json"
OUT="kpis_it.json"

DEV_MAP = {
    # Opcional: mapear nombres del Sheet -> displayName de Jira
    # "Alan Mori - Carestino": "Alan Mori - Carestino",
}

def parse_sprint_date(sprint_id: str) -> datetime:
    m = re.match(r"^S(\d{8})$", sprint_id)
    return datetime.strptime(m.group(1), "%Y%m%d")

def pick_latest_sprint_key(store: Dict[str, Any]) -> str:
    keys = [k for k in store.keys() if re.match(r"^S\d{8}$", k)]
    keys.sort(key=lambda k: parse_sprint_date(k))
    return keys[-1]

def loadj(p):
    if not os.path.exists(p): raise SystemExit(f"No existe {p}")
    with open(p,"r",encoding="utf-8") as f: return json.load(f)

def main(sprint_id=None):
    plan = loadj(PLANNING)
    if sprint_id is None:
        sprint_id = pick_latest_sprint_key(plan)
    rep_store = loadj(REPORT)
    if sprint_id not in rep_store:
        raise SystemExit(f"No hay reporte para {sprint_id} en {REPORT}")

    bucket = plan[sprint_id]
    report = rep_store[sprint_id]["issues"]

    # --- Acumuladores por dev ---
    by_dev = defaultdict(lambda: {
        "planned_cards":0,
        "planned_hours":0.0,
        "done_cards":0,
        "real_hours":0.0,
    })

    # map “planned_hours por issue”
    per_issue_planned = {}  # key -> planned_hours
    # y dev por issue según planificación
    dev_by_issue = {}

    detail = bucket["detalle_por_desarrollador"]
    for dev, d in detail.items():
        for w in ("datos_semana_1","datos_semana_2"):
            dd = d.get(w, {})
            by_dev[dev]["planned_cards"] += len(dd.get("issues", []))
            by_dev[dev]["planned_hours"] += float(dd.get("horas_planificadas", 0))
            for it in dd.get("issues_detail", []) or []:
                k = (it or {}).get("key")
                if not k: continue
                per_issue_planned[k] = float((it.get("planned_hours") or 0))
                dev_by_issue[k] = dev
            # compat: si no hay issues_detail, usar lista simple
            if not dd.get("issues_detail") and dd.get("issues"):
                for k in dd.get("issues"):
                    dev_by_issue[k] = dev

    # --- Sumar realizado por dev ---
    for k, info in report.items():
        dev = dev_by_issue.get(k) or info.get("developer")
        if not dev:
            continue
        if info.get("done"):
            by_dev[dev]["done_cards"] += 1

        # Preferir horas reales del custom; fallback: worklogs
        wl = info.get("worklogs", {})
        if isinstance(wl.get("by_author"), dict) and any(wl["by_author"].values()):
            author_name = DEV_MAP.get(dev, dev)
            real_hours = int(wl["by_author"].get(author_name, 0)) / 3600.0
        else:
            # fallback si no hay worklogs (tarjeta sin registrar tiempo)
            if isinstance(info.get("real_hours_issue"), (int, float)) and info["real_hours_issue"] > 0:
                real_hours = float(info["real_hours_issue"])
            else:
                real_hours = 0.0

        by_dev[dev]["real_hours"] += real_hours

    # --- KPIs por dev + desviación promedio ---
    kpis_dev = {}
    for dev, v in by_dev.items():
        pc, ph, dc, rh = v["planned_cards"], v["planned_hours"], v["done_cards"], v["real_hours"]
        pct_cards = (dc/pc*100) if pc else None
        pct_hours = (rh/ph*100) if ph else None

        # desviación promedio por issue planificada del dev
        dev_issues = [k for k, d in dev_by_issue.items() if d == dev]
        dev_devs = []
        for k in dev_issues:
            p = per_issue_planned.get(k, 0.0)
            if p <= 0:
                continue

            # ✅ Nuevo: preferir horas reales del customfield_10046 si existe
            info_k = report.get(k, {})
            if isinstance(info_k.get("real_hours_issue"), (int, float)) and info_k["real_hours_issue"] > 0:
                real_issue = float(info_k["real_hours_issue"])
            else:
                wl = info_k.get("worklogs", {})
                if isinstance(wl.get("by_author"), dict):
                    author_name = DEV_MAP.get(dev, dev)
                    real_issue_sec = int(wl["by_author"].get(author_name, 0))
                else:
                    real_issue_sec = int(wl.get("total_sec", 0))
                real_issue = real_issue_sec / 3600.0

            dev_devs.append(abs(p - real_issue) / p)

        avg_dev = (sum(dev_devs)/len(dev_devs)) if dev_devs else None

        kpis_dev[dev] = {
            "tarjetas_planificadas": pc,
            "tarjetas_realizadas": dc,
            "porc_tarjetas_completadas": round(pct_cards,1) if pct_cards is not None else None,
            "horas_planificadas": round(ph,1),
            "horas_realizadas": round(rh,1),
            "porc_horas_cumplidas": round(pct_hours,1) if pct_hours is not None else None,
            "desviacion_promedio_estimacion": round(avg_dev,3) if avg_dev is not None else None,
        }

    # --- Globales ---
    all_keys = list(report.keys())
    done_keys = [k for k, i in report.items() if i.get("done")]
    replan_pct = round((1 - len(done_keys)/len(all_keys))*100,1) if all_keys else None

    completed_with_due = [k for k in done_keys if report[k].get("duedate")]
    ontime = [k for k in completed_with_due if report[k].get("done_before_due")]
    predict = round(len(ontime)/len(completed_with_due)*100,1) if completed_with_due else None

    out = {
        "sprint_id": sprint_id,
        "kpis_por_desarrollador": kpis_dev,
        "kpis_globales": {
            "porc_tareas_replanificadas": replan_pct,
            "predictibilidad_por_due_date": predict,
            "carga_planificada_por_dev": {d: kpis_dev[d]["horas_planificadas"] for d in kpis_dev},
        }
    }

    store = {}
    if os.path.exists(OUT):
        try:
            with open(OUT, "r", encoding="utf-8") as f:
                store = json.load(f)
        except Exception:
            store = {}
    store[sprint_id] = out

    tmp = OUT + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(store, f, ensure_ascii=False, indent=2)
    os.replace(tmp, OUT)

    print(f"[OK] KPIs escritos en {OUT}")

if __name__=="__main__":
    main(SPRINT_A_CALCULAR)


[OK] KPIs escritos en kpis_it.json
