<a href="https://colab.research.google.com/github/MarcioLuizBR/SparkML_naPratica/blob/main/SparkML_naPratica.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -U pyspark==3.5.1 cloudpickle==2.2.1




In [1]:
# ⬇️ Instala Java, Spark e findspark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark

# ⬇️ Define variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

import findspark
findspark.init()


# ⬇️ Instala ngrok v3 e adiciona o authtoken
!rm -f ngrok ngrok.zip
!wget -q -O ngrok.zip https://bin.equinox.io/c/bNyj1mQVY4c/ngrok-v3-stable-linux-amd64.zip
!unzip -o ngrok.zip
!chmod +x ngrok

from google.colab import userdata

ngrok_token = userdata.get('Ngrok')
if ngrok_token:
    !./ngrok config add-authtoken {ngrok_token}
else:
    raise ValueError("Token do ngrok não encontrado. Adicione-o em 'Secrets' com o nome 'NGROK_TOKEN'.")

# ⬇️ Inicia o túnel ngrok para a Spark UI (porta 4050)
get_ipython().system_raw('./ngrok http 4050 &')


# abertura de sessão do Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Estudo SparkSQL") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

# ⬇️ Tenta obter a URL pública do ngrok com várias tentativas
import time
import urllib.request
import json

print("⏳ Aguardando o ngrok iniciar...")

public_url = None
for i in range(10):  # tenta por até 10 vezes (com 2s de intervalo)
    try:
        with urllib.request.urlopen("http://localhost:4040/api/tunnels") as response:
            data = json.load(response)
            public_url = data['tunnels'][0]['public_url']
            break
    except:
        time.sleep(2)

# ⬇️ Resultado final
if public_url:
    print(f"✅ Spark UI está acessível em:\n👉 {public_url}")
else:
    print("❌ Erro: não foi possível obter a URL pública do ngrok.")



Archive:  ngrok.zip
  inflating: ngrok                   
Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml
⏳ Aguardando o ngrok iniciar...
✅ Spark UI está acessível em:
👉 https://7d80-34-138-204-127.ngrok-free.app


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, DoubleType,
                               StringType, TimestampType)


schema_remetente_destinatario = StructType([
    StructField("nome",  StringType(), True),
    StructField("banco", StringType(), True),
    StructField("tipo",  StringType(), True),
])

schema_pix = StructType([
    StructField("id_transacao",    IntegerType(),  True),
    StructField("valor",           DoubleType(),   True),
    StructField("remetente",       schema_remetente_destinatario, True),
    StructField("destinatario",    schema_remetente_destinatario, True),
    StructField("chave_pix",       StringType(),   True),
    StructField("categoria",       StringType(),   True),
    StructField("transaction_date",StringType(), True),
    StructField("fraude",          IntegerType(),  True)
])

caminho_json = "/content/drive/MyDrive/arquivos e bancos de dados/case_final.json"

df = spark.read.json(
    caminho_json,
    schema=schema_pix,
)

In [4]:
from pyspark.sql.functions import date_format

df = df.withColumn("transaction_date", date_format("transaction_date", "yyyy-MM-dd HH:mm:ss"))

df.show(truncate=False)

+------------+------------------+-----------------------------+------------------------------------+---------+-------------+-------------------+------+
|id_transacao|valor             |remetente                    |destinatario                        |chave_pix|categoria    |transaction_date   |fraude|
+------------+------------------+-----------------------------+------------------------------------+---------+-------------+-------------------+------+
|1000        |588.08            |{Jonathan Gonsalves, BTG, PF}|{Calebe Melo, Caixa, PF}            |aleatoria|outros       |2021-07-16 05:00:55|0     |
|1001        |80682.5           |{Jonathan Gonsalves, BTG, PF}|{Davi Lucas Pereira, Caixa, PJ}     |celular  |transferencia|2022-04-20 12:34:01|1     |
|1002        |549.9             |{Jonathan Gonsalves, BTG, PF}|{Sabrina Castro, Nubank, PF}        |cpf      |lazer        |2022-07-10 16:51:34|0     |
|1003        |90.83             |{Jonathan Gonsalves, BTG, PF}|{Francisco da Conceicao, 

In [5]:
from pyspark.sql.functions import *

df_flatten = df.withColumns({
    "destinatario_nome": col("destinatario").getField('nome'),
    "destinatario_banco": col("destinatario").getField('banco'),
    "destinatario_tipo": col("destinatario").getField('tipo'),
}).drop("destinatario", "remetente")

In [6]:
df_flatten.printSchema()
df_flatten.show(truncate=False)

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)

+------------+------------------+---------+-------------+-------------------+------+----------------------+------------------+-----------------+
|id_transacao|valor             |chave_pix|categoria    |transaction_date   |fraude|destinatario_nome     |destinatario_banco|destinatario_tipo|
+------------+------------------+---------+-------------+-------------------+------+----------------------+------------------+-----------------+
|1000        |588.08            |aleatoria|outros       |2021-07-16 05:00:55|0     |Calebe Melo           |Caixa             |PF               |
|1001        |8

In [7]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(
    inputCols=[
        "destinatario_nome",
        "destinatario_banco",
        "destinatario_tipo",
        "categoria",
        "chave_pix"
    ],
    outputCols=[
        "destinatario_nome_index",
        "destinatario_banco_index",
        "destinatario_tipo_index",
        "categoria_index",
        "chave_pix_index"
    ]
)

df_index = indexer.fit(df_flatten).transform(df_flatten)

In [8]:
df_index.printSchema()
df_index.show(truncate=False)

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)
 |-- destinatario_nome_index: double (nullable = false)
 |-- destinatario_banco_index: double (nullable = false)
 |-- destinatario_tipo_index: double (nullable = false)
 |-- categoria_index: double (nullable = false)
 |-- chave_pix_index: double (nullable = false)

+------------+------------------+---------+-------------+-------------------+------+----------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|id_transacao|valor             |chave_pix|categoria    |transaction_date   |fra

In [9]:
is_fraud = df_index.filter(col("fraude") == 1)
no_fraud = df_index.filter(col("fraude") == 0)

In [10]:
no_fraud = no_fraud.sample(False, 0.01, seed=123)

In [11]:
df_concat = no_fraud.union(is_fraud)
df = df_concat.sort("transaction_date")
df.count()

16202

In [12]:
train, test = df.randomSplit([0.7, 0.3], seed=123)


print("train =",train.count(),"test =",test.count())

train = 11278 test = 4924


In [13]:
# Em vez de usar uma UDF, use as funções SQL do PySpark para eficiência e para evitar problemas de serialização.
# Use when e otherwise para criar a coluna 'is_fraud'.
from pyspark.sql.functions import when
from pyspark.sql.types import DoubleType

train = train.withColumn("is_fraud", when(train.fraude > 0, 1.0).otherwise(0.0).cast(DoubleType()))

# Apply the same transformation to the test DataFrame
test = test.withColumn("is_fraud", when(test.fraude > 0, 1.0).otherwise(0.0).cast(DoubleType()))

# is_fraud = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
# train = train.withColumn("is_fraud", is_fraud(train.fraude))

In [14]:
train.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)
 |-- destinatario_nome_index: double (nullable = false)
 |-- destinatario_banco_index: double (nullable = false)
 |-- destinatario_tipo_index: double (nullable = false)
 |-- categoria_index: double (nullable = false)
 |-- chave_pix_index: double (nullable = false)
 |-- is_fraud: double (nullable = false)



In [15]:
test.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)
 |-- destinatario_nome_index: double (nullable = false)
 |-- destinatario_banco_index: double (nullable = false)
 |-- destinatario_tipo_index: double (nullable = false)
 |-- categoria_index: double (nullable = false)
 |-- chave_pix_index: double (nullable = false)
 |-- is_fraud: double (nullable = false)



In [16]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

assembler = VectorAssembler(
    inputCols=[x for x in train.columns if x not in [
        "transaction_date",
        "fraude",
        "chave_pix",
        "categoria",
        "destinatario_nome",
        "destinatario_banco",
        "destinatario_tipo"
        ]],
    outputCol="features"
)

In [17]:
lr = LogisticRegression().setParams(
    maxIter=100000,
    labelCol="is_fraud",
    predictionCol="prediction"
)

In [18]:
model = Pipeline(stages=[assembler, lr]).fit(train)

In [19]:
predicted = model.transform(test)

In [20]:
predicted.show()

+------------+--------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+--------+--------------------+--------------------+--------------------+----------+
|id_transacao|   valor|chave_pix|    categoria|   transaction_date|fraude|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|is_fraud|            features|       rawPrediction|         probability|prediction|
+------------+--------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+--------+--------------------+--------------------+--------------------+----------+
|        1011|21345.91|      cpf|tran

In [21]:
predicted = predicted.withColumn("is_fraud", when(predicted.fraude > 0, 1.0).otherwise(0.0).cast(DoubleType()))
predicted.crosstab("is_fraud", "prediction").show()

+-------------------+---+----+
|is_fraud_prediction|0.0| 1.0|
+-------------------+---+----+
|                1.0|  0|4682|
|                0.0|242|   0|
+-------------------+---+----+



In [None]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType

# Define schema explicitamente para garantir os tipos
schema_test = StructType([
    StructField("valor", DoubleType(), True),
    StructField("transaction_date", StringType(), True),
    StructField("destinatario_nome_index", DoubleType(), True),
    StructField("destinatario_banco_index", DoubleType(), True),
    StructField("destinatario_tipo_index", DoubleType(), True),
    StructField("categoria_index", DoubleType(), True),
    StructField("chave_pix_index", DoubleType(), True),
    StructField("fraude", IntegerType(), True)
])

df_test_data = [
    (103.2, "2023-01-01 11:56:41", 328.0, 4.0, 1.0, 3.0, 5.0, 0),
    (500000.0, "2023-01-01 11:56:41", 328.0, 2.0, 3.0, 2.0, 5.0, 1),
    (19999.0, "2023-01-01 11:56:41", 328.0, 1.0, 2.0, 1.0, 5.0, 0),
]

df_test = spark.createDataFrame(df_test_data, schema=schema_test)
df_test.show()
