In [0]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, current_timestamp, regexp_replace
from pathlib import Path

In [0]:
# Obtain the Spark session for this notebook: getOrCreate() reuses an active
# session when one exists, otherwise it builds one named "silver_layer".
spark = (
    SparkSession.builder
    .appName("silver_layer")
    .getOrCreate()
)

In [0]:
# Directory that silver_layer() lists to find the Bronze ".json" files.
BRONZE_PATH = "/mnt/breweries/bronze"
# Parquet location that silver_layer() reads existing data from and writes the merged result to.
SILVER_PATH = "/mnt/breweries/silver/breweries.parquet"

In [0]:
def _parse_bronze_timestamp(file_name):
    """Return the ``%Y%m%d%H%M%S`` timestamp embedded in a Bronze file name.

    The name is split on ``_``; each token after the first (with anything from
    its first ``.`` onwards removed) is tried in order, and the first one that
    parses is returned. Returns ``None`` when no token parses.
    """
    for token in file_name.split('_')[1:]:
        candidate = token.split('.')[0]
        try:
            return datetime.strptime(candidate, "%Y%m%d%H%M%S")
        except ValueError:
            continue
    return None


def _silver_exists():
    """Return True when SILVER_PATH can be listed and contains at least one entry.

    A missing path is treated as "does not exist". Any other listing failure is
    re-raised, so a transient storage error is never mistaken for an empty
    Silver layer (which would cause the existing data to be overwritten).
    """
    try:
        return len(dbutils.fs.ls(SILVER_PATH)) > 0
    except Exception as e:
        # dbutils reports a missing path through an exception whose text names
        # java.io.FileNotFoundException; only that case means "first run".
        if "java.io.FileNotFoundException" in str(e):
            print(f"Erro ao verificar o arquivo: {e}")
            return False
        raise


def silver_layer():
    """Upsert the most recent Bronze JSON file into the Silver Parquet dataset.

    Steps:
      1. List BRONZE_PATH and pick the ``.json`` file with the latest timestamp
         in its name. Files whose name carries no parseable timestamp are
         skipped with a warning.
      2. Read that file, add ``updated_at``, strip spaces from ``phone`` and
         cast ``latitude``/``longitude`` to float.
      3. Merge with the existing Silver data keyed on ``id``: rows from the new
         file replace existing rows with the same ``id``.
      4. Write the result partitioned by ``state`` and ``updated_at`` to a
         staging path, then swap it into SILVER_PATH.

    Returns:
        str: a status message, either ``'No files found in Bronze layer'`` or a
        summary naming the processed file and the output path.

    Raises:
        Exception: any storage/Spark error other than SILVER_PATH not existing.
    """
    candidates = []
    for file in dbutils.fs.ls(BRONZE_PATH):
        if not file.name.endswith(".json"):
            continue
        file_datetime = _parse_bronze_timestamp(file.name)
        if file_datetime is None:
            print(f"Skipping Bronze file without a valid timestamp: {file.path}")
            continue
        candidates.append((file.path, file_datetime))

    if not candidates:
        return 'No files found in Bronze layer'

    latest_file_path, _ = max(candidates, key=lambda item: item[1])

    print(f"Latest file: {latest_file_path}")

    new_data = (
        spark.read.json(latest_file_path)
        # Add the update-timestamp column.
        .withColumn('updated_at', current_timestamp())
        # Phone clean-up: remove spaces. regexp_replace yields NULL for NULL
        # input, so no explicit null guard is needed.
        .withColumn('phone', regexp_replace(col('phone').cast("string"), " ", ""))
        # Latitude/longitude arrive as text; store them as floats.
        # NOTE(review): float is kept to match the existing Silver schema;
        # double would preserve more precision.
        .withColumn('latitude', col('latitude').cast('float'))
        .withColumn('longitude', col('longitude').cast('float'))
        .dropDuplicates(["id"])
    )

    if _silver_exists():
        existing_data = spark.read.parquet(SILVER_PATH)
        # Keep only existing rows that the new file does not supersede, so the
        # new version of a record always wins (a plain dropDuplicates after a
        # union keeps an arbitrary row per id).
        retained = existing_data.join(new_data.select("id"), on="id", how="left_anti")
        merged_data = new_data.unionByName(retained)
    else:
        merged_data = new_data

    # Spark evaluates lazily: overwriting SILVER_PATH directly would remove the
    # files merged_data still has to read. Write to a staging location first,
    # then replace the Silver directory with it.
    staging_path = f"{SILVER_PATH}_staging"
    merged_data.write.mode("overwrite").partitionBy("state", "updated_at").parquet(staging_path)
    dbutils.fs.rm(SILVER_PATH, True)
    dbutils.fs.mv(staging_path, SILVER_PATH, True)

    return f'Silver layer updated with Parquet data, file: {latest_file_path}, output_path: {SILVER_PATH}'

In [0]:
# Run the Silver-layer job and echo the status message it returns.
print("RUNNING SILVER LAYER...")
status_message = silver_layer()
print(status_message)
print("SILVER LAYER FINISH WITH SUCCESS!")