# Preparando ambiente e Carregando o Dataset ✈

In [81]:
!pip install pyspark



In [82]:
!pip install graphframes



In [83]:
!pip install imbalanced-learn



In [84]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, DoubleType
from imblearn.over_sampling import SMOTE
from pyspark.ml.linalg import Vectors
import plotly.express as px


# Criando Sessão Spark
spark = SparkSession.builder.appName("Modelo").getOrCreate()

In [85]:
df = spark.read.csv("/content/train_transaction.csv", header=True, inferSchema=True)

# Análise Exploratoria Simples ⚛
- Para realizar visões com pyspark é necessário converter para pandas
- Utilizarei 10 porcento do dataset

In [86]:
# Porção de 10 porcento do dataset
sample_df = df.sample(False, 0.1, seed=42)
# Transformando para pandas
samples = sample_df.toPandas()

In [87]:
# Verificar colunas
samples.columns

Index(['TransactionID', 'isFraud', 'TransactionDT', 'TransactionAmt',
       'ProductCD', 'card1', 'card2', 'card3', 'card4', 'card5',
       ...
       'V330', 'V331', 'V332', 'V333', 'V334', 'V335', 'V336', 'V337', 'V338',
       'V339'],
      dtype='object', length=394)

In [88]:
# Verificar correlações com as colunas numéricas
# Selecionando apenas as colunas numéricas
numeric_columns = samples.select_dtypes(include=['int64', 'float64']).columns
correlation_matrix = samples[numeric_columns].corr()

In [89]:
correlation_matrix

Unnamed: 0,TransactionAmt,card2,card3,card5,addr1,addr2,dist1,dist2,C1,C2,...,V330,V331,V332,V333,V334,V335,V336,V337,V338,V339
TransactionAmt,1.000000,0.009335,-0.111996,0.013397,-0.010660,0.029313,0.036263,0.035570,-0.016776,-0.017477,...,0.003904,0.023331,0.011175,0.016327,0.148532,0.029021,0.071420,0.148588,0.084484,0.111440
card2,0.009335,1.000000,0.021692,0.026308,0.020470,-0.023761,-0.010377,0.013889,0.009339,0.010313,...,0.076462,0.057081,0.062781,0.065196,0.023732,0.058775,0.060032,0.040558,0.070684,0.064601
card3,-0.111996,0.021692,1.000000,-0.160168,-0.002836,-0.570308,-0.012773,-0.137976,0.075364,0.086931,...,0.000559,-0.000558,-0.000517,-0.000566,-0.000530,-0.001129,-0.000660,0.001138,0.000324,0.000673
card5,0.013397,0.026308,-0.160168,1.000000,0.028816,0.034800,0.011287,0.043335,-0.010276,-0.013154,...,-0.289122,-0.232697,-0.260831,-0.268642,-0.091966,-0.277870,-0.265173,-0.080093,-0.244959,-0.198724
addr1,-0.010660,0.020470,-0.002836,0.028816,1.000000,0.002652,-0.008993,0.036566,-0.003265,-0.002703,...,0.038894,0.033002,0.036458,0.037831,-0.006310,0.025703,0.017852,0.014743,0.035985,0.030835
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
V335,0.029021,0.058775,-0.001129,-0.277870,0.025703,0.004241,,-0.059183,-0.011396,-0.013249,...,0.876866,0.744883,0.846945,0.859096,0.413005,1.000000,0.936953,0.230172,0.771811,0.595810
V336,0.071420,0.060032,-0.000660,-0.265173,0.017852,0.004110,,-0.035432,-0.008547,-0.010412,...,0.806461,0.672629,0.759922,0.778354,0.613983,0.936953,1.000000,0.222526,0.703603,0.556355
V337,0.148588,0.040558,0.001138,-0.080093,0.014743,0.002506,,-0.014518,0.002889,0.002244,...,0.292720,0.302845,0.274216,0.303636,0.077149,0.230172,0.222526,1.000000,0.711917,0.886602
V338,0.084484,0.070684,0.000324,-0.244959,0.035985,0.003341,,-0.026010,-0.001572,-0.003592,...,0.837425,0.751333,0.829054,0.851338,0.171596,0.771811,0.703603,0.711917,1.000000,0.942071


In [90]:
# Convertendo para mapa de calor plotly express

fig = px.imshow(correlation_matrix, text_auto=True)
fig.show()

In [91]:
# Para ficar mais visual, retirando as colunas vetores, que começam com V
numeric_columns = numeric_columns.drop(numeric_columns[numeric_columns.str.startswith('V')])

In [92]:
correlation_matrix = samples[numeric_columns].corr()
fig = px.imshow(correlation_matrix, text_auto=True)
fig.show()

In [93]:
# Verificar variações com boxplot plotly express
fig = px.box(samples, x="isFraud", y="TransactionAmt")
fig.show()

In [94]:
fig = px.box(samples, x="isFraud", y="card1", title="Boxplots")
fig.add_trace(px.box(samples, x="isFraud", y="card2").data[0])
fig.add_trace(px.box(samples, x="isFraud", y="card3").data[0])
fig.add_trace(px.box(samples, x="isFraud", y="card5").data[0])
fig.add_trace(px.box(samples, x="isFraud", y="dist1").data[0])
fig.add_trace(px.box(samples, x="isFraud", y="dist2").data[0])

# Atualizando o layout e mostrando o gráfico
fig.update_layout(showlegend=False)  # Esconde a legenda para evitar repetição
fig.show()

In [95]:
# Ver quantidade de valores nulos por colunas
samples.isnull().sum()

Unnamed: 0,0
TransactionID,0
isFraud,0
TransactionDT,0
TransactionAmt,0
ProductCD,0
...,...
V335,50764
V336,50764
V337,50764
V338,50764


In [96]:
samples['isFraud'].value_counts()

Unnamed: 0_level_0,count
isFraud,Unnamed: 1_level_1
0,56983
1,2012


## Conclusões da análise
Fica difícil identificar padrões com apenas 10 porcento do dataset e com colunas que não são possíveis saber os significados. Entretanto, será necessário tratar os valores nulos das colunas vetores. Os outliers encontrados nos boxplots, podem ser necessários para encontrar as fraudes. Além disso, como demostrado no value_counts das fraudes, existe um desbalanceamento

# Pré-Processamento
- Tratamentos
- Balanceamento com SMOTE
- Padronização

In [97]:
df.printSchema()

root
 |-- TransactionID: integer (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- TransactionDT: integer (nullable = true)
 |-- TransactionAmt: double (nullable = true)
 |-- ProductCD: string (nullable = true)
 |-- card1: integer (nullable = true)
 |-- card2: double (nullable = true)
 |-- card3: double (nullable = true)
 |-- card4: string (nullable = true)
 |-- card5: double (nullable = true)
 |-- card6: string (nullable = true)
 |-- addr1: double (nullable = true)
 |-- addr2: double (nullable = true)
 |-- dist1: double (nullable = true)
 |-- dist2: double (nullable = true)
 |-- P_emaildomain: string (nullable = true)
 |-- R_emaildomain: string (nullable = true)
 |-- C1: double (nullable = true)
 |-- C2: double (nullable = true)
 |-- C3: double (nullable = true)
 |-- C4: double (nullable = true)
 |-- C5: double (nullable = true)
 |-- C6: double (nullable = true)
 |-- C7: double (nullable = true)
 |-- C8: double (nullable = true)
 |-- C9: double (nullable = true)
 |-- C10:

In [98]:
# Preenchendo valores nulos com 0
df = df.na.fill(0)

In [99]:
# Onehot coding
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
import pandas as pd
from imblearn.over_sampling import SMOTE


string_cols = ['ProductCD', 'card4', 'card6', 'P_emaildomain', 'R_emaildomain', 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'M9']
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in string_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_vec") for col in string_cols]


# Definir colunas de features
encoded_cols = [f"{col}_vec" for col in string_cols]
feature_cols = [col for col in df.columns if col not in string_cols + ['isFraud']] + encoded_cols

# Criar pipeline de pré-processamento
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
preprocessing_pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Aplicar pipeline de pré-processamento
df_preprocessed = preprocessing_pipeline.fit(df).transform(df)

In [101]:
# Função para aplicar SMOTE
def apply_smote(pdf):
    X = pdf.drop('isFraud', axis=1)
    y = pdf['isFraud']
    smote = SMOTE(random_state=42)
    X_resampled, y_resampled = smote.fit_resample(X, y)
    resampled_df = pd.concat([X_resampled, y_resampled], axis=1)
    return resampled_df

# Converter features para array
to_array = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))
df_preprocessed = df_preprocessed.withColumn("features_array", to_array("features"))

# Preparar para o balanceamento
df_for_smote = df_preprocessed.select("features_array", "isFraud")

# Aplicar SMOTE
balanced_df = df_for_smote.groupby().applyInPandas(apply_smote, schema=df_for_smote.schema)

# Converter array de volta para vetor
to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())
balanced_df = balanced_df.withColumn("features_vector", to_vector("features_array"))

# Aplicar StandardScaler
scaler = StandardScaler(inputCol="features_vector", outputCol="scaled_features", withStd=True, withMean=True)
df_scaled = scaler.fit(balanced_df).transform(balanced_df)

Py4JJavaError: An error occurred while calling o6933.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 185.0 failed 1 times, most recent failure: Lost task 0.0 in stage 185.0 (TID 460) (ce70b235777c executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`StringIndexerModel$$Lambda$3996/0x0000000841667840`: (string) => double).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_36$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.ContextAwareIterator.next(ContextAwareIterator.scala:41)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:396)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`StringIndexerModel$$Lambda$3996/0x0000000841667840`: (string) => double).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_36$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.ContextAwareIterator.next(ContextAwareIterator.scala:41)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:396)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 32 more


In [79]:
# Converter para numpy arrays
features = np.array(df_scaled.select("scaled_features").collect())
labels = np.array(df_scaled.select("isFraud").collect())

IllegalArgumentException: Data type array<double> of column features_array is not supported.

# Modelagem
- Preparação dos dados para treinamento
- Arquitetura do Modelo
- Treino
- Avaliação das Métricas

In [None]:
# Função para criar janelas de sequências
def create_sequences(df, window_size):
    sequences = []
    labels = []

    for i in range(len(df) - window_size):
        # Extraindo janela de features e o label correspondente
        sequence = df[i:i+window_size]['scaled_features']
        label = df.iloc[i+window_size]['isFraud']
        sequences.append(sequence)
        labels.append(label)

    return np.array(sequences), np.array(labels)

# Criando as sequências com janela de 10
window_size = 10
X_train, y_train = create_sequences(df_scaled.toPandas(), window_size)

# Conclusões