 # Loading from Bronze to Silver

 This notebook performs the following steps:

 1. Reads the data already processed in the Bronze layer (Delta tables).
 2. Applies additional transformations, including:
    - Creating (or recomputing) a derived key for each record.
    - Carrying over the `update_date` column from Bronze. The code in this notebook does not set it to the current timestamp, and it is excluded from the derived key.
 3. Loads the data into the Silver layer:
    - If the Silver table does not exist yet, it is created.
    - If the Silver table already exists, a merge is performed (updating existing records and inserting new ones) based on the derived key.
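
The create-or-merge rule in step 3 can be modelled in plain Python, without Spark. This is only a minimal sketch of the semantics, not the Delta Lake implementation: the `upsert` helper and the sample rows are hypothetical, and the keys are short placeholders rather than real hashes.

```python
def upsert(target, source, key="derived_key"):
    """Return a new list of rows: source rows replace target rows that share
    the same key, and source rows with unseen keys are appended."""
    merged = {row[key]: row for row in target}  # existing rows indexed by key
    for row in source:
        merged[row[key]] = row  # matched key: update; new key: insert
    return list(merged.values())


# Hypothetical Silver content and incoming Bronze rows
silver = [
    {"derived_key": "k1", "id": 1, "status": "active", "update_date": "2024-01-01"},
]
incoming = [
    {"derived_key": "k1", "id": 1, "status": "active", "update_date": "2024-02-01"},
    {"derived_key": "k2", "id": 2, "status": "new", "update_date": "2024-02-01"},
]

result = upsert(silver, incoming)
for row in result:
    print(row)
```

In this notebook the key is a hash of every column except `update_date`, so a match generally means the remaining columns are identical and the update only refreshes `update_date`. A record whose content changed gets a different key and is inserted as a new row, while the old version stays in Silver.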

 **Notes:**
 - The `flatten_df` and `add_derived_key` functions are used to prepare the data.
 - The `bronze_schema` and `silver_schema` variables define the schema names for Bronze and Silver, respectively.
 - Adjust the merge keys if necessary; this example uses the `derived_key` column.
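
To make the derived key concrete, here is a plain-Python model of what `add_derived_key` computes: the SHA-256 hex digest of the column values joined with `||`, leaving out `update_date`. It is a sketch under assumptions: the `derived_key` function and the sample rows are hypothetical, and Python's `str()` may format some types (booleans, decimals, timestamps) differently from Spark's cast to string, so the digests are only expected to agree for plain string values.

```python
import hashlib


def derived_key(row, separator="||", excluded=("update_date",)):
    """Hash every value except the excluded columns. None values are skipped,
    mirroring concat_ws, which ignores nulls."""
    parts = [
        str(value)
        for name, value in row.items()
        if name not in excluded and value is not None
    ]
    return hashlib.sha256(separator.join(parts).encode("utf-8")).hexdigest()


row = {"id": "p-1", "name": "Ana", "update_date": "2024-01-01"}
same_row_later = {**row, "update_date": "2024-02-01"}
changed_row = {**row, "name": "Ana Maria"}

print(derived_key(row) == derived_key(same_row_later))  # True: update_date is ignored
print(derived_key(row) == derived_key(changed_row))     # False: the content changed

# Pitfall: skipped nulls can make different rows collide
null_in_b = {"a": "x", "b": None, "c": "y"}
null_in_c = {"a": "x", "b": "y", "c": None}
print(derived_key(null_in_b) == derived_key(null_in_c))  # True: both reduce to "x||y"
```

If such collisions matter for your data, one option is to replace nulls with an explicit placeholder before hashing. That changes the keys, so tables already loaded with the current key would need to be rebuilt.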


In [None]:

# Example definition of the Bronze and Silver schemas
bronze_schema = "bronze_health"
silver_schema = "silver_health"

In [16]:
from pyspark.sql.functions import col, sha2, concat_ws
from pyspark.sql.types import StructType
from delta.tables import DeltaTable

# Flattens nested structures one level deep (adjust to your needs)
def flatten_df(nested_df):
    flat_cols = []
    for column in nested_df.columns:
        dtype = nested_df.schema[column].dataType
        if isinstance(dtype, StructType):
            # Promote each struct field to a top-level column named <parent>_<child>
            for field in dtype.fields:
                flat_cols.append(col(f"{column}.{field.name}").alias(f"{column}_{field.name}"))
        else:
            flat_cols.append(col(column))
    return nested_df.select(flat_cols)

# Adds a derived key: the SHA-256 hash of all columns, ignoring "update_date" if it exists.
# Note: concat_ws skips null values, so rows that differ only in which column is null
# can produce the same key.
def add_derived_key(df):
    cols_for_key = [col(c).cast("string") for c in df.columns if c != "update_date"]
    concatenated_cols = concat_ws("||", *cols_for_key)
    return df.withColumn("derived_key", sha2(concatenated_cols, 256))

# Reads the Bronze table, adds the derived key (update_date is left unchanged), and loads
# the result into Silver: merge if the table already exists, otherwise create it
def load_silver_table(bronze_table_name, silver_table_name):
    # Read the Bronze table
    df_bronze = spark.table(bronze_table_name)

    # Flatten the data (in case it is still nested)
    df_flat = flatten_df(df_bronze)

    # Add the derived key (the existing update_date column is not part of the key) and keep
    # one row per key, because a merge fails when several source rows match the same target row
    df_transformed = add_derived_key(df_flat).dropDuplicates(["derived_key"])

    # If the Silver table already exists, merge on the derived key; otherwise create the table
    if spark.catalog.tableExists(silver_table_name):
        delta_table = DeltaTable.forName(spark, silver_table_name)
        delta_table.alias("tgt").merge(
            df_transformed.alias("src"),
            "tgt.derived_key = src.derived_key"
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        print(f"Silver table '{silver_table_name}' updated via merge!")
    else:
        df_transformed.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
        print(f"Silver table '{silver_table_name}' created successfully!")





## Schema and Table Configuration

We define the schemas for the Bronze and Silver layers and map each resource type to its table name.
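
Because the Bronze and Silver dictionaries use the same table names and differ only in the schema prefix, both could be derived from a single mapping. The sketch below shows one way to do that. The `resource_table_names` mapping and the `qualify` helper are hypothetical names, and only three of the resource types are listed to keep the example short.

```python
bronze_schema = "bronze_health"
silver_schema = "silver_health"

# Resource type -> unqualified table name (subset of the resources in this notebook)
resource_table_names = {
    "Patient": "Patients",
    "Encounter": "Encounters",
    "ExplanationOfBenefit": "Explanations",
}


def qualify(schema, table_names):
    """Prefix every table name with the given schema."""
    return {resource: f"{schema}.{table}" for resource, table in table_names.items()}


bronze_resources_tables = qualify(bronze_schema, resource_table_names)
silver_resources_tables = qualify(silver_schema, resource_table_names)

print(bronze_resources_tables["Patient"])               # bronze_health.Patients
print(silver_resources_tables["ExplanationOfBenefit"])  # silver_health.Explanations
```

With a single source mapping, adding or renaming a resource is a one-line change and the two dictionaries cannot drift apart.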


In [17]:


# Dictionaries mapping each resource type to its table name in Bronze and Silver
bronze_resources_tables = {
    "Patient": f"{bronze_schema}.Patients",
    "Encounter": f"{bronze_schema}.Encounters",
    "Condition": f"{bronze_schema}.Conditions",
    "DiagnosticReport": f"{bronze_schema}.DiagnosticReports",
    "DocumentReference": f"{bronze_schema}.DocumentReferences",
    "Claim": f"{bronze_schema}.Claims",
    "ExplanationOfBenefit": f"{bronze_schema}.Explanations",
    "MedicationRequest": f"{bronze_schema}.MedicationRequests",
    "Device": f"{bronze_schema}.Devices",
    "SupplyDelivery": f"{bronze_schema}.SupplyDeliveries",
    "CareTeam": f"{bronze_schema}.CareTeams",
    "CarePlan": f"{bronze_schema}.CarePlans",
    "Observation": f"{bronze_schema}.Observations",
    "Procedure": f"{bronze_schema}.Procedures",
    "Medication": f"{bronze_schema}.Medications",
    "MedicationAdministration": f"{bronze_schema}.MedicationAdministrations",
    "Immunization": f"{bronze_schema}.Immunizations",
    "ImagingStudy": f"{bronze_schema}.ImagingStudies",
    "Provenance": f"{bronze_schema}.Provenances"
}

silver_resources_tables = {
    "Patient": f"{silver_schema}.Patients",
    "Encounter": f"{silver_schema}.Encounters",
    "Condition": f"{silver_schema}.Conditions",
    "DiagnosticReport": f"{silver_schema}.DiagnosticReports",
    "DocumentReference": f"{silver_schema}.DocumentReferences",
    "Claim": f"{silver_schema}.Claims",
    "ExplanationOfBenefit": f"{silver_schema}.Explanations",
    "MedicationRequest": f"{silver_schema}.MedicationRequests",
    "Device": f"{silver_schema}.Devices",
    "SupplyDelivery": f"{silver_schema}.SupplyDeliveries",
    "CareTeam": f"{silver_schema}.CareTeams",
    "CarePlan": f"{silver_schema}.CarePlans",
    "Observation": f"{silver_schema}.Observations",
    "Procedure": f"{silver_schema}.Procedures",
    "Medication": f"{silver_schema}.Medications",
    "MedicationAdministration": f"{silver_schema}.MedicationAdministrations",
    "Immunization": f"{silver_schema}.Immunizations",
    "ImagingStudy": f"{silver_schema}.ImagingStudies",
    "Provenance": f"{silver_schema}.Provenances"
}




## Processing: From Bronze to Silver

 For each resource, the corresponding Bronze table is read, the data is transformed (adding the derived key and keeping the existing `update_date`), and the result is loaded into the Silver layer, either via merge (if the table already exists) or by creating the table.
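
When many tables are loaded in one loop, it can help to record the outcome per resource, so that a failure on one table leaves a clear record of what was loaded. The following is a minimal sketch, not part of the original notebook: `load_all` and `fake_loader` are hypothetical names, and `fake_loader` stands in for the real load so the example runs without Spark.

```python
def load_all(resource_types, loader):
    """Call loader(resource_type) for each resource and record what happened."""
    outcomes = {}
    for resource_type in resource_types:
        try:
            loader(resource_type)
            outcomes[resource_type] = "ok"
        except Exception as error:
            # Record the failure and continue with the next resource
            outcomes[resource_type] = f"failed: {error}"
    return outcomes


def fake_loader(resource_type):
    # Hypothetical stand-in for the real load; fails for one resource
    if resource_type == "DiagnosticReport":
        raise RuntimeError("job cancelled")


outcomes = load_all(["Patient", "Encounter", "DiagnosticReport", "Claim"], fake_loader)
for resource_type, outcome in outcomes.items():
    print(f"{resource_type}: {outcome}")
```

In this notebook the loader could be a small function that looks up the Bronze and Silver table names for the resource and calls `load_silver_table`. Whether to continue after a failure or stop at the first error is a design choice that depends on how the tables relate to each other.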


In [18]:
# Iterate over the resource types, reading each Bronze table and merging into (or creating) the Silver table
for resource_type, bronze_table in bronze_resources_tables.items():
    silver_table = silver_resources_tables[resource_type]
    load_silver_table(bronze_table, silver_table)




Silver table 'silver_health.Patients' updated via merge!
Silver table 'silver_health.Encounters' updated via merge!
Silver table 'silver_health.Conditions' updated via merge!


Py4JJavaError: An error occurred while calling o28547.execute.
: org.apache.spark.SparkException: Job 672 cancelled part of cancelled job group 20
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:2810)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleJobGroupCancelled$4(DAGScheduler.scala:1209)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:1208)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3095)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3073)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3062)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1000)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2563)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$1(DeltaFileFormatWriter.scala:273)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.writeAndCommit(DeltaFileFormatWriter.scala:305)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeWrite(DeltaFileFormatWriter.scala:244)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.write(DeltaFileFormatWriter.scala:224)
	at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$3(TransactionalWrite.scala:542)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:220)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:497)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:444)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:147)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:274)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:273)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:147)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.writeFiles(MergeIntoCommandBase.scala:308)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.writeFiles$(MergeIntoCommandBase.scala:293)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.writeFiles(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.commands.merge.LowShuffleMergeExecutor.writeFilesInternal$1(LowShuffleMergeExecutor.scala:333)
	at org.apache.spark.sql.delta.commands.merge.LowShuffleMergeExecutor.$anonfun$writeUnmodifiedRows$6(LowShuffleMergeExecutor.scala:360)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.delta.DeltaTableUtils$.withActiveSession(DeltaTable.scala:499)
	at org.apache.spark.sql.delta.commands.merge.LowShuffleMergeExecutor.$anonfun$writeUnmodifiedRows$1(LowShuffleMergeExecutor.scala:360)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.executeThunk$1(MergeIntoCommandBase.scala:426)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$recordMergeOperation$7(MergeIntoCommandBase.scala:443)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.withStatusCode(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$recordMergeOperation$6(MergeIntoCommandBase.scala:443)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:169)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:167)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:137)
	at com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:111)
	at com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:93)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:136)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:126)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:116)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.recordMergeOperation(MergeIntoCommandBase.scala:440)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.recordMergeOperation$(MergeIntoCommandBase.scala:404)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.commands.merge.LowShuffleMergeExecutor.writeUnmodifiedRows(LowShuffleMergeExecutor.scala:313)
	at org.apache.spark.sql.delta.commands.merge.LowShuffleMergeExecutor.$anonfun$runLowShuffleMerge$8(LowShuffleMergeExecutor.scala:164)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.withStatusCode(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.commands.merge.LowShuffleMergeExecutor.runLowShuffleMerge(LowShuffleMergeExecutor.scala:164)
	at org.apache.spark.sql.delta.commands.merge.LowShuffleMergeExecutor.runLowShuffleMerge$(LowShuffleMergeExecutor.scala:114)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runLowShuffleMerge(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2(MergeIntoCommand.scala:128)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2$adapted(MergeIntoCommand.scala:86)
	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:227)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$1(MergeIntoCommand.scala:86)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:169)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:167)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:137)
	at com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:111)
	at com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:93)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:136)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:126)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:116)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:61)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runMerge(MergeIntoCommand.scala:84)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.runOrig(MergeIntoCommandBase.scala:180)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$run$1(MergeIntoCommandBase.scala:156)
	at org.apache.spark.sql.delta.sources.SQLConfUtils$.withGlutenDisabled(SQLConfUtils.scala:39)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.run(MergeIntoCommandBase.scala:156)
	at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.run$(MergeIntoCommandBase.scala:152)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:61)
	at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$2(DeltaMergeBuilder.scala:326)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.delta.DeltaTableUtils$.withActiveSession(DeltaTable.scala:499)
	at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:295)
	at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:109)
	at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:95)
	at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:152)
	at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:293)
	at jdk.internal.reflect.GeneratedMethodAccessor366.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [20]:
%%sql
select count(*) from silver_health.Encounters -- 703


<Spark SQL result set with 1 rows and 1 fields>