In [49]:
from pyspark.sql import types as T
from pyspark.sql import functions as F

def get_spark_type(data_type: str):
    dict_type = {
        'int': T.IntegerType(),
        'string': T.StringType(),
        'struct': T.StringType(),
        'boolean': T.BooleanType(),
        'double': T.DoubleType(),
        'timestamp': T.TimestampType(),
        'date': T.DateType(),
        'array': T.ArrayType(T.StringType()),
    }
    return dict_type[data_type]

def create_schema(fields: list):
    schema_dict = {}

    for field, data_type in fields:
        parts = field.split(".")
        current_level = schema_dict

        for part in parts[:-1]:
            if part not in current_level:
                current_level[part] = {}
            current_level = current_level[part]

        current_level[parts[-1]] = get_spark_type(data_type)

    def build_struct_type(d):
        fields = []
        for key, value in d.items():
            if isinstance(value, dict):
                fields.append(
                    T.StructField(key, build_struct_type(value), True)
                )
            else:
                fields.append(T.StructField(key, value, True))
        return T.StructType(fields)

    return build_struct_type(schema_dict)

def apply_mapping(df, column_json, mapping):
    transformed_columns = [
        F.col(f"{column_json}.{source_col}")
        .cast(get_spark_type(target_type))
        .alias(target_col)
        for source_col, target_col, target_type in mapping
    ]

    df = df.select("*", *transformed_columns)

    df = df.drop(column_json)
    return df

In [35]:
mapping_schema = [
    ('payload.id', 'int'),
    ('payload.nome', 'string'),
    ('payload.data_nascimento', 'timestamp'),
    ('payload.telefones', 'array'),
    ('payload.endereco.rua', 'string'),
    ('payload.endereco.numero', 'string'),
    ('payload.endereco.complemento', 'string'),
    ('payload.endereco.bairro', 'string'),
    ('payload.endereco.cep', 'string'),
    ('payload.endereco.cidade', 'string'),
    ('payload.endereco.estado', 'string'),
    ('payload.status', 'boolean'),
]

In [63]:
schema = create_schema(mapping_schema)
print(schema)
print("="*150)
print(schema.json())


StructType([StructField('payload', StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('data_nascimento', TimestampType(), True), StructField('telefones', ArrayType(StringType(), True), True), StructField('endereco', StructType([StructField('rua', StringType(), True), StructField('numero', StringType(), True), StructField('complemento', StringType(), True), StructField('bairro', StringType(), True), StructField('cep', StringType(), True), StructField('cidade', StringType(), True), StructField('estado', StringType(), True)]), True), StructField('status', BooleanType(), True)]), True)])
{"fields":[{"metadata":{},"name":"payload","nullable":true,"type":{"fields":[{"metadata":{},"name":"id","nullable":true,"type":"integer"},{"metadata":{},"name":"nome","nullable":true,"type":"string"},{"metadata":{},"name":"data_nascimento","nullable":true,"type":"timestamp"},{"metadata":{},"name":"telefones","nullable":true,"type":{"containsNull":true,"

In [51]:
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import random
import json

In [52]:
spark = SparkSession.builder.appName("DataFrameComJson").getOrCreate()

schema = T.StructType([
    T.StructField("id", T.IntegerType(), nullable=False),
    T.StructField("data_processamento", T.TimestampType(), nullable=False),
    T.StructField("obj_json", T.StringType(), nullable=False)
])

# Função para gerar um JSON com estrutura específica
def gerar_payload_json(id):
    payload = {
        "payload": {
            "id": id,
            "nome": random.choice(["David", "Ana", "Carlos", "Beatriz"]),
            "data_nascimento": (datetime.now() - timedelta(days=random.randint(5000, 10000))).strftime("%Y-%m-%d %H:%M:%S"),
            "telefones": [{"numero": f"{random.randint(1000000000, 9999999999)}"}],
            "status": random.choice(["ativo", "inativo"]),
            "endereco": {
                "rua": "Rua Exemplo",
                "numero": random.randint(1, 500),
                "complemento": "casa",
                "bairro": "Jardim Exemplo",
                "cep": f"{random.randint(1000000, 9999999)}",
                "cidade": "São Paulo",
                "estado": "São Paulo"
            }
        }
    }
    return json.dumps(payload)

# Gerar dados para o DataFrame
data = [
    (
        i,
        datetime.now() - timedelta(days=random.randint(0, 30)),
        gerar_payload_json(i)
    )
    for i in range(1, 51)
]

# Criar o DataFrame
df = spark.createDataFrame(data, schema=schema)

In [55]:
df.printSchema()

root
 |-- id: integer (nullable = false)
 |-- data_processamento: timestamp (nullable = false)
 |-- obj_json: string (nullable = false)



In [56]:
df.count()

50

In [54]:
df.show(5, truncate=False)

+---+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |data_processamento        |obj_json                                                                                                                                                                                                                                                                                                                         |
+---+--------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [66]:
df_unboxing = df.withColumn("obj_json", F.from_json("obj_json", schema))

In [67]:
df_unboxing.printSchema()

root
 |-- id: integer (nullable = false)
 |-- data_processamento: timestamp (nullable = false)
 |-- obj_json: struct (nullable = true)
 |    |-- payload: struct (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |    |-- data_nascimento: timestamp (nullable = true)
 |    |    |-- telefones: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- endereco: struct (nullable = true)
 |    |    |    |-- rua: string (nullable = true)
 |    |    |    |-- numero: string (nullable = true)
 |    |    |    |-- complemento: string (nullable = true)
 |    |    |    |-- bairro: string (nullable = true)
 |    |    |    |-- cep: string (nullable = true)
 |    |    |    |-- cidade: string (nullable = true)
 |    |    |    |-- estado: string (nullable = true)
 |    |    |-- status: boolean (nullable = true)



In [68]:
df_unboxing.show(5, truncate=False)

+---+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+
|id |data_processamento        |obj_json                                                                                                                                     |
+---+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+
|1  |2024-10-16 01:39:03.387747|{{1, Carlos, 1997-08-13 01:39:03, [{"numero":"7048933861"}], {Rua Exemplo, 205, casa, Jardim Exemplo, 3314393, São Paulo, São Paulo}, NULL}} |
|2  |2024-10-22 01:39:03.387842|{{2, Ana, 2009-08-02 01:39:03, [{"numero":"9658483627"}], {Rua Exemplo, 409, casa, Jardim Exemplo, 4333378, São Paulo, São Paulo}, NULL}}    |
|3  |2024-10-07 01:39:03.387872|{{3, Beatriz, 2003-04-06 01:39:03, [{"numero":"6581642517"}], {Rua Exemplo, 325, casa, Jardim

In [77]:
mapping_cast = [
    ('payload.id', 'id_cliente', 'string'),
    ('payload.nome', 'nome_cliente', 'string'),
    ('payload.data_nascimento', 'data_nascimento_cliente', 'timestamp'),
    ('payload.telefones', 'lista_contatos', 'array'),
    ('payload.endereco.rua', 'rua_cliente', 'string'),
    ('payload.endereco.numero', 'numero_residencia_cliente', 'string'),
    ('payload.endereco.complemento', 'complemento_residencia_cliente', 'string'),
    ('payload.endereco.bairro', 'bairro_residencia_cliente', 'string'),
    ('payload.endereco.cep', 'cep_residencia_cliente', 'string'),
    ('payload.endereco.cidade', 'cidade_cliente', 'string'),
    ('payload.endereco.estado', 'estado_cliente', 'string'),
    ('payload.status', 'status', 'boolean'),
]

In [78]:
df_apply_mapping = (
    df_unboxing.transform(
        apply_mapping, column_json="obj_json", mapping=mapping_cast
    )
)

In [80]:
df_apply_mapping.printSchema()

root
 |-- id: integer (nullable = false)
 |-- data_processamento: timestamp (nullable = false)
 |-- id_cliente: string (nullable = true)
 |-- nome_cliente: string (nullable = true)
 |-- data_nascimento_cliente: timestamp (nullable = true)
 |-- lista_contatos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rua_cliente: string (nullable = true)
 |-- numero_residencia_cliente: string (nullable = true)
 |-- complemento_residencia_cliente: string (nullable = true)
 |-- bairro_residencia_cliente: string (nullable = true)
 |-- cep_residencia_cliente: string (nullable = true)
 |-- cidade_cliente: string (nullable = true)
 |-- estado_cliente: string (nullable = true)
 |-- status: boolean (nullable = true)



In [81]:
df_apply_mapping.show(5, truncate=False)

+---+--------------------------+----------+------------+-----------------------+-------------------------+-----------+-------------------------+------------------------------+-------------------------+----------------------+--------------+--------------+------+
|id |data_processamento        |id_cliente|nome_cliente|data_nascimento_cliente|lista_contatos           |rua_cliente|numero_residencia_cliente|complemento_residencia_cliente|bairro_residencia_cliente|cep_residencia_cliente|cidade_cliente|estado_cliente|status|
+---+--------------------------+----------+------------+-----------------------+-------------------------+-----------+-------------------------+------------------------------+-------------------------+----------------------+--------------+--------------+------+
|1  |2024-10-16 01:39:03.387747|1         |Carlos      |1997-08-13 01:39:03    |[{"numero":"7048933861"}]|Rua Exemplo|205                      |casa                          |Jardim Exemplo           |3314393      