In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, expr, explode, arrays_zip, lit
from pyspark.sql.types import StringType, ArrayType

# Iniciar a sessão
spark = SparkSession.builder.appName("DynamicColumnExtraction").getOrCreate()

data = [
    ('["nome", "idade", "ativo"]', '["João", "30", "true"]'),
    ('["nome", "idade", "ativo"]', '["Maria", "25", "false"]')
]

df = spark.createDataFrame(data, ["metadata", "valores"])

df_parsed = (
    df.withColumn("metadata_array", from_json(col("metadata"), ArrayType(StringType())))
      .withColumn("valores_array", from_json(col("valores"), ArrayType(StringType())))
)



df_zipped = df_parsed.withColumn("zipped", arrays_zip("metadata_array", "valores_array"))



df_exploded = df_zipped.withColumn("kv", explode(col("zipped")))

df_mapped = (
    df_exploded.withColumn("key", col("kv")["metadata_array"])
                       .withColumn("value", col("kv")["valores_array"])
                       .groupBy("metadata", "valores")
                       .pivot("key").agg(expr("first(value)"))
)

df_mapped.show()

+--------------------+--------------------+-----+-----+-----+
|            metadata|             valores|ativo|idade| nome|
+--------------------+--------------------+-----+-----+-----+
|["nome", "idade",...|["João", "30", "t...| true|   30| João|
|["nome", "idade",...|["Maria", "25", "...|false|   25|Maria|
+--------------------+--------------------+-----+-----+-----+



In [3]:
from pyspark.sql.types import IntegerType, BooleanType

def infer_and_cast(df, columns):
    for col_name in columns:
        sample_value = df.select(col_name).filter(col(col_name).isNotNull()).first()
        if sample_value:
            value = sample_value[0]
            if value.lower() in ("true", "false"):
                df = df.withColumn(col_name, col(col_name).cast(BooleanType()))
            elif value.isdigit():
                df = df.withColumn(col_name, col(col_name).cast(IntegerType()))
    return df

# Aplicar inferência
columns_to_infer = [col for col in df_mapped.columns if col not in ("metadata", "valores")]
df_final = infer_and_cast(df_mapped, columns_to_infer)

df_final.show()
df_final.printSchema()


+--------------------+--------------------+-----+-----+-----+
|            metadata|             valores|ativo|idade| nome|
+--------------------+--------------------+-----+-----+-----+
|["nome", "idade",...|["João", "30", "t...| true|   30| João|
|["nome", "idade",...|["Maria", "25", "...|false|   25|Maria|
+--------------------+--------------------+-----+-----+-----+

root
 |-- metadata: string (nullable = true)
 |-- valores: string (nullable = true)
 |-- ativo: boolean (nullable = true)
 |-- idade: integer (nullable = true)
 |-- nome: string (nullable = true)



In [16]:
raw_objects = [
    {
        "metadata": ["pessoa.nome", "pessoa.idade", "pessoa.ativo", "pessoa.salario", "empresa.nome"],
        "valores": ["João", "30", "true", "1500.75", "ACME Ltda"]
    },
    {
        "metadata": ["pessoa.nome", "pessoa.idade", "pessoa.ativo", "pessoa.salario", "empresa.nome"],
        "valores": ["Maria", "25", "false", "2000.00", "Beta Corp"]
    }
]

raw_objects

[{'metadata': ['pessoa.nome',
   'pessoa.idade',
   'pessoa.ativo',
   'pessoa.salario',
   'empresa.nome'],
  'valores': ['João', '30', 'true', '1500.75', 'ACME Ltda']},
 {'metadata': ['pessoa.nome',
   'pessoa.idade',
   'pessoa.ativo',
   'pessoa.salario',
   'empresa.nome'],
  'valores': ['Maria', '25', 'false', '2000.00', 'Beta Corp']}]

# Versão Um

In [15]:
# ✅ 1. SETUP
!pip install -q pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import json
import re
from collections import defaultdict

spark = SparkSession.builder.master("local[*]").appName("NestedStruct").getOrCreate()

# ✅ 2. DADOS DE ENTRADA (JSON SERIALIZADO EM STRING)
raw_objects = [
    {
        "metadata": ["pessoa.nome", "pessoa.idade", "pessoa.ativo", "pessoa.salario", "empresa.nome"],
        "valores": ["João", "30", "true", "1500.75", "ACME Ltda"]
    },
    {
        "metadata": ["pessoa.nome", "pessoa.idade", "pessoa.ativo", "pessoa.salario", "empresa.nome"],
        "valores": ["Maria", "25", "false", "2000.00", "Beta Corp"]
    }
]

# Serializar como strings JSON
data = [(json.dumps(obj),) for obj in raw_objects]

# Criar DataFrame com coluna única 'linha'
schema = StructType([StructField("linha", StringType(), True)])
df_raw = spark.createDataFrame(data, schema)

# ✅ 3. PARSEAR JSON
json_schema = StructType([
    StructField("metadata", ArrayType(StringType())),
    StructField("valores", ArrayType(StringType()))
])

df = df_raw.withColumn("parsed", from_json("linha", json_schema)) \
    .selectExpr("parsed.metadata", "parsed.valores") \
    .withColumn("zipped", arrays_zip("metadata", "valores")) \
    .withColumn("pair", explode("zipped")) \
    .withColumn("chave", col("pair.metadata")) \
    .withColumn("valor", col("pair.valores"))

# ✅ 4. ADICIONAR ID POR LINHA PARA RECONSTRUIR DEPOIS
df = df.withColumn("row_id", monotonically_increasing_id())

# ✅ 5. PIVOTAR: CHAVE COMO COLUNA (COM SUBSTITUIÇÃO DE . POR _)
df_pivoted = df.groupBy("row_id").pivot("chave").agg(first("valor"))

# Renomear colunas substituindo . por _
for old_name in df_pivoted.columns:
    if old_name != "row_id":
        new_name = old_name.replace('.', '_')
        df_pivoted = df_pivoted.withColumnRenamed(old_name, new_name)

# ✅ 6. INFERÊNCIA DE TIPO DINÂMICA
def infer_and_cast(df):
    for c in df.columns:
        if c == "row_id":
            continue

        samples = df.select(c).filter(col(c).isNotNull()).limit(10).collect()
        if not samples:
            continue

        types = set()
        for row in samples:
            value = row[0]
            if isinstance(value, str):
                if value.lower() in ("true", "false"):
                    types.add("boolean")
                elif re.match(r"^-?\d+$", value):
                    types.add("int")
                elif re.match(r"^-?\d+\.\d+$", value):
                    types.add("float")
                else:
                    types.add("string")
            else:
                types.add(type(value).__name__)

        if len(types) == 1:
            inferred = types.pop()
            if inferred == "boolean":
                df = df.withColumn(c, col(c).cast(BooleanType()))
            elif inferred == "int":
                df = df.withColumn(c, col(c).cast(IntegerType()))
            elif inferred == "float":
                df = df.withColumn(c, col(c).cast(FloatType()))
            else:
                df = df.withColumn(c, col(c).cast(StringType()))
        else:
            df = df.withColumn(c, col(c).cast(StringType()))
    return df

df_casted = infer_and_cast(df_pivoted)

# ✅ 7. RECONSTRUIR STRUCTS COM BASE EM PREFIXOS (AGORA USANDO _ EM VEZ DE .)
def group_by_prefix(columns):
    grouped = defaultdict(list)
    for c in columns:
        if '_' in c and c != "row_id":
            prefix, field = c.split('_', 1)
            grouped[prefix].append((c, field))
    return grouped

grouped = group_by_prefix(df_casted.columns)

for prefix, fields in grouped.items():
    struct_cols = [col(full).alias(sub) for full, sub in fields]
    df_casted = df_casted.withColumn(prefix, struct(*struct_cols))

# ✅ 8. REMOVER COLUNAS FLAT COM UNDERSCORES
flat_cols = [c for c in df_casted.columns if '_' not in c and c not in grouped]
final_cols = flat_cols + list(grouped.keys())
df_final = df_casted.select(final_cols)

# ✅ 9. RESULTADO
df_final.printSchema()
df_final.show(truncate=False)

root
 |-- empresa: struct (nullable = false)
 |    |-- nome: string (nullable = true)
 |-- pessoa: struct (nullable = false)
 |    |-- ativo: boolean (nullable = true)
 |    |-- idade: integer (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- salario: float (nullable = true)

+-----------+---------------------------+
|empresa    |pessoa                     |
+-----------+---------------------------+
|{NULL}     |{NULL, NULL, João, NULL}   |
|{NULL}     |{NULL, 30, NULL, NULL}     |
|{NULL}     |{true, NULL, NULL, NULL}   |
|{NULL}     |{NULL, NULL, NULL, 1500.75}|
|{ACME Ltda}|{NULL, NULL, NULL, NULL}   |
|{NULL}     |{NULL, NULL, Maria, NULL}  |
|{NULL}     |{NULL, 25, NULL, NULL}     |
|{NULL}     |{false, NULL, NULL, NULL}  |
|{NULL}     |{NULL, NULL, NULL, 2000.0} |
|{Beta Corp}|{NULL, NULL, NULL, NULL}   |
+-----------+---------------------------+



# Versão Dois

In [18]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json
from collections import defaultdict

# Dados de entrada
raw_objects = [
    {
        "metadata": ["pessoa.nome", "pessoa.idade", "pessoa.ativo", "pessoa.salario", "empresa.nome", "VL_LIST_DATA"],
        "valores": ["João", "30", "true", "1500.75", "ACME Ltda", "detalhes.cargo=Analista;detalhes.setor=TI;contatos.0.tipo=email;contatos.0.valor=joao@email.com"]
    },
    {
        "metadata": ["pessoa.nome", "pessoa.idade", "pessoa.ativo", "pessoa.salario", "empresa.nome", "VL_LIST_DATA"],
        "valores": ["Maria", "25", "false", "2000.00", "Beta Corp", "detalhes.cargo=Gerente;detalhes.setor=RH;contatos.0.tipo=telefone;contatos.0.valor=1199999999"]
    }
]

# 1. Criar DataFrame inicial
data = [(json.dumps(obj),) for obj in raw_objects]
df_raw = spark.createDataFrame(data, ["linha"])

# 2. Parsear JSON
json_schema = StructType([
    StructField("metadata", ArrayType(StringType())),
    StructField("valores", ArrayType(StringType()))
])

df = df_raw.withColumn("parsed", F.from_json("linha", json_schema)) \
           .select("parsed.*") \
           .withColumn("row_id", F.monotonically_increasing_id())

# 3. Explodir para pares chave-valor
df_exploded = df.withColumn("zipped", F.arrays_zip("metadata", "valores")) \
                .withColumn("pair", F.explode("zipped")) \
                .select(
                    "row_id",
                    F.col("pair.metadata").alias("chave"),
                    F.col("pair.valores").alias("valor")
                )

# 4. Separar dados normais e VL_LIST_DATA
df_main = df_exploded.filter(F.col("chave") != "VL_LIST_DATA")
df_list_data = df_exploded.filter(F.col("chave") == "VL_LIST_DATA")

# 5. Pivotar dados principais (substituindo . por _)
df_pivoted = df_main.withColumn("chave", F.regexp_replace("chave", "\\.", "_")) \
                   .groupBy("row_id").pivot("chave").agg(F.first("valor"))

# 6. Processar VL_LIST_DATA
def parse_list_data(value):
    result = {"detalhes": {}, "contatos": []}
    items = value.split(';')
    for item in items:
        if '=' in item:
            key, val = item.split('=', 1)
            parts = key.split('.')
            if parts[0] == "detalhes":
                result["detalhes"][parts[1]] = val
            elif parts[0] == "contatos":
                idx = int(parts[1])
                while len(result["contatos"]) <= idx:
                    result["contatos"].append({})
                result["contatos"][idx][parts[2]] = val
    return json.dumps(result)

parse_list_data_udf = F.udf(parse_list_data, StringType())

df_list_data_processed = df_list_data.withColumn(
    "parsed_list_data",
    parse_list_data_udf("valor")
).select("row_id", "parsed_list_data")

# 7. Juntar tudo
df_combined = df_pivoted.join(df_list_data_processed, "row_id")

# 8. Inferir tipos
def infer_type(value):
    if value.lower() in ("true", "false"):
        return BooleanType()
    elif re.match(r"^-?\d+$", value):
        return IntegerType()
    elif re.match(r"^-?\d+\.\d+$", value):
        return FloatType()
    return StringType()

for col_name in df_combined.columns:
    if col_name not in ["row_id", "parsed_list_data"]:
        sample = df_combined.select(col_name).filter(F.col(col_name).isNotNull()).first()
        if sample:
            dtype = infer_type(sample[0])
            df_combined = df_combined.withColumn(col_name, F.col(col_name).cast(dtype))

# 9. Criar estruturas aninhadas
def create_struct(df, prefix):
    cols = [c for c in df.columns if c.startswith(prefix + "_")]
    if not cols:
        return df

    fields = [c[len(prefix)+1:] for c in cols]
    struct_cols = [F.col(c).alias(f) for c, f in zip(cols, fields)]

    return df.withColumn(prefix, F.struct(*struct_cols)).drop(*cols)

df_final = create_struct(df_combined, "pessoa")
df_final = create_struct(df_final, "empresa")

# 10. Processar VL_LIST_DATA
list_data_schema = StructType([
    StructField("detalhes", StructType([
        StructField("cargo", StringType()),
        StructField("setor", StringType())
    ])),
    StructField("contatos", ArrayType(StructType([
        StructField("tipo", StringType()),
        StructField("valor", StringType())
    ])))
])

df_final = df_final.withColumn(
    "VL_LIST_DATA",
    F.from_json("parsed_list_data", list_data_schema)
).drop("parsed_list_data")

# Resultado final
df_final.printSchema()
df_final.show(truncate=False)

root
 |-- row_id: long (nullable = false)
 |-- pessoa: struct (nullable = false)
 |    |-- ativo: boolean (nullable = true)
 |    |-- idade: integer (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- salario: float (nullable = true)
 |-- empresa: struct (nullable = false)
 |    |-- nome: string (nullable = true)
 |-- VL_LIST_DATA: struct (nullable = true)
 |    |-- detalhes: struct (nullable = true)
 |    |    |-- cargo: string (nullable = true)
 |    |    |-- setor: string (nullable = true)
 |    |-- contatos: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- tipo: string (nullable = true)
 |    |    |    |-- valor: string (nullable = true)

+----------+--------------------------+-----------+-------------------------------------------+
|row_id    |pessoa                    |empresa    |VL_LIST_DATA                               |
+----------+--------------------------+-----------+-----------------------------------------

In [21]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json

# Função para extrair schema dinâmico de VL_LIST_API
def get_dynamic_schema(df):
    # Coletar amostras para identificar todos os campos possíveis
    samples = df.select("VL_LIST_DATA").limit(100).collect()

    # Identificar todos os caminhos de campos
    field_paths = set()

    for row in samples:
        data = row.VL_LIST_DATA
        if data:
            # Extrair campos dos detalhes
            if data.detalhes:
                for field in data.detalhes.__fields__:
                    field_paths.add(f"detalhes_{field}")

            # Extrair campos dos contatos
            if data.contatos:
                for contact in data.contatos:
                    for field in contact.__fields__:
                        field_paths.add(f"contatos_{field}")

    return sorted(field_paths)

# Função para extrair valores dinamicamente
def extract_dynamic_fields(df):
    # Obter schema dinâmico
    dynamic_fields = get_dynamic_schema(df)

    # Adicionar colunas para cada campo dinâmico
    for field in dynamic_fields:
        parts = field.split('_')
        if parts[0] == "detalhes":
            df = df.withColumn(field, F.col("VL_LIST_DATA.detalhes").getItem(parts[1]))
        elif parts[0] == "contatos":
            # Para arrays, pegamos o primeiro elemento (poderia ser expandido)
            df = df.withColumn(field, F.col("VL_LIST_DATA.contatos")[0].getItem(parts[1]))

    return df

# Aplicar a transformação
df_expanded = extract_dynamic_fields(df_final)

# Mostrar schema e dados expandidos
print("Schema expandido:")
df_expanded.printSchema()

print("Dados expandidos:")
df_expanded.show(truncate=False)

# Opcional: Criar versão normalizada (uma linha por contato)
df_contatos = df_final.select(
    "*",
    F.explode("VL_LIST_DATA.contatos").alias("contato")
).select(
    "*",
    F.col("contato.tipo").alias("contato_tipo"),
    F.col("contato.valor").alias("contato_valor")
).drop("VL_LIST_DATA", "contato")

print("Dados normalizados (um registro por contato):")
df_contatos.show(truncate=False)

Schema expandido:
root
 |-- row_id: long (nullable = false)
 |-- pessoa: struct (nullable = false)
 |    |-- ativo: boolean (nullable = true)
 |    |-- idade: integer (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- salario: float (nullable = true)
 |-- empresa: struct (nullable = false)
 |    |-- nome: string (nullable = true)
 |-- VL_LIST_DATA: struct (nullable = true)
 |    |-- detalhes: struct (nullable = true)
 |    |    |-- cargo: string (nullable = true)
 |    |    |-- setor: string (nullable = true)
 |    |-- contatos: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- tipo: string (nullable = true)
 |    |    |    |-- valor: string (nullable = true)
 |-- contatos_tipo: string (nullable = true)
 |-- contatos_valor: string (nullable = true)
 |-- detalhes_cargo: string (nullable = true)
 |-- detalhes_setor: string (nullable = true)

Dados expandidos:
+----------+--------------------------+-----------+--------------

# Versão Três

In [22]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json
import re
from collections import defaultdict

def create_initial_dataframe(raw_objects, spark):
    """Cria o DataFrame inicial a partir de objetos raw"""
    data = [(json.dumps(obj),) for obj in raw_objects]
    return spark.createDataFrame(data, ["linha"])

def parse_json_data(df):
    """Parseia o JSON contido na coluna 'linha'"""
    json_schema = StructType([
        StructField("metadata", ArrayType(StringType())),
        StructField("valores", ArrayType(StringType()))
    ])
    return df.withColumn("parsed", F.from_json("linha", json_schema)) \
             .select("parsed.*") \
             .withColumn("row_id", F.monotonically_increasing_id())

def explode_key_value_pairs(df):
    """Transforma os pares metadata-valores em linhas separadas"""
    return df.withColumn("zipped", F.arrays_zip("metadata", "valores")) \
             .withColumn("pair", F.explode("zipped")) \
             .select(
                 "row_id",
                 F.col("pair.metadata").alias("chave"),
                 F.col("pair.valores").alias("valor")
             )

def separate_special_columns(df, special_columns):
    """Separa colunas especiais do restante dos dados"""
    condition = F.col("chave").isin(special_columns)
    df_special = df.filter(condition)
    df_main = df.filter(~condition)
    return df_main, df_special

def pivot_main_data(df, replace_char='.', replacement='_'):
    """Pivota os dados principais substituindo caracteres nos nomes das colunas"""
    return df.withColumn("chave", F.regexp_replace("chave", re.escape(replace_char), replacement)) \
             .groupBy("row_id").pivot("chave").agg(F.first("valor"))

def parse_special_data(value, structure):
    """Função genérica para parsear dados especiais"""
    result = defaultdict(dict)
    items = value.split(';')
    for item in items:
        if '=' in item:
            key, val = item.split('=', 1)
            parts = key.split('.')
            current = result
            for part in parts[:-1]:
                current = current.setdefault(part, {})
            current[parts[-1]] = val
    return json.dumps(result)

def process_special_columns(df, column_name, structure):
    """Processa colunas especiais de acordo com a estrutura definida"""
    parse_udf = F.udf(lambda x: parse_special_data(x, structure), StringType())
    return df.withColumn("parsed_data", parse_udf("valor")) \
             .select("row_id", "parsed_data")

def infer_and_cast_types(df, exclude_columns=[]):
    """Infere e converte tipos de colunas automaticamente"""
    def infer_type(value):
        if isinstance(value, str):
            if value.lower() in ("true", "false"):
                return BooleanType()
            elif re.match(r"^-?\d+$", value):
                return IntegerType()
            elif re.match(r"^-?\d+\.\d+$", value):
                return FloatType()
        return StringType()

    for col_name in [c for c in df.columns if c not in exclude_columns]:
        sample = df.select(col_name).filter(F.col(col_name).isNotNull()).first()
        if sample:
            dtype = infer_type(sample[0])
            df = df.withColumn(col_name, F.col(col_name).cast(dtype))
    return df

def create_nested_structures(df, prefixes):
    """Cria estruturas aninhadas para os prefixos especificados"""
    for prefix in prefixes:
        cols = [c for c in df.columns if c.startswith(f"{prefix}_")]
        if cols:
            fields = [c[len(prefix)+1:] for c in cols]
            struct_cols = [F.col(c).alias(f) for c, f in zip(cols, fields)]
            df = df.withColumn(prefix, F.struct(*struct_cols)).drop(*cols)
    return df

def process_list_data(df, column_name, schema):
    """Processa colunas com dados em lista de acordo com o schema fornecido"""
    return df.withColumn(column_name, F.from_json(column_name, schema))

def transform_data(raw_objects, spark, config):
    """
    Função principal que orquestra todo o processamento

    Args:
        raw_objects: Lista de objetos com os dados brutos
        spark: Sessão Spark
        config: Dicionário com configurações:
            {
                "special_columns": ["VL_LIST_DATA"],  # Colunas especiais para tratamento diferenciado
                "structure_definitions": {            # Definições de estrutura para colunas especiais
                    "VL_LIST_DATA": {
                        "detalhes": ["cargo", "setor"],
                        "contatos": ["tipo", "valor"]
                    }
                },
                "nested_prefixes": ["pessoa", "empresa"]  # Prefixos para criar estruturas aninhadas
            }
    """
    # 1. Criação do DataFrame inicial
    df = create_initial_dataframe(raw_objects, spark)

    # 2. Parse do JSON
    df = parse_json_data(df)

    # 3. Explodir pares chave-valor
    df = explode_key_value_pairs(df)

    # 4. Separar colunas especiais
    df_main, df_special = separate_special_columns(df, config.get("special_columns", []))

    # 5. Pivotar dados principais
    df_pivoted = pivot_main_data(df_main)

    # 6. Processar colunas especiais
    processed_special = []
    for col_name in config.get("special_columns", []):
        if col_name in [row.chave for row in df_special.select("chave").distinct().collect()]:
            structure = config["structure_definitions"].get(col_name, {})
            df_col = df_special.filter(F.col("chave") == col_name)
            processed = process_special_columns(df_col, col_name, structure)
            processed_special.append(processed)

    # 7. Juntar todos os dados
    df_combined = df_pivoted
    for special_df in processed_special:
        df_combined = df_combined.join(special_df, "row_id")

    # 8. Inferir tipos de dados
    exclude = ["row_id"] + [f"parsed_{col}" for col in config.get("special_columns", [])]
    df_combined = infer_and_cast_types(df_combined, exclude)

    # 9. Criar estruturas aninhadas
    df_final = create_nested_structures(df_combined, config.get("nested_prefixes", []))

    # 10. Processar dados especiais com schema
    for col_name in config.get("special_columns", []):
        if f"parsed_{col_name}" in df_final.columns:
            schema_def = config["structure_definitions"].get(col_name, {})
            schema = build_schema_from_definition(schema_def)
            df_final = process_list_data(df_final, col_name, schema)
            df_final = df_final.drop(f"parsed_{col_name}")

    return df_final

def build_schema_from_definition(definition):
    """Constrói um schema Spark a partir da definição de estrutura"""
    fields = []

    if "detalhes" in definition:
        detail_fields = [StructField(field, StringType()) for field in definition["detalhes"]]
        fields.append(StructField("detalhes", StructType(detail_fields)))

    if "contatos" in definition:
        contact_fields = [StructField(field, StringType()) for field in definition["contatos"]]
        fields.append(StructField("contatos", ArrayType(StructType(contact_fields))))

    return StructType(fields)

# Exemplo de uso:
config = {
    "special_columns": ["VL_LIST_DATA"],
    "structure_definitions": {
        "VL_LIST_DATA": {
            "detalhes": ["cargo", "setor"],
            "contatos": ["tipo", "valor"]
        }
    },
    "nested_prefixes": ["pessoa", "empresa"]
}

df_result = transform_data(raw_objects, spark, config)
df_result.printSchema()
df_result.show(truncate=False)

root
 |-- row_id: long (nullable = false)
 |-- parsed_data: string (nullable = true)
 |-- pessoa: struct (nullable = false)
 |    |-- ativo: boolean (nullable = true)
 |    |-- idade: integer (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- salario: float (nullable = true)
 |-- empresa: struct (nullable = false)
 |    |-- nome: string (nullable = true)

+----------+-------------------------------------------------------------------------------------------------------------------+--------------------------+-----------+
|row_id    |parsed_data                                                                                                        |pessoa                    |empresa    |
+----------+-------------------------------------------------------------------------------------------------------------------+--------------------------+-----------+
|0         |{"detalhes": {"cargo": "Analista", "setor": "TI"}, "contatos": {"0": {"tipo": "email", "valor": "joao@email.

In [24]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json

def safe_extract_nested_data(df, column_name):
    """
    Extrai dados aninhados de forma segura, verificando se a coluna existe
    e se tem a estrutura esperada.
    """
    # Verifica se a coluna existe no DataFrame
    if column_name not in df.columns:
        raise ValueError(f"A coluna '{column_name}' não existe no DataFrame. Colunas disponíveis: {df.columns}")

    # Obtém o schema da coluna
    col_schema = [f for f in df.schema.fields if f.name == column_name][0].dataType

    # Verifica se é uma struct
    if not isinstance(col_schema, StructType):
        raise ValueError(f"A coluna '{column_name}' não é uma estrutura (StructType). Tipo encontrado: {type(col_schema)}")

    # Extrai campos dinamicamente
    fields_to_extract = []
    for field in col_schema.fields:
        if isinstance(field.dataType, StructType):
            for subfield in field.dataType.fields:
                fields_to_extract.append((f"{field.name}_{subfield.name}", f"{column_name}.{field.name}.{subfield.name}"))
        else:
            fields_to_extract.append((f"{column_name}_{field.name}", f"{column_name}.{field.name}"))

    # Adiciona as colunas extraídas
    for new_col, col_expr in fields_to_extract:
        df = df.withColumn(new_col, F.col(col_expr))

    return df

def process_dynamic_data(df):
    """
    Processa o DataFrame dinamicamente, verificando e tratando colunas aninhadas.
    """
    # Verifica colunas que podem conter dados aninhados
    nested_columns = [f.name for f in df.schema.fields
                     if isinstance(f.dataType, StructType) or
                        (isinstance(f.dataType, ArrayType) and isinstance(f.dataType.elementType, StructType))]

    if not nested_columns:
        print("Nenhuma coluna aninhada encontrada.")
        return df

    print(f"Colunas aninhadas encontradas: {nested_columns}")

    # Processa cada coluna aninhada
    for col_name in nested_columns:
        try:
            df = safe_extract_nested_data(df, col_name)
            print(f"Coluna '{col_name}' processada com sucesso.")
        except Exception as e:
            print(f"Erro ao processar coluna '{col_name}': {str(e)}")
            continue

    return df

# Exemplo de uso:
try:
    # Processa o DataFrame dinamicamente
    df_processed = process_dynamic_data(df_final)

    # Mostra os resultados
    print("\nSchema após processamento:")
    df_processed.printSchema()

    print("\nDados após processamento:")
    df_processed.show(truncate=False)

except Exception as e:
    print(f"Erro durante o processamento: {str(e)}")
    # Mostra o schema atual para diagnóstico
    print("\nSchema atual do DataFrame:")
    df_final.printSchema()

Colunas aninhadas encontradas: ['pessoa', 'empresa', 'VL_LIST_DATA']
Coluna 'pessoa' processada com sucesso.
Coluna 'empresa' processada com sucesso.
Coluna 'VL_LIST_DATA' processada com sucesso.

Schema após processamento:
root
 |-- row_id: long (nullable = false)
 |-- pessoa: struct (nullable = false)
 |    |-- ativo: boolean (nullable = true)
 |    |-- idade: integer (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- salario: float (nullable = true)
 |-- empresa: struct (nullable = false)
 |    |-- nome: string (nullable = true)
 |-- VL_LIST_DATA: struct (nullable = true)
 |    |-- detalhes: struct (nullable = true)
 |    |    |-- cargo: string (nullable = true)
 |    |    |-- setor: string (nullable = true)
 |    |-- contatos: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- tipo: string (nullable = true)
 |    |    |    |-- valor: string (nullable = true)
 |-- pessoa_ativo: boolean (nullable = true)
 |-- pessoa_idad

## Generalização

In [41]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import json

# Iniciar sessão Spark
spark = SparkSession.builder.appName("NestedData").getOrCreate()

# 1. Dados de exemplo CORRETOS
data = [
    (1, '{"nome": "João", "idade": 30}', [{"produto": "A", "preco": 10.5}], {"chave": "valor"}),
    (2, '{"nome": "Maria", "idade": 25}', [{"produto": "B", "preco": 20.0}], {"chave": "valor2"}),
    (3, '{"nome": "Carlos"}', None, None)
]

# 2. Schema CORRETO
schema = StructType([
    StructField("id", IntegerType()),
    StructField("json_col", StringType()),
    StructField("array_col", ArrayType(StructType([
        StructField("produto", StringType()),
        StructField("preco", DoubleType())
    ]))),
    StructField("map_col", MapType(StringType(), StringType()))
])

# 3. Criar DataFrame
df = spark.createDataFrame(data, schema)

# 4. Função CORRETA para processar JSON
def parse_json_safely(df, json_column, output_column):
    """Parseia uma coluna JSON de forma segura"""
    json_schema = StructType([
        StructField("nome", StringType()),
        StructField("idade", IntegerType())
    ])

    return df.withColumn(output_column, F.from_json(F.col(json_column), json_schema))

# 5. Processamento PASSO-A-PASSO
df = parse_json_safely(df, "json_col", "json_parsed")

# Extrair campos do JSON
df = df.withColumn("nome", F.col("json_parsed.nome"))
df = df.withColumn("idade", F.col("json_parsed.idade"))

# Processar array de structs
df = df.withColumn("primeiro_produto", F.col("array_col").getItem(0).getField("produto"))
df = df.withColumn("primeiro_preco", F.col("array_col").getItem(0).getField("preco"))

# Processar map
df = df.withColumn("valor_chave", F.col("map_col").getItem("chave"))

# 6. Mostrar resultados CORRETOS
print("Schema final:")
df.printSchema()

print("\nDados processados:")
df.select(
    "id",
    "array_col",
    "map_col",
    "nome",
    "idade",
    "primeiro_produto",
    "primeiro_preco",
    "valor_chave"
).show(truncate=False)

Schema final:
root
 |-- id: integer (nullable = true)
 |-- json_col: string (nullable = true)
 |-- array_col: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- produto: string (nullable = true)
 |    |    |-- preco: double (nullable = true)
 |-- map_col: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- json_parsed: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- idade: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- idade: integer (nullable = true)
 |-- primeiro_produto: string (nullable = true)
 |-- primeiro_preco: double (nullable = true)
 |-- valor_chave: string (nullable = true)


Dados processados:
+---+-----------+-----------------+------+-----+----------------+--------------+-----------+
|id |array_col  |map_col          |nome  |idade|primeiro_produto|primeiro_preco|valor_chave|
+---+-----------+-----------------+------+-----+----------------+---

In [None]:
def parse_json_safely(df, json_column, output_column):
    """Versão com tratamento de erros completo"""
    json_schema = StructType([
        StructField("nome", StringType()),
        StructField("idade", IntegerType())
    ])

    try:
        # Tentar parsear o JSON
        df = df.withColumn(output_column, F.from_json(F.col(json_column), json_schema))

        # Extrair campos com fallback para nulo
        df = df.withColumn("nome", F.coalesce(F.col(f"{output_column}.nome"), F.lit("")))
        df = df.withColumn("idade", F.coalesce(F.col(f"{output_column}.idade"), F.lit(None).cast(IntegerType())))

    except Exception as e:
        print(f"Erro ao processar JSON: {str(e)}")
        df = df.withColumn("nome", F.lit(None).cast(StringType()))
        df = df.withColumn("idade", F.lit(None).cast(IntegerType()))

    return df