=============================================================================
# PART 1: SETUP
=============================================================================

In [7]:
import collections
from datetime import datetime, timezone
from functools import reduce

import pandas as pd
from pyspark.sql.functions import col

print("--- Part 1: Initializing paths and variables ---")

# Root of the data-lake container; both the input and output folders sit under its "gold" area.
data_lake_path = "abfss://beehive-container@dsaidatalakestorage.dfs.core.windows.net"
gold_area_path = f"{data_lake_path}/gold"

source_path = f"{gold_area_path}/processing"  # flat folder of per-type parquet files read in Part 2
sink_path = f"{gold_area_path}/quality"       # destination of the merged gold file written in Part 4

# Only files whose name carries one of these data types are loaded; all others are ignored.
target_datatypes = ['flow', 'humidity', 'temperature', 'weight', 'weather']

StatementMeta(sparkpoolDev, 43, 8, Finished, Available, Finished)

--- Part 1: Initializing paths and variables ---


=============================================================================
# PART 2: LOAD LATEST SOURCE FILES FROM THE FLAT DIRECTORY
=============================================================================

In [8]:
try:
    # Bucket every parquet entry of the flat source folder by the data type embedded in
    # its name: 'schwartau_weather__TIMESTAMP.parquet'.split('_')[1] -> 'weather'.
    file_groups = collections.defaultdict(list)
    for entry in mssparkutils.fs.ls(source_path):
        if not entry.name.endswith('.parquet'):
            continue
        name_parts = entry.name.split('_')
        if len(name_parts) < 2:
            # No '_' in the name, so there is no data-type segment to read.
            print(f"Skipping file with unexpected name format: {entry.name}")
            continue
        if name_parts[1] in target_datatypes:
            file_groups[name_parts[1]].append(entry.path)

    # The newest file per type is the lexicographic max of its paths. This assumes the
    # timestamp in the name is fixed-width so string order equals time order.
    latest_files_to_read = {
        data_type: max(paths) for data_type, paths in file_groups.items() if paths
    }

    # Load each selected file with Spark, then collect it to the driver as pandas.
    dataframes = {}
    for data_type, latest_path in latest_files_to_read.items():
        print(f"Reading latest '{data_type}' file: ...{latest_path.split('/')[-1]}")
        dataframes[data_type] = spark.read.parquet(latest_path).toPandas()

except Exception as load_error:
    # Log context for the notebook output, then re-raise so the run still fails.
    print(f"An error occurred during file loading: {load_error}")
    raise

StatementMeta(sparkpoolDev, 43, 9, Finished, Available, Finished)

Reading latest 'flow' file: ...schwartau_flow__2025-08-20T10:45:03.885724Z.parquet
Reading latest 'humidity' file: ...schwartau_humidity__2025-08-20T10:45:03.885724Z.parquet
Reading latest 'temperature' file: ...schwartau_temperature__2025-08-20T10:45:03.885724Z.parquet
Reading latest 'weather' file: ...schwartau_weather__2025-08-20T10:46:25.819878Z.parquet
Reading latest 'weight' file: ...schwartau_weight__2025-08-20T10:45:03.885724Z.parquet


=============================================================================
# PART 3: MERGE AND TRANSFORM DATA
=============================================================================

In [9]:
# Per-type column renames applied before merging, so these hive-sensor columns cannot
# collide with same-named columns from other sources in the outer merge below.
HIVE_COLUMN_RENAMES = {
    'temperature': {'temperature': 'temperature_hive'},
    'humidity': {'humidity': 'humidity_hive'},
}

if dataframes:
    # DataFrame.rename (without inplace) returns new frames and ignores absent keys.
    # The `dataframes` dict from the load cell therefore stays untouched, and re-running
    # this cell yields the same result.
    prepared_frames = [
        frame.rename(columns=HIVE_COLUMN_RENAMES.get(dtype, {}))
        for dtype, frame in dataframes.items()
    ]

    # Sequentially outer-merge all frames on the shared 'timestamp' column.
    merged_frames = reduce(
        lambda left, right: pd.merge(left, right, on='timestamp', how='outer'),
        prepared_frames,
    )

    # .assign returns a copy. With a single loaded frame, reduce returns that frame itself,
    # and item assignment would have mutated the loaded source data.
    gold_df_schwartau = (
        merged_frames
        .assign(location='schwartau')
        .sort_values(by="timestamp")
        .reset_index(drop=True)
    )

    print("Data successfully merged.")
    display(gold_df_schwartau.head())
else:
    print("No dataframes were loaded, aborting script.")
    mssparkutils.notebook.exit("No dataframes were loaded, aborting script.")

StatementMeta(sparkpoolDev, 43, 10, Finished, Available, Finished)

Data successfully merged.


SynapseWidget(Synapse.DataFrame, 4a97689b-07a9-4f10-8581-59602a58ad5c)

=============================================================================
# PART 4: SAVE THE FINAL GOLD DATAFRAME
=============================================================================

In [10]:
# The folder will be created automatically by the .write command, so mkdirs is not needed.
# Run timestamp in UTC with a fixed-width 6-digit fraction, e.g. 2025-08-20T11-18-06.187537Z.
# Colons are replaced by dashes for path compatibility.
# java.time.Instant.toString() (previously reached via the private spark._jvm handle) prints
# zero, three, six or nine fraction digits as needed. That breaks "latest file = max(name)"
# string ordering; a fixed width keeps file names lexicographically sortable.
timestamp_str = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H-%M-%S.%fZ')

# Convert final pandas DataFrame back to a Spark DataFrame for saving
spark_gold_df = spark.createDataFrame(gold_df_schwartau)

# Define the final output path and filename
output_path = f'{sink_path}/schwartau_gold__{timestamp_str}.parquet'

print(f"Writing to: {output_path}")
# The path is unique per run timestamp; 'overwrite' only matters if that exact path already exists.
spark_gold_df.write.parquet(output_path, mode='overwrite')

print("\nAll files saved successfully.")

StatementMeta(sparkpoolDev, 43, 11, Finished, Available, Finished)

Writing to: abfss://beehive-container@dsaidatalakestorage.dfs.core.windows.net/gold/quality/schwartau_gold__2025-08-20T11-18-06.187537Z.parquet

All files saved successfully.


In [11]:
# Explicitly stop the current Spark session now that the gold file has been written.
mssparkutils.session.stop()

StatementMeta(sparkpoolDev, 43, 12, Finished, Available, Finished)