# Como criar um cluster Databricks para este notebook\n
\n
1. No menu lateral do Databricks, clique em **Compute** (ou **Clusters**).\n
2. Clique em **Create Cluster**.\n
3. Preencha as configurações:\n
   - **Cluster name:** cluster-transformacao\n
   - **Cluster mode:** Standard\n
   - **Databricks Runtime Version:** 10.4 LTS (ou superior)\n
   - **Node type:** Standard_DS3_v2 (ou equivalente)\n
   - **Number of workers:** 1 ou 2 para testes\n
   - **Auto Termination:** 30 minutos (opcional)\n
4. Clique em **Create Cluster**.\n
5. Quando o cluster estiver ativo, selecione-o no topo deste notebook para executar as células.

## Configuração da Sessão Spark\n
Inicializa a sessão Spark. A configuração exata pode precisar de ajustes dependendo do ambiente (Databricks, Synapse).

In [None]:
# Inicializa a sessão Spark (Databricks)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TransformacaoDados") \
    .getOrCreate()

print(f"SparkSession criada com sucesso: {spark.version}")

## Leitura dos Dados Ingeridos\n
Lê os arquivos Parquet do NOAA ISD que foram copiados pelo pipeline de ingestão para o contêiner 'output'.

In [None]:
# Leitura dos Dados Ingeridos
storage_account_name = "datavalidation456"  # Nome real da sua storage account
container_name = "output"  # Nome real do container

# Caminho dos dados Parquet
input_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

print(f"Lendo dados de: {input_path}")

df_raw = spark.read.parquet(input_path)
print("Schema dos dados lidos:")
df_raw.printSchema()
df_raw.show(5)

## Exploração e Inspeção de Dados
Vamos examinar os dados para entender melhor sua estrutura e conteúdo antes de realizar transformações.

In [None]:
# Exploração básica
print(f"Contagem total de registros: {df_raw.count()}")

# Ver distribuição por ano
from pyspark.sql.functions import year, col, count

# Assumindo que existe uma coluna com timestamp
# Ajuste o nome da coluna conforme necessário
if 'date' in df_raw.columns:
    timestamp_col = 'date'
elif 'datetime' in df_raw.columns:
    timestamp_col = 'datetime'
elif 'timestamp' in df_raw.columns:
    timestamp_col = 'timestamp'
else:
    # Imprimir colunas disponíveis
    print("Colunas disponíveis:")
    print(df_raw.columns)
    timestamp_col = None

if timestamp_col:
    print("\nDistribuição de registros por ano:")
    df_raw.groupBy(year(col(timestamp_col)).alias("ano")) \
        .agg(count("*").alias("contagem")) \
        .orderBy("ano") \
        .show(20)

# Verificar valores nulos
print("\nContagem de valores nulos por coluna:")
from pyspark.sql.functions import col, count, isnan, when

def null_value_count(df):
    return df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])

null_value_count(df_raw).show()

## Transformação de Dados
Vamos realizar as seguintes transformações nos dados meteorológicos:
1. Filtrar apenas o ano mais recente (2023)
2. Selecionar e renomear colunas relevantes
3. Eliminar registros com valores nulos em campos importantes
4. Converter unidades de medida quando necessário

In [None]:
# Identificar o esquema e colunas necessárias
# Isso ajudará na adaptação do código para o esquema real
import re
from pyspark.sql.functions import col, year, month, dayofmonth, hour, expr, to_timestamp, lit
from pyspark.sql.types import DoubleType

# Mapeamento para colunas esperadas - ajuste conforme seu esquema real
# Este é um mapeamento genérico baseado em dados meteorológicos típicos
column_mappings = {
    # Identificadores de estação (USAF/WBAN são comuns no formato NOAA ISD)
    r'^(usaf|station_id)$': 'station_id_usaf',
    r'^(wban)$': 'station_id_wban',
    
    # Coordenadas geográficas
    r'^(lat|latitude)$': 'latitude',
    r'^(lon|long|longitude)$': 'longitude',
    r'^(elev|elevation)$': 'elevation',
    
    # Data e hora
    r'^(date|datetime|time|timestamp)$': 'timestamp',
    
    # Temperaturas - geralmente em diferentes formatos
    r'^(temp|temperature|air_temperature|t_mean)$': 'temperature'
}

# Mapear as colunas existentes para as colunas padronizadas
mapped_columns = {}
for col_name in df_raw.columns:
    for pattern, mapped_name in column_mappings.items():
        if re.match(pattern, col_name.lower()):
            mapped_columns[col_name] = mapped_name
            break

print("Mapeamento de colunas detectado:")
for orig, mapped in mapped_columns.items():
    print(f"{orig} -> {mapped}")

# Filtrar ano mais recente (2023)
if 'timestamp' in mapped_columns.values():
    orig_timestamp_col = [k for k, v in mapped_columns.items() if v == 'timestamp'][0]
    
    # Criar DataFrame com colunas renomeadas
    df_renamed = df_raw.select(
        *[col(orig_col).alias(mapped_col) 
          for orig_col, mapped_col in mapped_columns.items()]
    )
    
    # Filtrar por ano (2023)
    df_filtered = df_renamed.filter(year(col("timestamp")) == 2023)
    
    print(f"\nRegistros após filtrar por ano 2023: {df_filtered.count()}")
else:
    print("\nColuna de timestamp não identificada. Pulando filtro por ano.")
    df_filtered = df_raw

# Remover registros com valores nulos em colunas críticas
critical_columns = [col_name for col_name in [
    'station_id_usaf', 'timestamp', 'temperature'
] if col_name in df_filtered.columns]

df_clean = df_filtered
for col_name in critical_columns:
    df_clean = df_clean.filter(col(col_name).isNotNull())

print(f"\nRegistros após remover nulos em colunas críticas: {df_clean.count()}")

# Conversão de unidades (se necessário)
# Exemplo: converter temperatura de Fahrenheit para Celsius se necessário
# Verificando a faixa de valores para determinar a unidade
if 'temperature' in df_clean.columns:
    temp_stats = df_clean.select(
        expr("min(temperature)").alias("min_temp"),
        expr("max(temperature)").alias("max_temp"),
        expr("avg(temperature)").alias("avg_temp")
    ).collect()[0]
    
    print(f"\nEstatísticas de temperatura:")
    print(f"Mínima: {temp_stats['min_temp']}")
    print(f"Máxima: {temp_stats['max_temp']}")
    print(f"Média: {temp_stats['avg_temp']}")
    
    # Se a temperatura máxima for muito alta, provavelmente está em Fahrenheit
    if temp_stats['max_temp'] > 50:  # assumimos que temperaturas acima de 50 são Fahrenheit
        print("\nConvertendo temperaturas de Fahrenheit para Celsius...")
        df_clean = df_clean.withColumn(
            "temperature", 
            ((col("temperature") - 32) * 5/9).cast(DoubleType())
        )
        
        # Verificar novas estatísticas após conversão
        new_temp_stats = df_clean.select(
            expr("min(temperature)").alias("min_temp"),
            expr("max(temperature)").alias("max_temp"),
            expr("avg(temperature)").alias("avg_temp")
        ).collect()[0]
        
        print(f"\nEstatísticas após conversão:")
        print(f"Mínima: {new_temp_stats['min_temp']}")
        print(f"Máxima: {new_temp_stats['max_temp']}")
        print(f"Média: {new_temp_stats['avg_temp']}")

# Visualizar resultado da transformação
print("\nEsquema dos dados transformados:")
df_clean.printSchema()
print("\nAmostra dos dados transformados:")
df_clean.show(5)

## Gravação dos Dados Transformados
Vamos salvar os dados transformados em formato Parquet no Blob Storage.

In [None]:
# Configurações para salvar os dados transformados
# Usar o mesmo storage account, mas um diretório específico para dados transformados
transformed_folder = "transformed_weather_data"
output_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{transformed_folder}/"

print(f"Salvando dados transformados em: {output_path}")

# Escrever os dados em formato Parquet
# Modo 'overwrite' substituirá dados existentes
try:
    df_clean.write \
        .mode("overwrite") \
        .parquet(output_path)
    print("\nDados transformados gravados com sucesso!")
    print(f"Total de registros gravados: {df_clean.count()}")
except Exception as e:
    print(f"\nErro ao gravar dados transformados: {str(e)}")

# Confirmar se os dados foram gravados corretamente lendo-os de volta
try:
    df_verification = spark.read.parquet(output_path)
    print("\nVerificação da gravação:")
    print(f"Contagem de registros lidos: {df_verification.count()}")
    df_verification.show(5)
except Exception as e:
    print(f"\nErro ao ler dados transformados para verificação: {str(e)}")

## Resumo e Métricas de Qualidade
Vamos calcular algumas métricas para avaliar a qualidade dos dados transformados.

In [None]:
# Calcular métricas de qualidade dos dados
print("Métricas de qualidade dos dados transformados:")

# Contagem total de registros
total_records = df_clean.count()
print(f"\nTotal de registros: {total_records}")

# Verificar completude dos dados (ausência de valores nulos)
from pyspark.sql.functions import col, count, when, isnan, round as spark_round

# Calcular porcentagem de valores não-nulos para cada coluna
completeness = {}
for column_name in df_clean.columns:
    non_null_count = df_clean.filter(~(col(column_name).isNull() | isnan(col(column_name)))).count()
    completeness[column_name] = (non_null_count / total_records) * 100

print("\nCompletude dos dados (% de valores não-nulos):")
for col_name, percentage in completeness.items():
    print(f"{col_name}: {percentage:.2f}%")

# Estatísticas para colunas numéricas
numeric_columns = [column_name for column_name in df_clean.columns
                 if df_clean.schema[column_name].dataType.typeName() in ["double", "integer", "long", "float"]]

if numeric_columns:
    print("\nEstatísticas para colunas numéricas:")
    df_clean.select([spark_round(expr(f"min({col})"), 2).alias(f"min_{col}") for col in numeric_columns] +
                   [spark_round(expr(f"max({col})"), 2).alias(f"max_{col}") for col in numeric_columns] +
                   [spark_round(expr(f"avg({col})"), 2).alias(f"avg_{col}") for col in numeric_columns]).show()

# Verificar distribuição por meses (se tiver timestamp)
if 'timestamp' in df_clean.columns:
    print("\nDistribuição de registros por mês em 2023:")
    df_clean.groupBy(month(col("timestamp")).alias("mês"))\
        .agg(count("*").alias("contagem"))\
        .orderBy("mês")\
        .show()

print("\nProcessamento de dados concluído!")