<a href="https://colab.research.google.com/github/LucasJFaust/spark_projects/blob/main/SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c3611031c0b15acc555b1b3e82137d21be834635d295f9dcfcd500a3e8ecaed6
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
      .config('spark.ui.port', '4050')
      .appName("SparkUI Introdução")
      .getOrCreate()
)

In [12]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType()),
])


schema_base_pix = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('transaction_date', TimestampType()),
    StructField('chave_pix', StringType()),
    StructField('fraude', IntegerType())
])


caminho_json = '/content/pix_transactions.json'

df = spark.read.json(
    caminho_json,
    schema=schema_base_pix,
    timestampFormat="yyyy-MM-dd"
)


spark.read.json(
    caminho_json,
    schema=schema_base_pix,
    timestampFormat="yyyy-MM-dd"
).createOrReplaceTempView('transacoes_pix')

- Para usarmos o SQL no Spark basta chamarmos um sessão Spark (lembrando que é a sessão Spark que se comunica com o cluster)

In [9]:
spark.sql("select * from transacoes_pix limit 10").show()

+------------+-------+--------------------+--------------------+-------------------+---------+------+
|id_transacao|  valor|           remetente|        destinatario|   transaction_date|chave_pix|fraude|
+------------+-------+--------------------+--------------------+-------------------+---------+------+
|        1000|   7.05|{Jonathan Gonsalv...|{Gabriel Cunha, I...|2022-03-19 00:00:00|      cpf|     0|
|        1001|  37.28|{Jonathan Gonsalv...|{Diego Souza, XP,...|2021-01-26 00:00:00|aleatoria|     0|
|        1002| 282.73|{Jonathan Gonsalv...|{Nicole Nunes, BT...|2022-05-31 00:00:00|aleatoria|     0|
|        1003|8447.92|{Jonathan Gonsalv...|{Maria Fernanda C...|2022-07-04 00:00:00|aleatoria|     0|
|        1004|  58.51|{Jonathan Gonsalv...|{Isabel Silva, C6...|2021-09-11 00:00:00|aleatoria|     0|
|        1005|6655.12|{Jonathan Gonsalv...|{Anthony Carvalho...|2022-02-11 00:00:00|  celular|     0|
|        1006|9912.25|{Jonathan Gonsalv...|{Eloah Monteiro, ...|2022-05-10 00:00:0

- Vamos contar o número de transações para cada chave pix

In [14]:
group_sql = spark.sql("select chave_pix, count(*) from transacoes_pix group by chave_pix")

In [15]:
group_df = df.groupBy('chave_pix').count()

In [16]:
group_sql.explain()

group_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix#101], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix#101, 200), ENSURE_REQUIREMENTS, [plan_id=29]
      +- HashAggregate(keys=[chave_pix#101], functions=[partial_count(1)])
         +- FileScan json [chave_pix#101] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/content/pix_transactions.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<chave_pix:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix#87], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix#87, 200), ENSURE_REQUIREMENTS, [plan_id=42]
      +- HashAggregate(keys=[chave_pix#87], functions=[partial_count(1)])
         +- FileScan json [chave_pix#87] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/content/pix_transactions.json], PartitionFilters: [], PushedFilters: [

In [17]:
group_sql.show()

group_df.show()

+---------+--------+
|chave_pix|count(1)|
+---------+--------+
|aleatoria|   25045|
|  celular|   24841|
|    email|   24935|
|      cpf|   25179|
+---------+--------+

+---------+-----+
|chave_pix|count|
+---------+-----+
|aleatoria|25045|
|  celular|24841|
|    email|24935|
|      cpf|25179|
+---------+-----+



In [22]:
spark.sql(
    """
    select
      chave_pix,
      round(avg(valor), 3)
    from transacoes_pix
    group by 1
    order by 2
    """
).show()

+---------+--------------------+
|chave_pix|round(avg(valor), 3)|
+---------+--------------------+
|    email|           11868.017|
|      cpf|           11946.072|
|  celular|            12152.68|
|aleatoria|           12217.234|
+---------+--------------------+



- Quantidades de transações feitas para cada chave pix:

In [24]:
spark.sql(
    """
    select
      chave_pix,
      count(*)
    from transacoes_pix
    group by 1
    order by 1 desc
    """
).show()

+---------+--------+
|chave_pix|count(1)|
+---------+--------+
|    email|   24935|
|      cpf|   25179|
|  celular|   24841|
|aleatoria|   25045|
+---------+--------+



- Quantidades de transações feitas para cada chave pix e quando o valor for maior qu e 10 mil:

In [26]:
spark.sql(
    """
    select
      chave_pix,
      count(*) as count_maior_10000
    from transacoes_pix
    where valor > 10000
    group by 1
    order by 1 desc
    """
).show()

+---------+-----------------+
|chave_pix|count_maior_10000|
+---------+-----------------+
|    email|             4830|
|      cpf|             4950|
|  celular|             4922|
|aleatoria|             5032|
+---------+-----------------+



# Window Function e CTE's

- Digamos que eu queira saber quais são as duas transações mais altas por banco da parte que foi creditada.

- Para isso vamos usar o row number que faz parte das windows functions. Ele vai colocar um index para cada transação dentro de uma janela de dados. Eu quero criar uma janela de dados para cada banco e dentro dessa janela eu quero ordenar pelo valor maior para o menor, onde o valor maior vai ter o index 1, assim por diante.

- Para fazer o filtro vamos usar CTE's que são tabelas temporárias onde podemos aplicar filtro com um subselect

In [27]:
spark.sql(
    """
    select
      destinatario.banco,
      valor,
    row_number() over (partition by destinatario.banco order by valor desc) as row_number
    from transacoes_pix
    limit 10
    """
).show()

+-----+--------+----------+
|banco|   valor|row_number|
+-----+--------+----------+
|  BTG|99946.78|         1|
|  BTG| 99913.9|         2|
|  BTG|99873.58|         3|
|  BTG|99865.12|         4|
|  BTG|99840.68|         5|
|  BTG|99832.08|         6|
|  BTG| 99829.9|         7|
|  BTG|99814.23|         8|
|  BTG|99813.42|         9|
|  BTG|99785.91|        10|
+-----+--------+----------+



In [29]:
spark.sql(
    """
    with cte_base_window as (
    select
      destinatario.banco,
      valor,
    row_number() over (partition by destinatario.banco order by valor desc) as row_number
    from transacoes_pix
    ) select
        banco,
        valor
      from cte_base_window
      where row_number in (1,2)
    """
).show()

+--------+--------+
|   banco|   valor|
+--------+--------+
|     BTG|99946.78|
|     BTG| 99913.9|
|Bradesco|99910.87|
|Bradesco|99887.88|
|      C6|99980.03|
|      C6|99964.99|
|   Caixa|99969.06|
|   Caixa|99933.09|
|    Itau|99999.54|
|    Itau|99951.02|
|  Nubank|99935.45|
|  Nubank|99914.35|
|      XP|99961.28|
|      XP|99934.01|
+--------+--------+



- Agora vamos usar uma query e um dataframe spark na mesma função.

In [30]:
df_row_number = spark.sql("""
  select
    destinatario.banco,
    valor,
    row_number() over (partition by destinatario.banco order by valor desc) as row_number
  from transacoes_pix
"""
)

In [31]:
df_row_number.show()

+-----+--------+----------+
|banco|   valor|row_number|
+-----+--------+----------+
|  BTG|99946.78|         1|
|  BTG| 99913.9|         2|
|  BTG|99873.58|         3|
|  BTG|99865.12|         4|
|  BTG|99840.68|         5|
|  BTG|99832.08|         6|
|  BTG| 99829.9|         7|
|  BTG|99814.23|         8|
|  BTG|99813.42|         9|
|  BTG|99785.91|        10|
|  BTG|99754.22|        11|
|  BTG|99750.69|        12|
|  BTG|99724.27|        13|
|  BTG|99711.66|        14|
|  BTG|99708.06|        15|
|  BTG|99684.07|        16|
|  BTG|99677.36|        17|
|  BTG|99648.38|        18|
|  BTG|99635.23|        19|
|  BTG|99628.33|        20|
+-----+--------+----------+
only showing top 20 rows



In [33]:
from pyspark.sql.functions import col
df_row_number.filter(col('row_number').isin([1,2])).show()

+--------+--------+----------+
|   banco|   valor|row_number|
+--------+--------+----------+
|     BTG|99946.78|         1|
|     BTG| 99913.9|         2|
|Bradesco|99910.87|         1|
|Bradesco|99887.88|         2|
|      C6|99980.03|         1|
|      C6|99964.99|         2|
|   Caixa|99969.06|         1|
|   Caixa|99933.09|         2|
|    Itau|99999.54|         1|
|    Itau|99951.02|         2|
|  Nubank|99935.45|         1|
|  Nubank|99914.35|         2|
|      XP|99961.28|         1|
|      XP|99934.01|         2|
+--------+--------+----------+

