In [7]:
import warnings

import fsspec
import pandas as pd

root_dir = "abfss://beehivestorage@beehiveprojectlake.dfs.core.windows.net/"
source = root_dir+"gold/processing/"

# organize silver data by location; a file whose name prefix (text before the
# first "_") is not one of these keys raises a KeyError in the loop below
silver_dfs = {
    "schwartau": pd.DataFrame(),
    "wurzburg": pd.DataFrame()
}
# per-location merged DataFrames, stacked into the gold table at the end
gold_dfs = []

# temperature and humidity come from hive data and weather station: single
# sensor hive files get a "_hive" suffix to clarify the source
HIVE_SENSOR_COLUMNS = ("temperature", "humidity")


def clarify_hive_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return ``df`` with a hive sensor column renamed to ``<sensor>_hive``.

    Only frames with exactly two columns are touched: the first name from
    ``HIVE_SENSOR_COLUMNS`` found in ``df.columns`` becomes ``<sensor>_hive``
    and the remaining column is named ``timestamp``. Columns are matched by
    name, so the result does not depend on the column order stored in the
    parquet file. Any other frame is returned unchanged.
    """
    if df.shape[1] != 2:
        return df
    for sensor in HIVE_SENSOR_COLUMNS:
        if sensor in df.columns:
            time_col = next(col for col in df.columns if col != sensor)
            return df.rename(columns={sensor: f"{sensor}_hive", time_col: "timestamp"})
    return df


# list files in source
fs = fsspec.filesystem("abfs")
file_list_longform = fs.ls(source)
file_list = [file.rsplit("/", maxsplit=1)[1] for file in file_list_longform]

for file in file_list:
    # skip empty names, "hidden" files and Spark/Hadoop marker files (_SUCCESS)
    if not file or file.startswith((".", "_")):
        continue

    print(f"processing {file}")
    df = clarify_hive_columns(pd.read_parquet(source+file))
    location = file.split("_")[0]

    # outer-join all files of a location on timestamp; a non-key column shared
    # by two files comes out as <name>_x / <name>_y
    if silver_dfs[location].empty:
        silver_dfs[location] = df.copy()
    else:
        silver_dfs[location] = silver_dfs[location].merge(df, on="timestamp", how="outer")

for loc in silver_dfs:
    # skip locations with no DataFrames
    if silver_dfs[loc].empty:
        continue
    # tag rows with their location and sort them by timestamp
    silver_dfs[loc] = silver_dfs[loc].assign(location=loc).sort_values(by="timestamp")
    gold_dfs.append(silver_dfs[loc])

# Define the desired final column order
# NOTE(review): 'visibility_source_distance' is listed but 'visibility' itself
# is not - confirm whether the visibility reading belongs in the gold schema.
final_columns = [
    'timestamp', 'location', 'flow_out', 'flow_in',
    'temperature_hive', 'humidity_hive', 'weight', 'precipitation', 'pressure_msl',
    'sunshine', 'temperature', 'wind_direction', 'wind_speed',
    'cloud_cover', 'dew_point', 'relative_humidity', 'wind_gust_direction',
    'wind_gust_speed', 'solar', 'precipitation_source_distance',
    'pressure_msl_source_distance', 'sunshine_source_distance',
    'temperature_source_distance', 'wind_direction_source_distance',
    'wind_speed_source_distance', 'cloud_cover_source_distance',
    'dew_point_source_distance', 'relative_humidity_source_distance',
    'visibility_source_distance', 'wind_gust_direction_source_distance',
    'wind_gust_speed_source_distance', 'solar_source_distance'
]

# combine silver DataFrames of every location that has data; with no data at
# all, still produce an (empty) frame with the gold schema
if gold_dfs:
    combined = pd.concat(gold_dfs, ignore_index=True)
else:
    combined = pd.DataFrame(columns=final_columns)

# reindex silently discards anything outside final_columns (unexpected source
# columns, merge-suffixed duplicates) - report it instead of losing it unseen
dropped_columns = combined.columns.difference(final_columns).tolist()
if dropped_columns:
    warnings.warn(f"columns not in final_columns are dropped from gold: {dropped_columns}")

gold = (
    combined
    # Use reindex to conform the DataFrame to the final schema
    .reindex(columns=final_columns)
    # categorize after concatenating: concatenating categoricals whose
    # categories differ (one per location) falls back to object dtype
    .astype({"location": "category"})
)

gold

StatementMeta(CLSYNB, 34, 6, Finished, Available, Finished)

processing schwartau__2017-01-01-2019-05-31.parquet
processing schwartau__2025-08-20T06h53m32s.parquet
processing schwartau__2025-08-20T06h53m33s.parquet
processing schwartau__2025-08-20T06h53m34s.parquet


Unnamed: 0,timestamp,location,flow_out,flow_in,temperature_hive,humidity_hive,weight,precipitation,pressure_msl,sunshine,...,temperature_source_distance,wind_direction_source_distance,wind_speed_source_distance,cloud_cover_source_distance,dew_point_source_distance,relative_humidity_source_distance,visibility_source_distance,wind_gust_direction_source_distance,wind_gust_speed_source_distance,solar_source_distance
0,2017-01-01 12:00:00+00:00,schwartau,,,,98.04,50736.79,0.0,1013.8,0.0,...,13018.0,13018.0,13018.0,13018.0,13018.0,13018.0,13018.0,13018.0,13018.0,13018.0
1,2017-01-01 13:00:00+00:00,schwartau,,,,,,0.0,1012.0,0.0,...,22269.0,22269.0,22269.0,22269.0,22269.0,22269.0,22269.0,22269.0,22269.0,2122.0
2,2017-01-01 13:15:00+00:00,schwartau,0.0,0.0,,,,,,,...,,,,,,,,,,
3,2017-01-01 13:16:00+00:00,schwartau,0.0,0.0,,,,,,,...,,,,,,,,,,
4,2017-01-01 13:17:00+00:00,schwartau,0.0,0.0,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1267138,2019-05-31 12:11:00+00:00,schwartau,-241.0,128.0,,,,,,,...,,,,,,,,,,
1267139,2019-05-31 12:12:00+00:00,schwartau,-226.0,140.0,,,,,,,...,,,,,,,,,,
1267140,2019-05-31 12:13:00+00:00,schwartau,-208.0,144.0,,,,,,,...,,,,,,,,,,
1267141,2019-05-31 12:14:00+00:00,schwartau,-217.0,146.0,,,,,,,...,,,,,,,,,,


In [8]:
# Column summary of the gold table. show_counts=True forces the per-column
# non-null counts: by default pandas omits them once a frame exceeds
# pd.options.display.max_info_rows, and the counts are what reveal
# sparsely filled columns here.
gold.info(show_counts=True)

StatementMeta(CLSYNB, 34, 7, Finished, Available, Finished)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1267143 entries, 0 to 1267142
Data columns (total 32 columns):
 #   Column                               Non-Null Count    Dtype              
---  ------                               --------------    -----              
 0   timestamp                            1267143 non-null  datetime64[ns, UTC]
 1   location                             1267143 non-null  category           
 2   flow_out                             1256918 non-null  float64            
 3   flow_in                              1256901 non-null  float64            
 4   temperature_hive                     0 non-null        float64            
 5   humidity_hive                        1749 non-null     float64            
 6   weight                               1749 non-null     float64            
 7   precipitation                        21121 non-null    float64            
 8   pressure_msl                         16385 non-null    float64            
 9   su

In [9]:
# Persist the gold table under <root_dir>/gold/. The file name carries the
# current time in the same pattern as the processed input files, so runs made
# at different seconds write distinct files rather than overwriting each other.
sink = root_dir+"gold/"
stamp_format = '%Y-%m-%dT%Hh%Mm%Ss'
write_name = "{}hivedata__{}.parquet".format(sink, pd.Timestamp.now().strftime(stamp_format))
gold.to_parquet(write_name, index=False)

StatementMeta(CLSYNB, 34, 8, Finished, Available, Finished)

In [10]:
# End the notebook session via mssparkutils now that the gold file is written.
# mssparkutils is not imported in this notebook; it is presumably injected by
# the Synapse Spark runtime - confirm if running this notebook elsewhere.
mssparkutils.session.stop()

StatementMeta(CLSYNB, 34, 9, Finished, Available, Finished)