# Iniciando Sessão Spark

In [5]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# ConfigureSparkUI
conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
sc.stop()

spark = (
    SparkSession.builder                  # Método da classe que constrói a sessão spark
      .appName("Exercicios Spark")  # Nome do App Spark
      .getOrCreate())                     # Verifica se há uma sessão ativa, e se não há, cria uma nova sessão

# Definindo DataFrame

In [6]:
#Importando bibliotecas
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType


#Definindo os tipos de dados manualmente
schema_base_pix = StructType([
    StructField('id', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('parte_debitada_nome', StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField('parte_debitada_banco', StringType()),
    StructField('parte_creditada_nome', StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType())
])

#Importando base de dados
caminho_csv = "./base_de_dados.csv"

#Definindo dataframe com o schema definido acima
df = spark.read.csv(
      path=caminho_csv,
      header=True,
      sep=";",
      schema=schema_base_pix,
      timestampFormat="dd/MM/yyyy HH:mm"  #formato de horas em dia meses e anos, horas e min.
)

# Análises

In [7]:
#Show dataframe
df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

In [8]:
#Verificando tipos dos dados
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_conta: string (nullable = true)
 |-- parte_debitada_banco: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_conta: string (nullable = true)
 |-- parte_creditada_banco: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)
 |-- chave_pix_valor: string (nullable = true)
 |-- data_transacao: timestamp (nullable = true)



In [9]:
#Quantidade total de transações
df.describe().show()

+-------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+--------------------+
|summary|                id|             valor|parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|     chave_pix_valor|
+-------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+--------------------+
|  count|               100|               100|                100|                 100|                 100|                 100|                  100|                  100|           100|                 100|
|   mean|              50.5|13663.011899999998|               NULL|        4.78019628E7|                NULL|                NULL|        5.107910677E7|    

In [10]:
df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

In [11]:
#Qntd transações totais em valor/ano

from pyspark.sql.functions import col, sum, year, month, dayofmonth, dayofyear

df.withColumn('year',year (col('data_transacao'))).groupby('year').sum('valor').show()



+----+------------------+
|year|        sum(valor)|
+----+------------------+
|2022|517980.58999999973|
|2021| 848320.6000000001|
+----+------------------+



In [12]:
#Qntd valor Nubank 
from pyspark.sql.functions import sum as _sum

df.filter(col('parte_debitada_banco') == 'Nubank').select(_sum('valor')).show()
    

+----------+
|sum(valor)|
+----------+
| 225322.49|
+----------+



In [13]:
#Qntd de transações por tipo de chave 
df.groupby('chave_pix_tipo').count().show()

+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|       celular|   22|
|         email|   29|
|           cpf|   49|
+--------------+-----+



In [14]:
#Média, min e max 
from pyspark.sql import functions as f 

df.select(
    f.mean(col('valor')),
    f.max(col('valor')),
    f.min(col('valor'))
).show()

+------------------+----------+----------+
|        avg(valor)|max(valor)|min(valor)|
+------------------+----------+----------+
|13663.011899999998|  95977.62|      0.03|
+------------------+----------+----------+



In [None]:
#Maiores transações 
df.orderBy('valor', ascending = False).show(5)

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+--------------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|     chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+--------------------+-------------------+
| 54|95977.62|   Henrique Ferreira|             6269315|              Nubank|Sra. Maria Luiza ...|             68542779|                  BTG|           cpf|         14957860211|2021-09-22 10:17:00|
| 86|94736.79|Joao Vitor Cavalc...|            52265189|                 BTG|        Stella Gomes|             93858790|               Nubank|         email|stella.gomes@yaho...|2021-01-26 00:04:00|
| 97|

In [16]:
df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

In [17]:
#Chave mais utilizada banco BTG 
df.filter(
    (col('parte_creditada_banco') == "BTG")
).groupBy('chave_pix_tipo').count().orderBy('count', ascending=False).show()


+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|           cpf|   14|
|         email|   10|
|       celular|    7|
+--------------+-----+



In [None]:
#Maiores transação por mês 
from pyspark.sql.functions import col, month, year, date_format, row_number #cria numerações de linha em janelas

from pyspark.sql.window import Window  #define janelas p calculos e ordenações


df.select(
    'id', 'valor', 'data_transacao'
).withColumn(
    'ano_mes', date_format(col('data_transacao'), 'y-MM')  #criando coluna no formato ano/mês
).show()


window_func  = Window.partitionBy("ano_mes").orderBy("valor")  #particionando grupos em ano/mês ordenando pelo valor

df.select(
    'id', 'valor', 'data_transacao'
).withColumn(
    'ano_mes', date_format(col('data_transacao'), 'y-MM')
).withColumn('row_number', row_number().over(window_func)).show() #numerando as linhas dentro das partções criadas

window_func  = Window.partitionBy("ano_mes").orderBy(col("valor").desc()) #ordenando do valor maior p menor

df.select(
    'id', 'valor', 'data_transacao'
).withColumn(
    'ano_mes', date_format(col('data_transacao'), 'y-MM')
).withColumn('row_number', row_number().over(window_func)).show()


df.select(
    'id', 'valor', 'data_transacao'
).withColumn(
    'ano_mes', date_format(col('data_transacao'), 'y-MM')
).withColumn('row_number', row_number().over(window_func)).filter(col('row_number') == 1).show()  #filtrando apenas as maiores transações por mês



+---+--------+-------------------+-------+
| id|   valor|     data_transacao|ano_mes|
+---+--------+-------------------+-------+
|  1|    9.93|2022-02-18 13:28:00|2022-02|
|  2|   15.38|2022-04-08 01:47:00|2022-04|
|  3|   57.58|2022-07-14 03:18:00|2022-07|
|  4|53705.13|2022-01-15 18:06:00|2022-01|
|  5|25299.69|2022-05-13 11:04:00|2022-05|
|  6| 7165.06|2022-09-11 13:38:00|2022-09|
|  7|    6.16|2021-12-10 12:37:00|2021-12|
|  8|  136.36|2021-12-30 23:18:00|2021-12|
|  9|  574.39|2021-06-21 07:20:00|2021-06|
| 10|   42.88|2022-09-21 17:19:00|2022-09|
| 11|33629.97|2022-09-12 00:29:00|2022-09|
| 12| 4374.56|2022-08-07 17:01:00|2022-08|
| 13|  507.18|2021-03-07 12:34:00|2021-03|
| 14|67758.87|2021-03-24 22:58:00|2021-03|
| 15|  815.53|2022-02-21 11:25:00|2022-02|
| 16|    2.73|2021-07-20 09:17:00|2021-07|
| 17|    0.54|2022-02-16 10:16:00|2022-02|
| 18|49836.72|2022-07-18 22:46:00|2022-07|
| 19|    9.68|2022-02-26 15:05:00|2022-02|
| 20| 9837.22|2021-06-22 05:39:00|2021-06|
+---+------

In [32]:
# Criando uma nova coluna que contenha o valor da transação em dólar.
from pyspark.sql.functions import round

df.withColumn('valor_dolar', round(col('valor') * 5.09, 2)).select('id', 'valor', 'valor_dolar').show()

+---+--------+-----------+
| id|   valor|valor_dolar|
+---+--------+-----------+
|  1|    9.93|      50.54|
|  2|   15.38|      78.28|
|  3|   57.58|     293.08|
|  4|53705.13|  273359.11|
|  5|25299.69|  128775.42|
|  6| 7165.06|   36470.16|
|  7|    6.16|      31.35|
|  8|  136.36|     694.07|
|  9|  574.39|    2923.65|
| 10|   42.88|     218.26|
| 11|33629.97|  171176.55|
| 12| 4374.56|   22266.51|
| 13|  507.18|    2581.55|
| 14|67758.87|  344892.65|
| 15|  815.53|    4151.05|
| 16|    2.73|       13.9|
| 17|    0.54|       2.75|
| 18|49836.72|   253668.9|
| 19|    9.68|      49.27|
| 20| 9837.22|   50071.45|
+---+--------+-----------+
only showing top 20 rows

