In [154]:
import os
# Definindo a variável de ambiente do SPARK_VERSION com versão do Spark que estou usando:
os.environ["SPARK_VERSION"]="3.4.1"

In [155]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    StringType,
    TimestampType,
    DoubleType,
    IntegerType,
)

SparkSession:

In [156]:
spark = SparkSession.builder.appName('question_4').getOrCreate()

Schema:

In [157]:
schema = StructType([
    StructField('id', StringType()),
    StructField('data_emissao', TimestampType()),
    StructField('data_vencimento', TimestampType()),
    StructField('valor', DoubleType()),
    StructField('status', StringType()),
    StructField('pagamento_data', TimestampType()),
    StructField('baixa_data', TimestampType()),
    StructField('data_cancelamento', TimestampType()),
    StructField('valor_cancelado', DoubleType()),
    StructField('pagamento_valor', DoubleType()),
    StructField('id_renegociacao', StringType()),
    StructField('id_renegociacao_novo', StringType()),
])

### DataFrame:

In [158]:
path = r'../data/Tabela financeiro - teste Python pleno.csv'

In [159]:
df = spark.read.csv(
    path,
    sep=',',
    header=True,
    schema=schema
).createOrReplaceTempView("base_de_dados")

In [160]:
spark.sql('select * from base_de_dados').show()


+------+-------------------+-------------------+-----+------+-------------------+-------------------+-------------------+---------------+---------------+---------------+--------------------+
|    id|       data_emissao|    data_vencimento|valor|status|     pagamento_data|         baixa_data|  data_cancelamento|valor_cancelado|pagamento_valor|id_renegociacao|id_renegociacao_novo|
+------+-------------------+-------------------+-----+------+-------------------+-------------------+-------------------+---------------+---------------+---------------+--------------------+
| 86794|2019-05-20 00:00:00|2020-06-01 00:00:00| 99.9|     R|2020-05-25 00:00:00|2020-05-26 08:53:20|               null|            0.0|           99.9|           null|                null|
| 89892|2019-05-21 00:00:00|2020-06-01 00:00:00| 99.9|     C|               null|               null|2019-05-21 00:00:00|           99.9|           null|           null|                null|
| 91689|2019-05-22 00:00:00|2020-06-01 00:00:

### Entendimento dos Dados:

Quais são os nomes das colunas?

In [161]:
# df.schema.fieldNames()

Quais são os tipos?

In [162]:
# df.printSchema()

### Análise:

a) Qual é o valor total de títulos emitidos no mês de junho de 2020? Qual é o valor total de títulos com vencimento no mesmo período? Por que esses números são diferentes?

Valor total de títulos emitidos em junho (mês 6) de 2020:

In [163]:
query = """
select
    sum(valor)
from base_de_dados

where (
    year(data_emissao) = 2020
    and month(data_emissao) = 6
)
"""
spark.sql(query).show()

+-----------------+
|       sum(valor)|
+-----------------+
|540360.0000000052|
+-----------------+



Valor total de títulos vencimentos em junho (mês 6) de 2020:

In [164]:
query = """
select
    sum(valor)
from base_de_dados

where (
    year(data_vencimento) = 2020
    and month(data_vencimento) = 6
)
"""
spark.sql(query).show()

+-----------------+
|       sum(valor)|
+-----------------+
|3076802.819998371|
+-----------------+



Quantidade de títulos emitidos em junho (mês 6) de 2020:

In [165]:
query = """
select
    count(data_emissao)
from base_de_dados

where (
    year(data_emissao) = 2020
    and month(data_emissao) = 6
)
"""
spark.sql(query).show()

+-------------------+
|count(data_emissao)|
+-------------------+
|               5423|
+-------------------+



Quantidade de títulos vencimento em junho (mês 6) de 2020:

In [166]:
query = """
select
    count(data_vencimento)
from base_de_dados

where (
    year(data_vencimento) = 2020
    and month(data_vencimento) = 6
)
"""
spark.sql(query).show()

+----------------------+
|count(data_vencimento)|
+----------------------+
|                 32082|
+----------------------+



*O motivo é que havia mais vencimentos agendados para junho (mês 6) de 2020 do que a emissão de boletos em junho (mês 6) de 2020.*

b) Gerar um relatório com todos os clientes que tiveram títulos pagos em julho de 2020.

In [167]:
query = """
select
    id,
    data_emissao,
    data_vencimento,
    valor,
    status,
    pagamento_data,
    baixa_data,
    pagamento_valor
from base_de_dados

where (
    year(pagamento_data) = 2020
    and month(pagamento_data) = 7
)
"""
spark.sql(query).show()

+------+-------------------+-------------------+-----+------+-------------------+-------------------+---------------+
|    id|       data_emissao|    data_vencimento|valor|status|     pagamento_data|         baixa_data|pagamento_valor|
+------+-------------------+-------------------+-----+------+-------------------+-------------------+---------------+
|110562|2019-06-05 00:00:00|2020-06-01 00:00:00| 99.9|     R|2020-07-22 00:00:00|2020-07-23 09:02:14|         110.06|
|112238|2019-06-06 00:00:00|2020-06-01 00:00:00| 99.9|     R|2020-07-31 00:00:00|2020-07-31 16:10:50|           99.9|
|119472|2019-06-11 00:00:00|2020-06-01 00:00:00| 79.9|     R|2020-07-31 00:00:00|2020-07-31 11:37:50|           79.9|
|120832|2019-06-12 00:00:00|2020-06-01 00:00:00| 79.9|     R|2020-07-01 00:00:00|2020-07-02 08:19:44|           82.1|
|131361|2019-06-24 00:00:00|2020-06-01 00:00:00| 99.9|     R|2020-07-02 00:00:00|2020-07-03 08:25:46|         102.52|
|131867|2019-06-24 00:00:00|2020-06-01 00:00:00| 79.9|  

c) Encontre quantos boletos foram renegociados no mês de julho de 2020.

In [168]:
query = """
select
    count(id)
from base_de_dados

where (
    year(data_vencimento) = 2020
    and month(data_vencimento) = 7
    and (
        id_renegociacao is not null
        or id_renegociacao_novo is not null
    )
)
"""
spark.sql(query).show()

+---------+
|count(id)|
+---------+
|        1|
+---------+



d) Encontre o valor pago de juros de todos os boletos pagos em junho de 2020.

Valor total de juros pagos por boletos em junho de 2020:

In [169]:
query = """
select
    sum(valor) - sum(pagamento_valor) as juros_total
from base_de_dados

where (
    year(data_vencimento) = 2020
    and month(data_emissao) = 6
    and status = 'R'
)
"""
spark.sql(query).show()

+------------------+
|       juros_total|
+------------------+
|-7964.829999999201|
+------------------+



Juros pagos por boletos em junho de 2020:

In [170]:
query = """
select
    valor - pagamento_valor as juros_total
from base_de_dados

where (
    year(data_emissao) = 2020
    and month(data_emissao) = 6
    and status = 'R'
)
"""
spark.sql(query).show()

+--------------------+
|         juros_total|
+--------------------+
|                 0.0|
|-0.09999999999999432|
|                 0.0|
|  -4.409999999999997|
| -3.2299999999999898|
|                 0.0|
|                 0.0|
|                 0.0|
|                 0.0|
| -2.9099999999999966|
|               -4.25|
|  -7.429999999999993|
|                 0.0|
|                 0.0|
|                 0.0|
| -3.1499999999999915|
|                 0.0|
|                 0.0|
|  -4.709999999999994|
|                 0.0|
+--------------------+
only showing top 20 rows



e) Encontre o id dos boletos que foram usados para gerar uma renegociação em junho de 2020.

In [171]:
query = """
select
    id,
    id_renegociacao,
    id_renegociacao_novo
from base_de_dados

where (
    year(data_emissao) = 2020
    and month(data_emissao) = 6
    and (
        id_renegociacao is not null
        or id_renegociacao_novo is not null
    )
)
"""
spark.sql(query).show()

+------+---------------+--------------------+
|    id|id_renegociacao|id_renegociacao_novo|
+------+---------------+--------------------+
|400729|           null|               541.0|
|407525|           null|               542.0|
|407525|           null|               542.0|
+------+---------------+--------------------+

