In [9]:
import json
import os

# Local-testing setup: expose the Synapse connection the DAG expects
# (conn id SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN) through an AIRFLOW_CONN_* env var.
#
# No secrets-backend override is needed: Airflow's default lookup order already
# checks environment variables (AIRFLOW_CONN_<CONN_ID>) before the metastore.
# The override previously set here (AIRFLOW__SECRETS__BACKENDS =
# "airflow.secrets.env_vars.EnvVarSecretsBackend") matches neither Airflow's
# `[secrets] backend` option nor its
# `airflow.secrets.environment_variables.EnvironmentVariablesBackend` class.

# Credentials come from the environment only -- export SYN_PAT (and optionally
# SYN_USER) before starting the kernel. Never paste a token into the notebook:
# it ends up in version control and in shared outputs.
# NOTE: the token that was previously hardcoded here must be treated as leaked
# and revoked in Synapse.
SYN_USER = os.environ.get("SYN_USER", "ramayyala")
SYN_PAT = os.environ.get("SYN_PAT", "")
if not SYN_PAT:
    raise RuntimeError(
        "SYN_PAT is not set. Export a Synapse personal access token "
        "(e.g. `export SYN_PAT=...`) before running this notebook."
    )

# Airflow accepts JSON-serialized connection definitions in AIRFLOW_CONN_* env vars.
airflow_conn_json = {
    "conn_type": "http",  # type is largely ignored by the custom hook; http is fine
    "host": "https://repo-prod.prod.sagebase.org",
    "login": SYN_USER,
    "password": SYN_PAT,
    # Extras, if the SynapseHook reads them.
    "extra": {"profile": "service-account"},
}

os.environ["AIRFLOW_CONN_SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN"] = json.dumps(airflow_conn_json)

print("AIRFLOW_CONN_SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN set.")


AIRFLOW__SECRETS__BACKENDS set.
AIRFLOW_CONN_SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN set.


In [10]:
from datetime import datetime
import requests
from typing import Dict, List, Tuple, Any, Optional

import pandas as pd
import json
from jsonata import jsonata
from jsonschema import validate, ValidationError

from synapseclient.models import Dataset, DatasetCollection, File
from orca.services.synapse import SynapseHook
import synapseclient 
from airflow.decorators import task, dag
from airflow.models import Variable, Param
from slack_sdk import WebClient

In [11]:
# Log in a module-level Synapse client with the personal access token from the
# setup cell; `syn` stays available to later cells.
syn = synapseclient.Synapse()
syn.login(authToken=SYN_PAT)

[[34m2025-10-08T23:29:03.523+0000[0m] {[34m_client.py:[0m1026} INFO[0m - HTTP Request: GET https://pypi.org/pypi/synapseclient/json "HTTP/1.1 200 OK"[0m
Welcome, ram.ayyala!

[[34m2025-10-08T23:29:03.636+0000[0m] {[34mclient.py:[0m1014} INFO[0m - Welcome, ram.ayyala!
[0m


In [12]:
# DAG-level parameters with their default values. The keys are the names read
# via ``context["params"][...]`` by the task functions in this notebook.
dag_params = dict(
    project_id=Param("syn64892175", type="string"),
    mapping_url=Param(
        "https://raw.githubusercontent.com/amp-als/data-model/refs/heads/main/mapping/cpath.jsonata",
        type="string",
    ),
    schema_url=Param(
        "https://raw.githubusercontent.com/amp-als/data-model/refs/heads/main/json-schemas/Dataset.json",
        type="string",
    ),
    cpath_api_url=Param(
        "https://fair.dap.c-path.org/api/collections/als-kp/datasets", type="string"
    ),
    ignore_cpath_datasets=Param("syn68737367", type="string"),
    collection_id=Param("syn69962707", type="string"),
    synapse_conn_id=Param("SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN", type="string"),
)

# DAG-level settings; presumably unpacked into the ``@dag`` decorator in the
# deployed DAG file (not visible in this notebook).
dag_config = dict(
    schedule_interval="0 0 1 * *",  # monthly: midnight on the 1st
    start_date=datetime(2025, 5, 1),
    catchup=False,
    default_args=dict(retries=2),
    tags=["als-kp"],
    params=dag_params,
)


def load_mapping_from_url(url: str) -> str:
    """Download a JSONata mapping expression and return its raw source text.

    Arguments:
        url (str): Location of the ``.jsonata`` mapping file.

    Returns:
        str: Body of the HTTP response, i.e. the mapping expression.

    Raises:
        requests.exceptions.HTTPError: If the server answers with a 4xx/5xx status.
        requests.exceptions.Timeout: If no response arrives within 30 seconds.
        requests.exceptions.RequestException: For other transport failures.
    """
    resp = requests.get(url, timeout=30)
    # Fail loudly instead of handing an HTML error page to the JSONata compiler.
    resp.raise_for_status()
    mapping_text = resp.text
    return mapping_text


def load_schema_from_url(url: str) -> Dict[str, Any]:
    """Download a JSON Schema document and return it decoded.

    Arguments:
        url (str): Location of the JSON Schema file.

    Returns:
        Dict[str, Any]: The decoded schema.

    Raises:
        requests.exceptions.HTTPError: On a 4xx/5xx response.
        requests.exceptions.Timeout: If no response arrives within 30 seconds.
        requests.exceptions.RequestException: For other transport failures.
        ValueError: If the body is not valid JSON (JSON decode errors subclass it).
    """
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    schema_doc = resp.json()
    return schema_doc


def validate_item(
    item: Dict[str, Any], schema: Dict[str, Any]
) -> Tuple[bool, Optional[str]]:
    """Check one item against a JSON Schema without raising on schema violations.

    Arguments:
        item (Dict[str, Any]): The instance to check.
        schema (Dict[str, Any]): The JSON Schema to check it against.

    Returns:
        Tuple[bool, Optional[str]]: ``(True, None)`` when the item conforms;
        otherwise ``(False, message)`` where ``message`` is the string form of
        the ``ValidationError``. Other exceptions propagate unchanged.
    """
    error_message: Optional[str] = None
    try:
        validate(instance=item, schema=schema)
    except ValidationError as exc:
        error_message = str(exc)
    return error_message is None, error_message


def transform_with_jsonata(
    source_items: List[Dict[str, Any]],
    mapping_expr: str,
    schema: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Apply a JSONata mapping to every item and optionally schema-check the results.

    The expression is compiled once and evaluated per item. When ``schema`` is
    truthy, each result is checked with ``validate_item``. Results that fail are
    reported instead of returned.

    Arguments:
        source_items (List[Dict[str, Any]]): Items to transform.
        mapping_expr (str): JSONata expression source.
        schema (Optional[Dict[str, Any]], optional): JSON Schema for the mapped
            items. A falsy value (None or {}) skips validation. Defaults to None.

    Returns:
        Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
            - mapped items that passed validation (or all of them when not validating);
            - one record per rejected item, with keys ``item_index`` (position in
              ``source_items``), ``error`` (validation message) and
              ``transformed_item`` (the mapped result).

    Raises:
        Whatever the jsonata library raises for an invalid expression or a failed
        evaluation. NOTE(review): the earlier docstring named
        ``jsonata.JsonataError``; confirm the exact class.
    """
    compiled = jsonata.Jsonata(mapping_expr)
    accepted: List[Dict[str, Any]] = []
    rejected: List[Dict[str, Any]] = []

    for index, source in enumerate(source_items):
        mapped = compiled.evaluate(source)
        if not schema:
            accepted.append(mapped)
            continue
        ok, message = validate_item(mapped, schema)
        if ok:
            accepted.append(mapped)
        else:
            rejected.append(
                {"item_index": index, "error": message, "transformed_item": mapped}
            )

    return accepted, rejected


In [13]:
def fetch_cpath_data(**context) -> Dict[str, Any]:
    """Fetch the C-Path dataset listing using an API token stored as an Airflow Variable.

    The bearer token is read with ``Variable.get("CPATH_API_TOKEN")``. For local
    runs, export ``AIRFLOW_VAR_CPATH_API_TOKEN``: Airflow resolves variables from
    ``AIRFLOW_VAR_*`` environment variables before the metastore. The token must
    never be written into the notebook; the one previously inlined here should be
    treated as leaked and rotated.

    Arguments:
        **context: Airflow task context; reads ``context["params"]["cpath_api_url"]``.

    Returns:
        Dict[str, Any]: Decoded JSON response from the C-Path API. Downstream
        code reads its ``"items"`` list.

    Raises:
        KeyError: If the ``CPATH_API_TOKEN`` variable is not defined.
        requests.exceptions.RequestException: If the API request fails, returns
            a 4xx/5xx status, or does not answer within 30 seconds.
    """
    # NOTE(review): variable key is assumed; align it with the key used by the deployed DAG.
    api_token = Variable.get("CPATH_API_TOKEN")
    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {api_token}",
    }

    # A timeout keeps a stalled API from hanging the task indefinitely
    # (consistent with load_mapping_from_url / load_schema_from_url).
    response = requests.get(
        context["params"]["cpath_api_url"], headers=headers, timeout=30
    )
    response.raise_for_status()
    return response.json()

In [14]:
# Mock the part of the Airflow task context these functions read: only
# context["params"]. Values are the defaults declared in dag_params, so the
# notebook exercises exactly what a default DAG run would receive and the two
# cannot drift apart.
context = {
    "params": {name: param.value for name, param in dag_params.items()}
}

data = fetch_cpath_data(**context)
data

{'items': [{'id': 1725,
   'code': 'fm2_als1001_2024_08_31',
   'created_at': '2024-10-29T23:38:49.140Z',
   'updated_at': '2025-07-23T21:14:13.791Z',
   'catalogue': {'title': 'Clinical Trial Ceftriaxone in Subjects With ALS',
    'description': 'The purpose of the study is to evaluate the safety and efficacy of ceftriaxone treatment in amyotrophic lateral sclerosis (ALS).  Ceftriaxone is approved by the U.S. Food and Drug Administration (FDA) for treating bacterial infections but not for treating ALS. Also, ceftriaxone has not been given to people over a long period of time, such as months or years. The goals of this study are to evaluate the safety and effectiveness of ceftriaxone as a treatment for ALS, and to determine the safety and effectiveness of long-term use of the drug in people with ALS.',
    'nctId': [{'link': 'https://www.clinicaltrials.gov/study/NCT00349622',
      'text': 'NCT00349622'}],
    'rights': 'https://portal.rdca.c-path.org/data-use-agreement',
    'creator'

In [15]:
def transform_data(
    data: Dict[str, Any], **context
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Transform the data using JSONata mapping and validate against schema.

    This task:
    1. Loads the JSONata mapping expression from ``params["mapping_url"]``
    2. Loads the JSON Schema from ``params["schema_url"]``
    3. Applies the mapping to each entry of ``data["items"]``
    4. Validates each transformed item against the schema
    5. Fails if any item is invalid, otherwise returns the transformed items

    Arguments:
        data: Raw data from the C-Path API; must contain an ``"items"`` list.
        **context: Airflow task context containing DAG parameters

    Returns:
        Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
            ``(transformed_items, validation_errors)``. Any validation error
            raises, so ``validation_errors`` is always empty when this returns.
            It is kept in the result because callers unpack two values.

    Raises:
        ValueError: If any validation errors are found (details are printed first).
        requests.exceptions.RequestException: If loading mapping or schema fails
    """
    params = context["params"]
    mapping_expr = load_mapping_from_url(params["mapping_url"])
    schema = load_schema_from_url(params["schema_url"])

    transformed_items, validation_errors = transform_with_jsonata(
        data["items"], mapping_expr, schema
    )
    if validation_errors:
        # Dump the details before failing so the task log shows which items broke.
        print(validation_errors)
        raise ValueError(f"Found {len(validation_errors)} validation errors.")

    return transformed_items, validation_errors

In [16]:
# Map the raw C-Path items onto the Dataset schema; raises if any item fails validation.
transformed_items, validation_errors = transform_data(data, **context)
transformed_items

[{'title': 'Clinical Trial Ceftriaxone in Subjects With ALS',
  'creator': ['Massachusetts General Hospital'],
  'keywords': ['ALS',
   'Amyotrophic Lateral Sclerosis',
   'disorder',
   'neurodegenerative',
   'placebo',
   'cephalosporin antibiotic',
   'motor neurons'],
  'subject': ['amyotrophic lateral sclerosis'],
  'description': 'The purpose of the study is to evaluate the safety and efficacy of ceftriaxone treatment in amyotrophic lateral sclerosis (ALS).  Ceftriaxone is approved by the U.S. Food and Drug Administration (FDA) for treating bacterial infections but not for treating ALS. Also, ceftriaxone has not been given to people over a long period of time, such as months or years. The goals of this study are to evaluate the safety and effectiveness of ceftriaxone as a treatment for ALS, and to determine the safety and effectiveness of long-term use of the drug in people with ALS.',
  'collection': ['ALS Knowledge Portal'],
  'publisher': 'Critical Path Institute',
  'species

In [17]:
def parse_dataset_code(dataset_code: str) -> Tuple[str, str, str]:
    """Split a C-Path dataset code into ALS study number, version prefix and date.

    Args:
        dataset_code: e.g., "src_als1003_2025_04_17", "fv1_als1001_2025_02_26"

    Returns:
        Tuple of (als_number, prefix, date_str), e.g.
        ("als1003", "src", "2025_04_17"). The prefix has no trailing underscore.

    Raises:
        ValueError: If dataset code format is invalid
    """
    import re

    # <prefix>_als<digits>_<YYYY_MM_DD>; prefixes: src, fm1, fm2, fv1, fv2, fv3
    code_regex = re.compile(r'^(src_|fm[12]_|fv[123]_)(als\d+)_(\d{4}_\d{2}_\d{2})$')
    parts = code_regex.match(dataset_code)
    if parts is None:
        raise ValueError(f"Invalid dataset code format: {dataset_code}")

    raw_prefix, als_number, date_str = parts.groups()
    return als_number, raw_prefix.rstrip('_'), date_str


def extract_dataset_code_from_url(url: Any) -> str:
    """Extract the dataset code (the last path segment) from a C-Path URL.

    Args:
        url: e.g., "https://fair.dap.c-path.org/#/data/datasets/src_als1003_2025_04_17".
            Values read back from a Synapse table may also be None/NaN (an empty
            cell) or a list (annotations are written as single-element lists when
            datasets are created); those are tolerated.

    Returns:
        Dataset code e.g., "src_als1003_2025_04_17", or "" when there is no usable URL.
    """
    # Unwrap list-valued cells/annotations by taking the first value.
    if isinstance(url, (list, tuple)):
        url = url[0] if url else ""
    # Empty cells come back from pandas as NaN (a float) or None; treat them as
    # missing instead of crashing on ``.split``.
    if not isinstance(url, str) or not url:
        return ""
    # Ignore trailing slashes so ".../src_als1003_2025_04_17/" still yields the code.
    return url.rstrip("/").split("/")[-1]


def get_version_priority(prefix: str) -> int:
    """Rank a dataset-code prefix; a larger number means a more curated release.

    Ordering, best first: fv3 > fv2 > fv1 > fm2 > fm1 > src.
    Unknown prefixes score 0, below every known one.
    """
    # Listed worst to best; the 1-based position is the score.
    ranking = ('src', 'fm1', 'fm2', 'fv1', 'fv2', 'fv3')
    scores = {name: rank for rank, name in enumerate(ranking, start=1)}
    return scores.get(prefix, 0)


def select_latest_versions(datasets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep a single dataset per ALS study: the best-ranked, most recent release.

    Each dataset's code is taken from its ``url``. Datasets with no URL or an
    unparseable code are skipped with a printed message. Within an ALS group the
    winner has the highest prefix priority, with ties broken by the later date.

    Args:
        datasets: List of dataset items with 'url' field containing dataset codes

    Returns:
        List of selected datasets (one per ALS number), in first-seen group order
    """
    from collections import defaultdict
    from datetime import datetime

    def _rank(candidate: Dict[str, Any]):
        # Priority first, then calendar date of the release.
        return (
            candidate['priority'],
            datetime.strptime(candidate['date_str'], '%Y_%m_%d'),
        )

    candidates_by_study = defaultdict(list)
    for entry in datasets:
        code = extract_dataset_code_from_url(entry.get("url", ""))
        if not code:
            print(f"Skipping dataset with missing URL: {entry.get('title', 'Unknown')}")
            continue
        try:
            study, prefix, release_date = parse_dataset_code(code)
        except ValueError as err:
            print(f"Skipping dataset with invalid code: {code}, error: {err}")
            continue
        candidates_by_study[study].append({
            'dataset': entry,
            'prefix': prefix,
            'date_str': release_date,
            'priority': get_version_priority(prefix),
            'dataset_code': code
        })

    chosen: List[Dict[str, Any]] = []
    for study, candidates in candidates_by_study.items():
        winner = max(candidates, key=_rank)
        chosen.append(winner['dataset'])
        print(f"Selected {winner['dataset_code']} for {study} "
              f"(priority: {winner['priority']}, date: {winner['date_str']})")

    return chosen

In [18]:
def find_duplicated_datasets(
    transformed_items: List[Dict[str, Any]], **context
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Drop items already in the collection, then keep one release per ALS study.

    This task:
    1. Queries the collection for rows with ``source='Critical Path Institute'``
       and collects their ``sameAs`` values (e.g. ``"cpath:1725"``).
    2. Requires every transformed item to have non-empty ``sameAs``, ``title``
       and ``url``, and keeps only items whose ``sameAs`` is not in the collection.
    3. Passes the remaining items through ``select_latest_versions``.

    Arguments:
        transformed_items (List[Dict[str, Any]]):
            Transformed and validated items from the previous task.
        **context: Airflow task context containing DAG parameters
            (``synapse_conn_id``, ``collection_id``).

    Returns:
        Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
            - Selected datasets to ingest
            - Empty list (no duplicates are routed to manual review)

    Raises:
        ValueError: If an item is missing ``sameAs``, ``title`` or ``url``.
    """
    params = context["params"]
    client = SynapseHook(params["synapse_conn_id"]).client

    collection_id = params["collection_id"]
    existing_frame = client.tableQuery(
        f"SELECT * FROM {collection_id} WHERE source='Critical Path Institute'"
    ).asDataFrame()
    known_keys = set(existing_frame["sameAs"].tolist())

    new_items = []
    for candidate in transformed_items:
        # Fail loudly on incomplete records rather than ingesting partial metadata.
        for required in ("sameAs", "title", "url"):
            if not candidate.get(required):
                raise ValueError(f"Missing or empty '{required}' in item: {candidate}")
        if candidate["sameAs"] not in known_keys:
            new_items.append(candidate)

    print(f"Found {len(new_items)} new items to process from {len(transformed_items)} total items")

    selected_items = select_latest_versions(new_items)

    print(f"Selected {len(selected_items)} datasets from {len(new_items)} new items")

    return selected_items, []

In [19]:
# Drop items already in the collection (matched on sameAs) and keep one release per ALS study.
selected_items, duplicates = find_duplicated_datasets(transformed_items, **context)
selected_items

[[34m2025-10-08T23:30:35.603+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m


Querying table/view: 'syn69962707' ...:   0%|          | 0.00/100 [00:01<?, ?it/s]       


[[34m2025-10-08T23:30:38.207+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://file-prod.prod.sagebase.org/file/v1/fileHandle/batch "HTTP/1.1 201 "[0m
Found 2 new items to process from 4 total items
Selected fm2_als1001_2024_08_31 for als1001 (priority: 3, date: 2024_08_31)
Selected fv2_als1003_2025_04_28 for als1003 (priority: 5, date: 2025_04_28)
Selected 2 datasets from 2 new items


[{'title': 'Clinical Trial Ceftriaxone in Subjects With ALS',
  'creator': ['Massachusetts General Hospital'],
  'keywords': ['ALS',
   'Amyotrophic Lateral Sclerosis',
   'disorder',
   'neurodegenerative',
   'placebo',
   'cephalosporin antibiotic',
   'motor neurons'],
  'subject': ['amyotrophic lateral sclerosis'],
  'description': 'The purpose of the study is to evaluate the safety and efficacy of ceftriaxone treatment in amyotrophic lateral sclerosis (ALS).  Ceftriaxone is approved by the U.S. Food and Drug Administration (FDA) for treating bacterial infections but not for treating ALS. Also, ceftriaxone has not been given to people over a long period of time, such as months or years. The goals of this study are to evaluate the safety and effectiveness of ceftriaxone as a treatment for ALS, and to determine the safety and effectiveness of long-term use of the drug in people with ALS.',
  'collection': ['ALS Knowledge Portal'],
  'publisher': 'Critical Path Institute',
  'species

In [20]:
def get_existing_als_datasets(syn_hook, collection_id: str) -> Dict[str, Dict[str, Any]]:
    """Get the best existing C-Path dataset per ALS number from the collection.

    Rows are read with ``publisher='Critical Path Institute'``. When several rows
    share an ALS number (e.g. ``fm2_als1001_...`` and ``fv1_als1001_...``), the
    row with the highest version priority, then the latest date, is kept. Callers
    therefore compare against the current best version rather than whichever row
    the query returned last. On an exact tie the later row wins. Rows whose url is
    missing or is not a valid dataset code are skipped.

    Arguments:
        syn_hook: Hook exposing a logged-in Synapse client as ``.client``.
        collection_id: Synapse ID of the dataset collection to query.

    Returns:
        Dict mapping als_number -> dataset_info with fields:
        - synapse_id, dataset_code, prefix, priority, date_str, sameAs, url
    """
    synapse_client = syn_hook.client
    query_str = f"SELECT * FROM {collection_id} WHERE publisher='Critical Path Institute'"
    current_data = synapse_client.tableQuery(query_str).asDataFrame()

    existing_datasets: Dict[str, Dict[str, Any]] = {}
    # (priority, date_str) of the row currently kept for each ALS number.
    # date_str is fixed-width YYYY_MM_DD (enforced by parse_dataset_code), so
    # string order equals chronological order.
    best_rank: Dict[str, Tuple[int, str]] = {}

    for _, row in current_data.iterrows():
        url = row.get('url', '')
        # List-valued cells: use the first entry.
        if isinstance(url, (list, tuple)):
            url = url[0] if len(url) else ''
        # Empty cells come back as NaN/None; treat them as "no url" instead of crashing.
        if not isinstance(url, str):
            url = ''
        dataset_code = extract_dataset_code_from_url(url)

        if not dataset_code:
            continue

        try:
            als_number, prefix, date_str = parse_dataset_code(dataset_code)
        except ValueError:
            continue

        priority = get_version_priority(prefix)
        rank = (priority, date_str)
        if als_number in best_rank and rank < best_rank[als_number]:
            continue  # a better version of this ALS study was already seen

        best_rank[als_number] = rank
        existing_datasets[als_number] = {
            'synapse_id': row['id'],
            'dataset_code': dataset_code,
            'prefix': prefix,
            'priority': priority,
            'date_str': date_str,
            'sameAs': row['sameAs'],
            'url': row['url']
        }

    return existing_datasets


def identify_dataset_actions(
    selected_items: List[Dict[str, Any]], **context
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split selected C-Path items into datasets to create and datasets to update.

    Each item's ALS number (parsed from its url) is looked up among the datasets
    already in the collection:

    * unknown ALS number -> the item goes to ``datasets_to_create``;
    * known ALS number, and the item has a higher prefix priority, or the same
      priority with an equal-or-later date -> an update record goes to
      ``datasets_to_update``. Its ``upgrade_type`` is ``'version'`` for a
      priority bump and ``'annotation'`` otherwise;
    * otherwise the item is skipped with a message.

    Items without a url-derived code are skipped silently. Items whose code or
    date fails to parse are skipped with a message.

    Arguments:
        selected_items: Items chosen by ``find_duplicated_datasets``.
        **context: Airflow task context (``synapse_conn_id``, ``collection_id``).

    Returns:
        ``(datasets_to_create, datasets_to_update)``. Update records carry
        ``new_data``, ``existing_synapse_id``, ``upgrade_type`` and ``dataset_code``.
    """
    params = context["params"]
    existing_by_als = get_existing_als_datasets(
        SynapseHook(params["synapse_conn_id"]), params["collection_id"]
    )

    to_create: List[Dict[str, Any]] = []
    to_update: List[Dict[str, Any]] = []

    for item in selected_items:
        dataset_code = extract_dataset_code_from_url(item.get("url", ""))
        if not dataset_code:
            continue

        try:
            als_number, new_prefix, new_date_str = parse_dataset_code(dataset_code)
            new_priority = get_version_priority(new_prefix)
            new_date = datetime.strptime(new_date_str, '%Y_%m_%d')

            existing = existing_by_als.get(als_number)
            if existing is None:
                # Completely new ALS number
                to_create.append(item)
                print(f"Will create new dataset: {dataset_code}")
                continue

            existing_priority = existing['priority']
            existing_date = datetime.strptime(existing['date_str'], '%Y_%m_%d')
            is_version_bump = new_priority > existing_priority
            # Same release level, not older: treated as a metadata refresh.
            is_same_level_refresh = (
                new_priority == existing_priority and new_date >= existing_date
            )

            if not (is_version_bump or is_same_level_refresh):
                print(f"Skipping {dataset_code} (no improvement over {existing['dataset_code']})")
                continue

            upgrade_type = 'version' if is_version_bump else 'annotation'
            to_update.append({
                'new_data': item,
                'existing_synapse_id': existing['synapse_id'],
                'upgrade_type': upgrade_type,
                'dataset_code': dataset_code
            })
            print(f"Will update {existing['dataset_code']} -> {dataset_code} ({upgrade_type})")

        except ValueError as e:
            print(f"Skipping invalid dataset code {dataset_code}: {e}")
            continue

    return to_create, to_update


In [21]:
# Decide, per selected item, whether to create a new dataset or update an existing one.
datasets_to_create, datasets_to_update = identify_dataset_actions(selected_items,**context)
datasets_to_create

[[34m2025-10-08T23:31:17.145+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m


Querying table/view: 'syn69962707' ...:   0%|          | 0.00/100 [00:00<?, ?it/s]      


[[34m2025-10-08T23:31:18.484+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://file-prod.prod.sagebase.org/file/v1/fileHandle/batch "HTTP/1.1 201 "[0m
Skipping fm2_als1001_2024_08_31 (no improvement over fv1_als1001_2025_02_26)
Will create new dataset: fv2_als1003_2025_04_28


[{'title': 'A Study to Evaluate Efficacy, Safety and Tolerability of CK-2127107 in Patients With ALS (FORTITUDE-ALS)',
  'creator': ['Cytokinetics, Inc.'],
  'keywords': ['Amyotrophic Lateral Sclerosis',
   'Reldesemtiv',
   'Placebo',
   'Phase 2',
   'respiratory',
   'muscle function',
   'ALSFRS-R',
   'dynamometer',
   'CK-2127107'],
  'subject': ['amyotrophic lateral sclerosis'],
  'description': 'The purpose of this study was to assess the effect of CK-2127107 (hereafter referred to as reldesemtiv) versus placebo on respiratory function and other measures of skeletal muscle function in patients with ALS.\n\nThis was a phase 2, double-blind, randomized, placebo-controlled, dose ranging study of reldesemtiv in patients with ALS. Eligible patients were randomized (1:1:1:1) to receive placebo or one of three doses of reldesemtiv (150, 300, or 450 mg twice daily) for 12 weeks. Randomization was stratified by riluzole concomitant use/non-use and edaravone concomitant use/non-use. Conc

In [22]:
def find_ignored_datasets(**context) -> List[str]:
    """Return the C-Path identifiers that human review has marked to be ignored.

    This task:
    1. Downloads the JSON file whose Synapse ID is the ``ignore_cpath_datasets``
       DAG parameter.
    2. Returns its ``ignore_cpath_identifier`` list (empty if the key is absent).

    Arguments:
        **context: Airflow task context containing DAG parameters
            (``synapse_conn_id``, ``ignore_cpath_datasets``).

    Returns:
        List[str]: C-Path identifiers to be ignored.
    """
    syn_hook = SynapseHook(context["params"]["synapse_conn_id"])
    synapse_client = syn_hook.client

    ignore_file_id = context["params"]["ignore_cpath_datasets"]
    # Pass the hook's authenticated client explicitly. Without it the models API
    # falls back to the most recently created Synapse client in the process,
    # which may not exist (or may be a different identity) inside an Airflow task.
    ignore_file = File(id=ignore_file_id, download_file=True).get(
        synapse_client=synapse_client
    )
    with open(ignore_file.path, "r", encoding="utf-8") as f:
        content_json = json.load(f)
    datasets_to_ignore = content_json.get("ignore_cpath_identifier", [])

    print(
        "dataset identifiers to be ignored after human review: "
        + str(datasets_to_ignore)
    )
    return datasets_to_ignore


In [23]:
# Load the reviewer-maintained list of C-Path identifiers to skip during creation.
ignored_datasets = find_ignored_datasets(**context)
ignored_datasets

[[34m2025-10-08T23:31:33.820+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m
[[34m2025-10-08T23:31:34.515+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn68737367/bundle2 "HTTP/1.1 200 "[0m



!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
ifcollision=keep.both is being IGNORED because the download destination is synapse's cache. Instead, the behavior is "overwrite.local". 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!



dataset identifiers to be ignored after human review: []


[]

In [None]:
def update_existing_datasets(
    datasets_to_update: List[Dict[str, Any]],
    **context
) -> List[str]:
    """Apply refreshed C-Path metadata to datasets that already exist in Synapse.

    For each update record, fetches the existing Dataset and overwrites several
    fields: name, description (truncated to 1000 characters) and the C-Path
    annotations. It then stores the dataset.

    Arguments:
        datasets_to_update: Records with keys ``new_data`` (transformed item),
            ``existing_synapse_id`` and ``upgrade_type``.
        **context: Airflow task context containing DAG parameters (``synapse_conn_id``).

    Returns:
        List[str]: Synapse IDs of the datasets that were stored.
    """
    syn_hook = SynapseHook(context["params"]["synapse_conn_id"])
    # Use the hook's client explicitly instead of whatever global client exists.
    synapse_client = syn_hook.client
    updated_dataset_ids = []

    def _joined(value):
        # Flatten list-valued fields into one comma-separated string.
        return ", ".join(value) if isinstance(value, list) else value

    for update_info in datasets_to_update:
        item = update_info['new_data']
        existing_id = update_info['existing_synapse_id']
        upgrade_type = update_info['upgrade_type']

        existing_dataset = Dataset(id=existing_id).get(synapse_client=synapse_client)

        existing_dataset.name = item["title"]
        # Slicing is a no-op for descriptions already within the 1000-char limit.
        existing_dataset.description = item["description"][:1000]

        # NOTE(review): list fields are stored here as comma-joined strings, whereas
        # new datasets are annotated with lists; confirm which shape the
        # collection's columns expect.
        existing_dataset.annotations.update({
            "source": "Critical Path Institute",
            "creator": _joined(item["creator"]),
            "keywords": _joined(item["keywords"]),
            "subject": _joined(item["subject"]),
            "collection": _joined(item["collection"]),
            "publisher": item["publisher"],
            "species": _joined(item["species"]),
            "sameAs": item["sameAs"],
            "url": item["url"],
            # Wrap a scalar contributor (the fallback previously wrapped
            # item["collection"] by mistake).
            "contributor": item["contributor"] if isinstance(item["contributor"], list) else [item["contributor"]],
        })

        # NOTE(review): confirm store() creates a new Dataset version; the models API
        # may require snapshot() for versioning.
        existing_dataset.store(synapse_client=synapse_client)
        updated_dataset_ids.append(existing_dataset.id)

        print(f"{upgrade_type.title()} update for {item['title']}: new version {existing_dataset.version_number}")

    return updated_dataset_ids

In [25]:
# Handle updates first: refresh metadata on datasets that already exist in the
# collection (the datasets_to_update records from identify_dataset_actions).
updated_ids = update_existing_datasets(datasets_to_update, **context)
updated_ids

[[34m2025-10-08T23:31:45.977+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m


[]

In [None]:
def create_new_datasets(
    datasets_to_create: List[Dict[str, Any]],
    ignored_datasets: List[str],
    **context,
) -> str:
    """Create brand new datasets for ALS numbers that don't exist yet.

    For each item whose ``sameAs`` is not in ``ignored_datasets``, the function:

    * creates a Dataset under ``params["project_id"]``, with the description
      truncated to 1000 characters;
    * adds the dataset to the collection;
    * writes its annotations.

    The collection is stored once at the end.

    Arguments:
        datasets_to_create: Transformed items for ALS studies not yet in the collection.
        ignored_datasets: C-Path identifiers (``sameAs`` values) to skip.
        **context: Airflow task context
            (``synapse_conn_id``, ``collection_id``, ``project_id``).

    Returns:
        str: Synapse ID of the dataset collection.
    """
    syn_hook = SynapseHook(context["params"]["synapse_conn_id"])
    # Use the hook's client for every call. The previous version used a
    # notebook-global ``syn``, which does not exist when this runs as an Airflow task.
    synapse_client = syn_hook.client
    dataset_collection = DatasetCollection(id=context["params"]["collection_id"]).get(
        synapse_client=synapse_client
    )

    def _as_list(value):
        # Annotations are written as lists; wrap scalars.
        return value if isinstance(value, list) else [value]

    for item in datasets_to_create:
        # Skip if in ignored list
        if item["sameAs"] in ignored_datasets:
            continue

        dataset_annotations = {
            "source": ["Critical Path Institute"],
            "creator": _as_list(item["creator"]),
            "keywords": _as_list(item["keywords"]),
            "subject": _as_list(item["subject"]),
            "collection": _as_list(item["collection"]),
            "publisher": [item["publisher"]],
            "species": _as_list(item["species"]),
            "sameAs": [item["sameAs"]],
            "url": [item["url"]],
            "title": item["title"],
            # The fallback previously wrapped item["collection"] by mistake.
            "contributor": _as_list(item["contributor"]),
        }

        dataset = Dataset(
            parent_id=context["params"]["project_id"],
            name=item["title"],
            description=item["description"][:1000],
        ).store(synapse_client=synapse_client)
        dataset_collection.add_item(dataset)

        # Annotations are written through the legacy client API because, per the
        # original author, setting them via the models API did not persist.
        entity = synapse_client.get(dataset.id, downloadFile=False)
        entity.annotations = dataset_annotations
        synapse_client.store(entity, forceVersion=False)

        print(f"Created new dataset: {item['title']} (ID: {dataset.id})")

    dataset_collection.store(synapse_client=synapse_client)
    return dataset_collection.id

In [27]:
# Create datasets for ALS studies not yet in the collection, skipping ignored
# identifiers; the return value is the collection's Synapse ID.
collection_id = create_new_datasets(datasets_to_create, ignored_datasets, **context)

[[34m2025-10-08T23:31:56.781+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m
[[34m2025-10-08T23:31:57.244+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/bundle2 "HTTP/1.1 200 "[0m
[[34m2025-10-08T23:31:57.349+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/column "HTTP/1.1 200 "[0m
[[34m2025-10-08T23:33:11.459+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/column/tableview/defaults?viewEntityType=dataset&viewTypeMask=128 "HTTP/1.1 200 "[0m
[[34m2025-10-08T23:33:11.673+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/bundle2/create "HTTP/1.1 201 "[0m
[[34m2025-10-08T23:33:11.759+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP

  0%|          | 0.00/1.00 [00:00<?, ?it/s]

[[34m2025-10-08T23:33:11.977+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn70084694/table/transaction/async/get/59389654 "HTTP/1.1 202 "[0m
[[34m2025-10-08T23:33:13.103+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn70084694/table/transaction/async/get/59389654 "HTTP/1.1 201 "[0m


/entity/syn70084694/table/transaction/async: 100%|██████████| 1.00/1.00 [00:01<00:00, 1.21s/it]

[[34m2025-10-08T23:33:13.236+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn70084694/version/1/bundle2 "HTTP/1.1 200 "[0m





Created new dataset: A Study to Evaluate Efficacy, Safety and Tolerability of CK-2127107 in Patients With ALS (FORTITUDE-ALS) (ID: syn70084694)
[[34m2025-10-08T23:33:14.137+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: PUT https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/bundle2 "HTTP/1.1 200 "[0m


In [None]:
def refresh_collection_annotations(
    collection_id: str,
    updated_dataset_ids: List[str],
    **context,
) -> None:
    """Copy the current annotations of updated datasets into the collection table.

    Only collection rows whose ``source`` is 'Critical Path Institute' and whose
    ``id`` is in ``updated_dataset_ids`` are rewritten. For each such row the
    dataset is fetched again, and its name plus a fixed set of annotation keys
    are written back with ``update_rows`` keyed on ``id``.

    Args:
        collection_id: Synapse ID of the DatasetCollection whose table is refreshed.
        updated_dataset_ids: Synapse IDs of datasets that were updated. An empty
            (or otherwise falsy) value returns before any Synapse call is made.
        **context: Task context. When there is work to do it must contain
            ``context["params"]["synapse_conn_id"]``.

    Returns:
        None. Progress is reported with ``print``.
    """
    if not updated_dataset_ids:
        print("No updated datasets to refresh annotations for")
        return

    # NOTE(review): the hook is never used directly. It appears to be built for
    # its side effect of retrieving the Synapse connection, so that the model
    # .get()/.query() calls below have a client. Confirm before removing.
    _synapse_hook = SynapseHook(context["params"]["synapse_conn_id"])
    collection = DatasetCollection(id=collection_id).get()

    # Restrict to C-Path rows, then to the ones that were actually updated.
    collection_rows = collection.query(
        query=f"SELECT * from {collection_id} where source='Critical Path Institute'"
    )
    stale_rows = collection_rows[collection_rows['id'].isin(updated_dataset_ids)]
    if stale_rows.empty:
        print("No matching rows found in collection for updated datasets")
        return

    # Annotation keys copied into the table, in column order, each paired with
    # the value used when the dataset lacks that annotation. "contributor" has
    # no explicit fallback, so a missing value comes through as None.
    annotation_fallbacks = (
        ("creator", ""),
        ("keywords", ""),
        ("subject", ""),
        ("collection", ""),
        ("publisher", ""),
        ("species", ""),
        ("sameAs", ""),
        ("source", "Critical Path Institute"),
        ("url", ""),
        ("contributor", None),
    )

    refreshed_rows = []
    for _, stale_row in stale_rows.iterrows():
        ds_id = stale_row['id']
        # Fetch the latest version so the table reflects the new annotations.
        latest = Dataset(id=ds_id).get()
        record = {"id": ds_id, "title": latest.name}
        record.update(
            (key, latest.annotations.get(key, fallback))
            for key, fallback in annotation_fallbacks
        )
        refreshed_rows.append(record)
        print(f"Refreshed annotations for {latest.name} (version {latest.version_number})")

    # stale_rows is non-empty here, so at least one record was built.
    import pandas as pd
    collection.update_rows(
        values=pd.DataFrame(refreshed_rows),
        primary_keys=["id"],
        dry_run=False,
        wait_for_eventually_consistent_view=True,
    )
    print(f"Updated collection annotations for {len(refreshed_rows)} datasets")

In [None]:
# Refresh collection table with latest annotations.
# NOTE(review): this cell uses `collection_id`, `updated_ids` and `context`,
# none of which are defined here (`collection_id`/`updated_ids` are assigned by
# the pipeline cells further down), so it only works with leftover kernel state.
# In the recorded run `updated_ids` was empty, so the function returned early.
refresh_collection_annotations(collection_id, updated_ids, **context)


No updated datasets to refresh annotations for


In [None]:
# Run the whole pipeline end to end in the notebook:
# fetch -> transform -> dedupe -> plan actions -> update / create -> refresh -> Slack.
# The helpers that read context["params"] must receive **context, matching the
# working test-run cell further down. Calling them without it leaves
# context == {}, which is consistent with the KeyError: 'params' this cell
# previously raised.
# NOTE(review): fetch_cpath_data, transform_data, generate_slack_message and
# post_slack_messages are still called without context, as before. Confirm
# whether their signatures accept or need **context.
data = fetch_cpath_data()
transformed_items = transform_data(data)
selected_items, duplicates = find_duplicated_datasets(transformed_items, **context)
datasets_to_create, datasets_to_update = identify_dataset_actions(selected_items, **context)
ignored_datasets = find_ignored_datasets(**context)

# Handle updates first (versioning existing datasets)
updated_ids = update_existing_datasets(datasets_to_update, **context)

# Create completely new datasets
collection_id = create_new_datasets(datasets_to_create, ignored_datasets, **context)

# Refresh collection table with latest annotations
refresh_collection_annotations(collection_id, updated_ids, **context)

# Slack notifications
message = generate_slack_message(duplicates)
post_slack_messages(message)

# Task dependencies belong in the Airflow @dag function, not here. In the
# notebook these names are plain Python values and the helpers are plain
# functions, so `>>` and `.override` would raise. The previous version was
# also split mid-attribute ("refresh_collection_annotations." / "override(...)"),
# which is a SyntaxError. Inside the DAG, with @task-decorated functions, the
# intended wiring is:
#
#   refreshed = refresh_collection_annotations.override(task_id="refresh_annotations")(
#       collection_id, updated_ids
#   )
#   transformed_items >> selected_items >> datasets_to_create
#   datasets_to_create >> updated_ids >> collection_id >> refreshed



KeyError: 'params'

## Dummy transformed data for local testing

In [33]:
# Seed fixtures: two "older" C-Path datasets (als1001 as fm1, als1002 as src).
# The next cell creates them in Synapse, so the later scenarios have existing
# datasets to version.
initial_test_data = [
    dict(
        title="Test ALS Dataset 1001 - FM1 Version",
        creator=["Original Research Group"],
        keywords=["ALS", "test", "original"],
        subject=["amyotrophic lateral sclerosis"],
        description="Original FM1 version for testing",
        collection=["ALS Knowledge Portal"],
        publisher="Critical Path Institute",
        species=["Homo sapiens"],
        sameAs="cpath:test1001",
        source="Critical Path Institute",
        url="https://fair.dap.c-path.org/#/data/datasets/fm1_als1001_2024_08_15",
    ),
    dict(
        title="Test ALS Dataset 1002 - SRC Version",
        creator=["Basic Research Lab"],
        keywords=["ALS", "preliminary"],
        subject=["amyotrophic lateral sclerosis"],
        description="Basic SRC version for testing",
        collection=["ALS Knowledge Portal"],
        publisher="Critical Path Institute",
        species=["Homo sapiens"],
        sameAs="cpath:test1002",
        source="Critical Path Institute",
        url="https://fair.dap.c-path.org/#/data/datasets/src_als1002_2024_06_10",
    ),
]


In [34]:
# Create each seed fixture as a Dataset in the project and add it to the test
# collection, so the pipeline run below has existing als1001/als1002 entries.
# NOTE(review): relies on `context` and the `syn` client from earlier cells.
dataset_collection = DatasetCollection(id=context["params"]["collection_id"]).get()
for test_dataset in initial_test_data: 
    # Create the dataset entity with only a name and description.
    dataset = Dataset(
        parent_id=context["params"]["project_id"],
        name=test_dataset["title"],
        description=test_dataset["description"],   
    ).store()
    dataset_collection.add_item(dataset)
    dataset_id=dataset.id
    # Re-fetch through the `syn` client to set annotations. Note that `dataset`
    # is rebound here from the Dataset object to the entity returned by syn.get.
    dataset=syn.get(dataset_id, downloadFile=False)
    # Every fixture key (including title, description and url) becomes an annotation.
    dataset.annotations=test_dataset
    # forceVersion=False: ask Synapse not to create a new version for this store.
    syn.store(dataset, forceVersion=False)
# Persist the newly added items; the cell's displayed output is this call's return value.
dataset_collection.store()

[[34m2025-10-03T23:52:48.432+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/bundle2 "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:52:48.542+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/column "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:52:50.339+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/column/tableview/defaults?viewEntityType=dataset&viewTypeMask=128 "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:52:50.512+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/bundle2/create "HTTP/1.1 201 "[0m
[[34m2025-10-03T23:52:50.630+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/column/batch "HTTP/1.1 201 "[0m
[[34m2025-10-03T23:52:50.742+0000[0m] {[




[A[A[A

[[34m2025-10-03T23:52:50.852+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981986/table/transaction/async/get/59100202 "HTTP/1.1 202 "[0m
[[34m2025-10-03T23:52:51.935+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981986/table/transaction/async/get/59100202 "HTTP/1.1 201 "[0m





[A[A[A


[A[A[A


[A[A[A

[[34m2025-10-03T23:52:52.084+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981986/version/1/bundle2 "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:52:53.245+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/column/tableview/defaults?viewEntityType=dataset&viewTypeMask=128 "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:52:53.411+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/bundle2/create "HTTP/1.1 201 "[0m
[[34m2025-10-03T23:52:53.506+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/column/batch "HTTP/1.1 201 "[0m
[[34m2025-10-03T23:52:53.628+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981987/table/transaction/async/start "HTTP/1.1 201 "[0m





[A[A[A

[[34m2025-10-03T23:52:53.736+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981987/table/transaction/async/get/59100203 "HTTP/1.1 202 "[0m
[[34m2025-10-03T23:52:54.829+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981987/table/transaction/async/get/59100203 "HTTP/1.1 201 "[0m





[A[A[A


[A[A[A


[A[A[A

[[34m2025-10-03T23:52:54.981+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981987/version/1/bundle2 "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:52:55.820+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: PUT https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/bundle2 "HTTP/1.1 200 "[0m


DatasetCollection(id='syn69962707', name='Cpath-Dataset-Collection-Test', parent_id='syn64892175', activity=None, version_number=1, _columns_to_delete={}, view_entity_type=<ViewEntityType.DATASET_COLLECTION: 'datasetcollection'>, view_type_mask=<ViewTypeMask.DATASET_COLLECTION: 256>, include_default_columns=True, description='', etag='ce8825fd-4f00-4081-9b58-366da5f76baa', created_on='2025-09-30T18:20:42.333Z', modified_on='2025-10-03T23:52:55.705Z', created_by='3436666', modified_by='3436666', version_label='1', version_comment=None, is_latest_version=True, is_search_enabled=False, items=[EntityRef(id='syn66496325', version=1), EntityRef(id='syn66496324', version=1), EntityRef(id='syn69966250', version=1), EntityRef(id='syn69966251', version=1), EntityRef(id='syn69981986', version=1), EntityRef(id='syn69981987', version=1)], columns=OrderedDict([('id', Column(id='81721', name='id', column_type=ENTITYID, facet_type=None, default_value=None, maximum_size=None, maximum_list_length=None, 

In [35]:
# Stand-in for the output of transform_data, covering each action the pipeline
# must take: version upgrades, an annotation-only refresh on a newer date, a
# new ALS number, and a lower-priority duplicate that should be dropped.
transformed_items = [
    # Scenario 1: Version upgrade (fm1 → fv1)
    dict(
        title="Test ALS Dataset 1001 - FV1 Upgrade",
        creator=["Upgraded Research Group", "New Collaborator"],
        keywords=["ALS", "test", "standardized", "validated"],
        subject=["amyotrophic lateral sclerosis"],
        description="Upgraded FV1 version with standardized data",
        collection=["ALS Knowledge Portal"],
        publisher="Critical Path Institute",
        species=["Homo sapiens"],
        sameAs="cpath:test1001_v2",
        source="Critical Path Institute",
        url="https://fair.dap.c-path.org/#/data/datasets/fv1_als1001_2025_01_20",
    ),

    # Scenario 2: Version upgrade (src → fm2)
    dict(
        title="Test ALS Dataset 1002 - FM2 Upgrade",
        creator=["Enhanced Research Lab"],
        keywords=["ALS", "enhanced", "non-standardized"],
        subject=["amyotrophic lateral sclerosis"],
        description="Enhanced FM2 version with more data",
        collection=["ALS Knowledge Portal"],
        publisher="Critical Path Institute",
        species=["Homo sapiens"],
        sameAs="cpath:test1002_v2",
        source="Critical Path Institute",
        url="https://fair.dap.c-path.org/#/data/datasets/fm2_als1002_2024_12_01",
    ),

    # Scenario 3: Same version, newer date (annotation update).
    # Competes with Scenario 1 for als1001; the newer fv1 date should win.
    dict(
        title="Test ALS Dataset 1001 - FV1 Updated Annotations",
        creator=["Upgraded Research Group", "New Collaborator", "Additional PI"],
        keywords=["ALS", "test", "standardized", "validated", "updated"],
        subject=["amyotrophic lateral sclerosis"],
        description="Same FV1 version but with updated metadata and annotations",
        collection=["ALS Knowledge Portal"],
        publisher="Critical Path Institute",
        species=["Homo sapiens"],
        sameAs="cpath:test1001_v3",
        source="Critical Path Institute",
        url="https://fair.dap.c-path.org/#/data/datasets/fv1_als1001_2025_02_15",  # Newer date
    ),

    # Scenario 4: Completely new ALS number.
    # NOTE(review): in the recorded run als1003 already existed in the collection
    # (as fv2_als1003_2025_04_28), so this item was planned as an update to an
    # older-dated entry rather than a new dataset.
    dict(
        title="Test ALS Dataset 1003 - New Dataset",
        creator=["New Research Institute"],
        keywords=["ALS", "novel", "experimental"],
        subject=["amyotrophic lateral sclerosis"],
        description="Brand new ALS dataset for testing",
        collection=["ALS Knowledge Portal"],
        publisher="Critical Path Institute",
        species=["Homo sapiens"],
        sameAs="cpath:test1003",
        source="Critical Path Institute",
        url="https://fair.dap.c-path.org/#/data/datasets/fv3_als1003_2025_03_01",
    ),

    # Scenario 5: Lower priority version (should be ignored)
    dict(
        title="Test ALS Dataset 1001 - FM2 Lower Priority",
        creator=["Lower Priority Group"],
        keywords=["ALS", "lower"],
        subject=["amyotrophic lateral sclerosis"],
        description="Lower priority version that should be ignored",
        collection=["ALS Knowledge Portal"],
        publisher="Critical Path Institute",
        species=["Homo sapiens"],
        sameAs="cpath:test1001_low",
        source="Critical Path Institute",
        url="https://fair.dap.c-path.org/#/data/datasets/fm2_als1001_2025_03_01",  # fm2 < fv1
    ),
]

In [36]:
# Exercise the pipeline on the dummy `transformed_items` defined above (the
# fetch and transform steps are skipped). Requires `context` from an earlier cell.
selected_items, duplicates = find_duplicated_datasets(transformed_items,**context)
datasets_to_create, datasets_to_update = identify_dataset_actions(selected_items, **context)
ignored_datasets = find_ignored_datasets(**context)

# Handle updates first (versioning existing datasets)
updated_ids = update_existing_datasets(datasets_to_update, **context)

# Create completely new datasets; the returned collection ID is reused below.
collection_id = create_new_datasets(datasets_to_create, ignored_datasets, **context)

# Refresh collection table with latest annotations
refresh_collection_annotations(collection_id, updated_ids, **context)

[[34m2025-10-03T23:53:02.458+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m





[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A

[[34m2025-10-03T23:53:08.436+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://file-prod.prod.sagebase.org/file/v1/fileHandle/batch "HTTP/1.1 201 "[0m
Found 5 new items to process from 5 total items
Selected fv1_als1001_2025_02_15 for als1001 (priority: 4, date: 2025_02_15)
Selected fm2_als1002_2024_12_01 for als1002 (priority: 3, date: 2024_12_01)
Selected fv3_als1003_2025_03_01 for als1003 (priority: 6, date: 2025_03_01)
Selected 3 datasets from 5 new items
[[34m2025-10-03T23:53:08.696+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m





[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A


[A[A[A

[[34m2025-10-03T23:53:10.971+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://file-prod.prod.sagebase.org/file/v1/fileHandle/batch "HTTP/1.1 201 "[0m
Will update fm1_als1001_2024_08_15 -> fv1_als1001_2025_02_15 (version)
Will update src_als1002_2024_06_10 -> fm2_als1002_2024_12_01 (version)
Will update fv2_als1003_2025_04_28 -> fv3_als1003_2025_03_01 (version)
[[34m2025-10-03T23:53:11.251+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m
[[34m2025-10-03T23:53:12.038+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn68737367/bundle2 "HTTP/1.1 200 "[0m



!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
ifcollision=keep.both is being IGNORED because the download destination is synapse's cache. Instead, the behavior is "overwrite.local". 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!



dataset identifiers to be ignored after human review: []
[[34m2025-10-03T23:53:12.043+0000[0m] {[34mbase.py:[0m84} INFO[0m - Retrieving connection 'SYNAPSE_ORCA_SERVICE_ACCOUNT_CONN'[0m
[[34m2025-10-03T23:53:12.183+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981986/bundle2 "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:53:12.277+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981986/column "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:53:12.512+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: PUT https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981986/bundle2 "HTTP/1.1 200 "[0m
[[34m2025-10-03T23:53:12.636+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: PUT https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69981986/annotations2 "HTTP/1.1 200 "[0m
Version update for Test ALS Dataset 1001 - FV1 Updated 




[A[A[A

[[34m2025-10-03T23:53:18.430+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: POST https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/table/transaction/async/start "HTTP/1.1 201 "[0m






[A[A[A[A

[[34m2025-10-03T23:53:18.528+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/table/transaction/async/get/59100229 "HTTP/1.1 202 "[0m
[[34m2025-10-03T23:53:19.622+0000[0m] {[34m_client.py:[0m1773} INFO[0m - HTTP Request: GET https://repo-prod.prod.sagebase.org/repo/v1/entity/syn69962707/table/transaction/async/get/59100229 "HTTP/1.1 201 "[0m






[A[A[A[A



[A[A[A[A



[A[A[A[A


[A[A[A


Querying & Updating rows: 100%|██████████| 3.00/3.00 [00:03<00:00, 1.03s/it]



[A[A[A


[A[A[A


Waiting for eventually-consistent changes to show up in the view: 100%|██████████| 3.00/3.00 [00:00<00:00, 3.04it/s]

Updated collection annotations for 3 datasets



