In [1]:
import logging

# Cria um logger com o nome 'jupyter_logger'
logger = logging.getLogger('jupyter_logger')

# Define o nível de log para DEBUG, então todas as mensagens de log serão mostradas
logger.setLevel(logging.DEBUG)

# Cria um manipulador de log que escreve as mensagens de log na saída padrão
handler = logging.StreamHandler()

# Define o nível de log do manipulador para DEBUG
handler.setLevel(logging.DEBUG)

logger.addHandler(handler)

In [2]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col
from pyspark import SparkContext


spark = SparkSession.builder \
    .appName("Merge with Delta Lake") \
    .config("spark.task.maxFailures", "1") \
    .config("fs.s3a.endpoint", "http://minio:9000") \
    .config("fs.s3a.access.key", "348GTMvf6HFATtNh") \
    .config("fs.s3a.secret.key", "nqyaOwy8bOSF8OuIeY8urJiHYVFQWqpx") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.ssl.enabled", "false") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

def executing_merge(raw_bucket,trust_bucket, spark=spark):
    
    
    # raw_bucket = "s3a://raw/user"

    newData = spark.read.json(f"{raw_bucket}/*")
    path = trust_bucket #"s3a://trusted/user/"
    
    
    df_raw = spark.read.format("binaryFile").load(f"{raw_bucket}/*")
    count_raw = df_raw.count()
    
    
    df_trusted_before_merge = spark.read.format("binaryFile").load(f"{path}/*")
    count_trusted_before_merge = df_trusted_before_merge.count()
    
    logger.debug(f"Before merge in raw bucket has {count_raw} and trust bucket has {count_trusted_before_merge}")
    
    
    if DeltaTable.isDeltaTable(spark, path):
        deltaTable = DeltaTable.forPath(spark, path)
    else:
        
        selectedData = newData.select(col('data.id').alias('id'),col('data.first_name').alias('first_name'),col('data.last_name').alias('last_name'),col('data.email').alias('email'),col('data.date_of_birth').alias('date_of_birth'))
        selectedData.write.format("delta").save(path)
        deltaTable = DeltaTable.forPath(spark, path)
    
    logger.debug("Executing merge")
    deltaTable.alias("oldData") \
        .merge(
            newData.alias("newData"),
            "oldData.id = newData.data.id") \
        .whenMatchedUpdate(set = { 
            "first_name" : "newData.data.first_name",
            "last_name" : "newData.data.last_name",
            "email" : "newData.data.email",
            "date_of_birth" : "newData.data.date_of_birth" }) \
        .whenNotMatchedInsert(values = { 
            "id": "newData.data.id",
            "first_name" : "newData.data.first_name",
            "last_name" : "newData.data.last_name",
            "email" : "newData.data.email",
            "date_of_birth" : "newData.data.date_of_birth" }) \
        .execute()
    
    df_trusted_after_merge = spark.read.format("binaryFile").load(f"{path}/*")
    count_trusted_after_merge = df_trusted_after_merge.count()
    logger.debug(f"After merge in raw bucket has {count_raw} and trust bucket has {count_trusted_after_merge}")
    
#     

In [3]:
raw_bucket="s3a://raw/user"
trust_bucket="s3a://trusted/user"

In [4]:
executing_merge(raw_bucket=raw_bucket,trust_bucket=trust_bucket)

Before merge in raw bucket has 126 and trust bucket has 36
Executing merge
After merge in raw bucket has 126 and trust bucket has 38
