# Instalando o Spark

In [1]:
!pip install pyspark #==3.3.1

ERROR: Invalid requirement: '#==3.3.1'

[notice] A new release of pip is available: 23.1.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

'wget' is not recognized as an internal or external command,
operable program or batch file.
'unzip' is not recognized as an internal or external command,
operable program or batch file.


# Iniciar Sessão Spark

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# ConfigureSparkUI
conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
sc.stop()

spark = (
    SparkSession.builder                  # Método da classe que constrói a sessão spark
      .appName("Meu Primeiro App Spark")  # Nome do App Spark
      .getOrCreate())                     # Verifica se há uma sessão ativa, e se não há, cria uma nova sessão


In [4]:
!curl -s http://localhost:4040/api/tunnels 

<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
<title>Error 404 Not Found</title>
</head>
<body><h2>HTTP ERROR 404 Not Found</h2>
<table>
<tr><th>URI:</th><td>/api/tunnels</td></tr>
<tr><th>STATUS:</th><td>404</td></tr>
<tr><th>MESSAGE:</th><td>Not Found</td></tr>
<tr><th>SERVLET:</th><td>org.glassfish.jersey.servlet.ServletContainer-42a88a12</td></tr>
</table>

</body>
</html>


# SparkSQL

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

caminho_csv = "./base_de_dados.csv"

schema_base_pix = StructType([
    StructField('id', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('parte_debitada_nome', StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField('parte_debitada_banco', StringType()),
    StructField('parte_creditada_nome', StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType())
])

df = spark.read.csv(
    path=caminho_csv,
    header=True,
    sep=";",
    schema=schema_base_pix,
    timestampFormat="dd/MM/yyyy HH:mm"
)
spark.read.csv(
    path=caminho_csv,
    header=True,
    sep=";",
    schema=schema_base_pix,
    timestampFormat="dd/MM/yyyy HH:mm"
).createOrReplaceTempView("base_pix")

In [6]:
spark.sql("select * from base_pix limit 5").show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

Porém, como saber se a manipulação de dados com Dataframes não é mais rápida que SQL?

Para isso vamos propor um group by das duas maneiras e verificar qual é o plano de execução que o spark cria. 


In [7]:
group_sql = spark.sql("select chave_pix_tipo, count(1) from base_pix group by chave_pix_tipo")

In [8]:
group_dataframe = df.groupBy('chave_pix_tipo').count()

In [9]:
print("SQL Group")
group_sql.explain()

print("DataFrame Group")
group_dataframe.explain()

SQL Group
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#30], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#30, 200), ENSURE_REQUIREMENTS, [plan_id=26]
      +- HashAggregate(keys=[chave_pix_tipo#30], functions=[partial_count(1)])
         +- FileScan csv [chave_pix_tipo#30] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/dsant/Downloads/Estudos DNC/spark2/SparkSQL/base_de_dad..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<chave_pix_tipo:string>


DataFrame Group
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#8], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#8, 200), ENSURE_REQUIREMENTS, [plan_id=39]
      +- HashAggregate(keys=[chave_pix_tipo#8], functions=[partial_count(1)])
         +- FileScan csv [chave_pix_tipo#8] Batched: false, DataFilters: [], Format: CSV, Location: InMemor

In [10]:
spark.sql(
  """
    select chave_pix_tipo, sum(valor) 
    from base_pix 
    group by 1
  """
).show()

+--------------+------------------+
|chave_pix_tipo|        sum(valor)|
+--------------+------------------+
|       celular|         207778.46|
|         email|499009.38000000006|
|           cpf| 659513.3499999997|
+--------------+------------------+



In [11]:
spark.sql(
  """
    select chave_pix_tipo, round(sum(valor), 2)
    from base_pix 
    group by 1
  """
).show()

+--------------+--------------------+
|chave_pix_tipo|round(sum(valor), 2)|
+--------------+--------------------+
|       celular|           207778.46|
|         email|           499009.38|
|           cpf|           659513.35|
+--------------+--------------------+



In [12]:
spark.sql(
  """
    select chave_pix_tipo, round(sum(valor), 2) as sum_valor
    from base_pix 
    group by 1
  """
).show()

+--------------+---------+
|chave_pix_tipo|sum_valor|
+--------------+---------+
|       celular|207778.46|
|         email|499009.38|
|           cpf|659513.35|
+--------------+---------+



In [13]:
spark.sql(
  """
    select chave_pix_tipo, count(*) as count
    from base_pix 
    group by 1
  """
).show()

+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|       celular|   22|
|         email|   29|
|           cpf|   49|
+--------------+-----+



PARA AQUI

In [14]:
spark.sql(
    """
      select
        destinatario.banco,
        valor,
        row_number() over (partition by destinatario.banco order by valor desc) as row_number
      from transacoes_pix
      limit 10
).show()

SyntaxError: EOF while scanning triple-quoted string literal (1015198927.py, line 9)

CTE stands for common table expression. A CTE allows you to define a temporary named result set that available temporarily in the execution scope of a statement such as SELECT, INSERT, UPDATE, DELETE, or MERGE

In [15]:
spark.sql(
  """
  with base_pix_row_number as(
    select
      parte_creditada_banco, 
      data_transacao,
      row_number() over (partition by parte_creditada_banco order by data_transacao desc) as row_number
    from base_pix
  ) select
      parte_creditada_banco,
      data_transacao
    from base_pix_row_number
    where row_number = 1
    order by data_transacao desc
  """
).show()

+---------------------+-------------------+
|parte_creditada_banco|     data_transacao|
+---------------------+-------------------+
|                 Itau|2022-12-15 01:29:00|
|                  BTG|2022-12-08 23:47:00|
|               Nubank|2022-11-19 19:25:00|
|             Bradesco|2022-08-07 17:01:00|
+---------------------+-------------------+



Porém, não precisa ficar limitado somente a execução de queries SQL. 

Podemos pegar o resultado de uma query e retorná-la para um DataFrame!

In [16]:
df_window = spark.sql(
  """
  with base_pix_row_number as(
    select
      parte_creditada_banco, 
      data_transacao,
      row_number() over (partition by parte_creditada_banco order by data_transacao desc) as row_number
    from base_pix
  ) select
      parte_creditada_banco,
      data_transacao
    from base_pix_row_number
    where row_number = 1
    order by data_transacao desc
  """
)

In [17]:
df_window.show()

+---------------------+-------------------+
|parte_creditada_banco|     data_transacao|
+---------------------+-------------------+
|                 Itau|2022-12-15 01:29:00|
|                  BTG|2022-12-08 23:47:00|
|               Nubank|2022-11-19 19:25:00|
|             Bradesco|2022-08-07 17:01:00|
+---------------------+-------------------+



This opens up the true power of Spark. We can treat selectExpr as a simple way to build up
complex expressions that create new DataFrames. In fact, we can add any valid non-aggregating
SQL statement, and as long as the columns resolve, it will be valid!

In [18]:
from pyspark.sql.functions import col

In [19]:
df.selectExpr(
    "date(data_transacao) as date_data_transacao",
    "va"
).groupBy('date_data_transacao').count().orderBy(col('count').desc()).show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `va` cannot be resolved. Did you mean one of the following? [`id`, `valor`, `chave_pix_tipo`, `chave_pix_valor`, `data_transacao`].; line 1 pos 0;
'Project [cast(data_transacao#10 as date) AS date_data_transacao#238, 'va]
+- Relation [id#0,valor#1,parte_debitada_nome#2,parte_debitada_conta#3,parte_debitada_banco#4,parte_creditada_nome#5,parte_creditada_conta#6,parte_creditada_banco#7,chave_pix_tipo#8,chave_pix_valor#9,data_transacao#10] csv


Exercício
1. Vimos que há dois dias em que houve duas transações pix. Descubra são os ids dessas transações.

2. Vimos que há dois dias em que houve duas transações pix. Descubra quais chaves pix foram utilizadas para realizar as transações. 

In [20]:
lista_datas = spark.sql(
  """
  select
    date(data_transacao) as date_data_transacao
  from base_pix
  group by 1
  having count(*) > 1
  """
).collect()[0][0]
lista_datas

datetime.date(2022, 2, 26)