## Обзор и препроцессинг сгенерированных данных на PySpark

In [158]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import hour
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler

In [47]:
!hdfs dfs -ls /user/testdata/

Found 52 items
drwxr-xr-x   - ubuntu hadoop          0 2022-12-22 05:48 /user/testdata/02_06_2019-08_06_2019
drwxr-xr-x   - ubuntu hadoop          0 2022-12-21 07:56 /user/testdata/02_09_2018-08_09_2018
drwxr-xr-x   - ubuntu hadoop          0 2022-12-21 08:46 /user/testdata/02_12_2018-08_12_2018
drwxr-xr-x   - ubuntu hadoop          0 2022-12-21 08:55 /user/testdata/03_02_2019-09_02_2019
drwxr-xr-x   - ubuntu hadoop          0 2022-12-21 08:59 /user/testdata/03_03_2019-09_03_2019
drwxr-xr-x   - ubuntu hadoop          0 2022-12-22 05:57 /user/testdata/04_08_2019-10_08_2019
drwxr-xr-x   - ubuntu hadoop          0 2022-12-21 08:42 /user/testdata/04_11_2018-10_11_2018
drwxr-xr-x   - ubuntu hadoop          0 2022-12-22 05:44 /user/testdata/05_05_2019-11_05_2019
drwxr-xr-x   - ubuntu hadoop          0 2022-12-21 07:44 /user/testdata/05_08_2018-11_08_2018
drwxr-xr-x   - ubuntu hadoop          0 2022-12-21 08:51 /user/testdata/06_01_2019-12_01_2019
drwxr-xr-x   - ubuntu hadoop      

In [48]:
!hdfs dfs -ls /user/testdata/05_05_2019-11_05_2019

Found 3 items
-rw-r--r--   1 ubuntu hadoop      51401 2022-12-22 05:44 /user/testdata/05_05_2019-11_05_2019/customers.csv
-rw-r--r--   1 ubuntu hadoop       3976 2022-12-22 05:44 /user/testdata/05_05_2019-11_05_2019/terminals.csv
-rw-r--r--   1 ubuntu hadoop     194244 2022-12-22 05:44 /user/testdata/05_05_2019-11_05_2019/transactions.csv


In [49]:
spark = SparkSession\
        .builder\
        .appName("trans_feature_engineering")\
        .getOrCreate()

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)  # to pretty print pyspark.DataFrame in jupyter

In [73]:
customers = spark.read.csv('/user/testdata/05_05_2019-11_05_2019/customers.csv', inferSchema=True, header=True)
terminals = spark.read.csv('/user/testdata/05_05_2019-11_05_2019/terminals.csv', inferSchema=True, header=True)
transactions = spark.read.csv('/user/testdata/05_05_2019-11_05_2019/transactions.csv', inferSchema=True, header=True)

### Обзор customers

In [74]:
customers.printSchema()

root
 |-- CUSTOMER_ID: integer (nullable = true)
 |-- x_customer_id: double (nullable = true)
 |-- y_customer_id: double (nullable = true)
 |-- mean_amount: double (nullable = true)
 |-- std_amount: double (nullable = true)
 |-- mean_nb_tx_per_day: double (nullable = true)
 |-- available_terminals: string (nullable = true)
 |-- nb_terminals: integer (nullable = true)



In [75]:
customers.limit(10)

CUSTOMER_ID,x_customer_id,y_customer_id,mean_amount,std_amount,mean_nb_tx_per_day,available_terminals,nb_terminals
0,54.88135039273247,71.51893663724195,62.262520726806166,31.131260363403083,2.1795327319875875,"[29, 87]",2
1,42.36547993389047,64.58941130666561,46.57078506995579,23.28539253497789,3.567092003128319,[5],1
2,96.36627605010293,38.34415188257777,80.21387861785314,40.10693930892657,2.115579679011618,[],0
3,56.80445610939323,92.5596638292661,11.74842552879926,5.87421276439963,0.3485171988061628,"[65, 94]",2
4,2.021839744032572,83.2619845547938,78.9248913402358,39.4624456701179,3.4800485929872766,[],0
5,97.8618342232764,79.91585642167236,48.84053941402853,24.420269707014263,3.122116705145822,[79],1
6,11.827442586893325,63.99210213275238,18.61856230385941,9.309281151929705,3.778675668198336,[],0
7,52.184832175007166,41.46619399905236,30.132783149939563,15.06639157496978,3.0969347577368667,[],0
8,45.615033221654855,56.84339488686485,6.785031041453738,3.392515520726869,2.470541988303508,"[8, 46]",2
9,61.20957227224214,61.69339968747569,94.6560674588893,47.32803372944465,2.7272811964139336,[84],1


### Обзор terminals

In [76]:
terminals.printSchema()

root
 |-- TERMINAL_ID: integer (nullable = true)
 |-- x_terminal_id: double (nullable = true)
 |-- y_terminal_id: double (nullable = true)



In [77]:
terminals.limit(10)

TERMINAL_ID,x_terminal_id,y_terminal_id
0,41.7022004702574,72.0324493442158
1,0.0114374817344886,30.233257263183976
2,14.675589081711305,9.23385947687978
3,18.62602113776709,34.556072704304775
4,39.67674742306699,53.88167340033569
5,41.91945144032948,68.52195003967594
6,20.445224973151745,87.81174363909454
7,2.7387593197926163,67.04675101784022
8,41.73048023671269,55.86898284457517
9,14.038693859523375,19.81014890848788


### Обзор transactions

In [78]:
transactions.printSchema()

root
 |-- TRANSACTION_ID: integer (nullable = true)
 |-- TX_DATETIME: string (nullable = true)
 |-- CUSTOMER_ID: integer (nullable = true)
 |-- TERMINAL_ID: integer (nullable = true)
 |-- TX_AMOUNT: double (nullable = true)
 |-- TX_TIME_SECONDS: integer (nullable = true)
 |-- TX_TIME_DAYS: integer (nullable = true)
 |-- TX_FRAUD: integer (nullable = true)
 |-- TX_FRAUD_SCENARIO: integer (nullable = true)



In [79]:
transactions.limit(10)

TRANSACTION_ID,TX_DATETIME,CUSTOMER_ID,TERMINAL_ID,TX_AMOUNT,TX_TIME_SECONDS,TX_TIME_DAYS,TX_FRAUD,TX_FRAUD_SCENARIO
0,2019-05-05 00:32:35,183,47,39.3,1955,0,0,0
1,2019-05-05 00:43:59,382,43,15.35,2639,0,0,0
2,2019-05-05 00:45:51,381,58,23.15,2751,0,0,0
3,2019-05-05 00:57:25,426,50,82.58,3445,0,0,0
4,2019-05-05 01:11:00,8,8,2.08,4260,0,0,0
5,2019-05-05 01:26:30,408,60,23.41,5190,0,0,0
6,2019-05-05 01:38:25,230,33,18.93,5905,0,0,0
7,2019-05-05 01:55:28,474,18,10.67,6928,0,0,0
8,2019-05-05 01:56:23,398,37,8.8,6983,0,0,0
9,2019-05-05 01:56:44,55,81,35.06,7004,0,0,0


### Препроцессинг данных (предварительный). Промежуточный итог будет сохранен в HDFS

In [80]:
# Удаляем вспомогательные при генерации столбцы, чтобы не было ликов при обучении
customers = customers.drop("available_terminals","nb_terminals")
transactions = transactions.drop("TX_FRAUD_SCENARIO")

In [81]:
# Удаляем мусорные признаки
transactions = transactions.drop("TX_TIME_SECONDS", "TX_TIME_DAYS")

In [82]:
# Сводим все в одну таблицу
result = transactions.join(customers, transactions.CUSTOMER_ID == customers.CUSTOMER_ID, "left")
result = result.join(terminals, result.TERMINAL_ID == terminals.TERMINAL_ID, "left")

In [83]:
# Работаем с временными признаками
result = result.withColumn('day_of_week', dayofweek(result.TX_DATETIME))
result = result.withColumn('hour', hour(result.TX_DATETIME))
result = result.drop("TX_DATETIME")

In [84]:
# Удаляем потенциально полезные признаки (надо проверить), чтобы не раздувать пространство
result = result.drop("TRANSACTION_ID", "CUSTOMER_ID", "TERMINAL_ID")

In [85]:
result.limit(10)

TX_AMOUNT,TX_FRAUD,x_customer_id,y_customer_id,mean_amount,std_amount,mean_nb_tx_per_day,x_terminal_id,y_terminal_id,day_of_week,hour
39.3,0,36.49118360212381,26.090449938105976,52.11717806047961,26.058589030239805,2.726959780277445,40.81368027612812,23.70269802430277,1,0
15.35,0,42.83785131058563,92.31590211737402,14.983995953654924,7.491997976827462,3.930295554723651,42.80911898712949,96.48400471483856,1,0
23.15,0,77.05440616163654,90.8248379234579,19.28322690278018,9.64161345139009,2.233133696676994,75.38761884612464,92.30245355464834,1,0
82.58,0,31.019549824420316,51.54330866863324,44.51556488982905,22.257782444914525,0.9250198119187538,32.664490177209615,52.70581022576093,1,0
2.08,0,45.615033221654855,56.84339488686485,6.785031041453738,3.392515520726869,2.470541988303508,41.73048023671269,55.86898284457517,1,1
23.41,0,4.276313794779885,0.036734375145786,31.0045997919041,15.50229989595205,1.8483901185097995,1.9880133839795588,2.621098687771928,1,1
18.93,0,65.73189166171419,51.732608351608015,51.071736290016695,25.535868145008347,3.6046486825966464,66.37946452197887,51.48891120583086,1,1
10.67,0,71.81865260891838,80.19572403734452,7.500526851988695,3.750263425994348,2.875515661085698,68.65009276815837,83.46256718973729,1,1
8.8,0,37.305452930520325,19.68520546653137,14.382189245102728,7.191094622551364,2.9944240233183117,39.76768369855336,16.53541971169328,1,1
35.06,0,62.89818435911487,87.26506554473953,30.9864933074854,15.493246653742698,3.192187335650255,61.99557183813798,82.89808995501787,1,1


### Сохранение данных в формате parquet

In [89]:
# Удаление старой версии, если она есть
#!hdfs dfs -rm -r /user/processed_data/05_05_2019-11_05_2019/processed.parquet

Deleted /user/processed_data/05_05_2019-11_05_2019/processed.parquet


In [90]:
result.write.parquet('/user/processed_data/05_05_2019-11_05_2019/processed.parquet')

### Проверка, что сохраненный spark'ом датафрейм номально открывается

In [145]:
data_processed = spark.read.parquet('/user/processed_data/05_05_2019-11_05_2019/processed.parquet')
data_processed.limit(10)

TX_AMOUNT,TX_FRAUD,x_customer_id,y_customer_id,mean_amount,std_amount,mean_nb_tx_per_day,x_terminal_id,y_terminal_id,day_of_week,hour
39.3,0,36.49118360212381,26.090449938105976,52.11717806047961,26.058589030239805,2.726959780277445,40.81368027612812,23.70269802430277,1,0
15.35,0,42.83785131058563,92.31590211737402,14.983995953654924,7.491997976827462,3.930295554723651,42.80911898712949,96.48400471483856,1,0
23.15,0,77.05440616163654,90.8248379234579,19.28322690278018,9.64161345139009,2.233133696676994,75.38761884612464,92.30245355464834,1,0
82.58,0,31.019549824420316,51.54330866863324,44.51556488982905,22.257782444914525,0.9250198119187538,32.664490177209615,52.70581022576093,1,0
2.08,0,45.615033221654855,56.84339488686485,6.785031041453738,3.392515520726869,2.470541988303508,41.73048023671269,55.86898284457517,1,1
23.41,0,4.276313794779885,0.036734375145786,31.0045997919041,15.50229989595205,1.8483901185097995,1.9880133839795588,2.621098687771928,1,1
18.93,0,65.73189166171419,51.732608351608015,51.071736290016695,25.535868145008347,3.6046486825966464,66.37946452197887,51.48891120583086,1,1
10.67,0,71.81865260891838,80.19572403734452,7.500526851988695,3.750263425994348,2.875515661085698,68.65009276815837,83.46256718973729,1,1
8.8,0,37.305452930520325,19.68520546653137,14.382189245102728,7.191094622551364,2.9944240233183117,39.76768369855336,16.53541971169328,1,1
35.06,0,62.89818435911487,87.26506554473953,30.9864933074854,15.493246653742698,3.192187335650255,61.99557183813798,82.89808995501787,1,1


### Препроцессинг данных (итоговый). Будет совмещен с шагом обучения. Этот артифакт необходимо сохранять.

In [146]:
train_data, valid_data = data_processed.randomSplit([0.8, 0.2], seed=26)

In [147]:
columns_to_scale = ["TX_AMOUNT", "x_customer_id", "y_customer_id", "mean_amount", "std_amount", "mean_nb_tx_per_day",
                   "x_terminal_id", "y_terminal_id"]

assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
stages = assemblers + scalers

In [148]:
# one-hot преобразование hour
stages.append(OneHotEncoder(inputCol="hour", outputCol="hour_encoded"))

# one-hot преобразование day_of_week
stages.append(OneHotEncoder(inputCol="day_of_week", outputCol="day_of_week_encoded"))

# Собираем все признаки вместе
stages.append(VectorAssembler(inputCols=[
    "hour_encoded",
    "day_of_week_encoded",
    "TX_AMOUNT_scaled",
    "x_customer_id_scaled",
    "y_customer_id_scaled",
    "mean_amount_scaled",
    "std_amount_scaled",
    "mean_nb_tx_per_day_scaled",
    "x_terminal_id_scaled",
    "y_terminal_id_scaled"
    ],
    outputCol="Features",
))

stages.append(LogisticRegression(featuresCol='Features', labelCol='TX_FRAUD'))

pipeline = Pipeline(stages=stages)

scalerModel = pipeline.fit(train_data)
valid_data = scalerModel.transform(valid_data)

In [150]:
valid_data.limit(10)

TX_AMOUNT,TX_FRAUD,x_customer_id,y_customer_id,mean_amount,std_amount,mean_nb_tx_per_day,x_terminal_id,y_terminal_id,day_of_week,hour,TX_AMOUNT_vec,x_customer_id_vec,y_customer_id_vec,mean_amount_vec,std_amount_vec,mean_nb_tx_per_day_vec,x_terminal_id_vec,y_terminal_id_vec,TX_AMOUNT_scaled,x_customer_id_scaled,y_customer_id_scaled,mean_amount_scaled,std_amount_scaled,mean_nb_tx_per_day_scaled,x_terminal_id_scaled,y_terminal_id_scaled,hour_encoded,day_of_week_encoded,Features,rawPrediction,probability,prediction
0.05,0,69.77284017849593,97.03753278372166,21.651174358493567,10.825587179246783,0.8078657136968128,69.0896917516924,99.73228504514805,3,13,[0.05],[69.77284017849593],[97.03753278372166],[21.651174358493567],[10.825587179246783],[0.8078657136968128],[69.0896917516924],[99.73228504514805],[4.35394685282208...,[0.7074728432802446],[0.9714849688071363],[0.1747608334150926],[0.1747608334150926],[0.1983454853821229],[0.697802268630947],[1.0],"(23,[13],[1.0])","(7,[3],[1.0])","(38,[13,26,30,31,...",[4.38175585227650...,[0.98765101883058...,0.0
0.92,0,85.90026396580586,15.202722720956952,5.063100766069427,2.531550383034713,3.7666711815590257,90.34019152878837,13.747470414623752,1,15,[0.92],[85.90026396580586],[15.202722720956952],[5.063100766069427],[2.5315503830347135],[3.7666711815590257],[90.34019152878835],[13.747470414623752],[0.00130618405584...,[0.8716354457350997],[0.15189080870242...,[0.0],[0.0],[0.9420759992203321],[0.913326598054459],[0.12685106267845...,"(23,[15],[1.0])","(7,[1],[1.0])","(38,[15,24,30,31,...",[5.57276449811888...,[0.99621442412645...,0.0
1.6,0,85.90026396580586,15.202722720956952,5.063100766069427,2.531550383034713,3.7666711815590257,90.34019152878837,13.747470414623752,5,8,[1.6],[85.90026396580586],[15.202722720956952],[5.063100766069427],[2.5315503830347135],[3.7666711815590257],[90.34019152878835],[13.747470414623752],[0.00229307867581...,[0.8716354457350997],[0.15189080870242...,[0.0],[0.0],[0.9420759992203321],[0.913326598054459],[0.12685106267845...,"(23,[8],[1.0])","(7,[5],[1.0])","(38,[8,28,30,31,3...",[3.77245148088268...,[0.97752129106014...,0.0
1.96,0,5.68480764332403,69.69972417249873,78.97606261440482,39.48803130720241,3.1096302473950126,1.9366957870297077,67.8835532939891,4,11,[1.96],[5.68480764332403],[69.69972417249873],[78.97606261440482],[39.48803130720241],[3.1096302473950126],[1.9366957870297075],[67.8835532939891],[0.00281555229815...,[0.05511457788489...,[0.6976906229702742],[0.7786974624161508],[0.7786974624161508],[0.7769210416246898],[0.01673101967787...,[0.6765860762384962],"(23,[11],[1.0])","(7,[4],[1.0])","(38,[11,27,30,31,...",[1.60652929160586...,[0.83292896645340...,0.0
2.13,0,14.038395779934689,22.73624490775018,11.540936626870526,5.770468313435263,2.8228401759584307,14.038693859523375,19.81014890848788,3,6,[2.13],[14.038395779934687],[22.73624490775018],[11.540936626870526],[5.770468313435263],[2.8228401759584307],[14.038693859523377],[19.81014890848788],[0.00306227595315...,[0.14014655735213...,[0.2273407399062686],[0.0682461400627495],[0.0682461400627495],[0.7048329880131982],[0.13947048682537...,[0.18841566925446...,"(23,[6],[1.0])","(7,[3],[1.0])","(38,[6,26,30,31,3...",[1.90532196516836...,[0.87049268220898...,0.0
2.25,0,8.379135291008454,51.61237007410931,25.886773780952662,12.943386890476331,1.097182815371006,7.002214371922233,48.63451109370318,2,16,[2.25],[8.379135291008454],[51.61237007410931],[25.886773780952662],[12.943386890476331],[1.097182815371006],[7.002214371922233],[48.63451109370318],[0.00323643382726...,[0.08254039891763...,[0.5165416745042586],[0.21938427211381...,[0.21938427211381...,[0.27106873773091...,[0.06810592886320...,[0.4811180646615925],"(23,[16],[1.0])","(7,[2],[1.0])","(38,[16,25,30,31,...",[2.09444467172924...,[0.89036205660618...,0.0
2.28,0,85.90026396580586,15.202722720956952,5.063100766069427,2.531550383034713,3.7666711815590257,90.34019152878837,13.747470414623752,5,6,[2.28],[85.90026396580586],[15.202722720956952],[5.063100766069427],[2.5315503830347135],[3.7666711815590257],[90.34019152878835],[13.747470414623752],[0.00327997329579...,[0.8716354457350997],[0.15189080870242...,[0.0],[0.0],[0.9420759992203321],[0.913326598054459],[0.12685106267845...,"(23,[6],[1.0])","(7,[5],[1.0])","(38,[6,28,30,31,3...",[2.94040768736527...,[0.94980816589657...,0.0
2.83,0,77.05440616163654,90.8248379234579,19.28322690278018,9.64161345139009,2.233133696676994,75.38761884612464,92.30245355464834,5,19,[2.83],[77.05440616163654],[90.8248379234579],[19.28322690278018],[9.64161345139009],[2.233133696676994],[75.38761884612464],[92.30245355464834],[0.00407819688547...,[0.7815926058352345],[0.909263422473988],[0.14981372496769...,[0.14981372496769...,[0.5566033366848235],[0.7616763670297363],[0.9245523817399148],"(23,[19],[1.0])","(7,[5],[1.0])","(38,[19,28,30,31,...",[3.73110351263206...,[0.97659456926518...,0.0
2.88,0,61.434629997963086,22.149017880295773,9.203937274950611,4.601968637475306,1.7250313873675929,56.81004619199421,20.329323466099048,1,7,[2.88],[61.434629997963086],[22.149017880295773],[9.203937274950611],[4.601968637475306],[1.7250313873675927],[56.81004619199421],[20.329323466099048],[0.00415076266635...,[0.622597397511348],[0.2214595282068934],[0.04362508010925...,[0.04362508010925...,[0.42888585214938...,[0.5732610859494527],[0.19368772473554...,"(23,[7],[1.0])","(7,[1],[1.0])","(38,[7,24,30,31,3...",[4.29806393172957...,[0.98658748704185...,0.0
3.08,0,67.15276967657582,84.29732964010842,6.544014924350119,3.2720074621750594,2.571213501234048,68.65009276815837,83.46256718973729,6,5,[3.08],[67.15276967657582],[84.29732964010842],[6.544014924350119],[3.2720074621750594],[2.5712135012340482],[68.65009276815837],[83.46256718973729],[0.00444102578987...,[0.6808028928974161],[0.8438889483966956],[0.01560191972113...,[0.01560191972113...,[0.6415836672582996],[0.693343819317508],[0.834786096626296],"(23,[5],[1.0])","(7,[6],[1.0])","(38,[5,29,30,31,3...",[3.21077394782330...,[0.96123771295564...,0.0


In [155]:
evaluator = BinaryClassificationEvaluator(labelCol="TX_FRAUD", rawPredictionCol="prediction", metricName='areaUnderROC')
valid_roc = evaluator.evaluate(valid_data)
valid_roc

0.5235590778097983

In [156]:
train_data = scalerModel.transform(train_data)
train_roc = evaluator.evaluate(train_data)
train_roc

0.5207199387838426