In [2]:
from deltalake import DeltaTable
from deltalake.writer import write_deltalake
import os
import json
import pandas as pd
from dotenv import load_dotenv

load_dotenv()

STORAGE_ACCOUNT_NAME = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
AZURE_STORAGE_ACCESS_KEY = os.getenv("AZURE_STORAGE_ACCESS_KEY")
CONTAINER_NAME = "test-container"
delta_table_path = f"abfss://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/data2"

storage_options = {"azure_storage_account_name": STORAGE_ACCOUNT_NAME, "azure_storage_access_key": AZURE_STORAGE_ACCESS_KEY} 

dt = DeltaTable(delta_table_path, storage_options=storage_options) 

df = dt.to_pandas()


In [3]:
df

Unnamed: 0,turbineId,timestamp,wind_speed,wind_direction,rotor_rpm,active_power,generator_temp,gearbox_temp,pitch_angle
0,T003,2025-06-03T09:34:49.030Z,22.93 ft/s,255 degrees,0.21 rps,1264000 Watts,60.33 °C,64.78 °C,13.02 degrees
1,T006,2025-06-03T09:34:49.029Z,26.96 km/h,25 degrees,12.81 RPM,1.42 MW,146.91 °F,150.16 °F,18.24 degrees
2,T001,2025-06-03T09:34:49.040Z,8.43 m/s,318 degrees,15.34 RPM,1450 kW,59.35 °C,65.01 °C,14.55 degrees
3,T002,2025-06-03T09:34:49.035Z,28.62 km/h,5.34 rad,16.49 RPM,1.48 MW,336.91 Kelvin,340.99 Kelvin,16.53 radians
4,T005,2025-06-03T09:34:49.031Z,9.7 m/s,2257.78 mil,11.59 RPM,1326 kWh,57.3 °C,61.82 °C,229.98 mil
...,...,...,...,...,...,...,...,...,...
3385,T005,2025-06-03T07:55:18.507Z,6.38 m/s,4942.22 mil,14.09 RPM,1362 kWh,59.67 °C,65.95 °C,310.75 mil
3386,T004,2025-06-03T07:55:18.548Z,14.7 knots,205 degrees,13.11 RPM,1397 kW,62.79 °C,337.38 °C,13.72 degrees
3387,T003,2025-06-03T07:55:18.493Z,20.54 ft/s,244 degrees,0.2 rps,1253000 Watts,57.7 °C,62.93 °C,15 degrees
3388,T002,2025-06-03T07:55:18.510Z,22.21 km/h,2.57 rad,11.39 RPM,1.41 MW,335.73 Kelvin,335.74 Kelvin,16.24 radians


In [5]:
print(dt.files())

['part-00001-106cb19f-7ef0-4f8e-984b-2af56daf4cc4-c000.snappy.parquet', 'part-00001-7dc9baa7-9257-4d40-95b2-c85b09525a72-c000.snappy.parquet', 'part-00001-61c5fa12-1616-4eb0-9d2b-9b41cb4a9ef4-c000.snappy.parquet', 'part-00001-18be6ad2-daff-40a6-9faa-0b1e0c1f44d0-c000.snappy.parquet', 'part-00001-2afb4acf-4efa-40ba-a17f-93638174c04c-c000.snappy.parquet', 'part-00001-8a3afe72-60ea-4c14-9ae2-1902d66b4c52-c000.snappy.parquet', 'part-00001-3dfb762d-8d30-498f-861f-7e1432cf576c-c000.snappy.parquet', 'part-00001-0c6b061c-aa18-4019-899b-135747757b2d-c000.snappy.parquet', 'part-00001-cb369fa6-18b8-4f18-9f31-acec6895b538-c000.snappy.parquet', 'part-00001-2601c2dc-b4d3-4563-be49-ce134c0d27a5-c000.snappy.parquet', 'part-00001-91e2c286-a0aa-4c2c-aebd-4818f50934c6-c000.snappy.parquet', 'part-00001-f1c7e0e2-0626-4e53-bf40-3decb568541e-c000.snappy.parquet', 'part-00001-57bd280c-1333-4f51-b8ee-09a26154e448-c000.snappy.parquet', 'part-00001-c42a2fe9-5caf-42cb-a92d-ebd8e04b8307-c000.snappy.parquet', 'part

In [6]:
import numpy as np
import pandas as pd

df['timestamp'] = pd.to_datetime(df['timestamp'])

def convert_wind_speed(val):
    if pd.isna(val): return np.nan
    val = val.lower()
    if "knots" in val:
        return float(val.replace(" knots", "")) * 0.514444
    elif "km/h" in val:
        return float(val.replace(" km/h", "")) / 3.6
    elif "m/s" in val:
        return float(val.replace(" m/s", ""))
    elif "ft/s" in val:
        return float(val.replace(" ft/s", "")) * 0.3048
    else:
        return np.nan

def convert_direction(val):
    if pd.isna(val): return np.nan
    val = val.lower()
    if "degrees" in val:
        return float(val.replace(" degrees", ""))
    elif "rad" in val:
        return float(val.replace(" rad", "")) * (180 / np.pi)
    elif "mil" in val:
        return float(val.replace(" mil", "")) * 0.05625
    else:
        return np.nan

def convert_rpm(val):
    if pd.isna(val): return np.nan
    val = val.strip().lower()
    try:
        if "rpm" in val:
            return float(val.replace("rpm", "").strip())
        elif "rps" in val:
            return float(val.replace("rps", "").strip()) * 60
        else:
            return float(val)
    except:
        return np.nan

def convert_power(val):
    if pd.isna(val): return np.nan
    val = val.lower()

    if "mw" in val:
        return float(val.replace(" mw", "")) * 1000
    elif "kwh" in val:
        return float(val.replace(" kwh", ""))
    elif "kw" in val:
        return float(val.replace(" kw", ""))
    elif "watts" in val:
        return float(val.replace(" watts", "")) / 1000
    else:
        return np.nan


def convert_temp(val):
    if pd.isna(val): return np.nan
    val = val.lower()
    if "°c" in val:
        return float(val.replace(" °c", ""))
    elif "°f" in val:
        f = float(val.replace(" °f", ""))
        return (f - 32) * 5/9
    elif "kelvin" in val:
        return float(val.replace(" kelvin", "")) - 273.15
    else:
        return np.nan

def convert_angle(val):
    if pd.isna(val): 
        return np.nan
    val = val.lower().strip()
    if "degrees" in val:
        angle = float(val.replace(" degrees", ""))
    elif "radians" in val:
        angle = float(val.replace(" radians", "")) * (180 / np.pi)
    elif "mil" in val:
        angle = float(val.replace(" mil", "")) * 0.05625
    else:
        return np.nan

    return angle % 360



df['wind_speed_mps'] = df['wind_speed'].apply(convert_wind_speed)
df['wind_direction_deg'] = df['wind_direction'].apply(convert_direction)
df['rotor_rpm_s'] = df['rotor_rpm'].apply(convert_rpm)
df['active_power_kw'] = df['active_power'].apply(convert_power)
df['generator_temp_c'] = df['generator_temp'].apply(convert_temp)
df['gearbox_temp_c'] = df['gearbox_temp'].apply(convert_temp)
df['pitch_angle_deg'] = df['pitch_angle'].apply(convert_angle)

df_converted = df[[
    'turbineId', 'timestamp',
    'wind_speed_mps', 'wind_direction_deg',
    'rotor_rpm_s', 'active_power_kw',
    'generator_temp_c', 'gearbox_temp_c',
    'pitch_angle_deg'
]]

df_converted


Unnamed: 0,turbineId,timestamp,wind_speed_mps,wind_direction_deg,rotor_rpm_s,active_power_kw,generator_temp_c,gearbox_temp_c,pitch_angle_deg
0,T003,2025-06-03 09:34:49.030000+00:00,6.989064,255.000000,12.60,1264.0,60.330000,64.780000,13.020000
1,T006,2025-06-03 09:34:49.029000+00:00,7.488889,25.000000,12.81,1420.0,63.838889,65.644444,18.240000
2,T001,2025-06-03 09:34:49.040000+00:00,8.430000,318.000000,15.34,1450.0,59.350000,65.010000,14.550000
3,T002,2025-06-03 09:34:49.035000+00:00,7.950000,305.959463,16.49,1480.0,63.760000,67.840000,227.099235
4,T005,2025-06-03 09:34:49.031000+00:00,9.700000,127.000125,11.59,1326.0,57.300000,61.820000,12.936375
...,...,...,...,...,...,...,...,...,...
3385,T005,2025-06-03 07:55:18.507000+00:00,6.380000,277.999875,14.09,1362.0,59.670000,65.950000,17.479688
3386,T004,2025-06-03 07:55:18.548000+00:00,7.562327,205.000000,13.11,1397.0,62.790000,337.380000,13.720000
3387,T003,2025-06-03 07:55:18.493000+00:00,6.260592,244.000000,12.00,1253.0,57.700000,62.930000,15.000000
3388,T002,2025-06-03 07:55:18.510000+00:00,6.169444,147.250153,11.39,1410.0,62.580000,62.590000,210.483459


In [7]:
import os
from dotenv import load_dotenv
from daft import from_pydict
from deltalake.writer import write_deltalake

load_dotenv()

SAS_TOKEN = os.getenv("AZURE_STORAGE_SAS_TOKEN")
STORAGE_ACCOUNT_NAME = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
CONTAINER_NAME = "test-container"
DELTA_OUTPUT_PATH = f"abfss://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/data"


In [9]:
df_one = df_converted[df_converted["turbineId"] == "T001"].copy()
df_one

Unnamed: 0,turbineId,timestamp,wind_speed_mps,wind_direction_deg,rotor_rpm_s,active_power_kw,generator_temp_c,gearbox_temp_c,pitch_angle_deg
2,T001,2025-06-03 09:34:49.040000+00:00,8.43,318.0,15.34,1450.0,59.35,65.01,14.55
8,T001,2025-06-03 09:34:59.041000+00:00,6.68,253.0,15.82,1456.0,59.21,67.34,13.98
17,T001,2025-06-03 09:35:09.041000+00:00,8.25,253.0,13.95,1403.0,59.91,64.11,15.90
21,T001,2025-06-03 09:35:19.042000+00:00,8.55,238.0,13.93,1388.0,62.56,66.54,13.18
26,T001,2025-06-03 09:34:19.040000+00:00,9.64,217.0,15.95,1451.0,61.75,67.72,14.45
...,...,...,...,...,...,...,...,...,...
3364,T001,2025-06-03 07:54:38.523000+00:00,9.66,323.0,12.40,1371.0,59.56,61.06,10.36
3369,T001,2025-06-03 07:54:48.523000+00:00,6.19,88.0,17.31,1490.0,63.42,70.28,16.03
3375,T001,2025-06-03 07:54:58.523000+00:00,9.25,277.0,16.77,1440.0,62.30,65.87,15.69
3381,T001,2025-06-03 07:55:08.523000+00:00,8.64,238.0,15.91,1426.0,63.09,67.09,14.50


In [10]:
df_one["interval_start"] = df_one["timestamp"].dt.floor("10min")
df_one

Unnamed: 0,turbineId,timestamp,wind_speed_mps,wind_direction_deg,rotor_rpm_s,active_power_kw,generator_temp_c,gearbox_temp_c,pitch_angle_deg,interval_start
2,T001,2025-06-03 09:34:49.040000+00:00,8.43,318.0,15.34,1450.0,59.35,65.01,14.55,2025-06-03 09:30:00+00:00
8,T001,2025-06-03 09:34:59.041000+00:00,6.68,253.0,15.82,1456.0,59.21,67.34,13.98,2025-06-03 09:30:00+00:00
17,T001,2025-06-03 09:35:09.041000+00:00,8.25,253.0,13.95,1403.0,59.91,64.11,15.90,2025-06-03 09:30:00+00:00
21,T001,2025-06-03 09:35:19.042000+00:00,8.55,238.0,13.93,1388.0,62.56,66.54,13.18,2025-06-03 09:30:00+00:00
26,T001,2025-06-03 09:34:19.040000+00:00,9.64,217.0,15.95,1451.0,61.75,67.72,14.45,2025-06-03 09:30:00+00:00
...,...,...,...,...,...,...,...,...,...,...
3364,T001,2025-06-03 07:54:38.523000+00:00,9.66,323.0,12.40,1371.0,59.56,61.06,10.36,2025-06-03 07:50:00+00:00
3369,T001,2025-06-03 07:54:48.523000+00:00,6.19,88.0,17.31,1490.0,63.42,70.28,16.03,2025-06-03 07:50:00+00:00
3375,T001,2025-06-03 07:54:58.523000+00:00,9.25,277.0,16.77,1440.0,62.30,65.87,15.69,2025-06-03 07:50:00+00:00
3381,T001,2025-06-03 07:55:08.523000+00:00,8.64,238.0,15.91,1426.0,63.09,67.09,14.50,2025-06-03 07:50:00+00:00


In [11]:
melted = df_one.melt(
    id_vars=["turbineId", "interval_start"],
    value_vars=[
        "wind_speed_mps", "wind_direction_deg", "rotor_rpm_s",
        "active_power_kw", "generator_temp_c", "gearbox_temp_c", "pitch_angle_deg"
    ],
    var_name="signal_name",
    value_name="value"
)

In [12]:
aggregated = melted.groupby(["turbineId", "interval_start", "signal_name"]).agg(
    avg_value=("value", "mean"),
    min_value=("value", "min"),
    max_value=("value", "max"),
    std_value=("value", "std")
).reset_index()

In [18]:
daft_table = from_pydict(aggregated.to_dict(orient="list"))
daft_table
df_pandas = daft_table.to_pandas()

In [19]:
DELTA_OUTPUT_PATH = f"abfss://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/delta/scada/10_mins/"
write_deltalake(
                        table_or_uri=DELTA_OUTPUT_PATH,
                        data=df_pandas,
                        mode="append",
                        storage_options={
                            "account_name": STORAGE_ACCOUNT_NAME,
                            "sas_token": SAS_TOKEN
                        }
                    )
print("✅ Агреговані 10-хв. дані записано у delta/scada/10_mins/")

✅ Агреговані 10-хв. дані записано у delta/scada/10_mins/


In [20]:
df_pandas

Unnamed: 0,turbineId,interval_start,signal_name,avg_value,min_value,max_value,std_value
0,T001,2025-06-03 07:50:00+00:00,active_power_kw,1420.166667,1357.00,1490.00,42.101069
1,T001,2025-06-03 07:50:00+00:00,gearbox_temp_c,65.118889,60.99,70.28,2.645976
2,T001,2025-06-03 07:50:00+00:00,generator_temp_c,61.305556,58.64,63.64,1.665237
3,T001,2025-06-03 07:50:00+00:00,pitch_angle_deg,13.148333,8.97,16.35,2.480769
4,T001,2025-06-03 07:50:00+00:00,rotor_rpm_s,14.853889,12.40,17.93,1.992185
...,...,...,...,...,...,...,...
72,T001,2025-06-03 09:30:00+00:00,generator_temp_c,60.924062,58.92,63.43,1.291126
73,T001,2025-06-03 09:30:00+00:00,pitch_angle_deg,13.510625,10.17,17.96,2.352875
74,T001,2025-06-03 09:30:00+00:00,rotor_rpm_s,14.701250,12.07,17.87,1.808298
75,T001,2025-06-03 09:30:00+00:00,wind_direction_deg,152.718750,0.00,334.00,100.667205
