# Feature Extraction with Spark MLlib

In [1]:
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
import findspark
findspark.init()

In [3]:
!hdfs dfs -ls data

Found 1 items
-rw-r--r--   1 ubuntu hadoop 2807409271 2025-12-13 09:50 data/2019-08-22.txt


In [4]:
import warnings
warnings.filterwarnings('ignore')
spark_ui_port = 4040
app_name = "Otus"

from itertools import groupby

In [5]:
import pyspark

spark = (
    pyspark.sql.SparkSession
        .builder
        .appName(app_name)
        .config("spark.executor.memory", "1g")
        .config("spark.driver.memory", "1g")
        .getOrCreate()
)
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)  # to pretty print pyspark.DataFrame in jupyter

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/12/13 10:08:03 WARN Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
25/12/13 10:08:19 WARN Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
25/12/13 10:08:19 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [6]:
spark

In [None]:
from pyspark.sql.types import *
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

schema = StructType([
    StructField("tranaction_id", IntegerType(), True),
    StructField("tx_datetime", TimestampType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("terminal_id", IntegerType(), True),
    StructField("tx_amount", DoubleType(), True),
    StructField("tx_time_seconds", IntegerType(), True),
    StructField("tx_time_days", IntegerType(), True),
    StructField("tx_fraud", IntegerType(), True),
    StructField("tx_fraud_scenario", IntegerType(), True),
])

df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("comment", "#") 
    .option("inferSchema", "false")
    .schema(schema)
    .load("hdfs:///user/ubuntu/data/*")
)

df.show(5, truncate=False)
df.printSchema()

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|1            |2019-08-22 05:10:37|0          |0          |90.55    |18637          |0           |0       |0                |
|2            |2019-08-22 19:05:33|0          |753        |35.38    |68733          |0           |0       |0                |
|3            |2019-08-22 07:21:33|0          |0          |80.41    |26493          |0           |0       |0                |
|4            |2019-08-22 09:06:17|1          |981        |102.83   |32777          |0           |0       |0                |
|5            |2019-08-22 18:41:25|3          |205        |34.2     |67285          |0           |0       |0          

In [40]:
(
    df
    .write
    .mode("overwrite")
    .parquet("hdfs:///user/ubuntu/tx_parquet")
)

                                                                                

In [32]:
type(df)

pyspark.sql.dataframe.DataFrame

In [41]:
# dataset shape
(len(df.columns), df.count())

                                                                                

(9, 46988417)

In [11]:
df.printSchema()

root
 |-- tranaction_id: integer (nullable = true)
 |-- tx_datetime: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: integer (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: integer (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)



Так смотреть не удобно - отформатируем вывод

In [12]:
dt = df.dtypes
for r in dt:
    print(f"{r[0]:>25}\t{r[1]}")

            tranaction_id	int
              tx_datetime	timestamp
              customer_id	int
              terminal_id	int
                tx_amount	double
          tx_time_seconds	int
             tx_time_days	int
                 tx_fraud	int
        tx_fraud_scenario	int


Давайте cортируем по типам

In [13]:
for r in sorted(df.dtypes, key=lambda x: x[1]):
    print(f"{r[0]:>25}\t{r[1]}")

                tx_amount	double
            tranaction_id	int
              customer_id	int
              terminal_id	int
          tx_time_seconds	int
             tx_time_days	int
                 tx_fraud	int
        tx_fraud_scenario	int
              tx_datetime	timestamp


Соберем по типам

In [14]:
dt.sort(key=lambda x: x[1])

print('Data types:')
for k, g in groupby(dt, lambda x: x[1]):
    print(f'{k:<6} - {len(list(g))}')

Data types:
double - 1
int    - 7
timestamp - 1


## Кодирование категориальных признаков

### Проверка пропущенных

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F

In [42]:
# поиск некорректной суммы операций (tx_amount <= 0)
bad_cond = (F.col("tx_amount") <= 0)
bad_count = df.filter(bad_cond).count()

print("Строк с некорректной суммой операций:", bad_count)

df.filter(bad_cond).show(20, truncate=False)
df_clean = df.filter(~bad_cond)

print("Строк после очистки:", df.count())
df=df_clean



                                                                                

Строк с некорректной суммой операций: 884


                                                                                

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|32612        |2019-08-22 15:04:07|20987      |461        |0.0      |54247          |0           |0       |0                |
|51947        |2019-08-22 07:59:23|33354      |145        |0.0      |28763          |0           |0       |0                |
|90921        |2019-08-22 11:40:41|58240      |145        |0.0      |42041          |0           |0       |0                |
|111111       |2019-08-22 20:06:49|71171      |228        |0.0      |72409          |0           |0       |0                |
|115880       |2019-08-22 13:08:59|74277      |723        |0.0      |47339          |0           |0       |0          



Строк после очистки: 46988417


                                                                                

In [43]:
# customer_id или terminal_id =0 (считаем, что такого не должно быть)
bad_cond = (
    (F.col("customer_id") == 0) |
    (F.col("terminal_id") == 0) |
    F.col("customer_id").isNull() |
    F.col("terminal_id").isNull()
)

bad_count = df.filter(bad_cond).count()
print("Строк с плохими customer_id/terminal_id:", bad_count)

df.filter(bad_cond).show(20, truncate=False)

df_clean = df.filter(~bad_cond)

# проверка
print("Строк всего:", df.count())
print("Строк после очистки:", df_clean.count())
df=df_clean

                                                                                

Строк с плохими customer_id/terminal_id: 2041701
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|1            |2019-08-22 05:10:37|0          |0          |90.55    |18637          |0           |0       |0                |
|2            |2019-08-22 19:05:33|0          |753        |35.38    |68733          |0           |0       |0                |
|3            |2019-08-22 07:21:33|0          |0          |80.41    |26493          |0           |0       |0                |
|6            |2019-08-22 03:12:21|3          |0          |47.2     |11541          |0           |0       |0                |
|12           |2019-08-22 15:47:54|10         |0          |58.89    |

                                                                                

Строк всего: 46987533




Строк после очистки: 44945832


                                                                                

In [None]:
# Некорректные datetime

bad_cond = (
    (F.col("tx_datetime").isNull())
)

bad_count = df.filter(bad_cond).count()
print("Строк с некорректными datetime:", bad_count)

df.filter(bad_cond).show(20, truncate=False)

df_clean = df.filter(~bad_cond)

# проверка
print("Строк всего:", df.count())
print("Строк после очистки:", df_clean.count())
df=df_clean

                                                                                

Строк с некорректными datetime: 95


                                                                                

+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|933817       |null       |597125     |611        |62.83    |86400          |0           |0       |0                |
|1205236      |null       |769896     |44         |15.1     |86400          |0           |0       |0                |
|1543099      |null       |985197     |967        |85.18    |86400          |0           |0       |0                |
|1670385      |null       |66533      |956        |21.39    |172800         |1           |0       |0                |
|1781446      |null       |137885     |124        |13.7     |172800         |1           |0       |0                |
|2615257      |null       |669953     |489        |40.75

                                                                                

Строк всего: 44945832




Строк после очистки: 44945737


                                                                                

In [None]:
# размещение в backet
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("clean-transactions")
    .getOrCreate()
)

sc = spark.sparkContext
hconf = sc._jsc.hadoopConfiguration()



hconf.set("fs.s3a.endpoint", "storage.yandexcloud.net")
hconf.set("fs.s3a.access.key", "")
hconf.set("fs.s3a.secret.key", "")
hconf.set("fs.s3a.path.style.access", "true")
hconf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

target_path = ""

(df
    .write
    .mode("overwrite") 
    .parquet(target_path))



25/12/13 12:23:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


                                                                                

In [64]:
terminal_indexer = StringIndexer(inputCol="terminal_id", outputCol="terminal_indexer_id")
terminal_indexer_model = terminal_indexer.fit(df)
df_prep = terminal_indexer_model.transform(df)
df_prep.show(5)

                                                                                

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+-------------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|terminal_indexer_id|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+-------------------+
|            1|2019-08-22 05:10:37|          0|          0|    90.55|          18637|           0|       0|                0|                0.0|
|            2|2019-08-22 19:05:33|          0|        753|    35.38|          68733|           0|       0|                0|               81.0|
|            3|2019-08-22 07:21:33|          0|          0|    80.41|          26493|           0|       0|                0|                0.0|
|            4|2019-08-22 09:06:17|          1|        981|   102.83|          32777|           0|       0|                0

                                                                                

### Pclass

In [None]:
df_prep.filter(df_prep['Pclass'].isNull()).count()

In [None]:
from pyspark.ml.feature import OneHotEncoder

pclass_indexer = (
    StringIndexer()
        .setInputCol('Pclass')
        .setOutputCol('PclassIndex')
)

pclass_indexer_model = pclass_indexer.fit(df_prep)
df_prep = pclass_indexer_model.transform(df_prep)
df_prep.show(5)

In [None]:
pclass_encoder = OneHotEncoder()\
    .setInputCol('PclassIndex')\
    .setOutputCol('PclassEncoded')

pclass_encoder_model = pclass_encoder.fit(df_prep)
df_prep = pclass_encoder_model.transform(df_prep)
df_prep.show(5)

In [None]:
df_prep.head(10)

In [None]:
df_prep.head()[-1]

### Embarked

In [None]:
df_prep.filter(df_prep['Embarked'].isNull()).count()

<div class="alert alert-warning">

<b>Warning! Частая ошибка</b>



StringIndexer может работать с пропущенными значениями только в формате <b>NaN</b>, но не <b>NULL</b>!

Если мы закодируем значения `Embarked` то мы не увидим ошибки. Мы получим ошибку только при обращении к этой строке!
    
</div>

In [None]:
embark_indexer = StringIndexer()\
    .setInputCol('Embarked')\
    .setOutputCol('EmbarkedIndex')

df_err = embark_indexer.fit(df_prep).transform(df_prep)
df_err.show(5)

In [None]:
df_err.show(62)

#### Заполним пропуски

In [None]:
df_prep = df_prep.fillna('X', subset=['Embarked'])
df_prep.show(62)

In [None]:
embark_indexer = StringIndexer()\
    .setInputCol('Embarked')\
    .setOutputCol('EmbarkedIndex')

embark_indexer_model = embark_indexer.fit(df_prep)
df_prep = embark_indexer_model.transform(df_prep)
df_prep.show(62)

In [None]:
embarked_encoder = OneHotEncoder()\
    .setInputCol('EmbarkedIndex')\
    .setOutputCol('EmbarkedEncoded')

embarked_encoder_model = embarked_encoder.fit(df_prep)
df_prep = embarked_encoder_model.transform(df_prep)
df_prep.show(5)

## Нормализация числовых признаков

### Age

In [None]:
from pyspark.ml.feature import Imputer

In [None]:
age_imputer = Imputer()\
    .setInputCol('Age')\
    .setOutputCol('AgeImputed')\
    .setStrategy('mean')


age_imputer_model = age_imputer.fit(df_prep)
df_prep = age_imputer_model.transform(df_prep)
df_prep.show(62)

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
age_assembler = VectorAssembler()\
    .setInputCols(["AgeImputed"])\
    .setOutputCol("AgeVector")

df_prep = age_assembler.transform(df_prep)
df_prep.show(5)

In [None]:
from pyspark.ml.feature import MinMaxScaler

In [None]:
age_scaler = MinMaxScaler(inputCol="", outputCol="AgeScaled")\
    .setInputCol('AgeVector')\
    .setOutputCol('AgeScaled')

age_scaler_model = age_scaler.fit(df_prep)
df_prep = age_scaler_model.transform(df_prep)
df_prep.show(5)

In [None]:
df_prep.filter(df_prep['Fare'].isNull()).count()

In [None]:
fare_imputer = Imputer(inputCol="Fare", outputCol="FareImputed")
fare_imputer_model = fare_imputer.fit(df_prep)
df_prep = fare_imputer_model.transform(df_prep)
df_prep.show(5)

In [None]:
fare_assembler = VectorAssembler(inputCols=["FareImputed"], outputCol="FareVector")
df_prep = fare_assembler.transform(df_prep)
df_prep.show(5)

In [None]:
from pyspark.ml.feature import RobustScaler

fare_scaler = RobustScaler(inputCol="FareVector", outputCol="FareScaled")
fare_scaler_model = fare_scaler.fit(df_prep)
df_prep = fare_scaler_model.transform(df_prep)
df_prep.show(5)


## Собираем вектор признаков

Для алгоритмов МО из Spark MlLib нужно подавать на вход столбец с вектором признаков

In [None]:
from pyspark.ml.feature import VectorAssembler

features_assembler = VectorAssembler(inputCols=[
    "SexIndex",
    "PclassEncoded",
    "AgeScaled",
    "FareScaled",
    ],
    outputCol="Features",
)

prep_df = df
prep_df = sex_indexer_model.transform(prep_df)
prep_df = pclass_indexer_model.transform(prep_df)
prep_df = pclass_encoder_model.transform(prep_df)
prep_df = age_imputer_model.transform(prep_df)
prep_df = age_assembler.transform(prep_df)
prep_df = age_scaler_model.transform(prep_df)
prep_df = fare_imputer_model.transform(prep_df)
prep_df = fare_assembler.transform(prep_df)
prep_df = fare_scaler_model.transform(prep_df)
feat_df = features_assembler.transform(prep_df)

feat_df.show(5)

## Конвейер

Объединим различные этапы подготовки признаков в единый конвейер

In [None]:
from pyspark.ml.pipeline import Pipeline

feat_ext_pipe = Pipeline(stages=[
    sex_indexer,
    pclass_indexer,
    pclass_encoder,
    age_imputer,
    age_assembler,
    age_scaler,
    fare_imputer_model,
    fare_assembler,
    fare_scaler_model,
    features_assembler,
]).fit(df)


In [None]:
feat_df = feat_ext_pipe.transform(df)
feat_df.show(5)

## Сохранение

Сохраним конвейер на диск для последующего использования при подготовке других данных

In [None]:
feat_ext_pipe.write().overwrite().save(f"{app_name}_feat_exty_pipe")

In [None]:
!hdfs dfs -ls {app_name}_feat_exty_pipe

In [None]:
!hdfs dfs -ls {app_name}_feat_exty_pipe/stages

## Обработка тестовых данных

In [None]:
from pyspark.ml.pipeline import PipelineModel

test_df = spark.read.csv("data/titanic/test.csv", inferSchema=True, header=True)

feat_ext_pipe_loaded = PipelineModel.load(f"{app_name}_feat_exty_pipe")

prep_test_df = feat_ext_pipe_loaded.transform(test_df)
prep_test_df.show(5)