# Steal GEE Data

# Initial Settings

## Install Xee and Authenticate Earth Engine:

Run the following command to authenticate with Earth Engine:
<code>earthengine authenticate --quiet</code>

In [1]:
# Check and install required packages
import importlib
import sys
import subprocess

def install_package(package):
    """Install ``package`` with pip, using the interpreter that runs this kernel.

    Args:
        package: Name handed to ``pip install``.

    Raises:
        subprocess.CalledProcessError: If pip exits with a non-zero status.
    """
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# Import name -> short note on why the notebook needs it.
required_packages = {
    'xee': 'For reading Google Earth Engine data to transform it into xarray',
    'xarray': 'For working with labeled multi-dimensional arrays',
    'dask': 'For parallel computing capabilities',
}

# Check and install missing packages
for package, purpose in required_packages.items():
    try:
        importlib.import_module(package)
        print(f"✓ {package} is already installed ({purpose})")
    except ImportError:
        print(f"Installing {package} ({purpose})...")
        install_package(package)
        # pip wrote new files after the interpreter started; refresh the import
        # system's caches so the new package can be found from this kernel.
        importlib.invalidate_caches()
        # Import again so a failed install surfaces here as an ImportError
        # instead of being reported as a success.
        importlib.import_module(package)
        print(f"✓ {package} has been installed")

# Verify installations by importing the modules the rest of the notebook uses
import ee
import xarray as xr

print("\nAll required packages are installed and imported successfully!")

✓ xee is already installed (For reading Google Earth Engine data to transform it into xarray)
✓ xarray is already installed (For working with labesled multi-dimensional arrays)
✓ dask is already installed (For parallel computing capabilities)

All required packages are installed and imported successfully!


## Specify your EE-registered cloud project ID and initialize the EE client with the high volume API:

In [4]:
# Strip stray whitespace and reject blank input: the value is interpolated into
# the Earth Engine project ID ('ee-<username>') when the client is initialized.
username = input("Enter your Earth Engine username: ").strip()
if not username:
    raise ValueError("Earth Engine username must not be empty")

In [5]:
# Initialize the Earth Engine client against the high-volume endpoint, using
# the project ID derived from the username (the 'ee-<username>' pattern).
ee.Initialize(
    opt_url='https://earthengine-highvolume.googleapis.com',
    project=f'ee-{username}',
)

# Finding the dataset in GEE 

In [16]:
# List the assets found under ECMWF/ERA5 and print the ID of each one
datasets = ee.data.getList({'id': 'ECMWF/ERA5'})
for asset_id in (entry['id'] for entry in datasets):
    print(asset_id)


projects/earthengine-public/assets/ECMWF/ERA5/DAILY
projects/earthengine-public/assets/ECMWF/ERA5/HOURLY
projects/earthengine-public/assets/ECMWF/ERA5/MONTHLY
projects/earthengine-public/assets/ECMWF/ERA5/MONTHLY_BY_HOUR


In [11]:
# 'ECMWF' is a folder, not an ImageCollection, so Earth Engine rejects this
# lookup. Catch the error and print it so the failure is shown as a
# demonstration without stopping a full run of the notebook.
try:
    ecmwf_info = ee.ImageCollection("ECMWF").getInfo()
except ee.EEException as err:
    print(f"EEException: {err}")
else:
    print(ecmwf_info)

EEException: ImageCollection.load: Expected asset 'ECMWF' to be an ImageCollection, found 'IndexedFolder'.

In [14]:
# The ee module exposes no IndexedFolder attribute, so this lookup fails.
# Catch the AttributeError and print it instead of stopping the notebook.
try:
    ee.IndexedFolder("ECMWF")
except AttributeError as err:
    print(f"AttributeError: {err}")

AttributeError: module 'ee' has no attribute 'IndexedFolder'

In [None]:
# Setting Config

def find_dataset_in_gee(dataset_name):
    """Return the metadata Earth Engine reports for an image collection.

    Args:
        dataset_name: Asset ID passed to ``ee.ImageCollection``.

    Returns:
        Whatever ``getInfo()`` returns for that collection.
    """
    collection = ee.ImageCollection(dataset_name)
    return collection.getInfo()





# Setting Config

## Configure Dask to read large datasets in parallel

In [37]:
import psutil
import math
import dask
from dask.distributed import Client, LocalCluster

# Set OS parms
# psutil.cpu_count(logical=False) can return None when the physical core count
# cannot be determined, so fall back to a single core.
CORE = psutil.cpu_count(logical=False) or 1
TOTAL_MEMORY = psutil.virtual_memory().total / (1024**3)  # bytes -> GiB

# Leave two cores for the rest of the system, but always start at least one worker.
core_per_worker = max(1, CORE - 2)
# 80% of the memory share per core (one core excluded), never below 1 GB.
# The parentheses matter: `TOTAL_MEMORY / CORE-1` subtracts 1 from the quotient
# instead of dividing by CORE - 1.
# (The variable keeps its original spelling because it is a notebook-level name.)
momory_limit_per_core = max(1, math.floor((TOTAL_MEMORY / max(1, CORE - 1)) * 0.8))

# Set dask cluster
cluster = LocalCluster(
    n_workers=core_per_worker,
    threads_per_worker=2,
    memory_limit=f'{momory_limit_per_core}GB',
    dashboard_address=':8788'
)

client = Client(cluster)
# Report readiness from the scheduler's worker list. A LocalCluster is not a
# dask collection, so dask.is_dask_collection(cluster) was always False and the
# message below was never printed.
if client.scheduler_info()['workers']:
    print('Dask cluster is ready')
print(client.dashboard_link)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37249 instead


http://127.0.0.1:37249/status


In [47]:
import psutil
import math
import dask
from dask.distributed import Client, LocalCluster, get_client

# Set OS parameters - get number of CPUs and total system memory
# psutil.cpu_count(logical=False) can return None when the physical core count
# cannot be determined, so fall back to a single core.
CORE = psutil.cpu_count(logical=False) or 1
TOTAL_MEMORY = psutil.virtual_memory().total / (1024**3)  # bytes -> GiB

# Calculate worker resources - reserve 2 cores for system, keep at least 1 worker
core_per_worker = max(1, CORE - 2)
# 80% of the per-core memory share (one core excluded), never below 1 GB
memory_limit_per_core = max(1, math.floor((TOTAL_MEMORY / max(1, CORE - 1)) * 0.8))


def _start_local_cluster_client():
    """Start a LocalCluster sized from the values above and return a Client for it.

    The cluster is also stored in the notebook-level ``cluster`` variable.
    """
    global cluster
    cluster = LocalCluster(
        n_workers=core_per_worker,
        threads_per_worker=2,
        memory_limit=f'{memory_limit_per_core}GB',
        dashboard_address=':8788'
    )
    return Client(cluster)


# First try to get any existing Dask client. Only get_client() sits inside the
# try, so a ValueError raised while building a new cluster is not mistaken for
# "no client found".
try:
    client = get_client()
except ValueError:
    # No existing client found, create new cluster
    print("Creating new Dask Client...")
    client = _start_local_cluster_client()
else:
    # Reuse the client only when it is attached to a LocalCluster
    if isinstance(getattr(client, 'cluster', None), LocalCluster):
        print("Using existing LocalCluster")
    else:
        print("Closing existing connection and creating new LocalCluster")
        client.close()
        client = _start_local_cluster_client()

# Check if cluster is ready by verifying workers are connected
def cluster_ready(client, expected_workers=core_per_worker, timeout=30):
    """Wait until the Dask cluster reports the expected number of workers.

    Args:
        client: Object whose ``scheduler_info()['workers']`` lists the
            connected workers.
        expected_workers: Minimum number of workers that counts as ready.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True as soon as enough workers are connected, False if the timeout
        elapses first. The worker count is checked at least once, even when
        ``timeout`` is zero or negative.
    """
    import time

    # A monotonic clock keeps the timeout correct if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while True:
        if len(client.scheduler_info()['workers']) >= expected_workers:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.5)

# Wait for the workers, then report how many the scheduler currently lists
is_ready = cluster_ready(client)
worker_count = len(client.scheduler_info()['workers'])
if is_ready:
    print(f"Dask cluster is ready with {worker_count} workers")
else:
    print(f"Warning: Only {worker_count} workers connected")

# Show where the dashboard can be reached
dashboard_url = client.dashboard_link
print(f"Dashboard: {dashboard_url}")

Using existing LocalCluster
Dask cluster is ready with 10 workers
Dashboard: http://127.0.0.1:33889/status


In [None]:
import psutil
import math
import dask
from dask.distributed import Client, LocalCluster

# Set OS parms
# psutil.cpu_count(logical=False) can return None when the physical core count
# cannot be determined, so fall back to a single core.
CORE = psutil.cpu_count(logical=False) or 1
TOTAL_MEMORY = psutil.virtual_memory().total / (1024**3)  # bytes -> GiB

# Leave two cores for the rest of the system, but always start at least one worker.
core_per_worker = max(1, CORE - 2)
# 80% of the memory share per core (one core excluded), never below 1 GB.
# The parentheses matter: `TOTAL_MEMORY / CORE-1` subtracts 1 from the quotient
# instead of dividing by CORE - 1.
# (The variable keeps its original spelling because it is a notebook-level name.)
momory_limit_per_core = max(1, math.floor((TOTAL_MEMORY / max(1, CORE - 1)) * 0.8))


# Only the client lookup sits inside the try, so a ValueError raised while
# building a new cluster is not mistaken for "no client found".
try:
    client = Client.current()
except ValueError:
    print("創建新的 Dask Client...")
    needs_new_cluster = True
else:
    # Check whether the client is attached to the expected kind of cluster
    if isinstance(getattr(client, 'cluster', None), LocalCluster):
        print("使用現有的 LocalCluster")
        needs_new_cluster = False
    else:
        print("關閉現有的連接並創建新的 LocalCluster")
        client.close()
        needs_new_cluster = True

if needs_new_cluster:
    cluster = LocalCluster(
        n_workers=core_per_worker,
        threads_per_worker=2,
        memory_limit=f'{momory_limit_per_core}GB',
        dashboard_address=':8788'
    )
    client = Client(cluster)

print(f"Dashboard: {client.dashboard_link}")

Closing existing Dask Client...
Existing client closed
Creating new Dask Client...


Perhaps you already have a cluster running?
Hosting the HTTP server on port 41605 instead


Dask dashboard available at: http://127.0.0.1:41605/status
Client status: <Client: 'tcp://127.0.0.1:39807' processes=10 threads=20, memory=83.82 GiB>


In [42]:
from dask.distributed import get_client

# Shut down the current Dask client together with the cluster attached to it.
# get_client() raises ValueError when there is no client, so re-running this
# cell after a shutdown reports that instead of failing.
try:
    client = get_client()
except ValueError:
    print("No active Dask client to close")
else:
    attached_cluster = getattr(client, 'cluster', None)
    client.close()
    # Closing only the client leaves the cluster's workers and dashboard
    # running, which later cells then collide with when they start a new one.
    if attached_cluster is not None:
        attached_cluster.close()
    print(client)

<Client: No scheduler connected>


# Real-World Example 

In [8]:
# Restrict the hourly ERA5-Land collection to 1992-10-05 .. 1993-03-31 and open
# it through the 'ee' xarray backend.
era5_land_hourly = ee.ImageCollection('ECMWF/ERA5_LAND/HOURLY')
ic = era5_land_hourly.filterDate('1992-10-05', '1993-03-31')
ds = xr.open_dataset(
    ic,
    engine='ee',
    crs='EPSG:4326',
    scale=0.25,
)

In [10]:
# Display the dataset opened in the previous cell
ds

In [None]:
# NOTE(review): 'ee://ECMWF/ERA5_LAND/D' does not look like a complete asset ID
# (the collection opened earlier in this notebook is ECMWF/ERA5_LAND/HOURLY) — confirm.
ds = xr.open_mfdataset(
    ['ee://ECMWF/ERA5_LAND/D', 'ee://NASA/GDDP-CMIP6'],
    engine='ee',
    # `chunks` takes 'auto', an int or a dict; {'auto'} is a set literal.
    chunks='auto',
    crs='EPSG:4326', scale=0.25,
    parallel=True
)
# filterDate() belongs to ee.ImageCollection; an xarray Dataset has no such
# method, so select the period on the time coordinate instead.
# assumes the opened dataset exposes a 'time' coordinate — TODO confirm
ds = ds.sel(time=slice('2023-10', '2024-03'))

ds

KeyboardInterrupt: 