In [0]:
!pip install bs4

In [0]:
from bs4 import BeautifulSoup
from pyspark.sql.functions import col, regexp_replace, split, explode, trim, regexp_extract, expr, array_distinct, udf, when, from_json
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

def chave_explode(df, column_key, id_regex=None):
    """Expand a JSON array column into one row per array element.

    The column named by ``column_key`` is parsed with ``from_json`` as an
    array of structs with the string fields ``id`` and ``description``; the
    resulting array is then exploded so each element lands on its own row.

    Args:
        df: DataFrame containing ``steam_appid`` and ``column_key``.
        column_key: Name of the column that is parsed as a JSON array.
        id_regex: Accepted for callers that pass it, but never read here.

    Returns:
        DataFrame with the columns ``steam_appid``, ``id`` (cast to integer)
        and ``description``.
    """
    # Both fields are read as strings; ``id`` is cast to integer afterwards.
    entry_fields = [
        StructField("id", StringType()),
        StructField("description", StringType()),
    ]
    entries_schema = ArrayType(StructType(entry_fields))

    parsed_entries = from_json(col(column_key), entries_schema)

    return (
        df.withColumn("array_col", parsed_entries)
        .withColumn("exploded", explode(col("array_col")))
        .select(
            "steam_appid",
            col("exploded.id").cast(IntegerType()).alias("id"),
            col("exploded.description").alias("description"),
        )
    )


def langueges_list(df, column_key):
    """Turn an HTML-flavoured, comma-separated language string into an array.

    Cleaning steps applied to ``column_key``, in order:
      1. remove ``<strong>`` / ``</strong>`` tags;
      2. remove the first ``<br>`` and everything matched after it;
      3. remove asterisks;
      4. split on commas (surrounding whitespace is part of the delimiter);
      5. trim each element;
      6. drop duplicate elements.

    Args:
        df: DataFrame containing ``steam_appid`` and ``column_key``.
        column_key: Name of the column holding the raw language string.

    Returns:
        DataFrame with the columns ``steam_appid`` and ``languages``.
    """
    # Compose the three textual clean-ups as one nested column expression.
    cleaned = regexp_replace(column_key, r'<strong>|</strong>', "")
    cleaned = regexp_replace(cleaned, r'<br>.*$', "")
    cleaned = regexp_replace(cleaned, r'\*', "")

    pieces = split(cleaned, r'\s*,\s*')

    return (
        df.withColumn("column_array", pieces)
        # The SQL expression below refers to the helper column by name.
        .withColumn("column_array", expr("transform(column_array, x -> trim(x))"))
        .select("steam_appid", array_distinct("column_array").alias("languages"))
    )


def clean_html(html_str):
    """Return the text content of an HTML string, or ``None`` for falsy input.

    The markup is parsed with BeautifulSoup's ``html.parser``; text nodes are
    joined with a single space, newlines are replaced by spaces, and the
    result is stripped of leading/trailing whitespace.
    """
    if not html_str:
        return None

    parsed = BeautifulSoup(html_str, "html.parser")
    text = parsed.get_text(separator=" ")
    return text.replace("\n", " ").strip()


def get_price(df, column_key):
    """Extract currency and numeric price from a JSON price column.

    The column named by ``column_key`` is parsed with ``from_json`` into a
    struct (``currency``, ``initial``, ``final``, ``discount_percent``,
    ``initial_formatted``, ``final_formatted``). Only ``currency`` and
    ``final_formatted`` are used.

    ``Price`` is derived from ``final_formatted``:
      * every character other than digits, ``,``, ``.`` and ``-`` is removed;
      * grouping separators are removed, so ``1.299,99`` and ``1,299.99``
        both become ``1299.99`` instead of failing the numeric cast and
        silently turning into ``0.0``. A ``.`` or ``,`` counts as a grouping
        separator when it is followed by exactly three digits and then
        another separator or the end of the string (so ``1,980`` is read as
        ``1980``). NOTE(review): this assumes prices never carry exactly
        three decimal digits - confirm for the currencies in use;
      * a remaining ``,`` is treated as the decimal mark;
      * values that are empty or still not castable to double become ``0.0``.

    Args:
        df: DataFrame containing ``steam_appid`` and ``column_key``.
        column_key: Name of the column holding the price JSON string.

    Returns:
        DataFrame with the columns ``steam_appid``, ``Currency`` (``"-"`` when
        the currency is null) and ``Price`` (double, never null).
    """
    schema = StructType([
        StructField("currency", StringType(), True),
        StructField("initial", IntegerType(), True),
        StructField("final", IntegerType(), True),
        StructField("discount_percent", IntegerType(), True),
        StructField("initial_formatted", StringType(), True),
        StructField("final_formatted", StringType(), True)
    ])

    df = df.withColumn("price_json", from_json(col(column_key), schema))

    currency = when(
        col("price_json.currency").isNotNull(), col("price_json.currency")
    ).otherwise("-")

    # Keep only the characters that can belong to a number.
    price_str = regexp_replace(col("price_json.final_formatted"), r"[^0-9,.\-]", "")

    # Drop grouping separators before touching the decimal mark; without this
    # step "1.299,99" would become "1.299.99" and fail the cast below.
    price_str = regexp_replace(price_str, r"[.,](?=\d{3}(?:[.,]|$))", "")

    # Normalise a decimal comma to a decimal point.
    price_str = regexp_replace(price_str, ",", ".")

    # Replace the empty string with null so the cast does not see "".
    price_str = when(price_str == "", None).otherwise(price_str)

    price = price_str.cast("double")

    # Null (missing or unparseable) prices are reported as 0.0.
    price = when(price.isNull(), 0.0).otherwise(price)

    return df.select(
        "steam_appid",
        currency.alias("Currency"),
        price.alias("Price"),
    )


def get_publishers(df, column_key):
    """Reduce a bracketed, comma-separated publisher string to its first entry.

    A leading ``[`` and a trailing ``]`` are removed from ``column_key``, the
    remainder is split on commas, and ``column_key`` is overwritten with the
    trimmed first element.

    Args:
        df: DataFrame containing ``column_key``.
        column_key: Name of the column to normalise in place.

    Returns:
        The input DataFrame with ``column_key`` replaced; the helper columns
        ``publishers_clean`` and ``publishers_array`` are also present on the
        result.
    """
    without_brackets = regexp_replace(col(column_key), r"^\[|\]$", "")

    return (
        df.withColumn("publishers_clean", without_brackets)
        .withColumn("publishers_array", split(col("publishers_clean"), ","))
        # Only the first publisher is kept.
        .withColumn(column_key, trim(col("publishers_array").getItem(0)))
    )



@dlt.table(
    name="details_s",
    comment="Dados detail transformados para a camada silver"
)
def details_silver():
    """Build the silver-layer ``details_s`` table from the bronze details table.

    Steps:
      1. read ``steamdatabricks_workspace.steam.details_b`` and promote every
         field of its ``data`` struct to a top-level column;
      2. derive per-app lookup frames for genres, categories, supported
         languages and price;
      3. extract OS / Processor / Memory / Graphics / Storage from the
         HTML-stripped ``pc_requirements`` text;
      4. keep only the first publisher, select the output columns, left-join
         the lookup frames on ``steam_appid`` and cast a few columns.

    Because genres and categories are each exploded to one row per entry and
    both are joined on ``steam_appid``, an app can appear on several rows
    (one per genre/category combination).

    NOTE(review): ``dlt`` is referenced here but is not imported in the
    visible part of this file - confirm it is brought into scope elsewhere.

    Returns:
        The joined and type-cast DataFrame.
    """
    df = dlt.read("steamdatabricks_workspace.steam.details_b")

    # Promote each inner field of the 'data' struct to a top-level column
    # carrying the same name as the field.
    for struct_field in df.schema["data"].dataType.fields:
        field_name = struct_field.name
        df = df.withColumn(field_name, col("data." + field_name))

    # The original struct and the 'success' flag are dropped once the
    # fields have been extracted.
    df = df.drop('data', 'success')

    # Genres: one row per (steam_appid, genre).
    genre = (
        chave_explode(df.select('steam_appid', 'genres'), 'genres', r'"id":"(\d+)"')
        .withColumnRenamed('id', 'id_genres')
        .withColumnRenamed('description', 'description_genres')
    )

    # Categories: one row per (steam_appid, category).
    categories = (
        chave_explode(df.select('steam_appid', 'categories'), 'categories', r'"id":(\d+)')
        .withColumnRenamed('id', 'id_categories')
        .withColumnRenamed('description', 'description_categories')
    )

    # Supported languages as an array column.
    languages = langueges_list(
        df.select('steam_appid', 'supported_languages'),
        'supported_languages',
    )

    # Strip HTML from the PC requirements through a Python UDF.
    clean_html_udf = udf(clean_html, StringType())
    df = df.withColumn("pc_clean_text", clean_html_udf("pc_requirements"))

    # Each requirement is captured as the text after "<label>:" up to the
    # next comma.
    requirement_patterns = (
        ("OS", r"OS\s*:\s*([^,]+)"),
        ("Processor", r"Processor\s*:\s*([^,]+)"),
        ("Memory", r"Memory\s*:\s*([^,]+)"),
        ("Graphics", r"Graphics\s*:\s*([^,]+)"),
        ("Storage", r"Storage\s*:\s*([^,]+)"),
    )
    for target_column, pattern in requirement_patterns:
        df = df.withColumn(target_column, regexp_extract("pc_clean_text", pattern, 1))

    # Price and currency, normalised by get_price.
    price = get_price(df.select("steam_appid", "price_overview"), "price_overview")

    # Keep only the first publisher, then narrow to the output columns.
    output_columns = [
        'steam_appid', 'name', 'type', 'header_image',
        'is_free', 'publishers', 'required_age', 'website', 'short_description',
        'OS', 'Processor', 'Memory', 'Graphics', 'Storage',
    ]
    df_joined = get_publishers(df, 'publishers').select(*output_columns)

    # Left-join every lookup frame on the application id, in this order.
    for lookup in (genre, categories, languages, price):
        df_joined = df_joined.join(lookup, on="steam_appid", how="left")

    # Final type casts for selected columns.
    return (
        df_joined
        .withColumn("steam_appid", col("steam_appid").cast("int"))
        .withColumn("required_age", col("required_age").cast("int"))
        .withColumn("is_free", col("is_free").cast("boolean"))
    )