# Upstream SDK CKAN Integration Demo

This notebook demonstrates the CKAN integration capabilities of the Upstream SDK for publishing environmental monitoring data to CKAN data portals.

## Overview

The Upstream SDK provides seamless integration with CKAN (Comprehensive Knowledge Archive Network) data portals for:
- 📊 **Dataset Publishing**: Automatically create CKAN datasets from campaign data
- 📁 **Resource Management**: Upload sensor configurations and measurement data as resources
- 🏢 **Organization Support**: Publish data under specific CKAN organizations
- 🔄 **Update Management**: Update existing datasets with new data
- 🏷️ **Metadata Integration**: Rich metadata tagging and categorization

## Features Demonstrated

- CKAN client setup and configuration
- Campaign data export and preparation
- Dataset creation with comprehensive metadata
- Resource management (sensors and measurements)
- Organization and permission handling
- Error handling and validation

## Prerequisites

- Valid Upstream account credentials
- Access to a CKAN portal with API credentials
- Existing campaign data (or run UpstreamSDK_Core_Demo.ipynb first)
- Python 3.9+ environment with required packages

## Related Notebooks

- **UpstreamSDK_Core_Demo.ipynb**: Core SDK functionality and campaign creation

## Installation and Setup

In [1]:
# Install required packages
!pip install upstream-sdk==1.0.1
#!pip install -e .
# Import required libraries
import os
import json
import getpass
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, Optional, List
from io import BytesIO

# Import Upstream SDK modules
from upstream.client import UpstreamClient
from upstream.ckan import CKANIntegration

## 1. Configuration and Authentication

The Upstream SDK requires a configuration file for authentication and CKAN integration:

### 📁 Configuration File (Required)
Create a `config.yaml` file in the notebook directory with the following structure:

```yaml
upstream:
  username: "your_tacc_username"
  password: "your_password" 
  base_url: "https://upstreamapi.pods.tacc.tapis.io/dev"

ckan:
  url: "https://ckan.tacc.utexas.edu"
  organization: "setx-uifl"
  api_key: "your_ckan_api_key"  # Optional for read-only operations

upload:
  chunk_size: 10000
  max_file_size_mb: 50
  timeout_seconds: 30
  retry_attempts: 3
```

**Configuration Options:**
- **Upstream API**: Username/password authentication
- **CKAN Portal**: API key or access token authentication  
- **Organization**: CKAN organization for dataset publishing

**Alternative Configuration Methods (Not Used in This Notebook):**
For reference, the SDK also supports environment variables and direct parameter passing, but this notebook requires the config file approach for consistency.
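
Since every later cell depends on these settings, it can help to check a parsed configuration for missing fields before creating the client. The sketch below assumes the YAML has already been loaded into a plain dictionary (for example with `yaml.safe_load`). `REQUIRED_KEYS` and `find_missing_keys` are hypothetical names used for illustration, not part of the SDK, and the list of required fields is an assumption based on the example file above.

```python
from typing import Any, Dict, List

# Dotted paths this notebook relies on (an assumption based on the example config.yaml).
REQUIRED_KEYS = [
    "upstream.username",
    "upstream.password",
    "upstream.base_url",
    "ckan.url",
    "ckan.organization",
]


def find_missing_keys(config: Dict[str, Any], required: List[str] = REQUIRED_KEYS) -> List[str]:
    """Return the dotted paths that are absent or empty in a parsed config dict."""
    missing = []
    for dotted in required:
        node: Any = config
        # Walk down the nested dictionaries one path segment at a time.
        for part in dotted.split("."):
            node = node.get(part) if isinstance(node, dict) else None
        if node in (None, ""):
            missing.append(dotted)
    return missing


example = {
    "upstream": {
        "username": "your_tacc_username",
        "password": "",
        "base_url": "https://upstreamapi.pods.tacc.tapis.io/dev",
    },
    "ckan": {"url": "https://ckan.tacc.utexas.edu"},
}
print(find_missing_keys(example))
```

An empty result would mean all listed fields are present; anything else names the fields to add to `config.yaml`.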

In [2]:
# Locate the configuration file that UpstreamClient will load
from pathlib import Path

def get_client_credentials():
    """
    Return the path to the configuration file used to initialize UpstreamClient.

    This notebook requires a configuration file (config.yaml) in the notebook directory.
    The configuration file should contain all necessary authentication and settings.
    """
    config_path = Path("config.yaml")

    if config_path.exists():
        print("📄 Loading configuration from config.yaml...")
        print(f"✅ Configuration file found: {config_path}")
        print("Will initialize client using config_file parameter")
        return config_path
    else:
        print("❌ Configuration file config.yaml is required")
        raise FileNotFoundError("Configuration file config.yaml is required. Please create config.yaml with your credentials and settings.")

# Get config file path
config_file_path = get_client_credentials()

📄 Loading configuration from config.yaml...
✅ Configuration file found: config.yaml
Will initialize client using config_file parameter


In [3]:
from upstream.client import UpstreamClient

# Initialize Upstream client with CKAN integration using config file
try:
    # Initialize client using config file only
    print("🔧 Initializing client from config file...")
    client = UpstreamClient(config_file=config_file_path)
    print(f"✅ Client initialized from config file: {config_file_path}")

    # Read the settings this notebook depends on from the client config
    config_summary = client.get_config()
    UPSTREAM_BASE_URL = config_summary.get("base_url")
    CKAN_URL = config_summary.get("ckan_url")
    CKAN_ORGANIZATION = config_summary.get("ckan_organization")

    # All three settings are required in this notebook, so stop early if any is missing
    if not UPSTREAM_BASE_URL:
        print("❌ Base URL not set in config file!")
        raise ValueError("Base URL is required in config file")
    if not CKAN_URL:
        print("❌ CKAN URL not set in config file!")
        raise ValueError("CKAN URL is required in config file")
    if not CKAN_ORGANIZATION:
        print("❌ CKAN Organization not set in config file!")
        raise ValueError("CKAN Organization is required in config file")

    # Test Upstream authentication
    if client.authenticate():
        print("✅ Upstream authentication successful!")
        print(f"🔗 Connected to: {UPSTREAM_BASE_URL}")

        # Check CKAN integration
        if client.ckan:
            print("✅ CKAN integration enabled!")
            print(f"🔗 CKAN Portal: {CKAN_URL}")
        else:
            print("⚠️  CKAN integration not configured")

        # Display client configuration summary
        print("\n📋 Client Configuration Summary:")
        print(f"   • Base URL: {config_summary.get('base_url', 'Not set')}")
        print(f"   • CKAN URL: {config_summary.get('ckan_url', 'Not set')}")
        print(f"   • CKAN Organization: {config_summary.get('ckan_organization', 'Not set')}")
        print(f"   • Username: {config_summary.get('username', 'Not set')}")

    else:
        print("❌ Upstream authentication failed!")
        raise RuntimeError("Upstream authentication failed")

except Exception as e:
    print(f"❌ Setup error: {e}")
    print("\nTroubleshooting tips:")
    print("   • Check config.yaml format and credentials")
    print("   • Ensure all required fields are present in config file")
    print("   • Check network connectivity to Upstream API")
    print("   • Verify CKAN portal accessibility")
    raise

🔧 Initializing client from config file...
✅ Client initialized from config file: config.yaml
✅ Upstream authentication successful!
🔗 Connected to: https://upstreamapi.pods.tacc.tapis.io
✅ CKAN integration enabled!
🔗 CKAN Portal: https://ckan.tacc.utexas.edu

📋 Client Configuration Summary:
   • Base URL: https://upstreamapi.pods.tacc.tapis.io
   • CKAN URL: https://ckan.tacc.utexas.edu
   • CKAN Organization: dso-internal
   • Username: wmobley


## 2. Campaign Selection and Data Preparation

Let's select an existing campaign with data to publish to CKAN. If you don't have existing data, run the core demo notebook first.
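
The cell below simply takes the first campaign returned. If you would rather pick one by name, a small helper along these lines could be used. This is a sketch, not SDK functionality: `pick_campaign` is a hypothetical name, and the stand-in objects only mimic the `id` and `name` attributes that this notebook reads from campaign items.

```python
from types import SimpleNamespace
from typing import Iterable, Optional


def pick_campaign(campaigns: Iterable, name_contains: Optional[str] = None):
    """Return the first campaign whose name contains the text, else the first campaign."""
    campaigns = list(campaigns)
    if not campaigns:
        raise ValueError("No campaigns available")
    if name_contains:
        needle = name_contains.lower()
        for campaign in campaigns:
            # Case-insensitive substring match; tolerate a missing name.
            if needle in (campaign.name or "").lower():
                return campaign
    # No filter given, or nothing matched: fall back to the first campaign.
    return campaigns[0]


# Stand-in objects mimicking the `id` and `name` attributes used in this notebook.
sample = [
    SimpleNamespace(id=1, name="Cow Bayou near Mauriceville - Cow Bayou"),
    SimpleNamespace(id=3, name="Environmental Monitoring Core Demo 2024"),
]
print(pick_campaign(sample, "core demo").id)
```

With real data, the same call would take `campaigns.items` in place of `sample`.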

In [4]:
# List available campaigns
print("📋 Available campaigns for CKAN publishing:")
try:
    campaigns = client.list_campaigns(limit=10)

    if campaigns.total == 0:
        print("❌ No campaigns found. Please run UpstreamSDK_Core_Demo.ipynb first to create sample data.")
        raise Exception("No campaigns available")

    print(f"Found {campaigns.total} campaigns:")
    for i, campaign in enumerate(campaigns.items[:5]):
        print(f"  {i+1}. ID: {campaign.id} - {campaign.name}")
        # The description may be empty or missing, so fall back to an empty string
        print(f"     Description: {(campaign.description or '')[:80]}...")
        print(f"     Contact: {campaign.contact_name} ({campaign.contact_email})")
        print()

    # Select campaign (use the first one or let user choose)
    selected_campaign = campaigns.items[0]
    campaign_id = selected_campaign.id

    print("📊 Selected campaign for CKAN publishing:")
    print(f"   ID: {campaign_id}")
    print(f"   Name: {selected_campaign.name}")

except Exception as e:
    print(f"❌ Error listing campaigns: {e}")
    raise

📋 Available campaigns for CKAN publishing:
Found 4 campaigns:
  1. ID: 1 - Cow Bayou near Mauriceville - Cow Bayou
     Description: Station Cow Bayou in campaign Cow Bayou near Mauriceville

...
     Contact: WILLIAM H MOBLEY (feulmob@gmail.com)

  2. ID: 2 - demo - february 06
     Description: ...
     Contact: Sebastian Hernandez-Sterling (sebastian.hernandez11@upr.edu)

  3. ID: 3 - Environmental Monitoring Core Demo 2024
     Description: Core functionality demonstration for SDK campaign and station management...
     Contact: Dr. Jane Smith (jane.smith@example.edu)

  4. ID: 4 - Environmental Monitoring Core Demo 2024
     Description: Core functionality demonstration for SDK campaign and station management...
     Contact: Dr. Jane Smith (jane.smith@example.edu)

📊 Selected campaign for CKAN publishing:
   ID: 1
   Name: Cow Bayou near Mauriceville - Cow Bayou


In [5]:
# Get stations for the selected campaign
print(f"📍 Finding stations in campaign {campaign_id}...")
try:
    stations = client.list_stations(campaign_id=campaign_id)

    if stations.total == 0:
        print("❌ No stations found in this campaign. Please create stations and upload data first.")
        raise Exception("No stations available")

    print(f"Found {stations.total} stations:")
    for station in stations.items:
        print(f"  • ID: {station.id} - {station.name}")
        # The description may be empty or missing, so fall back to an empty string
        print(f"    Description: {(station.description or '')[:80]}...")
        print()

    # Select the first station
    selected_station = stations.items[0]
    station_id = selected_station.id

    print("📡 Selected station for CKAN publishing:")
    print(f"   ID: {station_id}")
    print(f"   Name: {selected_station.name}")

except Exception as e:
    print(f"❌ Error listing stations: {e}")
    raise

📍 Finding stations in campaign 1...
Found 1 stations:
  • ID: 1 - Cow Bayou
    Description: Station Cow Bayou in campaign Cow Bayou near Mauriceville

...

📡 Selected station for CKAN publishing:
   ID: 1
   Name: Cow Bayou


In [6]:
# Check for existing data in the station
print(f"🔍 Checking data availability for station {station_id}...")
try:
    # List sensors to verify data exists
    sensors = client.sensors.list(campaign_id=campaign_id, station_id=station_id)

    if not sensors.items:
        print("❌ No sensors found in this station. Please upload sensor data first.")
        raise Exception("No sensor data available")
    # Raw sensor records, printed for inspection
    print(sensors.items)
    # Sum the measurement counts reported in each sensor's statistics
    total_measurements = 0
    for sensor in sensors.items:
        if sensor.statistics:
            total_measurements += sensor.statistics.count or 0
    print(total_measurements)

    print("✅ Data validation successful:")
    print(f"   • Sensors: {len(sensors.items)}")
    print(f"   • Total measurements: {total_measurements}")
    print(f"   • Sensor types: {', '.join([s.variablename for s in sensors.items[:3]])}{'...' if len(sensors.items) > 3 else ''}")

    if total_measurements == 0:
        print("⚠️  Warning: No measurement data found. CKAN publishing will include sensor configuration only.")
    else:
        print("✅ Ready for CKAN publishing with full dataset!")

except Exception as e:
    print(f"❌ Error checking data availability: {e}")
    raise

🔍 Checking data availability for station 1...
[SensorItem(id=1, alias='Rain Increment', description=None, postprocess=False, postprocessscript=None, units='inches', variablename='No BestGuess Formula', statistics=SensorStatistics(max_value=0.71, min_value=0.0, avg_value=0.00138073445697064, stddev_value=0.0186345098105466, percentile_90=0.0, percentile_95=0.0, percentile_99=0.03, count=8959, first_measurement_value=0.0, first_measurement_collectiontime=datetime.datetime(2025, 3, 1, 0, 30, tzinfo=TzInfo(0)), last_measurement_time=datetime.datetime(2025, 6, 2, 11, 0, tzinfo=TzInfo(0)), last_measurement_value=0.0, stats_last_updated=datetime.datetime(2026, 2, 5, 19, 17, 24, 643007, tzinfo=TzInfo(0)))), SensorItem(id=2, alias='Flow Volume', description=None, postprocess=False, postprocessscript=None, units='cfs', variablename='No BestGuess Formula', statistics=SensorStatistics(max_value=795.0, min_value=1.5, avg_value=92.1882937172185, stddev_value=153.645845267881, percentile_90=189.0,

## 3. CKAN Portal Exploration

Before publishing, let's explore the CKAN portal to understand its structure and existing datasets.
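
For orientation, CKAN portals also expose a public Action API under `/api/3/action/`, and its `package_search` action accepts Solr-style filter queries. The sketch below only builds such a URL and does not send a request. `build_package_search_url` is a hypothetical helper, and the exact filter syntax a given portal accepts is an assumption that should be checked against that portal.

```python
from typing import List
from urllib.parse import parse_qs, urlencode, urlparse


def build_package_search_url(ckan_url: str, tags: List[str], rows: int = 10) -> str:
    """Build a CKAN Action API package_search URL that filters on all given tags."""
    # Solr-style filter query: every listed tag must be present on the dataset.
    tag_filter = " AND ".join(f'tags:"{tag}"' for tag in tags)
    query = urlencode({"fq": tag_filter, "rows": rows})
    return f"{ckan_url.rstrip('/')}/api/3/action/package_search?{query}"


url = build_package_search_url("https://ckan.tacc.utexas.edu", ["upstream", "environmental"])
print(url)
# Decode the query string again to show the parameters that would be sent.
print(parse_qs(urlparse(url).query))
```

Opening the printed URL in a browser (or fetching it with any HTTP client) is a quick way to see what the portal already holds, independent of the SDK.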

In [7]:
# Check CKAN organizations via the Upstream API (pods)
print("🔎 Checking CKAN organizations via Upstream API...")

# Try environment first, then fall back to the token returned by API auth.
tapis_token = os.getenv("TAPIS_TOKEN") or os.getenv("TAPIS_ACCESS_TOKEN")
if not tapis_token:
    tapis_token = client.auth_manager.get_tapis_token()

if not tapis_token:
    print("⚠️ No Tapis token available.")
    print("   Set TAPIS_TOKEN/TAPIS_ACCESS_TOKEN or authenticate with API to receive a tapis_access_token.")
else:
    try:
        orgs = client.list_ckan_organizations(tapis_token=tapis_token)
        print(f"Found {len(orgs)} organizations:")
        for org in orgs[:10]:
            name = org.get("name") or org.get("title") or str(org)
            print(f"  • {name}")
    except Exception as e:
        print(f"❌ Unable to list CKAN organizations: {e}")
        print("   Ensure your Tapis token is valid and the API can reach CKAN.")


🔎 Checking CKAN organizations via Upstream API...
Found 10 organizations:
  • arctic-infrastructure
  • dso-internal
  • dynamo
  • dynamic-sensemaking-framework
  • planet-texas-2050
  • setx-uifl
  • tacc
  • twdb-subside
  • upstream
  • vital_org


In [8]:
# List available CKAN organizations via the Upstream API
print("🏢 Available CKAN organizations (via Upstream API):")

# Use env token or token returned during API auth
tapis_token = os.getenv("TAPIS_TOKEN") or os.getenv("TAPIS_ACCESS_TOKEN")
if not tapis_token:
    tapis_token = client.auth_manager.get_tapis_token()

try:
    if not tapis_token:
        raise RuntimeError("Tapis token required to list organizations")

    organizations = client.list_ckan_organizations(tapis_token=tapis_token)

    if organizations:
        print(f"Found {len(organizations)} organizations:")
        for org in organizations[:5]:  # Show first 5
            name = org.get('name') or org.get('title') or str(org)
            title = org.get('title') or org.get('name') or ''
            print(f"  • {name}: {title}")
            print(f"    Description: {(org.get('description') or 'No description')[:60]}...")
            print(f"    Packages: {org.get('package_count', 0)}")
            print()

        # Check if our target organization (set during client setup) exists
        org_names = [org.get('name') for org in organizations if org.get('name')]
        if CKAN_ORGANIZATION in org_names:
            print(f"✅ Target organization '{CKAN_ORGANIZATION}' found!")
        else:
            print(f"⚠️  Target organization '{CKAN_ORGANIZATION}' not found.")
            print("   Publishing will use test dataset mode.")
    else:
        print("No organizations found or access restricted.")

except Exception as e:
    print(f"⚠️  Could not list organizations: {e}")
    print("Continuing with dataset publishing...")


🏢 Available CKAN organizations (via Upstream API):
Found 10 organizations:
  • arctic-infrastructure: Arctic Infrastructure
    Description: No description...
    Packages: 0

  • dso-internal: DSO Internal
    Description: No description...
    Packages: 0

  • dynamo: DYNAMO
    Description: Analysis and Model Integration MINT and Cookbooks...
    Packages: 0

  • dynamic-sensemaking-framework: Dynamic Sensemaking Framework
    Description: No description...
    Packages: 0

  • planet-texas-2050: Planet Texas 2050
    Description: Planet Texas 2050's interdisciplinary research teams work on...
    Packages: 0

✅ Target organization 'dso-internal' found!


In [9]:
# Search for existing Upstream datasets
print("🔍 Searching for existing Upstream datasets in CKAN:")
try:
    # Use the CKAN integration attached to the client (checked during setup)
    ckan = client.ckan
    upstream_datasets = ckan.list_datasets(
        tags=["upstream", "environmental"],
        limit=10
    )

    if upstream_datasets:
        print(f"Found {len(upstream_datasets)} Upstream-related datasets:")
        for dataset in upstream_datasets[:3]:  # Show first 3
            print(f"  • {dataset['name']}: {dataset['title']}")
            print(f"    Notes: {(dataset.get('notes') or 'No description')[:80]}...")
            print(f"    Resources: {len(dataset.get('resources', []))}")
            print(f"    Tags: {', '.join([tag['name'] for tag in dataset.get('tags', [])])}")
            print()
    else:
        print("No existing Upstream datasets found.")
        print("This will be the first Upstream dataset in this portal!")

except Exception as e:
    print(f"⚠️  Could not search datasets: {e}")
    print("Proceeding with dataset creation...")


## 4. Data Export and Preparation

Before publishing to CKAN, let's export the campaign data and examine its structure.
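
Beyond checking byte counts, it can be useful to peek at the header and row count of an export before publishing. The sketch below assumes the export is an in-memory, UTF-8 encoded CSV with a `getvalue()` method (as `BytesIO` has). `summarize_csv_export` is a hypothetical helper, and the sample columns are made up for illustration.

```python
import csv
from io import BytesIO, StringIO
from typing import List, Tuple


def summarize_csv_export(buffer: BytesIO) -> Tuple[List[str], int]:
    """Return (header columns, number of data rows) for an in-memory CSV export."""
    text = buffer.getvalue().decode("utf-8")
    rows = list(csv.reader(StringIO(text)))
    if not rows:
        return [], 0
    # First row is the header; everything after it counts as data.
    return rows[0], len(rows) - 1


# Made-up sample content; real exports will have different columns.
sample = BytesIO(b"alias,units\nRain Increment,inches\nFlow Volume,cfs\n")
header, count = summarize_csv_export(sample)
print(header, count)
```

Reading the whole export into memory is fine for files of the size shown in this notebook; a much larger export would be better streamed line by line.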

In [10]:
# Get detailed campaign information
print("📊 Retrieving detailed campaign information...")
try:
    campaign_details = client.get_campaign(campaign_id)

    print("✅ Campaign Details Retrieved:")
    print(f"   Name: {campaign_details.name}")
    print(f"   Description: {campaign_details.description}")
    print(f"   Contact: {campaign_details.contact_name} ({campaign_details.contact_email})")
    print(f"   Allocation: {campaign_details.allocation}")
    print(f"   Start Date: {campaign_details.start_date}")
    print(f"   End Date: {campaign_details.end_date}")

    # Check campaign summary if available
    if hasattr(campaign_details, 'summary') and campaign_details.summary:
        summary = campaign_details.summary
        print("\n📈 Campaign Summary:")
        if hasattr(summary, 'total_stations'):
            print(f"   • Total Stations: {summary.total_stations}")
        if hasattr(summary, 'total_sensors'):
            print(f"   • Total Sensors: {summary.total_sensors}")
        if hasattr(summary, 'total_measurements'):
            print(f"   • Total Measurements: {summary.total_measurements}")
        if hasattr(summary, 'sensor_types'):
            print(f"   • Sensor Types: {', '.join(summary.sensor_types)}")

except Exception as e:
    print(f"❌ Error retrieving campaign details: {e}")
    raise

📊 Retrieving detailed campaign information...
✅ Campaign Details Retrieved:
   Name: Cow Bayou near Mauriceville - Cow Bayou
   Description: Station Cow Bayou in campaign Cow Bayou near Mauriceville


   Contact: WILLIAM H MOBLEY (feulmob@gmail.com)
   Allocation: setx-uifl
   Start Date: None
   End Date: None

📈 Campaign Summary:
   • Sensor Types: Rain Increment, River Stage, Flow Volume


In [11]:
# Export station data for CKAN publishing
print("📤 Exporting station data for CKAN publishing...")
try:
    # Export sensor configuration
    print("   Exporting sensor configuration...")
    station_sensors_data = client.stations.export_station_sensors(
        station_id=station_id,
        campaign_id=campaign_id
    )

    # Export measurement data
    print("   Exporting measurement data...")
    station_measurements_data = client.stations.export_station_measurements(
        station_id=station_id,
        campaign_id=campaign_id
    )

    # Check exported data sizes
    sensors_size = len(station_sensors_data.getvalue()) if hasattr(station_sensors_data, 'getvalue') else 0
    measurements_size = len(station_measurements_data.getvalue()) if hasattr(station_measurements_data, 'getvalue') else 0

    print("✅ Data export completed:")
    print(f"   • Sensors data: {sensors_size:,} bytes")
    print(f"   • Measurements data: {measurements_size:,} bytes")
    print(f"   • Total data size: {(sensors_size + measurements_size):,} bytes")

    if sensors_size == 0:
        print("⚠️  Warning: Sensors data is empty")
    if measurements_size == 0:
        print("⚠️  Warning: Measurements data is empty")

    print("✅ Ready for CKAN publication!")

except Exception as e:
    print(f"❌ Error exporting station data: {e}")
    raise

📤 Exporting station data for CKAN publishing...
   Exporting sensor configuration...
Exporting sensors for station 1 in campaign 1
   Exporting measurement data...
✅ Data export completed:
   • Sensors data: 180 bytes
   • Measurements data: 574,658 bytes
   • Total data size: 574,838 bytes
✅ Ready for CKAN publication!


## 5. CKAN Dataset Creation and Publishing

Now let's publish the campaign data to CKAN using the integrated publishing functionality.
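
CKAN dataset names double as URL slugs: they must be 2 to 100 characters long and may contain only lowercase letters, digits, `-` and `_`. The name used below (`upstream-campaign-<id>`) already satisfies this, but if you derive names from free text such as a campaign title, a helper like the following sketch can normalize them first. `to_ckan_name` is a hypothetical function, not part of the SDK.

```python
import re


def to_ckan_name(text: str, max_length: int = 100) -> str:
    """Convert free text into a CKAN-style dataset name (lowercase, digits, '-' and '_')."""
    # Replace every run of disallowed characters with a single hyphen.
    slug = re.sub(r"[^a-z0-9_-]+", "-", text.lower())
    # Collapse repeated hyphens and trim them from both ends.
    slug = re.sub(r"-{2,}", "-", slug).strip("-")
    slug = slug[:max_length].rstrip("-")
    if len(slug) < 2:
        raise ValueError(f"Cannot derive a valid dataset name from {text!r}")
    return slug


print(to_ckan_name("Upstream Campaign 1: Cow Bayou near Mauriceville"))
```

Names must also be unique within a portal, which this helper does not check.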

In [12]:
# Prepare dataset metadata
dataset_name = f"upstream-campaign-{campaign_id}"
print(f"🏷️  Preparing dataset metadata for: {dataset_name}")

# Create comprehensive metadata
dataset_metadata = {
    "name": dataset_name,
    "title": campaign_details.name,
    "notes": f"""{campaign_details.description}

This dataset contains environmental sensor data collected through the Upstream platform.

**Campaign Information:**
- Campaign ID: {campaign_id}
- Contact: {campaign_details.contact_name} ({campaign_details.contact_email})
- Allocation: {campaign_details.allocation}
- Duration: {campaign_details.start_date} to {campaign_details.end_date}

**Data Structure:**
- Sensors Configuration: Contains sensor metadata, units, and processing information
- Measurement Data: Time-series environmental measurements with geographic coordinates

**Access and Usage:**
Data is provided in CSV format for easy analysis and integration with various tools.""",
    "tags": ["environmental", "sensors", "upstream", "monitoring", "time-series"],
    "extras": [
        {"key": "campaign_id", "value": str(campaign_id)},
        {"key": "station_id", "value": str(station_id)},
        {"key": "source", "value": "Upstream Platform"},
        {"key": "data_type", "value": "environmental_sensor_data"},
        {"key": "contact_email", "value": campaign_details.contact_email},
        {"key": "allocation", "value": campaign_details.allocation},
        {"key": "export_date", "value": datetime.now().isoformat()}
    ],
    "license_id": "cc-by",  # Creative Commons Attribution
}

print("📋 Dataset Metadata Prepared:")
print(f"   • Name: {dataset_metadata['name']}")
print(f"   • Title: {dataset_metadata['title']}")
print(f"   • Tags: {', '.join(dataset_metadata['tags'])}")
print(f"   • License: {dataset_metadata['license_id']}")
print(f"   • Extra fields: {len(dataset_metadata['extras'])}")
print(f"   • Notes: {dataset_metadata['notes']}")

🏷️  Preparing dataset metadata for: upstream-campaign-1
📋 Dataset Metadata Prepared:
   • Name: upstream-campaign-1
   • Title: Cow Bayou near Mauriceville - Cow Bayou
   • Tags: environmental, sensors, upstream, monitoring, time-series
   • License: cc-by
   • Extra fields: 7
   • Notes: Station Cow Bayou in campaign Cow Bayou near Mauriceville



This dataset contains environmental sensor data collected through the Upstream platform.

**Campaign Information:**
- Campaign ID: 1
- Contact: WILLIAM H MOBLEY (feulmob@gmail.com)
- Allocation: setx-uifl
- Duration: None to None

**Data Structure:**
- Sensors Configuration: Contains sensor metadata, units, and processing information
- Measurement Data: Time-series environmental measurements with geographic coordinates

**Access and Usage:**
Data is provided in CSV format for easy analysis and integration with various tools.


In [None]:
# Publish campaign data to CKAN using integrated method
print("📤 Publishing campaign data to CKAN...")

try:
    # Use the integrated CKAN publishing method
    publication_result = client.publish_to_ckan(
        campaign_id=campaign_id,
        station_id=station_id,
    )

    print("✅ CKAN Publication Successful!")
    print("\n📊 Publication Summary:")
    print(f"   • Success: {publication_result['success']}")
    print(f"   • Dataset Name: {publication_result['dataset']['name']}")
    print(f"   • Dataset ID: {publication_result['dataset']['id']}")
    print(f"   • Resources Created: {len(publication_result['resources'])}")
    print(f"   • CKAN URL: {publication_result['ckan_url']}")
    print(f"   • Message: {publication_result['message']}")

    # Store results for further operations
    published_dataset = publication_result['dataset']
    published_resources = publication_result['resources']
    ckan_dataset_url = publication_result['ckan_url']

    print("\n🎉 Your data is now publicly available at:")
    print(f"   {ckan_dataset_url}")

except Exception as e:
    print(f"❌ CKAN publication failed: {e}")
    print("\nTroubleshooting tips:")
    print("   • Check CKAN API credentials")
    print("   • Verify organization permissions")
    print("   • Ensure CKAN portal is accessible")
    print("   • Check dataset name uniqueness")
    raise

In [13]:
# Demonstrate Custom Metadata Publishing
print("🎨 Demonstrating Custom Metadata Publishing...")

# Example 1: Basic custom metadata
print("\n📝 Example 1: Adding custom dataset metadata")
custom_dataset_metadata = {
    "project_name": "Water Quality Monitoring Study",
    "funding_agency": "Environmental Protection Agency",
    "grant_number": "EPA-2024-WQ-001",
    "study_period": "2024-2025",
    "principal_investigator": "Dr. Jane Smith",
    "institution": "University of Environmental Sciences",
    "data_quality_level": "Level 2 - Quality Controlled"
}

print("Custom dataset metadata to be added:")
for key, value in custom_dataset_metadata.items():
    print(f"   • {key}: {value}")

# Example 2: Custom resource metadata
print("\n📄 Example 2: Adding custom resource metadata")
custom_resource_metadata = {
    "calibration_date": "2024-01-15",
    "calibration_method": "NIST-traceable standards",
    "processing_version": "v2.1",
    "quality_control": "Automated + Manual Review",
    "uncertainty_bounds": "±2% of reading",
    "data_completeness": "98.5%"
}

print("Custom resource metadata to be added to both sensors.csv and measurements.csv:")
for key, value in custom_resource_metadata.items():
    print(f"   • {key}: {value}")

# Example 3: Custom tags
print("\n🏷️ Example 3: Adding custom tags")
custom_tags = [
    "water-quality",
    "epa-funded",
    "university-research",
    "quality-controlled",
    "long-term-monitoring"
]

print(f"Custom tags (added to base tags): {', '.join(custom_tags)}")
print(f"Final tags will be: {', '.join(['environmental', 'sensors', 'upstream'] + custom_tags)}")

# Example 4: Additional CKAN dataset parameters
print("\n⚙️ Example 4: Additional CKAN dataset parameters")
additional_params = {
    "license_id": "cc-by-4.0",  # Creative Commons Attribution 4.0
    "version": "2.1",
    "author": "Environmental Research Team",
    "author_email": "research@university.edu",
    "maintainer": "Dr. Jane Smith",
    "maintainer_email": "jane.smith@university.edu"
}

print("Additional CKAN dataset parameters:")
for key, value in additional_params.items():
    print(f"   • {key}: {value}")

print("\n💡 These examples show how to enrich your CKAN datasets with project-specific metadata!")

🎨 Demonstrating Custom Metadata Publishing...

📝 Example 1: Adding custom dataset metadata
Custom dataset metadata to be added:
   • project_name: Water Quality Monitoring Study
   • funding_agency: Environmental Protection Agency
   • grant_number: EPA-2024-WQ-001
   • study_period: 2024-2025
   • principal_investigator: Dr. Jane Smith
   • institution: University of Environmental Sciences
   • data_quality_level: Level 2 - Quality Controlled

📄 Example 2: Adding custom resource metadata
Custom resource metadata to be added to both sensors.csv and measurements.csv:
   • calibration_date: 2024-01-15
   • calibration_method: NIST-traceable standards
   • processing_version: v2.1
   • quality_control: Automated + Manual Review
   • uncertainty_bounds: ±2% of reading
   • data_completeness: 98.5%

🏷️ Example 3: Adding custom tags
Custom tags (added to base tags): water-quality, epa-funded, university-research, quality-controlled, long-term-monitori
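
The cell above only prints the custom metadata; it does not show how the values reach CKAN. As a rough sketch of the data shaping involved, the helpers below convert a flat dictionary into the `extras` list format used earlier in this notebook (a list of `{"key": ..., "value": ...}` pairs) and merge tag lists without duplicates. `to_ckan_extras` and `merge_tags` are hypothetical names, and how the SDK accepts such values is not covered here.

```python
from typing import Any, Dict, List


def to_ckan_extras(metadata: Dict[str, Any]) -> List[Dict[str, str]]:
    """Turn a flat dict into the extras format: a list of key/value string pairs."""
    return [{"key": str(key), "value": str(value)} for key, value in metadata.items()]


def merge_tags(base_tags: List[str], custom_tags: List[str]) -> List[str]:
    """Combine two tag lists, keeping first-seen order and dropping duplicates."""
    return list(dict.fromkeys(base_tags + custom_tags))


extras = to_ckan_extras({"grant_number": "EPA-2024-WQ-001", "study_period": "2024-2025"})
tags = merge_tags(["environmental", "sensors", "upstream"], ["water-quality", "upstream"])
print(extras)
print(tags)
```

Values are converted to strings because extras are stored as text; structured values would need to be serialized (for example as JSON) before being added.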

In [14]:
# Publish station data to CKAN via the Upstream API (pods)
print("🚀 Publishing station data to CKAN via Upstream API")

# Use Tapis token from env or from API auth response.
tapis_token = os.getenv("TAPIS_TOKEN") or os.getenv("TAPIS_ACCESS_TOKEN")
if not tapis_token:
    tapis_token = client.auth_manager.get_tapis_token()

if not tapis_token:
    raise RuntimeError("Tapis token required. Set TAPIS_TOKEN/TAPIS_ACCESS_TOKEN or authenticate to receive tapis_access_token.")

try:
    publication_result = client.publish_to_ckan(
        campaign_id=campaign_id,
        station_id=station_id,
        # Organization read from config.yaml during client setup
        organization=CKAN_ORGANIZATION,
        cascade=True,
        force=False,
        tapis_token=tapis_token,
    )

    print("✅ Publish call completed")
    print("\n📊 Publish Response:")
    print(f"   • Success: {publication_result.get('success')}")
    print(f"   • Message: {publication_result.get('message')}")
    print(f"   • Published Count: {publication_result.get('published_count')}")
    print(f"   • Errors: {publication_result.get('errors')}")
    print(f"   • ID: {publication_result.get('id')}")
    print(f"   • Type: {publication_result.get('type')}")
    print(f"   • is_published: {publication_result.get('is_published')}")
    print(f"   • published_at: {publication_result.get('published_at')}")
    if publication_result.get('cascaded_items'):
        print(f"   • Cascaded Items: {publication_result.get('cascaded_items')}")

except Exception as e:
    print(f"❌ Publish failed: {e}")
    print("Troubleshooting tips:")
    print("   • Verify Tapis token and CKAN organization access")
    print("   • Confirm campaign/station IDs are valid")
    raise

In [None]:
# Verify publish state from the API
print("🔍 Verifying publish state from the API...")
try:
    station = client.stations.get(station_id=station_id, campaign_id=campaign_id)
    is_published = getattr(station, 'is_published', None)
    published_at = getattr(station, 'published_at', None)
    print(f"Station published: {is_published}")
    print(f"Station published_at: {published_at}")
except Exception as e:
    print(f"⚠️ Could not retrieve station publish state: {e}")


In [None]:
# Unpublish station from CKAN via the Upstream API
print("🧹 Unpublishing station via Upstream API")

# Use Tapis token from env or from API auth response.
tapis_token = os.getenv("TAPIS_TOKEN") or os.getenv("TAPIS_ACCESS_TOKEN")
if not tapis_token:
    tapis_token = client.auth_manager.get_tapis_token()

if not tapis_token:
    raise RuntimeError("Tapis token required. Set TAPIS_TOKEN/TAPIS_ACCESS_TOKEN or authenticate to receive tapis_access_token.")

try:
    result = client.unpublish_station(
        campaign_id=campaign_id,
        station_id=station_id,
        tapis_token=tapis_token,
    )

    print("✅ Unpublish call completed")
    print("\n📊 Unpublish Response:")
    print(f"   • Success: {result.get('success')}")
    print(f"   • Message: {result.get('message')}")
    print(f"   • Published Count: {result.get('published_count')}")
    print(f"   • Errors: {result.get('errors')}")
    print(f"   • is_published: {result.get('is_published')}")
    print(f"   • published_at: {result.get('published_at')}")

except Exception as e:
    print(f"❌ Unpublish failed: {e}")
    raise
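
The publish and unpublish cells repeat the same Tapis token lookup. As a rough sketch, that lookup could be factored into one helper. `resolve_tapis_token` is a hypothetical name introduced here for illustration; only the environment variable names and the error message come from the cells above.

```python
import os
from typing import Callable, Mapping, Optional


def resolve_tapis_token(
    env: Mapping[str, str] = os.environ,
    fallback: Optional[Callable[[], Optional[str]]] = None,
) -> str:
    """Return a Tapis token from the environment, or from `fallback` if unset."""
    # Same precedence as the notebook cells: TAPIS_TOKEN first
    for name in ("TAPIS_TOKEN", "TAPIS_ACCESS_TOKEN"):
        token = env.get(name)
        if token:
            return token
    # Fall back to a caller-supplied lookup, if one was given
    token = fallback() if fallback is not None else None
    if not token:
        raise RuntimeError(
            "Tapis token required. Set TAPIS_TOKEN/TAPIS_ACCESS_TOKEN "
            "or authenticate to receive tapis_access_token."
        )
    return token
```

With the client from this notebook, the call would be `tapis_token = resolve_tapis_token(fallback=client.auth_manager.get_tapis_token)`.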


## 6. Dataset Verification and Exploration

Let's verify the published dataset and explore its contents in CKAN.

In [None]:
# Verify the published dataset
print(f"🔍 Verifying published dataset in CKAN...")

try:
    # Retrieve the dataset from CKAN to verify it was created correctly
    verified_dataset = ckan.get_dataset(published_dataset['name'])

    print(f"✅ Dataset verification successful!")
    print(f"\n📋 Dataset Information:")
    print(f"   • Name: {verified_dataset['name']}")
    print(f"   • Title: {verified_dataset['title']}")
    print(f"   • State: {verified_dataset['state']}")
    print(f"   • Private: {verified_dataset.get('private', 'Unknown')}")
    print(f"   • License: {verified_dataset.get('license_title', 'Not specified')}")
    print(f"   • Created: {verified_dataset.get('metadata_created', 'Unknown')}")
    print(f"   • Modified: {verified_dataset.get('metadata_modified', 'Unknown')}")

    # Show organization info if available
    if verified_dataset.get('organization'):
        org = verified_dataset['organization']
        print(f"   • Organization: {org.get('title', org.get('name', 'Unknown'))}")

    # Show tags
    if verified_dataset.get('tags'):
        tags = [tag['name'] for tag in verified_dataset['tags']]
        print(f"   • Tags: {', '.join(tags)}")

    # Show extras
    if verified_dataset.get('extras'):
        print(f"   • Extra metadata fields: {len(verified_dataset['extras'])}")
        for extra in verified_dataset['extras'][:3]:  # Show first 3
            print(f"     - {extra['key']}: {extra['value']}")

except Exception as e:
    print(f"❌ Dataset verification failed: {e}")
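
The same dataset can also be cross-checked against CKAN's Action API, which serves `package_show` under `/api/3/action/` and wraps every result in a `{"success": ..., "result": ...}` envelope. The sketch below only builds the URL and unwraps a response body; the helper names are hypothetical, and whether `ckan.get_dataset` uses this endpoint internally is not stated in the source.

```python
import json
from typing import Any, Dict
from urllib.parse import urlencode


def package_show_url(ckan_url: str, dataset_name: str) -> str:
    """Build the Action API URL that returns one dataset's metadata."""
    query = urlencode({"id": dataset_name})
    return f"{ckan_url.rstrip('/')}/api/3/action/package_show?{query}"


def unwrap_action_response(body: str) -> Dict[str, Any]:
    """Return the `result` of an Action API response, raising on failure."""
    payload = json.loads(body)
    if not payload.get("success"):
        raise RuntimeError(f"CKAN action failed: {payload.get('error')}")
    return payload["result"]
```

Fetching `package_show_url(CKAN_URL, published_dataset['name'])` with any HTTP client and passing the response text to `unwrap_action_response` should yield a dictionary with the fields printed above.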

In [None]:
# Examine the published resources
print(f"📁 Examining published resources...")

try:
    resources = verified_dataset.get('resources', [])

    if resources:
        print(f"Found {len(resources)} resources:")

        for i, resource in enumerate(resources, 1):
            print(f"\n   📄 Resource {i}: {resource['name']}")
            print(f"      • ID: {resource['id']}")
            print(f"      • Format: {resource.get('format', 'Unknown')}")
            print(f"      • Size: {resource.get('size', 'Unknown')} bytes")
            print(f"      • Description: {resource.get('description', 'No description')}")
            print(f"      • Created: {resource.get('created', 'Unknown')}")
            print(f"      • URL: {resource.get('url', 'Not available')}")

            # Show download information
            if resource.get('url'):
                download_url = resource['url']
                if not download_url.startswith('http'):
                    download_url = f"{CKAN_URL}{download_url}"
                print(f"      • Download: {download_url}")

        print(f"\n✅ All resources published successfully!")

    else:
        print("⚠️  No resources found in the dataset")

except Exception as e:
    print(f"❌ Error examining resources: {e}")
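
The cell above builds download links by concatenating `CKAN_URL` and the resource URL, which can produce a doubled or missing slash. As one possible alternative, the standard library's `urljoin` resolves relative URLs and leaves absolute ones alone. `absolute_resource_url` is a hypothetical helper name.

```python
from urllib.parse import urljoin


def absolute_resource_url(base_url: str, resource_url: str) -> str:
    """Resolve a possibly relative resource URL against the portal base URL."""
    # Normalise the base to end in exactly one slash before joining
    return urljoin(base_url.rstrip("/") + "/", resource_url)
```

Note one behavioural difference: a resource URL with a leading slash is resolved against the host root, so a portal served under a sub-path would need its prefix handled separately.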

## 7. Dataset Management Operations

Let's demonstrate additional CKAN management operations like updating datasets and managing resources.

In [None]:
# Update dataset with additional metadata
from datetime import timezone

print(f"🔄 Demonstrating dataset update operations...")

try:
    # Add update timestamp and additional tags
    current_tags = [tag['name'] for tag in verified_dataset.get('tags', [])]
    updated_tags = current_tags + ["demo", "notebook-generated"]

    # Update the dataset (timestamp is taken in UTC to match its label)
    updated_dataset = ckan.update_dataset(
        dataset_id=published_dataset['name'],
        tags=updated_tags,
        notes=f"{verified_dataset.get('notes', '')}\n\n**Last Updated:** {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')} (via Upstream SDK Demo)"
    )

    print(f"✅ Dataset updated successfully!")
    print(f"   • New tags added: demo, notebook-generated")
    print(f"   • Description updated with timestamp")
    print(f"   • Total tags: {len(updated_dataset.get('tags', []))}")

except Exception as e:
    print(f"⚠️  Dataset update failed: {e}")
    print("This may be due to insufficient permissions or CKAN configuration.")
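
Re-running the update cell appends `demo` and `notebook-generated` to a tag list that may already contain them. A small order-preserving merge avoids sending duplicates. This is a sketch only: `merge_tags` is a hypothetical helper, and how the SDK or the portal treats duplicate tags is not stated in the source.

```python
from typing import Iterable, List


def merge_tags(existing: Iterable[str], extra: Iterable[str]) -> List[str]:
    """Combine two tag lists, keeping first-seen order and dropping repeats."""
    # dict preserves insertion order, so it doubles as an ordered set
    return list(dict.fromkeys([*existing, *extra]))
```

In the cell above this would replace the list concatenation: `updated_tags = merge_tags(current_tags, ["demo", "notebook-generated"])`.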

## 8. Cleanup and Resource Management

Let's demonstrate proper cleanup and resource management.

In [None]:
# Dataset management options
print(f"🧹 Dataset Management and Cleanup Options:")

print(f"\n📊 Current Dataset Status:")
print(f"   • Dataset Name: {published_dataset['name']}")
print(f"   • Dataset ID: {published_dataset['id']}")
print(f"   • CKAN URL: {ckan_dataset_url}")
print(f"   • Resources: {len(published_resources)}")

print(f"\n🔧 Management Options:")
print(f"   1. Keep dataset active (recommended for production)")
print(f"   2. Make dataset private (hide from public)")
print(f"   3. Archive dataset (mark as deprecated)")
print(f"   4. Delete dataset (only for test data)")

# For demo purposes, we'll show how to manage the dataset
print(f"\n💡 For this demo, we'll keep the dataset active.")
print(f"   Your published data will remain available at:")
print(f"   {ckan_dataset_url}")

# Uncomment the following section if you want to delete the demo dataset
"""
# CAUTION: Uncomment only for cleanup of test datasets
print(f"\n⚠️  Demo dataset cleanup:")
try:
    # Delete the demo dataset (only for demo purposes)
    deletion_result = ckan.delete_dataset(published_dataset['name'])
    if deletion_result:
        print(f"   ✅ Demo dataset deleted successfully")
    else:
        print(f"   ❌ Dataset deletion failed")
except Exception as e:
    print(f"   ⚠️  Could not delete dataset: {e}")
    print(f"   This may be due to insufficient permissions or CKAN configuration.")
"""

print(f"\n🔄 Resource Cleanup:")
try:
    # Close any open file handles
    if 'station_sensors_data' in locals():
        station_sensors_data.close()
    if 'station_measurements_data' in locals():
        station_measurements_data.close()

    print(f"   ✅ File handles closed")
except Exception as e:
    print(f"   ⚠️  Error closing file handles: {e}")
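
Option 2 in the list above, making the dataset private, is not demonstrated in this notebook. One way it might be done is through CKAN's `package_patch` action, which updates only the fields supplied. The sketch below builds the request without sending it; `build_make_private_request` is a hypothetical helper, and the SDK may offer its own method for this that the source does not show.

```python
import json
from urllib.request import Request


def build_make_private_request(ckan_url: str, dataset_id: str, api_token: str) -> Request:
    """Build a POST request that sets `private` to true on one dataset."""
    payload = json.dumps({"id": dataset_id, "private": True}).encode("utf-8")
    return Request(
        url=f"{ckan_url.rstrip('/')}/api/3/action/package_patch",
        data=payload,
        headers={
            # CKAN reads the API token from the Authorization header
            "Authorization": api_token,
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would send it. CKAN generally requires a dataset to belong to an organization before it can be made private, so this may fail for datasets published without one.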

In [None]:
# Logout and final cleanup
print(f"👋 Session cleanup and logout...")

try:
    # Logout from Upstream
    client.logout()
    print(f"   ✅ Logged out from Upstream successfully")
except Exception as e:
    print(f"   ❌ Logout error: {e}")

print(f"\n🎉 CKAN Integration Demo Completed Successfully!")

print(f"\n📚 Summary of What We Accomplished:")
print(f"   ✅ Connected to both Upstream and CKAN platforms")
print(f"   ✅ Selected and validated campaign data")
print(f"   ✅ Exported sensor and measurement data")
print(f"   ✅ Created comprehensive CKAN dataset with metadata")
print(f"   ✅ Published resources (sensors, measurements, metadata)")
print(f"   ✅ Demonstrated dataset management operations")
print(f"   ✅ Explored data discovery and search capabilities")
print(f"   ✅ Showed automated publishing workflows")

print(f"\n🌐 Your Data is Now Publicly Available:")
print(f"   📊 Dataset: {published_dataset['name']}")
print(f"   🔗 URL: {ckan_dataset_url}")
print(f"   📁 Resources: {len(published_resources)} files available for download")

print(f"\n📖 Next Steps:")
print(f"   • Explore your published data in the CKAN web interface")
print(f"   • Set up automated publishing workflows for production")
print(f"   • Configure organization permissions and access controls")
print(f"   • Integrate CKAN APIs with other data analysis tools")
print(f"   • Monitor dataset usage and access patterns")

## Summary

This notebook demonstrated the comprehensive CKAN integration capabilities of the Upstream SDK:

✅ **Authentication & Setup** - Configured both Upstream and CKAN credentials  
✅ **Data Export** - Retrieved campaign data and prepared for publishing  
✅ **Dataset Creation** - Created CKAN datasets with rich metadata  
✅ **Resource Management** - Published multiple data resources (sensors, measurements, metadata)  
✅ **Portal Exploration** - Discovered existing datasets and organizations  
✅ **Update Operations** - Demonstrated dataset and resource updates  
✅ **Search & Discovery** - Showed data findability through tags and organization  
✅ **Automation Workflows** - Built reusable publishing processes  
✅ **Best Practices** - Covered naming, metadata, and performance considerations  

## Key Features

- **Seamless Integration**: Direct connection between Upstream campaigns and CKAN datasets
- **Rich Metadata**: Automatic generation of comprehensive dataset descriptions and tags
- **Multi-Resource Support**: Separate resources for sensors, measurements, and metadata
- **Update Management**: Smart handling of dataset updates and versioning
- **Error Handling**: Robust error handling and validation throughout the process
- **Automation Ready**: Workflow patterns suitable for production automation

## Production Considerations

- **Authentication**: Use environment variables or configuration files for credentials
- **Monitoring**: Implement logging and monitoring for automated publishing workflows
- **Permissions**: Configure appropriate CKAN organization permissions and access controls
- **Validation**: Add comprehensive data validation before publishing
- **Backup**: Maintain backup copies of datasets before updates
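
For the authentication point above, a minimal sketch of loading CKAN settings from environment variables might look like the following. The variable names `CKAN_URL`, `CKAN_API_KEY`, and `CKAN_ORG` and the `PublishConfig` class are assumptions made for illustration; adapt them to whatever your deployment actually uses.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class PublishConfig:
    ckan_url: str
    ckan_api_key: str
    ckan_org: Optional[str] = None


def load_publish_config(env: Mapping[str, str] = os.environ) -> PublishConfig:
    """Read CKAN settings from the environment, failing fast if any are missing."""
    required = ("CKAN_URL", "CKAN_API_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return PublishConfig(
        ckan_url=env["CKAN_URL"].rstrip("/"),
        ckan_api_key=env["CKAN_API_KEY"],
        # The organization is optional; an empty string is treated as unset
        ckan_org=env.get("CKAN_ORG") or None,
    )
```

Failing at load time keeps credential problems out of the publishing step itself, which makes automated runs easier to diagnose.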

## Related Documentation

- [CKAN API Documentation](https://docs.ckan.org/en/latest/api/)

---

*This notebook demonstrates CKAN integration for the Upstream SDK. For core platform functionality, see UpstreamSDK_Core_Demo.ipynb*