## Spark ML - credit card fraud study

Data source: https://www.kaggle.com/mlg-ulb/creditcardfraud

#### Loading the data

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import DoubleType

In [3]:
# Start a local Spark session
spark = SparkSession.builder.master('local').appName('MiNADZD').getOrCreate()

In [4]:
df = spark.read.csv('./dataset/finalDataset.csv', header=True, inferSchema=True)

# Drop the index and identifier columns, which are not features
df = df.drop('ind', '_C0', 'id')

# Cast every remaining column to double
for feature in df.columns:
    df = df.withColumn(feature, df[feature].cast(DoubleType()))

df

DataFrame[Customer Type: double, Age: double, Type of Travel: double, Class: double, Flight Distance: double, Inflight wifi service: double, Departure/Arrival time convenient: double, Ease of Online booking: double, Gate location: double, Food and drink: double, Online boarding: double, Seat comfort: double, Inflight entertainment: double, On-board service: double, Leg room service: double, Baggage handling: double, Checkin service: double, Inflight service: double, Cleanliness: double, Departure Delay in Minutes: double, Arrival Delay in Minutes: double, satisfaction: double, Male: double, Female: double]

Size of the data after loading the dataset:

In [5]:
f'Data size: {df.count()} rows x {len(df.columns)} columns.'

'Data size: 129880 rows x 24 columns.'

In [6]:
df.show(5)

[Output truncated: first 5 rows of the 24-column table; the columns are those listed in the schema above.]

Assembling all features into a single vector column

In [7]:
# Every column except the label becomes an input feature
inputColumns = [feature for feature in df.columns if feature != 'satisfaction']

assembler = VectorAssembler(inputCols=inputColumns, outputCol='features')
df = assembler.transform(df)

In [8]:
df = df.select('satisfaction', 'features')
df

DataFrame[satisfaction: double, features: vector]

Normalization: each feature vector is rescaled to unit L1 norm (`p=1.0`)

In [9]:
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
liNormData = normalizer.transform(df)
liNormData.select('normFeatures').show(5)

+--------------------+
|        normFeatures|
+--------------------+
|[0.02314763577377...|
|[0.02649627212010...|
|[-0.0918628977331...|
|[0.01829704167050...|
|[0.02757044123145...|
+--------------------+
only showing top 5 rows
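
With `p=1.0`, `Normalizer` rescales each row vector so that the absolute values of its entries sum to 1; it works per row, not per feature column. A minimal pure-Python sketch of that arithmetic follows, using a made-up vector and a hypothetical helper `l1_normalize` that is not part of Spark:

```python
def l1_normalize(vector):
    """Scale a vector so the absolute values of its entries sum to 1."""
    norm = sum(abs(x) for x in vector)
    if norm == 0:
        # Assumption of this sketch: an all-zero vector is returned unchanged
        return list(vector)
    return [x / norm for x in vector]


# Made-up example row, not taken from the dataset
row = [1.0, -3.0, 4.0]
print(l1_normalize(row))  # [0.125, -0.375, 0.5]
```

Signs are preserved, which is why negative entries can still appear in `normFeatures`.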



Split into training and test data

In [10]:
(trainingNormData, testNormData) = liNormData.randomSplit([0.8,0.2])
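
`randomSplit` assigns rows at random, so the 80/20 proportions hold only approximately, and without a seed every run produces a different split. The pure-Python sketch below illustrates the idea on made-up data; `random_split` is a hypothetical helper, not Spark's implementation. In PySpark itself, `randomSplit` accepts an optional `seed` argument for the same purpose.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row independently to the training or the test list."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))  # made-up stand-in for the DataFrame rows
train, test = random_split(rows, 0.8, seed=42)
print(len(train), len(test))  # roughly 800 and 200, not exactly
```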

### RandomForestClassifier - training & testing

In [11]:
algo = RandomForestClassifier(featuresCol='normFeatures', labelCol='satisfaction')
model = algo.fit(trainingNormData)

In [12]:
predictions = model.transform(testNormData)

Displaying the first 5 classification results:

In [13]:
predictions.select(['satisfaction','prediction', 'probability']).show(5)

+------------+----------+--------------------+
|satisfaction|prediction|         probability|
+------------+----------+--------------------+
|         0.0|       0.0|[0.90039878570638...|
|         0.0|       0.0|[0.90880050756560...|
|         0.0|       0.0|[0.91740492141826...|
|         0.0|       0.0|[0.90889150611218...|
|         0.0|       0.0|[0.64115748479544...|
+------------+----------+--------------------+
only showing top 5 rows



### Classifier quality evaluation

Accuracy and classification error

In [14]:
evaluator = MulticlassClassificationEvaluator(labelCol='satisfaction', predictionCol="prediction", metricName='accuracy')
evaluator.evaluate(predictions)

0.9054720616570328

F1-score

In [15]:
evaluator = MulticlassClassificationEvaluator(labelCol='satisfaction', predictionCol="prediction", metricName='f1')
evaluator.evaluate(predictions)

0.9051694359814064

AUC

In [16]:
evaluator = BinaryClassificationEvaluator(labelCol='satisfaction', rawPredictionCol='rawPrediction')
evaluator.evaluate(predictions)

0.9635726377323806
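
`BinaryClassificationEvaluator` reports the area under the ROC curve by default. One way to read that number: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. A small sketch of that pairwise definition follows, with made-up labels and scores and a hypothetical helper `pairwise_auc`:

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair.
    """
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up toy data, unrelated to the notebook's predictions
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(pairwise_auc(labels, scores))  # 0.75
```

The double loop is quadratic in the number of rows, so it is only meant to show the definition, not to replace the evaluator on a dataset of this size.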

Confusion Matrix

In [17]:
rdd = predictions.select('prediction','satisfaction').rdd

In [18]:
mm = MulticlassMetrics(rdd)

In [21]:
metrics = MulticlassMetrics(predictions.select('prediction','satisfaction').rdd)
metrics.confusionMatrix()

DenseMatrix(2, 2, [13724.0, 1493.0, 960.0, 9773.0], 0)
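
The `DenseMatrix` above stores its values in column-major order, with actual labels in rows and predicted labels in columns. The sketch below unpacks those four numbers in plain Python and recomputes accuracy and a class-frequency-weighted F1 from them. The helper names are hypothetical, and the weighted-F1 formula is assumed to correspond to what the `f1` metric reports.

```python
# Values copied from the RandomForestClassifier confusion matrix (column-major)
values = [13724.0, 1493.0, 960.0, 9773.0]
n = 2

# matrix[actual][predicted]; column-major means values[col * n + row]
matrix = [[values[col * n + row] for col in range(n)] for row in range(n)]
total = sum(values)


def accuracy(m):
    """Share of test rows on the diagonal (correct predictions)."""
    return sum(m[i][i] for i in range(n)) / total


def weighted_f1(m):
    """Per-class F1, averaged with weights equal to the class frequencies."""
    score = 0.0
    for k in range(n):
        actual_k = sum(m[k])                          # row sum
        predicted_k = sum(m[i][k] for i in range(n))  # column sum
        f1_k = 2 * m[k][k] / (actual_k + predicted_k)
        score += f1_k * actual_k / total
    return score


print(matrix)               # [[13724.0, 960.0], [1493.0, 9773.0]]
print(accuracy(matrix))     # about 0.9055, in line with the evaluator output
print(weighted_f1(matrix))  # about 0.9052
```

Read this way, 1493 rows with actual label 1 were predicted as 0, and 960 rows with actual label 0 were predicted as 1.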

### LogisticRegression - training & testing

In [22]:
algo = LogisticRegression(featuresCol='normFeatures', labelCol='satisfaction')
model = algo.fit(trainingNormData)

In [23]:
predictions = model.transform(testNormData)

Displaying the first 5 classification results:

In [24]:
predictions.select(['satisfaction','prediction']).show(5)

+------------+----------+
|satisfaction|prediction|
+------------+----------+
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
+------------+----------+
only showing top 5 rows



### Classifier quality evaluation

Accuracy and classification error

In [25]:
evaluator = MulticlassClassificationEvaluator(labelCol='satisfaction', predictionCol="prediction", metricName='accuracy')
evaluator.evaluate(predictions)

0.8678612716763006

F1-score

In [26]:
evaluator = MulticlassClassificationEvaluator(labelCol='satisfaction', predictionCol="prediction", metricName='f1')
evaluator.evaluate(predictions)

0.8674255138821951

AUC

In [27]:
evaluator = BinaryClassificationEvaluator(labelCol='satisfaction', rawPredictionCol='rawPrediction')
evaluator.evaluate(predictions)

0.9246415328533273

Confusion Matrix

In [28]:
metrics = MulticlassMetrics(predictions.select('prediction','satisfaction').rdd)
metrics.confusionMatrix()

DenseMatrix(2, 2, [13243.0, 1988.0, 1441.0, 9278.0], 0)

### Gradient-Boosted Tree Classifier - training & testing

In [29]:
algo = GBTClassifier(featuresCol='normFeatures', labelCol='satisfaction')
model = algo.fit(trainingNormData)

In [30]:
predictions = model.transform(testNormData)

Displaying the first 20 classification results:

In [31]:
predictions.select(['satisfaction','prediction']).show()

+------------+----------+
|satisfaction|prediction|
+------------+----------+
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
+------------+----------+
only showing top 20 rows



### Classifier quality evaluation

Accuracy and classification error

In [32]:
evaluator = MulticlassClassificationEvaluator(labelCol='satisfaction', predictionCol="prediction", metricName='accuracy')
evaluator.evaluate(predictions)

0.9310597302504817

F1-score

In [33]:
evaluator = MulticlassClassificationEvaluator(labelCol='satisfaction', predictionCol="prediction", metricName='f1')
evaluator.evaluate(predictions)

0.9309618830636932

AUC

In [34]:
evaluator = BinaryClassificationEvaluator(labelCol='satisfaction', rawPredictionCol='rawPrediction')
evaluator.evaluate(predictions)

0.9801908474320711

Confusion Matrix

In [35]:
metrics = MulticlassMetrics(predictions.select('prediction','satisfaction').rdd)
metrics.confusionMatrix()

DenseMatrix(2, 2, [13917.0, 1022.0, 767.0, 10244.0], 0)
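
To compare the three classifiers side by side, the metrics reported by the evaluators can be collected into one table. A small sketch in plain Python follows; the numbers are copied from the evaluator outputs and rounded to four decimals, and the dictionary layout is only an illustration.

```python
# (accuracy, F1, AUC) as reported by the evaluators, rounded to 4 decimals
results = {
    'RandomForestClassifier': (0.9055, 0.9052, 0.9636),
    'LogisticRegression': (0.8679, 0.8674, 0.9246),
    'GBTClassifier': (0.9311, 0.9310, 0.9802),
}

# Sort by AUC, best first
ranking = sorted(results.items(), key=lambda item: item[1][2], reverse=True)

print(f"{'model':<24}{'accuracy':>10}{'F1':>10}{'AUC':>10}")
for name, (acc, f1, auc) in ranking:
    print(f"{name:<24}{acc:>10.4f}{f1:>10.4f}{auc:>10.4f}")
```

On this particular split the gradient-boosted trees lead on all three metrics, followed by the random forest and then logistic regression.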