### Premissas:
- Nome: dim_customers
- Descrição da tabela: Tabela composta por variáveis qualitativas de clientes.
- Tipo: SCD-2

In [0]:
# Importa bibliotecas
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, ArrayType
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from datetime import datetime

In [0]:
# Cria dataframe com dados para a criação da dimensão
df_dimensao = spark.sql("""
SELECT DISTINCT
  customer_id,
  customer_zip_code_prefix,
  customer_city,
  customer_state
FROM poc_datum.olist.customers_silver 
-- WHERE customer_unique_id = '7e4bebe20140a71b34263a659ba1ce11'
""")

display(df_dimensao)

customer_id,customer_zip_code_prefix,customer_city,customer_state
2455a94ebab82b39829283f823a69bba,39900,almenara,MG
95e3388b5dcbb48857ce7d634e15ead3,11520,cubatao,SP
3bc36b88b6987a920fc6aa2e63f6c76a,97015,santa maria,RS
af7f6c91d4cd411589ac2e001c331dc6,68540,conceicao do araguaia,PA
4734aa9452fd42498c835eb1a71caf69,74684,goiania,GO
1a39d706d531bfbb53490b3b30396aa6,84020,ponta grossa,PR
0359e57f5d0093284997d0cf36a75e7c,2722,sao paulo,SP
3bfd6087dc6d9b4982748af54b599117,13172,sumare,SP
27d2e4ebaa061c90646c53dce7079645,28360,bom jesus do itabapoana,RJ
176f7fae9b66b96ab07b7b3385f834aa,14401,franca,SP


In [0]:
# Primeira carga

# Cria DataFrame com os dados de "NÃO INFORMADO"
data = [(-1, -1, "Não informado", "Não informado")]
column = df_dimensao.schema.fieldNames()
df = spark.createDataFrame( data, schema=column )
# df.show()

# Faz o join com o dataframe com dados para a criação da dimensão
dim_customers = df.union(df_dimensao)
# dim_customers.show()

# Inseri as colunas default de dimensão
dim_customers = dim_customers.withColumn( "row_ingestion_timestamp", current_timestamp() ) \
    .withColumn( "row_version", lit(1) ) \
    .withColumn( "row_current_indicator", lit(True) ) \
    .withColumn( "row_effective_date", to_timestamp( lit('1900-01-01 00:00:00'), "yyyy-MM-dd HH:mm:ss") ) \
    .withColumn( "row_expiration_date", to_timestamp( lit('2200-01-01 00:00:00') , "yyyy-MM-dd HH:mm:ss" ) )

# Inseri coluna SK
# dim_customers = dim_customers.withColumn( 'sk_dim_customers', monotonically_increasing_id() ) 
dim_customers = dim_customers.withColumn( 
    'sk_dim_customers', 
    sha2(concat_ws("|", 
        dim_customers.row_ingestion_timestamp,
        dim_customers.customer_id, 
        dim_customers.customer_zip_code_prefix), 256))

# Ordena as colunas
dim_customers_select = dim_customers.select( 
    'sk_dim_customers',
    'row_ingestion_timestamp',
    'row_version',
    'row_current_indicator',
    'row_effective_date',
    'row_expiration_date',
    'customer_id',
    'customer_zip_code_prefix',
    'customer_city',
    'customer_state'
    )

display(dim_customers_select)

sk_dim_customers,row_ingestion_timestamp,row_version,row_current_indicator,row_effective_date,row_expiration_date,customer_id,customer_zip_code_prefix,customer_city,customer_state
1ca36dc9a8bd5061b4493c0f6fc8423877efd8ecb5a5ff48072d59c256e86700,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,-1,-1,Não informado,Não informado
4fdc1829587a3ab9e2cb3df5488cf03ba2db1e51fae6f477d7867ee21fcddd9a,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,2455a94ebab82b39829283f823a69bba,39900,almenara,MG
08534311f9b826697900da651751a599207dcd41302b546a2efc9b9a4ef63d66,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,95e3388b5dcbb48857ce7d634e15ead3,11520,cubatao,SP
08e03370fdeed157cc4790995530c43f617afac38bf88989afee5b7c7fe4de46,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,3bc36b88b6987a920fc6aa2e63f6c76a,97015,santa maria,RS
4f0997caee3b8a949fe8c76dab6c0e2d8228beecb0d6c87ada6d682cc7436501,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,af7f6c91d4cd411589ac2e001c331dc6,68540,conceicao do araguaia,PA
6edb16c4b8a9a3ca44c3e3bbeb7a229fa7865f2ca0364f5913dc4ee9224f0ee7,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,4734aa9452fd42498c835eb1a71caf69,74684,goiania,GO
bd972e823ee9c5233bbc20837fd5ccf876bef12807dcea6e36f2012baf05714f,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,1a39d706d531bfbb53490b3b30396aa6,84020,ponta grossa,PR
0be58fd6271b47f235b04c950af843f2fab334749fcda4422b586bdbb2139eea,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,0359e57f5d0093284997d0cf36a75e7c,2722,sao paulo,SP
10c9035a6be9ba3c924d4ff9331629fd83a50dfb048a23fc7b5a61f8a9e51189,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,3bfd6087dc6d9b4982748af54b599117,13172,sumare,SP
1f9e58c70326a81ad7eb4ca560e8078beb51bc8ecfd058a03dc927eaf1f88bce,2024-04-29T13:03:15.509Z,1,True,1900-01-01T00:00:00Z,2200-01-01T00:00:00Z,27d2e4ebaa061c90646c53dce7079645,28360,bom jesus do itabapoana,RJ


In [0]:
table_name = 'dim_customers'
spark.sql('USE olist')
spark.sql(f'DROP TABLE IF EXISTS dim_customers')
dim_customers_select.write.format("delta").mode('overwrite').saveAsTable(table_name)

### Carga Diferencial (Upsert)

In [0]:
#teste
data = [('2455a94ebab82b39829283f823a69bba',39900,'almenara1','MG')]
column = ['customer_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state']

df_dimensao = spark.createDataFrame(data, schema=column)
df_dimensao.show()

+--------------------+------------------------+-------------+--------------+
|         customer_id|customer_zip_code_prefix|customer_city|customer_state|
+--------------------+------------------------+-------------+--------------+
|2455a94ebab82b398...|                   39900|    almenara1|            MG|
+--------------------+------------------------+-------------+--------------+



In [0]:
# Dados Novos
df_origem = df_dimensao

# Dados da dimensão
df_destino = spark.sql("""
    SELECT   
        customer_id,
        customer_zip_code_prefix,
        customer_city,
        customer_state 
    FROM 
        poc_datum.olist.dim_customers 
    ORDER BY customer_id """)

# Realize o EXCEPT (retornar apenas registros novos)
df_dados_novos = df_origem.exceptAll(df_destino)

display(df_dados_novos)

# Cria uma tabela temporária
df_dados_novos.createOrReplaceTempView("temp_dados_novos")

customer_id,customer_zip_code_prefix,customer_city,customer_state
2455a94ebab82b39829283f823a69bba,39900,almenara1,MG


In [0]:
# Paramêtros
table_merge = 'dim_customers'
change_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
print(change_date)

2024-04-29 13:03:26.160115


In [0]:
# spark.sql('USE olist')

In [0]:
spark.sql(f"""
CREATE OR REPLACE TEMPORARY VIEW dados_novos AS 
SELECT
     sha2(concat_ws("|",  '{change_date}',  a.customer_id, a.customer_zip_code_prefix), 256) AS sk_dim_customers
    ,a.customer_id
    ,a.customer_zip_code_prefix
    ,a.customer_city
    ,a.customer_state
    ,to_timestamp('{change_date}')  AS change_date
    ,(
        SELECT
            MAX(b.sk_dim_customers)
        FROM
            dim_customers as b
        WHERE
            a.customer_id = b.customer_id
    ) AS max_sk_dim_customers
    ,COALESCE(
        (
            SELECT
                MAX(c.row_version) + 1
            FROM
                dim_customers as c
            WHERE
                a.customer_id = c.customer_id
        ), 1
    ) AS max_row_version
FROM
    temp_dados_novos AS a
""")

DataFrame[]

In [0]:
%sql
SELECT
    *
FROM
    dados_novos AS a

sk_dim_customers,customer_id,customer_zip_code_prefix,customer_city,customer_state,change_date,max_sk_dim_customers,max_row_version
068417c10640b76ae9dbb01bb6c7d6611716173df011f037ebccd2ac2ba12593,2455a94ebab82b39829283f823a69bba,39900,almenara1,MG,2024-04-29T13:03:26.160115Z,923370c742d5aa373a0277ab32481f34109403f73c8608e18319cc0bd412b826,2


In [0]:
spark.sql(f""" 
MERGE INTO {table_merge} as destino
USING dados_novos 
ON destino.sk_dim_customers = dados_novos.max_sk_dim_customers

WHEN MATCHED THEN 
  UPDATE SET
   destino.row_expiration_date = to_timestamp('{change_date}') --dados_novos.change_date
  ,destino.row_current_indicator = False
  """)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql(f"""
MERGE INTO {table_merge} as destino
USING dados_novos 
ON destino.customer_id = dados_novos.customer_id
AND destino.customer_city = dados_novos.customer_city

WHEN NOT MATCHED 
  THEN INSERT (
    sk_dim_customers
    , row_ingestion_timestamp
    ,row_version
    ,row_current_indicator
    ,row_effective_date
    ,row_expiration_date
    ,customer_id
    ,customer_zip_code_prefix
    ,customer_city
    ,customer_state
  )
  VALUES (
    dados_novos.sk_dim_customers
    ,to_timestamp('{change_date}') --dados_novos.change_date
    ,dados_novos.max_row_version
    ,1
    ,to_timestamp('{change_date}') --dados_novos.change_date
    ,to_timestamp( '2200-01-01 00:00:00')
    ,dados_novos.customer_id
    ,dados_novos.customer_zip_code_prefix
    ,dados_novos.customer_city
    ,dados_novos.customer_state
  )
""")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
SELECT * FROM poc_datum.olist.dim_customers
WHERE customer_id = '2455a94ebab82b39829283f823a69bba'

sk_dim_customers,row_ingestion_timestamp,row_version,row_current_indicator,row_effective_date,row_expiration_date,customer_id,customer_zip_code_prefix,customer_city,customer_state
068417c10640b76ae9dbb01bb6c7d6611716173df011f037ebccd2ac2ba12593,2024-04-29T13:03:26.160115Z,2,True,2024-04-29T13:03:26.160115Z,2200-01-01T00:00:00Z,2455a94ebab82b39829283f823a69bba,39900,almenara1,MG
923370c742d5aa373a0277ab32481f34109403f73c8608e18319cc0bd412b826,2024-04-29T13:03:18.82Z,1,False,1900-01-01T00:00:00Z,2024-04-29T13:03:26.160115Z,2455a94ebab82b39829283f823a69bba,39900,almenara,MG
