<a href="https://colab.research.google.com/github/DataEtnos/Apache_Spark/blob/main/SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Where to get the key (ngrok authtoken)**

**NGROK:** https://ngrok.com


# Installing Spark

In [1]:

!pip install pyspark
!pip install flask-ngrok
!pip install pyngrok

Collecting flask-ngrok
  Downloading flask_ngrok-0.0.25-py3-none-any.whl.metadata (1.8 kB)
Downloading flask_ngrok-0.0.25-py3-none-any.whl (3.1 kB)
Installing collected packages: flask-ngrok
Successfully installed flask-ngrok-0.0.25
Collecting pyngrok
  Downloading pyngrok-7.2.1-py3-none-any.whl.metadata (8.3 kB)
Downloading pyngrok-7.2.1-py3-none-any.whl (22 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.2.1


In [2]:
# Replace with your own authtoken from https://ngrok.com (never publish a real token)
!ngrok config add-authtoken '<YOUR_NGROK_AUTHTOKEN>'

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Starting the Spark Session

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Configure the Spark UI port
conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
sc.stop()

spark = (
    SparkSession.builder                  # Builder used to construct the Spark session
      .appName("Meu Primeiro App Spark")  # Name of the Spark app
      .getOrCreate())                     # Returns the active session if there is one, otherwise creates a new one


In [5]:
!curl -s http://localhost:4040/api/tunnels

In [28]:
from flask import Flask
from pyngrok import ngrok

# The ngrok authtoken was already saved in cell [2]

app = Flask(__name__)

@app.route('/')
def home():
  return "Hello, this is your Flask app running on Google Colab with ngrok"

# Open a public ngrok tunnel to port 4050, the Spark UI port configured earlier
public_url = ngrok.connect(4050)
print('link:', public_url)

# app.run() starts Flask on port 5000 (not the tunnelled port) and blocks this cell until interrupted
app.run()

link: NgrokTunnel: "https://93b7-34-139-207-112.ngrok-free.app" -> "http://localhost:4050"
 * Serving Flask app '__main__'
 * Debug mode: off
 * Running on http://127.0.0.1:5000
INFO:werkzeug:Press CTRL+C to quit


# SparkSQL

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

caminho_csv = "./base_de_dados.csv"

schema_base_pix = StructType([
    StructField('id', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('parte_debitada_nome', StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField('parte_debitada_banco', StringType()),
    StructField('parte_creditada_nome', StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType())
])

df = spark.read.csv(
    path=caminho_csv,
    header=True,                        # first line holds the column names
    sep=";",                            # fields are separated by semicolons
    schema=schema_base_pix,             # explicit schema instead of type inference
    timestampFormat="dd/MM/yyyy HH:mm"  # pattern used to parse data_transacao
)

# Register the DataFrame as a temporary view named base_pix so it can be queried with SQL
df.createOrReplaceTempView("base_pix")
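To make the reader options concrete, here is a plain-Python sketch of what `header=True`, `sep=";"`, the schema types and `timestampFormat` mean for one line of input. The two-line CSV is hypothetical and covers only four of the eleven columns. This is not how Spark parses files internally. It only shows how Spark's `dd/MM/yyyy HH:mm` pattern corresponds to Python's `%d/%m/%Y %H:%M`.

```python
import csv
import io
from datetime import datetime

# Hypothetical CSV in the layout the reader above expects:
# a header row, ";" as separator, timestamps written as dd/MM/yyyy HH:mm
raw = (
    "id;valor;chave_pix_tipo;data_transacao\n"
    "1;9.93;cpf;18/02/2022 13:28\n"
)

reader = csv.DictReader(io.StringIO(raw), delimiter=";")  # like header=True, sep=";"
rows = []
for r in reader:
    rows.append({
        "id": int(r["id"]),                     # IntegerType
        "valor": float(r["valor"]),             # DoubleType
        "chave_pix_tipo": r["chave_pix_tipo"],  # StringType
        # Spark's dd/MM/yyyy HH:mm corresponds to Python's %d/%m/%Y %H:%M
        "data_transacao": datetime.strptime(r["data_transacao"], "%d/%m/%Y %H:%M"),
    })

print(rows[0]["data_transacao"])  # 2022-02-18 13:28:00
```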

In [8]:
spark.sql("select * from base_pix limit 3").show()

+---+-----+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+-----+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1| 9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|57.58|    Arthur Goncalves|            1

But how can we tell whether manipulating data with DataFrames is faster than using SQL?

To find out, let's write the same group by both ways and check which execution plan Spark creates for each.


# Checking group by speed: spark.sql vs. a Spark DataFrame

In [13]:
group_sql = spark.sql("select chave_pix_tipo, count(1) from base_pix group by chave_pix_tipo")

In [14]:
group_dataframe = df.groupBy('chave_pix_tipo').count()

In [16]:
print("SQL Group")
group_sql.explain()

print("DataFrame Group")
group_dataframe.explain()

SQL Group
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#30], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#30, 200), ENSURE_REQUIREMENTS, [plan_id=26]
      +- HashAggregate(keys=[chave_pix_tipo#30], functions=[partial_count(1)])
         +- FileScan csv [chave_pix_tipo#30] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/base_de_dados.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<chave_pix_tipo:string>


DataFrame Group
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#8], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#8, 200), ENSURE_REQUIREMENTS, [plan_id=39]
      +- HashAggregate(keys=[chave_pix_tipo#8], functions=[partial_count(1)])
         +- FileScan csv [chave_pix_tipo#8] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/base_de
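Both plans have the same operators in the same order: a partial `HashAggregate`, an `Exchange hashpartitioning` into 200 partitions, a final `HashAggregate`, and a `FileScan` that reads only `chave_pix_tipo`. Only the auto-generated attribute ids (`#30` vs `#8`) and `plan_id` differ, so for this query neither approach should be faster. The sketch below checks this textually on the first four plan lines copied from the output above (the `FileScan` line is cut off in the second plan). Masking the ids with regular expressions is a quick heuristic of our own, not a Spark feature.

```python
import re

# First four lines of each physical plan, copied from the explain() output above
sql_plan = """AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#30], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#30, 200), ENSURE_REQUIREMENTS, [plan_id=26]
      +- HashAggregate(keys=[chave_pix_tipo#30], functions=[partial_count(1)])"""

df_plan = """AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#8], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#8, 200), ENSURE_REQUIREMENTS, [plan_id=39]
      +- HashAggregate(keys=[chave_pix_tipo#8], functions=[partial_count(1)])"""


def normalize(plan: str) -> str:
    """Mask the auto-generated ids that change from one query to the next."""
    plan = re.sub(r"#\d+", "#?", plan)                # attribute ids such as #30 or #8
    plan = re.sub(r"plan_id=\d+", "plan_id=?", plan)  # exchange plan ids
    return plan


print(sql_plan == df_plan)                        # False: the raw text differs
print(normalize(sql_plan) == normalize(df_plan))  # True: same operators, same order
```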

In [17]:
spark.sql(
  """
    select chave_pix_tipo, sum(valor)
    from base_pix
    group by 1
  """
).show()

+--------------+------------------+
|chave_pix_tipo|        sum(valor)|
+--------------+------------------+
|       celular|         207778.46|
|         email|499009.38000000006|
|           cpf| 659513.3499999997|
+--------------+------------------+
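The long tail on the `email` total (`499009.38000000006`) comes from floating-point arithmetic. `valor` is a `DoubleType`, a binary floating-point number, and most decimal amounts such as 9.93 cannot be stored exactly. Adding them up lets these tiny representation errors show, which is why the next cells wrap the sum in `round(..., 2)`. The effect is easy to reproduce in plain Python, whose `float` is also an IEEE 754 double. The amounts here are hypothetical.

```python
from decimal import Decimal

# Hypothetical amounts; Python floats are IEEE 754 doubles, like Spark's DoubleType
amounts = [0.1, 0.2]

float_sum = sum(amounts)
print(float_sum)            # 0.30000000000000004, the representation error shows up
print(round(float_sum, 2))  # 0.3, the same idea as round(sum(valor), 2) in the next cell

# Decimal arithmetic on the decimal strings is exact
decimal_sum = sum(Decimal(str(v)) for v in amounts)
print(decimal_sum)          # 0.3
```

If exact cents matter, declaring `valor` as a `DecimalType` (for example `DecimalType(12, 2)`) in `schema_base_pix` may be worth trying instead of rounding at the end. That variant has not been run here.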



In [18]:
spark.sql(
  """
    select chave_pix_tipo, round(sum(valor), 2)
    from base_pix
    group by 1
  """
).show()

+--------------+--------------------+
|chave_pix_tipo|round(sum(valor), 2)|
+--------------+--------------------+
|       celular|           207778.46|
|         email|           499009.38|
|           cpf|           659513.35|
+--------------+--------------------+



In [19]:
spark.sql(
  """
    select chave_pix_tipo, round(sum(valor), 2) as sum_valor
    from base_pix
    group by 1
  """
).show()

+--------------+---------+
|chave_pix_tipo|sum_valor|
+--------------+---------+
|       celular|207778.46|
|         email|499009.38|
|           cpf|659513.35|
+--------------+---------+



In [20]:
spark.sql(
  """
    select chave_pix_tipo, count(*) as count
    from base_pix
    group by 1
  """
).show()

+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|       celular|   22|
|         email|   29|
|           cpf|   49|
+--------------+-----+




In [32]:
spark.sql(
    """
    select * from base_pix
    where valor > 1000
    """
).show()


+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  4|53705.13|  Ana Julia Caldeira|            22834741|                Itau|   Ana Livia Almeida|             44695116|               Nubank|           cpf|    26590384142|2022-01-15 18:06:00|
|  5|25299.69|  Srta. Nicole Pinto|             3715882|              Nubank|Srta. Ana Laura d...|             21409465|               Nubank|           cpf|    73486105280|2022-05-13 11:04:00|
|  6| 7165.06|   Gabriela Ferr

In [36]:
spark.sql(
    """
    SELECT *
    FROM (
        SELECT
            parte_creditada_banco,
            valor,
            ROW_NUMBER() OVER (PARTITION BY parte_creditada_banco ORDER BY valor DESC) AS row_number
        FROM base_pix
    ) subquery
    WHERE row_number in (1,2)
    """
).show()


+---------------------+--------+----------+
|parte_creditada_banco|   valor|row_number|
+---------------------+--------+----------+
|                  BTG|95977.62|         1|
|                  BTG|80083.34|         2|
|             Bradesco|81977.98|         1|
|             Bradesco|58083.62|         2|
|                 Itau|94586.45|         1|
|                 Itau| 78559.4|         2|
|               Nubank|94736.79|         1|
|               Nubank|60139.23|         2|
+---------------------+--------+----------+
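`ROW_NUMBER()` numbers the rows of each partition (here, each `parte_creditada_banco`) starting at 1, following the `ORDER BY` of the window. Filtering on 1 and 2 therefore keeps the two largest transfers per bank. Ties still receive distinct numbers, whereas `RANK()` or `DENSE_RANK()` would repeat them. The same pattern can be tried without a cluster using Python's built-in `sqlite3` module as a stand-in for Spark SQL. SQLite supports window functions from version 3.25 on. The rows are hypothetical, and the alias is `rn` instead of `row_number` to keep the example portable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE base_pix (parte_creditada_banco TEXT, valor REAL)")
# Hypothetical rows: three transfers to Itau, two to Nubank
conn.executemany(
    "INSERT INTO base_pix VALUES (?, ?)",
    [("Itau", 100.0), ("Itau", 300.0), ("Itau", 200.0),
     ("Nubank", 50.0), ("Nubank", 80.0)],
)

query = """
    SELECT parte_creditada_banco, valor, rn
    FROM (
        SELECT
            parte_creditada_banco,
            valor,
            ROW_NUMBER() OVER (PARTITION BY parte_creditada_banco ORDER BY valor DESC) AS rn
        FROM base_pix
    ) AS subquery
    WHERE rn IN (1, 2)
    ORDER BY parte_creditada_banco, rn
"""
top2 = conn.execute(query).fetchall()
for row in top2:
    print(row)
# ('Itau', 300.0, 1)
# ('Itau', 200.0, 2)
# ('Nubank', 80.0, 1)
# ('Nubank', 50.0, 2)
```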



CTE stands for common table expression. A CTE lets you define a named, temporary result set that is available only within the execution scope of a single statement, such as a SELECT, INSERT, UPDATE, DELETE, or MERGE.
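A small sketch of that idea, again using Python's `sqlite3` as a stand-in for Spark SQL. The `WITH name AS (...)` syntax is the same one used in the `spark.sql` cells in this notebook. The CTE `totals` is defined once and then queried as if it were a table. Once the statement ends, it no longer exists. The rows and the 100 threshold are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE base_pix (parte_creditada_banco TEXT, valor REAL)")
# Hypothetical rows
conn.executemany(
    "INSERT INTO base_pix VALUES (?, ?)",
    [("BTG", 70.0), ("BTG", 60.0), ("Itau", 40.0), ("Nubank", 90.0)],
)

query = """
    WITH totals AS (              -- named, temporary result set
        SELECT parte_creditada_banco, SUM(valor) AS total
        FROM base_pix
        GROUP BY parte_creditada_banco
    )
    SELECT parte_creditada_banco, total
    FROM totals                   -- referenced as if it were a table
    WHERE total > 100
"""
result = conn.execute(query).fetchall()
print(result)  # [('BTG', 130.0)]

# The CTE only exists inside that one statement
try:
    conn.execute("SELECT * FROM totals")
except sqlite3.OperationalError as exc:
    print(exc)  # no such table: totals
```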

In [22]:
spark.sql(
  """
  with base_pix_row_number as(
    select
      parte_creditada_banco,
      data_transacao,
      row_number() over (partition by parte_creditada_banco order by data_transacao desc) as row_number
    from base_pix
  ) select
      parte_creditada_banco,
      data_transacao
    from base_pix_row_number
    where row_number = 1
    order by data_transacao desc
  """
).show()

+---------------------+-------------------+
|parte_creditada_banco|     data_transacao|
+---------------------+-------------------+
|                 Itau|2022-12-15 01:29:00|
|                  BTG|2022-12-08 23:47:00|
|               Nubank|2022-11-19 19:25:00|
|             Bradesco|2022-08-07 17:01:00|
+---------------------+-------------------+



However, we are not limited to just running SQL queries.

`spark.sql()` returns a DataFrame, so we can store a query's result in a variable and keep working with it as a DataFrame!

In [37]:
df_window = spark.sql(
  """
  with base_pix_row_number as(
    select
      parte_creditada_banco,
      data_transacao,
      row_number() over (partition by parte_creditada_banco order by data_transacao desc) as row_number
    from base_pix
  ) select
      parte_creditada_banco,
      data_transacao
    from base_pix_row_number
    where row_number = 1
    order by data_transacao desc
  """
)

In [38]:
df_window.show()

+---------------------+-------------------+
|parte_creditada_banco|     data_transacao|
+---------------------+-------------------+
|                 Itau|2022-12-15 01:29:00|
|                  BTG|2022-12-08 23:47:00|
|               Nubank|2022-11-19 19:25:00|
|             Bradesco|2022-08-07 17:01:00|
+---------------------+-------------------+



This opens up the true power of Spark. We can treat selectExpr as a simple way to build up
complex expressions that create new DataFrames. In fact, we can add any valid non-aggregating
SQL expression, and as long as the columns resolve, it will be valid!

In [25]:
from pyspark.sql.functions import col

In [35]:
df.selectExpr(
    "date(data_transacao) as date_data_transacao",
    "valor"
).groupBy('date_data_transacao').count().orderBy(col('count').desc()).show()

+-------------------+-----+
|date_data_transacao|count|
+-------------------+-----+
|         2022-02-26|    2|
|         2022-03-02|    2|
|         2021-06-22|    1|
|         2022-11-29|    1|
|         2021-07-20|    1|
|         2021-02-15|    1|
|         2021-03-22|    1|
|         2022-02-16|    1|
|         2021-04-25|    1|
|         2021-03-07|    1|
|         2022-01-15|    1|
|         2022-01-09|    1|
|         2022-05-23|    1|
|         2022-02-01|    1|
|         2021-07-11|    1|
|         2022-04-12|    1|
|         2022-06-05|    1|
|         2021-09-06|    1|
|         2021-06-20|    1|
|         2021-12-14|    1|
+-------------------+-----+
only showing top 20 rows
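In SQL terms, the `selectExpr` / `groupBy` / `count` / `orderBy` chain above is roughly a `GROUP BY date(data_transacao)` ordered by the count. Rows with equal counts come back in no guaranteed order, which is why the dates with a count of 1 look shuffled above. A second sort key makes the order deterministic. The sketch below runs that SQL with `sqlite3` as a stand-in on hypothetical timestamps stored as `'YYYY-MM-DD HH:MM:SS'` text. SQLite has no timestamp type, and its `date()` returns a string rather than a date.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE base_pix (id INTEGER, data_transacao TEXT)")
# Hypothetical timestamps; two transactions fall on 2022-02-26
conn.executemany(
    "INSERT INTO base_pix VALUES (?, ?)",
    [(1, "2022-02-26 10:00:00"), (2, "2022-02-26 18:30:00"),
     (3, "2022-01-15 18:06:00"), (4, "2021-06-22 09:15:00")],
)

query = """
    SELECT date(data_transacao) AS date_data_transacao, COUNT(*) AS n
    FROM base_pix
    GROUP BY date(data_transacao)
    ORDER BY n DESC, date_data_transacao  -- second key breaks ties deterministically
"""
per_day = conn.execute(query).fetchall()
print(per_day)
# [('2022-02-26', 2), ('2021-06-22', 1), ('2022-01-15', 1)]
```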



Exercise
1. We saw that there are two days on which two Pix transactions took place. Find the ids of those transactions.

2. We saw that there are two days on which two Pix transactions took place. Find which Pix keys were used to make those transactions.

In [27]:
lista_datas = spark.sql(
  """
  select
    date(data_transacao) as date_data_transacao
  from base_pix
  group by 1
  having count(*) > 1
  """
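).collect()[0][0]  # [0][0] keeps only the first date returned; use [linha[0] for linha in ....collect()] to keep all of them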
lista_datas

datetime.date(2022, 2, 26)
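The cell above keeps only the first of the two dates. One way to answer both questions in a single query is to filter `base_pix` with an `IN` subquery over the days that have more than one transaction. This is our own sketch, run here with `sqlite3` as a stand-in on hypothetical rows. The SQL text should carry over to `spark.sql` against `base_pix`, since Spark SQL supports `IN` subqueries and `date()`. Its output on the real data is not shown here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE base_pix (id INTEGER, chave_pix_tipo TEXT, chave_pix_valor TEXT, data_transacao TEXT)"
)
# Hypothetical rows: ids 10 and 11 share a day, id 12 is alone on its day
conn.executemany(
    "INSERT INTO base_pix VALUES (?, ?, ?, ?)",
    [(10, "cpf", "11111111111", "2022-02-26 08:00:00"),
     (11, "email", "fulano@example.com", "2022-02-26 21:45:00"),
     (12, "celular", "5511999999999", "2022-03-10 12:00:00")],
)

query = """
    SELECT id, chave_pix_tipo, chave_pix_valor, data_transacao
    FROM base_pix
    WHERE date(data_transacao) IN (
        SELECT date(data_transacao)
        FROM base_pix
        GROUP BY date(data_transacao)
        HAVING COUNT(*) > 1          -- days with more than one transaction
    )
    ORDER BY data_transacao
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)  # id and Pix key of each transaction on a shared day
```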