# In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import col

# Obtain the Spark session for this job (application name: substruct_tables)
spark = (
    SparkSession.builder
    .appName("substruct_tables")
    .getOrCreate()
)

# Switch to arhasi.rapid so the unqualified table name below resolves there
spark.sql("USE arhasi.rapid")

# Read the 'metrics' table into a DataFrame
df = spark.table("metrics")

# Group the fields of a struct schema by the (dotted) struct path that owns them
def extract_substructs(schema, parent_name=""):
    """Map each struct directly under *schema* to the names of its fields.

    Only one level is expanded: a struct nested inside a sub-struct is
    listed by name in its parent's field list but is not given an entry
    of its own.

    Args:
        schema: Struct type whose ``fields`` are inspected; every field
            must expose ``name`` and ``dataType``.
        parent_name: Dotted path of *schema* itself. When non-empty it
            prefixes every sub-struct key, and the non-struct fields of
            *schema* are collected under ``parent_name``. When empty,
            non-struct fields are ignored.

    Returns:
        dict mapping a dotted struct path to the list of field names
        found directly inside that struct, in schema order.
    """
    prefix = f"{parent_name}." if parent_name else ""
    grouped = {}
    for member in schema.fields:
        member_type = member.dataType
        if not isinstance(member_type, StructType):
            # Non-struct field: it belongs to the parent's own group, which
            # only exists when the parent has a name.
            if parent_name:
                grouped.setdefault(parent_name, []).append(member.name)
            continue
        # Struct field: record the names of its immediate children.
        grouped[prefix + member.name] = [child.name for child in member_type.fields]
    return grouped

# Extract sub-structs from the 'metrics' column
# ('metrics' is the name of the struct column as well as of the source table).
metrics_schema = df.schema['metrics'].dataType
substructs = extract_substructs(metrics_schema, "metrics")

# Iteratively create and save tables for each sub-struct.
# Every output table holds the 'id' column plus the sub-struct's fields
# flattened into top-level columns named after the fields.
# NOTE(review): a sub-struct field that is itself named 'id' would clash with
# the key column in the select below — confirm no such field exists.
for substruct, fields in substructs.items():
    select_expr = [col('id')] + [col(f'{substruct}.{field}').alias(field) for field in fields]
    substruct_df = df.select(select_expr)
    table_name = substruct.replace('.', '_')
    if table_name == 'metrics':
        # Non-struct fields sitting directly under 'metrics' are grouped under
        # the key "metrics", so the derived name would be the source table
        # itself. saveAsTable's default mode refuses to write to an existing
        # table, so give this group its own table name instead.
        table_name = 'metrics_top_level'
    substruct_df.write.saveAsTable(table_name)