# **HDF5 to Delta Lake Data Conversion**
The raw data files provided by Bosch are in HDF5 (.h5) format, a hierarchical data format commonly used for storing large numerical datasets, especially in scientific and industrial applications. While powerful for structured, multi-dimensional data, HDF5 is a poor fit for modern distributed data systems, mainly because big data tools offer limited support for it and query engines such as Spark have no built-in HDF5 reader.

The procedures used here can be adapted to other file formats such as CSV or JSON. The main logic of reading, transforming, and storing structured data remains the same.

Our goal in this notebook is to:
- Convert each .h5 file into a columnar format suitable for distributed processing, in this case Parquet files.
- Store these files as a Delta Lake table, which provides fast, reliable access to large-scale data using Apache Spark.
- Implement a partitioning strategy that enables efficient filtering and fast retrieval, based on the structure and metadata of the original files.

This notebook represents Phase 1 of the pipeline: ingestion and conversion from raw industrial data into a format optimized for analytics and machine learning.

⚠️ Note: Before running this notebook, complete **Step 0.2** of **Phase 0**: download the raw data from the cnc_machines repository and place it in the data/raw/ directory. The code in this notebook relies on the .h5 files being present in that location.

In [41]:
import os
import time
import h5py
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
from tqdm import tqdm

In [None]:
# Directory where raw data is stored
RAW_DIR = os.path.join("..", "data", "raw")
# Directory where Delta data is stored
DELTA_DIR = os.path.join("..", "data", "delta")

In [30]:
# File for logging any failed conversions from HDF5 to Delta
FAILED_LOG = os.path.join("..", "data", "failed_files.txt")

# Start with an empty log: truncate the file, or create it if it does not exist
open(FAILED_LOG, "w").close()

In [None]:
# Number of workers for writing files to Delta
MAX_WORKERS = 4  # You can increase this based on system resources
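
`MAX_WORKERS` is not used by the cells shown in this section. One way it might be applied is to parallelize the per-file read step with a thread pool. The sketch below is an assumption, not the notebook's method: `load_one` is a hypothetical stand-in for the HDF5 read, and only the standard library is used.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4  # same value as in the cell above


def load_one(path):
    # Hypothetical stand-in for reading one .h5 file; returns (path, dummy size).
    return path, len(path)


def load_all(paths, max_workers=MAX_WORKERS):
    # executor.map preserves input order, so results line up with paths.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(load_one, paths))


results = load_all(["a.h5", "bb.h5", "ccc.h5"])
print(results)
```

Threads suit this step if it is dominated by file I/O; whether they help in practice depends on the storage and on how much of the time is spent in the Spark write.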

In [31]:
# Check for .h5 files - fail if none are found
h5_files = [
    os.path.join(root, f)
    for root, _, files in os.walk(RAW_DIR)
    for f in files if f.endswith(".h5")
]

assert len(h5_files) > 0, "⚠️ No .h5 files found in the raw data directory. Did you complete step 0.2?"
print(f"✅ Found {len(h5_files)} .h5 files in the raw data directory.")

✅ Found 1702 .h5 files in the raw data directory.


## 📊 **Exploring Raw File Structure to Guide Delta Lake Partitioning**
In this section, we collect metadata from each HDF5 file (dataset shapes and any stored column names) to understand the structure and volume of data across files. This exploration informs how we partition the Delta Lake table.

Partitioning organizes data by one or more key columns (e.g., machine ID, operation, or date) so that Spark reads only the relevant subsets when a query filters on those columns. Overly granular partitioning (such as by exact timestamp or unique file ID) can backfire: it creates many tiny files and directories that slow down query planning and metadata operations. Our goal is to find the right balance: enough partitioning to enable fast filtering, but not so much that it fragments the dataset.
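
As a rough illustration of that balance, the number of partitions a candidate key would create can be estimated from file paths alone, before any data is read. The sketch below assumes the `<machine>/<operation>/<label>/<file>.h5` directory layout that appears in this notebook's output; the helper name `partition_key` is hypothetical.

```python
from collections import Counter
from pathlib import PurePosixPath


def partition_key(path):
    # Assumes .../<machine>/<operation>/<label>/<file>.h5
    parts = PurePosixPath(path).parts
    return parts[-4], parts[-3], parts[-2]


# Three paths taken from the conversion log in this notebook
sample_paths = [
    "../data/raw/M01/OP03/bad/M01_Aug_2019_OP03_000.h5",
    "../data/raw/M01/OP03/good/M01_Feb_2020_OP03_000.h5",
    "../data/raw/M01/OP03/good/M01_Aug_2019_OP03_008.h5",
]

# Count how many source files would land in each candidate partition
files_per_partition = Counter(partition_key(p) for p in sample_paths)
for key, n_files in sorted(files_per_partition.items()):
    print(key, n_files)
```

A key that yields only a handful of files per partition, or thousands of distinct partitions, is a sign that the partitioning is too fine.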

In [32]:
# Function to extract metadata from HDF5 files and return it as a DataFrame
def collect_hdf5_metadata(base_dir):
    data = []

    for root, _, files in os.walk(base_dir):
        for file in files:
            if file.endswith(".h5"):
                file_path = os.path.join(root, file)
                record = {
                    "file_path": file_path,
                    "num_rows": None,
                    "num_columns": None,
                    "column_names": []
                }

                try:
                    with h5py.File(file_path, 'r') as f:
                        for key in f.keys():
                            dataset = f[key]
                            if len(dataset.shape) == 2:  # Ensure it's a 2D dataset
                                record["num_rows"] = dataset.shape[0]
                                record["num_columns"] = dataset.shape[1]
                                record["column_names"] = list(dataset.attrs.get("column_names", []))
                                break  # Process only the first 2D dataset
                except Exception as e:
                    record["error"] = str(e)

                data.append(record)

    return pd.DataFrame(data)

In [33]:
# Collect metadata for all files under RAW_DIR
df_files = collect_hdf5_metadata(RAW_DIR)

In [34]:
# View summary
df_files.head()

|   | file_path | num_rows | num_columns | column_names |
|---|---|---|---|---|
| 0 | ../data/raw/M01/OP03/bad/M01_Aug_2019_OP03_000.h5 | 139653 | 3 | [] |
| 1 | ../data/raw/M01/OP03/good/M01_Feb_2020_OP03_00... | 179200 | 3 | [] |
| 2 | ../data/raw/M01/OP03/good/M01_Aug_2019_OP03_00... | 178176 | 3 | [] |
| 3 | ../data/raw/M01/OP03/good/M01_Feb_2019_OP03_00... | 156000 | 3 | [] |
| 4 | ../data/raw/M01/OP03/good/M01_Feb_2020_OP03_00... | 178176 | 3 | [] |


In [35]:
print("Mean number of rows and columns:")
print(df_files[['num_rows', 'num_columns']].mean())
print("Number of files:")
print(df_files['file_path'].nunique())


Mean number of rows and columns:
num_rows       104936.098707
num_columns         3.000000
dtype: float64
Number of files:
1702


## 📌 **Data Structure and Partitioning Strategy**
Each HDF5 file contains time-series sensor data with three columns: "x", "y", and "z", representing vibration signals along three axes. This structure is described in the original `CNC_machining` GitHub repository.

To store and query this data efficiently in Delta Lake, we partition the table: the dataset is organized into subfolders by the values of selected columns, so Spark reads only the relevant slices when a query filters on those columns. On large datasets this can improve performance significantly.

Based on the file structure and metadata, we will partition the Delta table by:

- `machine_id` (e.g., M01, M02)
- `operation` (e.g., OP00, OP01)
- `label` (e.g., good, bad)

This strikes a balance between fast filtering and avoiding excessive small files or deeply nested directories.
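
For orientation, Spark's `partitionBy` writes Hive-style `column=value` directories under the table root. The helper below is a hypothetical illustration of the resulting path for one combination of partition values; it does not touch Spark or the file system.

```python
from pathlib import PurePosixPath


def partition_dir(table_root, **partition_values):
    # Builds the Hive-style sub-directory, one level per column in keyword order.
    path = PurePosixPath(table_root)
    for column, value in partition_values.items():
        path = path / f"{column}={value}"
    return str(path)


example = partition_dir(
    "../data/delta", machine_id="M01", operation="OP03", label="bad"
)
print(example)
```

With three partition columns, the directory depth stays fixed at three levels regardless of how many source files are ingested.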

In [36]:
# Initialize Spark with the Delta Lake SQL extension and catalog
spark = SparkSession.builder \
    .appName("Delta Lake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [37]:
# Define a schema for the spark dataframe to ensure consistency
# across all files
schema = StructType([
    StructField("x", DoubleType(), True),
    StructField("y", DoubleType(), True),
    StructField("z", DoubleType(), True),
    StructField("machine_id", StringType(), True),
    StructField("month", StringType(), True),
    StructField("year", StringType(), True),
    StructField("operation", StringType(), True),
    StructField("example_no", StringType(), True),
    StructField("label", StringType(), True),
])


In [38]:
# Function to parse information from the file names
def parse_filename(filename):
    base = os.path.splitext(filename)[0]  # strip only the trailing extension
    parts = base.split("_")
    return {
        "machine_id": parts[0],
        "month": parts[1],
        "year": parts[2],
        "operation": parts[3],
        "example_no": parts[4]
    }
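
The split-based parser assumes every name has exactly five underscore-separated fields. A stricter variant, sketched below with a regular expression, would reject unexpected names instead of raising an `IndexError` or mislabeling fields. The pattern and the name `parse_filename_strict` are assumptions inferred from the file names that appear in this notebook's output, not part of the original pipeline.

```python
import re

# Pattern inferred from names such as M01_Aug_2019_OP03_000.h5 (an assumption).
FILENAME_RE = re.compile(
    r"^(?P<machine_id>M\d+)_(?P<month>[A-Za-z]{3})_(?P<year>\d{4})"
    r"_(?P<operation>OP\d+)_(?P<example_no>\d+)\.h5$"
)


def parse_filename_strict(filename):
    # Return the named fields as a dict, or fail loudly on an unexpected name.
    match = FILENAME_RE.match(filename)
    if match is None:
        raise ValueError(f"Unexpected file name: {filename!r}")
    return match.groupdict()


print(parse_filename_strict("M01_Aug_2019_OP03_000.h5"))
```

All fields come back as strings, which matches the `StringType` columns in the schema above.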

In [39]:
# Convert each file and append it to the Delta table, one file at a time
for root, _, files in os.walk(RAW_DIR):
    for f in files:
        if f.endswith(".h5"):
            file_path = os.path.join(root, f)
            start_time = time.time()
            try:
                with h5py.File(file_path, 'r') as h5f:
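                    if "vibration_data" not in h5f:
                        # Log the skip so the file is not dropped silently
                        with open(FAILED_LOG, "a") as log:
                            log.write(f"{file_path} | missing 'vibration_data' dataset\n")
                        continue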
                    data = h5f["vibration_data"][()]
                    df_pd = pd.DataFrame(data, columns=["x", "y", "z"])
                    df_pd[["x", "y", "z"]] = df_pd[["x", "y", "z"]].astype("float64")

                    meta = parse_filename(f)
                    label = os.path.basename(os.path.dirname(file_path))
                    for k, v in meta.items():
                        df_pd[k] = v
                    df_pd["label"] = label

                    # Convert to Spark and write
                    df_spark = spark.createDataFrame(df_pd, schema=schema)

                    df_spark.write.format("delta") \
                        .partitionBy("machine_id", "operation", "label") \
                        .mode("append") \
                        .save(DELTA_DIR)

                    duration = round(time.time() - start_time, 2)
                    print(f"✅ Saved: {file_path} ({duration}s)")

            except Exception as e:
                duration = round(time.time() - start_time, 2)
                print(f"⚠️ Error converting {file_path} after {duration}s: {e}")
                with open(FAILED_LOG, "a") as log:
                    log.write(f"{file_path} | {str(e)}\n")

✅ Saved: ../data/raw/M01/OP03/bad/M01_Aug_2019_OP03_000.h5 (11.12s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Feb_2020_OP03_000.h5 (7.58s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Aug_2019_OP03_008.h5 (8.49s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Feb_2019_OP03_002.h5 (7.89s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Feb_2020_OP03_001.h5 (8.26s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Aug_2019_OP03_009.h5 (8.1s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Feb_2021_OP03_003.h5 (12.14s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Aug_2019_OP03_006.h5 (9.9s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Aug_2019_OP03_012.h5 (7.51s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Feb_2021_OP03_007.h5 (7.67s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Aug_2019_OP03_002.h5 (8.12s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Aug_2019_OP03_013.h5 (7.41s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Feb_2021_OP03_006.h5 (8.44s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Aug_2019_OP03_003.h5 (9.51s)
✅ Saved: ../data/raw/M01/OP03/good/M01_Feb_2021_O

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 
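
The log above shows roughly 7 to 12 seconds per file, with each file written as its own Delta append. One possible way to reduce the number of writes is to accumulate several files and append them together. The sketch below shows only the batching step in plain Python; `batched` and the batch size are hypothetical, and combining the per-file pandas frames before the Spark write is left out.

```python
def batched(items, batch_size):
    # Yield consecutive slices of at most batch_size items.
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Placeholder file names, for illustration only
paths = [f"file_{i:03d}.h5" for i in range(7)]
for batch in batched(paths, 3):
    print(len(batch), batch[0], batch[-1])
```

The batch size trades memory on the driver against the number of commits, so it would need tuning for the available resources.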

In [40]:
# Stop the Spark session
spark.stop()