In [0]:
from pyspark.sql.functions import *

In [0]:
#le todos os jsons dentro do repositorio e transforma o campo de data de string para timestamp
df = spark.read.option("multiline","true").json('dbfs:/FileStore/eventos_json/*').withColumn('dat_atui',col('dat_atui').cast('timestamp')).withColumn('dat_hor_tran',col('dat_hor_tran').cast('timestamp'))

df.persist()

DataFrame[cod_idef_clie: string, dat_atui: timestamp, dat_hor_tran: timestamp, list_envo: array<struct<cod_tipo_entg:string,cod_venr:string,dat_emis_cpvt:string,list_item_envo:array<struct<cod_prod:string,cod_prod_venr:string,qtd_prod:bigint,stat_entg:string>>,nom_stat_entg:string,objt_des_stat_rtmt:struct<dat_envo_rtmt:string>>>, list_item_pedi: array<struct<cod_ofrt_ineo_requ:string,cod_vrne_prod:string,idt_venr:string,idt_vrne_venr:string,nom_item:string,nom_prod_orig:string,nom_venr:string,qtd_item_pedi:bigint,txt_plae_otmz_url_prod:string,vlr_desc_envo:double,vlr_oril:double,vlr_prod:double,vlr_prod_ofrt_desc:double>>, stat_pedi: string, stat_pgto: string, txt_detl_idt_pedi_pgto: string]

In [0]:
#dataframe com o ultimo status do pedido
df_ultimo_status = df.groupBy('txt_detl_idt_pedi_pgto').agg(max('dat_atui').alias('last_status_date')).withColumnRenamed('txt_detl_idt_pedi_pgto','txt_detl_idt_pedi_pgto_last')

#Cruzamento entre o dataframe com todos os dados e seus ultimos status
df_pedidos_ultimo_status = df.join(df_ultimo_status, (col('txt_detl_idt_pedi_pgto') == col('txt_detl_idt_pedi_pgto_last')) & (col('dat_atui') == col('last_status_date')),'inner').drop('txt_detl_idt_pedi_pgto_last')


DataFrame[cod_idef_clie: string, dat_atui: timestamp, dat_hor_tran: timestamp, list_envo: struct<cod_tipo_entg:string,cod_venr:string,dat_emis_cpvt:string,list_item_envo:array<struct<cod_prod:string,cod_prod_venr:string,qtd_prod:bigint,stat_entg:string>>,nom_stat_entg:string,objt_des_stat_rtmt:struct<dat_envo_rtmt:string>>, list_item_pedi: struct<cod_ofrt_ineo_requ:string,cod_vrne_prod:string,idt_venr:string,idt_vrne_venr:string,nom_item:string,nom_prod_orig:string,nom_venr:string,qtd_item_pedi:bigint,txt_plae_otmz_url_prod:string,vlr_desc_envo:double,vlr_oril:double,vlr_prod:double,vlr_prod_ofrt_desc:double>, stat_pedi: string, stat_pgto: string, txt_detl_idt_pedi_pgto: string, last_status_date: timestamp]

In [0]:
# lsita de colunas que serao selecionadas no dataframe final
lista = ['txt_detl_idt_pedi_pgto','cod_prod','cod_idef_clie','dat_atui','dat_hor_tran','stat_pedi','stat_pgto','cod_tipo_entg','cod_venr','dat_emis_cpvt','cod_prod_venr_ship','qtd_prod_ship','stat_entg_ship','prod_desc','vendedor_desc','cod_oferta','vlr_original','vlr_desconto','qtd_item_pedi']

#datarfame com os itens do pedido
df_pedido_produto = df_pedidos_ultimo_status\
                 .withColumn('list_item_pedi', explode_outer(col('list_item_pedi')))\
                 .select('txt_detl_idt_pedi_pgto','list_item_pedi.*')\
                 .withColumnRenamed('cod_vrne_prod','cod_prod')\
                 .withColumnRenamed('nom_item','prod_desc')\
                 .withColumnRenamed('nom_venr','vendedor_desc')\
                 .withColumnRenamed('vlr_oril','vlr_original')\
                 .withColumnRenamed('vlr_prod','vlr_desconto')\
                 .withColumnRenamed('idt_venr','cod_venr')\
                 .withColumnRenamed('cod_ofrt_ineo_requ','cod_oferta')\
                 .select('txt_detl_idt_pedi_pgto','cod_prod','prod_desc','cod_venr','vendedor_desc','cod_oferta','vlr_original','vlr_desconto','qtd_item_pedi')

#datarfame com os dados de pedidos e de envio
df_pedido_envio = df_pedidos_ultimo_status\
            .withColumn('list_envo', explode_outer(col('list_envo')))\
            .withColumn('cod_prod_item',col('list_item_pedi.cod_vrne_prod').getItem(0))\
            .selectExpr('txt_detl_idt_pedi_pgto','cod_idef_clie','dat_atui','dat_hor_tran','stat_pedi','stat_pgto','list_envo.cod_tipo_entg','list_envo.cod_venr','list_envo.dat_emis_cpvt','list_envo.list_item_envo','cod_prod_item')\
            .withColumn('cod_prod',col('list_item_envo.cod_prod').getItem(0))\
            .withColumn('cod_prod',when(col('cod_prod').isNull(),col('cod_prod_item')).otherwise(col('cod_prod')))\
            .withColumn('cod_prod_venr_ship',col('list_item_envo.cod_prod_venr').getItem(0))\
            .withColumn('qtd_prod_ship',col('list_item_envo.qtd_prod').getItem(0))\
            .withColumn('stat_entg_ship',col('list_item_envo.stat_entg').getItem(0))\
            .drop('list_item_envo','cod_prod_item')


#join entre as bases separando em dois dataframes por conta das chaves de join
df_pedido_produto_null = df_pedido_envio.filter('cod_venr is null').drop('cod_venr').join(df_pedido_produto,["txt_detl_idt_pedi_pgto","cod_prod"],'left').select(lista)
df_pedido_produto = df_pedido_envio.filter('cod_venr is not null').join(df_pedido_produto,["txt_detl_idt_pedi_pgto","cod_prod","cod_venr"],'left').select(lista)

#criação do dataframe final
df_final = df_pedido_produto_null.union(df_pedido_produto).drop('qtd_prod_ship','stat_entg_ship').withColumn('calyear_key',year(col('dat_atui'))).withColumn('calmonth_key',month(col('dat_atui')))


txt_detl_idt_pedi_pgto,cod_prod,cod_idef_clie,dat_atui,dat_hor_tran,stat_pedi,stat_pgto,cod_tipo_entg,cod_venr,dat_emis_cpvt,cod_prod_venr_ship,prod_desc,vendedor_desc,cod_oferta,vlr_original,vlr_desconto,qtd_item_pedi,calyear_key,calmonth_key
WrtmmUd9hO0IY2C5,MKU-6G1qsyBPFERB8,customer-store,2022-11-17T20:11:55.419+0000,2022-07-14T00:20:27.107+0000,DELIVERED,PAID,,V3mGU4wV0C,,,Rock in Rio 2022 - 08/09 - Setor Gramado,Easylive,62cdf4fbe3cc89ac64deb7cb,468.75,468.75,1,2022,11
8rQeIH1kGo6bV9c5,MKU-89HZXeJq6vudQ,customer-store,2022-07-16T06:11:49.482+0000,2022-07-16T06:11:38.489+0000,CANCELED,CANCELED,,V3mGU4wV0C,,,Rock in Rio 2022 - 03/09 - Setor Gramado,Easylive,62cdf4fbe3cc89ac64deb7c9,468.75,468.75,1,2022,7
SMT1113589577463,MKU-9iSYhmyGMPHpM,customer-store,2023-05-15T18:03:26.260+0000,2023-05-11T13:58:33.181+0000,DELIVERED,PAID,PF1c599LF_Zm3,8dJAULm6us,,4646195,Combo Natura Ekos,Top Store,64555c61d3a78500119f6518,106.5,59.9,1,2023,5
SMT1113589577463,MKU-tkv6tbv5AAquh,customer-store,2023-05-15T18:03:26.260+0000,2023-05-11T13:58:33.181+0000,DELIVERED,PAID,Egom7deHcoH3J,hxL9bHz7OO,2023-05-12T15:47:53.000Z,222419500_magazineluiza,Livro - Mindset,Magazine Luiza,626c239951dd578ff254c226,69.9,44.5,1,2023,5
OYT1516529715947,MKU-s6p435iTm8ylT,customer-store,2023-09-16T06:13:32.859+0000,2023-09-15T16:52:49.375+0000,SENT,PAID,ISO5TP2Hu3hWj,8dJAULm6us,,4617245,Copo com Canudo Stanley Quencher 2.0 887ML Branco Cream,Top Store,6459e9ba7028520010374e10,285.0,228.0,1,2023,9
GHJ2619225280159,MKU-v70eo1eSiyCa8,customer-store,2023-07-27T18:39:00.798+0000,2023-07-26T19:22:05.262+0000,DELIVERED,PAID,aXwV1q50R7QxV,hxL9bHz7OO,,220978300_magazineluiza,Sabonete Dove Original,Magazine Luiza,62f69ed096bd64d36672f00e,6.15,5.59,1,2023,7


In [0]:
#escrita em parquet
df_final.write.mode('append').partitionBy("calyear_key", "calmonth_key").parquet('dbfs:/FileStore/bronze/tb_pedidos/')