# Instalando o Spark

In [19]:
!pip install pyspark #==3.3.1



In [20]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

--2024-02-27 21:50:45--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 52.202.168.65, 18.205.222.128, 54.237.133.81, ...
Connecting to bin.equinox.io (bin.equinox.io)|52.202.168.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13921656 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.3’


2024-02-27 21:50:46 (51.4 MB/s) - ‘ngrok-stable-linux-amd64.zip.3’ saved [13921656/13921656]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

# Iniciar Sessão Spark

In [22]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# ConfigureSparkUI
conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
sc.stop()

spark = (
    SparkSession.builder                  # Método da classe que constrói a sessão spark
      .appName("Meu Primeiro App Spark")  # Nome do App Spark
      .getOrCreate())                     # Verifica se há uma sessão ativa, e se não há, cria uma nova sessão


ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Meu Primeiro App Spark, master=local[*]) created by getOrCreate at <ipython-input-3-0e8e82901c43>:12 

In [23]:
!curl -s http://localhost:4040/api/tunnels

# SparkSQL

In [27]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

caminho_json = "/content/pix_transactions.json"

schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType()),
])


eschema_pix = StructType([
    StructField('chave_pix', StringType()),
    StructField('fraude', IntegerType()),
    StructField('id_transacao', IntegerType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('transaction_date', TimestampType()),
    StructField('valor', DoubleType())
])


spark.read.json(
    path=caminho_json,
    schema=eschema_pix,
    timestampFormat="yyyy-MM-dd"
).createOrReplaceTempView('transacoes_pix')


df = spark.read.json(
    path=caminho_json,
    schema=eschema_pix,
    timestampFormat="yyyy-MM-dd"
)


In [25]:
# Aqui podemos escrever qualquer query
spark.sql('select * from transacoes_pix limit 10').show()

+---------+------+------------+--------------------+--------------------+-------------------+-------+
|chave_pix|fraude|id_transacao|           remetente|        destinatario|   transaction_date|  valor|
+---------+------+------------+--------------------+--------------------+-------------------+-------+
|      cpf|     0|        1000|{Jonathan Gonsalv...|{Gabriel Cunha, I...|2022-03-19 00:00:00|   7.05|
|aleatoria|     0|        1001|{Jonathan Gonsalv...|{Diego Souza, XP,...|2021-01-26 00:00:00|  37.28|
|aleatoria|     0|        1002|{Jonathan Gonsalv...|{Nicole Nunes, BT...|2022-05-31 00:00:00| 282.73|
|aleatoria|     0|        1003|{Jonathan Gonsalv...|{Maria Fernanda C...|2022-07-04 00:00:00|8447.92|
|aleatoria|     0|        1004|{Jonathan Gonsalv...|{Isabel Silva, C6...|2021-09-11 00:00:00|  58.51|
|  celular|     0|        1005|{Jonathan Gonsalv...|{Anthony Carvalho...|2022-02-11 00:00:00|6655.12|
|      cpf|     0|        1006|{Jonathan Gonsalv...|{Eloah Monteiro, ...|2022-05-1

In [38]:
# testando para ver o que é mais facil
group_sql = spark.sql('select chave_pix, count(*) from transacoes_pix group by chave_pix ')

In [39]:
group_df = df.groupBy('chave_pix').count()

In [40]:
group_sql.explain()

group_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix#338], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix#338, 200), ENSURE_REQUIREMENTS, [plan_id=123]
      +- HashAggregate(keys=[chave_pix#338], functions=[partial_count(1)])
         +- FileScan json [chave_pix#338] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/content/pix_transactions.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<chave_pix:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix#352], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix#352, 200), ENSURE_REQUIREMENTS, [plan_id=136]
      +- HashAggregate(keys=[chave_pix#352], functions=[partial_count(1)])
         +- FileScan json [chave_pix#352] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/content/pix_transactions.json], PartitionFilters: [], PushedFilt

In [41]:
group_sql.show()

group_df.show()

+---------+--------+
|chave_pix|count(1)|
+---------+--------+
|aleatoria|   25045|
|  celular|   24841|
|    email|   24935|
|      cpf|   25179|
+---------+--------+

+---------+-----+
|chave_pix|count|
+---------+-----+
|aleatoria|25045|
|  celular|24841|
|    email|24935|
|      cpf|25179|
+---------+-----+



In [46]:
spark.sql(
    """
      select
      chave_pix,
      sum(valor) as valor_total
      from transacoes_pix
      group by chave_pix
      order by valor_total desc
    """
).show()

+---------+--------------------+
|chave_pix|         valor_total|
+---------+--------------------+
|aleatoria| 3.059806293100002E8|
|  celular| 3.018847117200015E8|
|      cpf| 3.007901403699987E8|
|    email|2.9592901058000124E8|
+---------+--------------------+



In [49]:
spark.sql(
    """
    select
    chave_pix,
    count(*) as numero_total
    from transacoes_pix
    where valor>10000
    group by chave_pix
    order by numero_total desc
    """
).show()

+---------+------------+
|chave_pix|numero_total|
+---------+------------+
|aleatoria|        5032|
|      cpf|        4950|
|  celular|        4922|
|    email|        4830|
+---------+------------+

