
### Silver Layer


##### Descrição da estrutura do arquivo

In [None]:
# Caminho para o diretório da camada Bronze
bronze_directory = "dbfs:/covid/bronze/" 

# Verificando se o diretório existe e contém arquivos
try:
    files = dbutils.fs.ls(bronze_directory)
    if len(files) > 0:
        print(f"Arquivos encontrados no diretório {bronze_directory}.")
        
        # Carregando os dados da camada Bronze
        df_bronze = spark.read.format("delta").load(bronze_directory)

        # Quantidade de linhas e colunas
        num_rows = df_bronze.count()
        num_cols = len(df_bronze.columns)

        # Tamanho do arquivo em MB
        total_size_mb = sum([file.size for file in files]) / (1024 * 1024)

        # Informações do dataset
        print(f"Tipo de arquivo: Delta")
        print(f"Tamanho total do arquivo: {total_size_mb:.2f} MB")
        print(f"Quantidade de linhas: {num_rows}")
        print(f"Quantidade de colunas: {num_cols}")
    else:
        print(f"O diretório {bronze_directory} existe, mas está vazio.")

except Exception as e:
    print(f"Erro ao acessar ou processar o diretório {bronze_directory}: {e}")


Arquivos encontrados no diretório dbfs:/covid/bronze/.
Tipo de arquivo: Delta
Tamanho total do arquivo: 19.15 MB
Quantidade de linhas: 22712
Quantidade de colunas: 737



##### Lendo e carregando a camada bronze

In [None]:
# Definindo o diretório da camada Silver para armazenar em Delta
silver_directory_delta = "dbfs:/covid/silver/"

# Converter os dados para o formato Delta e salvar na camada Silver
df_bronze.write.format("delta").mode("overwrite").save(silver_directory_delta)
print("Dados convertidos para o formato Delta na camada Silver.")

Dados convertidos para o formato Delta na camada Silver.


In [None]:
# Caminho absoluto para o diretório da camada Bronze
bronze_directory = "dbfs:/covid/bronze/"

# Carregar os dados da camada Bronze
df_silver = spark.read.format("delta").load(bronze_directory)



##### Calculando colunas com valores ausentes para remoção

In [None]:
from pyspark.sql import functions as F

# Contagem total de linhas antes da transformação
total_rows = df_silver.count()

# Calcular a contagem de valores não nulos em cada coluna
non_null_counts = df_silver.select([F.count(F.when(F.col(c).isNotNull(), c)).alias(c) for c in df_silver.columns])

# Coletar os resultados para calcular a proporção de nulos
non_null_counts_dict = non_null_counts.collect()[0].asDict()

# Identificar as colunas a serem removidas com mais de 20% de valores nulos
cols_to_drop = [col for col, non_null_count in non_null_counts_dict.items() if (total_rows - non_null_count) / total_rows > 0.20]

# Remover as colunas identificadas
df_silver = df_silver.drop(*cols_to_drop)

# Contagem das colunas e registros restantes
num_cols_remaining = len(df_silver.columns)
num_rows_remaining = df_silver.count()

# Exibir informações
num_cols_removed = len(cols_to_drop)
print(f"Colunas removidas: {cols_to_drop}")
print(f"Total de colunas removidas: {num_cols_removed}")
print(f"Total de colunas restantes: {num_cols_remaining}")
print(f"Total de registros restantes: {num_rows_remaining}")

Colunas removidas: []
Total de colunas removidas: 0
Total de colunas restantes: 27
Total de registros restantes: 22712


In [None]:
df_silver.columns

Out[17]: ['location_key',
 'date',
 'new_confirmed',
 'new_deceased',
 'cumulative_confirmed',
 'cumulative_deceased',
 'average_temperature_celsius',
 'minimum_temperature_celsius',
 'maximum_temperature_celsius',
 'rainfall_mm',
 'dew_point',
 'relative_humidity',
 'latitude',
 'longitude',
 'area_sq_km',
 'place_id',
 'wikidata_id',
 'country_code',
 'country_name',
 'subregion1_code',
 'subregion1_name',
 'subregion2_code',
 'subregion2_name',
 'iso_3166_1_alpha_2',
 'iso_3166_1_alpha_3',
 'aggregation_level',
 'population']

In [None]:
from pyspark.sql.functions import col

# Selecionando apenas as colunas relevantes na camada Silver
df_silver_selected = df_silver.select(
    col('location_key'),
    col('date'),
    col('new_confirmed'),
    col('new_deceased'),
    col('cumulative_confirmed'),
    col('cumulative_deceased'),
    col('average_temperature_celsius'),
    col('minimum_temperature_celsius'),
    col('maximum_temperature_celsius'),
    col('rainfall_mm'),
    col('dew_point'),
    col('relative_humidity'),
    col('area_sq_km'),
    col('country_name'),
    col('subregion1_name'),
    col('subregion2_name'),
    col('aggregation_level'),
    col('population')
)

# Exibir o esquema 
df_silver_selected.printSchema()

root
 |-- location_key: string (nullable = true)
 |-- date: date (nullable = true)
 |-- new_confirmed: integer (nullable = true)
 |-- new_deceased: integer (nullable = true)
 |-- cumulative_confirmed: integer (nullable = true)
 |-- cumulative_deceased: integer (nullable = true)
 |-- average_temperature_celsius: double (nullable = true)
 |-- minimum_temperature_celsius: double (nullable = true)
 |-- maximum_temperature_celsius: double (nullable = true)
 |-- rainfall_mm: double (nullable = true)
 |-- dew_point: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- area_sq_km: integer (nullable = true)
 |-- country_name: string (nullable = true)
 |-- subregion1_name: string (nullable = true)
 |-- subregion2_name: string (nullable = true)
 |-- aggregation_level: integer (nullable = true)
 |-- population: integer (nullable = true)




##### Transformações

In [None]:
from pyspark.sql.functions import round

# Arredondar as colunas climáticas e outras especificadas para duas casas decimais
df_silver_transformed = df_silver_selected.withColumn('average_temperature_celsius', round(col('average_temperature_celsius'), 2)) \
    .withColumn('minimum_temperature_celsius', round(col('minimum_temperature_celsius'), 2)) \
    .withColumn('maximum_temperature_celsius', round(col('maximum_temperature_celsius'), 2)) \
    .withColumn('rainfall_mm', round(col('rainfall_mm'), 2)) \
    .withColumn('dew_point', round(col('dew_point'), 2)) \
    .withColumn('relative_humidity', round(col('relative_humidity'), 2))

# Verificação dos primeiros registros após o arredondamento
df_silver_transformed.show(5)

+------------+----------+-------------+------------+--------------------+-------------------+---------------------------+---------------------------+---------------------------+-----------+---------+-----------------+----------+--------------------+---------------+---------------+-----------------+----------+
|location_key|      date|new_confirmed|new_deceased|cumulative_confirmed|cumulative_deceased|average_temperature_celsius|minimum_temperature_celsius|maximum_temperature_celsius|rainfall_mm|dew_point|relative_humidity|area_sq_km|        country_name|subregion1_name|subregion2_name|aggregation_level|population|
+------------+----------+-------------+------------+--------------------+-------------------+---------------------------+---------------------------+---------------------------+-----------+---------+-----------------+----------+--------------------+---------------+---------------+-----------------+----------+
|          AE|2022-09-13|          402|           0|             10

In [None]:
from pyspark.sql.functions import col, round, to_date

# Tratamento de valores nulos
df_silver_cleaned = df_silver_transformed.fillna({
    'new_confirmed': 0,
    'new_deceased': 0,
    'cumulative_confirmed': 0,
    'cumulative_deceased': 0,
    'average_temperature_celsius': df_silver_transformed.agg({'average_temperature_celsius': 'avg'}).first()[0],
    'minimum_temperature_celsius': df_silver_transformed.agg({'minimum_temperature_celsius': 'avg'}).first()[0],
    'maximum_temperature_celsius': df_silver_transformed.agg({'maximum_temperature_celsius': 'avg'}).first()[0],
    'rainfall_mm': 0,
    'dew_point': 0,
    'relative_humidity': 0,
    'population': 0
})

In [None]:
# Remover linhas onde colunas críticas como 'location_key' ou 'date' sejam nulas
df_silver_cleaned = df_silver_cleaned.dropna(subset=['location_key', 'date'])

# Verificação e remoção de duplicatas da coluna 'location_key'
df_silver_cleaned = df_silver_cleaned.dropDuplicates(['location_key'])

In [None]:
# Ordenando os dados pela coluna 'date'
df_silver = df_silver_cleaned.orderBy(col('date'))

# Verificação dos primeiros registros após o arredondamento e ordenação
df_silver.show(10)

+------------+----------+-------------+------------+--------------------+-------------------+---------------------------+---------------------------+---------------------------+-----------+---------+-----------------+----------+------------+---------------+--------------------+-----------------+----------+
|location_key|      date|new_confirmed|new_deceased|cumulative_confirmed|cumulative_deceased|average_temperature_celsius|minimum_temperature_celsius|maximum_temperature_celsius|rainfall_mm|dew_point|relative_humidity|area_sq_km|country_name|subregion1_name|     subregion2_name|aggregation_level|population|
+------------+----------+-------------+------------+--------------------+-------------------+---------------------------+---------------------------+---------------------------+-----------+---------+-----------------+----------+------------+---------------+--------------------+-----------------+----------+
|       PT_18|2020-07-09|           11|           0|                 562|   

Aplicando testes de valores nulos, unicidade e intervalos de valores

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count

# Função para verificar se uma coluna não contém valores nulos
def check_no_nulls(df: DataFrame, column_name: str) -> bool:
    null_count = df.filter(col(column_name).isNull()).count()
    assert null_count == 0, f"Coluna '{column_name}' contém {null_count} valores nulos"
    print(f"Teste passado: Coluna '{column_name}' não contém valores nulos.")
    return True

# Função para verificar se uma coluna é única
def check_uniqueness(df: DataFrame, column_name: str) -> bool:
    unique_count = df.select(column_name).distinct().count()
    total_count = df.count()
    assert unique_count == total_count, f"Coluna '{column_name}' contém valores duplicados."
    print(f"Teste passado: Coluna '{column_name}' é única.")
    return True

# Função para verificar se os valores estão dentro de um intervalo
def check_value_range(df: DataFrame, column_name: str, min_value: float, max_value: float) -> bool:
    out_of_range_count = df.filter((col(column_name) < min_value) | (col(column_name) > max_value)).count()
    assert out_of_range_count == 0, f"Coluna '{column_name}' contém {out_of_range_count} valores fora do intervalo [{min_value}, {max_value}]."
    print(f"Teste passado: Todos os valores na coluna '{column_name}' estão dentro do intervalo [{min_value}, {max_value}].")
    return True

# Executando os testes de qualidade
check_no_nulls(df_silver_transformed, 'location_key')
check_uniqueness(df_silver_transformed, 'location_key')
check_no_nulls(df_silver_transformed, 'date')
check_value_range(df_silver_transformed, 'average_temperature_celsius', -100, 60)  # Exemplo de intervalo para temperatura


Teste passado: Coluna 'location_key' não contém valores nulos.
Teste passado: Coluna 'location_key' é única.
Teste passado: Coluna 'date' não contém valores nulos.
Teste passado: Todos os valores na coluna 'average_temperature_celsius' estão dentro do intervalo [-100, 60].
Out[33]: True

In [None]:
# Caminho da camada Silver
silver_directory = "/covid/silver/" 

# Escrever os dados transformados na camada Silver em formato Delta
df_silver.write.format("delta").mode("overwrite").save(silver_directory)

print(f"Dados transformados salvos na camada Silver: {silver_directory}")


Dados transformados salvos na camada Silver: /covid/silver/
