In [9]:
import os
import time
import numpy as np
import dask.array as da
import metpy.constants as mpconsts
import metpy.units as units
from func import cal_buoyancy  # import the function from your script
from pathlib import Path
import pandas as pd
from dask import delayed
from dask.distributed import Client

In [None]:
backup_dir = Path.home() / "backup_data"
backup_dir.mkdir(parents=True, exist_ok=True)

# Interpolated pressure, humidity and temperature arrays.
file_path_main_arrays = backup_dir / "backup_arrays_for_buoyancy.npz"
# Times, latitudes and longitudes of the grid.
file_path_metadata = backup_dir / "backup_metadata_for_buoyancy.npz"

# np.load on an .npz archive returns an NpzFile that keeps the file open, so use
# it as a context manager. `mmap_mode` is ignored for .npz archives: each
# data[...] access reads that array fully into memory, so the arrays stay valid
# after the archive is closed.
with np.load(file_path_main_arrays) as data:
    print("file containing arrays loaded")
    interpolated_pressure = da.from_array(data["array1"], chunks="auto")
    interpolated_q = da.from_array(data["array2"], chunks="auto")
    interpolated_t = da.from_array(data["array3"], chunks="auto")
    # NOTE(review): this key is spelled "array_4" unlike "array1".."array3";
    # confirm it matches the name used when the archive was saved.
    interpolated_qfit = da.from_array(data["array_4"], chunks="auto")
print("Arrays loaded successfully!")

with np.load(file_path_metadata) as metadata:
    times = metadata['times']
    lats = metadata['lats']
    lons = metadata['lons']

# Fixed altitude grid 0..4000 in steps of 20 (presumably metres), assumed to be
# the levels the arrays above were interpolated onto.
interpolated_altitudes = np.arange(0, 4000 + 20, 20)
# The per-point buoyancy code boolean-masks each vertical profile with this grid,
# so axis 1 of the arrays must have exactly one entry per altitude level.
assert interpolated_pressure.shape[1] == interpolated_altitudes.size, (
    f"level axis has {interpolated_pressure.shape[1]} entries but the altitude "
    f"grid has {interpolated_altitudes.size}"
)
print("Metadata loaded successfully!")

In [10]:
# Output grids indexed as (time, lat, lon). They are NaN-filled, so any grid
# point that is never written stays visibly missing.
all_blt, all_blc = (
    np.full((times.shape[0], lats.size, lons.size), np.nan) for _ in range(2)
)

# Altitude grid shared by every vertical profile.
alt = interpolated_altitudes

# Standard gravity (m/s^2). NOTE(review): not referenced in the visible cells.
g = 9.80665

# Local Dask cluster that evaluates the per-point tasks in parallel.
# NOTE(review): the saved output of this cell shows worker processes failing to
# start inside multiprocessing; if that recurs, Client(processes=False) avoids
# spawning worker processes.
client = Client()

@delayed
def compute_buoyancy_at_point(latidx, lonidx, tidx, interpolated_pressure,
                              interpolated_t, interpolated_qfit, interpolated_q, alt,
                              layer_bottom=2000, layer_top=4000):
    """Integrate the buoyancy of one (tidx, latidx, lonidx) column over a layer.

    The column's temperature and pressure profiles are passed to ``cal_buoyancy``
    twice: once with the ``interpolated_q`` humidity profile and once with
    ``interpolated_qfit``. For each result, the parcel-minus-environment
    temperature difference is integrated against ln(p) over
    ``layer_bottom <= alt <= layer_top`` and scaled by MetPy's dry-air gas
    constant ``Rd``.

    Parameters
    ----------
    latidx, lonidx, tidx : int
        Indices of the column. The 4-D inputs are indexed as
        ``[tidx, :, latidx, lonidx]``.
    interpolated_pressure, interpolated_t, interpolated_qfit, interpolated_q : array-like
        4-D NumPy or dask arrays. Temperature is treated as degrees Celsius.
        Pressure must be positive because it goes through ``np.log``.
    alt : array-like
        Altitude grid with one entry per level on axis 1 of the inputs.
    layer_bottom, layer_top : float, optional
        Inclusive bounds of the integration layer, in the units of ``alt``.

    Returns
    -------
    tuple
        ``(tidx, latidx, lonidx, blt, blc)``. ``blc`` is the integral for the
        ``interpolated_qfit`` profile. ``blt`` is the ``interpolated_q`` integral
        minus ``blc``. Both are plain floats in J/kg.
    """
    # At notebook level, `units` is the *module* metpy.units, which has no `degK`
    # attribute and is not callable. Bind MetPy's unit registry locally instead.
    from metpy.units import units

    start_time = time.perf_counter()

    # Dask collections passed as arguments to a delayed function are computed
    # before the call, so these normally arrive as NumPy arrays, which have no
    # `.compute()`. np.asarray accepts both NumPy and dask inputs.
    p = np.asarray(interpolated_pressure[tidx, :, latidx, lonidx])
    t = np.asarray(interpolated_t[tidx, :, latidx, lonidx])
    qlc = np.asarray(interpolated_qfit[tidx, :, latidx, lonidx])
    qr = np.asarray(interpolated_q[tidx, :, latidx, lonidx])
    alt = np.asarray(alt)

    # Parcel calculations for the two humidity profiles.
    b_qr = cal_buoyancy(t_profile=t, h_profile=qr, p_profile=p, alt_profile=alt)
    b_qlc = cal_buoyancy(t_profile=t, h_profile=qlc, p_profile=p, alt_profile=alt)

    # `.tp` is treated as kelvin. The environment temperature is converted from
    # Celsius so both differences are in K.
    t_env = units.Quantity(t, "degC").to("degK")
    y_qr = b_qr.tp * units.degK - t_env
    y_qlc = b_qlc.tp * units.degK - t_env

    # NOTE(review): the layer is selected by altitude; confirm this (rather than
    # a pressure layer above the first level) is the intended integration range.
    in_layer = (alt >= layer_bottom) & (alt <= layer_top)

    # Reverse the layer so ln(p) increases along the integration path (this
    # assumes p decreases with altitude; TODO confirm).
    log_p = np.log(p[in_layer][::-1])
    br = (mpconsts.Rd * np.trapz(y_qr[in_layer][::-1], log_p)).to("J/kg")
    blc = (mpconsts.Rd * np.trapz(y_qlc[in_layer][::-1], log_p)).to("J/kg")
    blt = br - blc

    duration = time.perf_counter() - start_time
    print(f"Time for latidx={latidx}, lonidx={lonidx}, tidx={tidx}: {duration:.4f} seconds")

    return tidx, latidx, lonidx, blt.magnitude, blc.magnitude

# The four input arrays are identical for every task. Wrap each one in a single
# Delayed up front, so all tasks share one dependency instead of re-wrapping
# the arrays on every call.
pressure_d, t_d, qfit_d, q_d = (
    delayed(arr)
    for arr in (interpolated_pressure, interpolated_t, interpolated_qfit, interpolated_q)
)

# One delayed task per (latidx, lonidx, tidx) grid point.
# NOTE(review): this builds lats * lons * times tasks. For large grids, consider
# processing several columns per task to cut scheduler overhead.
tasks = [
    compute_buoyancy_at_point(latidx, lonidx, tidx, pressure_d, t_d, qfit_d, q_d, alt)
    for latidx in range(lats.size)
    for lonidx in range(lons.size)
    for tidx in range(times.shape[0])
]

try:
    # Run every task on the cluster and block until all results are local.
    results = client.compute(tasks, sync=True)
finally:
    # Release the cluster even when the computation fails or is interrupted.
    client.close()

# Scatter each point's result back into the (time, lat, lon) grids.
for tidx, latidx, lonidx, blt_value, blc_value in results:
    all_blt[tidx, latidx, lonidx] = blt_value
    all_blc[tidx, latidx, lonidx] = blc_value

# Set to True to persist the computed grids.
SAVE_RESULTS = False
if SAVE_RESULTS:
    output_dir = Path.home() / "buoyancy_arrays"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / "computed_buoyancy.npz"
    np.savez(output_path, all_blt=all_blt, all_blc=all_blc)
    print(f"\nBuoyancy arrays saved at: {output_path}")


2025-02-07 09:38:37,919 - distributed.nanny - ERROR - Failed to start process
Traceback (most recent call last):
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/site-packages/distributed/nanny.py", line 448, in instantiate
    result = await self.process.start()
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/site-packages/distributed/nanny.py", line 712, in start
    self.init_result_q = mp_ctx.Queue()
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/multiprocessing/context.py", line 103, in Queue
    return Queue(maxsize, ctx=self.get_context())
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/multiprocessing/queues.py", line 43, in __init__
    self._rlock = ctx.Lock()
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/multiprocessing/synchronize.py", line

2025-02-07 09:38:37,971 - distributed.nanny - ERROR - Failed to start process
Traceback (most recent call last):
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/site-packages/distributed/nanny.py", line 448, in instantiate
    result = await self.process.start()
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/site-packages/distributed/nanny.py", line 712, in start
    self.init_result_q = mp_ctx.Queue()
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/multiprocessing/context.py", line 103, in Queue
    return Queue(maxsize, ctx=self.get_context())
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/multiprocessing/queues.py", line 43, in __init__
    self._rlock = ctx.Lock()
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/home/annierosen16/anaconda3/envs/annieenv/lib/python3.9/multiprocessing/synchronize.py", line

KeyboardInterrupt: 