In [8]:
from dataclasses import dataclass
import datetime as dt
from pathlib import Path
import re
from typing import Any

import polars as pl
import pytz

In [9]:
# Function and global vars definitions
def save_csv(df: pl.DataFrame, output_data_path: Path, output_name: str, v: bool = False) -> None:
    """Write ``df`` to ``<output_data_path>/<output_name>.csv``.

    Args:
        df: Frame to serialise as CSV.
        output_data_path: Directory that receives the file. It is created
            (including parents) when it does not exist yet.
        output_name: File name without the ``.csv`` extension.
        v: When true, print the path of the file that was written.
    """
    # On a fresh checkout the output directory is missing; create it up front
    # rather than relying on the CSV writer to do so.
    output_data_path.mkdir(parents=True, exist_ok=True)

    output_path: Path = output_data_path / f"{output_name}.csv"
    df.write_csv(output_path)
    if v:
        print(f"Saved: {output_path}")


def collocate_on_location(df: pl.DataFrame, collocation_location: str, v: bool = False) -> pl.DataFrame:
    """Keep the rows of ``df`` whose minute also occurs at ``collocation_location``.

    Rows from every location are kept, as long as their ``UTC_MINUTE`` value
    appears in at least one row whose ``LOCATION_ID`` equals
    ``collocation_location``.

    Args:
        df: Frame containing the ``LOCATION_ID`` and ``UTC_MINUTE`` columns.
        collocation_location: Location id whose timestamps define the selection.
        v: When true, print the resulting frame.

    Returns:
        The selected rows of ``df``, sorted by ``UTC_MINUTE``.
    """
    location_minutes: pl.DataFrame = (
        df.filter(pl.col(LOCATION_ID.label) == collocation_location)
        .select(pl.col(UTC_MINUTE.label))
    )

    # A semi join only filters `df`. An inner join would emit every row once
    # per matching timestamp, duplicating rows when the location has more than
    # one row for the same minute (e.g. two sensors at one location).
    result: pl.DataFrame = (
        df.join(location_minutes, on=UTC_MINUTE.label, how="semi")
        .sort(UTC_MINUTE.label)
    )

    if v:
        print(result)
    return result


@dataclass
class ColumnDefinition:
    """Name and intended dtype of a DataFrame column."""

    # Column name, used wherever the column is selected or joined on.
    label: str
    # Intended dtype of the column. Annotated with typing.Any because the
    # values stored here are heterogeneous (a polars dtype or a plain string);
    # the previous annotation `any` was the builtin function, not a type.
    dtype: Any

# Datetime dtype with millisecond time unit, shared by all timestamp columns.
DATETIME_DTYPE = pl.Datetime("ms")

# Column definitions (label, dtype) referenced throughout the notebook.
LOCATION_ID = ColumnDefinition("location_id", "string")  # dtype is not enforced anywhere
UTC_MINUTE = ColumnDefinition("utc_minute", DATETIME_DTYPE)

# Input and output directories, relative to the working directory.
RAW_FMI_BUNDLES_PATH: Path = Path("raw") / "FMI_bundles"
PROCESSED_DATA_PATH: Path = Path("processed")

In [10]:
# Preprocess data
# For every location folder: read its CSVs, tag rows with the sensor id taken
# from the file name, average the float columns per (minute, sensor, location)
# and write one CSV per location plus one CSV with all locations combined.
save_all_locations_1m = True  # set to False to skip this step
v = True

if save_all_locations_1m:
    # The output directory may not exist yet on a fresh checkout.
    PROCESSED_DATA_PATH.mkdir(parents=True, exist_ok=True)

    all_dfs = []
    # Iterate over each subdirectory (location folder). Sorted so the
    # processing order does not depend on the filesystem's listing order.
    for location_dir in sorted(RAW_FMI_BUNDLES_PATH.iterdir()):
        if not location_dir.is_dir():
            continue

        location = location_dir.name
        print(f"Processing: {location}")

        csv_files = sorted(location_dir.glob("*.csv"))
        dfs = []

        for file in csv_files:
            # Extract sensor ID (e.g., SN122) from filename
            match = re.search(r"SN\d{3}", file.name)
            sensor_id = match.group(0) if match else "UNKNOWN"

            print(f"- Found sensor with id {sensor_id}.")

            df = pl.read_csv(file, try_parse_dates=True)

            # Add sensor_id as a column
            df = df.with_columns(
                pl.lit(sensor_id).alias("sensor_id")
            )

            dfs.append(df)

        if not dfs:
            print(f"> Skipping {location}, no CSVs found.")
            continue

        # Combine all CSVs for the current location
        combined = pl.concat(dfs)

        # Truncate the parsed "utc" datetime to 1-minute resolution
        combined = combined.with_columns(
            pl.col("utc").dt.truncate("1m").cast(UTC_MINUTE.dtype).alias(UTC_MINUTE.label),
        )

        # Group by minute, sensor_id and location_id, then aggregate
        aggregated = (
            combined
            .group_by([UTC_MINUTE.label, "sensor_id", "location_id"])
            .agg([
                pl.col(pl.Float64).mean(),
                pl.len().alias("n_measurements")  # count per group
            ])
            .sort("n_measurements", UTC_MINUTE.label)
        )

        # The integer representation of UTC_MINUTE is in milliseconds
        # (DATETIME_DTYPE uses "ms"), so dividing by 1_000 yields seconds.
        aggregated = aggregated.with_columns(
            (pl.col(UTC_MINUTE.label).cast(pl.Int64)/1_000).alias("unix_epoch")
        )

        # Save result
        output_path = PROCESSED_DATA_PATH / f"{location}_1min.csv"
        aggregated.write_csv(output_path)
        if v:
            print(f"Saved: {output_path}")

        # create single dataset
        all_dfs.append(aggregated)

    # Fail with a clear message instead of the concat error on an empty list.
    if not all_dfs:
        raise ValueError(f"No location CSVs found under {RAW_FMI_BUNDLES_PATH}")

    full_df = pl.concat(all_dfs).sort(UTC_MINUTE.label)
    output_path = PROCESSED_DATA_PATH / "ALL_LOCATIONS_1min.csv"
    full_df.write_csv(output_path)
    if v:
        print(f"Saved: {output_path}")

    # Sanity checks: two known epoch-second values map to the expected UTC minutes.
    assert dt.datetime.fromtimestamp(1585831680.0, tz=pytz.utc) == pytz.utc.localize(dt.datetime.strptime("2020-04-02T12:48", "%Y-%m-%dT%H:%M"))
    assert dt.datetime.fromtimestamp(1713534120.0, tz=pytz.utc) == pytz.utc.localize(dt.datetime.strptime("2024-04-19T13:42", "%Y-%m-%dT%H:%M"))

Processing: SOD_B
- Found sensor with id SN081.
Saved: processed/SOD_B_1min.csv
Processing: HALSSIAAPA_A
- Found sensor with id SN122.
Saved: processed/HALSSIAAPA_A_1min.csv
Processing: SOD_A
- Found sensor with id SN122.
- Found sensor with id SN081.
Saved: processed/SOD_A_1min.csv
Saved: processed/ALL_LOCATIONS_1min.csv


In [11]:
# Collocate full data
# Keep only the minutes at which at least two distinct sensors reported, and
# write the rows belonging to those minutes to a separate CSV.
collocate_full_data = True  # set to False to skip this step
v = True

if collocate_full_data:
    data_file = PROCESSED_DATA_PATH / "ALL_LOCATIONS_1min.csv"
    df = pl.read_csv(data_file, try_parse_dates=True)
    # Re-apply the notebook's datetime dtype after the CSV round trip.
    df = df.with_columns([
        pl.col(UTC_MINUTE.label).cast(UTC_MINUTE.dtype)
    ])

    # One row per minute that has two or more distinct sensor ids.
    collocated_timestamps = (
        df.group_by(UTC_MINUTE.label)
        .agg([pl.col("sensor_id").n_unique().alias("unique_sensor_count")])
        .filter(pl.col("unique_sensor_count") >= 2)
    )

    print("collocated_timestamps.shape:", collocated_timestamps.shape)

    # Semi join: filter `df` to the collocated minutes without adding columns.
    collocated_data = (
        df.join(
            collocated_timestamps.select(pl.col(UTC_MINUTE.label)),
            on=UTC_MINUTE.label,
            how="semi"
        )
        .sort(UTC_MINUTE.label)
    )

    print("collocated_data.shape:", collocated_data.shape)
    # Every collocated minute is expected to contribute exactly two rows.
    assert collocated_data.shape[0] == 2 * collocated_timestamps.shape[0], "Collocated data is not of size 2 * collocated timestamps"
    print("Collocated dataset has the correct shape.")

    output_path = PROCESSED_DATA_PATH / "ALL_LOCATIONS_collocated_1min.csv"
    collocated_data.write_csv(output_path)
    if v:
        print(f"Saved: {output_path}")

collocated_timestamps.shape: (12156, 2)
collocated_data.shape: (24312, 24)
Collocated dataset has the correct shape.
Saved: processed/ALL_LOCATIONS_collocated_1min.csv


In [12]:
# Collocate by location
# Split the collocated dataset into the minutes shared with SOD_B and the
# minutes shared with HALSSIAAPA_A, validate the split and save both parts.
save_collocate_by_location = True  # set to False to skip this step
v = False  # keeps collocate_on_location from printing the resulting frames

if save_collocate_by_location:
    data_file = PROCESSED_DATA_PATH / "ALL_LOCATIONS_collocated_1min.csv"
    df = pl.read_csv(data_file, try_parse_dates=True)
    # Re-apply the notebook's datetime dtype after the CSV round trip.
    df = df.with_columns([
        pl.col(UTC_MINUTE.label).cast(UTC_MINUTE.dtype)
    ])

    collocated_SOD_A_SOD_B = collocate_on_location(df, "SOD_B", v=v)
    collocated_SOD_A_HALSSIAAPA_A = collocate_on_location(df, "HALSSIAAPA_A", v=v)

    # The two subsets must partition the loaded dataset. Compare against the
    # loaded row count rather than a hard-coded number so the check keeps
    # working when the input data changes.
    assert collocated_SOD_A_SOD_B.shape[0] + collocated_SOD_A_HALSSIAAPA_A.shape[0] == df.shape[0], "Collocated set 1 + collocated set 2 does not equal total number of collocated data points"
    print("Data was split between SOD_B and HALSSIAAPA_A correctly.")
    save_csv(collocated_SOD_A_SOD_B, PROCESSED_DATA_PATH, "SOD_A_SOD_B_collocated_1min")
    save_csv(collocated_SOD_A_HALSSIAAPA_A, PROCESSED_DATA_PATH, "SOD_A_HALSSIAAPA_A_collocated_1min")

    # check for null values (inside the guard: the frames only exist when the
    # step above has run)
    assert collocated_SOD_A_SOD_B.null_count().sum_horizontal().item() == 0, "Nulls found in collocated_SOD_A_SOD_B"
    assert collocated_SOD_A_HALSSIAAPA_A.null_count().sum_horizontal().item() == 0, "Nulls found in collocated_SOD_A_HALSSIAAPA_A"
    print("No nulls found.")


Data was split between SOD_B and HALSSIAAPA_ correctly.
No nulls found.
