**INICIO DO PROCESSO E CONFIGURAÇÃO DAS PASTAS**

In [0]:
# Importação de bibliotecas
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import pandas as pd

# Configuração de diretórios
gold_path = "/Volumes/workspace/default/gold"
delta_table_path = "/Volumes/workspace/default/delta/final_climate_data"
delta_catalog = "/Volumes/workspace/default"
delta_table_name = "climate_data_final"

print("Configurando ambiente para criação da tabela Delta...")
print(f"Gold path: {gold_path}")
print(f"Delta path: {delta_table_path}")
print(f"Catalog: {delta_catalog}")
print(f"Table: {delta_table_name}")




**LENDO OS DADOS DA CAMADA GOLD E VISUALIZANDO**

In [0]:
def read_gold_data():
    """Lê todos os arquivos das camadas gold"""
    try:
        print("Lendo dados da camada Gold...")
        
        # Listar arquivos gold
        gold_files = dbutils.fs.ls(gold_path)
        weather_files = [f.path for f in gold_files if 'weather_gold' in f.name]
        air_quality_files = [f.path for f in gold_files if 'air_quality_gold' in f.name]
        
        print(f" Arquivos de weather encontrados: {len(weather_files)}")
        print(f" Arquivos de air quality encontrados: {len(air_quality_files)}")
        
        # Ler dados meteorológicos
        weather_dfs = []
        for file in weather_files:
            df = spark.read.parquet(file)
            weather_dfs.append(df)
        
        if weather_dfs:
            weather_df = weather_dfs[0]
            for df in weather_dfs[1:]:
                weather_df = weather_df.union(df)
            print(f"Total de registros meteorológicos: {weather_df.count()}")
        else:
            print(" Nenhum dado meteorológico encontrado")
            weather_df = None
        
        # Ler dados de qualidade do ar
        air_quality_dfs = []
        for file in air_quality_files:
            df = spark.read.parquet(file)
            air_quality_dfs.append(df)
        
        if air_quality_dfs:
            air_quality_df = air_quality_dfs[0]
            for df in air_quality_dfs[1:]:
                air_quality_df = air_quality_df.union(df)
            print(f"🌫️  Total de registros de qualidade do ar: {air_quality_df.count()}")
        else:
            print(" Nenhum dado de qualidade do ar encontrado")
            air_quality_df = None
        
        return weather_df, air_quality_df
        
    except Exception as e:
        print(f"Erro ao ler dados gold: {str(e)}")
        return None, None

# Ler os dados
weather_df, air_quality_df = read_gold_data()

# Visualizar amostra dos dados
if weather_df:
    print(" Amostra dos dados meteorológicos:")
    display(weather_df.limit(5))

if air_quality_df:
    print(" Amostra dos dados de qualidade do ar:")
    display(air_quality_df.limit(5))


**UNIFICANDO OS DADOS E SETANDO AS COLUNAS NECECSSARIAS**

In [0]:
def create_final_table(weather_df, air_quality_df):
    """Cria a tabela final unificada"""
    try:
        print("Unificando dados...")
        
        if weather_df is None or air_quality_df is None:
            print("Dados insuficientes para unificação")
            return None
        
        # Juntar os dados por timestamp e localização
        final_df = weather_df.alias("w").join(
            air_quality_df.alias("a"),
            (col("w.timestamp") == col("a.timestamp")) & 
            (col("w.location") == col("a.location")),
            "inner"
        ).select(
            col("w.timestamp"),
            col("w.date"),
            col("w.temperature_2m").alias("temperature_c"),
            col("w.relative_humidity_2m").alias("humidity_percent"),
            col("w.pressure_msl").alias("pressure_hpa"),
            col("w.precipitation").alias("precipitation_mm"),
            col("a.pm10"),
            col("a.pm2_5"),
            col("a.carbon_monoxide").alias("co_ugm3"),
            col("a.nitrogen_dioxide").alias("no2_ugm3"),
            col("w.location"),
            col("w.latitude"),
            col("w.longitude")
        )
        
        # Adicionar colunas calculadas
        final_df = final_df.withColumn(
            "temperature_f", 
            (col("temperature_c") * 9/5) + 32
        ).withColumn(
            "air_quality_index",
            when(col("pm2_5") <= 12, "Bom")
            .when(col("pm2_5") <= 35, "Moderado")
            .when(col("pm2_5") <= 55, "Ruim")
            .when(col("pm2_5") <= 150, "Muito Ruim")
            .otherwise("Perigoso")
        ).withColumn(
            "load_timestamp", 
            current_timestamp()
        )
        
        print(f"Dados unificados com sucesso!")
        print(f"Total de registros na tabela final: {final_df.count()}")
        print(f"Período dos dados: {final_df.agg(min('timestamp'), max('timestamp')).collect()[0]}")
        
        # Mostrar schema final
        print("  Schema da tabela final:")
        final_df.printSchema()
        
        return final_df
        
    except Exception as e:
        print(f"Erro na unificação dos dados: {str(e)}")
        return None

# Criar tabela final
final_table_df = create_final_table(weather_df, air_quality_df)

if final_table_df:
    print(" Amostra da tabela final:")
    display(final_table_df.limit(10))


**V2 AJUSTADA LENDO AS BIBLIOTECAS E DIRETORIOS**

In [0]:
# Importação de bibliotecas
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import pandas as pd

# Configuração de diretórios
gold_path = "/Volumes/workspace/default/gold"
delta_table_path = "/Volumes/workspace/default/delta/final_climate_data"
delta_catalog = "/Volumes/workspace/default"
delta_table_name = "climate_data_final"

print("Configurando ambiente para criação da tabela Delta...")
print(f"Gold path: {gold_path}")
print(f"Delta path: {delta_table_path}")
print(f"Catalog: {delta_catalog}")
print(f"Table: {delta_table_name}")

**LENDO OS DADOS  DA CAMADA GOLD E VISUALIZANDO OS DADOS**

In [0]:
def read_gold_data():
    """Lê todos os arquivos das camadas gold"""
    try:
        print(" Lendo dados da camada Gold...")
        
        # Listar arquivos gold usando a sintaxe correta
        gold_files = dbutils.fs.ls(gold_path.replace("dbfs:", ""))
        weather_files = [f.path for f in gold_files if 'weather_gold' in f.name]
        air_quality_files = [f.path for f in gold_files if 'air_quality_gold' in f.name]
        
        print(f"  Arquivos de weather encontrados: {len(weather_files)}")
        print(f" Arquivos de air quality encontrados: {len(air_quality_files)}")
        
        # Ler dados meteorológicos
        weather_dfs = []
        for file in weather_files:
            df = spark.read.parquet(file)
            weather_dfs.append(df)
        
        if weather_dfs:
            weather_df = weather_dfs[0]
            for df in weather_dfs[1:]:
                weather_df = weather_df.union(df)
            print(f"  Total de registros meteorológicos: {weather_df.count()}")
        else:
            print("  Nenhum dado meteorológico encontrado")
            weather_df = None
        
        # Ler dados de qualidade do ar
        air_quality_dfs = []
        for file in air_quality_files:
            df = spark.read.parquet(file)
            air_quality_dfs.append(df)
        
        if air_quality_dfs:
            air_quality_df = air_quality_dfs[0]
            for df in air_quality_dfs[1:]:
                air_quality_df = air_quality_df.union(df)
            print(f"🌫️  Total de registros de qualidade do ar: {air_quality_df.count()}")
        else:
            print("  Nenhum dado de qualidade do ar encontrado")
            air_quality_df = None
        
        return weather_df, air_quality_df
        
    except Exception as e:
        print(f" Erro ao ler dados gold: {str(e)}")
        return None, None

# Ler os dados
weather_df, air_quality_df = read_gold_data()

# Visualizar amostra dos dados
if weather_df:
    print(" Amostra dos dados meteorológicos:")
    display(weather_df.limit(5))

if air_quality_df:
    print(" Amostra dos dados de qualidade do ar:")
    display(air_quality_df.limit(5))


****CRIAÇÃO DA TABELA  APLICANDO O JOIN** **

In [0]:

def create_final_table(weather_df, air_quality_df):
    """Cria a tabela final unificada"""
    try:
        print("Unificando dados...")
        
        if weather_df is None or air_quality_df is None:
            print(" Dados insuficientes para unificação")
            return None
        
        # Juntar os dados por timestamp e localização 
        final_df = weather_df.alias("w").join(
            air_quality_df.alias("a"),
            (col("w.timestamp") == col("a.timestamp")) & 
            (col("w.location") == col("a.location")),
            "inner"
        ).select(
            col("w.timestamp"),
            col("w.date"),
            col("w.temperature_2m").alias("temperature_c"),
            col("w.relative_humidity_2m").alias("humidity_percent"),
            col("w.pressure_msl").alias("pressure_hpa"),
            col("w.precipitation").alias("precipitation_mm"),
            col("a.pm10"),
            col("a.pm2_5"),
            col("a.carbon_monoxide").alias("co_ugm3"),
            col("a.nitrogen_dioxide").alias("no2_ugm3"),
            col("w.location"),
            col("w.latitude"),
            col("w.longitude")
           
        )
        
        # Adicionar colunas calculadas 
        final_df = final_df.withColumn(  
            "temperature_f", 
            (col("temperature_c") * 9/5) + 32
        ).withColumn(
            "air_quality_index",
            when(col("pm2_5") <= 12, "Bom")
            .when(col("pm2_5") <= 35, "Moderado")
            .when(col("pm2_5") <= 55, "Ruim")
            .when(col("pm2_5") <= 150, "Muito Ruim")
            .otherwise("Perigoso")
        ).withColumn(
            "load_timestamp", 
            current_timestamp()
        )
        
        print(f" Dados unificados com sucesso!")
        print(f"Total de registros na tabela final: {final_df.count()}")
        
        # Mostrar schema final
        print(" Schema da tabela final:")
        final_df.printSchema()
        
        return final_df
        
    except Exception as e:
        print(f" Erro na unificação dos dados: {str(e)}")
        return None

# Criar tabela final
final_table_df = create_final_table(weather_df, air_quality_df)

if final_table_df:
    print("Amostra da tabela final:")
    display(final_table_df.limit(10))


**CRIANDO  TABELA FINAL USANDO SQL**

In [0]:
def create_delta_table(final_df):
    """Cria a tabela Delta final """
    try:
        print(" Criando tabela Delta...")
        
        if final_df is None:
            print(" Nenhum dado para criar tabela")
            return None
        
        #  Unity Catalog
        
        catalog_name = "workspace"  #  catalog
        schema_name = "default"     #  schema
        table_name = "climate_data_final"
        full_table_name = f"{catalog_name}.{schema_name}.{table_name}"
        
        
        
        
        # Criar DataFrame temporário
        final_df.createOrReplaceTempView("temp_final_table")
        
        # Criar tabela usando SQL 
        spark.sql(f"""
            CREATE OR REPLACE TABLE {full_table_name}
            USING DELTA
            AS SELECT * FROM temp_final_table
        """)
        
        print(f" Tabela Delta criada com sucesso!")
        print(f"Tabela: {full_table_name}")
        
        # Verificar a tabela criada
        delta_df = spark.read.table(full_table_name)
        print(f"Total de registros na tabela Delta: {delta_df.count()}")
        
        return delta_df
        
    except Exception as e:
        print(f"Erro ao criar tabela Delta: {str(e)}")
        print(" Tentando método alternativo...")
        
        # Método alternativo usando DataFrameWriter
        try:
            # Escrever usando DataFrameWriter
            (final_df.write
                .format("delta")
                .mode("overwrite")
                .option("overwriteSchema", "true")
                .saveAsTable(full_table_name))
            
            print(f" Tabela Delta criada com método alternativo!")
            delta_df = spark.read.table(full_table_name)
            return delta_df
            
        except Exception as e2:
            print(f" Erro no método alternativo: {str(e2)}")
            return None


**CRIANDO A TABELA DELTA**

In [0]:
# Configuração 
catalog_name = "workspace"  # Seu catalog
schema_name = "default"     # Seu schema
table_name = "climate_data_final"
full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

print("Configuração para Unity Catalog:")
print(f"Catalog: {catalog_name}")
print(f" Schema: {schema_name}")
print(f"Table: {table_name}")
print(f" Full name: {full_table_name}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 4.2 Executar Criação da Tabela

# COMMAND ----------

# Criar tabela Delta
delta_final_df = create_delta_table(final_table_df)

if delta_final_df:
    print(" Amostra da tabela Delta:")
    display(delta_final_df.limit(10))
else:
    print("Falha ao criar tabela Delta")


**CHECKLIST PARA VERIFICAÇÃO DA CRIAÇÃO DA TABELA**

In [0]:
# Verificar se a tabela foi criada corretamente
try:
    # Listar tabelas no schema
    tables = spark.sql(f"SHOW TABLES IN {catalog_name}.{schema_name} LIKE '{table_name}'")
    if tables.count() > 0:
        print("Tabela encontrada no Unity Catalog!")
        display(tables)
        
        # Detalhes da tabela
        table_details = spark.sql(f"DESCRIBE DETAIL {full_table_name}")
        print("Detalhes da tabela:")
        display(table_details)
        
        # Contagem de registros
        count = spark.sql(f"SELECT COUNT(*) FROM {full_table_name}").collect()[0][0]
        print(f" Total de registros: {count:,}")
        
    else:
        print(" Tabela não encontrada no Unity Catalog")
        
except Exception as e:
    print(f" Erro ao verificar tabela: {str(e)}")


**QUERYS SQL PARA CONSULTA DOS DADOS**

In [0]:
# consulta na tabela
try:
    print("Testando consulta na tabela...")
    test_query = spark.sql(f"""
        SELECT 
            timestamp,
            temperature_c,
            humidity_percent,
            pm2_5,
            air_quality_index
        FROM {full_table_name}
        ORDER BY timestamp DESC
        LIMIT 5
    """)
    display(test_query)
    print("Consulta executada com sucesso!")
    
except Exception as e:
    print(f"Erro na consulta: {str(e)}")
    print(" Tentando ver tabelas disponíveis...")
    
    # Listar todas as tabelas disponíveis
    try:
        all_tables = spark.sql("SHOW TABLES")
        display(all_tables)
    except:
        print("Não foi possível listar tabelas")


In [0]:
# SQL : Dados mais recentes
display(
    spark.sql(
        """
        SELECT
            timestamp,
            temperature_c,
            humidity_percent,
            pm2_5,
            air_quality_index
        FROM climate_data_final
        ORDER BY timestamp DESC
        LIMIT 10
        """
    )
)

In [0]:
# SQL: Médias por hora
print("Médias por hora:")
display(spark.sql(f"""
    SELECT
        DATE_TRUNC('hour', timestamp) as hour,
        ROUND(AVG(temperature_c), 1) as avg_temperature,
        ROUND(AVG(pm2_5), 1) as avg_pm2_5,
        COUNT(*) as readings
    FROM climate_data_final
    GROUP BY DATE_TRUNC('hour', timestamp)
    ORDER BY hour DESC
    LIMIT 12
"""))


In [0]:

# SQL: Qualidade do ar por faixa
print("Qualidade do ar por faixa:")
display(spark.sql(f"""
    SELECT
        air_quality_index,
        MIN(pm2_5) as min_pm2_5,
        MAX(pm2_5) as max_pm2_5,
        AVG(pm2_5) as avg_pm2_5,
        COUNT(*) as readings
    FROM climate_data_final
    GROUP BY air_quality_index
    ORDER BY avg_pm2_5
"""))

In [0]:
def create_delta_table_optimized(final_df):
    """Cria tabela Delta com particionamento e otimizações"""
    try:
        print("Criando tabela Delta otimizada...")
        
        if final_df is None:
            print(" Nenhum dado para criar tabela")
            return None
        
        # Configuração Unity Catalog
        catalog_name = "workspace"
        schema_name = "default"
        table_name = "climate_data_final"
        full_table_name = f"{catalog_name}.{schema_name}.{table_name}"
        
        # Adicionar colunas de particionamento
        final_df_partitioned = final_df.withColumn(
            "year", year(col("timestamp"))
        ).withColumn(
            "month", month(col("timestamp"))
        ).withColumn(
            "day", dayofmonth(col("timestamp"))
        )
        
        # Criar tabela Delta com particionamento e compressão
        (final_df_partitioned.write
            .format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .option("compression", "snappy")  # Compressão Snappy
            .partitionBy("year", "month")     # Particionamento por ano/mês
            .saveAsTable(full_table_name))
        
        print("Tabela Delta criada com otimizações!")
        print("Particionamento: year, month")
        print(" Compressão: Snappy")
        print(f" Tabela: {full_table_name}")
        
        # Otimizar a tabela
        optimize_delta_table(full_table_name)
        
        return spark.read.table(full_table_name)
        
    except Exception as e:
        print(f" Erro ao criar tabela Delta: {str(e)}")
        return None

def optimize_delta_table(table_name):
    """Otimiza a tabela Delta para melhor performance"""
    try:
        print(" Otimizando tabela Delta...")
        
        # Compactar arquivos (OPTIMIZE)
        spark.sql(f"OPTIMIZE {table_name} ZORDER BY (timestamp, location)")
        
        # Coletar estatísticas
        spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
        
        # Vacuum para limpeza
        spark.sql(f"VACUUM {table_name} RETAIN 168 HOURS")  # 7 dias
        
        print(" Tabela otimizada com sucesso!")
        
    except Exception as e:
        print(f" Aviso na otimização: {str(e)}")

**VISUALIZAÇÃO DE ALGUNS INDICADORES DO TEMPO EM FORMATO DE GRÁFICOS.**

In [0]:
def create_visualizations(final_table_name):
    """Cria visualizações básicas dos dados"""
    try:
        print("Criando visualizações...")
        
        # Converter para Pandas para visualização
        sample_data = spark.sql(f"""
            SELECT * FROM {final_table_name} 
            WHERE timestamp >= current_date() - INTERVAL 7 DAYS
            LIMIT 1000
        """).toPandas()
        
        # Configurar matplotlib
        import matplotlib.pyplot as plt
        import seaborn as sns
        plt.style.use('default')
        sns.set_palette("husl")
        
        # Figura com múltiplos gráficos
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
        
        # 1. Temperatura ao longo do tempo
        sample_data['timestamp'] = pd.to_datetime(sample_data['timestamp'])
        sample_data.set_index('timestamp', inplace=True)
        
        sample_data['temperature_c'].resample('H').mean().plot(
            ax=ax1, title='Temperatura Média por Hora', color='red'
        )
        ax1.set_ylabel('°C')
        ax1.grid(True, alpha=0.3)
        
        # 2. Qualidade do ar
        quality_counts = sample_data['air_quality_index'].value_counts()
        ax2.pie(quality_counts.values, labels=quality_counts.index, autopct='%1.1f%%')
        ax2.set_title('Distribuição da Qualidade do Ar')
        
        # 3. PM2.5 vs Temperatura
        hourly_avg = sample_data.resample('H').mean()
        ax3.scatter(hourly_avg['temperature_c'], hourly_avg['pm2_5'], alpha=0.6)
        ax3.set_xlabel('Temperatura (°C)')
        ax3.set_ylabel('PM2.5 (µg/m³)')
        ax3.set_title('Relação: Temperatura vs PM2.5')
        ax3.grid(True, alpha=0.3)
        
        # 4. Série temporal PM2.5
        sample_data['pm2_5'].resample('H').mean().plot(
            ax=ax4, title='PM2.5 Médio por Hora', color='orange'
        )
        ax4.set_ylabel('µg/m³')
        ax4.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        print("Visualizações criadas com sucesso!")
        
    except Exception as e:
        print(f"Erro nas visualizações: {str(e)}")

# Executar as visualizações
create_visualizations("workspace.default.climate_data_final")

In [0]:
# Configuração
catalog_name = "workspace"
schema_name = "default"
table_name = "climate_data_final"
full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

print("Iniciando processo de persistência otimizada...")

# 1. Ler dados da Gold
weather_df, air_quality_df = read_gold_data()

# 2. Criar tabela final
final_table_df = create_final_table(weather_df, air_quality_df)

# 3. Criar tabela Delta otimizada
if final_table_df:
    delta_final_df = create_delta_table_optimized(final_table_df)
    
    # 4. Verificações
    if delta_final_df:
        print("Persistência concluída com sucesso!")
        print(" Resumo da implementação:")
        print(f" Formato: Delta/Parquet com Snappy")
        print(f" Particionamento: year, month")
        print(f" Compressão: Habilitada")
        print(f" Otimizações: Z-Ordering aplicado")
        
        # 5. Visualizações
        create_visualizations(full_table_name)
        
        # 6. Exemplo abaixo de consulta otimizada filtrando dados por ano e mês e agrupando por ano e mês
        print("Exemplo de consulta particionada:")
        display(spark.sql(f"""
            SELECT 
                year,
                month,
                AVG(temperature_c) as avg_temp,
                AVG(pm2_5) as avg_pm25,
                COUNT(*) as readings
            FROM {full_table_name}
            WHERE year = 2024 AND month = 12
            GROUP BY year, month
            ORDER BY year, month
        """))

In [0]:
def verify_implementation():
    """Verifica se todas as otimizações foram aplicadas"""
    print("Verificando implementação...")
    
    try:
        # Verificar particionamento
        partitions = spark.sql(f"""
            SHOW PARTITIONS {full_table_name}
        """)
        print("Particionamento implementado:")
        display(partitions.limit(5))
        
        # Verificar formato e compressão
        details = spark.sql(f"""
            DESCRIBE DETAIL {full_table_name}
        """).collect()[0]
        
        print(f"Formato: {details.format}")
        print(f"Localização: {details.location}")
        print(f"Tamanho: {round(details.sizeInBytes/1024/1024, 2)} MB")
        
        # Verificar estatísticas
        print("Estatísticas da tabela:")
        display(spark.sql(f"""
            SELECT * FROM {full_table_name}
            LIMIT 1
        """))
        
    except Exception as e:
        print(f"Erro na verificação: {str(e)}")

# Executar verificação
verify_implementation()

**## EXPLICAÇÃO DOS CONCEITOS ABORDADOS NESSE PROJETO

### Formato Escolhido: Delta Lake/Parquet com Compressão Snappy

**Por quê Delta/Parquet?**
- **Schema Enforcement**: Validação de schema nativa
- **ACID Compliance**: Transações atômicas
- **Time Travel**: Versionamento de dados
- **Eficiência**: Formato colunar para consultas analíticas
- **Compressão Snappy**: Balanceamento entre velocidade e taxa de compressão

### Estratégia de Particionamento: Ano/Mês

**Por quê particionamento temporal?**
- **Performance**: Consultas filtradas por tempo são 10-100x mais rápidas
- **Gerenciamento**: Facilita operações de manutenção (VACUUM, OPTIMIZE)
- **Custo**: Redução de dados escaneados em consultas
- **Organização**: Estrutura lógica para data lakes

### Contribuição para Desempenho e Escalabilidade

**Benefícios de performance:**
- **Z-Ordering**: Melhora a localidade dos dados para consultas comuns
- **Estatísticas**: Otimizador de consultas toma decisões melhores
- **Compactação**: Redução de 70-80% no armazenamento
- **Leitura seletiva**: Somente colunas necessárias são lidas

**Escalabilidade:**
- Suporte a petabytes de dados
- Processamento distribuído nativo
- Integração com ecossistema Spark

### Cenários de Consumo Futuro

1. **Dashboards Analíticos**
   - Power BI/Tableau para monitoramento em tempo real
   - Métricas de qualidade do ar e clima

2. **Machine Learning**
   - Previsão de qualidade do ar
   - Modelos de séries temporais climáticas

3. ** Análises Ad-hoc**
   - Consultas SQL exploratórias
   - Correlação entre variáveis climáticas

4. ** Sistemas Downstream**
   - APIs para consumo de dados
   - Alertas automáticos baseados em thresholds

5. ** Relatórios Regulatórios**
   - Conformidade ambiental
   - Tendências históricas**