In [8]:
from pyspark.sql.types import *

# Schema do JSON para uso com from_json
# Ajustando o schema para o campo 'objectives'
teams_schema = StructType([
    StructField("teamId", IntegerType(), True),
    StructField("win", BooleanType(), True),
    StructField("objectives", StructType([
        # Ajuste aqui - Substitua por MapType ou NullType para evitar erro
        StructField("baron", MapType(StringType(), StringType()), True),
        StructField("dragon", MapType(StringType(), StringType()), True),
        StructField("tower", MapType(StringType(), StringType()), True)
    ]), True)
])

# Exemplo de esquema ajustado para o JSON completo
schema = StructType([
    StructField("endOfGameResult", StringType(), True),
    StructField("gameCreation", DoubleType(), True),
    StructField("gameDuration", DoubleType(), True),
    StructField("gameEndTimestamp", DoubleType(), True),
    StructField("gameId", DoubleType(), True),
    StructField("gameMode", StringType(), True),
    StructField("gameName", StringType(), True),
    StructField("gameStartTimestamp", DoubleType(), True),
    StructField("gameType", StringType(), True),
    StructField("gameVersion", StringType(), True),
    StructField("mapId", IntegerType(), True),
    StructField("platformId", StringType(), True),
    StructField("queueId", IntegerType(), True),
    StructField("participants", ArrayType(
        StructType([
            StructField("championId", IntegerType(), True),
            StructField("championName", StringType(), True),
            StructField("teamId", IntegerType(), True),
            StructField("win", BooleanType(), True)
        ])
    ), True),
    StructField("teams", ArrayType(teams_schema), True)
])

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("KafkaJsonProcessing").getOrCreate()

# Exemplo de DataFrame com uma coluna contendo JSON
data = [(
    '{"endOfGameResult": "Victory", "gameId": 1234567890, "participants": [{"championId": 157, "championName": "Yasuo", "teamId": 100, "win": true}], "teams": [{"teamId": 100, "win": true, "objectives": {"baron": {"first": false, "kills": 1}, "dragon": {"first": true, "kills": 2}, "tower": {"first": false, "kills": 3}}}]}' 
,)]

df = spark.createDataFrame(data, ["json_data"])

# Analisar o JSON utilizando o schema
df_parsed = df.withColumn("parsed_data", from_json(col("json_data"), schema))

# Selecionar e mostrar os dados analisados
df_parsed.select("parsed_data.*").show(truncate=False)

+---------------+------------+------------+----------------+------------+--------+--------+------------------+--------+-----------+-----+----------+-------+-------------------------+--------------------------------------------------------------------------------------------------------+
|endOfGameResult|gameCreation|gameDuration|gameEndTimestamp|gameId      |gameMode|gameName|gameStartTimestamp|gameType|gameVersion|mapId|platformId|queueId|participants             |teams                                                                                                   |
+---------------+------------+------------+----------------+------------+--------+--------+------------------+--------+-----------+-----+----------+-------+-------------------------+--------------------------------------------------------------------------------------------------------+
|Victory        |NULL        |NULL        |NULL            |1.23456789E9|NULL    |NULL    |NULL              |NULL    |NULL       |NULL 