In [6]:
import xarray as xr
import pandas as pd
import numpy as np
import stackstac
import planetary_computer as pc
from dask.distributed import Client, LocalCluster
import matplotlib.pyplot as plt
import warnings
import sys
import os

warnings.filterwarnings("ignore")



In [2]:
# Start Dask (safe + required)
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit="8GB"
)
client = Client(cluster)

client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 29.80 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:41133,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 29.80 GiB

0,1
Comm: tcp://127.0.0.1:40695,Total threads: 2
Dashboard: http://127.0.0.1:34979/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:42037,
Local directory: /tmp/dask-scratch-space/worker-1rljhdst,Local directory: /tmp/dask-scratch-space/worker-1rljhdst

0,1
Comm: tcp://127.0.0.1:39807,Total threads: 2
Dashboard: http://127.0.0.1:43365/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:43715,
Local directory: /tmp/dask-scratch-space/worker-ecafn9hh,Local directory: /tmp/dask-scratch-space/worker-ecafn9hh

0,1
Comm: tcp://127.0.0.1:36901,Total threads: 2
Dashboard: http://127.0.0.1:39895/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:33043,
Local directory: /tmp/dask-scratch-space/worker-me63wrlc,Local directory: /tmp/dask-scratch-space/worker-me63wrlc

0,1
Comm: tcp://127.0.0.1:37885,Total threads: 2
Dashboard: http://127.0.0.1:46737/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:45047,
Local directory: /tmp/dask-scratch-space/worker-2znakfgw,Local directory: /tmp/dask-scratch-space/worker-2znakfgw


In [7]:
sys.path.append(os.path.abspath("../utils"))

# Import mudular function
from env_variables_utils import environmental_variables

In [11]:
bbox = (-82.7167, 27.5833, -82.3833, 28.0333) 
start_date = "2019-01-01"
end_date = "2024-12-31"

env_data = environmental_variables(
    bbox=bbox,
    start_date=start_date,
    end_date=end_date,
    variables=["sst", "precip"]
)

sst_lazy = env_data["sst"]          # xarray DataArray (lazy)
precip_items = env_data["precip"]   # list of STAC Items

type(sst_lazy), type(precip_items)


Note: SST conversion (Kelvin to Celsius) applied.
Precipitation search failed or timed out: The request exceeded the maximum allowed time, please try again. If the issue persists, please contact planetarycomputer@microsoft.com.




(xarray.core.dataarray.DataArray, NoneType)

In [9]:
# Refresh SAS tokens (REQUIRED)
for item in precip_items:
    pc.sign_inplace(item)

# Make start/end timezone-aware (UTC)
start = pd.Timestamp("2019-01-01", tz="UTC")
end = pd.Timestamp("2024-12-31", tz="UTC")

# Filter items strictly to 2019–2024
precip_filtered = [
    item for item in precip_items
    if start <= pd.to_datetime(item.datetime) <= end
]

len(precip_filtered)


TypeError: 'NoneType' object is not iterable

In [None]:
# Stack STAC items (NO chunks here)
precip_stack = stackstac.stack(
    precip_filtered,
    assets=["cog"],
    epsg=32617,
    fill_value=np.nan
)

# Apply chunking AFTER stacking (this is the correct pattern)
precip_stack = precip_stack.chunk({"time": 30})

# Monthly aggregation
precip_monthly = (
    precip_stack
    .mean(dim=["x", "y"])          # spatial mean
    .squeeze(drop=True)
    .resample(time="1ME")          # monthly
    .sum(min_count=1)              # precipitation sum
)


In [None]:
final_ds = xr.Dataset({
    "sst": sst_lazy,
    "precip": precip_monthly
})

final_df = final_ds.compute().to_dataframe()
final_df.head()


In [None]:
plt.figure(figsize=(12, 5))
plt.plot(final_df.index, final_df["sst"], label="Monthly SST (°C)")
plt.plot(final_df.index, final_df["precip"], label="Monthly Precip (mm)")
plt.title("Monthly SST & Precipitation (2019–2024)")
plt.xlabel("Time")
plt.ylabel("Value")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# --- 1. Separate DataFrames ---

# Select the SST column and save it to a new DataFrame
sst_df = final_df[['sst']].copy()

# Select the Precipitation column and save it to a new DataFrame
precip_df = final_df[['precip']].copy()

# --- 2. Define File Paths ---
sst_path = "kalu_sst_data.csv"
precip_path = "kalu_precip_data.csv"

# --- 3. Save to CSV ---
sst_df.to_csv(sst_path, index=True, index_label='time')
print(f"✔ Successfully saved SST data to: {sst_path}")

precip_df.to_csv(precip_path, index=True, index_label='time')
print(f"✔ Successfully saved Precipitation data to: {precip_path}")

# --- 4. Cleanup Dask Resources (Crucial) ---
try:
    client.close()
    cluster.close()
    print("\nDask client and cluster successfully shut down.")
except Exception as e:
    print(f"Could not shut down Dask resources: {e}")

# Display the paths