In [18]:
import os
import boto3
import requests
import pandas as pd
from io import StringIO
import io
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

import requests
import pandas as pd
import io
import re
import os
from datetime import timezone
import urllib.parse

In [3]:
# Local CSV export of BCLConvert app sessions. Set BCLCONVERT_APPSESSIONS_CSV to
# point somewhere else; the default is the path this notebook was developed with.
file = os.environ.get(
    "BCLCONVERT_APPSESSIONS_CSV",
    "/Users/aesocia/Documents/BGSI/DEV/dwh-dags/bclconvert_appsessions-2025-08-25.csv",
)

# BaseSpace Sequence Hub API roots (the runs endpoint lives under the v2 root).
API_BASE = "https://api.aps4.sh.basespace.illumina.com/v2"
API_BASE_URL = f"{API_BASE}/runs"

# SECURITY: the bearer token is read from the environment so it is never stored in
# the notebook. The token that was previously committed here should be revoked.
API_TOKEN = os.environ["BASESPACE_API_TOKEN"]

df_app = pd.read_csv(file)

In [None]:
MAX_ROWS   = 1000   # hard cap of newest rows to collect
PAGE_LIMIT = 25     # API hard cap
SORT_BY    = "DateCreated"
SORT_DIR   = "Desc"  # newest first
REQUEST_TIMEOUT = 30  # seconds per HTTP call; requests waits indefinitely without one

# Optional cutoff: keep only sessions whose DateCreated >= this value and stop
# paging once a whole page falls below it. Set to None/"" to disable.
CURR_DS    = "2025-08-20"  # e.g., "2025-08-25"

# ---------------------------
# Run
# ---------------------------
headers = {
    "Authorization": f"Bearer {API_TOKEN}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}

all_rows = []
offset = 0

while len(all_rows) < MAX_ROWS:
    url = (
        f"{API_BASE}/appsessions"
        f"?offset={offset}&limit={PAGE_LIMIT}"
        f"&sortBy={SORT_BY}&sortDir={SORT_DIR}"
    )
    resp = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    payload = resp.json() or {}
    sessions = payload.get("Items", []) or []

    # An empty page means the listing is exhausted. Without this check the loop
    # never terminates when CURR_DS is disabled and fewer than MAX_ROWS rows exist.
    if not sessions:
        break

    # Optional early stop if sessions are older than CURR_DS.
    # Plain string comparison: relies on DateCreated being an ISO-8601 string.
    if CURR_DS:
        sessions = [s for s in sessions if (s.get("DateCreated") or "") >= CURR_DS]
        if not sessions:
            break

    for session in sessions:
        # `or ""` guards against an explicit null Name in the payload.
        if "BCLConvert" not in (session.get("Name") or ""):
            continue

        session_id = session.get("Id")
        if not session_id:
            continue

        # Fetch session detail; a failed detail call skips the session, not the run.
        detail_url = f"{API_BASE}/appsessions/{session_id}"
        dresp = requests.get(detail_url, headers=headers, timeout=REQUEST_TIMEOUT)
        if dresp.status_code != 200:
            logger.warning(f"⚠️ Detail fetch failed for {session_id}: {dresp.status_code}")
            continue
        detail = dresp.json() or {}

        props_items = (detail.get("Properties") or {}).get("Items") or []
        properties = {i.get("Name"): i.get("Content") for i in props_items if i.get("Name")}

        # The runs attached to the session live on the "Input.Runs" property.
        run_items = []
        for i in props_items:
            if i.get("Name") == "Input.Runs":
                run_items = i.get("RunItems", []) or []

        for run in run_items:
            all_rows.append({
                "RowType": "Run",
                "SessionId": session_id,
                "SessionName": detail.get("Name"),
                "DateCreated": detail.get("DateCreated"),
                "DateModified": detail.get("DateModified"),
                "ExecutionStatus": detail.get("ExecutionStatus"),
                "ICA_Link": detail.get("HrefIcaAnalysis"),
                "ICA_ProjectId": properties.get("ICA.ProjectId"),
                "WorkflowReference": properties.get("ICA.WorkflowSessionUserReference"),
                "RunId": run.get("Id"),
                "RunName": run.get("Name"),
                # SequencingStats may be absent or null on a run item.
                "PercentGtQ30": (run.get("SequencingStats") or {}).get("PercentGtQ30"),
                "FlowcellBarcode": run.get("FlowcellBarcode"),
                "ReagentBarcode": run.get("ReagentBarcode"),
                "Status": run.get("Status"),
                "ExperimentName": run.get("ExperimentName"),
                "RunDateCreated": run.get("DateCreated"),
            })

            if len(all_rows) >= MAX_ROWS:
                logger.info(f"✅ Reached max_rows={MAX_ROWS}. Stopping.")
                break

        if len(all_rows) >= MAX_ROWS:
            break

    offset += PAGE_LIMIT
# ---------------------------
# Build DataFrame
# ---------------------------
# Explicit column list keeps the schema stable even when no rows were collected.
df = pd.DataFrame(all_rows, columns=[
    "RowType", "SessionId", "SessionName", "DateCreated", "DateModified",
    "ExecutionStatus", "ICA_Link", "ICA_ProjectId", "WorkflowReference",
    "RunId", "RunName", "PercentGtQ30", "FlowcellBarcode", "ReagentBarcode",
    "Status", "ExperimentName", "RunDateCreated"
])

logger.info(f"✔ Final DataFrame shape: {df.shape}")


[[34m2025-08-26T10:26:57.080+0700[0m] {[34m974258736.py:[0m36} INFO[0m - 📦 Page offset=0 | received 25 sessions[0m
[[34m2025-08-26T10:26:58.085+0700[0m] {[34m974258736.py:[0m36} INFO[0m - 📦 Page offset=25 | received 25 sessions[0m
[[34m2025-08-26T10:26:58.087+0700[0m] {[34m974258736.py:[0m42} INFO[0m - ⏹ Hit sessions older than cutoff; stopping.[0m
[[34m2025-08-26T10:26:58.104+0700[0m] {[34m974258736.py:[0m108} INFO[0m - ✔ Final DataFrame shape: (2, 17)[0m


In [54]:
# Display the RunId column of the frame built by the collection cell.
df["RunId"]

0    246246
1    245245
Name: RunId, dtype: object

In [89]:
import requests
import pandas as pd
import logging
import sys

def fetch_bclconvert_runs_with_yield(
    api_base: str,
    api_token: str,
    *,
    max_rows: int = 1000,
    page_limit: int = 25,
    sort_by: str = "DateCreated",
    sort_dir: str = "Desc",
    curr_ds: str | None = None,
    logger: logging.Logger | None = None,
) -> pd.DataFrame:
    """
    Fetch newest BCLConvert Run sessions and enrich them with total_flowcell_yield_gbp.

    Parameters
    ----------
    api_base : str
        Base URL for the AppSessions API (e.g., https://api.example.com)
    api_token : str
        API token for AppSessions
    stats_base : str
        Base URL for sequencing stats API (e.g., https://api.example.com/runs)
    max_rows : int
        Hard cap of newest rows to collect
    page_limit : int
        API page size (server hard cap is 25)
    sort_by : str
        Field to sort by (default "DateCreated")
    sort_dir : str
        Sort direction (default "Desc" = newest first)
    curr_ds : str | None
        Optional cutoff, only include sessions where DateCreated >= curr_ds
    logger : logging.Logger | None
        Optional logger; if None, a default logger is created.

    Returns
    -------
    pd.DataFrame
        DataFrame of Run rows, including total_flowcell_yield_gbp column.
    """

    if logger is None:
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(message)s",
            stream=sys.stdout,
        )
        logger = logging.getLogger("bclconvert-fetch")

    all_rows = []
    offset = 0

    sessions_headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    while len(all_rows) < max_rows:
        url = (
            f"{api_base}/appsessions"
            f"?offset={offset}&limit={page_limit}"
            f"&sortBy={sort_by}&sortDir={sort_dir}"
        )
        resp = requests.get(url, headers=sessions_headers)
        resp.raise_for_status()
        payload = resp.json() or {}
        sessions = payload.get("Items", []) or []

        if not sessions:
            break

        # Optional cutoff
        if curr_ds:
            sessions = [s for s in sessions if s.get("DateModified", "") >= curr_ds]
            if not sessions:
                break

        for session in sessions:
            if "BCLConvert" not in session.get("Name", ""):
                continue

            session_id = session.get("Id")
            if not session_id:
                continue

            # fetch detail
            detail_url = f"{api_base}/appsessions/{session_id}"
            dresp = requests.get(detail_url, headers=sessions_headers)
            if dresp.status_code != 200:
                logger.warning(f"⚠️ Detail fetch failed for {session_id}: {dresp.status_code}")
                continue
            detail = dresp.json() or {}

            props_items = detail.get("Properties", {}).get("Items", []) or []
            properties = {i.get("Name"): i.get("Content") for i in props_items if i.get("Name")}

            run_items = []
            for i in props_items:
                if i.get("Name") == "Input.Runs":
                    run_items = i.get("RunItems", []) or []

            for run in run_items:
                all_rows.append({
                    "row_type": "Run",
                    "session_id": session_id,
                    "session_name": detail.get("Name"),
                    "date_created": detail.get("DateCreated"),
                    "date_modified": detail.get("DateModified"),
                    "execution_status": detail.get("ExecutionStatus"),
                    "ica_link": detail.get("HrefIcaAnalysis"),
                    "ica_project_id": properties.get("ICA.ProjectId"),
                    "workflow_reference": properties.get("ICA.WorkflowSessionUserReference"),
                    "run_id": run.get("Id"),
                    "run_name": run.get("Name"),
                    "percent_gt_q30": run.get("SequencingStats", {}).get("PercentGtQ30"),
                    "flowcell_barcode": run.get("FlowcellBarcode"),
                    "reagent_barcode": run.get("ReagentBarcode"),
                    "status": run.get("Status"),
                    "experiment_name": run.get("ExperimentName"),
                    "run_date_created": run.get("DateCreated"),
                })

                if len(all_rows) >= max_rows:
                    break
            if len(all_rows) >= max_rows:
                break

        offset += page_limit

    # ---- build DataFrame ----
    df = pd.DataFrame(all_rows, columns=[
        "row_type", "session_id", "session_name", "date_created", "date_modified",
        "execution_status", "ica_link", "ica_project_id", "workflow_reference",
        "run_id", "run_name", "percent_gt_q30", "flowcell_barcode", "reagent_barcode",
        "status", "experiment_name", "run_date_created"
    ])

    logger.info(f"✔ DataFrame shape before enrichment: {df.shape}")

    # ---- enrich with sequencing stats ----
    run_rows = df[df["row_type"] == "Run"]

    for _, row in run_rows.iterrows():
        run_id = row.get("run_id")
        if not run_id or run_id.lower() == "nan":
            continue

        api_url = f"{api_base}/runs/{run_id}/sequencingstats"
        stats_headers = {
            "x-access-token": api_token,
            "Accept": "application/json"
        }

        try:
            response = requests.get(api_url, headers=stats_headers)
            response.raise_for_status()
            data = response.json()
            total_yield = data.get("YieldTotal")

            if total_yield is not None:
                df.loc[
                    (df["row_type"] == "Run") & (df["run_id"] == run_id),
                    "total_flowcell_yield_Gbp"
                ] = total_yield
        except Exception as e:
            logger.warning(f"⚠️ Failed fetching stats for {run_id}: {e}")
            continue

    logger.info(f"✔ Final DataFrame shape: {df.shape}")
    return df

# Call the fetcher with the API root and token configured earlier in the notebook,
# keeping sessions whose DateModified is on or after 2025-08-20.
# This rebinds `df`, replacing the frame built by the earlier collection cell.
df = fetch_bclconvert_runs_with_yield(
    api_base=API_BASE,
    api_token=API_TOKEN,
    curr_ds="2025-08-20"
)




[[34m2025-08-26T13:29:24.790+0700[0m] {[34m1136230620.py:[0m144} INFO[0m - ✔ DataFrame shape before enrichment: (2, 17)[0m
[[34m2025-08-26T13:29:25.045+0700[0m] {[34m1136230620.py:[0m175} INFO[0m - ✔ Final DataFrame shape: (2, 18)[0m


In [90]:
# Display the frame returned by fetch_bclconvert_runs_with_yield.
df

Unnamed: 0,row_type,session_id,session_name,date_created,date_modified,execution_status,ica_link,ica_project_id,workflow_reference,run_id,run_name,percent_gt_q30,flowcell_barcode,reagent_barcode,status,experiment_name,run_date_created,total_flowcell_yield_Gbp
0,Run,259259,BCLConvert 08/22/2025 23:57:04Z,2025-08-22T23:57:05.0000000Z,2025-08-23T02:24:32.0000000Z,Complete,https://ica.illumina.com/ica/link/project/7feb...,7feb6619-714b-48f7-a7fd-75ad264f9c55,ws_LP2508211-P1_b823b3,246246,250821_A01856_0274_BHLMJMDSXF,89.5419,HLMJMDSXF,NV2432571-RGSBS,Complete,LP2508211-P1,2025-08-21T06:23:35.0000000Z,3650.02
1,Run,256256,BCLConvert 08/21/2025 01:50:00Z,2025-08-21T01:50:00.0000000Z,2025-08-21T04:08:42.0000000Z,Complete,https://ica.illumina.com/ica/link/project/7feb...,7feb6619-714b-48f7-a7fd-75ad264f9c55,ws_LP2508131-P2_cd272a,245245,250819_A01856_0273_BHKGCNDSXF,89.3399,HKGCNDSXF,NV4421056-RGSBS,Complete,LP2508131-P2,2025-08-19T08:17:14.0000000Z,3545.24


In [None]:
import requests
import pandas as pd
import logging
import sys

def fetch_bclconvert_runs_with_yield(
    api_base: str,
    api_token: str,
    *,
    max_rows: int = 1000,
    page_limit: int = 25,
    sort_by: str = "DateCreated",
    sort_dir: str = "Desc",
    curr_ds: str | None = None,
    logger: logging.Logger | None = None,
) -> pd.DataFrame:
    """
    Fetch newest BCLConvert Run sessions and enrich them with total_flowcell_yield_gbp.

    Parameters
    ----------
    api_base : str
        Base URL for the AppSessions API (e.g., https://api.example.com)
    api_token : str
        API token for AppSessions
    stats_base : str
        Base URL for sequencing stats API (e.g., https://api.example.com/runs)
    max_rows : int
        Hard cap of newest rows to collect
    page_limit : int
        API page size (server hard cap is 25)
    sort_by : str
        Field to sort by (default "DateCreated")
    sort_dir : str
        Sort direction (default "Desc" = newest first)
    curr_ds : str | None
        Optional cutoff, only include sessions where DateCreated >= curr_ds
    logger : logging.Logger | None
        Optional logger; if None, a default logger is created.

    Returns
    -------
    pd.DataFrame
        DataFrame of Run rows, including total_flowcell_yield_gbp column.
    """

    if logger is None:
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(message)s",
            stream=sys.stdout,
        )
        logger = logging.getLogger("bclconvert-fetch")

    all_rows = []
    offset = 0

    sessions_headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    while len(all_rows) < max_rows:
        url = (
            f"{api_base}/appsessions"
            f"?offset={offset}&limit={page_limit}"
            f"&sortBy={sort_by}&sortDir={sort_dir}"
        )
        resp = requests.get(url, headers=sessions_headers)
        resp.raise_for_status()
        payload = resp.json() or {}
        sessions = payload.get("Items", []) or []

        if not sessions:
            break

        # Optional cutoff
        if curr_ds:
            sessions = [s for s in sessions if s.get("DateModified", "") >= curr_ds]
            if not sessions:
                break

        for session in sessions:
            if "BCLConvert" not in session.get("Name", ""):
                continue

            session_id = session.get("Id")
            if not session_id:
                continue

            # fetch detail
            detail_url = f"{api_base}/appsessions/{session_id}"
            dresp = requests.get(detail_url, headers=sessions_headers)
            if dresp.status_code != 200:
                logger.warning(f"⚠️ Detail fetch failed for {session_id}: {dresp.status_code}")
                continue
            detail = dresp.json() or {}

            props_items = detail.get("Properties", {}).get("Items", []) or []
            properties = {i.get("Name"): i.get("Content") for i in props_items if i.get("Name")}

            run_items = []
            for i in props_items:
                if i.get("Name") == "Input.Runs":
                    run_items = i.get("RunItems", []) or []

            for run in run_items:
                all_rows.append({
                    "row_type": "Run",
                    "session_id": session_id,
                    "session_name": detail.get("Name"),
                    "date_created": detail.get("DateCreated"),
                    "date_modified": detail.get("DateModified"),
                    "execution_status": detail.get("ExecutionStatus"),
                    "ica_link": detail.get("HrefIcaAnalysis"),
                    "ica_project_id": properties.get("ICA.ProjectId"),
                    "workflow_reference": properties.get("ICA.WorkflowSessionUserReference"),
                    "run_id": run.get("Id"),
                    "run_name": run.get("Name"),
                    "percent_gt_q30": run.get("SequencingStats", {}).get("PercentGtQ30"),
                    "flowcell_barcode": run.get("FlowcellBarcode"),
                    "reagent_barcode": run.get("ReagentBarcode"),
                    "status": run.get("Status"),
                    "experiment_name": run.get("ExperimentName"),
                    "run_date_created": run.get("DateCreated"),
                })

                if len(all_rows) >= max_rows:
                    break
            if len(all_rows) >= max_rows:
                break

        offset += page_limit

    # ---- build DataFrame ----
    df = pd.DataFrame(all_rows, columns=[
        "row_type", "session_id", "session_name", "date_created", "date_modified",
        "execution_status", "ica_link", "ica_project_id", "workflow_reference",
        "run_id", "run_name", "percent_gt_q30", "flowcell_barcode", "reagent_barcode",
        "status", "experiment_name", "run_date_created"
    ])

    logger.info(f"✔ DataFrame shape before enrichment: {df.shape}")

    # ---- enrich with sequencing stats ----
    run_rows = df[df["row_type"] == "Run"]

    for _, row in run_rows.iterrows():
        run_id = row.get("run_id")
        if not run_id or run_id.lower() == "nan":
            continue

        api_url = f"{api_base}/runs/{run_id}/sequencingstats"
        stats_headers = {
            "x-access-token": api_token,
            "Accept": "application/json"
        }

        try:
            response = requests.get(api_url, headers=stats_headers)
            response.raise_for_status()
            data = response.json()
            total_yield = data.get("YieldTotal")

            if total_yield is not None:
                df.loc[
                    (df["row_type"] == "Run") & (df["run_id"] == run_id),
                    "total_flowcell_yield_Gbp"
                ] = total_yield
        except Exception as e:
            logger.warning(f"⚠️ Failed fetching stats for {run_id}: {e}")
            continue

    logger.info(f"✔ Final DataFrame shape: {df.shape}")
    return df

# Call the fetcher with the API root and token configured earlier in the notebook,
# keeping sessions whose DateModified is on or after 2025-08-20.
# This rebinds `df` to the returned frame.
df = fetch_bclconvert_runs_with_yield(
    api_base=API_BASE,
    api_token=API_TOKEN,
    curr_ds="2025-08-20"
)




In [20]:
# ICA REST API root and the project whose analyses are scanned.
BASE_URL = "https://ica.illumina.com/ica/rest/api"
PROJECT_ID = "7feb6619-714b-48f7-a7fd-75ad264f9c55"

# SECURITY: the API key is read from the environment so it is never stored in the
# notebook. The key that was previously committed here should be revoked.
API_KEY = os.environ["ICA_API_KEY"]

# Cutoff date (YYYY-MM-DD); analyses modified before it are ignored downstream.
curr_ds = "2025-05-21"

In [21]:
def create_download_url(api_key: str, project_id: str, file_id: str, *, timeout: float = 30.0) -> str:
    """Ask ICA for a download URL for one project data file.

    POSTs to ``{BASE_URL}/projects/{project_id}/data/{file_id}:createDownloadUrl``
    (``BASE_URL`` is the notebook-level constant) and returns the ``url`` field
    of the JSON response.

    Args:
        api_key: Value sent in the ``X-API-Key`` header.
        project_id: ICA project identifier.
        file_id: Identifier of the data file to download.
        timeout: Seconds allowed for the HTTP call.

    Raises:
        requests.HTTPError: If ICA answers with an error status.
        ValueError: If the response carries no ``url``; raised here so the
            failure is not deferred to the later download request.
    """
    url = f"{BASE_URL}/projects/{project_id}/data/{file_id}:createDownloadUrl"
    headers = {
        "accept": "application/vnd.illumina.v3+json",
        "X-API-Key": api_key
    }
    response = requests.post(url, headers=headers, data='', timeout=timeout)
    response.raise_for_status()
    download_url = (response.json() or {}).get("url")
    if not download_url:
        raise ValueError(f"ICA returned no download URL for file {file_id}")
    return download_url

In [22]:
import requests
from datetime import datetime, timezone

def parse_iso_utc(ts: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    A trailing "Z" is rewritten to "+00:00" before parsing, e.g.
    "2025-08-23T02:07:00Z" -> 2025-08-23 02:07:00+00:00. A timestamp without
    any offset is treated as UTC, so the result can always be compared with an
    aware cutoff instead of raising TypeError.
    """
    parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed

analyses = []
page_size = 100
page_offset = 0
REQUEST_TIMEOUT = 30  # seconds per HTTP call; requests waits indefinitely without one

HEADERS = {
    "accept": "application/vnd.illumina.v3+json",
    "X-API-Key": API_KEY
}

logger.info(f"Fetching analyses for: {curr_ds}")

# Build UTC cutoff at start of day
cutoff = datetime.strptime(curr_ds, "%Y-%m-%d").replace(tzinfo=timezone.utc)

# The listing is sorted by reference, not by time, so every page is scanned and
# the cutoff is applied per item rather than used as an early stop.
while True:
    url = (
        f"{BASE_URL}/projects/{PROJECT_ID}/analyses"
        f"?pageSize={page_size}&pageOffset={page_offset}&sort=reference%20desc"
    )
    logger.info(f"Requesting URL: {url}")
    resp = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    data = resp.json() or {}

    # `or []` also covers an explicit null "items" value.
    items = data.get("items") or []
    logger.info(f"Fetched {len(items)} analyses (offset {page_offset})")

    # Keep only those with timeModified >= cutoff
    for a in items:
        tm = a.get("timeModified")
        if tm and parse_iso_utc(tm) >= cutoff:
            analyses.append(a)

    # A short page is the last page.
    if len(items) < page_size:
        break
    page_offset += page_size

if not analyses:
    logger.info("No analyses found that meet timeModified cutoff.")
    latest_analyses = []
else:
    # Sort by timeModified (latest first)
    latest_analyses = sorted(
        analyses,
        key=lambda a: parse_iso_utc(a["timeModified"]),
        reverse=True
    )

# latest_analyses now has only items with timeModified >= curr_ds, newest first


[[34m2025-08-26T13:49:36.196+0700[0m] {[34m3112584711.py:[0m17} INFO[0m - Fetching analyses for: 2025-05-21[0m
[[34m2025-08-26T13:49:36.198+0700[0m] {[34m3112584711.py:[0m27} INFO[0m - Requesting URL: https://ica.illumina.com/ica/rest/api/projects/7feb6619-714b-48f7-a7fd-75ad264f9c55/analyses?pageSize=100&pageOffset=0&sort=reference%20desc[0m
[[34m2025-08-26T13:49:38.013+0700[0m] {[34m3112584711.py:[0m33} INFO[0m - Fetched 100 analyses (offset 0)[0m
[[34m2025-08-26T13:49:38.014+0700[0m] {[34m3112584711.py:[0m27} INFO[0m - Requesting URL: https://ica.illumina.com/ica/rest/api/projects/7feb6619-714b-48f7-a7fd-75ad264f9c55/analyses?pageSize=100&pageOffset=100&sort=reference%20desc[0m
[[34m2025-08-26T13:49:41.320+0700[0m] {[34m3112584711.py:[0m33} INFO[0m - Fetched 100 analyses (offset 100)[0m
[[34m2025-08-26T13:49:41.321+0700[0m] {[34m3112584711.py:[0m27} INFO[0m - Requesting URL: https://ica.illumina.com/ica/rest/api/projects/7feb6619-714b-48f7-a7fd-75a

In [None]:
import io
import pandas as pd

def create_download_url(api_key: str, project_id: str, file_id: str, BASE_URL:str, *, timeout: float = 30.0) -> str:
    """Ask ICA for a download URL for one project data file.

    POSTs to ``{BASE_URL}/projects/{project_id}/data/{file_id}:createDownloadUrl``
    and returns the ``url`` field of the JSON response.

    Args:
        api_key: Value sent in the ``X-API-Key`` header.
        project_id: ICA project identifier.
        file_id: Identifier of the data file to download.
        BASE_URL: ICA REST API root used to build the request URL.
        timeout: Seconds allowed for the HTTP call.

    Raises:
        requests.HTTPError: If ICA answers with an error status.
        ValueError: If the response carries no ``url``; raised here so the
            failure is not deferred to the later download request.
    """
    url = f"{BASE_URL}/projects/{project_id}/data/{file_id}:createDownloadUrl"
    headers = {
        "accept": "application/vnd.illumina.v3+json",
        "X-API-Key": api_key
    }
    response = requests.post(url, headers=headers, data='', timeout=timeout)
    response.raise_for_status()
    download_url = (response.json() or {}).get("url")
    if not download_url:
        raise ValueError(f"ICA returned no download URL for file {file_id}")
    return download_url

# NOTE(review): `s3`, `bucket_name` and `object_path_prefix` are not defined in any
# cell visible in this notebook; they must exist in the kernel before this cell runs.

# Compiled once instead of on every iteration; extracts the library id
# (e.g. "LP2508211-P1") from an analysis reference.
LP_REGEX = re.compile(r"(LP[-_]?\d{7}(?:-P\d)?(?:[-_](?:rerun|redo))?)", re.IGNORECASE)
DOWNLOAD_TIMEOUT = 30  # seconds per HTTP call; requests waits indefinitely without one

for analysis in latest_analyses:
    reference = analysis.get("reference")
    logger.info(f"Checking analysis reference: {reference}")
    if not reference:
        continue

    match = LP_REGEX.search(str(reference))
    if not match:
        logger.warning(f"Could not extract id_library from {reference}")
        continue
    id_library = match.group(1)

    # --- Handle Demultiplex_Stats.csv ---
    demux_file_path = f"/ilmn-analyses/{reference}/output/Reports/Demultiplex_Stats.csv"
    demux_encoded = urllib.parse.quote(demux_file_path)

    demux_query = (
        f"{BASE_URL}/projects/{PROJECT_ID}/data"
        f"?filePath={demux_encoded}"
        f"&filenameMatchMode=EXACT"
        f"&filePathMatchMode=STARTS_WITH_CASE_INSENSITIVE"
        f"&status=AVAILABLE&type=FILE"
    )

    demux_response = requests.get(demux_query, headers=HEADERS, timeout=DOWNLOAD_TIMEOUT)
    demux_response.raise_for_status()
    demux_items = demux_response.json().get("items", [])

    if demux_items:
        # Only the first match is used.
        file_id = demux_items[0]["data"]["id"]
        download_url = create_download_url(API_KEY, PROJECT_ID, file_id, BASE_URL)
        response = requests.get(download_url, timeout=DOWNLOAD_TIMEOUT)
        response.raise_for_status()

        # === Add id_library column ===
        # Named `demux_df` so the notebook-wide `df` from earlier cells is not overwritten.
        csv_buf = io.StringIO(response.content.decode("utf-8"))
        demux_df = pd.read_csv(csv_buf)
        demux_df["id_library"] = id_library  # append new column

        # Convert back to CSV bytes
        csv_bytes = demux_df.to_csv(index=False).encode("utf-8")

        # Upload to S3
        s3_key = f"{object_path_prefix}/{reference}/{id_library}_Demultiplex_Stats.csv"
        s3.load_bytes(
            bytes_data=csv_bytes,
            key=s3_key,
            bucket_name=bucket_name,
            replace=True
        )
        logger.info(f"Uploaded Demultiplex_Stats with id_library to: s3://{bucket_name}/{s3_key}")
    else:
        logger.info(f"Demultiplex_Stats.csv not found for {reference}")


[[34m2025-08-26T13:55:02.991+0700[0m] {[34m3034486776.py:[0m16} INFO[0m - Checking analysis reference: LP2508211-P1_b823b3_04e691-64321e29-bf95-42e0-9337-118876056dbf[0m
[[34m2025-08-26T13:55:05.464+0700[0m] {[34m3034486776.py:[0m16} INFO[0m - Checking analysis reference: LP2508131-P2_cd272a_8faf5b-c79b78eb-da2c-4fc8-9194-322e7dbc82ab[0m
[[34m2025-08-26T13:55:07.819+0700[0m] {[34m3034486776.py:[0m16} INFO[0m - Checking analysis reference: LP2508131-P1_62549f_199e16-f3b4d819-aabf-42a8-96fc-bd1d1cd93e63[0m
[[34m2025-08-26T13:55:10.079+0700[0m] {[34m3034486776.py:[0m16} INFO[0m - Checking analysis reference: LP2508121-P2_ae0335_382f7b-a0729fcd-d498-4cb7-84b4-caaa3c10f447[0m
[[34m2025-08-26T13:55:12.902+0700[0m] {[34m3034486776.py:[0m16} INFO[0m - Checking analysis reference: LP2508061-P1_9bbda7_194252-3e8f92e0-e3b9-458a-bde7-f85b6a840bdd[0m
[[34m2025-08-26T13:55:15.434+0700[0m] {[34m3034486776.py:[0m16} INFO[0m - Checking analysis reference: LP2508111-P1

In [None]:
# --- Quality_Metrics.csv ---
# NOTE(review): this cell is not inside the per-analysis loop. It reuses whatever
# `reference` and `id_library` were last bound by that loop, so it handles one
# analysis only. `s3` is also not defined in any cell visible in this notebook.
QUALITY_TIMEOUT = 30  # seconds per HTTP call; requests waits indefinitely without one

quality_file_path = f"/ilmn-analyses/{reference}/output/Reports/Quality_Metrics.csv"
quality_encoded = urllib.parse.quote(quality_file_path)

quality_query = (
    f"{BASE_URL}/projects/{PROJECT_ID}/data"
    f"?filePath={quality_encoded}"
    f"&filenameMatchMode=EXACT"
    f"&filePathMatchMode=STARTS_WITH_CASE_INSENSITIVE"
    f"&status=AVAILABLE&type=FILE"
)

quality_response = requests.get(quality_query, headers=HEADERS, timeout=QUALITY_TIMEOUT)
quality_response.raise_for_status()
quality_items = quality_response.json().get("items", [])

if quality_items:
    # Only the first match is used.
    file_id = quality_items[0]["data"]["id"]
    # NOTE: your create_download_url requires BASE_URL param
    download_url = create_download_url(API_KEY, PROJECT_ID, file_id, BASE_URL)
    response = requests.get(download_url, timeout=QUALITY_TIMEOUT)
    response.raise_for_status()

    # Add id_library column
    q_csv_buf = io.StringIO(response.content.decode("utf-8"))
    q_df = pd.read_csv(q_csv_buf)
    q_df["id_library"] = id_library  # appended as last column

    # Convert back to CSV bytes
    q_csv_bytes = q_df.to_csv(index=False).encode("utf-8")

    # Final S3 key format: illumina/qs/{file_id}/Quality_Metrics.csv
    qs_s3_key = f"illumina/qs/{file_id}/Quality_Metrics.csv"
    s3.load_bytes(
        bytes_data=q_csv_bytes,
        key=qs_s3_key,
        bucket_name="bgsi-data-dwh-bronze",
        replace=True
    )
    logger.info(f"Uploaded Quality_Metrics (with id_library) to: s3://bgsi-data-dwh-bronze/{qs_s3_key}")
else:
    logger.info(f"Quality_Metrics.csv not found for {reference}")

'LP2505201-P1'

In [None]:
import io
import re
import urllib.parse
from typing import Dict, Any, List
from datetime import datetime, timezone

import pandas as pd
import requests


def sync_ica_qc_to_s3(
    *,                     # keyword-only; the cutoff date is read from kwargs["ds"] ("YYYY-MM-DD")
    API_KEY: str,
    PROJECT_ID: str,
    BASE_URL: str,                    # e.g. "https://ica.illumina.com/ica/rest/api"
    bucket_name: str,                 # destination bucket for Demux
    object_path_prefix: str,          # prefix for Demux (e.g. "illumina/demux")
    s3_hook,                          # Airflow S3Hook instance
    logger,                            # Airflow logger
    quality_bucket_name: str = "bgsi-data-dwh-bronze",  # destination bucket for Quality_Metrics
    quality_path_prefix: str = "illumina/qs",           # prefix for Quality_Metrics
    request_timeout: float = 30.0,    # seconds allowed per HTTP call
    **kwargs
) -> Dict[str, Any]:
    """
    Fetch ICA analyses (timeModified >= kwargs["ds"]), pull Demultiplex_Stats.csv and
    Quality_Metrics.csv, append an id_library column, and upload both to S3.

    The cutoff is the UTC start of the day given by ``kwargs["ds"]`` ("YYYY-MM-DD").
    The library id is extracted from each analysis reference; analyses whose
    reference has no such id are reported but not processed.

    S3 keys:
        Demux:   {object_path_prefix}/{reference}/{id_library}_Demultiplex_Stats.csv
        Quality: {quality_path_prefix}/{file_id}/Quality_Metrics.csv

    Raises:
        ValueError: if ``ds`` is missing from kwargs or is not "YYYY-MM-DD".
        requests.HTTPError: if listing the analyses fails. Per-file failures are
            logged and recorded as "error" instead of being raised.

    Returns:
        {
          "analyses_considered": int,
          "analyses_processed": int,
          "demux_uploaded": int,
          "quality_uploaded": int,
          "per_analysis": [{ "reference": str, "demux": "uploaded|not_found|error", "quality": "uploaded|not_found|error" }, ...]
        }
    """

    # Fail early with a clear message instead of a TypeError from strptime(None).
    ds = kwargs.get("ds")
    if not ds:
        raise ValueError("sync_ica_qc_to_s3 requires a 'ds' keyword argument in YYYY-MM-DD format")
    cutoff = datetime.strptime(ds, "%Y-%m-%d").replace(tzinfo=timezone.utc)

    HEADERS = {
        "accept": "application/vnd.illumina.v3+json",
        "X-API-Key": API_KEY
    }

    LP_REGEX = re.compile(r"(LP[-_]?\d{7}(?:-P\d)?(?:[-_](?:rerun|redo))?)", re.IGNORECASE)

    # ---------- helpers ----------
    def create_download_url(api_key: str, project_id: str, file_id: str, base_url: str) -> str:
        """Request a download URL for one ICA data file."""
        url = f"{base_url}/projects/{project_id}/data/{file_id}:createDownloadUrl"
        headers = {
            "accept": "application/vnd.illumina.v3+json",
            "X-API-Key": api_key
        }
        r = requests.post(url, headers=headers, data="", timeout=request_timeout)
        r.raise_for_status()
        download_url = (r.json() or {}).get("url")
        if not download_url:
            raise ValueError(f"ICA returned no download URL for file {file_id}")
        return download_url

    def parse_iso_utc(ts: str) -> datetime:
        """Parse an ISO-8601 timestamp; a value without an offset is taken as UTC."""
        parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def lookup_file_by_path(file_path: str) -> List[dict]:
        """Return the AVAILABLE file entries ICA reports for a project path."""
        encoded = urllib.parse.quote(file_path)
        q = (
            f"{BASE_URL}/projects/{PROJECT_ID}/data"
            f"?filePath={encoded}"
            f"&filenameMatchMode=EXACT"
            f"&filePathMatchMode=STARTS_WITH_CASE_INSENSITIVE"
            f"&status=AVAILABLE&type=FILE"
        )
        rr = requests.get(q, headers=HEADERS, timeout=request_timeout)
        rr.raise_for_status()
        return (rr.json() or {}).get("items") or []

    def download_csv_bytes(file_id: str) -> bytes:
        """Download the raw bytes of one ICA data file."""
        url = create_download_url(API_KEY, PROJECT_ID, file_id, BASE_URL)
        r = requests.get(url, timeout=request_timeout)
        r.raise_for_status()
        return r.content

    def add_id_library(csv_bytes: bytes, id_library: str) -> bytes:
        """Re-serialise a UTF-8 CSV with an extra trailing id_library column."""
        buf = io.StringIO(csv_bytes.decode("utf-8"))
        df = pd.read_csv(buf)
        df["id_library"] = id_library
        return df.to_csv(index=False).encode("utf-8")

    def sync_report(reference: str, id_library: str, stem: str, bucket: str, key_for) -> str:
        """Copy one report CSV to S3; returns "uploaded", "not_found" or "error"."""
        try:
            items = lookup_file_by_path(f"/ilmn-analyses/{reference}/output/Reports/{stem}.csv")
            if not items:
                logger.info(f"[ICA] {stem}.csv not found for {reference}")
                return "not_found"

            # Only the first match is used.
            file_id = items[0]["data"]["id"]
            out_bytes = add_id_library(download_csv_bytes(file_id), id_library)

            s3_key = key_for(file_id)
            s3_hook.load_bytes(
                bytes_data=out_bytes,
                key=s3_key,
                bucket_name=bucket,
                replace=True
            )
            logger.info(f"[S3] Uploaded {stem} → s3://{bucket}/{s3_key}")
            return "uploaded"
        except Exception as e:
            # Best effort per file: record the failure and keep going.
            logger.error(f"[ICA] Error processing {stem} for {reference}: {e}", exc_info=True)
            return "error"

    # ---------- fetch analyses (filtered by timeModified >= cutoff) ----------
    analyses: List[dict] = []
    page_size = 100
    page_offset = 0

    # Uses the validated `ds`; the original read an undefined name `curr_ds` here.
    logger.info(f"[ICA] Fetching analyses updated on/after {ds} (UTC start-of-day cutoff)")

    # The listing is sorted by reference, not by time, so every page is scanned.
    while True:
        url = (
            f"{BASE_URL}/projects/{PROJECT_ID}/analyses"
            f"?pageSize={page_size}&pageOffset={page_offset}&sort=reference%20desc"
        )
        logger.info(f"[ICA] GET {url}")
        resp = requests.get(url, headers=HEADERS, timeout=request_timeout)
        resp.raise_for_status()
        items = (resp.json() or {}).get("items") or []

        logger.info(f"[ICA] Page offset {page_offset}: {len(items)} analyses")

        for a in items:
            tm = a.get("timeModified")
            if tm and parse_iso_utc(tm) >= cutoff:
                analyses.append(a)

        # A short page is the last page.
        if len(items) < page_size:
            break
        page_offset += page_size

    latest_analyses = sorted(
        analyses, key=lambda a: parse_iso_utc(a["timeModified"]), reverse=True
    )

    # ---------- process each analysis ----------
    summary = {
        "analyses_considered": len(latest_analyses),
        "analyses_processed": 0,
        "demux_uploaded": 0,
        "quality_uploaded": 0,
        "per_analysis": []
    }

    for analysis in latest_analyses:
        reference = analysis.get("reference")
        if not reference:
            continue

        status_row = {"reference": reference, "demux": "not_found", "quality": "not_found"}

        m = LP_REGEX.search(str(reference))
        if not m:
            logger.warning(f"[ICA] Could not extract id_library from reference: {reference}")
            summary["per_analysis"].append(status_row)
            continue
        id_library = m.group(1)

        # --- Demultiplex_Stats.csv: keyed by reference and library id ---
        status_row["demux"] = sync_report(
            reference, id_library, "Demultiplex_Stats", bucket_name,
            lambda _file_id: f"{object_path_prefix}/{reference}/{id_library}_Demultiplex_Stats.csv",
        )
        if status_row["demux"] == "uploaded":
            summary["demux_uploaded"] += 1

        # --- Quality_Metrics.csv: keyed by the ICA file id ---
        status_row["quality"] = sync_report(
            reference, id_library, "Quality_Metrics", quality_bucket_name,
            lambda file_id: f"{quality_path_prefix}/{file_id}/Quality_Metrics.csv",
        )
        if status_row["quality"] == "uploaded":
            summary["quality_uploaded"] += 1

        summary["analyses_processed"] += 1
        summary["per_analysis"].append(status_row)

    logger.info(
        f"[DONE] considered={summary['analyses_considered']}, "
        f"processed={summary['analyses_processed']}, "
        f"demux_uploaded={summary['demux_uploaded']}, "
        f"quality_uploaded={summary['quality_uploaded']}"
    )
    return summary
