In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import *
from pyspark.sql.types import StructField, LongType
from pyspark.sql import types
from functools import reduce
from operator import mul
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import pandas as pd
from delta import *
from delta.tables import *


In [20]:
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [21]:
df = spark.read.load("data_entrada.csv", format="csv", sep=",", inferSchema="true", header="true")


+-----------+----------+----------+----------+----+---------+--------------+
|dataInicial| dataFinal|percentual|     fator|taxa|valorBase|valorCalculado|
+-----------+----------+----------+----------+----+---------+--------------+
| 04/07/1994|05/07/1994|     100.0|   1.00398| 0.4|   1000.0|       1003.98|
| 05/07/1994|06/07/1994|     100.0|1.00399667| 0.4|   1000.0|       1003.99|
| 06/07/1994|07/07/1994|     100.0|     1.004| 0.4|   1000.0|        1004.0|
| 07/07/1994|08/07/1994|     100.0|   1.00397| 0.4|   1000.0|       1003.97|
| 08/07/1994|09/07/1994|     100.0|   1.00389|0.39|   1000.0|       1003.89|
| 09/07/1994|10/07/1994|     100.0|       1.0| 0.0|   1000.0|        1000.0|
| 10/07/1994|11/07/1994|     100.0|       1.0| 0.0|   1000.0|        1000.0|
| 11/07/1994|12/07/1994|     100.0|1.00375667|0.38|   1000.0|       1003.75|
| 12/07/1994|13/07/1994|     100.0|    1.0038|0.38|   1000.0|        1003.8|
| 13/07/1994|14/07/1994|     100.0|1.00380333|0.38|   1000.0|        1003.8|

In [22]:
df = df.drop('dataInicial', 'percentual', 'taxa', 'valorBase', 'valorCalculado')
df = df.withColumnRenamed("dataFinal", "data")
df = df.where("fator>1.00000000")


+----------+----------+
|      data|     fator|
+----------+----------+
|05/07/1994|   1.00398|
|06/07/1994|1.00399667|
|07/07/1994|     1.004|
|08/07/1994|   1.00397|
|09/07/1994|   1.00389|
|12/07/1994|1.00375667|
|13/07/1994|    1.0038|
|14/07/1994|1.00380333|
|15/07/1994|1.00374667|
|16/07/1994|1.00342667|
|19/07/1994|1.00332333|
|20/07/1994|1.00337667|
|21/07/1994|1.00336333|
|22/07/1994|1.00276333|
|23/07/1994|   1.00238|
|26/07/1994|1.00244667|
|27/07/1994|   1.00234|
|28/07/1994|1.00245333|
|29/07/1994|1.00209667|
|30/07/1994|1.00183333|
+----------+----------+
only showing top 20 rows



In [23]:
schema = df.schema.add(StructField("id", LongType()))
rdd = df.rdd.zipWithIndex()


def flat(l):
    for k in l:
        if not isinstance(k, (list, tuple)):
            yield k
        else:
            yield from flat(k)


rdd = rdd.map(lambda x: list(flat(x)))
df = spark.createDataFrame(rdd, schema)


+----------+----------+---+
|      data|     fator| id|
+----------+----------+---+
|05/07/1994|   1.00398|  0|
|06/07/1994|1.00399667|  1|
|07/07/1994|     1.004|  2|
|08/07/1994|   1.00397|  3|
|09/07/1994|   1.00389|  4|
|12/07/1994|1.00375667|  5|
|13/07/1994|    1.0038|  6|
|14/07/1994|1.00380333|  7|
|15/07/1994|1.00374667|  8|
|16/07/1994|1.00342667|  9|
|19/07/1994|1.00332333| 10|
|20/07/1994|1.00337667| 11|
|21/07/1994|1.00336333| 12|
|22/07/1994|1.00276333| 13|
|23/07/1994|   1.00238| 14|
|26/07/1994|1.00244667| 15|
|27/07/1994|   1.00234| 16|
|28/07/1994|1.00245333| 17|
|29/07/1994|1.00209667| 18|
|30/07/1994|1.00183333| 19|
+----------+----------+---+
only showing top 20 rows



In [24]:
window = Window.orderBy('id')
mul_udf = F.udf(lambda x: reduce(mul, x), types.DoubleType())
df = df.withColumn('fator_acumulado', mul_udf(F.collect_list(F.col('fator')).over(window)))

# Converting String based numbers into float.
df = df.withColumn('fator_acumulado', df.fator_acumulado.cast("float"))


# Function defined by user, to calculate distance between two points on the globe.
def get_taxaAcumulada(fator_acumulado):
    taxa_acumulada = (fator_acumulado - 1) * 100

    return taxa_acumulada


udf_func = udf(get_taxaAcumulada, FloatType())
df = df.withColumn("taxa_acumulada", udf_func(df.fator_acumulado))
df = df.drop('id')



21/07/26 23:59:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 27:>                                                         (0 + 1) / 1]

+----------+----------+---------------+--------------+
|      data|     fator|fator_acumulado|taxa_acumulada|
+----------+----------+---------------+--------------+
|05/07/1994|   1.00398|        1.00398|    0.39800406|
|06/07/1994|1.00399667|      1.0079926|     0.7992625|
|07/07/1994|     1.004|      1.0120245|     1.2024522|
|08/07/1994|   1.00397|      1.0160422|     1.6042233|
|09/07/1994|   1.00389|      1.0199947|     1.9994736|
|12/07/1994|1.00375667|      1.0238265|      2.382648|
|13/07/1994|    1.0038|       1.027717|     2.7716994|
|14/07/1994|1.00380333|      1.0316257|     3.1625748|
|15/07/1994|1.00374667|      1.0354909|      3.549087|
|16/07/1994|1.00342667|      1.0390393|     3.9039254|
|19/07/1994|1.00332333|      1.0424923|      4.249227|
|20/07/1994|1.00337667|      1.0460124|       4.60124|
|21/07/1994|1.00336333|      1.0495305|     4.9530506|
|22/07/1994|1.00276333|      1.0524307|      5.243075|
|23/07/1994|   1.00238|      1.0549355|     5.4935455|
|26/07/199

                                                                                

In [25]:
df.write.format("delta").save("saida-delta-table")

21/07/26 23:59:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [27]:
read_df = spark.read.format("delta").load("saida-delta-table")