<a href="https://colab.research.google.com/github/BiancadeFrancisco/BigData_PySpark_EscrevendoDados/blob/main/C%C3%B3pia_de_PYSPARK_analisando_dados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=900d9aebc5be490a2ddc02cd928e2d317ff253f6d2220d9c0db7c3739a3bdb21
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [4]:
# ConfigureSparkUI
conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
sc.stop()

spark = (
    SparkSession.builder                  # Método da classe que constrói a sessão spark
      .appName("Exercicios Spark")  # Nome do App Spark
      .getOrCreate())                     # Verifica se há uma sessão ativa, e se não há, cria uma nova sessão

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType


schema_base_pix = StructType([
    StructField('id', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('parte_debitada_nome', StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField('parte_debitada_banco', StringType()),
    StructField('parte_creditada_nome', StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType())
])

caminho_csv = "./base_de_dados.csv"

df = spark.read.csv(
      path=caminho_csv,
      header=True,
      sep=";",
      schema=schema_base_pix,
      timestampFormat="dd/MM/yyyy HH:mm"
)

In [7]:
# LER O ARQUIVO:

df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

In [8]:
# Descubra a quantidade total de transações na base de dados

df.count()

100

In [9]:
# Qual foi a quantia total de dinheiro transacional em cada ano?
from pyspark.sql.functions import year, col

df.withColumn(
    'year', year(col('data_transacao'))
).groupBy('year').sum('valor').show()

+----+------------------+
|year|        sum(valor)|
+----+------------------+
|2022|517980.58999999973|
|2021| 848320.6000000001|
+----+------------------+



In [10]:
# Descubra a quantidade de dinheiro que entrou na Nubank em todo o período.

from pyspark.sql.functions import sum as _sum
df.filter(col('parte_creditada_banco') == "Nubank").select(_sum('valor')).show()

from pyspark.sql import functions as F
df.filter(col('parte_creditada_banco') == "Nubank").select(F.sum('valor')).show()

+----------+
|sum(valor)|
+----------+
| 302224.64|
+----------+

+----------+
|sum(valor)|
+----------+
| 302224.64|
+----------+



In [11]:
# Retorne a quantidade de transações agrupadas por tipo de chave.

from pyspark.sql.functions import year, col

df.groupBy('chave_pix_tipo').count().show()

+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|       celular|   22|
|         email|   29|
|           cpf|   49|
+--------------+-----+



In [12]:
# Descubra qual é a média, mínima e máxima do valor de todas as transações.

df.select('valor').describe().show()

from pyspark.sql import functions as F


df.select(
    F.max(col('valor')),
    F.min(col('valor')),
    F.mean(col('valor')),
).show()

+-------+------------------+
|summary|             valor|
+-------+------------------+
|  count|               100|
|   mean|13663.011899999998|
| stddev|25715.376055332952|
|    min|              0.03|
|    max|          95977.62|
+-------+------------------+

+----------+----------+------------------+
|max(valor)|min(valor)|        avg(valor)|
+----------+----------+------------------+
|  95977.62|      0.03|13663.011899999998|
+----------+----------+------------------+



In [13]:
# Quais foram as 5 maiores transações durante todo o período?

df.orderBy('valor', ascending=False).show(5)

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+--------------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|     chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+--------------------+-------------------+
| 54|95977.62|   Henrique Ferreira|             6269315|              Nubank|Sra. Maria Luiza ...|             68542779|                  BTG|           cpf|         14957860211|2021-09-22 10:17:00|
| 86|94736.79|Joao Vitor Cavalc...|            52265189|                 BTG|        Stella Gomes|             93858790|               Nubank|         email|stella.gomes@yaho...|2021-01-26 00:04:00|
| 97|

In [14]:
# Para o banco BTG, mostre qual é a chave pix mais utilizada para enviar ou receber transações.
df.filter(
    (col('parte_debitada_banco') == "BTG") |
    (col('parte_creditada_banco') == "BTG")
).groupBy('chave_pix_tipo').count().orderBy('count', ascending=False).show()

+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|           cpf|   22|
|         email|   15|
|       celular|   13|
+--------------+-----+



In [19]:
# Descubra qual foi a maior transação de cada mês na base de dados.
from pyspark.sql.functions import date_format

df.select(
    'id', 'valor', 'data_transacao'
).withColumn(
    'ano_mes', date_format(col('data_transacao'), 'y-MM')
).show()


from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_func  = Window.partitionBy("ano_mes").orderBy("valor")

df.select(
    'id', 'valor', 'data_transacao'
).withColumn(
    'ano_mes', date_format(col('data_transacao'), 'y-MM')
).withColumn('row_number', row_number().over(window_func)).show()

window_func  = Window.partitionBy("ano_mes").orderBy(col("valor").desc())

df.select(
    'id', 'valor', 'data_transacao'
).withColumn(
    'ano_mes', date_format(col('data_transacao'), 'y-MM')
).withColumn('row_number', row_number().over(window_func)).show()


df.select(
    'id', 'valor', 'data_transacao'
).withColumn(
    'ano_mes', date_format(col('data_transacao'), 'y-MM')
).withColumn('row_number', row_number().over(window_func)).filter(col('row_number') == 1).show()

+---+--------+-------------------+-------+
| id|   valor|     data_transacao|ano_mes|
+---+--------+-------------------+-------+
|  1|    9.93|2022-02-18 13:28:00|2022-02|
|  2|   15.38|2022-04-08 01:47:00|2022-04|
|  3|   57.58|2022-07-14 03:18:00|2022-07|
|  4|53705.13|2022-01-15 18:06:00|2022-01|
|  5|25299.69|2022-05-13 11:04:00|2022-05|
|  6| 7165.06|2022-09-11 13:38:00|2022-09|
|  7|    6.16|2021-12-10 12:37:00|2021-12|
|  8|  136.36|2021-12-30 23:18:00|2021-12|
|  9|  574.39|2021-06-21 07:20:00|2021-06|
| 10|   42.88|2022-09-21 17:19:00|2022-09|
| 11|33629.97|2022-09-12 00:29:00|2022-09|
| 12| 4374.56|2022-08-07 17:01:00|2022-08|
| 13|  507.18|2021-03-07 12:34:00|2021-03|
| 14|67758.87|2021-03-24 22:58:00|2021-03|
| 15|  815.53|2022-02-21 11:25:00|2022-02|
| 16|    2.73|2021-07-20 09:17:00|2021-07|
| 17|    0.54|2022-02-16 10:16:00|2022-02|
| 18|49836.72|2022-07-18 22:46:00|2022-07|
| 19|    9.68|2022-02-26 15:05:00|2022-02|
| 20| 9837.22|2021-06-22 05:39:00|2021-06|
+---+------