### Lendo as tabelas delta do container silver e salvando no container gold

In [None]:
from pyspark.sql.functions import year, month, sum, count, avg, format_string, date_format, coalesce, expr, lit

storageAccountName = "satcseguroimoveis"

apolice_df = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/apolice")
sinistro_df = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/sinistro")
imovel_df = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/imovel")
apolice_cobertura_df = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/apolice_cobertura")
cobertura_df = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/cobertura")
avaliacao_df = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/avaliacao")

apolice_df.write.format('delta').mode('overwrite').saveAsTable("APOLICE")
sinistro_df.write.format('delta').mode('overwrite').saveAsTable("SINISTRO")
imovel_df.write.format('delta').mode('overwrite').saveAsTable("IMOVEL")
apolice_cobertura_df.write.format('delta').mode('overwrite').saveAsTable("APOLICE_COBERTURA")
cobertura_df.write.format('delta').mode('overwrite').saveAsTable("COBERTURA")
avaliacao_df.write.format('delta').mode('overwrite').saveAsTable("AVALIACAO")

### Criando a tabela OBT

In [None]:
%sql
CREATE OR REPLACE TABLE satcseguroimoveis_obt (
    ANO INT,
    MES STRING,
    TOTAL_DE_VALOR_DE_SINISTRO DECIMAL(38,2) NOT NULL,
    VALOR_TOTAL_DAS_APOLICES DECIMAL(38,2) NOT NULL,
    TOTAL_DE_ATIVOS DECIMAL(38,2) NOT NULL,
    TOTAL_DE_PASSIVOS DECIMAL(38,2) NOT NULL,
    NUMERO_TOTAL_DE_IMOVEIS INT NOT NULL,
    APOLICES_FINALIZADAS INT NOT NULL,
    NUMERO_DE_SINISTROS INT NOT NULL,
    NUMERO_DE_APOLICES_VENDIDAS INT NOT NULL,
    VALOR_MEDIO_DE_PREMIO DECIMAL(38,6) NOT NULL
)
USING DELTA;

### Inserindo os dados na tabela OBT

In [None]:
%sql
INSERT INTO satcseguroimoveis_obt
SELECT 
    YEAR(A.DATA_INICIO) AS ANO,
    DATE_FORMAT(A.DATA_INICIO, 'MMMM') AS MES,
    COALESCE(SUM(S.VALOR_SINISTRO), 0) AS TOTAL_DE_VALOR_DE_SINISTRO,
    COALESCE(SUM(A.VALOR_APOLICE), 0) AS VALOR_TOTAL_DAS_APOLICES,
    COALESCE(SUM(C.VALOR), 0) AS TOTAL_DE_ATIVOS, 
    COALESCE(SUM(I.VALOR_IMOVEL), 0) AS TOTAL_DE_PASSIVOS, 
    COALESCE(COUNT(AV.CODIGO_IMOVEL), 0) AS NUMERO_TOTAL_DE_IMOVEIS,
    COALESCE(SUM(CASE WHEN A.DATA_TERMINO <= CURRENT_DATE THEN 1 ELSE 0 END), 0) AS APOLICES_FINALIZADAS, 
    COALESCE(COUNT(DISTINCT S.CODIGO_SINISTRO), 0) AS NUMERO_DE_SINISTROS,
    COALESCE(COUNT(DISTINCT A.CODIGO_APOLICE), 0) AS NUMERO_DE_APOLICES_VENDIDAS,
    COALESCE(AVG(A.VALOR_APOLICE), 0) AS VALOR_MEDIO_DE_PREMIO
FROM 
    APOLICE A
LEFT JOIN 
    SINISTRO S ON A.CODIGO_APOLICE = S.CODIGO_APOLICE
LEFT JOIN 
    IMOVEL I ON A.CODIGO_IMOVEL = I.CODIGO_IMOVEL
LEFT JOIN 
    APOLICE_COBERTURA AC ON A.CODIGO_APOLICE = AC.CODIGO_APOLICE
LEFT JOIN 
    COBERTURA C ON AC.CODIGO_COBERTURA = C.CODIGO_COBERTURA
LEFT JOIN 
    AVALIACAO AV ON I.CODIGO_IMOVEL = AV.CODIGO_IMOVEL
GROUP BY 
    YEAR(A.DATA_INICIO), DATE_FORMAT(A.DATA_INICIO, 'MMMM');

num_affected_rows,num_inserted_rows
41,41


### Consultando os dados gravados

In [None]:
%sql
SELECT * FROM satcseguroimoveis_obt LIMIT 50

ANO,MES,TOTAL_DE_VALOR_DE_SINISTRO,VALOR_TOTAL_DAS_APOLICES,TOTAL_DE_ATIVOS,TOTAL_DE_PASSIVOS,NUMERO_TOTAL_DE_IMOVEIS,APOLICES_FINALIZADAS,NUMERO_DE_SINISTROS,NUMERO_DE_APOLICES_VENDIDAS,VALOR_MEDIO_DE_PREMIO
2021,July,2770.0,166364.07,3540.0,2821848.65,16,0,3,3,10397.754375
2023,December,7070.0,1460232.95,7750.0,31003657.25,43,0,9,6,33958.905814
2021,April,1230.0,227347.14,1820.0,4996640.4,6,0,2,1,37891.19
2021,October,800.0,104140.5,1410.0,3302721.98,8,0,3,2,13017.5625
2024,February,12060.0,3083748.18,16760.0,43807972.77,75,0,18,11,41116.6424
2023,October,3010.0,410005.28,4210.0,7647328.2,17,1,4,6,24117.957647
2023,February,9350.0,1437843.28,9610.0,28134327.81,42,0,9,6,34234.36381
2022,April,2000.0,672901.78,5040.0,14640756.48,25,0,6,3,26916.0712
2023,September,6600.0,2054569.14,8930.0,24823733.76,37,0,4,6,55528.895676
2022,September,17250.0,2213998.22,13390.0,32830262.12,49,0,9,4,45183.637143


In [None]:
df_obt = spark.table("satcseguroimoveis_obt")
df_obt.write.format('delta').save(f"/mnt/satcseguroimoveis/gold/satcseguroimoveis_obt")

In [None]:
df_obt = spark.read.format('delta').load(f"/mnt/satcseguroimoveis/gold/satcseguroimoveis_obt")
df_obt.show(10)

+----+---------+--------------------------+------------------------+---------------+-----------------+-----------------------+--------------------+-------------------+---------------------------+---------------------+
| ANO|      MES|TOTAL_DE_VALOR_DE_SINISTRO|VALOR_TOTAL_DAS_APOLICES|TOTAL_DE_ATIVOS|TOTAL_DE_PASSIVOS|NUMERO_TOTAL_DE_IMOVEIS|APOLICES_FINALIZADAS|NUMERO_DE_SINISTROS|NUMERO_DE_APOLICES_VENDIDAS|VALOR_MEDIO_DE_PREMIO|
+----+---------+--------------------------+------------------------+---------------+-----------------+-----------------------+--------------------+-------------------+---------------------------+---------------------+
|2021|     July|                   2770.00|               166364.07|        3540.00|       2821848.65|                     16|                   0|                  3|                          3|         10397.754375|
|2023| December|                   7070.00|              1460232.95|        7750.00|      31003657.25|                     43|  