# 🧬 Project: Oncology Clinical Intelligence Agent (Backend Logic)
**Author:** Carnegie Johnson | Consultant / AI Engineer / Instructor  
**Phase:** 1 - Data Ingestion & Environment Setup

### **👋 Welcome!**
This notebook serves as the "Engine Room" for a prototype data analytics platform. The goal is to build an automated pipeline that downloads cancer research data from public sources like **cBioPortal** and securely stores the raw data in our private **Microsoft OneLake**.

If you are new to Azure, Python, Visual Studio Code, or Jupyter notebooks, don't worry! We are here to learn together and *enjoy* what we are doing. I welcome feedback and questions.

### **Data Architecture Overview**
Following best practices in the Microsoft Fabric ecosystem, we use the Medallion Architecture:
1.  **Source:** Public Internet for our 🥉 Bronze Layer (cBioPortal, etc.)
2.  **Processing:** Local Python Logic (In-Memory Decompression)
3.  **Destination:** Microsoft OneLake (The "OneDrive" for our data)
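
As one way to picture the "In-Memory Decompression" idea in step 2, the sketch below reads a single file out of a `.tar.gz` archive held entirely in memory, without writing anything to disk. The helper names (`read_member_from_tar_gz`, `build_demo_archive`) and the demo archive contents are assumptions made for illustration; they are not part of this notebook's pipeline.

```python
import io
import tarfile
from typing import Optional


def read_member_from_tar_gz(archive_bytes: bytes, wanted_suffix: str) -> Optional[bytes]:
    """Return the bytes of the first regular file whose name ends with wanted_suffix."""
    with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tar:
        for member in tar:
            if member.isfile() and member.name.endswith(wanted_suffix):
                extracted = tar.extractfile(member)
                return extracted.read() if extracted is not None else None
    return None


def build_demo_archive() -> bytes:
    """Build a tiny .tar.gz in memory (hypothetical stand-in for a real study archive)."""
    payload = b"#Patient Identifier\nPATIENT_ID\nP-001\n"
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        info = tarfile.TarInfo(name="demo_study/data_clinical_patient.txt")
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    return buffer.getvalue()


demo_bytes = build_demo_archive()
clinical = read_member_from_tar_gz(demo_bytes, "data_clinical_patient.txt")
print(clinical.decode("utf-8"))
```

Because the archive never touches the local disk, the same bytes could be passed straight to an upload call afterwards.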


### 🔎 Discovery Step 1: Find your Workspace ID

Before we can create or access a Lakehouse, we need to know the unique ID of the Microsoft Fabric Workspace.

**Note:**
Since Microsoft Fabric is relatively new, the standard Azure CLI command groups often don't cover it yet. In those cases we use pure Python or `az rest` to send a direct "GET" or "POST" request to the **Fabric API**.

The following code cell asks Microsoft: *"Please show me a table of all workspaces this user has access to."*

> **📝 Action Item:** Look at the output below. Copy the **ID** string for the workspace you want to use, then paste this GUID into your `.env` file as the value of `AZURE_WORKSPACE_ID`.

In [3]:
# List all Fabric Workspaces
# We add --resource to tell the CLI we need a token for the Fabric API
!az rest --method get \
    --url "https://api.fabric.microsoft.com/v1/workspaces" \
    --resource "https://api.fabric.microsoft.com" \
    --query "value[].{Name:displayName, ID:id, Description:description}" \
    --output table

Name               ID                                    Description
-----------------  ------------------------------------  -------------
DAT206xVA          ff6a4d50-b544-4cbf-a4f5-b9c480acb2a1
My workspace       5ebb56e4-c58f-422e-932a-ae218b374bd0
KeepItSimpleAgain  899f6064-e792-4c93-a05d-ef5345b61568  Co-Hack
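
For readers who prefer the "pure Python" route mentioned in the note, here is a minimal sketch of the same request using only the standard library. It assumes the Azure CLI is installed and `az login` has already been run; the function names (`get_fabric_token`, `fetch_workspaces`, `summarize_workspaces`) are illustrative, not part of any SDK. The offline demo at the bottom reuses one row from the table above.

```python
import json
import shutil
import subprocess
import urllib.request

FABRIC_RESOURCE = "https://api.fabric.microsoft.com"


def get_fabric_token() -> str:
    """Ask the Azure CLI for a Fabric API access token (needs a prior `az login`)."""
    az_path = shutil.which("az")  # resolves az.cmd on Windows as well
    if az_path is None:
        raise RuntimeError("Azure CLI not found on PATH")
    result = subprocess.run(
        [az_path, "account", "get-access-token", "--resource", FABRIC_RESOURCE,
         "--query", "accessToken", "--output", "tsv"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


def fetch_workspaces(token: str) -> list:
    """Send the GET request that `az rest` sends in the cell above."""
    request = urllib.request.Request(
        f"{FABRIC_RESOURCE}/v1/workspaces",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response).get("value", [])


def summarize_workspaces(workspaces: list) -> list:
    """Mirror the --query projection: keep only name, ID, and description."""
    return [
        {"Name": w.get("displayName"), "ID": w.get("id"), "Description": w.get("description", "")}
        for w in workspaces
    ]


# Offline demo with one row copied from the output above (no network call is made here).
sample = [{"displayName": "KeepItSimpleAgain",
           "id": "899f6064-e792-4c93-a05d-ef5345b61568",
           "description": "Co-Hack",
           "type": "Workspace"}]
rows = summarize_workspaces(sample)
print(rows[0]["Name"], rows[0]["ID"])

# Live usage (requires network access and `az login`):
# rows = summarize_workspaces(fetch_workspaces(get_fabric_token()))
```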


### **Optional:** Inspect a Specific Workspace

In [None]:
import os
from dotenv import load_dotenv

# 1. Force reload the .env file
# override=True is critical; otherwise, it ignores changes if the var is already set
load_dotenv(override=True)

# ⚠️ ACTION REQUIRED: Set AZURE_WORKSPACE_ID in .env to the Workspace ID you found above,
# or paste the GUID directly in place of the placeholder string.
target_workspace_id = os.getenv("AZURE_WORKSPACE_ID") or "PASTE_YOUR_GUID_HERE"
api_url = f"https://api.fabric.microsoft.com/v1/workspaces/{target_workspace_id}/items"
if target_workspace_id != "PASTE_YOUR_GUID_HERE":
    print(f"Inspecting Workspace: {target_workspace_id}...")
    
    command = (
        f'az rest --method get '
        f'--url "{api_url}" '
        f'--resource "https://api.fabric.microsoft.com" '
        f'--query "value[?type==\'Lakehouse\'].{{Name:displayName, ID:id}}" '
        f'--output table'
    )
    exit_code = os.system(command)
    if exit_code == 0:
        print("\n✅ Success")
    else:
        print("\n❌ Failed")
else:
    print("⚠️ Please update the 'target_workspace_id' variable above.")

Inspecting Workspace: 5ebb56e4-c58f-422e-932a-ae218b374bd0...

✅ Success


### Create the Lakehouse
Run the cell below to create the Lakehouse if it does not already exist.

1.  **Payload Check:** Add `"type": "Lakehouse"` to the JSON body (required by Microsoft).
2.  **Auto-Execution:** Use Python's `requests` library to call the Fabric API directly instead of shelling out to the CLI.
3.  **Quote Safety:** Let Python handle the JSON formatting so we don't have to worry about escaping quotes manually.
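
To illustrate the "Quote Safety" point, this small sketch builds the payload as a Python dictionary and lets `json.dumps` produce the request body. The display name containing double quotes is a hypothetical value chosen only to show the escaping; it is not a name used in this project.

```python
import json

# Hypothetical display name with embedded double quotes, used only to show escaping.
payload = {"displayName": 'Oncology "Raw" Data', "type": "Lakehouse"}

# json.dumps escapes the inner quotes, so the body is valid JSON without manual work.
body = json.dumps(payload)
print(body)

# Round-trip check: parsing the body gives back the original dictionary.
round_trip = json.loads(body)
print(round_trip == payload)
```

Passing the dictionary through the `json=` argument of `requests.post` performs the same serialization automatically.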

In [5]:
import os
import requests
import json
from azure.identity import DefaultAzureCredential
from azure.core.exceptions import ClientAuthenticationError
from dotenv import load_dotenv
load_dotenv()    # loads .env from the notebook's working dir (or project root)

def smart_create_lakehouse():
    # --- 1. CONFIGURATION
    ws_id = os.getenv("AZURE_WORKSPACE_ID")
    lh_nm = os.getenv("LAKEHOUSE_NAME")
    
    if not ws_id or not lh_nm:
        print("❌ Error: Missing configuration. Check your .env file.")
        return

    print("⚙️  Configuration Loaded.")
    print(f"   - Workspace ID: {ws_id}")
    print(f"   - Target Lakehouse: {lh_nm}")

    # --- 2. AUTHENTICATION ---
    print("\n🔑 Authenticating...")
    cred = DefaultAzureCredential()
    try:
        # Request token specifically for Fabric API
        token = cred.get_token("https://api.fabric.microsoft.com/.default")
    except ClientAuthenticationError:
        print("❌ Authentication failed. Please run '!az login' in a separate cell.")
        return

    headers = {
        "Authorization": f"Bearer {token.token}",
        "Content-Type": "application/json"
    }

    # --- 3. PRE-FLIGHT CHECK: FABRIC CAPACITY ---
    print("🔍 Verifying Workspace Capacity...")
    ws_url = f"https://api.fabric.microsoft.com/v1/workspaces/{ws_id}"
    
    # A timeout keeps the notebook from hanging if the API is unreachable
    ws_response = requests.get(ws_url, headers=headers, timeout=30)
    
    if ws_response.status_code != 200:
        print(f"❌ Failed to fetch workspace details (Status: {ws_response.status_code})")
        print(ws_response.text)
        return

    ws_data = ws_response.json()
    capacity_id = ws_data.get('capacityId')
    
    if not capacity_id:
        print("\n🛑 CRITICAL BLOCKER: No Capacity Assigned")
        print(f"   The workspace '{ws_data.get('displayName')}' is not assigned to a Fabric Capacity.")
        print("   -> Action: Go to Fabric Portal -> Workspace Settings -> Premium/Fabric Capacity -> Assign a Trial or F-SKU.")
        return
    else:
        print(f"✅ Capacity Verified (ID: {capacity_id})")

    # --- 4. IDEMPOTENCY CHECK (Does it exist?) ---
    items_url = f"https://api.fabric.microsoft.com/v1/workspaces/{ws_id}/items"
    items_response = requests.get(items_url, headers=headers, timeout=30)
    
    if items_response.status_code == 200:
        existing_items = items_response.json().get('value', [])
        for item in existing_items:
            if item['displayName'] == lh_nm and item['type'] == 'Lakehouse':
                print(f"\n✅ Lakehouse '{lh_nm}' already exists (ID: {item['id']}). Skipping creation.")
                return

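    # --- 5. EXECUTION (Create Lakehouse) ---
    print(f"\n🔨 Creating Lakehouse '{lh_nm}'...")
    payload = {
        "displayName": lh_nm,
        "type": "Lakehouse"
    }
    
    create_response = requests.post(items_url, headers=headers, json=payload, timeout=30)
    
    if create_response.status_code == 201:
        data = create_response.json()
        print("🎉 SUCCESS! Lakehouse created successfully.")
        print(f"   ID: {data.get('id')}")
    elif create_response.status_code == 202:
        # 202 means the request was accepted and provisioning is still running.
        # The body may be empty, so it is not parsed as JSON here.
        print("✅ Creation accepted; provisioning is still in progress.")
        print(f"   Operation URL: {create_response.headers.get('Location')}")
    else:
        print(f"\n❌ Creation Failed (Status {create_response.status_code})")
        print(f"   Error Message: {create_response.text}")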

# Run the smart function
smart_create_lakehouse()

⚙️  Configuration Loaded.
   - Workspace ID: 899f6064-e792-4c93-a05d-ef5345b61568
   - Target Lakehouse: OncologyRawData

🔑 Authenticating...
🔍 Verifying Workspace Capacity...
✅ Capacity Verified (ID: e4092c29-f2bc-4267-bb6a-32798e85d5c5)

✅ Lakehouse 'OncologyRawData' already exists (ID: 46ca0043-801d-4453-9fa6-81530dce1cd4). Skipping creation.
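
The idempotency check above reads a single page of items. In a workspace with many items the list may be paginated, so a hedged extension is to keep requesting pages while the response carries a `continuationToken`. The sketch below assumes that field name and takes the page-fetching function as a parameter so it can run offline; `iter_all_items`, `find_lakehouse`, and the fake two-page data are illustrative names and values, not part of the notebook.

```python
from typing import Callable, Iterable, Iterator, Optional


def iter_all_items(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield items from every page, following continuationToken until it is absent."""
    token: Optional[str] = None
    while True:
        page = fetch_page(token)
        yield from page.get("value", [])
        token = page.get("continuationToken")
        if not token:
            return


def find_lakehouse(items: Iterable[dict], name: str) -> Optional[dict]:
    """Return the first Lakehouse item with the given display name, if any."""
    for item in items:
        if item.get("type") == "Lakehouse" and item.get("displayName") == name:
            return item
    return None


# Hypothetical two-page response, used only to exercise the loop without network access.
FAKE_PAGES = {
    None: {"value": [{"id": "item-1", "displayName": "Notes", "type": "Notebook"}],
           "continuationToken": "page-2"},
    "page-2": {"value": [{"id": "item-2", "displayName": "OncologyRawData", "type": "Lakehouse"}]},
}

match = find_lakehouse(iter_all_items(lambda token: FAKE_PAGES[token]), "OncologyRawData")
print(match["id"])
```

With `requests`, the `fetch_page` argument could be a small function that calls `requests.get(items_url, headers=headers, params={"continuationToken": token} if token else None, timeout=30).json()`.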


### Data Ingestion

The cell below is a template; customize it for each data source you ingest.

**Source 1:** cBioPortal

In [None]:
import os
import requests
import logging
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient
from tqdm import tqdm # Switched to standard tqdm (works everywhere)
from datetime import datetime
from dotenv import load_dotenv

# --- 1. Configuration & Auth ---
# Force reload .env so values edited since the kernel started are picked up
load_dotenv(override=True)

ws_nm = os.getenv("AZURE_WORKSPACE_NAME") or "MyWorkspace" # Fallback if env fails
lh_nm = os.getenv("LAKEHOUSE_NAME") or "MyLakehouse"
ol_ep = os.getenv("ONELAKE_ENDPOINT") or "https://onelake.dfs.fabric.microsoft.com"

# The Official Source of Truth API
CBIOPORTAL_API_STUDIES = "https://www.cbioportal.org/api/studies"
# The Official S3 Bucket for direct downloads
CBIOPORTAL_S3_BASE = "https://cbioportal-datahub.s3.amazonaws.com"

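# Setup Logging
# force=True replaces any handlers already attached to the root logger, so
# re-running this cell does not stack handlers and print each message twice.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f"logs/ingestion_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
        logging.StreamHandler(),
    ],
    force=True,
)
# Keep the Azure SDK's per-request HTTP logging out of the notebook output
logging.getLogger("azure").setLevel(logging.WARNING)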

print(f"Connecting to OneLake: {ws_nm} / {lh_nm}")

# Auth
try:
    credential = DefaultAzureCredential()
    service_client = DataLakeServiceClient(account_url=ol_ep, credential=credential)
    fs_client = service_client.get_file_system_client(file_system=ws_nm)
except Exception as e:
    print(f"Auth failed: {e}")
    # Stop execution if auth fails
    raise 

# --- 2. Helper Functions ---

def get_valid_studies():
    """Fetches the official list of study IDs from the cBioPortal API."""
    try:
        logging.info("Fetching study list from API...")
        r = requests.get(CBIOPORTAL_API_STUDIES, timeout=10)
        r.raise_for_status()
        
        # Parse JSON and extract IDs
        all_studies = r.json()
        
        # Create a dictionary of {id: url}
        study_map = {}
        for s in all_studies:
            s_id = s['studyId']
            # Construct S3 URL
            study_map[s_id] = f"{CBIOPORTAL_S3_BASE}/{s_id}.tar.gz"
            
        logging.info(f"API returned {len(study_map)} studies.")
        return study_map
    except Exception as e:
        logging.error(f"API Error: {e}")
        return {}

def upload_file_to_onelake(target_path, data_bytes):
    """Uploads bytes to OneLake."""
    try:
        full_path = f"{lh_nm}.Lakehouse/Files/{target_path}"
        file_client = fs_client.get_file_client(full_path)
        file_client.create_file()
        file_client.upload_data(data_bytes, overwrite=True)
        return True
    except Exception as e:
        logging.error(f"Upload Error ({target_path}): {e}")
        return False

def process_study(study_id, url):
    """Downloads from S3 and uploads to OneLake."""
    try:
        # Check if file exists (HEAD request)
        head = requests.head(url, timeout=5)
        if head.status_code != 200:
            logging.warning(f"Skipping {study_id}: File not found on S3 (HTTP {head.status_code}).")
            return

        logging.info(f"Downloading {study_id}...")
        # The whole archive is held in memory via r.content, so stream=True would add nothing here
        with requests.get(url, timeout=60) as r:
            r.raise_for_status()
            
            target_path = f"Raw/cBioPortal/Archives/{study_id}.tar.gz"
            if upload_file_to_onelake(target_path, r.content):
                logging.info(f"Uploaded {study_id} to {target_path}")
            
    except Exception as e:
        logging.error(f"Failed {study_id}: {e}")

# --- 3. Execution ---

# Get List
studies = get_valid_studies()

# Filter for a specific type to test (e.g., 'gbm' for Glioblastoma or 'brca' for Breast)
# Note: this is a substring match, so it also picks up unrelated IDs such as 'utuc_igbmc_2021'.
# REMOVE the filter to download everything (Warning: 500+ studies!)
target_ids = {k: v for k, v in studies.items() if 'gbm' in k.lower()}

print(f"Found {len(target_ids)} matching studies (filtering for 'gbm' for safety) out of {len(studies)} available studies.")

# Run Loop (Using standard tqdm); the [:3] slice caps this test run at three studies
for s_id, url in tqdm(list(target_ids.items())[:3], desc="Ingesting..."):
    process_study(s_id, url)

print("Done. Check OneLake 'Files/Raw/cBioPortal/Archives'.")

No environment configuration found.
ManagedIdentityCredential will use IMDS
Fetching study list from API...


Connecting to OneLake: KeepItSimpleAgain / OncologyRawData


API returned 510 studies.


Found 10 matching studies (filtering for 'gbm' for safety).


Downloading utuc_igbmc_2021...
Request URL: 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=REDACTED&resource=REDACTED'
Request method: 'GET'
Request headers:
    'User-Agent': 'azsdk-python-identity/1.25.1 Python/3.13.7 (Windows-11-10.0.26200-SP0)'
No body was attached to the request
DefaultAzureCredential acquired a token from AzureCliCredential
Request URL: 'https://onelake.dfs.fabric.microsoft.com/KeepItSimpleAgain/OncologyRawData.Lakehouse/Files/Raw/cBioPortal/Archives/utuc_igbmc_2021.tar.gz?resource=REDACTED'
Request method: 'PUT'
Request headers:
    'x-ms-version': 'REDACTED'
    'Accept': 'applic

Done. Check OneLake 'Files/Raw/cBioPortal/Archives'.
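
The ingestion cell holds each archive fully in memory before uploading it. For larger studies, one possible alternative is to upload in chunks: create the file, append each chunk at its running offset, then flush once. The sketch below uses the `create_file`, `append_data`, and `flush_data` methods of the Data Lake file client; whether this pattern suits OneLake for your file sizes is an assumption to verify. `upload_in_chunks` and `FakeFileClient` are hypothetical names, and the fake client exists only so the example runs without Azure access.

```python
from typing import Iterable


def upload_in_chunks(file_client, chunks: Iterable[bytes]) -> int:
    """Append each chunk at the running offset, then flush once to commit the file."""
    file_client.create_file()
    offset = 0
    for chunk in chunks:
        if not chunk:
            continue  # skip empty chunks
        file_client.append_data(chunk, offset=offset, length=len(chunk))
        offset += len(chunk)
    file_client.flush_data(offset)
    return offset


class FakeFileClient:
    """Hypothetical in-memory stand-in for a Data Lake file client."""

    def __init__(self):
        self.staged = bytearray()
        self.committed = b""

    def create_file(self):
        self.staged = bytearray()

    def append_data(self, data, offset, length=None):
        # Appends must be contiguous: each chunk starts where the last one ended.
        assert offset == len(self.staged)
        self.staged.extend(data)

    def flush_data(self, offset):
        self.committed = bytes(self.staged[:offset])


fake = FakeFileClient()
total = upload_in_chunks(fake, [b"abc", b"", b"defg"])
print(total, fake.committed)
```

In the real pipeline the chunks could come from `requests.get(url, stream=True)` followed by `r.iter_content(chunk_size=4 * 1024 * 1024)`, with `fs_client.get_file_client(full_path)` in place of the fake client.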





# 🏁 Phase 1 Complete: Transitioning to the "Silver Layer"

**Congratulations!** You now have a pipeline that fetches raw data from public datasets and securely stores it in **OneLake**.

### **Architecture Insight: The Medallion Model**
In data engineering, a common best practice is the **Medallion Architecture**:
* **🥉 Bronze Layer (Raw):** (Currently here) Raw `.tar.gz` files sitting in a folder. They are hard to query and "messy."
* **🥈 Silver Layer (Clean):** The next step is to unpack these files, clean the headers, and organize them into **Delta Tables** (high-performance SQL tables).
* **🥇 Gold Layer (Curated):** Aggregated data ready for dashboards and AI agents.
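
To make "clean the headers" concrete, here is a plain-Python sketch of the kind of cleanup the Silver step performs: skip the `#` comment lines, read the tab-separated table, and strip spaces, brackets, and parentheses from column names. The sample text is a made-up miniature file, not real cBioPortal data, and `clean_column_name` and `read_clinical_table` are illustrative helper names.

```python
import csv
import io


def clean_column_name(name: str) -> str:
    """Replace spaces with underscores and drop brackets and parentheses."""
    cleaned = name.replace(" ", "_")
    for ch in "[]()":
        cleaned = cleaned.replace(ch, "")
    return cleaned


def read_clinical_table(text: str) -> list:
    """Parse a tab-separated clinical file, ignoring lines that start with '#'."""
    data_lines = [line for line in text.splitlines() if line and not line.startswith("#")]
    reader = csv.DictReader(io.StringIO("\n".join(data_lines)), delimiter="\t")
    return [{clean_column_name(key): value for key, value in row.items()} for row in reader]


# Hypothetical miniature file: two comment lines, one header row, one data row.
sample = (
    "#Patient Identifier\tOverall Survival (Months)\n"
    "#STRING\tNUMBER\n"
    "PATIENT_ID\tOS (MONTHS)\n"
    "P-001\t14.2\n"
)
rows = read_clinical_table(sample)
print(rows)
```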

### **👉 Next Step: Cloud-Based Processing**
To unpack and process these compressed archives, we switch to **Microsoft Fabric Notebooks** (powered by PySpark) for the heavy lifting.

# ⚠️ ACTION REQUIRED:
**Instructions:**
1.  Open your browser and go to [Microsoft Fabric](https://app.fabric.microsoft.com).
2.  Navigate to your Workspace (`<YOUR_WORKSPACE_NAME>`).
3.  Click **+ New Item** -> **Notebook**.
4.  Copy the **PySpark Code** noted below and paste it into the first cell of your new online notebook.
5.  Run the cell to transform the Raw Files into a Silver Delta Table.

---

### **📋 Copy-Paste Code for Fabric Notebook**

```python
# -----------------------------------------------------------------------
# PHASE 2: BRONZE TO SILVER TRANSFORMATION
# Run this in a Microsoft Fabric Notebook (PySpark Kernel)
# -----------------------------------------------------------------------

import tarfile
import os
from glob import glob
from pyspark.sql.functions import input_file_name, regexp_extract, col

# --- CONFIGURATION ---
# The "Lakehouse" mount point is standard in Fabric
BASE_PATH = "/lakehouse/default/Files"
RAW_PATH = f"{BASE_PATH}/Raw/cBioPortal"
EXTRACT_PATH = f"{BASE_PATH}/Staging/Clinical"

# --- STEP 1: UNPACK (Standard Python) ---
# Spark cannot read inside .tar.gz files easily, so we unzip them first.
print("📦 Step 1: Extracting Clinical Data from Archives...")
os.makedirs(EXTRACT_PATH, exist_ok=True)

# Find all uploaded tar.gz files (the ingestion step writes them to Raw/cBioPortal/Archives)
archives = glob(f"{RAW_PATH}/*/*.tar.gz")

if not archives:
    print("❌ No files found! Did you run the ingestion pipeline?")
else:
    for archive in archives:
        try:
            # Derive the study ID from the file name (utuc_igbmc_2021.tar.gz -> utuc_igbmc_2021).
            # The parent folder is the shared "Archives" directory, so it cannot identify a study.
            study_id = os.path.basename(archive)[: -len(".tar.gz")]
            
            with tarfile.open(archive, "r:gz") as tar:
                for member in tar.getmembers():
                    # We only extract the 'patient' data file
                    if "data_clinical_patient.txt" in member.name:
                        # Rename it to avoid collisions (e.g., study1_patient_data.txt)
                        member.name = f"{study_id}_patient_data.txt" 
                        tar.extract(member, path=EXTRACT_PATH)
                        print(f"   -> Extracted: {member.name}")
        except Exception as e:
            print(f"   ⚠️ Error reading {archive}: {e}")

# --- STEP 2: LOAD & CLEAN (PySpark) ---
print("\n✨ Step 2: Loading into Delta Table...")

# cBioPortal files have comments starting with '#'. We tell Spark to ignore them.
# Spark reads the default Lakehouse through the relative "Files/..." path; the
# /lakehouse/default mount used in Step 1 is the local file-system view of the same folder.
df = spark.read.option("header", "true") \
    .option("delimiter", "\t") \
    .option("comment", "#") \
    .option("inferSchema", "true") \
    .csv("Files/Staging/Clinical/*.txt")

# Add the 'Study_ID' column by extracting it from the filename
df_enriched = df.withColumn("SourceFile", input_file_name()) \
                .withColumn("Study_ID", regexp_extract(col("SourceFile"), r".*/(.*)_patient_data\.txt", 1)) \
                .drop("SourceFile")

# Clean Column Names (Remove spaces/parentheses for database compatibility)
for name in df_enriched.columns:
    clean_name = name.replace(" ", "_").replace("[", "").replace("]", "").replace("(", "").replace(")", "")
    df_enriched = df_enriched.withColumnRenamed(name, clean_name)

# --- STEP 3: WRITE TO SILVER ---
table_name = "Clinical_Patients_Silver"
df_enriched.write.format("delta").mode("overwrite").saveAsTable(table_name)

print(f"\n🎉 Success! Data saved to Delta Table: '{table_name}'")
print(f"📊 Total Records: {df_enriched.count()}")

# Show a sample
display(df_enriched.limit(5))