In [1]:
!pip install pyspark findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=d9f32c7159b22811a8378dd4133ee54d96da79b88486bde081c89d9f41ed4708
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected p

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = SparkConf().set('spark.ui.port', '4050').set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')\
                  .set('spark.dynamicAllocation.enabled', 'true')\
                  .set('spark.shuffle.service.enabled', 'true') #трекер, чтобы возвращать ресурсы
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master('local[*]').getOrCreate()

Анализировать будет датасет отсюда https://www.kaggle.com/shelvigarg/credit-card-buyers

Definition

ID - Unique Identifier for a row

Gender - Gender of the Customer

Age - Age of the Customer (in Years)

Region_Code - Code of the Region for the customers

Occupation - Occupation Type for the customer

Channel_Code - Acquisition Channel Code for the Customer (Encoded)

Vintage - Vintage for the Customer (In Months)

Credit_Product - If the Customer has any active credit product (Home loan Personal loan, Credit Card etc.)

AvgAccountBalance - Average Account Balance for the Customer in last 12 Months

Is_Active - If the Customer is Active in last 3 Months

Загрузим данные и посмотрим, что там внутри

In [3]:
data = spark.read.csv('credit_card_data.csv', header=True, inferSchema=True)

In [4]:
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Region_Code: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Channel_Code: string (nullable = true)
 |-- Vintage: integer (nullable = true)
 |-- Credit_Product: string (nullable = true)
 |-- Avg_Account_Balance: integer (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Is_Lead: integer (nullable = true)



In [5]:
data.show()

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+
|NNVBBKZB|Female| 73|      RG268|        Other|          X3|     43|            No|            1045696|       No|      0|
|IDD62UNG|Female| 30|      RG277|     Salaried|          X1|     32|            No|             581988|       No|      0|
|HD3DSEMC|Female| 56|      RG268|Self_Employed|          X3|     26|            No|            1484315|      Yes|      0|
|BF3NC7KV|  Male| 34|      RG270|     Salaried|          X1|     19|            No|             470454|       No|      0|
|TEASRWXV|Female| 30|      RG282|     Salaried|          X1|     33|            No|             886787|       No|      0|
|ACUTYTWS|  Male| 56|   

Посмотрим различные базовые вещи

In [6]:
from pyspark.sql.functions import col,isnan, when, count

In [7]:
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+---+------+---+-----------+----------+------------+-------+--------------+-------------------+---------+-------+
| ID|Gender|Age|Region_Code|Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|
+---+------+---+-----------+----------+------------+-------+--------------+-------------------+---------+-------+
|  0|     0|  0|          0|         0|           0|      0|         29325|                  0|        0|      0|
+---+------+---+-----------+----------+------------+-------+--------------+-------------------+---------+-------+



Пропуски только в кредитном продукте, логично заметь на тип, что кредита нет

In [8]:
data.select(col('Credit_Product')).groupBy('Credit_Product').count().show()

+--------------+------+
|Credit_Product| count|
+--------------+------+
|          null| 29325|
|            No|144357|
|           Yes| 72043|
+--------------+------+



In [9]:
data = data.fillna({'Credit_Product': 'No'})

Проверим

In [10]:
data.select(col('Credit_Product')).groupBy('Credit_Product').count().show()

+--------------+------+
|Credit_Product| count|
+--------------+------+
|            No|173682|
|           Yes| 72043|
+--------------+------+



Посмотри на данные с точки зрения дисбаланса классов

In [11]:
data.count()

245725

In [12]:
import pyspark.sql.functions as F

In [13]:
data.select(col('Is_Lead'))\
    .groupBy('Is_Lead')\
    .count()\
    .withColumn('count', F.round(col('count') / data.count(), 2))\
    .show()

+-------+-----+
|Is_Lead|count|
+-------+-----+
|      1| 0.24|
|      0| 0.76|
+-------+-----+



Ладно, достаточно, мы тут сейчас говорим про MLlib, всякие анализы - тема прошлого семинара

**Некоторые преобразования данных**

Начнем с простой обработки категориальных переменных

In [14]:
from pyspark.ml.feature import StringIndexer, IndexToString, OneHotEncoder

In [15]:
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
gender_indexer = gender_indexer.fit(data)
data = gender_indexer.transform(data)

data.show()

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|GenderIndex|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+
|NNVBBKZB|Female| 73|      RG268|        Other|          X3|     43|            No|            1045696|       No|      0|        1.0|
|IDD62UNG|Female| 30|      RG277|     Salaried|          X1|     32|            No|             581988|       No|      0|        1.0|
|HD3DSEMC|Female| 56|      RG268|Self_Employed|          X3|     26|            No|            1484315|      Yes|      0|        1.0|
|BF3NC7KV|  Male| 34|      RG270|     Salaried|          X1|     19|            No|             470454|       No|      0|        0.0|
|TEASRWXV|Female| 30|      RG282|     Salaried|          X1|  

In [16]:
gender_indexer.labels

['Male', 'Female']

Обратная трансформация доступна через метод

In [17]:
converter = IndexToString(inputCol="GenderIndex", outputCol="originalGender")
data = converter.transform(data)
data.show()

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|GenderIndex|originalGender|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+
|NNVBBKZB|Female| 73|      RG268|        Other|          X3|     43|            No|            1045696|       No|      0|        1.0|        Female|
|IDD62UNG|Female| 30|      RG277|     Salaried|          X1|     32|            No|             581988|       No|      0|        1.0|        Female|
|HD3DSEMC|Female| 56|      RG268|Self_Employed|          X3|     26|            No|            1484315|      Yes|      0|        1.0|        Female|
|BF3NC7KV|  Male| 34|      RG270|     Salaried|          X1|     19|            No|             470454|   

Давайте аналогично поступим с каналом продаж и типом занятости

In [18]:
occupation_indexer = StringIndexer(inputCol="Occupation", outputCol="OccupationIndex")
occupation_indexer = occupation_indexer.fit(data)
data = occupation_indexer.transform(data)

channel_indexer = StringIndexer(inputCol="Channel_Code", outputCol="ChannelIndex")
channel_indexer = channel_indexer.fit(data)
data = channel_indexer.transform(data)

data.show()

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+---------------+------------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|GenderIndex|originalGender|OccupationIndex|ChannelIndex|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+---------------+------------+
|NNVBBKZB|Female| 73|      RG268|        Other|          X3|     43|            No|            1045696|       No|      0|        1.0|        Female|            2.0|         1.0|
|IDD62UNG|Female| 30|      RG277|     Salaried|          X1|     32|            No|             581988|       No|      0|        1.0|        Female|            1.0|         0.0|
|HD3DSEMC|Female| 56|      RG268|Self_Employed|          X3|     26|            No|            1484315|      Y

In [19]:
print(f'Occupation len = {len(occupation_indexer.labels)}, Channel_code len = {len(channel_indexer.labels)}')

Occupation len = 4, Channel_code len = 4


Тут по 4 категории, что самое простое, что приходит в голову? Правильно - OHE

In [20]:
ohe_encoder = OneHotEncoder(inputCols=["OccupationIndex", "ChannelIndex"],
                        outputCols=["OccupationVector", "ChannelVec"])
ohe_encoder = ohe_encoder.fit(data)
data = ohe_encoder.transform(data)

data.show()

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+---------------+------------+----------------+-------------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|GenderIndex|originalGender|OccupationIndex|ChannelIndex|OccupationVector|   ChannelVec|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+---------------+------------+----------------+-------------+
|NNVBBKZB|Female| 73|      RG268|        Other|          X3|     43|            No|            1045696|       No|      0|        1.0|        Female|            2.0|         1.0|   (3,[2],[1.0])|(3,[1],[1.0])|
|IDD62UNG|Female| 30|      RG277|     Salaried|          X1|     32|            No|             581988|       No|      0|        1.0|        Female|            1.0|

In [21]:
ohe_encoder.categorySizes

[4, 4]

Странный формат, не правда ли? Все из-за того, что тут у нас SparseVector

 На 4 категории нужен вектор размерности 3, а дальше храним позицию и 1 там, где нужная категория

In [22]:
data.select(col('OccupationVector')).head()

Row(OccupationVector=SparseVector(3, {2: 1.0}))

Теперь все надо собрать в одну структуру, чтобы можно было анализировать данные и строить модели

In [23]:
from pyspark.ml.feature import VectorAssembler

In [24]:
data.show(5)

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+---------------+------------+----------------+-------------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|GenderIndex|originalGender|OccupationIndex|ChannelIndex|OccupationVector|   ChannelVec|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+---------------+------------+----------------+-------------+
|NNVBBKZB|Female| 73|      RG268|        Other|          X3|     43|            No|            1045696|       No|      0|        1.0|        Female|            2.0|         1.0|   (3,[2],[1.0])|(3,[1],[1.0])|
|IDD62UNG|Female| 30|      RG277|     Salaried|          X1|     32|            No|             581988|       No|      0|        1.0|        Female|            1.0|

In [25]:
feature_columns = [
                   'Age',
                   'Vintage',
                   'Avg_Account_Balance',
                   'GenderIndex',
                   'OccupationVector',
                   'ChannelVec'                 
]

In [26]:
df_va = VectorAssembler(inputCols = feature_columns, outputCol = 'features')
data = df_va.transform(data)

In [27]:
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Region_Code: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Channel_Code: string (nullable = true)
 |-- Vintage: integer (nullable = true)
 |-- Credit_Product: string (nullable = false)
 |-- Avg_Account_Balance: integer (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Is_Lead: integer (nullable = true)
 |-- GenderIndex: double (nullable = false)
 |-- originalGender: string (nullable = true)
 |-- OccupationIndex: double (nullable = false)
 |-- ChannelIndex: double (nullable = false)
 |-- OccupationVector: vector (nullable = true)
 |-- ChannelVec: vector (nullable = true)
 |-- features: vector (nullable = true)



In [28]:
data.show()

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+---------------+------------+----------------+-------------+--------------------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|GenderIndex|originalGender|OccupationIndex|ChannelIndex|OccupationVector|   ChannelVec|            features|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+--------------+---------------+------------+----------------+-------------+--------------------+
|NNVBBKZB|Female| 73|      RG268|        Other|          X3|     43|            No|            1045696|       No|      0|        1.0|        Female|            2.0|         1.0|   (3,[2],[1.0])|(3,[1],[1.0])|[73.0,43.0,104569...|
|IDD62UNG|Female| 30|      RG277|     Salaried|          X1|     32|            

В полученном features можно автоматичеки проанализировать все переменные и если у кого-то уникальных значений меньше заданного вами порога, то они автоматичсеки переведутся в индексы при помощи pyspark.ml.feature import VectorIndexer

**Статистика**

В ml pyspark есть некоторые статистические методы, которые можно использовать для анализа

Корреляция

In [29]:
from pyspark.ml.stat import Correlation

In [30]:
corr = Correlation.corr(data, 'features', method='pearson').collect()[0][0]

In [31]:
corr

DenseMatrix(10, 10, [1.0, 0.6312, 0.1452, -0.1521, 0.1527, -0.5632, 0.3948, -0.6646, ..., 0.0102, -0.116, 0.2933, -0.3238, 0.0005, -0.5272, -0.3843, 1.0], False)

In [32]:
corr.toArray()

array([[ 1.00000000e+00,  6.31242411e-01,  1.45232189e-01,
        -1.52075940e-01,  1.52651808e-01, -5.63226982e-01,
         3.94834177e-01, -6.64600051e-01,  4.56338440e-01,
         2.73153253e-01],
       [ 6.31242411e-01,  1.00000000e+00,  1.67433481e-01,
        -1.46379743e-01,  2.21023818e-01, -4.10109383e-01,
         1.55662661e-01, -5.71828453e-01,  5.38828562e-01,
         1.44931244e-01],
       [ 1.45232189e-01,  1.67433481e-01,  1.00000000e+00,
        -2.24772031e-02,  3.46714040e-03, -7.16906860e-02,
         6.03874569e-02, -9.81785292e-02,  1.06905544e-01,
         1.01634033e-02],
       [-1.52075940e-01, -1.46379743e-01, -2.24772031e-02,
         1.00000000e+00, -8.58626857e-02,  1.22439249e-01,
        -2.58175123e-02,  1.84372479e-01, -8.07817702e-02,
        -1.16018433e-01],
       [ 1.52651808e-01,  2.21023818e-01,  3.46714040e-03,
        -8.58626857e-02,  1.00000000e+00, -5.37283514e-01,
        -5.27660791e-01, -4.34990948e-01,  1.63662837e-01,
         2.

Можно вычислить корреляцию спирмена

In [33]:
corr = Correlation.corr(data, 'features', method='spearman').collect()[0][0]
corr.toArray()

array([[ 1.00000000e+00,  6.52477119e-01,  1.78395617e-01,
        -1.66276503e-01,  2.50479037e-01, -5.94192974e-01,
         3.16280765e-01, -7.14324073e-01,  4.68523607e-01,
         3.11874367e-01],
       [ 6.52477119e-01,  1.00000000e+00,  1.98039737e-01,
        -1.37241231e-01,  2.26617372e-01, -4.03612329e-01,
         1.44248315e-01, -5.43361590e-01,  4.88850315e-01,
         1.86509197e-01],
       [ 1.78395617e-01,  1.98039737e-01,  1.00000000e+00,
        -3.35726184e-02,  1.74003362e-02, -9.65169583e-02,
         6.80005979e-02, -1.34528255e-01,  1.34941462e-01,
         2.41122172e-02],
       [-1.66276503e-01, -1.37241231e-01, -3.35726184e-02,
         1.00000000e+00, -8.58626857e-02,  1.22439249e-01,
        -2.58175123e-02,  1.84372479e-01, -8.07817702e-02,
        -1.16018433e-01],
       [ 2.50479037e-01,  2.26617372e-01,  1.74003362e-02,
        -8.58626857e-02,  1.00000000e+00, -5.37283514e-01,
        -5.27660791e-01, -4.34990948e-01,  1.63662837e-01,
         2.

Можно использовать хи-квадрат тест для оценки независимости каждой переменной в features относительно целевого признака, но этот тест для категориальных переменных, поэтому для примера на одной фиче

In [34]:
from pyspark.ml.stat import ChiSquareTest, KolmogorovSmirnovTest, Summarizer

In [35]:
r = ChiSquareTest.test(data, "OccupationVector", "Is_Lead")

In [36]:
r.show()

+--------------------+----------------+--------------------+
|             pValues|degreesOfFreedom|          statistics|
+--------------------+----------------+--------------------+
|[0.0,0.0,1.161583...|       [1, 1, 1]|[1420.86324574575...|
+--------------------+----------------+--------------------+



KS-тест

In [37]:
data.select(
    F.mean(col('Age')).alias('mean_Age'),
    F.stddev(col('Age')).alias('std_Age')
).collect()

[Row(mean_Age=43.85630684708516, std_Age=14.828671804648007)]

In [38]:
ks = KolmogorovSmirnovTest.test(data, 'Age', 'norm', 44, 15).first()

In [39]:
ks

Row(pValue=2.045950076023928e-10, statistic=0.12561207843265515)

Еще можно посчитать разные статистики

In [40]:
summarizer = Summarizer.metrics("mean", "count")
data.select(summarizer.summary(data.features)).show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|aggregate_metrics(features, 1.0)                                                                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{[43.8563068470854,46.95914131651267,1128403.101019436,0.4538732322718486,0.4105646556109472,0.29300640960423235,0.2855753382846678,0.42208973445925324,0.2796296673110184,0.27561705158205313], 245725}|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

**Работа с фичами**

Квантизация

In [41]:
from pyspark.ml.feature import QuantileDiscretizer

Обучаем

In [42]:
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="Age", outputCol="Age_quant")
discretizer = discretizer.fit(data)

In [43]:
data = discretizer.transform(data)

In [44]:
data.select('Age', 'Age_quant')\
    .groupby('Age_quant').agg(
        F.min('Age').alias('min_age'),
        F.max('Age').alias('max_age'),
        F.count('Age').alias('count')
    )\
    .orderBy('Age_quant')\
    .show(5)

+---------+-------+-------+-----+
|Age_quant|min_age|max_age|count|
+---------+-------+-------+-----+
|      0.0|     23|     28|43790|
|      1.0|     29|     35|52017|
|      2.0|     36|     46|46007|
|      3.0|     47|     55|50808|
|      4.0|     56|     85|53103|
+---------+-------+-------+-----+



Заполнить пропуски можно через Imputer

Заполнять пропуски умеет только для числовых переменных, поэтому попробуем на игрушечном примере



In [45]:
from pyspark.ml.feature import Imputer

In [46]:
df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

#стратегия может быть 'mean', 'median', 'mode'
#через setMissingValue(0.0) можно сказать, что пропуски - это 0
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"], strategy='mean')
imputer = imputer.fit(df)
imputer.transform(df).show()

+---+---+-----+-----+
|  a|  b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN|  1.0|  4.0|
|2.0|NaN|  2.0|  4.0|
|NaN|3.0|  3.0|  3.0|
|4.0|4.0|  4.0|  4.0|
|5.0|5.0|  5.0|  5.0|
+---+---+-----+-----+



**Pipeline**

Как и в scikit-learn можно создавать пайплайны обработки данных

Мы много делали преобразований, давайте соберем все в 1 пайплайн

In [47]:
from pyspark.ml import Pipeline

In [48]:
#string в индесы
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
occupation_indexer = StringIndexer(inputCol="Occupation", outputCol="OccupationIndex")
channel_indexer = StringIndexer(inputCol="Channel_Code", outputCol="ChannelIndex")

#OHE
ohe_encoder = OneHotEncoder(inputCols=["OccupationIndex", "ChannelIndex"],
                        outputCols=["OccupationVector", "ChannelVec"])

#квантизация
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="Age", outputCol="Age_quant")

#собираем все в вектор
feature_columns = [
                   'Age',
                   'Vintage',
                   'Avg_Account_Balance',
                   'GenderIndex',
                   'OccupationVector',
                   'ChannelVec',
                   'Age_quant'                 
]
vector_assembler = VectorAssembler(inputCols = feature_columns, outputCol = 'features')

собираем все в пайплайн

In [49]:
pipeline = Pipeline(stages=[
                           gender_indexer,
                           occupation_indexer,
                           channel_indexer,
                           ohe_encoder,
                           discretizer,
                           vector_assembler,
])

Давайте заново загрузим данные и сделаем трансформацию

---



In [50]:
data = spark.read.csv('credit_card_data.csv', header=True, inferSchema=True)
data = data.fillna({'Credit_Product': 'No'})
pipeline = pipeline.fit(data)

In [51]:
transformed_data = pipeline.transform(data)

In [52]:
transformed_data.show()

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+---------------+------------+----------------+-------------+---------+--------------------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|GenderIndex|OccupationIndex|ChannelIndex|OccupationVector|   ChannelVec|Age_quant|            features|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+---------------+------------+----------------+-------------+---------+--------------------+
|NNVBBKZB|Female| 73|      RG268|        Other|          X3|     43|            No|            1045696|       No|      0|        1.0|            2.0|         1.0|   (3,[2],[1.0])|(3,[1],[1.0])|      4.0|[73.0,43.0,104569...|
|IDD62UNG|Female| 30|      RG277|     Salaried|          X1|     32|            No|             5819

In [53]:
transformed_data.select('Is_Lead', 'features').show(5)

+-------+--------------------+
|Is_Lead|            features|
+-------+--------------------+
|      0|[73.0,43.0,104569...|
|      0|[30.0,32.0,581988...|
|      0|[56.0,26.0,148431...|
|      0|(11,[0,1,2,5,7,10...|
|      0|[30.0,33.0,886787...|
+-------+--------------------+
only showing top 5 rows



**Модельки**

Пора нам уже что-то обучить, начнем с логрега

In [54]:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

In [55]:
lr = LogisticRegression(featuresCol='features', labelCol='Is_Lead', predictionCol='prediction',
                        maxIter=100, probabilityCol='proba')

lr = lr.fit(transformed_data)

Сохраним

In [56]:
lr.save('logreg_model')

Загрузка

In [57]:
lr2 = LogisticRegressionModel.load('logreg_model')

Коэффициенты и метрики

In [58]:
print("Coefficients: " + str(lr.coefficients))
print("Intercept: " + str(lr.intercept))

Coefficients: [-0.016713044807987118,0.009916392983822526,9.91826749028193e-09,-0.05296577197114642,-1.5152849582681478,-0.5266676981678328,-1.5009517711168423,-1.5679772035552135,-0.1459536443737886,-0.05760664451981407,0.34189553941632866]
Intercept: 0.09853176624209481


In [59]:
print("Coefficients: " + str(lr2.coefficients))
print("Intercept: " + str(lr2.intercept))

Coefficients: [-0.016713044807987118,0.009916392983822526,9.91826749028193e-09,-0.05296577197114642,-1.5152849582681478,-0.5266676981678328,-1.5009517711168423,-1.5679772035552135,-0.1459536443737886,-0.05760664451981407,0.34189553941632866]
Intercept: 0.09853176624209481


In [60]:
print(f'ROC_AUC = {lr.summary.areaUnderROC}')

ROC_AUC = 0.7291583664123229


In [61]:
lr.summary.recallByLabel

[0.9880226422744709, 0.10472138347515783]

In [62]:
lr.params

[Param(parent='LogisticRegression_1c32fc242e68', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LogisticRegression_1c32fc242e68', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'),
 Param(parent='LogisticRegression_1c32fc242e68', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'),
 Param(parent='LogisticRegression_1c32fc242e68', name='featuresCol', doc='features column name.'),
 Param(parent='LogisticRegression_1c32fc242e68', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LogisticRegression_1c32fc242e68', name='labelCol', doc='label column name.'),
 Param(parent='LogisticRegression_1c32fc242e68', name='lowerBoundsOnCoefficients', doc='The lower bounds on coefficients if fitting under bound cons

In [63]:
lr.transform(transformed_data.select('Is_Lead', 'features')).show()

+-------+--------------------+--------------------+--------------------+----------+
|Is_Lead|            features|       rawPrediction|               proba|prediction|
+-------+--------------------+--------------------+--------------------+----------+
|      0|[73.0,43.0,104569...|[1.01703314359154...|[0.73439428907056...|       0.0|
|      0|[30.0,32.0,581988...|[1.88547782413292...|[0.86823905458579...|       0.0|
|      0|[56.0,26.0,148431...|[0.91147290916372...|[0.71330147094138...|       0.0|
|      0|(11,[0,1,2,5,7,10...|[2.02938356422968...|[0.88384780907754...|       0.0|
|      0|[30.0,33.0,886787...|[1.87253835313633...|[0.86675171295996...|       0.0|
|      0|(11,[0,1,2,4,7,10...|[2.23035701748859...|[0.90294265153487...|       0.0|
|      1|(11,[0,1,2,6,8,10...|[1.00819128083161...|[0.73266603083243...|       0.0|
|      0|[48.0,13.0,444724...|[1.25888814052434...|[0.77883464798448...|       0.0|
|      0|[40.0,38.0,127428...|[1.12259469904799...|[0.75446968923717...|    

**Подбор параметров**

Тут нет всяких hyperopt, optuna...есть стандартная кросс-валидация и поиск по сетке

In [64]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

Для этого соберем все в пайплайн. Можно было "вложить" старый пайплайн в новый, но соберем все с самого начала

In [65]:
#string в индесы
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
occupation_indexer = StringIndexer(inputCol="Occupation", outputCol="OccupationIndex")
channel_indexer = StringIndexer(inputCol="Channel_Code", outputCol="ChannelIndex")

#OHE
ohe_encoder = OneHotEncoder(inputCols=["OccupationIndex", "ChannelIndex"],
                        outputCols=["OccupationVector", "ChannelVec"])

#квантизация
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="Age", outputCol="Age_quant")

#собираем все в вектор
feature_columns = [
                   'Age',
                   'Vintage',
                   'Avg_Account_Balance',
                   'GenderIndex',
                   'OccupationVector',
                   'ChannelVec',
                   'Age_quant'                 
]
vector_assembler = VectorAssembler(inputCols = feature_columns, outputCol = 'features')

lr = LogisticRegression(featuresCol='features', labelCol='Is_Lead', predictionCol='prediction',
                        maxIter=100, probabilityCol='proba')

In [66]:
pipeline = Pipeline(stages=[
                           gender_indexer,
                           occupation_indexer,
                           channel_indexer,
                           ohe_encoder,
                           discretizer,
                           vector_assembler,
                           lr
])

Сетка параметров

In [67]:
paramGrid = ParamGridBuilder() \
    .addGrid(discretizer.numBuckets, [5, 10]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()

Разобьем данные на train, test

In [68]:
train, test = data.randomSplit([0.7, 0.3], seed=7)

Описываем стратегию кросс-валидации

In [69]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                                                  labelCol='Is_Lead', metricName='areaUnderROC'),
                          numFolds=2,
                          parallelism=2)

Гоняем сетку. Знаю, перебор по сетке прошлый век, но что поделать)

In [70]:
cvModel = crossval.fit(train)

In [71]:
cvModel.avgMetrics

[0.7288569508077829,
 0.7292670310710259,
 0.7285752749191472,
 0.7291851930489013]

Параметры

In [72]:
import numpy as np
print(cvModel.getEstimatorParamMaps()[np.argmax(cvModel.avgMetrics)])

{Param(parent='QuantileDiscretizer_4391a4c5bf40', name='numBuckets', doc='Maximum number of buckets (quantiles, or categories) into which data points are grouped. Must be >= 2.'): 5, Param(parent='LogisticRegression_fcffd46edb39', name='maxIter', doc='max number of iterations (>= 0).'): 20}


Сделаем предикт

In [73]:
test_pred = cvModel.transform(test)

In [74]:
test_pred.show()

+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+---------------+------------+----------------+-------------+---------+--------------------+--------------------+--------------------+----------+
|      ID|Gender|Age|Region_Code|   Occupation|Channel_Code|Vintage|Credit_Product|Avg_Account_Balance|Is_Active|Is_Lead|GenderIndex|OccupationIndex|ChannelIndex|OccupationVector|   ChannelVec|Age_quant|            features|       rawPrediction|               proba|prediction|
+--------+------+---+-----------+-------------+------------+-------+--------------+-------------------+---------+-------+-----------+---------------+------------+----------------+-------------+---------+--------------------+--------------------+--------------------+----------+
|224VSEND|  Male| 29|      RG261|     Salaried|          X1|     13|            No|             736866|      Yes|      0|        0.0|            1.0|         0.0|   (

Проверим модель

In [75]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                          labelCol='Is_Lead', metricName='areaUnderROC')

In [76]:
evaluator.evaluate(test_pred)

0.7282672237430954

Сохраним пайплайн

In [77]:
cvModel.write().save('model')

вместо кросс-валидации можно взять TrainValidationSplit для подбора параметров, это train_test_split

**Ваша любимая домашка**

Кто проходил курс GPU прекрасно знают датасет.
Данные находятся в файле Train_Set_90621.csv
Amount Defaulted - эту переменную нужно удалить=)

Что ожидается? - творчество)

    1) Начните с анализа баланса классов, пропусков, статистик при помощи DataFrame API
    2) Посомтрите статистики, заполните пропуски при помощи уже MLlib
    3) Соберите пайплайн, похожий на наш, где будет обработка данных, обучение моделей и все при помощи Spark
    4) Разбейте данные на train/test + реализуйте подбор параметров одним из способов спарка
    5) Cохраниет пайплайн на диск
    6) Проверьте качество модели на отложенной test выборке

### Срок выполнения - 02.03.2023


In [164]:
data = spark.read.csv('Train_Set_90621.csv', header=True, inferSchema=True)

data = data.drop('Amount Defaulted')
data.cache()

DataFrame[Application ID: int, Bank Masked: string, Bank Type: string, Approved_Timestamp: string, Name Masked: string, Business Owner State: string, Business_Industry_Type_Code: int, Approved_Year: int, New Business: int, Term: int, BankState: string, Interest Rate: int, Employees: int, Gross Disbursed Amount: int, Term_years: int, Jobs Retained: int, Male to Female Employees Ratio: int, Expected Company Income: bigint, Funds available with company: int, Gross_Apprv_Amount: string, Company Branch Code: string, City or Rural: int, Jobs Generated: int, Carry-forward Credit: string, Documents Provided: string, Balance Left: string, Final_Appved_Amount: int, Default_Status: int]

In [165]:
data.printSchema()

root
 |-- Application ID: integer (nullable = true)
 |-- Bank Masked: string (nullable = true)
 |-- Bank Type: string (nullable = true)
 |-- Approved_Timestamp: string (nullable = true)
 |-- Name Masked: string (nullable = true)
 |-- Business Owner State: string (nullable = true)
 |-- Business_Industry_Type_Code: integer (nullable = true)
 |-- Approved_Year: integer (nullable = true)
 |-- New Business: integer (nullable = true)
 |-- Term: integer (nullable = true)
 |-- BankState: string (nullable = true)
 |-- Interest Rate: integer (nullable = true)
 |-- Employees: integer (nullable = true)
 |-- Gross Disbursed Amount: integer (nullable = true)
 |-- Term_years: integer (nullable = true)
 |-- Jobs Retained: integer (nullable = true)
 |-- Male to Female Employees Ratio: integer (nullable = true)
 |-- Expected Company Income: long (nullable = true)
 |-- Funds available with company: integer (nullable = true)
 |-- Gross_Apprv_Amount: string (nullable = true)
 |-- Company Branch Code: strin

In [166]:
data.show()

+--------------+---------------+---------+------------------+--------------------+--------------------+---------------------------+-------------+------------+----+-------------+-------------+---------+----------------------+----------+-------------+------------------------------+-----------------------+----------------------------+------------------+-------------------+-------------+--------------+--------------------+------------------+------------+-------------------+--------------+
|Application ID|    Bank Masked|Bank Type|Approved_Timestamp|         Name Masked|Business Owner State|Business_Industry_Type_Code|Approved_Year|New Business|Term|    BankState|Interest Rate|Employees|Gross Disbursed Amount|Term_years|Jobs Retained|Male to Female Employees Ratio|Expected Company Income|Funds available with company|Gross_Apprv_Amount|Company Branch Code|City or Rural|Jobs Generated|Carry-forward Credit|Documents Provided|Balance Left|Final_Appved_Amount|Default_Status|
+--------------+----

In [167]:
"""
  нужно привести типы 
  - Gross_Apprv_Amount (int)
  - Balance Left (int)
  - Company Branch Code (int)
"""

data = data.withColumns({'Gross_Apprv_Amount': col('Gross_Apprv_Amount').cast('Int'),
                         'Balance Left': col('Balance Left').cast('Int'),
                         'Company Branch Code': col('Company Branch Code').cast('Int')})

In [168]:
data.select(col('Default_Status'))\
    .groupBy('Default_Status')\
    .count()\
    .withColumn('count', F.round(col('count') / data.count(), 2))\
    .show()

+--------------+-----+
|Default_Status|count|
+--------------+-----+
|             1| 0.18|
|      10500000|  0.0|
|             0| 0.82|
+--------------+-----+



In [169]:
data.select(col('Default_Status'))\
    .filter(col('Default_Status')==10500000)\
    .count()
# фиг знает че это за 10500000 - удалю

1

In [170]:
#prep
data = data.filter(col('Default_Status')!=10500000)

In [171]:
data.withColumn("Company Branch Code", F.when(~F.col("Company Branch Code").isin(0,1),-1)\
                .otherwise(F.col("Company Branch Code")))\
    .select('Company Branch Code')\
    .groupBy('Company Branch Code')\
    .count()\
    .sort(col('count').desc())\
    .show()

data.select('Company Branch Code').distinct().count()
# в этой колонке много значений уникальных значений, c маленьким кол-вом наблюдений на каждое - заменю на общую категорию




+-------------------+------+
|Company Branch Code| count|
+-------------------+------+
|                  1|142888|
|                  0| 46688|
|                 -1| 11780|
+-------------------+------+



1635

In [172]:
#prep
data = data.withColumn("Company Branch Code", F.when(~F.col("Company Branch Code").isin(0,1),-1)\
                .otherwise(F.col("Company Branch Code")))

In [173]:
data.select(col('Approved_Year'))\
    .groupBy('Approved_Year')\
    .count()\
    .sort(col('Approved_Year').desc())\
    .show()

+-------------+-----+
|Approved_Year|count|
+-------------+-----+
|         2017| 2022|
|         2016| 3142|
|         2015| 3426|
|         2014| 3567|
|         2013| 5758|
|         2012|15975|
|         2011|34437|
|         2010|23266|
|         2009|19609|
|         2008|13799|
|         2007|11816|
|         2006|10875|
|         2005|13414|
|         2004|13837|
|         2003|11133|
|         2002|10599|
|         2001| 3754|
|         2000|  489|
|         1900|  438|
+-------------+-----+



In [174]:
data.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in data.columns]).show(vertical=True)

-RECORD 0------------------------------
 Application ID                 | 0    
 Bank Masked                    | 82   
 Bank Type                      | 82   
 Approved_Timestamp             | 438  
 Name Masked                    | 1    
 Business Owner State           | 3    
 Business_Industry_Type_Code    | 0    
 Approved_Year                  | 0    
 New Business                   | 37   
 Term                           | 0    
 BankState                      | 83   
 Interest Rate                  | 0    
 Employees                      | 0    
 Gross Disbursed Amount         | 0    
 Term_years                     | 0    
 Jobs Retained                  | 0    
 Male to Female Employees Ratio | 0    
 Expected Company Income        | 3    
 Funds available with company   | 0    
 Gross_Apprv_Amount             | 0    
 Company Branch Code            | 0    
 City or Rural                  | 0    
 Jobs Generated                 | 0    
 Carry-forward Credit           | 1004 


In [178]:
"""
   Bank Masked = 'unknown'
   Bank Type = 'unknown'
   Approved_Timestamp = не буду использовать
   Name Masked = дропну
   Business Owner State = дропну
   Expected Company Income = дропну
   New Business = -1
   BankState = 'unknown'
   Carry-forward Credit = 'unknown'
   Documents Provided = 'unknown'
"""

data = data.dropna(subset=['Name Masked', 'Business Owner State', 'New Business', 'Expected Company Income'])\
           .fillna({'Bank Masked': 'unknown',
                    'Bank Type': 'unknown',
                    'BankState': 'unknown',
                    'Carry-forward Credit': 'unknown',
                    'Documents Provided': 'unknown'})
data.cache()

DataFrame[Application ID: int, Bank Masked: string, Bank Type: string, Approved_Timestamp: string, Name Masked: string, Business Owner State: string, Business_Industry_Type_Code: int, Approved_Year: int, New Business: int, Term: int, BankState: string, Interest Rate: int, Employees: int, Gross Disbursed Amount: int, Term_years: int, Jobs Retained: int, Male to Female Employees Ratio: int, Expected Company Income: bigint, Funds available with company: int, Gross_Apprv_Amount: int, Company Branch Code: int, City or Rural: int, Jobs Generated: int, Carry-forward Credit: string, Documents Provided: string, Balance Left: int, Final_Appved_Amount: int, Default_Status: int]

In [176]:
data.show(5)

+--------------+--------------+---------+------------------+--------------------+--------------------+---------------------------+-------------+------------+----+-------------+-------------+---------+----------------------+----------+-------------+------------------------------+-----------------------+----------------------------+------------------+-------------------+-------------+--------------+--------------------+------------------+------------+-------------------+--------------+
|Application ID|   Bank Masked|Bank Type|Approved_Timestamp|         Name Masked|Business Owner State|Business_Industry_Type_Code|Approved_Year|New Business|Term|    BankState|Interest Rate|Employees|Gross Disbursed Amount|Term_years|Jobs Retained|Male to Female Employees Ratio|Expected Company Income|Funds available with company|Gross_Apprv_Amount|Company Branch Code|City or Rural|Jobs Generated|Carry-forward Credit|Documents Provided|Balance Left|Final_Appved_Amount|Default_Status|
+--------------+------

## Pipeline

In [156]:
#string в индесы
bankstate_indexer = StringIndexer(inputCol="BankState", outputCol="BankStateIndex")
doc_prov_indexer = StringIndexer(inputCol="Documents Provided", outputCol="DocumentsProvidedIndex")
car_cred_indexer = StringIndexer(inputCol="Carry-forward Credit", outputCol="CarryforwardCreditIndex")

#OHE
ohe_encoder = OneHotEncoder(inputCols=["BankStateIndex", "DocumentsProvidedIndex", "CarryforwardCreditIndex", "New Business"],
                        outputCols=["BankStateVector", "DocumentsProvidedVec", "CarryforwardCreditVec", "NewBusinessVec"])

#квантизация
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="Employees", outputCol="Employees_quant")

#собираем все в вектор
feature_columns = [
                   'NewBusinessVec',
                   'Term',
                   'Term_years',
                   'BankStateVector', 
                   'Interest Rate',
                   'Employees_quant',
                   'Gross Disbursed Amount',
                   'Jobs Retained',
                   'Male to Female Employees Ratio',
                   'Expected Company Income',
                   'Funds available with company',
                   'Gross_Apprv_Amount',
                   'Balance Left',
                   'Final_Appved_Amount',
                   'DocumentsProvidedVec', 
                   'CarryforwardCreditVec' 

]
vector_assembler = VectorAssembler(inputCols = feature_columns, outputCol = 'features')

lr = LogisticRegression(featuresCol='features', labelCol='Default_Status', predictionCol='prediction',
                        maxIter=100, probabilityCol='proba')

In [180]:
pipeline = Pipeline(stages=[
                           bankstate_indexer,
                           doc_prov_indexer,
                           car_cred_indexer,
                           ohe_encoder,
                           discretizer,
                           vector_assembler,
                           lr
])

In [182]:
paramGrid = ParamGridBuilder() \
    .addGrid(discretizer.numBuckets, [5, 10]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()

In [183]:
train, test = data.randomSplit([0.7, 0.3], seed=7)

In [187]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                                                  labelCol='Default_Status', metricName='areaUnderROC'),
                          numFolds=2,
                          parallelism=2)

In [188]:
cvModel = crossval.fit(train)

### Test

In [189]:
test_pred = cvModel.transform(test)

In [190]:
test_pred.show()

+--------------+---------------+---------+------------------+--------------------+--------------------+---------------------------+-------------+------------+----+-------------+-------------+---------+----------------------+----------+-------------+------------------------------+-----------------------+----------------------------+------------------+-------------------+-------------+--------------+--------------------+------------------+------------+-------------------+--------------+--------------+----------------------+-----------------------+---------------+--------------------+---------------------+--------------+---------------+--------------------+--------------------+--------------------+----------+
|Application ID|    Bank Masked|Bank Type|Approved_Timestamp|         Name Masked|Business Owner State|Business_Industry_Type_Code|Approved_Year|New Business|Term|    BankState|Interest Rate|Employees|Gross Disbursed Amount|Term_years|Jobs Retained|Male to Female Employees Ratio|Expe

In [193]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                          labelCol='Default_Status', metricName='areaUnderROC')

In [194]:
evaluator.evaluate(test_pred)

0.7042825580214932

### На диск

In [196]:
cvModel.write().save('model_defolt')