# Лабараторная работа №2
- Выполнил: студунт гр.САПР-1.4 Дешевов П.П. 
- Проверл: канд. физ.-мат. наук, доцент Кравченя П.Д
### Цель и задачи: 
- Познакомиться с базовыми алгоритмами машинного обучения
- Познакомиться с реализацией машинного обучения в библиотеке Spark ML
- Получить навыки разработки программного обеспечения для анализа данных с использованием pyspark


In [1]:
# Импортируем необходимые для работы библиотеки

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

# Инициализируем Spark сессию 
spark = SparkSession.builder.master("local[*]").getOrCreate()
filename_data = './data/financial_fraud_detection.csv'
# Читаем датасет
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
# Выбираем необходимые для работы признаки
csv = csv.select(
    'step',
    'amount',
    'type',
    'oldbalanceOrg',
    'newbalanceOrig', 
    'oldbalanceDest', 
    'newbalanceDest',
    'isFraud'
)
csv.printSchema()
csv.show(10)

root
 |-- step: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- type: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)

+----+--------+--------+-------------+--------------+--------------+--------------+-------+
|step|  amount|    type|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|
+----+--------+--------+-------------+--------------+--------------+--------------+-------+
|   1| 9839.64| PAYMENT|     170136.0|     160296.36|           0.0|           0.0|      0|
|   1| 1864.28| PAYMENT|      21249.0|      19384.72|           0.0|           0.0|      0|
|   1|   181.0|TRANSFER|        181.0|           0.0|           0.0|           0.0|      1|
|   1|   181.0|CASH_OUT|        181.0|           0.0|       21182.0|           0.0|      1|
|   1|11668.14| 

In [2]:
# Делим датасет на тренеровочный и тестовый
splits = csv.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("isFraud", "trueIsFraud")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 4453953  Testing Rows: 1908667


In [3]:
# Производим предобработку данных
str_idx = StringIndexer(
    inputCols = ['type'], 
    outputCols = ['typeInx'],
)
cat_vect = VectorAssembler(
    inputCols = ['step','typeInx', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest'],
    outputCol = 'cat_features'
)
cat_idx = VectorIndexer(
    inputCol = cat_vect.getOutputCol(), 
    outputCol = 'idx_cat_features',
    handleInvalid = "keep"
)
num_vect = VectorAssembler(
    inputCols = ['amount'],
    outputCol = 'num_features',
    handleInvalid = "keep"
)
min_max = MinMaxScaler(
    inputCol = num_vect.getOutputCol(),
    outputCol = 'norm_features'
)
feat_vect = VectorAssembler(
    inputCols = ['idx_cat_features', 'norm_features'],
    outputCol = 'features',
    handleInvalid = "keep"
)
# Задаем алгоритм обучения
rfr = RandomForestRegressor(
    labelCol="isFraud",
    featuresCol="features",
    numTrees = 10,
    maxDepth=2,
    maxBins = 181834
)
# Строим pipeline - стадии через которые будут проходить данные
pipeline = Pipeline(
    stages = [
    str_idx,
    cat_vect,
    cat_idx,
    num_vect,
    min_max,
    feat_vect,
    rfr
    ]
)


In [4]:
# Обучаем на RandomForestRegressor
pipeline_model = pipeline.fit(train)

In [5]:
pred_df = pipeline_model.transform(test)
pred_df.select("features", "prediction", "trueIsFraud").show(10)

+--------------------+--------------------+-----------+
|            features|          prediction|trueIsFraud|
+--------------------+--------------------+-----------+
|[1.0,1.0,18462.78...| 5.95550172086087E-4|          0|
|[1.0,1.0,59708.34...| 5.95550172086087E-4|          0|
|[1.0,1.0,1955143....| 5.84098936381198E-4|          0|
|[1.0,0.0,181.0,0....|7.289157487937555E-4|          1|
|[1.0,1.0,54214.0,...| 5.95550172086087E-4|          0|
|[1.0,1.0,101473.0...| 5.95550172086087E-4|          0|
|[1.0,1.0,200148.0...| 5.84098936381198E-4|          0|
|[1.0,1.0,65254.0,...| 5.95550172086087E-4|          0|
|[1.0,4.0,15237.0,...| 5.95550172086087E-4|          0|
|[1.0,1.0,10525.28...| 5.95550172086087E-4|          0|
+--------------------+--------------------+-----------+
only showing top 10 rows



In [6]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="trueIsFraud", metricName="rmse")

# Снимаем основные метрики
# RMSE
rmse = regressionEvaluator.evaluate(pred_df)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(pred_df)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(pred_df)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(pred_df)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 0.03
The MSE for the random forest regression model is 0.00
The R2 for the random forest regression model is 0.20
The MAE for the random forest regression model is 0.00


In [23]:
param_grid = ParamGridBuilder().\
    addGrid(rfr.numTrees, [10, 15, 20]).\
    addGrid(rfr.maxDepth, [1, 2, 4]).\
    addGrid(rfr.maxBins , [181834, 362432, 724864]).\
    build()

In [24]:
cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=param_grid, \
                    evaluator=RegressionEvaluator(
                                predictionCol="prediction", \
                                labelCol="isFraud", \
                                metricName="rmse"), \
                    numFolds=2)

In [25]:
cv_model = cv.fit(train)

In [26]:
newPrediction = cv_model.transform(test)

In [27]:
# RMSE
rmse = regressionEvaluator.evaluate(newPrediction)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(newPrediction)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(newPrediction)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(newPrediction)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 0.00
The MSE for the random forest regression model is 0.00
The R2 for the random forest regression model is -0.00
The MAE for the random forest regression model is 0.00


# Часть 2

In [7]:
csv.show()

+----+---------+--------+-------------+--------------+--------------+--------------+-------+
|step|   amount|    type|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|
+----+---------+--------+-------------+--------------+--------------+--------------+-------+
|   1|  9839.64| PAYMENT|     170136.0|     160296.36|           0.0|           0.0|      0|
|   1|  1864.28| PAYMENT|      21249.0|      19384.72|           0.0|           0.0|      0|
|   1|    181.0|TRANSFER|        181.0|           0.0|           0.0|           0.0|      1|
|   1|    181.0|CASH_OUT|        181.0|           0.0|       21182.0|           0.0|      1|
|   1| 11668.14| PAYMENT|      41554.0|      29885.86|           0.0|           0.0|      0|
|   1|  7817.71| PAYMENT|      53860.0|      46042.29|           0.0|           0.0|      0|
|   1|  7107.77| PAYMENT|     183195.0|     176087.23|           0.0|           0.0|      0|
|   1|  7861.64| PAYMENT|    176087.23|     168225.59|           0.0| 

In [8]:
lr = LogisticRegression(
    labelCol="isFraud", 
    featuresCol="features", 
    maxIter=10,
    regParam=0.3
)
pipeline = Pipeline(
    stages = [
        str_idx,
        cat_vect,
        cat_idx,
        num_vect,
        min_max,
        feat_vect,
        lr
    ]
)

In [9]:
# Обучаем на логистической регрессии
pipeline_model = pipeline.fit(train)

In [11]:
pred_df = pipeline_model.transform(test)
pred_df.select("features", "prediction", "trueIsFraud").show()

+--------------------+----------+-----------+
|            features|prediction|trueIsFraud|
+--------------------+----------+-----------+
|[1.0,1.0,18462.78...|       0.0|          0|
|[1.0,1.0,59708.34...|       0.0|          0|
|[1.0,1.0,1955143....|       0.0|          0|
|[1.0,0.0,181.0,0....|       0.0|          1|
|[1.0,1.0,54214.0,...|       0.0|          0|
|[1.0,1.0,101473.0...|       0.0|          0|
|[1.0,1.0,200148.0...|       0.0|          0|
|[1.0,1.0,65254.0,...|       0.0|          0|
|[1.0,4.0,15237.0,...|       0.0|          0|
|[1.0,1.0,10525.28...|       0.0|          0|
|(7,[0,1,6],[1.0,1...|       0.0|          0|
|[1.0,1.0,10120.0,...|       0.0|          0|
|[1.0,1.0,36616.0,...|       0.0|          0|
|[1.0,1.0,15156.0,...|       0.0|          0|
|[1.0,1.0,72421.0,...|       0.0|          0|
|[1.0,1.0,7522.0,6...|       0.0|          0|
|[1.0,1.0,49524.0,...|       0.0|          0|
|[1.0,4.0,15573.0,...|       0.0|          0|
|[1.0,1.0,1954993....|       0.0| 

In [21]:
# Снимаем метрики
tp = float(pred_df.filter("prediction == 1.0 AND trueIsFraud == 1").count())
fp = float(pred_df.filter("prediction == 1.0 AND trueIsFraud == 0").count())
tn = float(pred_df.filter("prediction == 0.0 AND trueIsFraud == 0").count())
fn = float(pred_df.filter("prediction == 0.0 AND trueIsFraud == 1").count())
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Recall", re)
], ['metric','value'])
metrics.show()

+------+---------+
|metric|    value|
+------+---------+
|    FP|      0.0|
|    TN|1906205.0|
|    FN|   2462.0|
|Recall|      0.0|
+------+---------+



In [22]:
evaluator = BinaryClassificationEvaluator(labelCol="trueIsFraud", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(pred_df)
print ("AUR = ", aur)

AUR =  0.832116044295385


### Выводы:
- Были построены две модели на основе выбранного датасета  - https://www.kaggle.com/datasets/ealaxi/paysim1
- Построена модель на основе RandomForestRegressor
- Постоена модель на основе LogisticRegression