# Part 2: 時空處理與儲存優化 (30 min)

這個 notebook 會介紹：

1. **時間 Resampling**：hourly → daily，計算統計量
2. **多檔案讀取**：使用 intake catalog 串接多年資料
3. **空間統計**：區域平均、gradient 計算
4. **Rechunking 策略**：根據分析模式調整 chunk
5. **儲存為 Zarr**：優化 metadata 和壓縮設定

---

## 學習目標

- 掌握 `.resample()` 和 `.groupby()` 的時間聚合
- 理解多檔案讀取的記憶體管理
- 學會選擇合適的 chunking 策略
- 正確儲存中間結果，避免重複計算

## 0. 環境準備

In [None]:
import dask
from dask.distributed import Client
import xarray as xr
import intake
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

# 啟動 Dask Client
client = Client(processes=False, n_workers=2, threads_per_worker=2, memory_limit='2GB')
print(f"Dask Dashboard: {client.dashboard_link}")

# 載入 catalog
catalog = intake.open_catalog('catalog.yaml')

## 1. 時間 Resampling：Hourly → Daily

### 為什麼需要 Resampling？

原始 ERA5 資料是 hourly，但許多分析需要：
- **Daily mean**：日平均溫度、濕度
- **Daily max**：日最大 CAPE（對流潛勢）
- **Daily accumulation**：日累積降雨

Resampling 的優勢：
1. **減少資料量**：8760 hours → 365 days (減少 24 倍)
2. **降低雜訊**：平滑短期波動
3. **符合分析需求**：許多氣候指標是 daily 尺度

### Xarray 的 `.resample()` 方法

`.resample()` 類似 pandas，但是 **lazy** 的：
```python
ds.resample(time='1D').mean()  # 不會立即執行
```

In [None]:
# 載入 2019 年資料
ds = catalog.era5_2019.to_dask()

print("原始資料：")
print(f"  Time range: {ds.time.values[0]} to {ds.time.values[-1]}")
print(f"  Time steps: {len(ds.time)}")
print(f"  Frequency: hourly")
print(f"  Size: {ds.nbytes / 1e9:.2f} GB")

### 1.1 計算 Daily Mean

In [None]:
# Daily mean: 每天 24 小時的平均
# 'D' = calendar day, '1D' = 1 day
ds_daily_mean = ds.resample(time='1D').mean()

print("Daily mean:")
print(f"  Time steps: {len(ds_daily_mean.time)}")
print(f"  Size: {ds_daily_mean.nbytes / 1e9:.2f} GB")
print(f"  Reduction: {ds.nbytes / ds_daily_mean.nbytes:.1f}x")
print()
print(ds_daily_mean)

### 1.2 不同變數使用不同統計量

對於對流預報，我們可能需要：
- **CAPE**: daily maximum（最大不穩定度）
- **CIN**: daily mean（平均抑制）
- **K-index**: daily mean
- **BLH**: daily maximum（最高邊界層）

Xarray 允許對不同變數做不同操作：

In [None]:
# 分別計算
cape_daily_max = ds['cape'].resample(time='1D').max()
cin_daily_mean = ds['cin'].resample(time='1D').mean()
kindex_daily_mean = ds['k_index'].resample(time='1D').mean()
blh_daily_max = ds['blh'].resample(time='1D').max()

# 合併成新的 Dataset
ds_daily = xr.Dataset({
    'cape_max': cape_daily_max,
    'cin_mean': cin_daily_mean,
    'kindex_mean': kindex_daily_mean,
    'blh_max': blh_daily_max
})

print("Custom daily statistics:")
print(ds_daily)

### 1.3 執行並視覺化結果

我們選取台灣附近區域，計算並繪圖。

In [None]:
# 選取台灣區域的 7 月資料
ds_taiwan_july = ds_daily.sel(
    time='2019-07',
    latitude=slice(22, 26),
    longitude=slice(120, 122)
)

# 計算月平均
cape_monthly = ds_taiwan_july['cape_max'].mean(dim='time').compute()

# 繪圖
plt.figure(figsize=(8, 6))
cape_monthly.plot(cmap='YlOrRd', vmin=0, vmax=2000)
plt.title('Mean of Daily Max CAPE (July 2019, Taiwan)', fontsize=12)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.tight_layout()
plt.show()

## 2. 多檔案讀取：跨年度分析

### 情境：計算 2019-2023 年的氣候態

氣候態（climatology）= 多年平均的季節變化。例如：
- 1 月的平均 CAPE
- 7 月的平均 BLH

這需要讀取所有年份的資料。

### 方法 1：逐年讀取並串接

In [None]:
# 讀取所有年份
years = [2019, 2020]  # Demo: 只用兩年資料，減少計算量
datasets = []

for year in years:
    ds_year = catalog[f'era5_{year}'].to_dask()
    datasets.append(ds_year)
    print(f"Loaded {year}: {len(ds_year.time)} time steps")

# 沿著時間軸串接
ds_all = xr.concat(datasets, dim='time')

print(f"\nCombined dataset:")
print(f"  Time range: {ds_all.time.values[0]} to {ds_all.time.values[-1]}")
print(f"  Total time steps: {len(ds_all.time)}")
print(f"  Total size: {ds_all.nbytes / 1e9:.2f} GB")

### 重要：這裡沒有載入資料！

雖然我們「串接」了 5 年資料（~135 GB），但實際上：
- **沒有讀取任何 Zarr 檔案**
- **沒有佔用 135 GB 記憶體**
- 只是建立了一個**虛擬的 Dataset**，記錄了資料的位置

這就是 lazy evaluation 的威力。

## 3. 計算氣候態：Groupby 操作

### 什麼是 Groupby？

Groupby 是「split-apply-combine」模式：
1. **Split**：按照某個規則分組（例如按月份）
2. **Apply**：對每組做相同操作（例如計算平均）
3. **Combine**：合併結果

在氣候分析中常用的 groupby：
- `.groupby('time.month')`：按月份分組（1-12月）
- `.groupby('time.season')`：按季節分組（DJF, MAM, JJA, SON）
- `.groupby('time.dayofyear')`：按一年中的第幾天（1-365）

In [None]:
# 計算每個月份的氣候態（5 年平均）
cape_climatology = ds_all['cape'].groupby('time.month').mean(dim='time')

print("CAPE Climatology:")
print(cape_climatology)
print()
print(f"Dimensions: {cape_climatology.dims}")
print(f"Shape: {cape_climatology.shape}")
print(f"Coordinates: {list(cape_climatology.coords)}")

### 理解結果

原本的維度：`(time, latitude, longitude)`
- time: 43800 (5 years * 8760 hours)

Groupby 後的維度：`(month, latitude, longitude)`
- month: 12

每個月份的值 = 5 年中所有該月份資料的平均。

### 3.1 計算並繪製季節變化

In [None]:
# 選取台灣區域，計算氣候態
cape_taiwan = ds_all['cape'].sel(
    latitude=slice(22, 26),
    longitude=slice(120, 122)
)

# 計算區域平均的月氣候態
cape_monthly_clim = cape_taiwan.groupby('time.month').mean(dim=['time', 'latitude', 'longitude'])

# 執行計算
result = cape_monthly_clim.compute()

# 繪圖
plt.figure(figsize=(10, 5))
result.plot(marker='o', linewidth=2, markersize=8)
plt.xlabel('Month', fontsize=12)
plt.ylabel('CAPE (J/kg)', fontsize=12)
plt.title('Taiwan Area-Averaged CAPE Climatology (2019-2023)', fontsize=13)
plt.grid(alpha=0.3)
plt.xticks(range(1, 13))
plt.tight_layout()
plt.show()

print("Interpretation:")
print(f"  Max CAPE month: {result.values.argmax() + 1}")
print(f"  Min CAPE month: {result.values.argmin() + 1}")
print(f"  Summer mean (JJA): {result.sel(month=[6,7,8]).mean().values:.0f} J/kg")
print(f"  Winter mean (DJF): {result.sel(month=[12,1,2]).mean().values:.0f} J/kg")

## 4. 空間統計：計算 Gradient

在氣象分析中，變數的空間梯度（gradient）很重要：
- **溫度梯度**：鋒面位置
- **CAPE 梯度**：對流邊界

Xarray 提供 `.differentiate()` 來計算導數。

In [None]:
# 選取一個時間點
cape_snapshot = ds_all['cape'].isel(time=0)

# 計算經向（latitude）梯度
# 單位：CAPE per degree latitude
cape_grad_lat = cape_snapshot.differentiate('latitude')

# 計算緯向（longitude）梯度
cape_grad_lon = cape_snapshot.differentiate('longitude')

# 計算梯度強度（magnitude）
cape_grad_mag = np.sqrt(cape_grad_lat**2 + cape_grad_lon**2)

print("Gradient calculation (still lazy):")
print(f"  Type: {type(cape_grad_mag.data)}")
print(f"  Shape: {cape_grad_mag.shape}")

## 5. Rechunking：最佳化 Chunk 策略

### 為什麼需要 Rechunk？

原始 Zarr 的 chunking 是 `(744, 121, 161)`（時間約 1 個月）。

這適合：
- ✅ 計算時間平均、時間序列分析
- ✅ 空間場的操作

但**不適合**：
- ❌ 單點或小區域的長時間序列
- ❌ ML training（需要時間維度切小塊）

### Rechunking 的成本

Rechunking **不是免費的**：
- 需要讀取所有原始資料
- 重新組織並寫入新的 chunks
- I/O intensive（可能需要數小時）

因此：
1. **先確定分析模式**再 rechunk
2. Rechunk 的結果應該**儲存下來**，不要每次都重新算
3. 使用 intermediate Zarr 作為中間產物

### 5.1 情境：為 ML Training 最佳化

對於後續的 ML pipeline，我們需要：
- 小的時間 batch（例如 32 time steps）
- 完整的空間 patch（例如 16x16）

理想的 chunking：`(32, 16, 16)`

In [None]:
# 使用 daily mean 資料（已經從 hourly 降到 daily）
ds_daily_all = ds_all.resample(time='1D').mean()

print("Before rechunk:")
print(f"  Original chunks: {ds_daily_all['cape'].chunks}")
print(f"  Total size: {ds_daily_all.nbytes / 1e9:.2f} GB")
print()

# Rechunk 為適合 ML 的結構
ds_rechunked = ds_daily_all.chunk({
    'time': 32,        # 每個 batch 32 天
    'latitude': 16,    # 16x16 空間 patch
    'longitude': 16
})

print("After rechunk:")
print(f"  New chunks: {ds_rechunked['cape'].chunks}")
print(f"  Number of chunks: {ds_rechunked['cape'].data.npartitions}")
print()

# 計算單一 chunk 的大小
chunk_size = 32 * 16 * 16 * 4  # float32 = 4 bytes
print(f"Single chunk size: {chunk_size / 1024:.2f} KB")
print(f"  Status: {chunk_size / (1024**2):.2f} MB - 在理想範圍內 (10-100 MB)")

### Chunk Size 的權衡

| Chunk Size | 優點 | 缺點 | 適用情境 |
|------------|------|------|----------|
| < 1 MB | 極細粒度平行化 | Overhead 過大 | 不推薦 |
| 1-10 MB | 高度平行化 | 排程成本仍高 | 大量 workers |
| **10-100 MB** | **平衡** | **- ** | **大多數情況** |
| 100-500 MB | 低 overhead | 平行度受限 | 少量複雜計算 |
| > 500 MB | 最低 overhead | 記憶體壓力、無法平行 | 不推薦 |

我們的 32 KB 太小了！讓我們調整：

In [None]:
# 更合理的 chunking
ds_rechunked = ds_daily_all.chunk({
    'time': 32,        # 32 天
    'latitude': 60,    # 60 個緯度點
    'longitude': 80    # 80 個經度點
})

chunk_size = 32 * 60 * 80 * 4 * 4  # 4 variables, float32
print(f"Optimized chunk size: {chunk_size / (1024**2):.2f} MB")
print(f"Number of chunks per variable: {ds_rechunked['cape'].data.npartitions}")
print(f"Total chunks (4 variables): {ds_rechunked['cape'].data.npartitions * 4}")

## 6. 儲存為 Zarr：固化中間結果

### 為什麼要儲存中間結果？

假設我們的 workflow 是：
1. 讀取 5 年 hourly 資料
2. Resample 到 daily
3. Rechunk
4. 用於 ML training

如果每次 training 都重複 1-3 步驟：
- 每次都要讀取 ~135 GB 原始資料
- 重複計算 resample（耗費 CPU）
- 重複 rechunk（大量 I/O）

**更好的做法**：
1. 算一次 daily + rechunked 資料
2. 儲存為新的 Zarr（可能 ~6 GB）
3. 後續直接讀取這個優化過的 Zarr

### 6.1 準備要儲存的資料

In [None]:
# 選取 2019 年，resample 到 daily mean
ds_to_save = ds.resample(time='1D').mean()

# Rechunk 到適合 ML 的結構
ds_to_save = ds_to_save.chunk({
    'time': 32,
    'latitude': 60,
    'longitude': 80
})

print("Dataset to save:")
print(ds_to_save)
print()
print(f"Original size (hourly): {ds.nbytes / 1e9:.2f} GB")
print(f"New size (daily): {ds_to_save.nbytes / 1e9:.2f} GB")
print(f"Reduction: {ds.nbytes / ds_to_save.nbytes:.1f}x")

### 6.2 儲存為 Zarr

Zarr 的儲存參數：

1. **mode='w'**: 覆寫模式（小心！會刪除舊資料）
2. **consolidated=True**: 將 metadata 集中儲存
   - 優點：減少讀取時的 HTTP requests（雲端友善）
   - 缺點：不支援 append（對我們沒影響）
3. **encoding**: 設定壓縮、chunking
   - compressor: 使用 Blosc 壓縮（CPU-friendly）
   - chunks: 明確指定 chunk shape

In [None]:
from numcodecs import Blosc

# 輸出路徑
output_path = 'outputs/era5_2019_daily_optimized.zarr'

# 設定 encoding（壓縮與 chunking）
compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.SHUFFLE)

encoding = {
    var: {
        'compressor': compressor,
        'chunks': (32, 60, 80)  # 明確指定 chunk shape
    }
    for var in ds_to_save.data_vars
}

print("Saving to Zarr...")
print(f"  Output: {output_path}")
print(f"  Compressor: Blosc (zstd, level 3)")
print(f"  Chunks: (32, 60, 80)")
print()

# 執行儲存
# compute=True: 立即執行（而不是 lazy）
# consolidated=True: 建立 .zmetadata
ds_to_save.to_zarr(
    output_path,
    mode='w',
    encoding=encoding,
    consolidated=True
)

print("✓ Saved successfully!")

### 儲存過程說明

這個操作會：
1. **讀取原始 Zarr**：從 `/home/sungche/NAS/dataset/era5/`
2. **執行 resample**：hourly → daily mean
3. **Rechunk**：重組為新的 chunk 結構
4. **壓縮**：使用 Blosc 壓縮（約 30-50% 壓縮率）
5. **寫入新 Zarr**：到 `outputs/` 目錄
6. **建立 consolidated metadata**：加速後續讀取

這個過程可能需要 **數分鐘到數十分鐘**，取決於磁碟速度和 worker 數量。

**觀察建議**：切換到 Dask Dashboard 的 Task Stream，看到大量平行的 read → compute → write 任務。

### 6.3 驗證儲存結果

In [None]:
# 重新讀取剛儲存的 Zarr
ds_loaded = xr.open_zarr(output_path, consolidated=True)

print("Loaded dataset:")
print(ds_loaded)
print()
print(f"Chunks: {ds_loaded['cape'].chunks}")
print()

# 驗證資料正確性：計算一個簡單統計量
original_mean = ds_to_save['cape'].mean().compute()
loaded_mean = ds_loaded['cape'].mean().compute()

print("Verification:")
print(f"  Original mean: {original_mean.values:.2f}")
print(f"  Loaded mean: {loaded_mean.values:.2f}")
print(f"  Difference: {abs(original_mean.values - loaded_mean.values):.6f}")
print(f"  Status: {'✓ Match' if abs(original_mean.values - loaded_mean.values) < 0.01 else '✗ Mismatch'}")

### 6.4 檢查磁碟空間使用

In [None]:
import subprocess

# 使用 du 指令查看目錄大小
result = subprocess.run(
    ['du', '-sh', output_path],
    capture_output=True,
    text=True
)

print(f"Disk usage: {result.stdout.strip()}")
print()
print("Comparison:")
print(f"  Original (hourly, uncompressed): ~27 GB")
print(f"  Saved (daily, compressed): see above")
print(f"  Expected: ~1.1 GB (24x smaller + compression)")

## 7. 壓縮選項比較

不同的壓縮器有不同的權衡：

| Compressor | 壓縮率 | 壓縮速度 | 解壓速度 | 適用情境 |
|------------|--------|----------|----------|----------|
| None | 1.0x | - | - | 快速原型、SSD |
| Blosc (lz4) | 2-3x | 極快 | 極快 | 預設選擇 |
| **Blosc (zstd)** | **3-5x** | **快** | **快** | **推薦** |
| gzip | 4-6x | 慢 | 中 | 長期封存、網路傳輸 |
| zlib | 3-5x | 慢 | 中 | 相容性優先 |

**我們使用 Blosc (zstd, level=3)**：
- 平衡壓縮率與速度
- CPU overhead 低（重要！因為我們還要做計算）
- 解壓快速（讀取時不會成為瓶頸）

如果需要更高壓縮率（例如資料要傳輸到其他地方），可以用：
```python
compressor = Blosc(cname='zstd', clevel=9, shuffle=Blosc.BITSHUFFLE)
```

## 8. 檢查點：你應該理解的概念

完成這個 notebook 後，你應該能夠：

- [ ] 使用 `.resample()` 進行時間聚合
- [ ] 使用 `.groupby()` 計算氣候態
- [ ] 用 `xr.concat()` 串接多個 Dataset
- [ ] 理解 rechunking 的時機和成本
- [ ] 根據分析模式選擇合適的 chunk size
- [ ] 正確設定壓縮參數並儲存 Zarr
- [ ] 使用 consolidated metadata 加速讀取

### Workflow 總結

處理大型 N-D array 的標準流程：

```
原始資料 (Zarr)
    ↓ (lazy operations)
選取 / 切片 / 聚合
    ↓
Rechunk（如果需要）
    ↓ (.to_zarr())
優化的中間 Zarr
    ↓ (用於後續分析)
ML / 視覺化 / 統計
```

關鍵：**把耗時的前處理做一次，儲存下來，重複使用**。

## 9. 小練習（可選）

1. **計算季節氣候態**：使用 `.groupby('time.season')` 計算 DJF, MAM, JJA, SON 的 CAPE 平均
2. **異常值偵測**：找出 CAPE > 5000 J/kg 的極端事件，統計發生頻率
3. **實驗 chunking**：比較不同 chunk 策略對 `.mean(dim='time')` 的影響
4. **壓縮率測試**：用不同的 compressor 儲存相同資料，比較磁碟空間和速度

## 下一步

在 **Notebook 3** 中，我們會進入 ML Pipeline：
- 定義對流分類任務（什麼時候算「對流」？）
- 使用 xbatcher 產生訓練 batches
- 整合 PyTorch DataLoader
- 訓練一個簡單的 CNN 模型
- 用 xskillscore 進行空間驗證

這是整個 workshop 的高潮，會展示完整的「大型陣列 → ML」工作流。

In [None]:
# 如果要關閉 Client（釋放資源）
# client.close()