# 💾 Transform Thies disdrometer data to Xarray 💧

This ```notebook``` aims to transform ```.csv``` data, which is measured every minute by Thies disdrometers, and transform it into an ```Xarray.Dataset``` format (```.zarr```). This facilitates the pre-processing of the data.

All you need to do is set the ```csv_files``` and ```ds.to_zarr```. This data is usually delivered in folders with a number, for example ```"77"```, which refers to the disdrometer code for spatial location.

### ⚠ <span style="color:red">Important</span> ⚠

Note that the data available for the development of the following code has the following columns:

- ```var6``` : Disdrometer code/name ```'codigo_synop'```
- ```var8``` : Total rain intensity ```'r_int'``` $[mm/h]$ 
- ```var9``` : Liquid rain intensity ```'rl_int'``` $[mm/h]$ 
- ```var10``` : Solid precipitation intensity ```'rs_int'``` $[mm/h]$ 
- ```var11``` : Precipitation amount ```'r_acc'``` $[mm]$
- ```var12``` : Precipitation visibility ```'MOR'``` $[m]$
- ```var13``` : Reflectivity ```'ref'``` $[dBZ]$
- ```var16``` : Total measured particles  ```'n_t'``` []

Columns ```var42``` to ```var482``` correspond to the aerosol count recorded by the Thies disdrometer, first for all speeds (20 speeds) of the particles in the first diameter range; then, the count of drops in each speed interval but for particles corresponding to diameter 2, and so on for a total of 22 diameters.

In [1]:
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import xarray as xr
import numpy as np
import glob
import os
from dask.distributed import Client
#import matplotlib.pyplot as plt
#import matplotlib.ticker as ticker  # Import minor locator ticker
#import seaborn as sns 
import Functions as Func

Build the dataframe and concatenate all the measurements. 

### <span style="color:red">Important</span>

If you have a considerable amount of data, you may experience an error due to not having enough RAM to concatenate all the measurements. What you can do is concatenate data fragments. For example, in this case, 11 years of measurements for disdrometer 77 were stored in monthly ```.csv``` files and transformed into 4-year groups. You can then concatenate these ```xarray.datasets``` into a single one (thanks to this ```.zarr``` format, there will no longer be RAM issues as with the ```DataFrame```) and save the ```.zarr``` file as shown in the last commented [cell](#Concat).

In [None]:
# 📌 Define diameter and velocity classes
diameter_classes, velocity_classes = Func.diam_vel_classes()

num_diameters = len(diameter_classes)
num_velocities = len(velocity_classes)

# Path to CSV files
csv_files = glob.glob(r'E:\Universidad\Trabajo_de_Grado_Isabel\Datos_Solicitados_SIATA_JPC\resultados\642/*.csv')

# Particle columns
particle_columns = [f"var{i}" for i in range(42, 482)]

# Inicializar lista para almacenar chunks procesados
dfs = []

# Read each CSV file in parts
for file in csv_files:
    try:
        chunk_iter = pd.read_csv(file, parse_dates=[0], date_format="%Y-%m-%d %H:%M:%S", chunksize=1000)

        for chunk in chunk_iter:

            # Replace negative values ​​in particle columns with NaN (if present)
            existing_particle_cols = [col for col in particle_columns if col in chunk.columns]
            chunk[existing_particle_cols] = chunk[existing_particle_cols].where(
                chunk[existing_particle_cols] >= 0, np.nan
            )

            # Convert columns to numeric (only the relevant ones)
            chunk = chunk.apply(pd.to_numeric, errors='coerce')

            # Add chunk to list
            dfs.append(chunk)

    except Exception as e:
        print(f"Error al leer {file}: {e}")

# Concatenate all chunks
df = pd.concat(dfs, ignore_index=True)

# Set the time index
df.set_index(df.columns[0], inplace=True)
df.index.name = "time"
df.index = pd.to_datetime(df.index, errors="coerce")

# Show summary
# display(df)

Unnamed: 0_level_0,cliente,var1,var2,var3,var4,var5,var6,var7,var8,var9,...,var472,var473,var474,var475,var476,var477,var478,var479,var480,var481
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2022-11-23 14:37:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2022-11-23 14:38:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2022-11-23 14:39:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2022-11-23 14:40:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2022-11-23 14:46:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-04-24 13:50:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2025-04-24 13:51:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2025-04-24 13:52:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2025-04-24 13:53:00,642,0.0,0.0,,0.0,0.0,0.0,,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [3]:
# 📌 Building the xarray.Dataset

# 📌 Creating a DataArray for diameter and velocity with attributes
diameter = xr.DataArray(
    diameter_classes,
    dims="diameter",
    coords={"diameter": diameter_classes},
    name="diameter"
)
diameter.attrs["units"] = "mm"
diameter.attrs["long_name"] = "Drop diameter"

velocity = xr.DataArray(
    velocity_classes,
    dims="velocity",
    coords={"velocity": velocity_classes},
    name="velocity"
)
velocity.attrs["units"] = "m/s"
velocity.attrs["long_name"] = "Fall velocity"

# 📌 Processing the DataFrame df
particle_columns = [f"var{i}" for i in range(42, 482)]
particle_counts = df[particle_columns].values

num_measurements = len(df)
expected_size = num_measurements * num_diameters * num_velocities

# Adjust size if necessary
if particle_counts.size < expected_size:
    padding = np.zeros((num_measurements, expected_size - particle_counts.size))
    particle_counts = np.hstack([particle_counts, padding])
elif particle_counts.size > expected_size:
    particle_counts = particle_counts[:, :num_diameters * num_velocities]

# Reshape a (time, diameter, velocity)
particle_counts = particle_counts.reshape((num_measurements, num_diameters, num_velocities))
time = df.index

# 📌 Create main DataArrays
Client = xr.DataArray(df.cliente.astype(str).values, coords=[time], dims=['time'])
Synop_Code = xr.DataArray(df.var6.astype(str).values, coords=[time], dims=['time'])
r_int = xr.DataArray(df.var8.values, coords=[time], dims=['time'])
rl_int = xr.DataArray(df.var9.values, coords=[time], dims=['time'])
rs_int = xr.DataArray(df.var10.values, coords=[time], dims=['time'])
r_acc = xr.DataArray(df.var11.values, coords=[time], dims=['time'])
MOR = xr.DataArray(df.var12.values, coords=[time], dims=['time'])
ref = xr.DataArray(df.var13.values, coords=[time], dims=['time'])
n_t = xr.DataArray(df.var16.values, coords=[time], dims=['time'])
raw = xr.DataArray(particle_counts, coords=[time, diameter_classes, velocity_classes], dims=['time', 'diameter', 'velocity'])
raw = raw.chunk({'time': 10000})

# 📌 Individual attributes
atributos = Func.attr()

# 📌 Apply attributes
Client.attrs.update(atributos['Client'])
Synop_Code.attrs.update(atributos['Synop_Code'])
r_int.attrs.update(atributos['r_int'])
rl_int.attrs.update(atributos['rl_int'])
rs_int.attrs.update(atributos['rs_int'])
r_acc.attrs.update(atributos['r_acc'])
MOR.attrs.update(atributos['MOR'])
ref.attrs.update(atributos['ref'])
n_t.attrs.update(atributos['n_t'])
raw.attrs.update(atributos['raw'])

# # 📌 Calculate nd
# nd = calculate_nd(raw, diameter, velocity)

# # 📌 Calculate precipitation parameters
# r_int_1, lwc_1, n_t_1, ref_1, dm_1, nw_1 = calculate_parameters_dsd(nd, diameter)

# 📌 Create final dataset
ds = xr.Dataset({
    "Client": Client,
    "Synop_Code": Synop_Code,
    "r_int": r_int,
    "rl_int": rl_int,
    "rs_int": rs_int,
    "r_acc": r_acc,
    "MOR": MOR,
    "ref": ref,
    "n_t": n_t,
    "raw": raw,
    # "nd": nd,
    "diameter": diameter,
    "velocity": velocity,
    # 'r_int_1' : r_int_1,
    # 'lwc_1' : lwc_1,
    # 'n_t_1' : n_t_1,
    # 'ref_1' : ref_1,
    # 'dm_1' : dm_1,
    # 'nw_1' : nw_1
})

# 📌 Global attributes of the Dataset
ds.attrs["title"] = "Thies Disdrometer Precipitation Dataset"
ds.attrs["institution"] = "SIATA / Universidad del Quindío"
ds.attrs["references"] = {'[1]' : "https://doi.org/10.1175/JTECH-D-13-00174.1",  '[2]' : "https://github.com/aladinor/parsivel2zarr",
                           '[3]' : "https://doi.org/10.5194/hess-23-4737-2019"}

ds = ds.chunk({'time': 10000})

In [None]:
# ds

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type object numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type object numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type float64 numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type float64 numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type float64 numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type float64 numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type float64 numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type float64 numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 9.57 MiB 78.12 kiB Shape (1254182,) (10000,) Dask graph 126 chunks in 1 graph layer Data type float64 numpy.ndarray",1254182  1,

Unnamed: 0,Array,Chunk
Bytes,9.57 MiB,78.12 kiB
Shape,"(1254182,)","(10000,)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.11 GiB,33.57 MiB
Shape,"(1254182, 22, 20)","(10000, 22, 20)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.11 GiB 33.57 MiB Shape (1254182, 22, 20) (10000, 22, 20) Dask graph 126 chunks in 1 graph layer Data type float64 numpy.ndarray",20  22  1254182,

Unnamed: 0,Array,Chunk
Bytes,4.11 GiB,33.57 MiB
Shape,"(1254182, 22, 20)","(10000, 22, 20)"
Dask graph,126 chunks in 1 graph layer,126 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


Finaly, save the ```Xarray.Dataset``` as ```.zarr``` format, and if you need to load again the data, you can use the following code:

```ds = xr.open_zarr(r"E:\Universidad\Trabajo_de_Grado_Isabel\Rain_drop_Size_Data\Siata\Solicitud_Disdros\Thies_Zarr\77_Zarr.zarr")```

In [None]:
ds.dtypes

for var in ['Client', 'Synop_Code']:
    ds[var] = ds[var].astype(str)

ds.to_zarr(r'E:\Universidad\Trabajo_de_Grado_Isabel\Datos_Solicitados_SIATA_JPC\resultados\Zarr\642_Zarr.zarr')

<a id="Concat"></a>

### ⚠ Important ⚠

If you have the resources to load all the data, ignore the code below and just save your data in ```.zarr``` format to the desired location. However, if your data exceeds your resources, you can split the disdrometer measurement data (for example, by year) and save it once you have converted all the data from a disdrometer (for example, ```disdrometer 77```, assuming it has 10 years of measurements and we split the information into two of 5 years: ```77_2015.zarr``` and ```77_2025.zarr```). We can concatenate them into a single .```zarr``` file with the code below without worrying about available resources since the ```.zarr``` format has no problem handling large volumes of data.

In [None]:
# ds_1 = xr.open_zarr(r'Your_Path\77_2015.zarr')
# ds_2 = xr.open_zarr(r'Your_Path\77_2025.zarr')

We concatenate in the "time" dimension and be careful with the dates.

In [None]:
# datasets = [ds_1, ds_2]
# ds_total = xr.concat(datasets, dim='time')
# ds_total = ds_total.sortby('time') 

# ds_total = ds_total.chunk({'time': 10000})

# ds_total.to_zarr(r'Path_save\77_Zarr.zarr')