In [1]:
import cdsapi

In [6]:
# Read the version from the installed package metadata so this cell works on a
# fresh kernel instead of relying on a `cdsapi_version` variable that no
# visible cell defines.
from importlib.metadata import version as _package_version

cdsapi_version = _package_version("cdsapi")
cdsapi_version

'0.7.6'

In [2]:
# Create the cdsapi client; its session is what the later cells inspect.
# NOTE: with debug=True cdsapi logs its configuration, API key included (see
# this cell's DEBUG output) - clear the outputs before sharing the notebook.
client = cdsapi.Client(debug=True)

# Request: instantaneous long-wave, all-sky CO2 radiative forcing at the
# surface, dataset version 2, for June 2018.
dataset = 'cams-global-radiative-forcings'
request = dict(
    variable=['radiative_forcing_of_carbon_dioxide'],
    forcing_type='instantaneous',
    band=['long_wave'],
    sky_type=['all_sky'],
    level=['surface'],
    version=['2'],
    year=['2018'],
    month=['06'],
)

# Local file name handed to client.retrieve() as the download target.
target = 'download.grib'

2025-08-15 11:05:28,621 DEBUG GET https://ads.atmosphere.copernicus.eu/api/catalogue/v1/messages
2025-08-15 11:05:29,480 DEBUG REPLY {"messages":[{"id":"sites/ads/2024/forum.md","date":"2024-09-26T00:00:00","summary":null,"url":null,"severity":"info","content":"Watch our [Forum]( https://forum.ecmwf.int/) for Announcements, news and other discussed topics.","live":true}]}
2025-08-15 11:05:29,481 INFO [2024-09-26T00:00:00] Watch our [Forum]( https://forum.ecmwf.int/) for Announcements, news and other discussed topics.
2025-08-15 11:05:29,482 DEBUG CDSAPI {'url': 'https://ads.atmosphere.copernicus.eu/api', 'key': '2d85cf79-2c34-43a7-8c2c-7f64b984fe13', 'quiet': False, 'verify': True, 'timeout': 60, 'progress': True, 'sleep_max': 120, 'retry_max': 500, 'delete': False, 'datastores_version': '0.4.0'}


In [3]:
# Submit the request defined above through cdsapi and write the result to
# `target`; the value displayed under this cell is 'download.grib'.
client.retrieve(dataset, request, target)

2025-08-15 11:05:48,575 DEBUG GET https://ads.atmosphere.copernicus.eu/api/retrieve/v1/processes/cams-global-radiative-forcings
2025-08-15 11:05:48,693 DEBUG REPLY {"title":"CAMS global radiative forcings","description":"This dataset provides geographical distributions of the radiative forcing (RF) by key atmospheric constituents. The radiative forcing estimates are based on the CAMS reanalysis and additional model simulations and are provided separately for...\n\ncarbon dioxide \nmethane \ntropospheric ozone \nstratospheric ozone\ninteractions between anthropogenic aerosols and radiation\ninteractions between anthropogenic aerosols and clouds\n\nRadiative forcing measures the imbalance in the Earth’s energy budget caused by a perturbation of the climate system, such as changes in atmospheric composition caused by human activities. RF is a useful predictor of globally-averaged temperature change, especially when rapid adjustments of atmospheric temperature and moisture profiles are tak

'download.grib'

In [None]:
# Inspect the default headers of the client's HTTP session.
# NOTE(review): assumes the cdsapi client exposes a requests-style `session`
# attribute - confirm. The displayed headers may contain credentials, so clear
# this cell's output before sharing the notebook.
client.session.headers

In [None]:
import requests
import json
import base64
import os
from pathlib import Path

class ADSDirectClient:
    """
    Direct client for Copernicus ADS API without using cdsapi library.

    Every call goes through one ``requests.Session`` whose default headers
    carry the authentication. The session starts with HTTP Basic auth (empty
    username, token as password); ``get_processes`` and ``submit_job`` fall
    back to other schemes when the server answers 401.

    Attributes:
        base_url: Root URL of the ADS "retrieve" API.
        pat: Personal Access Token used to build the authentication headers.
        timeout: Seconds to wait on each request before giving up.
        session: The ``requests.Session`` used for every call.
    """

    # Default per-request timeout in seconds; also the fallback for instances
    # that never went through __init__.
    timeout = 60

    # Lower-cased header names whose values must never be printed.
    _SECRET_HEADER_NAMES = frozenset({"authorization", "private-token", "x-api-key"})

    def __init__(self, personal_access_token=None, timeout=None):
        """
        Build the session and install the default authentication headers.

        Args:
            personal_access_token: Token to use. When omitted (or empty) the
                token is read from the cdsapi configuration files.
            timeout: Per-request timeout in seconds; ``None`` keeps the
                class-level default.

        Raises:
            ValueError: If no token is given and none is found on disk.
        """
        self.base_url = "https://ads.atmosphere.copernicus.eu/api/retrieve/v1"
        self.pat = personal_access_token or self._get_pat_from_config()
        if timeout is not None:
            self.timeout = timeout
        self.session = requests.Session()
        self._setup_authentication()

    def _get_pat_from_config(self):
        """
        Get Personal Access Token from .cdsapirc file.

        ``~/.cdsapirc`` is checked first, then ``~/.adsapirc``. The first
        non-empty ``key:`` entry wins; an empty entry is skipped so that an
        empty token is never returned.

        Returns:
            The token string.

        Raises:
            ValueError: If neither file yields a non-empty key.
        """
        config_paths = [
            Path.home() / ".cdsapirc",
            Path.home() / ".adsapirc"  # Sometimes ADS uses different config
        ]

        for config_path in config_paths:
            if not config_path.exists():
                continue
            try:
                content = config_path.read_text()
            except (OSError, UnicodeError) as e:
                # Best effort: report the unreadable file and try the next one.
                print(f"Error reading {config_path}: {e}")
                continue

            for line in content.splitlines():
                stripped = line.strip()
                if stripped.startswith('key:'):
                    # Slice instead of split so a token containing "key:" survives.
                    token = stripped[len('key:'):].strip()
                    if token:
                        return token

        raise ValueError("Personal Access Token not found. Set it manually or in ~/.cdsapirc")

    def _setup_authentication(self):
        """Install the default session headers (Basic auth with an empty username)."""

        # Method 1: Basic Auth with empty username (most common for API keys)
        auth_string = f":{self.pat}"
        encoded_auth = base64.b64encode(auth_string.encode()).decode()

        self.session.headers.update({
            "Authorization": f"Basic {encoded_auth}",
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "python-ads-client/1.0.0"
        })

    @classmethod
    def _redact_headers(cls, headers):
        """Return a plain-dict copy of ``headers`` with credential values masked."""
        return {
            name: ("<redacted>" if str(name).lower() in cls._SECRET_HEADER_NAMES else value)
            for name, value in dict(headers).items()
        }

    def _try_alternative_auth(self, response_401):
        """
        Try alternative authentication methods if basic auth fails.

        Each candidate is probed with a GET on ``/processes`` through the
        session. The first candidate answered with a non-error status
        (< 400) is installed on the session; a 5xx or other 4xx reply is not
        taken as proof that the credentials were accepted.

        Args:
            response_401: The 401 response that triggered the fallback
                (currently unused; kept for interface compatibility).

        Returns:
            True if a working method was installed on the session, else False.
        """
        basic_user = base64.b64encode(f"user:{self.pat}".encode()).decode()
        basic_trailing_colon = base64.b64encode(f"{self.pat}:".encode()).decode()

        auth_methods = [
            # Method 2: dedicated token header.
            # NOTE(review): PRIVATE-TOKEN is understood to be the header the
            # current CDS/ADS API clients send - confirm against the API docs.
            ("PRIVATE-TOKEN", "PRIVATE-TOKEN", self.pat),
            # Method 3: Basic auth with 'user' as username
            ("Basic", "Authorization", f"Basic {basic_user}"),
            # Method 4: Bearer token
            ("Bearer", "Authorization", f"Bearer {self.pat}"),
            # Method 5: Basic auth with the token as username
            ("Basic", "Authorization", f"Basic {basic_trailing_colon}"),
        ]

        for auth_type, header_name, header_value in auth_methods:
            # A None value makes requests drop that session header for this
            # call, so exactly one credential header is sent per probe.
            overrides = {"Authorization": None, "PRIVATE-TOKEN": None}
            overrides[header_name] = header_value

            print(f"Trying {auth_type} authentication...")
            # NOTE(review): the probe is only meaningful if /processes really
            # requires authentication - confirm.
            test_response = self.session.get(
                f"{self.base_url}/processes",
                headers=overrides,
                timeout=10
            )

            if test_response.status_code >= 400:
                continue

            print(f"✅ {auth_type} authentication works!")
            # Persist the working scheme and remove the one it replaces.
            for name, value in overrides.items():
                if value is None:
                    self.session.headers.pop(name, None)
                else:
                    self.session.headers[name] = value
            return True

        return False

    def get_processes(self):
        """
        Get available processes/datasets.

        Returns:
            The decoded JSON body of ``GET /processes``.

        Raises:
            Exception: If the server answers 401 and no fallback method works.
            requests.HTTPError: For any other error status.
        """
        url = f"{self.base_url}/processes"
        response = self.session.get(url, timeout=self.timeout)

        if response.status_code == 401:
            print("Authentication failed with default method, trying alternatives...")
            if self._try_alternative_auth(response):
                response = self.session.get(url, timeout=self.timeout)
            else:
                raise Exception("All authentication methods failed")

        response.raise_for_status()
        return response.json()

    def submit_job(self, process_id, inputs):
        """
        Submit a data retrieval job.

        Args:
            process_id: Identifier of the process (dataset) to execute.
            inputs: Dictionary of request parameters sent as ``inputs``.

        Returns:
            The decoded JSON body of the execution response.

        Raises:
            Exception: If the server answers 401 and no fallback method works.
            requests.HTTPError: If the final response carries an error status.
        """

        payload = {
            "inputs": inputs,
            "response": "document"  # or "raw" depending on what you want
        }

        url = f"{self.base_url}/processes/{process_id}/execution"

        print(f"Submitting job to: {url}")
        print(f"Payload: {json.dumps(payload, indent=2)}")
        # Mask credentials: the token must not end up in notebook output.
        print(f"Headers: {self._redact_headers(self.session.headers)}")

        response = self.session.post(url, json=payload, timeout=self.timeout)

        if response.status_code == 401:
            print("Authentication failed, trying alternatives...")
            if self._try_alternative_auth(response):
                response = self.session.post(url, json=payload, timeout=self.timeout)
            else:
                raise Exception("Authentication failed with all methods")

        print(f"Response Status: {response.status_code}")
        print(f"Response Headers: {dict(response.headers)}")

        if response.status_code not in [200, 201, 202]:
            print(f"Error Response: {response.text}")
            response.raise_for_status()

        return response.json()

    def get_job_status(self, job_id):
        """
        Get status of a submitted job.

        Args:
            job_id: Identifier returned when the job was submitted.

        Returns:
            The decoded JSON body of ``GET /jobs/{job_id}``.

        Raises:
            requests.HTTPError: If the response carries an error status.
        """
        response = self.session.get(f"{self.base_url}/jobs/{job_id}", timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def download_result(self, job_id, output_file):
        """
        Download the result of a completed job.

        The body of ``GET /jobs/{job_id}/results`` is streamed to disk in
        8 KiB chunks so a large result is never held in memory as a whole.

        Args:
            job_id: Identifier of the completed job.
            output_file: Path of the file to write.

        Returns:
            ``output_file``, unchanged.

        Raises:
            requests.HTTPError: If the response carries an error status; the
                output file is not created in that case.
        """
        # NOTE(review): this writes whatever the results endpoint returns; if
        # that is a JSON document pointing at the data file rather than the
        # data itself, a second download step is needed - confirm.
        url = f"{self.base_url}/jobs/{job_id}/results"
        with self.session.get(url, stream=True, timeout=self.timeout) as response:
            response.raise_for_status()

            with open(output_file, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        return output_file

def test_ads_direct_access():
    """Smoke-test the direct ADS client: list processes, then submit one job.

    Progress and failures are reported with print(). Nothing is returned, and
    ordinary exceptions are caught and printed rather than propagated.
    """
    try:
        # Constructing the client without arguments makes it look up the token itself.
        ads = ADSDirectClient()

        # Step 1: list the available processes; give up if that already fails.
        print("Testing: Get available processes...")
        try:
            listing = ads.get_processes()
            print("✅ Successfully retrieved processes list")
            process_count = len(listing.get('processes', []))
            print(f"Found {process_count} processes")
        except Exception as listing_error:
            print(f"❌ Failed to get processes: {listing_error}")
            return

        # Step 2: submit one small request.
        print("\nTesting: Submit a job...")

        job_inputs = dict(
            variable=["temperature"],
            pressure_level=["1000"],
            date=["2023-01-01"],
            time=["00:00"],
            format="netcdf",
            area=[90, -180, -90, 180],  # Global
        )

        try:
            submitted = ads.submit_job("cams-global-reanalysis-eac4", job_inputs)
            print("✅ Successfully submitted job")
            print(f"Job response: {json.dumps(submitted, indent=2)}")

            # Only query the status when the response carries a job identifier.
            if 'jobID' in submitted:
                current_status = ads.get_job_status(submitted['jobID'])
                print(f"Job status: {current_status}")

        except Exception as submit_error:
            print(f"❌ Failed to submit job: {submit_error}")

    except Exception as setup_error:
        print(f"❌ Failed to initialize client: {setup_error}")
        print("Make sure your Personal Access Token is in ~/.cdsapirc")

# Additional debugging function
def debug_authentication():
    """Print the candidate authentication headers built from the stored token.

    The token is taken from the first ``key:`` line of ``~/.cdsapirc``. When
    the file is missing or that value is empty, a single error line is
    printed and the function returns. Otherwise the full header values are
    printed in clear text, so the output contains the token.
    """
    rc_file = Path.home() / ".cdsapirc"

    # First "key:" entry of the config file, or None when there is none.
    token = None
    if rc_file.exists():
        token = next(
            (
                entry.split('key:')[1].strip()
                for entry in rc_file.read_text().split('\n')
                if entry.strip().startswith('key:')
            ),
            None,
        )

    if not token:
        print("❌ No Personal Access Token found in ~/.cdsapirc")
        return

    print(f"PAT found: {token[:10]}...")

    def _b64(text):
        # Base64 text form used by HTTP Basic authentication.
        return base64.b64encode(text.encode()).decode()

    basic_empty_user = _b64(f":{token}")
    basic_named_user = _b64(f"user:{token}")

    print("\n=== AUTHENTICATION HEADERS TO TRY ===")

    # (heading, header line) pairs, printed in this order.
    candidates = (
        ("Option 1 - Basic Auth (empty username):", f"Authorization: Basic {basic_empty_user}"),
        ("Option 2 - Basic Auth (user:token):", f"Authorization: Basic {basic_named_user}"),
        ("Option 3 - Bearer Token:", f"Authorization: Bearer {token}"),
        ("Option 4 - API Key Header:", f"X-API-Key: {token}"),
    )
    for heading, header_line in candidates:
        print(f"\n{heading}")
        print(header_line)

    print("\n=== COMPLETE HEADERS TO USE ===")
    complete_headers = {
        "Authorization": f"Basic {basic_empty_user}",
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": "python-ads-client/1.0.0",
    }
    print(json.dumps(complete_headers, indent=2))

if __name__ == "__main__":
    # Entry point: banner, header dump, separator rule, then the live API test.
    print("=== ADS DIRECT API CLIENT ===\n")

    # Step 1: show the candidate authentication headers.
    print("1. Debugging authentication...")
    debug_authentication()

    # Blank line, a 50-character rule, blank line.
    print("", "=" * 50, "", sep="\n")

    # Step 2: exercise the API with the direct client.
    print("2. Testing direct API access...")
    test_ads_direct_access()