# Pipeline Silver to Gold - One Big Table (OBT)
## Criação de tabelas desnormalizadas para análises avançadas

## Validando a SparkSession

In [0]:
spark

## Configurações iniciais - Azure ADLS Gen2

In [0]:
storageAccountName = "datalake7eadf73a479de9f7"
sasToken = "sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2025-06-16T03:33:35Z&st=2025-06-15T19:33:35Z&spr=https&sig=fBJfCWK99G%2FGTBY81Y8eNQYEpOxgrzjVSaavlAB0np4%3D"

def mount_adls(blobContainerName):
    try:
        dbutils.fs.mount(
            source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
            mount_point = f"/mnt/{storageAccountName}/{blobContainerName}",
            extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
        )
        print(f"Container {blobContainerName} montado com sucesso!")
    except Exception as e:
        print(f"Falha ao montar {blobContainerName}: {e}")

## Montando containers necessários

In [0]:
mount_adls('silver')
mount_adls('gold')

Falha ao montar silver: An error occurred while calling o1915.mount.
: java.rmi.RemoteException: java.lang.IllegalArgumentException: requirement failed: Directory already mounted: /mnt/datalake7eadf73a479de9f7/silver; nested exception is: 
	java.lang.IllegalArgumentException: requirement failed: Directory already mounted: /mnt/datalake7eadf73a479de9f7/silver
	at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:135)
	at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:69)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.createOrUpdateMount(DBUtilsCore.scala:1053)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.$anonfun$mount$1(DBUtilsCore.scala:1079)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:560)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:657)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags

## Importando bibliotecas necessárias

In [0]:
from pyspark.sql.functions import (
    current_timestamp, lit, col, when, isnan, isnull, 
    count, sum as spark_sum, avg, max as spark_max, min as spark_min,
    concat_ws, coalesce, date_format, year, month, dayofmonth,
    dense_rank, row_number, round as spark_round, collect_list,
    current_date, datediff, count_distinct
)
from pyspark.sql.window import Window
from pyspark.sql.types import *


## Verificando tabelas disponíveis na camada Silver

In [0]:
print("=== TABELAS SILVER DISPONÍVEIS ===")
spark.sql("SHOW TABLES IN pipeline_silver").show()

=== TABELAS SILVER DISPONÍVEIS ===
+---------------+--------------------+-----------+
|       database|           tableName|isTemporary|
+---------------+--------------------+-----------+
|pipeline_silver|achievement_unlocked|      false|
|pipeline_silver|        achievements|      false|
|pipeline_silver|          developers|      false|
|pipeline_silver|                dlcs|      false|
|pipeline_silver|        game_genders|      false|
|pipeline_silver|      game_platforms|      false|
|pipeline_silver|           game_tags|      false|
|pipeline_silver|               games|      false|
|pipeline_silver|             genders|      false|
|pipeline_silver|           platforms|      false|
|pipeline_silver|           purchases|      false|
|pipeline_silver|             reviews|      false|
|pipeline_silver|                tags|      false|
|pipeline_silver|               users|      false|
+---------------+--------------------+-----------+



## Carregando dados das tabelas Silver

In [0]:
def carregar_tabelas_silver():
    """Carrega todas as tabelas da camada Silver"""
    
    tabelas = {}
    database_name = "pipeline_silver"
    
    try:
        # Obter lista de tabelas
        tabelas_disponiveis = spark.sql(f"SHOW TABLES IN {database_name}").collect()
        
        for tabela in tabelas_disponiveis:
            nome_tabela = tabela['tableName']
            print(f"Carregando tabela: {nome_tabela}")
            
            df = spark.sql(f"SELECT * FROM {database_name}.{nome_tabela}")
            tabelas[nome_tabela] = df
            
            print(f"  - Registros: {df.count()}")
            print(f"  - Colunas: {len(df.columns)}")
            
    except Exception as e:
        print(f"Erro ao carregar tabelas: {str(e)}")
    
    return tabelas

In [0]:
silver_tables = carregar_tabelas_silver()

Carregando tabela: achievement_unlocked
  - Registros: 2
  - Colunas: 5
Carregando tabela: achievements
  - Registros: 2
  - Colunas: 6
Carregando tabela: developers
  - Registros: 2
  - Colunas: 4
Carregando tabela: dlcs
  - Registros: 2
  - Colunas: 5
Carregando tabela: game_genders
  - Registros: 2
  - Colunas: 5
Carregando tabela: game_platforms
  - Registros: 2
  - Colunas: 5
Carregando tabela: game_tags
  - Registros: 2
  - Colunas: 5
Carregando tabela: games
  - Registros: 2
  - Colunas: 5
Carregando tabela: genders
  - Registros: 2
  - Colunas: 4
Carregando tabela: platforms
  - Registros: 2
  - Colunas: 4
Carregando tabela: purchases
  - Registros: 2
  - Colunas: 6
Carregando tabela: reviews
  - Registros: 2
  - Colunas: 7
Carregando tabela: tags
  - Registros: 2
  - Colunas: 4
Carregando tabela: users
  - Registros: 2
  - Colunas: 5


## Função para salvar OBT na camada Gold

In [0]:
def salvar_obt_gold(df_obt, nome_tabela):
    """Salva a OBT na camada Gold"""
    
    if df_obt is None:
        print("DataFrame OBT é None - não é possível salvar")
        return
    
    try:
        gold_path = f"/mnt/{storageAccountName}/gold"
        
        # Salvar como Delta na camada Gold
        df_obt.write.format('delta').mode('overwrite').save(f"{gold_path}/{nome_tabela}")
        print(f"OBT {nome_tabela} salva na camada Gold")
        
        # Criar tabela gerenciada
        database_name = "pipeline_gold"
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        
        df_obt.write.format('delta').mode('overwrite').saveAsTable(f"{database_name}.{nome_tabela}")
        print(f"Tabela gerenciada criada: {database_name}.{nome_tabela}")
        
    except Exception as e:
        print(f"Erro ao salvar OBT: {str(e)}")

## Criando OBT de Games

In [0]:
def criar_games_obt():
    """Cria OBT focada em games e suas métricas integradas"""
    
    try:
        games_df = silver_tables.get('games')
        if games_df is None:
            print("Tabela games não encontrada")
            return None
            
        print("=== Criando Games OBT ===")
        
        # Join com Developers
        developers_df = silver_tables.get('developers')
        if developers_df is not None:
            games_df = games_df.alias('g').join(
                developers_df.alias('d'),
                col('g.DEVELOPER_ID') == col('d.ID'),
                'left'
            ).select(
                col('g.*'),
                col('d.NAME').alias('DEVELOPER_NAME')
            )
        
        # Agregações de Reviews
        reviews_df = silver_tables.get('reviews')
        if reviews_df is not None:
            reviews_agg = reviews_df.groupBy('GAME_ID').agg(
                count('*').alias('TOTAL_REVIEWS'),
                avg('RATING').alias('AVG_RATING'),
                spark_max('RATING').alias('MAX_RATING'),
                spark_min('RATING').alias('MIN_RATING')
            )
            
            games_df = games_df.alias('g').join(
                reviews_agg.alias('r'),
                col('g.ID') == col('r.GAME_ID'),
                'left'
            ).select(
                col('g.*'),
                coalesce(col('r.TOTAL_REVIEWS'), lit(0)).alias('TOTAL_REVIEWS'),
                spark_round(coalesce(col('r.AVG_RATING'), lit(0)), 2).alias('AVG_RATING'),
                coalesce(col('r.MAX_RATING'), lit(0)).alias('MAX_RATING'),
                coalesce(col('r.MIN_RATING'), lit(0)).alias('MIN_RATING')
            )
        
        # Agregações de Purchases
        purchases_df = silver_tables.get('purchases')
        if purchases_df is not None:
            purchases_agg = purchases_df.groupBy('GAME_ID').agg(
                count('*').alias('TOTAL_PURCHASES')
            )
            
            games_df = games_df.alias('g').join(
                purchases_agg.alias('p'),
                col('g.ID') == col('p.GAME_ID'),
                'left'
            ).select(
                col('g.*'),
                coalesce(col('p.TOTAL_PURCHASES'), lit(0)).alias('TOTAL_PURCHASES')
            )
        
        # Agregações de Achievements
        achievements_df = silver_tables.get('achievements')
        if achievements_df is not None:
            achievements_agg = achievements_df.groupBy('GAME_ID').agg(
                count('*').alias('TOTAL_ACHIEVEMENTS')
            )
            
            games_df = games_df.alias('g').join(
                achievements_agg.alias('a'),
                col('g.ID') == col('a.GAME_ID'),
                'left'
            ).select(
                col('g.*'),
                coalesce(col('a.TOTAL_ACHIEVEMENTS'), lit(0)).alias('TOTAL_ACHIEVEMENTS')
            )
        
        # Agregações de DLCs - Apenas contagem (PRICE não disponível na Silver)
        dlcs_df = silver_tables.get('dlcs')
        if dlcs_df is not None:
            dlcs_agg = dlcs_df.groupBy('GAME_ID').agg(
                count('*').alias('TOTAL_DLCS')
            )
            
            games_df = games_df.alias('g').join(
                dlcs_agg.alias('d'),
                col('g.ID') == col('d.GAME_ID'),
                'left'
            ).select(
                col('g.*'),
                coalesce(col('d.TOTAL_DLCS'), lit(0)).alias('TOTAL_DLCS')
            )
        
        # Adicionando campos calculados (removidas colunas não disponíveis)
        games_obt = games_df.withColumn(
            'RATING_CATEGORY',
            when(col('AVG_RATING') >= 9, 'Excelente')
            .when(col('AVG_RATING') >= 7, 'Muito Bom')
            .when(col('AVG_RATING') >= 5, 'Bom')
            .when(col('AVG_RATING') >= 3, 'Regular')
            .when(col('AVG_RATING') > 0, 'Ruim')
            .otherwise('Sem Avaliação')
        ).withColumn(
            'POPULARITY_SCORE',
            spark_round(
                (col('TOTAL_PURCHASES') * 0.5) + 
                (col('TOTAL_REVIEWS') * 0.3) + 
                (col('AVG_RATING') * 0.2), 2
            )
        ).withColumn(
            'SUCCESS_TIER',
            when(col('TOTAL_PURCHASES') > 1000, 'Blockbuster')
            .when(col('TOTAL_PURCHASES') > 500, 'Hit')
            .when(col('TOTAL_PURCHASES') > 100, 'Popular')
            .when(col('TOTAL_PURCHASES') > 10, 'Moderado')
            .when(col('TOTAL_PURCHASES') > 0, 'Nicho')
            .otherwise('Sem Vendas')
        ).withColumn(
            'DATA_HORA_GOLD', current_timestamp()
        ).withColumn(
            'CAMADA_ORIGEM', lit('SILVER')
        )
        
        print(f"OBT Games criada com {games_obt.count()} registros")
        print(f"Total de colunas: {len(games_obt.columns)}")
        
        return games_obt
        
    except Exception as e:
        print(f"Erro ao criar Games OBT: {str(e)}")
        return None

In [0]:
# Executando criação da Games OBT
games_obt = criar_games_obt()

# Salvando Games OBT
salvar_obt_gold(games_obt, 'games_obt')

=== Criando Games OBT ===
OBT Games criada com 2 registros
Total de colunas: 18
OBT games_obt salva na camada Gold
Tabela gerenciada criada: pipeline_gold.games_obt


## Criando OBT de Usuários

In [0]:
def criar_users_obt():
    """Cria OBT focada em usuários e suas atividades"""
    
    try:
        users_df = silver_tables.get('users')
        if users_df is None:
            print("Tabela users não encontrada")
            return None
            
        print("=== Criando Users OBT ===")
        
        # Agregações de Purchases por usuário (apenas contagem - PAIDPRICE não disponível)
        purchases_df = silver_tables.get('purchases')
        if purchases_df is not None:
            purchases_agg = purchases_df.groupBy('USER_ID').agg(
                count('*').alias('TOTAL_GAMES_PURCHASED')
            )
            
            users_df = users_df.alias('u').join(
                purchases_agg.alias('p'),
                col('u.ID') == col('p.USER_ID'),
                'left'
            ).select(
                col('u.*'),
                coalesce(col('p.TOTAL_GAMES_PURCHASED'), lit(0)).alias('TOTAL_GAMES_PURCHASED')
            )
        
        # Agregações de Reviews por usuário
        reviews_df = silver_tables.get('reviews')
        if reviews_df is not None:
            reviews_agg = reviews_df.groupBy('USER_ID').agg(
                count('*').alias('TOTAL_REVIEWS_WRITTEN'),
                avg('RATING').alias('AVG_RATING_GIVEN'),
                spark_max('RATING').alias('MAX_RATING_GIVEN'),
                spark_min('RATING').alias('MIN_RATING_GIVEN')
            )
            
            users_df = users_df.alias('u').join(
                reviews_agg.alias('r'),
                col('u.ID') == col('r.USER_ID'),
                'left'
            ).select(
                col('u.*'),
                coalesce(col('r.TOTAL_REVIEWS_WRITTEN'), lit(0)).alias('TOTAL_REVIEWS_WRITTEN'),
                spark_round(coalesce(col('r.AVG_RATING_GIVEN'), lit(0)), 2).alias('AVG_RATING_GIVEN'),
                coalesce(col('r.MAX_RATING_GIVEN'), lit(0)).alias('MAX_RATING_GIVEN'),
                coalesce(col('r.MIN_RATING_GIVEN'), lit(0)).alias('MIN_RATING_GIVEN')
            )
        
        # Agregações de Achievements por usuário
        achievement_unlocked_df = silver_tables.get('achievement_unlocked')
        if achievement_unlocked_df is not None:
            achievements_df = silver_tables.get('achievements')
            if achievements_df is not None:
                # Primeiro verificar se as colunas existem antes de fazer o join
                achievements_user_agg = achievement_unlocked_df.alias('au').join(
                    achievements_df.alias('a'),
                    col('au.ACHIEVEMENT_ID') == col('a.ID'),  # Ajustado para ACHIEVEMENT_ID
                    'inner'
                ).groupBy('au.USER_ID').agg(  # Ajustado para USER_ID
                    count('*').alias('TOTAL_ACHIEVEMENTS_UNLOCKED')
                    # Removido POINTS pois pode não existir na Silver
                )
                
                users_df = users_df.alias('u').join(
                    achievements_user_agg.alias('au'),
                    col('u.ID') == col('au.USER_ID'),
                    'left'
                ).select(
                    col('u.*'),
                    coalesce(col('au.TOTAL_ACHIEVEMENTS_UNLOCKED'), lit(0)).alias('TOTAL_ACHIEVEMENTS_UNLOCKED')
                )
        
        # Adicionando campos calculados (removidas colunas não disponíveis)
        users_obt = users_df.withColumn(
            'ENGAGEMENT_SCORE',
            spark_round(
                (col('TOTAL_GAMES_PURCHASED') * 2) + 
                (col('TOTAL_REVIEWS_WRITTEN') * 3) + 
                (col('TOTAL_ACHIEVEMENTS_UNLOCKED') * 1), 2
            )
        ).withColumn(
            'USER_TYPE',
            when((col('TOTAL_REVIEWS_WRITTEN') > 10) & (col('TOTAL_GAMES_PURCHASED') > 5), 'Reviewer Ativo')
            .when(col('TOTAL_ACHIEVEMENTS_UNLOCKED') > 50, 'Achievement Hunter')
            .when(col('TOTAL_GAMES_PURCHASED') > 20, 'Colecionador')
            .otherwise('Casual')
        ).withColumn(
            'ACTIVITY_LEVEL',
            when(col('ENGAGEMENT_SCORE') > 100, 'Muito Ativo')
            .when(col('ENGAGEMENT_SCORE') > 50, 'Ativo')
            .when(col('ENGAGEMENT_SCORE') > 20, 'Moderado')
            .when(col('ENGAGEMENT_SCORE') > 0, 'Baixo')
            .otherwise('Inativo')
        ).withColumn(
            'DATA_HORA_GOLD', current_timestamp()
        ).withColumn(
            'CAMADA_ORIGEM', lit('SILVER')
        )
        
        print(f"OBT Users criada com {users_obt.count()} registros")
        print(f"Total de colunas: {len(users_obt.columns)}")
        
        return users_obt
        
    except Exception as e:
        print(f"Erro ao criar Users OBT: {str(e)}")
        return None

In [0]:
# Executando criação da Users OBT
users_obt = criar_users_obt()

# Salvando Users OBT
salvar_obt_gold(users_obt, 'users_obt')

=== Criando Users OBT ===
OBT Users criada com 2 registros
Total de colunas: 16
OBT users_obt salva na camada Gold
Tabela gerenciada criada: pipeline_gold.users_obt


## Criando OBT de Desenvolvedores

In [0]:
def criar_developers_obt():
    """Cria OBT focada em desenvolvedores e performance de seus jogos"""
    
    try:
        developers_df = silver_tables.get('developers')
        if developers_df is None:
            print("Tabela developers não encontrada")
            return None
            
        print("=== Criando Developers OBT ===")
        
        # Agregações de Games por desenvolvedor
        games_df = silver_tables.get('games')
        if games_df is not None:
            # CORREÇÃO: DEVELOPERID -> DEVELOPER_ID
            games_agg = games_df.groupBy('DEVELOPER_ID').agg(
                count('*').alias('TOTAL_GAMES_DEVELOPED'),
                # Removidas as colunas de PRICE que não existem na Silver
                # avg('PRICE').alias('AVG_GAME_PRICE'),
                # spark_max('PRICE').alias('MAX_GAME_PRICE'),
                # spark_min('PRICE').alias('MIN_GAME_PRICE'),
                # spark_sum('PRICE').alias('TOTAL_GAMES_VALUE')
            )
            
            developers_df = developers_df.alias('d').join(
                games_agg.alias('g'),
                col('d.ID') == col('g.DEVELOPER_ID'),
                'left'
            ).select(
                col('d.*'),
                coalesce(col('g.TOTAL_GAMES_DEVELOPED'), lit(0)).alias('TOTAL_GAMES_DEVELOPED')
                # Removidas as colunas de preço
            )
        
        # Agregações de Purchases por desenvolvedor (via games)
        purchases_df = silver_tables.get('purchases')
        if purchases_df is not None and games_df is not None:
            purchases_dev_agg = purchases_df.alias('p').join(
                games_df.alias('g'),
                col('p.GAME_ID') == col('g.ID'),  # CORREÇÃO: GAMEID -> GAME_ID
                'inner'
            ).groupBy('g.DEVELOPER_ID').agg(
                count('*').alias('TOTAL_SALES')
                # Removidas as colunas de PAIDPRICE que não existem na Silver
                # spark_sum('p.PAIDPRICE').alias('TOTAL_REVENUE'),
                # avg('p.PAIDPRICE').alias('AVG_SALE_PRICE')
            )
            
            developers_df = developers_df.alias('d').join(
                purchases_dev_agg.alias('p'),
                col('d.ID') == col('p.DEVELOPER_ID'),
                'left'
            ).select(
                col('d.*'),
                coalesce(col('p.TOTAL_SALES'), lit(0)).alias('TOTAL_SALES')
                # Removidas as colunas de revenue
            )
        
        # Agregações de Reviews por desenvolvedor (via games)
        reviews_df = silver_tables.get('reviews')
        if reviews_df is not None and games_df is not None:
            reviews_dev_agg = reviews_df.alias('r').join(
                games_df.alias('g'),
                col('r.GAME_ID') == col('g.ID'),  # CORREÇÃO: GAMEID -> GAME_ID
                'inner'
            ).groupBy('g.DEVELOPER_ID').agg(
                count('*').alias('TOTAL_REVIEWS_RECEIVED'),
                avg('r.RATING').alias('AVG_RATING_RECEIVED'),
                spark_max('r.RATING').alias('MAX_RATING_RECEIVED'),
                spark_min('r.RATING').alias('MIN_RATING_RECEIVED')
            )
            
            developers_df = developers_df.alias('d').join(
                reviews_dev_agg.alias('r'),
                col('d.ID') == col('r.DEVELOPER_ID'),
                'left'
            ).select(
                col('d.*'),
                coalesce(col('r.TOTAL_REVIEWS_RECEIVED'), lit(0)).alias('TOTAL_REVIEWS_RECEIVED'),
                spark_round(coalesce(col('r.AVG_RATING_RECEIVED'), lit(0)), 2).alias('AVG_RATING_RECEIVED'),
                coalesce(col('r.MAX_RATING_RECEIVED'), lit(0)).alias('MAX_RATING_RECEIVED'),
                coalesce(col('r.MIN_RATING_RECEIVED'), lit(0)).alias('MIN_RATING_RECEIVED')
            )
        
        # Adicionando campos calculados (ajustados para colunas disponíveis)
        developers_obt = developers_df.withColumn(
            'SALES_PER_GAME',
            when(col('TOTAL_GAMES_DEVELOPED') > 0,
                 spark_round(col('TOTAL_SALES') / col('TOTAL_GAMES_DEVELOPED'), 2)
            ).otherwise(0)
        ).withColumn(
            'REVIEWS_PER_GAME',
            when(col('TOTAL_GAMES_DEVELOPED') > 0,
                 spark_round(col('TOTAL_REVIEWS_RECEIVED') / col('TOTAL_GAMES_DEVELOPED'), 2)
            ).otherwise(0)
        ).withColumn(
            'DEVELOPER_TIER',
            when(col('TOTAL_SALES') > 1000, 'AAA')
            .when(col('TOTAL_SALES') > 500, 'AA')
            .when(col('TOTAL_SALES') > 100, 'Indie Premium')
            .when(col('TOTAL_SALES') > 0, 'Indie')
            .otherwise('Sem Vendas')
        ).withColumn(
            'SUCCESS_SCORE',
            spark_round(
                (col('TOTAL_SALES') * 0.5) + 
                (col('AVG_RATING_RECEIVED') * 100 * 0.3) + 
                (col('TOTAL_GAMES_DEVELOPED') * 10 * 0.2), 2
            )
        ).withColumn(
            'QUALITY_SCORE',
            when(col('TOTAL_REVIEWS_RECEIVED') > 0,
                 spark_round(col('AVG_RATING_RECEIVED') * 20, 2)
            ).otherwise(0)
        ).withColumn(
            'DATA_HORA_GOLD', current_timestamp()
        ).withColumn(
            'CAMADA_ORIGEM', lit('SILVER')
        )
        
        print(f"OBT Developers criada com {developers_obt.count()} registros")
        print(f"Total de colunas: {len(developers_obt.columns)}")
        
        return developers_obt
        
    except Exception as e:
        print(f"Erro ao criar Developers OBT: {str(e)}")
        return None

In [0]:
# Executando criação da Developers OBT
developers_obt = criar_developers_obt()

# Salvando Developers OBT
salvar_obt_gold(developers_obt, 'developers_obt')

=== Criando Developers OBT ===
OBT Developers criada com 2 registros
Total de colunas: 17
OBT developers_obt salva na camada Gold
Tabela gerenciada criada: pipeline_gold.developers_obt


## Criando OBT de Vendas e Performance

In [0]:
# Imports necessários (adicione no início do seu arquivo)
from pyspark.sql.functions import (
    col, lit, when, year, month, dayofmonth, date_format, 
    current_date, current_timestamp, datediff, count, 
    count_distinct, min, max, avg, coalesce, round as spark_round
)

def criar_sales_performance_obt():
    """Cria OBT focada em análise de vendas e performance"""
    
    try:
        purchases_df = silver_tables.get('purchases')
        if purchases_df is None:
            print("Tabela purchases não encontrada")
            return None
            
        print("=== Criando Sales Performance OBT ===")
        
        # Join com Games
        games_df = silver_tables.get('games')
        if games_df is not None:
            # CORREÇÃO: GAMEID -> GAME_ID
            sales_df = purchases_df.alias('p').join(
                games_df.alias('g'),
                col('p.GAME_ID') == col('g.ID'),
                'inner'
            ).select(
                col('p.*'),
                col('g.TITLE').alias('GAME_NAME'),  # CORREÇÃO: NAME -> TITLE (conforme schema)
                col('g.DEVELOPER_ID')
                # Removidas colunas que não existem: PRICE, RELEASEDATE
            )
        else:
            sales_df = purchases_df
        
        # Join com Users
        users_df = silver_tables.get('users')
        if users_df is not None:
            # CORREÇÃO: USERID -> USER_ID
            sales_df = sales_df.alias('s').join(
                users_df.alias('u'),
                col('s.USER_ID') == col('u.ID'),
                'left'
            ).select(
                col('s.*'),
                # Removidas colunas que podem não existir: COUNTRY, REGISTRATIONDATE
                # Mantendo apenas o que sabemos que existe
            )
        
        # Join com Developers
        developers_df = silver_tables.get('developers')
        if developers_df is not None:
            # CORREÇÃO: Verificação correta das colunas
            column_names = sales_df.columns
            if 'DEVELOPER_ID' in column_names:
                sales_df = sales_df.alias('s').join(
                    developers_df.alias('d'),
                    col('s.DEVELOPER_ID') == col('d.ID'),
                    'left'
                ).select(
                    col('s.*'),
                    col('d.NAME').alias('DEVELOPER_NAME')
                    # Removida coluna que pode não existir: COUNTRY
                )
        
        # Adicionando campos calculados (apenas com colunas que existem)
        sales_obt = sales_df.withColumn(
            # CORREÇÃO: PURCHASEDATE -> PURCHASE_DATE
            'PURCHASE_YEAR', year(col('PURCHASE_DATE'))
        ).withColumn(
            'PURCHASE_MONTH', month(col('PURCHASE_DATE'))
        ).withColumn(
            'PURCHASE_DAY', dayofmonth(col('PURCHASE_DATE'))
        ).withColumn(
            'PURCHASE_QUARTER', 
            when(month(col('PURCHASE_DATE')).between(1, 3), 'Q1')
            .when(month(col('PURCHASE_DATE')).between(4, 6), 'Q2')
            .when(month(col('PURCHASE_DATE')).between(7, 9), 'Q3')
            .otherwise('Q4')
        ).withColumn(
            'PURCHASE_WEEKDAY',
            date_format(col('PURCHASE_DATE'), 'EEEE')
        ).withColumn(
            'PURCHASE_MONTH_NAME',
            date_format(col('PURCHASE_DATE'), 'MMMM')
        ).withColumn(
            'DAYS_SINCE_PURCHASE',
            datediff(current_date(), col('PURCHASE_DATE'))
        ).withColumn(
            'IS_RECENT_PURCHASE',
            when(datediff(current_date(), col('PURCHASE_DATE')) <= 30, 'Sim').otherwise('Não')
        ).withColumn(
            'PURCHASE_SEASON',
            when(month(col('PURCHASE_DATE')).isin([12, 1, 2]), 'Verão')
            .when(month(col('PURCHASE_DATE')).isin([3, 4, 5]), 'Outono')
            .when(month(col('PURCHASE_DATE')).isin([6, 7, 8]), 'Inverno')
            .otherwise('Primavera')
        ).withColumn(
            'DATA_HORA_GOLD', current_timestamp()
        ).withColumn(
            'CAMADA_ORIGEM', lit('SILVER')
        )
        
        print(f"OBT Sales Performance criada com {sales_obt.count()} registros")
        print(f"Total de colunas: {len(sales_obt.columns)}")
        
        return sales_obt
        
    except Exception as e:
        print(f"Erro ao criar Sales Performance OBT: {str(e)}")
        return None


def criar_user_behavior_obt():
    """Cria OBT focada em comportamento e análise de usuários"""
    
    try:
        users_df = silver_tables.get('users')
        if users_df is None:
            print("Tabela users não encontrada")
            return None
            
        print("=== Criando User Behavior OBT ===")
        
        # Inicializar DataFrame base
        users_base_df = users_df
        
        # Agregações de Purchases por usuário
        purchases_df = silver_tables.get('purchases')
        if purchases_df is not None:
            # Agregação temporal de compras
            purchases_behavior = purchases_df.withColumn(
                'PURCHASE_YEAR', year(col('PURCHASE_DATE'))
            ).withColumn(
                'PURCHASE_MONTH', month(col('PURCHASE_DATE'))
            ).groupBy('USER_ID', 'PURCHASE_YEAR', 'PURCHASE_MONTH').agg(
                count('*').alias('MONTHLY_PURCHASES')
            )
            
            # Agregação geral por usuário
            purchases_agg = purchases_df.groupBy('USER_ID').agg(
                count('*').alias('TOTAL_PURCHASES'),
                count_distinct('GAME_ID').alias('UNIQUE_GAMES_PURCHASED'),
                min('PURCHASE_DATE').alias('FIRST_PURCHASE_DATE'),
                max('PURCHASE_DATE').alias('LAST_PURCHASE_DATE')
            )
            
            # Join com purchases agregado
            users_base_df = users_base_df.join(
                purchases_agg,
                users_base_df.ID == purchases_agg.USER_ID,
                'left'
            ).select(
                users_base_df['*'],
                coalesce(purchases_agg.TOTAL_PURCHASES, lit(0)).alias('TOTAL_PURCHASES'),
                coalesce(purchases_agg.UNIQUE_GAMES_PURCHASED, lit(0)).alias('UNIQUE_GAMES_PURCHASED'),
                purchases_agg.FIRST_PURCHASE_DATE,
                purchases_agg.LAST_PURCHASE_DATE
            )
        else:
            # Se não há purchases, adicionar colunas com valores default
            users_base_df = users_base_df.withColumn('TOTAL_PURCHASES', lit(0)) \
                                       .withColumn('UNIQUE_GAMES_PURCHASED', lit(0)) \
                                       .withColumn('FIRST_PURCHASE_DATE', lit(None)) \
                                       .withColumn('LAST_PURCHASE_DATE', lit(None))
        
        # Agregações de Reviews por usuário
        reviews_df = silver_tables.get('reviews')
        if reviews_df is not None:
            reviews_agg = reviews_df.groupBy('USER_ID').agg(
                count('*').alias('TOTAL_REVIEWS'),
                avg('RATING').alias('AVG_RATING_GIVEN'),
                count_distinct('GAME_ID').alias('UNIQUE_GAMES_REVIEWED'),
                min('RATING').alias('MIN_RATING_GIVEN'),
                max('RATING').alias('MAX_RATING_GIVEN')
            )
            
            # Join com reviews agregado
            users_base_df = users_base_df.join(
                reviews_agg,
                users_base_df.ID == reviews_agg.USER_ID,
                'left'
            ).select(
                users_base_df['*'],
                coalesce(reviews_agg.TOTAL_REVIEWS, lit(0)).alias('TOTAL_REVIEWS'),
                spark_round(coalesce(reviews_agg.AVG_RATING_GIVEN, lit(0)), 2).alias('AVG_RATING_GIVEN'),
                coalesce(reviews_agg.UNIQUE_GAMES_REVIEWED, lit(0)).alias('UNIQUE_GAMES_REVIEWED'),
                coalesce(reviews_agg.MIN_RATING_GIVEN, lit(0)).alias('MIN_RATING_GIVEN'),
                coalesce(reviews_agg.MAX_RATING_GIVEN, lit(0)).alias('MAX_RATING_GIVEN')
            )
        else:
            # Se não há reviews, adicionar colunas com valores default
            users_base_df = users_base_df.withColumn('TOTAL_REVIEWS', lit(0)) \
                                       .withColumn('AVG_RATING_GIVEN', lit(0)) \
                                       .withColumn('UNIQUE_GAMES_REVIEWED', lit(0)) \
                                       .withColumn('MIN_RATING_GIVEN', lit(0)) \
                                       .withColumn('MAX_RATING_GIVEN', lit(0))
        
        # Agregações de Achievements
        achievement_unlocked_df = silver_tables.get('achievement_unlocked')
        if achievement_unlocked_df is not None:
            achievements_agg = achievement_unlocked_df.groupBy('USER_ID').agg(
                count('*').alias('TOTAL_ACHIEVEMENTS'),
                count_distinct('ACHIEVEMENT_ID').alias('UNIQUE_ACHIEVEMENTS')
            )
            
            # Join com achievements agregado
            users_base_df = users_base_df.join(
                achievements_agg,
                users_base_df.ID == achievements_agg.USER_ID,
                'left'
            ).select(
                users_base_df['*'],
                coalesce(achievements_agg.TOTAL_ACHIEVEMENTS, lit(0)).alias('TOTAL_ACHIEVEMENTS'),
                coalesce(achievements_agg.UNIQUE_ACHIEVEMENTS, lit(0)).alias('UNIQUE_ACHIEVEMENTS')
            )
        else:
            # Se não há achievements, adicionar colunas com valores default
            users_base_df = users_base_df.withColumn('TOTAL_ACHIEVEMENTS', lit(0)) \
                                       .withColumn('UNIQUE_ACHIEVEMENTS', lit(0))
        
        # Adicionando campos calculados de comportamento
        user_behavior_obt = users_base_df.withColumn(
            'PURCHASE_FREQUENCY',
            when(col('TOTAL_PURCHASES') > 50, 'Muito Alta')
            .when(col('TOTAL_PURCHASES') > 20, 'Alta')
            .when(col('TOTAL_PURCHASES') > 10, 'Média')
            .when(col('TOTAL_PURCHASES') > 0, 'Baixa')
            .otherwise('Nenhuma')
        ).withColumn(
            'REVIEW_ENGAGEMENT',
            when(col('TOTAL_REVIEWS') > 20, 'Muito Engajado')
            .when(col('TOTAL_REVIEWS') > 10, 'Engajado')
            .when(col('TOTAL_REVIEWS') > 5, 'Moderado')
            .when(col('TOTAL_REVIEWS') > 0, 'Baixo')
            .otherwise('Não Avalia')
        ).withColumn(
            'ACHIEVEMENT_HUNTER_LEVEL',
            when(col('TOTAL_ACHIEVEMENTS') > 100, 'Expert')
            .when(col('TOTAL_ACHIEVEMENTS') > 50, 'Avançado')
            .when(col('TOTAL_ACHIEVEMENTS') > 20, 'Intermediário')
            .when(col('TOTAL_ACHIEVEMENTS') > 0, 'Iniciante')
            .otherwise('Não Coleciona')
        ).withColumn(
            'USER_PROFILE',
            when((col('TOTAL_PURCHASES') > 20) & (col('TOTAL_REVIEWS') > 10), 'Comprador e Revisor')
            .when((col('TOTAL_PURCHASES') > 20) & (col('TOTAL_ACHIEVEMENTS') > 50), 'Comprador e Colecionador')
            .when((col('TOTAL_REVIEWS') > 15) & (col('TOTAL_ACHIEVEMENTS') > 30), 'Revisor e Colecionador')
            .when(col('TOTAL_PURCHASES') > 30, 'Grande Comprador')
            .when(col('TOTAL_REVIEWS') > 20, 'Grande Revisor')
            .when(col('TOTAL_ACHIEVEMENTS') > 80, 'Grande Colecionador')
            .otherwise('Casual')
        ).withColumn(
            'RATING_TENDENCY',
            when(col('AVG_RATING_GIVEN') >= 8, 'Positivo')
            .when(col('AVG_RATING_GIVEN') >= 6, 'Neutro')
            .when(col('AVG_RATING_GIVEN') > 0, 'Crítico')
            .otherwise('Não Classificado')
        ).withColumn(
            'DAYS_SINCE_LAST_PURCHASE',
            when(col('LAST_PURCHASE_DATE').isNotNull(),
                 datediff(current_date(), col('LAST_PURCHASE_DATE'))
            ).otherwise(None)
        ).withColumn(
            'CUSTOMER_LIFETIME_DAYS',
            when((col('FIRST_PURCHASE_DATE').isNotNull()) & (col('LAST_PURCHASE_DATE').isNotNull()),
                 datediff(col('LAST_PURCHASE_DATE'), col('FIRST_PURCHASE_DATE'))
            ).otherwise(0)
        ).withColumn(
            'CUSTOMER_STATUS',
            when(col('DAYS_SINCE_LAST_PURCHASE') <= 30, 'Ativo')
            .when(col('DAYS_SINCE_LAST_PURCHASE') <= 90, 'Em Risco')
            .when(col('DAYS_SINCE_LAST_PURCHASE') <= 180, 'Inativo')
            .when(col('DAYS_SINCE_LAST_PURCHASE').isNotNull(), 'Perdido')
            .otherwise('Sem Compras')
        ).withColumn(
            'DATA_HORA_GOLD', current_timestamp()
        ).withColumn(
            'CAMADA_ORIGEM', lit('SILVER')
        )
        
        print(f"OBT User Behavior criada com {user_behavior_obt.count()} registros")
        print(f"Total de colunas: {len(user_behavior_obt.columns)}")
        
        return user_behavior_obt
        
    except Exception as e:
        print(f"Erro ao criar User Behavior OBT: {str(e)}")
        return None

In [0]:
# Sales Performance OBT
sales_obt = criar_sales_performance_obt()
salvar_obt_gold(sales_obt, 'sales_performance_obt')

# User Behavior OBT (nova)
user_behavior_obt = criar_user_behavior_obt()
salvar_obt_gold(user_behavior_obt, 'user_behavior_obt')

=== Criando Sales Performance OBT ===
OBT Sales Performance criada com 2 registros
Total de colunas: 20
OBT sales_performance_obt salva na camada Gold
Tabela gerenciada criada: pipeline_gold.sales_performance_obt
=== Criando User Behavior OBT ===
OBT User Behavior criada com 2 registros
Total de colunas: 26
OBT user_behavior_obt salva na camada Gold
Tabela gerenciada criada: pipeline_gold.user_behavior_obt


## Resumo das OBTs Criadas

In [0]:
print("\n=== RESUMO DAS OBTs CRIADAS ===")

if games_obt is not None:
    print(f"✓ Games OBT: {games_obt.count()} registros, {len(games_obt.columns)} colunas")

if users_obt is not None:
    print(f"✓ Users OBT: {users_obt.count()} registros, {len(users_obt.columns)} colunas")

if developers_obt is not None:
    print(f"✓ Developers OBT: {developers_obt.count()} registros, {len(developers_obt.columns)} colunas")

if sales_performance_obt is not None:
    print(f"✓ Sales Performance OBT: {sales_performance_obt.count()} registros, {len(sales_performance_obt.columns)} colunas")


=== RESUMO DAS OBTs CRIADAS ===
✓ Games OBT: 2 registros, 18 colunas
✓ Users OBT: 2 registros, 16 colunas
✓ Developers OBT: 2 registros, 17 colunas


## Verificando Tabelas Gold Criadas

In [0]:
print("\n=== TABELAS GOLD DISPONÍVEIS ===")
try:
    spark.sql("SHOW TABLES IN pipeline_gold").show()
except:
    print("Database pipeline_gold ainda não foi criado ou não contém tabelas")


=== TABELAS GOLD DISPONÍVEIS ===
+-------------+--------------------+-----------+
|     database|           tableName|isTemporary|
+-------------+--------------------+-----------+
|pipeline_gold|      developers_obt|      false|
|pipeline_gold|           games_obt|      false|
|pipeline_gold|sales_performance...|      false|
|pipeline_gold|   user_behavior_obt|      false|
|pipeline_gold|           users_obt|      false|
+-------------+--------------------+-----------+



## Exemplos de Análises com as OBTs

### Análise 1: Top 10 Jogos Mais Vendidos

In [0]:
print("\n=== TOP 10 JOGOS MAIS VENDIDOS ===")
if games_obt is not None:
    games_obt.select('TITLE', 'TOTAL_PURCHASES', 'AVG_RATING', 'DEVELOPER_NAME') \
        .orderBy(col('TOTAL_PURCHASES').desc()) \
        .limit(10) \
        .show()


=== TOP 10 JOGOS MAIS VENDIDOS ===
+--------------------+---------------+----------+----------------+
|               TITLE|TOTAL_PURCHASES|AVG_RATING|  DEVELOPER_NAME|
+--------------------+---------------+----------+----------------+
|Elder Scrolls V: ...|              1|       9.0|        Bethesda|
|               Hades|              1|       8.0|Supergiant Games|
+--------------------+---------------+----------+----------------+



### Análise 2: Desenvolvedores com Melhor Performance

In [0]:
print("\n=== TOP 10 DESENVOLVEDORES POR VENDAS ===")
if developers_obt is not None:
    developers_obt.select('NAME', 'TOTAL_GAMES_DEVELOPED', 'TOTAL_SALES', 'AVG_RATING_RECEIVED', 'DEVELOPER_TIER') \
        .orderBy(col('TOTAL_SALES').desc()) \
        .limit(10) \
        .show(truncate=False)


=== TOP 10 DESENVOLVEDORES POR VENDAS ===
+----------------+---------------------+-----------+-------------------+--------------+
|NAME            |TOTAL_GAMES_DEVELOPED|TOTAL_SALES|AVG_RATING_RECEIVED|DEVELOPER_TIER|
+----------------+---------------------+-----------+-------------------+--------------+
|Bethesda        |1                    |1          |9.0                |Indie         |
|Supergiant Games|1                    |1          |8.0                |Indie         |
+----------------+---------------------+-----------+-------------------+--------------+



### Análise 3: Usuários Mais Engajados

In [0]:
print("\n=== TOP 10 USUÁRIOS MAIS ENGAJADOS ===")
if users_obt is not None:
    users_obt.select('NAME', 'EMAIL', 'TOTAL_GAMES_PURCHASED', 'TOTAL_REVIEWS_WRITTEN', 'ENGAGEMENT_SCORE', 'USER_TYPE') \
        .orderBy(col('ENGAGEMENT_SCORE').desc()) \
        .limit(10) \
        .show(truncate=False)


=== TOP 10 USUÁRIOS MAIS ENGAJADOS ===
+-----+-----------------+---------------------+---------------------+----------------+---------+
|NAME |EMAIL            |TOTAL_GAMES_PURCHASED|TOTAL_REVIEWS_WRITTEN|ENGAGEMENT_SCORE|USER_TYPE|
+-----+-----------------+---------------------+---------------------+----------------+---------+
|Alice|alice@example.com|1                    |1                    |6               |Casual   |
|Bob  |bob@example.com  |1                    |1                    |6               |Casual   |
+-----+-----------------+---------------------+---------------------+----------------+---------+

