<a href="https://colab.research.google.com/github/Carlos-Pessin/SparkSQL_Training/blob/main/SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Inicializando

In [None]:
from google.colab import drive

# The dataset paths used later in the notebook live under this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=0c85b540fe62669dbe55e1d29eda22a5c8d83e45317d8654e75d27c6437c8e62
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session; the Spark UI is configured on port 4050.
spark = (
    SparkSession.builder
    .appName("SparkUI Introdução")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
from pyspark.sql.functions import col, filter
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

In [None]:

# Nested struct shared by the sender (remetente) and the receiver (destinatario) columns.
schema_remetente_destinatario = StructType(
    [StructField(field_name, StringType()) for field_name in ('nome', 'banco', 'tipo')]
)

# Explicit schema for pix_transactions.json, passed to spark.read.json when loading the data.
schema_base_pix = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('id_transacao', IntegerType()),
        ('valor', DoubleType()),
        ('remetente', schema_remetente_destinatario),
        ('destinatario', schema_remetente_destinatario),
        ('transaction_date', TimestampType()),
        ('chave_pix', StringType()),
        ('fraude', IntegerType()),
    )
])

In [None]:
# Load the PIX transactions with the explicit schema; timestamps are parsed with the
# yyyy-MM-dd pattern (the outputs below show a 00:00:00 time part).
df = (
    spark.read
    .schema(schema_base_pix)
    .option('timestampFormat', "yyyy-MM-dd")
    .json('/content/drive/MyDrive/Colab Notebooks/Formação em dados/Spark/Datasets/pix_transactions.json')
)

In [None]:
# Expose the already-loaded DataFrame to Spark SQL as `pix_transactions`.
# Reusing `df` (instead of reading the JSON a second time) keeps a single source of
# truth for the file path, schema and timestamp format used by both APIs.
df.createOrReplaceTempView('pix_transactions')

# Basic queries

In [None]:
# First look at the data: date, amount and the names of both parties.
# LIMIT without ORDER BY is only a preview; which 10 rows come back is not guaranteed.
(
    spark.sql("""
        SELECT transaction_date,
               valor,
               destinatario.nome AS nome_destinatario,
               remetente.nome    AS nome_remetente
          FROM pix_transactions
         LIMIT 10
    """)
    .show()
)

+-------------------+-------+--------------------+------------------+
|   transaction_date|  valor|   nome_destinatario|    nome_remetente|
+-------------------+-------+--------------------+------------------+
|2022-03-19 00:00:00|   7.05|       Gabriel Cunha|Jonathan Gonsalves|
|2021-01-26 00:00:00|  37.28|         Diego Souza|Jonathan Gonsalves|
|2022-05-31 00:00:00| 282.73|        Nicole Nunes|Jonathan Gonsalves|
|2022-07-04 00:00:00|8447.92|Maria Fernanda Ca...|Jonathan Gonsalves|
|2021-09-11 00:00:00|  58.51|        Isabel Silva|Jonathan Gonsalves|
|2022-02-11 00:00:00|6655.12|    Anthony Carvalho|Jonathan Gonsalves|
|2022-05-10 00:00:00|9912.25|      Eloah Monteiro|Jonathan Gonsalves|
|2022-08-28 00:00:00|8212.91|        Sophie Rocha|Jonathan Gonsalves|
|2022-03-23 00:00:00|  91.71|      Pietro Ribeiro|Jonathan Gonsalves|
|2021-09-18 00:00:00|  44.29|      Eloah Teixeira|Jonathan Gonsalves|
+-------------------+-------+--------------------+------------------+



In [None]:
# Number of transactions per PIX key type, most frequent first (SQL version).
group_sql = spark.sql("""
    SELECT chave_pix,
           COUNT(*) AS qtd_transacoes
      FROM pix_transactions
     GROUP BY chave_pix
     ORDER BY qtd_transacoes DESC
""")
group_sql.show()

+---------+--------------+
|chave_pix|qtd_transacoes|
+---------+--------------+
|      cpf|         25179|
|aleatoria|         25045|
|    email|         24935|
|  celular|         24841|
+---------+--------------+



In [None]:
# The same aggregation written with the DataFrame API instead of SQL.
group_df = (
    df.groupBy('chave_pix')
      .count()
      .orderBy(col('count').desc())
)
group_df.show()

+---------+-----+
|chave_pix|count|
+---------+-----+
|      cpf|25179|
|aleatoria|25045|
|    email|24935|
|  celular|24841|
+---------+-----+



Comparing the execution plans of the SQL query and its DataFrame API equivalent

In [None]:
# Print both physical plans to compare how Spark executes the SQL and DataFrame versions.
# explain() prints the plan and returns None, so these are plain statements (no tuple).
group_sql.explain()
group_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [qtd_transacoes#216L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(qtd_transacoes#216L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=280]
      +- HashAggregate(keys=[chave_pix#70], functions=[count(1)])
         +- Exchange hashpartitioning(chave_pix#70, 200), ENSURE_REQUIREMENTS, [plan_id=277]
            +- HashAggregate(keys=[chave_pix#70], functions=[partial_count(1)])
               +- FileScan json [chave_pix#70] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/Formação em dados/Spark/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<chave_pix:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#254L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#254L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=300]
      +- HashAggregate(keys=[chave_pix#237], func

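Both plans have the same structure (partial aggregation, shuffle, final aggregation, sort): SQL and the DataFrame API go through the same optimizer, so the processing is the same.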

In [None]:
# Average transaction amount (rounded to 2 decimals) per PIX key type, highest first.
(
    spark.sql("""
        SELECT chave_pix,
               ROUND(AVG(valor), 2) AS valor_medio
          FROM pix_transactions
         GROUP BY chave_pix
         ORDER BY valor_medio DESC
    """)
    .show()
)

+---------+-----------+
|chave_pix|valor_medio|
+---------+-----------+
|aleatoria|   12217.23|
|  celular|   12152.68|
|      cpf|   11946.07|
|    email|   11868.02|
+---------+-----------+



# Window functions and CTEs

In [None]:
# Rank every transaction within its receiving bank by amount (1 = largest).
# LIMIT 10 only previews the ranked result.
(
    spark.sql("""
        SELECT destinatario.banco,
               valor,
               ROW_NUMBER() OVER (
                   PARTITION BY destinatario.banco
                   ORDER BY valor DESC
               ) AS row_number
          FROM pix_transactions
         LIMIT 10
    """)
    .show()
)

+-----+--------+----------+
|banco|   valor|row_number|
+-----+--------+----------+
|  BTG|99946.78|         1|
|  BTG| 99913.9|         2|
|  BTG|99873.58|         3|
|  BTG|99865.12|         4|
|  BTG|99840.68|         5|
|  BTG|99832.08|         6|
|  BTG| 99829.9|         7|
|  BTG|99814.23|         8|
|  BTG|99813.42|         9|
|  BTG|99785.91|        10|
+-----+--------+----------+



Top 2 largest transactions per receiving bank, using a CTE over the window function

In [None]:
# Top 2 largest transactions per receiving bank.
# The CTE ranks transactions inside each bank; the outer query keeps ranks 1 and 2.
# - `AS` is written explicitly (standard CTE syntax, no ambiguity with a column-alias list).
# - The rank is aliased `rn` so it does not shadow the row_number() function name.
# - ORDER BY makes the displayed order deterministic instead of relying on the
#   window's internal sort.
spark.sql(
    """
    WITH cte_base_window AS (
      SELECT
        destinatario.banco AS dest_banco,
        valor,
        row_number() OVER (PARTITION BY destinatario.banco ORDER BY valor DESC) AS rn
      FROM pix_transactions
    )
    SELECT
      dest_banco,
      valor
    FROM cte_base_window
    WHERE rn <= 2
    ORDER BY dest_banco, valor DESC
    """
).show()

+----------+--------+
|dest_banco|   valor|
+----------+--------+
|       BTG|99946.78|
|       BTG| 99913.9|
|  Bradesco|99910.87|
|  Bradesco|99887.88|
|        C6|99980.03|
|        C6|99964.99|
|     Caixa|99969.06|
|     Caixa|99933.09|
|      Itau|99999.54|
|      Itau|99951.02|
|    Nubank|99935.45|
|    Nubank|99914.35|
|        XP|99961.28|
|        XP|99934.01|
+----------+--------+



In [None]:
# Full ranking of transactions per receiving bank, kept as a DataFrame so it can be
# post-processed with the DataFrame API (the `row_number` column is filtered on below).
df_row_number = spark.sql("""
    SELECT destinatario.banco,
           valor,
           ROW_NUMBER() OVER (
               PARTITION BY destinatario.banco
               ORDER BY valor DESC
           ) AS row_number
      FROM pix_transactions
""")
df_row_number.show()

+-----+--------+----------+
|banco|   valor|row_number|
+-----+--------+----------+
|  BTG|99946.78|         1|
|  BTG| 99913.9|         2|
|  BTG|99873.58|         3|
|  BTG|99865.12|         4|
|  BTG|99840.68|         5|
|  BTG|99832.08|         6|
|  BTG| 99829.9|         7|
|  BTG|99814.23|         8|
|  BTG|99813.42|         9|
|  BTG|99785.91|        10|
|  BTG|99754.22|        11|
|  BTG|99750.69|        12|
|  BTG|99724.27|        13|
|  BTG|99711.66|        14|
|  BTG|99708.06|        15|
|  BTG|99684.07|        16|
|  BTG|99677.36|        17|
|  BTG|99648.38|        18|
|  BTG|99635.23|        19|
|  BTG|99628.33|        20|
+-----+--------+----------+
only showing top 20 rows



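The result of `spark.sql` is a regular DataFrame, so SQL and PySpark DataFrame functions can be combined. Here the same top-2 filter is applied with `filter`/`col` instead of SQL.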

In [None]:
# Keep the two largest transactions per bank (row numbers start at 1).
df_row_number.where(col('row_number') <= 2).show()

+--------+--------+----------+
|   banco|   valor|row_number|
+--------+--------+----------+
|     BTG|99946.78|         1|
|     BTG| 99913.9|         2|
|Bradesco|99910.87|         1|
|Bradesco|99887.88|         2|
|      C6|99980.03|         1|
|      C6|99964.99|         2|
|   Caixa|99969.06|         1|
|   Caixa|99933.09|         2|
|    Itau|99999.54|         1|
|    Itau|99951.02|         2|
|  Nubank|99935.45|         1|
|  Nubank|99914.35|         2|
|      XP|99961.28|         1|
|      XP|99934.01|         2|
+--------+--------+----------+

