In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("AgregateTerste")
    .getOrCreate()
)

In [16]:
# Sample rows in the order (ID, valor1_hml, valor2_hml, valor1, valor2).
data = [
    (1, 10, 20, 10, 20),
    (2, 15, 25, 15, 25),
    (3, 20, 30, 20, 23),
    (4, 25, 35, 25, 12)
]

columns = ["ID", "valor1_hml", "valor2_hml", "valor1", "valor2"]

df = spark.createDataFrame(data, columns)

# Null-safe equality between each "_hml" column and its counterpart.
# A plain `!=` evaluates to NULL when exactly one side is NULL, so the
# previous when(...)/otherwise(1) fell through to the "OK" branch and hid
# those mismatches. eqNullSafe returns False in that case and True when both
# sides are NULL.
valor1_matches = F.col("valor1_hml").eqNullSafe(F.col("valor1"))
valor2_matches = F.col("valor2_hml").eqNullSafe(F.col("valor2"))

# Mutually exclusive 0/1 flags for each compared column.
df = (
    df.withColumn("valor1_hml_OK", F.when(valor1_matches, 1).otherwise(0))
      .withColumn("valor1_hml_NOK", F.when(valor1_matches, 0).otherwise(1))
      .withColumn("valor2_hml_OK", F.when(valor2_matches, 1).otherwise(0))
      .withColumn("valor2_hml_NOK", F.when(valor2_matches, 0).otherwise(1))
)

df.show(truncate=False)

+---+----------+----------+------+------+-------------+--------------+-------------+--------------+
|ID |valor1_hml|valor2_hml|valor1|valor2|valor1_hml_OK|valor1_hml_NOK|valor2_hml_OK|valor2_hml_NOK|
+---+----------+----------+------+------+-------------+--------------+-------------+--------------+
|1  |10        |20        |10    |20    |1            |0             |1            |0             |
|2  |15        |25        |15    |25    |1            |0             |1            |0             |
|3  |20        |30        |20    |23    |1            |0             |0            |1             |
|4  |25        |35        |25    |12    |1            |0             |0            |1             |
+---+----------+----------+------+------+-------------+--------------+-------------+--------------+



In [18]:
# Total number of rows; used as the denominator of every percentage below.
qtdReg = df.count()

# For each flag column emit two aggregates, in this order:
#   <valor>_hml_<status>_qtd   -> sum of the 0/1 flag
#   <valor>_<status>_percent   -> that sum as a share of qtdReg, formatted "NN.N%"
summary_exprs = []
for value_name in ("valor1", "valor2"):
    for status in ("OK", "NOK"):
        flag_total = F.sum(f"{value_name}_hml_{status}")
        percent_text = F.concat(F.round(flag_total / qtdReg * 100, 2), F.lit('%'))
        summary_exprs.append(flag_total.alias(f"{value_name}_hml_{status}_qtd"))
        summary_exprs.append(percent_text.alias(f"{value_name}_{status}_percent"))

# An empty groupBy() aggregates over the whole DataFrame, giving a single row.
df2 = df.groupBy().agg(*summary_exprs)

df2.show(vertical=True, truncate=False)

-RECORD 0--------------------
 valor1_hml_OK_qtd  | 4      
 valor1_OK_percent  | 100.0% 
 valor1_hml_NOK_qtd | 0      
 valor1_NOK_percent | 0.0%   
 valor2_hml_OK_qtd  | 2      
 valor2_OK_percent  | 50.0%  
 valor2_hml_NOK_qtd | 2      
 valor2_NOK_percent | 50.0%  



In [19]:
def _column_check_struct(column_name):
    """Build the comparison struct for ``column_name`` and ``<column_name>_hml``.

    The struct has the fields ``column_name``, ``original_value``,
    ``compared_value``, ``valor_check_OK`` and ``valor_check_NOK``. The two
    flags are mutually exclusive 0/1 values.

    The comparison is null-safe: a plain ``!=`` evaluates to NULL when exactly
    one side is NULL, which previously fell through to the "OK" branch.
    """
    original = F.col(column_name)
    compared = F.col(f"{column_name}_hml")
    is_match = compared.eqNullSafe(original)
    return F.struct(
        F.lit(column_name).alias("column_name"),
        original.alias("original_value"),
        compared.alias("compared_value"),
        F.when(is_match, 1).otherwise(0).alias("valor_check_OK"),
        F.when(is_match, 0).otherwise(1).alias("valor_check_NOK"),
    )


# Attach one struct per checked column, collected in a single array column.
df = df.withColumn(
    "columns_check",
    F.array(
        _column_check_struct("valor1"),
        _column_check_struct("valor2"),
    ),
)

# Explode the array so that each struct becomes its own row.
df_exploded = df.select(F.explode("columns_check").alias("columns_check"))

# With one row per (input row, checked column), group by the column name and
# total the flags. qtdReg is the row count computed in the summary cell above.
check = df_exploded["columns_check"]
ok_total = F.sum(check["valor_check_OK"])
nok_total = F.sum(check["valor_check_NOK"])

df_grouped = df_exploded.groupBy(check["column_name"].alias("column")).agg(
    ok_total.alias("valor_check_OK_qtd"),
    F.concat(F.round(ok_total / qtdReg * 100, 2), F.lit('%')).alias("valor_check_OK_percent"),
    nok_total.alias("valor_check_NOK_qtd"),
    F.concat(F.round(nok_total / qtdReg * 100, 2), F.lit('%')).alias("valor_check_NOK_percent"),
)

df_grouped.show(truncate=False)


+------+------------------+----------------------+-------------------+-----------------------+
|column|valor_check_OK_qtd|valor_check_OK_percent|valor_check_NOK_qtd|valor_check_NOK_percent|
+------+------------------+----------------------+-------------------+-----------------------+
|valor1|4                 |100.0%                |0                  |0.0%                   |
|valor2|2                 |50.0%                 |2                  |50.0%                  |
+------+------------------+----------------------+-------------------+-----------------------+



In [25]:
# Columns to validate; each "<name>_hml" is compared against "<name>".
valores_base = ['valor1_hml', 'valor2_hml']

tam = len(valores_base)

# Build one comparison struct per "_hml" column.
structs = []
for vb in valores_base:
    base_name = vb.replace('_hml', '')
    # Null-safe equality: a plain `!=` evaluates to NULL when exactly one side
    # is NULL, which previously fell through to the "OK" branch.
    is_match = F.col(vb).eqNullSafe(F.col(base_name))
    structs.append(
        F.struct(
            F.lit(base_name).alias("column"),
            F.col(base_name).alias("original_value"),
            F.col(vb).alias("compared_value_hml"),
            F.when(is_match, 1).otherwise(0).alias("valor_check_OK"),
            F.when(is_match, 0).otherwise(1).alias("valor_check_NOK"),
        )
    )

# Transpose: explode the array of structs into rows, then flatten the struct
# fields into top-level columns.
df_transposed = df.select(F.explode(F.array(*structs)).alias("valores")).select("valores.*")

df_transposed.show()

# Per-column totals and percentages. qtdReg is the row count computed in the
# summary cell above.
ok_total = F.sum(df_transposed["valor_check_OK"])
nok_total = F.sum(df_transposed["valor_check_NOK"])

df_transposed.groupBy(df_transposed["column"]).agg(
    ok_total.alias("valor_check_OK_qtd"),
    F.concat(F.round(ok_total / qtdReg * 100, 2), F.lit('%')).alias("valor_check_OK_percent"),
    nok_total.alias("valor_check_NOK_qtd"),
    F.concat(F.round(nok_total / qtdReg * 100, 2), F.lit('%')).alias("valor_check_NOK_percent"),
).show(truncate=False)

+------+--------------+------------------+--------------+---------------+
|column|original_value|compared_value_hml|valor_check_OK|valor_check_NOK|
+------+--------------+------------------+--------------+---------------+
|valor1|            10|                10|             1|              0|
|valor2|            20|                20|             1|              0|
|valor1|            15|                15|             1|              0|
|valor2|            25|                25|             1|              0|
|valor1|            20|                20|             1|              0|
|valor2|            23|                30|             0|              1|
|valor1|            25|                25|             1|              0|
|valor2|            12|                35|             0|              1|
+------+--------------+------------------+--------------+---------------+

+------+------------------+----------------------+-------------------+-----------------------+
|column|valor_ch