In [8]:
import numpy as np
import xarray as xr
import numba as nb
import pandas as pd
from datetime import datetime
# from Single_cloud_analysis import Cloud
import sys
sys.path.insert(0,'/cluster/work/climate/dnikolo/n2o')
from Glaciation_time_estimator.Auxiliary_func.config_reader import read_config
from Glaciation_time_estimator.Data_postprocessing.Job_result_fp_generator import generate_tracking_filenames
from multiprocessing import Manager
from Glaciation_time_estimator.Auxiliary_func.Nestable_multiprocessing import NestablePool
from functools import partial

In [9]:
import datetime as dt


class Cloud:
    # def __new__(self, *args, **kwargs):
    #     return super().__new__(self)
    def __init__(self, cloud_id):
        self.id = cloud_id
        self.crit_fraction = 0.1
        # Bools inidicating if the cloud has been liquid at any point
        self.is_liq: bool = False
        self.is_mix: bool = False
        self.is_ice: bool = False
        # Max and min cloud size in pixels
        self.max_size_km: float = 0.0
        self.max_size_px: int = 0
        self.min_size_km: float = 510.0e6
        self.min_size_px: int = 3717*3717

        # Variables giving the first and last 4 timesteps (1 hour) of the cloud ice fraction - both arrays run in the same time direction start: [1 , 2 , 3 , 4] ... end: [1 , 2 , 3 , 4]
        self.start_ice_fraction_arr = np.empty(4)
        self.end_ice_fraction_arr = np.empty(4)
        # self.ice_fraction_arr=np.empty(max_timesteps)
        self.ice_fraction_list = []
        

        self.max_water_fraction: float = 0.0
        self.max_ice_fraction: float = 0.0

        self.track_start_time: dt.datetime = None
        self.track_end_time: dt.datetime = None
        self.track_length = None

        self.glaciation_start_time: dt.datetime = None
        self.glaciation_end_time: dt.datetime = None

        self.n_timesteps = None

        self.sum_cloud_lat = 0.0
        self.sum_cloud_lon = 0.0
        self.avg_cloud_lat = None
        self.avg_cloud_lon = None
        self.lon_list=[]
        self.lat_list=[]

        self.sum_cloud_size_km = 0.0
        self.avg_cloud_size_km = None
        self.cloud_size_km_list = []

        self.sum_cloud_size_px = 0.0
        self.avg_cloud_size_px = None

        self.n_timesteps_no_cloud = 0
        self.terminate_cloud = False

    def __str__(self):
        return f"{self.is_liq},{self.is_mix},{self.is_ice},"

    def update_status(self, time: dt.datetime, cloud_values: np.array, cloud_lat, cloud_lon, lat_resolution, lon_resolution):
        cloud_size_px = cloud_values.shape[0]
        # print(cloud_values)
        if cloud_size_px:
            self.n_timesteps_no_cloud = 0
            valid_values = cloud_values[cloud_values >= 1]
            # print(len(valid_values)/len(cloud_values))
            ice_fraction = (valid_values.sum() -
                            float(len(valid_values)))/float(len(valid_values))
            # print(valid_values)
            # ice_fraction=float(np.count_nonzero(cloud_values==2))/float(cloud_size_px)
            water_fraction = 1-ice_fraction
            # assert math.isclose(water_fraction+ice_fraction,1)
            # print(water_fraction)
            # print(water_fraction)f cloud_arr[track_number-1] is None:
            if not (self.track_start_time):
                self.track_start_time = time
                self.n_timesteps = 1
            else:
                self.n_timesteps += 1
            if self.n_timesteps <= 4:
                self.start_ice_fraction_arr[self.n_timesteps-1] = ice_fraction
            # Check and set type of cloud
            if water_fraction > 1-self.crit_fraction:
                self.is_liq = True
            elif water_fraction > self.crit_fraction:
                self.is_mix = True
            else:
                self.is_ice = True

            cloud_size_km = lat_resolution*lon_resolution*cloud_size_px * \
                np.cos(np.deg2rad(cloud_lat))*111.321*111.111
            self.cloud_size_km_list.append(cloud_size_km)
            self.max_size_km = max(self.max_size_km, cloud_size_km)
            self.min_size_km = min(self.min_size_km, cloud_size_km)

            self.max_size_px = max(self.max_size_px, cloud_size_px)
            self.min_size_px = min(self.min_size_px, cloud_size_px)

            self.sum_cloud_size_px += cloud_size_px
            self.avg_cloud_size_px = self.sum_cloud_size_px/self.n_timesteps

            self.sum_cloud_size_km += cloud_size_km
            self.avg_cloud_size_km = self.sum_cloud_size_km/self.n_timesteps

            # I assume that water_frac+ice_frac=1

            self.max_water_fraction = max(
                self.max_water_fraction, water_fraction)
            self.max_ice_fraction = max(
                self.max_ice_fraction, 1-water_fraction)

            self.sum_cloud_lat += cloud_lat
            self.sum_cloud_lon += cloud_lon
            self.lon_list.append(cloud_lon)
            self.lat_list.append(cloud_lat)
            self.avg_cloud_lat = self.sum_cloud_lat/self.n_timesteps
            self.avg_cloud_lon = self.sum_cloud_lon/self.n_timesteps

            self.track_end_time = time
            self.track_length = self.track_end_time-self.track_start_time

            self.end_ice_fraction_arr[0:3] = self.end_ice_fraction_arr[1:4]
            self.end_ice_fraction_arr[3] = ice_fraction

            # self.ice_fraction_arr[n_timesteps]=ice_fraction
            self.ice_fraction_list.append(ice_fraction)

    def update_missing_cloud(self):
        if self.track_end_time and (not self.terminate_cloud):
            self.n_timesteps_no_cloud += 1
            if self.n_timesteps_no_cloud > 1:
                self.terminate_cloud = True

In [10]:
@nb.njit
def extract_cloud_coordinates(cloudtracknumber_field, cloud_id_in_field, max_size):
    # Define the dictionary with the appropriate types
    loc_hash_map_cloud_numbers = {
        j: (0, np.zeros((2, max_size), dtype=np.int16)) for j in cloud_id_in_field}
    # # Traverse the 3D array
    # for i in cloud_id_in_field:
    #     loc_hash_map_cloud_numbers[val] = (0,np.empty((2,max_size),dtype=np.int16))
    for row in range(cloudtracknumber_field.shape[1]):
        for col in range(cloudtracknumber_field.shape[2]):
            val = cloudtracknumber_field[0, row, col]
            if val != 0:
                ind, cord = loc_hash_map_cloud_numbers[val]
                if ind <= max_size:
                    cord[:, ind] = np.asarray([row, col], dtype=np.int16)
                    ind += 1
                    # print(ind)
                    loc_hash_map_cloud_numbers[val] = (ind, cord)
    return loc_hash_map_cloud_numbers
    # return loc_hash_map_cloud_numbers


In [11]:

def extract_value(val):
    if isinstance(val, xr.DataArray):
        return val.values.item() if val.size == 1 else val.values
    return val

def analize_single_temp_range(temp_ind: int, cloud_dict, tracking_fps, pole: str, config: dict) -> None:
    # loop_start_time=dt.datetime.now()
    min_temp, max_temp = config['min_temp_arr'][temp_ind], config['max_temp_arr'][temp_ind]
    # Load datasets
    temp_key = f'{abs(round(min_temp))}_{abs(round(max_temp))}'
    try:
        cloudtrack_data = xr.load_dataset(tracking_fps[pole][temp_key]["cloudtracks"][0])
        trackstats_data = xr.load_dataset(
            tracking_fps[pole][temp_key]["trackstats_final"])
        tracknumbers_data = xr.load_dataset(tracking_fps[pole][temp_key]["tracknumbers"])
    except:  # Exception as inst:
        print(f"Skipping {min_temp} to {max_temp}")
        cloud_dict[temp_key] = np.array([])
        return None
    # Load relevant data from datasets into local variables
    n_tracks = trackstats_data.variables['track_duration'].shape[0]
    basetimes = pd.to_datetime(tracknumbers_data['basetimes'])
    lat = cloudtrack_data['lat']
    lon = cloudtrack_data['lon']
    lat_resolution = (lat.max()-lat.min())/len(lat)
    lon_resolution = (lon.max()-lon.min())/len(lon)
    trackstats_data.close()
    tracknumbers_data.close()
    cloudtrack_data.close()
    # print(append_start_time-loop_start_time)
    cloud_arr = np.empty((n_tracks), dtype=Cloud)
    # Cloud(f'{temp_ind}_{i}') for i in range(n_tracks)])
    # print(append_end_time-append_start_time)
    print(f"Analyzing T: {min_temp} to {max_temp} Agg={config['agg_fact']}")
    for fp_ind in range(3):#range(len(basetimes)):
        time = basetimes[fp_ind]
        time_str = time.strftime("%Y%m%d_%H%M%S")
        print(f'{min_temp} to {max_temp} Loading {time_str}')
        cloudtrack_fp = tracking_fps[pole][temp_key]['cloudtracks'][fp_ind]
        cloudtrack_data = xr.load_dataset(cloudtrack_fp)
        cloudtracknumber_field = cloudtrack_data['tracknumber'].data
        cloudtracknumber_field[np.isnan(cloudtracknumber_field)] = 0
        cloudtracknumber_field = cloudtracknumber_field.astype(int)
        cph_field = cloudtrack_data['cph_filtered']
        cloud_id_in_field, counts = np.unique(
            cloudtracknumber_field, return_counts=True)
        counts = counts[cloud_id_in_field != 0]
        cloud_id_in_field = cloud_id_in_field[cloud_id_in_field != 0]
        max_allowed_cloud_size_px = config['fast_mode_arr_size'] if config['postprocessing_fast_mode'] else counts.max()
        hash_map_cloud_numbers = extract_cloud_coordinates(
            cloudtracknumber_field, cloud_id_in_field, max_allowed_cloud_size_px)  # counts.max())
        cloudtrack_data.close()
        if max_allowed_cloud_size_px > 1000000:
            print(np.where(counts, counts == counts.max()))
        # print(cloud_id_in_field)
        for track_number in cloud_id_in_field:
            try:
                if cloud_arr[track_number-1] is None:
                    cloud_arr[track_number-1] = Cloud(temp_key)
            except:
                print(
                    f"Error: {temp_ind,track_number,len(cloud_arr)}")
                continue

            if (not cloud_arr[track_number-1].terminate_cloud):
                # TODO:SPEED UP NEXT TWO LINES (set_cloud_values and update_status)
                ind, cord = hash_map_cloud_numbers[track_number]
                cloud_location_ind = [cord[0, :ind], cord[1, :ind]]
                if cloud_location_ind[0].size != 0:
                    avg_lat_ind = int(round(np.mean(cloud_location_ind[0])))
                    avg_lon_ind = int(round(np.mean(cloud_location_ind[1])))
                    # TODO:SPEED UP NEXT TWO LINES (set_cloud_values and update_status)
                    cloud_values = cph_field.values[0,
                                             cloud_location_ind[0].T, cloud_location_ind[1].T]
                    cloud_arr[track_number-1].update_status(
                        time, cloud_values, extract_value(lat[avg_lat_ind]), extract_value(lon[avg_lon_ind]), lat_resolution.values, lon_resolution.values)
                else:
                    cloud_arr[track_number-1].update_missing_cloud()
    cloud_dict[f"{pole}_{temp_key}"] = cloud_arr

In [12]:
import yaml
config = read_config(config_fp="/cluster/work/climate/dnikolo/n2o/Glaciation_time_estimator/config.yaml")
tracking_fps = generate_tracking_filenames(config)

In [13]:
res_dict={}
analize_single_temp_range(0,res_dict,tracking_fps, "np",config )

Analyzing T: -5 to 0 Agg=3
-5 to 0 Loading 20230108_000000
-5 to 0 Loading 20230108_001500
-5 to 0 Loading 20230108_003000


In [14]:
import pandas as pd
import os
import numpy as np

# Function to extract values from xarray.DataArray or leave them unchanged


min_temp, max_temp = config['min_temp_arr'][0], config['max_temp_arr'][0]
temp_key = f'{abs(round(min_temp))}_{abs(round(max_temp))}'
cloudtrack_data = xr.load_dataset(
    tracking_fps['np'][temp_key]["cloudtracks"][0])
lat = cloudtrack_data['lat']
lon = cloudtrack_data['lon']
lat_resolution = extract_value((lat.max()-lat.min())/len(lat))
lon_resolution = extract_value((lon.max()-lon.min())/len(lon))
cloudtrack_data.close()
columns = ["is_liq", "is_mix", "is_ice", "max_water_frac",
           "max_ice_fraction", "avg_size[km]", "max_size[km]",
           "min_size[km]", "avg_size[px]", "max_size[px]",
           "min_size[px]", "track_start_time", "track_length",
           "glaciation_start_time", "glaciation_end_time", "avg_lat",
           "avg_lon", "start_ice_fraction", "end_ice_fraction",
           "ice_frac_hist", "lat_hist", "lon_hist", 
           "size_hist_km"]
datapoints_per_cloud = len(columns)
# Iterating through the cloud data
# for temp_ind in range(len(config['max_temp_arr'])):
temp_ind = 0
# for pole in config['pole_folders']:
pole = 'np'
min_temp, max_temp = config['min_temp_arr'][temp_ind], config['max_temp_arr'][temp_ind]
temp_key = f'{abs(round(min_temp))}_{abs(round(max_temp))}'
key = f'{pole}_{temp_key}'
cloud_arr = res_dict[key]

cloudinfo_df = pd.DataFrame(index=range(len(cloud_arr)), columns=columns)
for cloud_ind in range(len(cloud_arr)):
    current_cloud = cloud_arr[cloud_ind]
    if current_cloud is not None:
        cloudinfo_df.iloc[cloud_ind] = [
            current_cloud.is_liq,
            current_cloud.is_mix,
            current_cloud.is_ice,
            current_cloud.max_water_fraction,
            current_cloud.max_ice_fraction,
            extract_value(current_cloud.avg_cloud_size_km),
            extract_value(current_cloud.max_size_km),
            extract_value(current_cloud.min_size_km),
            extract_value(current_cloud.avg_cloud_size_px),
            extract_value(current_cloud.max_size_px),
            extract_value(current_cloud.min_size_px),
            current_cloud.track_start_time,
            current_cloud.track_length,
            current_cloud.glaciation_start_time,
            current_cloud.glaciation_end_time,
            extract_value(current_cloud.avg_cloud_lat),
            extract_value(current_cloud.avg_cloud_lon),
            current_cloud.start_ice_fraction_arr,
            current_cloud.end_ice_fraction_arr,
            current_cloud.ice_fraction_list,
            current_cloud.lat_list,
            current_cloud.lon_list,
            current_cloud.cloud_size_km_list
        ]

# Ensure output directory exists
output_dir = os.path.join(
    config['postprocessing_output_dir'],
    config['time_folder_name'],
    f"T_{abs(round(min_temp)):02}_{abs(round(max_temp)):02}_agg_{config['agg_fact']:02}"
)
os.makedirs(os.path.dirname(output_dir), exist_ok=True)

# Save DataFrame to Parquet
output_dir_parq = output_dir + ".parquet"
print("Writing to ", output_dir_parq)
cloudinfo_df.to_parquet(output_dir_parq)

# Optionally save as CSV
if config['write_csv']:
    output_dir_csv = output_dir + ".csv"
    cloudinfo_df.to_csv(output_dir_csv)

Writing to  /cluster/work/climate/dnikolo/Cloud_analysis/20230108.0000_20230115.2345/T_05_00_agg_03.parquet
