In [None]:
# importando as bibliotecas necessárias
from pyspark.sql import SparkSession

import requests
import datetime

# recuperando a Spark Session
spark = SparkSession.builder.appName("mr-data-test").getOrCreate()

# recuperando o Spark Context
sc = spark.sparkContext

In [None]:
# Função para consulta do índice do CDI
def get_fator(dates):
    url = "https://calculadorarendafixa.com.br/calculadora/di/calculo"

    response = requests.get(
        url,
        params=[
            ('dataInicio', dates[0].strftime("%Y-%m-%d")),
            ('dataFim', dates[1].strftime("%Y-%m-%d")),
            ('percentual', '100'),
            ('valor', '1000.00'),
        ]
    ).json()

    return {
        "Data": dates[1],
        "Fator": response["fator"]
    }

In [None]:
# testando
response = get_fator((datetime.datetime(2021, 1, 5), datetime.datetime(2021, 1, 6)))
print(response)

In [None]:
# definição do período a ser consultado
start = datetime.datetime(2021, 7, 20)
end = datetime.datetime(2021, 7, 26)

date_list = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]

In [None]:
# gerando um rdd a partir do período
rdd = sc.parallelize(list(zip(date_list, date_list[1:])))

In [None]:
# exibindo informações do rdd
print("Partitions: "+str(rdd.getNumPartitions()))
print("1st element: "+str(rdd.first()))
print(rdd)
print("Number of elements in RDD -> {counts} ".format(counts=rdd.count()))

In [None]:
# recupera o fator do CDI para cada data no rdd
rdd = rdd.map(get_fator)

In [None]:
# exibindo o primeiro registro
rdd.first()

In [None]:
# cria um dataframe a partir do rdd
df = spark.createDataFrame(rdd)

In [None]:
# verificando por informações faltantes
from pyspark.sql import functions as F
df.where(F.isnull(F.col("Fator"))).count()

In [None]:
# exibindo o schema do df
df.printSchema()

In [None]:
# transformando coluna fator para double
df = df.withColumn("Fator",df["Fator"].cast('double'))

# transformando data fator para date
df = df.withColumn("Data",df["Data"].cast('date'))

df.printSchema()

In [None]:
# calculando Fator Acumulado
from pyspark.sql import functions as F, Window, types
from functools import reduce
from operator import mul

window = Window.orderBy('Data')

mul_udf = F.udf(lambda x: reduce(mul, x), types.DoubleType())

df = df.withColumn('FatorAcumulado', mul_udf(F.collect_list(F.col('Fator')).over(window)))

In [None]:
# calculando Taxa Acumulada
df = df.withColumn('TaxaAcumulada', (100 * (df["FatorAcumulado"] - 1)))

In [None]:
# exibindo os 2 primeiros registros
df.show(n=2)

In [None]:
# gravando dados em uma tabela delta
# %fs rm -r /tmp/delta-table/mr-data-test
df.write.format("delta").mode("overwrite").save("/tmp/delta-table/mr-data-test")

In [None]:
# lendo os dados da tabela delta
df = spark.read.format("delta").load("/tmp/delta-table/mr-data-test")
df.show()