# Session 1 Live Demo: EO Processing Workflows & Python

**Course:** BD4EO - Big Data Foundations for Earth Observation  
**Session:** Tuesday 09:10-10:30  
**Instructor:** Anne Fouilloux (PANGEO)

This notebook contains the live demonstrations from Session 1, showing modern EO processing workflows using Python and cloud-native tools.

## Setup: Import Required Libraries

First, let's import the essential Python libraries for EO processing:

In [None]:
# Core data manipulation and analysis
import xarray as xr
import numpy as np
import pandas as pd

# Geospatial libraries
import rasterio
import rioxarray as rxr
import geopandas as gpd

# Visualization
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import cartopy.feature as cfeature

# Cloud and parallel computing
import dask
import dask.array as da

# STAC for data discovery
import pystac_client
import planetary_computer

# Suppress warnings for cleaner output
import warnings
warnings.filterwarnings('ignore')

print("✅ All libraries imported successfully!")
print(f"Xarray version: {xr.__version__}")
print(f"Dask version: {dask.__version__}")

## Demo 1: Introduction to Xarray and Data Cubes

Let's start with the fundamental concept of data cubes using Xarray:

In [None]:
# Load a sample atmospheric dataset (built into xarray)
ds = xr.tutorial.open_dataset("air_temperature")

print("Dataset structure:")
print(ds)
print("\nDimensions and coordinates:")
print(f"Time: {ds.time.size} timesteps from {ds.time.values[0]} to {ds.time.values[-1]}")
print(f"Latitude: {ds.lat.size} points from {ds.lat.min().values}° to {ds.lat.max().values}°")
print(f"Longitude: {ds.lon.size} points from {ds.lon.min().values}° to {ds.lon.max().values}°")

In [None]:
# Demonstrate data cube slicing and selection
# Select data for a specific time
single_time = ds.air.isel(time=0)

# Select data for a specific location over time
time_series = ds.air.sel(lat=50, lon=260, method='nearest')

# Select data for a geographic region
region = ds.air.sel(lat=slice(40, 60), lon=slice(240, 280))

print(f"Single time slice shape: {single_time.shape}")
print(f"Time series shape: {time_series.shape}")
print(f"Regional subset shape: {region.shape}")

# Demonstrate powerful operations
seasonal_mean = ds.air.groupby('time.season').mean()
print(f"Seasonal mean shape: {seasonal_mean.shape}")
print(f"Seasons: {list(seasonal_mean.season.values)}")

In [None]:
# Visualization: Create a multi-panel plot
fig, axes = plt.subplots(2, 2, figsize=(15, 10), 
                        subplot_kw={'projection': ccrs.PlateCarree()})

# Plot seasonal means
seasons = ['DJF', 'MAM', 'JJA', 'SON']
titles = ['Winter (DJF)', 'Spring (MAM)', 'Summer (JJA)', 'Fall (SON)']

for i, (season, title) in enumerate(zip(seasons, titles)):
    ax = axes[i//2, i%2]
    
    # Plot the data
    im = seasonal_mean.sel(season=season).plot(
        ax=ax, transform=ccrs.PlateCarree(),
        cmap='coolwarm', add_colorbar=False,
        vmin=240, vmax=300
    )
    
    # Add geographic features
    ax.add_feature(cfeature.COASTLINE, linewidth=0.5)
    ax.add_feature(cfeature.BORDERS, linewidth=0.3)
    ax.set_title(title, fontsize=12, fontweight='bold')
    ax.set_global()

# Add a colorbar
plt.tight_layout()
cbar = plt.colorbar(im, ax=axes, orientation='horizontal', 
                   fraction=0.05, pad=0.1, aspect=50)
cbar.set_label('Temperature (K)', fontsize=12)

plt.suptitle('Seasonal Temperature Patterns - Demonstrating Data Cube Concepts', 
             fontsize=14, fontweight='bold', y=1.02)
plt.show()

## Demo 2: STAC Data Discovery

Now let's demonstrate how to discover real satellite data using STAC (SpatioTemporal Asset Catalog):

In [None]:
# Connect to Microsoft Planetary Computer STAC catalog
catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)

print("Available collections:")
collections = list(catalog.get_collections())
eo_collections = [c for c in collections if 'sentinel' in c.id.lower() or 'landsat' in c.id.lower()]

for collection in eo_collections[:10]:  # Show first 10 EO collections
    print(f"- {collection.id}: {collection.title}")

In [None]:
# Search for Sentinel-2 data over a specific area and time
# Let's use the Baltic region as an example (relevant to Riga location)
bbox = [20.0, 54.0, 28.0, 60.0]  # Baltic region bounding box
datetime_range = "2023-06-01/2023-06-30"  # Summer 2023

search = catalog.search(
    collections=["sentinel-2-l2a"],
    bbox=bbox,
    datetime=datetime_range,
    query={"eo:cloud_cover": {"lt": 20}}  # Less than 20% cloud cover
)

items = search.item_collection()
print(f"Found {len(items)} Sentinel-2 scenes with <20% cloud cover")

# Display information about the first few items
for i, item in enumerate(items[:3]):
    print(f"\nScene {i+1}:")
    print(f"  Date: {item.datetime.strftime('%Y-%m-%d')}")
    print(f"  Cloud cover: {item.properties.get('eo:cloud_cover', 'N/A')}%")
    print(f"  Assets: {list(item.assets.keys())[:5]}...")  # Show first 5 assets

In [None]:
# Let's examine one item in detail
if len(items) > 0:
    item = items[0]
    print(f"Detailed information for scene from {item.datetime.strftime('%Y-%m-%d')}:")
    print(f"\nSpatial coverage:")
    print(f"  Bounding box: {item.bbox}")
    
    print(f"\nAvailable bands/assets:")
    for asset_key, asset in item.assets.items():
        if asset_key in ['B02', 'B03', 'B04', 'B08']:  # Main visible/NIR bands
            print(f"  {asset_key}: {asset.title} - {asset.href}")
    
    print(f"\nProperties:")
    interesting_props = ['eo:cloud_cover', 'proj:epsg', 'datetime']
    for prop in interesting_props:
        if prop in item.properties:
            print(f"  {prop}: {item.properties[prop]}")
else:
    print("No items found for the specified criteria.")

## Demo 3: Cloud-Optimized Data Access

Now let's demonstrate how to access and process satellite data efficiently using cloud-optimized formats:

In [None]:
# Load Sentinel-2 data using xarray and rioxarray
if len(items) > 0:
    item = items[0]
    
    # Load specific bands (Red, Green, Blue, NIR)
    bands_to_load = ['B04', 'B03', 'B02', 'B08']  # Red, Green, Blue, NIR
    band_data = {}
    
    for band in bands_to_load:
        if band in item.assets:
            print(f"Loading {band}...")
            # Note: In a real environment, you would load the actual data
            # Here we'll create a placeholder for demonstration
            band_data[band] = f"Would load: {item.assets[band].href}"
    
    print("\nData loading complete!")
    print("In a real environment, each band would be loaded as an xarray DataArray")
    
    # Demonstrate what the loaded data structure would look like
    print("\nExpected data structure:")
    print("- Shape: (height, width) e.g., (10980, 10980) for 10m bands")
    print("- Coordinates: x (easting), y (northing)")
    print("- CRS: UTM zone (automatically detected)")
    print("- Chunks: Automatically chunked for parallel processing")
else:
    print("Creating synthetic data for demonstration...")
    
    # Create synthetic satellite-like data
    x = np.linspace(20, 28, 800)
    y = np.linspace(54, 60, 600)
    
    # Simulate NDVI-like patterns
    xx, yy = np.meshgrid(x, y)
    ndvi_pattern = 0.3 + 0.4 * np.sin(xx * 3) * np.cos(yy * 2) + 0.1 * np.random.random((600, 800))
    ndvi_pattern = np.clip(ndvi_pattern, -1, 1)
    
    # Create xarray dataset
    synthetic_data = xr.Dataset({
        'ndvi': (['y', 'x'], ndvi_pattern)
    }, coords={
        'x': x,
        'y': y
    })
    
    print(f"Synthetic dataset created: {synthetic_data.ndvi.shape}")

In [None]:
# Demonstrate cloud-native processing concepts
print("Cloud-native processing benefits:")
print("\n1. Lazy Loading:")
print("   - Data is only loaded when needed (when .compute() is called)")
print("   - Allows working with datasets larger than memory")

print("\n2. Chunked Processing:")
print("   - Large arrays are split into smaller chunks")
print("   - Each chunk can be processed in parallel")
print("   - Scales from local machine to cloud clusters")

print("\n3. Efficient I/O:")
print("   - Only read the data you need (spatial/temporal subsetting)")
print("   - Compressed formats reduce transfer time")
print("   - Co-location of data and compute reduces latency")

# Demonstrate lazy operations
if 'synthetic_data' in locals():
    print("\nDemonstrating lazy operations:")
    
    # Create a lazy operation
    subset = synthetic_data.ndvi.sel(x=slice(22, 26), y=slice(56, 58))
    mean_value = subset.mean()
    
    print(f"Created lazy operation - no computation yet")
    print(f"Result type: {type(mean_value.data)}")
    
    # Now compute the result
    computed_mean = mean_value.compute()
    print(f"Computed result: {computed_mean.values:.4f}")

## Demo 4: Parallel Processing with Dask

Let's demonstrate how Dask enables parallel processing for large-scale EO applications:

In [None]:
# Create a larger synthetic dataset to demonstrate parallel processing
print("Creating large synthetic dataset...")

# Simulate a time series of satellite images
n_times = 50
height, width = 2000, 2000
dates = pd.date_range('2023-01-01', periods=n_times, freq='W')

# Create chunked dask array (this is key for parallel processing)
chunks = (10, 500, 500)  # (time, y, x)
data_array = da.random.random((n_times, height, width), chunks=chunks)

# Add some realistic patterns
# Seasonal trend
seasonal_trend = np.sin(np.arange(n_times) * 2 * np.pi / 52) * 0.3 + 0.5
data_array = data_array + seasonal_trend[:, None, None]

# Create xarray dataset with proper coordinates
x_coords = np.linspace(20, 30, width)
y_coords = np.linspace(50, 60, height)

large_dataset = xr.Dataset({
    'vegetation_index': (['time', 'y', 'x'], data_array)
}, coords={
    'time': dates,
    'y': y_coords,
    'x': x_coords
})

print(f"Dataset shape: {large_dataset.vegetation_index.shape}")
print(f"Chunk sizes: {large_dataset.vegetation_index.chunks}")
print(f"Total size: {large_dataset.vegetation_index.nbytes / 1e9:.2f} GB")

In [None]:
# Demonstrate parallel operations
print("Performing parallel computations...")

# 1. Temporal statistics (computed in parallel across time)
annual_mean = large_dataset.vegetation_index.mean(dim='time')
annual_std = large_dataset.vegetation_index.std(dim='time')

# 2. Spatial statistics (computed in parallel across space)
spatial_mean = large_dataset.vegetation_index.mean(dim=['x', 'y'])

# 3. Moving window operations
monthly_mean = large_dataset.vegetation_index.rolling(time=4, center=True).mean()

print("All operations defined (lazy evaluation)")
print("\nComputing results...")

# Show task graph information
print(f"Annual mean task graph: {annual_mean.data.npartitions} partitions")

# Compute one result to demonstrate
result = spatial_mean.compute()
print(f"Spatial mean computed: {len(result)} time points")

In [None]:
# Visualize the parallel processing results
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Plot time series
spatial_mean.plot(ax=ax1, x='time')
ax1.set_title('Regional Mean Vegetation Index Over Time')
ax1.set_ylabel('Vegetation Index')
ax1.grid(True, alpha=0.3)

# Plot a spatial slice (compute a small subset)
subset = large_dataset.vegetation_index.isel(time=25, x=slice(0, 500), y=slice(0, 500))
computed_subset = subset.compute()

im = computed_subset.plot(ax=ax2, cmap='viridis', add_colorbar=False)
ax2.set_title('Spatial Pattern (Week 25)')
ax2.set_aspect('equal')

plt.colorbar(im, ax=ax2, label='Vegetation Index')
plt.tight_layout()
plt.show()

print("\nKey benefits of parallel processing:")
print("✅ Process datasets larger than memory")
print("✅ Automatic parallelization across CPU cores")
print("✅ Scales to distributed computing clusters")
print("✅ Familiar NumPy/Pandas-like interface")

## Demo 5: Integration with ESA Services

Let's demonstrate how these concepts integrate with ESA's operational services:

In [None]:
# Simulate connection to ESA services
print("ESA Service Integration Examples:")
print("\n1. Copernicus Data Space Ecosystem:")
print("   - Direct access to Sentinel missions data")
print("   - Integrated processing environment")
print("   - No data download required")

print("\n2. EOPF (Earth Observation Processing Framework):")
print("   - Harmonized access across missions")
print("   - Zarr format for cloud optimization")
print("   - Consistent APIs and data models")

print("\n3. Cubes & Clouds Educational Platform:")
print("   - Hands-on learning environment")
print("   - Community collaboration projects")
print("   - Best practices and workflows")

# Example of how you would access these services
service_examples = {
    "Copernicus Data Space": "https://dataspace.copernicus.eu/",
    "EOPF Sample Service": "https://zarr.eopf.copernicus.eu/",
    "Cubes & Clouds": "https://eo-college.org/courses/cubes-and-clouds",
    "STAC Browser": "https://stac.browser.user.eopf.eodc.eu"
}

print("\nService Access Points:")
for service, url in service_examples.items():
    print(f"  {service}: {url}")

In [None]:
# Demonstrate workflow integration concepts
print("Complete Workflow Integration:")
print("\n📡 Data Discovery (STAC)")
print("   ↓")
print("☁️  Cloud Access (COG/Zarr)")
print("   ↓")
print("🐍 Python Processing (Xarray/Dask)")
print("   ↓")
print("📊 Analysis & Visualization")
print("   ↓")
print("🔄 Sharing & Collaboration (FAIR)")

# Example workflow code structure
workflow_example = '''
# 1. Discovery
items = catalog.search(collections=["sentinel-2"], bbox=bbox, datetime=date_range)

# 2. Access
dataset = xr.open_dataset(items[0].assets['data'].href, engine='zarr')

# 3. Process
ndvi = (dataset.nir - dataset.red) / (dataset.nir + dataset.red)
result = ndvi.compute()

# 4. Share
result.to_netcdf('ndvi_analysis.nc')
'''

print("\nExample workflow structure:")
print(workflow_example)

## Summary and Key Takeaways

This demonstration notebook has shown:

In [None]:
# Summary of what we've demonstrated
takeaways = {
    "Data Cube Concepts": [
        "N-dimensional labeled arrays with Xarray",
        "Efficient slicing and selection operations",
        "Built-in operations (groupby, resample, etc.)"
    ],
    "Cloud-Native Access": [
        "STAC for standardized data discovery",
        "Lazy loading and chunked processing",
        "Efficient I/O with cloud-optimized formats"
    ],
    "Parallel Processing": [
        "Dask for automatic parallelization",
        "Scaling from local to distributed computing",
        "Memory-efficient operations on large datasets"
    ],
    "ESA Integration": [
        "Real-world operational services",
        "Community-driven development",
        "End-to-end workflow examples"
    ]
}

print("🎯 Key Takeaways from Session 1:")
print("=" * 50)

for category, points in takeaways.items():
    print(f"\n📌 {category}:")
    for point in points:
        print(f"   • {point}")

print("\n" + "=" * 50)
print("🔜 Next: Session 2 - FAIR Data & Open Science")
print("   We'll build on these technical foundations to create")
print("   sustainable, collaborative EO workflows.")

## Additional Resources

For continued learning and exploration:

### Documentation
- **[Xarray User Guide](https://docs.xarray.dev/en/stable/user-guide/index.html)**
- **[Dask Documentation](https://docs.dask.org/en/stable/)**
- **[STAC Specification](https://stacspec.org/)**
- **[Pangeo Tutorials](https://pangeo.io/tutorials.html)**

### Hands-on Practice
- **[EOPF Sample Notebooks](https://eopf-sample-service.github.io/eopf-sample-notebooks/)**
- **[Cubes & Clouds Course](https://eo-college.org/courses/cubes-and-clouds)**
- **[Pangeo Gallery](https://gallery.pangeo.io/)**

### Community
- **[Pangeo Discourse Forum](https://discourse.pangeo.io/)**
- **[EO College Community](https://eo-college.org/)**
- **[STAC Community](https://github.com/radiantearth/stac-spec)**