# Instalando o Spark

In [35]:
!pip install pyspark #==3.3.1



In [36]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

--2024-12-29 18:01:07--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 99.83.220.108, 13.248.244.96, 75.2.60.68, ...
Connecting to bin.equinox.io (bin.equinox.io)|99.83.220.108|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13921656 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.3’


2024-12-29 18:01:08 (21.2 MB/s) - ‘ngrok-stable-linux-amd64.zip.3’ saved [13921656/13921656]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

# SparkSQL

In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

# Reuse the active SparkSession when one already exists; otherwise create one,
# so this cell also works on a fresh kernel (Restart & Run All).
spark = SparkSession.builder.getOrCreate()

caminho_csv = "./base_de_dados.csv"

# Explicit schema applied when reading the CSV (no type inference).
schema_base_pix = StructType([
    StructField('id', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('parte_debitada_nome', StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField('parte_debitada_banco', StringType()),
    StructField('parte_creditada_nome', StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType())
])

# Semicolon-separated file with a header row; timestamps are parsed with the
# day/month/year hour:minute pattern.
df = spark.read.csv(
    path=caminho_csv,
    header=True,
    sep=";",
    schema=schema_base_pix,
    timestampFormat="dd/MM/yyyy HH:mm"
)

# Expose the same DataFrame to Spark SQL under the name "base_pix" instead of
# declaring a second, identical read of the file.
df.createOrReplaceTempView("base_pix")

In [38]:
# Preview three rows of the base_pix view.
(
    spark
    .sql("select * from base_pix limit 3")
    .show()
)

+---+-----+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+-----+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1| 9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|57.58|    Arthur Goncalves|            1

In [39]:
# Row count per PIX key type, expressed as a SQL query against the view.
group_sql = spark.sql(
    "select chave_pix_tipo, count(1) from base_pix group by chave_pix_tipo"
)

In [40]:
# Row count per PIX key type, expressed through the DataFrame API.
group_dataframe = (
    df
    .groupBy("chave_pix_tipo")
    .count()
)

In [41]:
# Print the physical plan of each aggregation, one after the other, so the
# SQL and DataFrame variants can be compared.
for label, grouped in (
    ("SQL Group", group_sql),
    ("DataFrame Group", group_dataframe),
):
    print(label)
    grouped.explain()

SQL Group
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#277], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#277, 200), ENSURE_REQUIREMENTS, [plan_id=445]
      +- HashAggregate(keys=[chave_pix_tipo#277], functions=[partial_count(1)])
         +- FileScan csv [chave_pix_tipo#277] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/base_de_dados.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<chave_pix_tipo:string>


DataFrame Group
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#255], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#255, 200), ENSURE_REQUIREMENTS, [plan_id=458]
      +- HashAggregate(keys=[chave_pix_tipo#255], functions=[partial_count(1)])
         +- FileScan csv [chave_pix_tipo#255] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c

In [42]:
# Sum of `valor` per PIX key type. `group by 1` groups by the first column of
# the select list (chave_pix_tipo). The sum is displayed unrounded.
spark.sql(
  """
    select chave_pix_tipo, sum(valor)
    from base_pix
    group by 1
  """
).show()

+--------------+------------------+
|chave_pix_tipo|        sum(valor)|
+--------------+------------------+
|       celular|         207778.46|
|         email|499009.38000000006|
|           cpf| 659513.3499999997|
+--------------+------------------+



In [43]:
# Sum of `valor` per PIX key type, rounded to 2 decimal places.
# `group by 1` groups by the first column of the select list (chave_pix_tipo).
spark.sql(
  """
    select chave_pix_tipo, round(sum(valor), 2)
    from base_pix
    group by 1
  """
).show()

+--------------+--------------------+
|chave_pix_tipo|round(sum(valor), 2)|
+--------------+--------------------+
|       celular|           207778.46|
|         email|           499009.38|
|           cpf|           659513.35|
+--------------+--------------------+



In [44]:
# Sum of `valor` per PIX key type, rounded to 2 decimal places and aliased as
# `sum_valor` so the output column has a plain name.
# `group by 1` groups by the first column of the select list (chave_pix_tipo).
spark.sql(
  """
    select chave_pix_tipo, round(sum(valor), 2) as sum_valor
    from base_pix
    group by 1
  """
).show()

+--------------+---------+
|chave_pix_tipo|sum_valor|
+--------------+---------+
|       celular|207778.46|
|         email|499009.38|
|           cpf|659513.35|
+--------------+---------+



In [45]:
# Number of rows per PIX key type, with the count column aliased as `count`.
# `group by 1` groups by the first column of the select list (chave_pix_tipo).
spark.sql(
  """
    select chave_pix_tipo, count(*) as count
    from base_pix
    group by 1
  """
).show()

+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|       celular|   22|
|         email|   29|
|           cpf|   49|
+--------------+-----+



In [46]:
# Most recent transaction timestamp per credited bank.
# The CTE numbers the rows of each bank from newest to oldest with
# row_number(); the outer query keeps only row 1 of each bank and lists the
# banks by that timestamp, newest first.
spark.sql(
  """
  with base_pix_row_number as(
    select
      parte_creditada_banco,
      data_transacao,
      row_number() over (partition by parte_creditada_banco order by data_transacao desc) as row_number
    from base_pix
  ) select
      parte_creditada_banco,
      data_transacao
    from base_pix_row_number
    where row_number = 1
    order by data_transacao desc
  """
).show()

+---------------------+-------------------+
|parte_creditada_banco|     data_transacao|
+---------------------+-------------------+
|                 Itau|2022-12-15 01:29:00|
|                  BTG|2022-12-08 23:47:00|
|               Nubank|2022-11-19 19:25:00|
|             Bradesco|2022-08-07 17:01:00|
+---------------------+-------------------+



In [47]:
# Most recent transaction timestamp per credited bank, kept as a DataFrame
# (`df_window`) instead of being displayed.
# The CTE numbers the rows of each bank from newest to oldest with
# row_number(); the outer query keeps only row 1 of each bank and orders the
# banks by that timestamp, newest first.
df_window = spark.sql(
  """
  with base_pix_row_number as(
    select
      parte_creditada_banco,
      data_transacao,
      row_number() over (partition by parte_creditada_banco order by data_transacao desc) as row_number
    from base_pix
  ) select
      parte_creditada_banco,
      data_transacao
    from base_pix_row_number
    where row_number = 1
    order by data_transacao desc
  """
)

In [48]:
# Display the rows of df_window as a text table.
df_window.show()

+---------------------+-------------------+
|parte_creditada_banco|     data_transacao|
+---------------------+-------------------+
|                 Itau|2022-12-15 01:29:00|
|                  BTG|2022-12-08 23:47:00|
|               Nubank|2022-11-19 19:25:00|
|             Bradesco|2022-08-07 17:01:00|
+---------------------+-------------------+



In [49]:
from pyspark.sql.functions import col

In [51]:
# Print the column names, types and nullability of df.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_conta: string (nullable = true)
 |-- parte_debitada_banco: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_conta: string (nullable = true)
 |-- parte_creditada_banco: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)
 |-- chave_pix_valor: string (nullable = true)
 |-- data_transacao: timestamp (nullable = true)



In [54]:
# Number of transactions per calendar day, busiest days first: the timestamp
# is reduced to its date before grouping.
(
    df
    .selectExpr("date(data_transacao) as date_data_transacao")
    .groupBy("date_data_transacao")
    .count()
    .orderBy(col("count").desc())
    .show()
)

+-------------------+-----+
|date_data_transacao|count|
+-------------------+-----+
|         2022-02-26|    2|
|         2022-03-02|    2|
|         2021-06-22|    1|
|         2022-11-29|    1|
|         2021-07-20|    1|
|         2021-02-15|    1|
|         2021-03-22|    1|
|         2022-02-16|    1|
|         2021-04-25|    1|
|         2021-03-07|    1|
|         2022-01-15|    1|
|         2022-01-09|    1|
|         2022-05-23|    1|
|         2022-02-01|    1|
|         2021-07-11|    1|
|         2022-04-12|    1|
|         2022-06-05|    1|
|         2021-09-06|    1|
|         2021-06-20|    1|
|         2021-12-14|    1|
+-------------------+-----+
only showing top 20 rows

