In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
dbutils.fs.ls("dbfs:/dbfs/")

Out[30]: [FileInfo(path='dbfs:/dbfs/censo/', name='censo/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/dbfs/meiospagamento/', name='meiospagamento/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/dbfs/volumepix/', name='volumepix/', size=0, modificationTime=0)]

In [0]:
df = spark.read.format("delta").load("dbfs:/dbfs/meiospagamento")

In [0]:
# Transforming the DataFrame
df_A = df.select("AnoMes", lit("Pix").alias("Metodo"), df.valorPix.alias("Valor"))
df_B = df.select("AnoMes", lit("TED").alias("Metodo"), df.valorTED.alias("Valor"))
df_C = df.select("AnoMes", lit("TEC").alias("Metodo"), df.valorTEC.alias("Valor"))
df_D = df.select("AnoMes", lit("Cheque").alias("Metodo"), df.valorCheque.alias("Valor"))
df_E = df.select("AnoMes", lit("Boleto").alias("Metodo"), df.valorBoleto.alias("Valor"))
df_F = df.select("AnoMes", lit("DOC").alias("Metodo"), df.valorDOC.alias("Valor"))

df_QA = df.select("AnoMes", lit("Pix").alias("Metodo"), df.quantidadePix.alias("Quantidade"))
df_QB = df.select("AnoMes", lit("TED").alias("Metodo"), df.quantidadeTED.alias("Quantidade"))
df_QC = df.select("AnoMes", lit("TEC").alias("Metodo"), df.quantidadeTEC.alias("Quantidade"))
df_QD = df.select("AnoMes", lit("Cheque").alias("Metodo"), df.quantidadeCheque.alias("Quantidade"))
df_QE = df.select("AnoMes", lit("Boleto").alias("Metodo"), df.quantidadeBoleto.alias("Quantidade"))
df_QF = df.select("AnoMes", lit("DOC").alias("Metodo"), df.quantidadeDOC.alias("Quantidade"))

In [0]:
df_result = df_A.union(df_B).union(df_C).union(df_D).union(df_E).union(df_F)
df_result_quantidade = df_QA.union(df_QB).union(df_QC).union(df_QD).union(df_QE).union(df_QF)

In [0]:
df = df_result.join(df_result_quantidade, on=[(df_result.AnoMes == df_result_quantidade.AnoMes) & (df_result.Metodo == df_result_quantidade.Metodo)], how = 'outer').select(df_result['*'], df_result_quantidade['Quantidade'])

In [0]:
windowSpec = Window.partitionBy("AnoMes")

df = df.withColumn("soma_valor_mes", sum("Valor").over(windowSpec))
df = df.withColumn("PercentualValor", (col("Valor") / col("soma_valor_mes")))
df = df.withColumn("PercentualValor", round(col("PercentualValor"), 5))

df = df.withColumn("soma_quantidade_mes", sum("Quantidade").over(windowSpec))
df = df.withColumn("PercentualQuantidade", (col("Quantidade") / col("soma_quantidade_mes")))
df = df.withColumn("PercentualQuantidade", round(col("PercentualQuantidade"), 5))

df = df.drop('soma_valor_mes','soma_quantidade_mes')

In [0]:
df.repartition(20)\
              .write.format("delta")\
              .mode("overwrite")\
              .partitionBy('AnoMes')\
              .option("overwriteSchema", "true")\
              .save("/dbfs/meiospagamento_refinado")

In [0]:
df.display()

AnoMes,Metodo,Valor,Quantidade,PercentualValor,PercentualQuantidade
202401,Boleto,516702.44,363550.32,0.090402614233202,0.0754513217338385
202401,Cheque,41615.89,11919.12,0.0072811447332073,0.002473697060435
202401,DOC,616.58,600.76,0.00010787726081554352,0.00012468187634883906
202401,Pix,1818473.97,4371542.0,0.3181614563365125,0.9072708887039028
202401,TEC,815.94,387.18,0.0001427574235133066,8.03554312616411e-05
202401,TED,3337344.85,70343.25,0.5839041500127493,0.0145990551942131
202402,Boleto,463066.01,332269.47,0.089055841305312,0.0695750422218283
202402,Cheque,37076.26,10373.38,0.0071304251563497,0.0021721175631425
202402,DOC,34.21,61.89,6.579192307927601e-06,1.2959359050077516e-05
202402,Pix,1718214.16,4367128.53,0.3304431857598446,0.914447998676801
