In [1]:
!pip install pyspark
!pip install faker

Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m15.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=f27aa3c4502b598e748addc14f4565ec578e5bfce02eb19f33e735c38a8f6930
  Stored in directory: /root/.cache/pip/wheels/5a/54/9b/a89cac960efb57c4c35d41cc7c9f7b80daa21108bc376339b7
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
  

# Пример решения задачи на PySpark

Решение задачи классификации на синтезированных данных.

In [45]:
import pandas as pd
import numpy as np

from sklearn import datasets
from faker import Faker

import plotly.express as px

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator


RAND = sum(ord(x) for x in 'NEVER SURRENDER')
SAMPLES = 10_000

In [3]:
faker = Faker('ru_RU')
Faker.seed(RAND)
np.random.seed(RAND)

In [4]:
spark = (SparkSession.builder 
         .appName('sparky')   
         .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/08 06:45:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Подготовка. Создание данных


Предположим, у нас есть данные о работниках предприятия и надо предсказать, доволен ли он работой в компании.  
Создадим информацию:
- 3 столбца неких зашифрованных данных
- код профессии
- дата выхода на работу
- зарплата

In [5]:
X, y = datasets.make_classification(n_samples=SAMPLES, 
                                    n_features=3, 
                                    n_redundant=0,
                                    flip_y=0.2,
                                    weights=[0.3],
                                    random_state=RAND)
X = pd.DataFrame(np.round(X, 4), columns=['var1', 'var2', 'var3'])

In [6]:
job = []
date = []
for _ in range(SAMPLES):
    job.append(faker.bothify(text='Job code: ?-#', letters='RQT'))
    date.append(faker.date_time_between(start_date='-500d', end_date='-50d'))

In [7]:
X['job'] = job
X['date'] = date
X['salary'] = np.round(np.random.normal(50, 7, SAMPLES) * 1_000)
X.loc[np.random.randint(0, SAMPLES, 50), 'salary'] = None
X['target'] = y

In [8]:
X.head(3)

Unnamed: 0,var1,var2,var3,job,date,salary,target
0,1.0171,0.3642,1.1831,Job code: Q-8,2022-10-10 09:44:37,45682.0,1
1,0.7311,-1.0879,-1.3944,Job code: T-7,2022-11-23 19:46:42,53476.0,1
2,0.9668,-0.2888,-0.3218,Job code: R-6,2022-04-30 07:17:21,39997.0,1


## EDA

Загрузка данных в PySpark

In [9]:
df =  spark.createDataFrame(X)
df.show(3)

[Stage 0:>                                                          (0 + 1) / 1]

+------+-------+-------+-------------+-------------------+-------+------+
|  var1|   var2|   var3|          job|               date| salary|target|
+------+-------+-------+-------------+-------------------+-------+------+
|1.0171| 0.3642| 1.1831|Job code: Q-8|2022-10-10 09:44:37|45682.0|     1|
|0.7311|-1.0879|-1.3944|Job code: T-7|2022-11-23 19:46:42|53476.0|     1|
|0.9668|-0.2888|-0.3218|Job code: R-6|2022-04-30 07:17:21|39997.0|     1|
+------+-------+-------+-------------+-------------------+-------+------+
only showing top 3 rows



                                                                                

In [10]:
df.printSchema()

root
 |-- var1: double (nullable = true)
 |-- var2: double (nullable = true)
 |-- var3: double (nullable = true)
 |-- job: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- salary: double (nullable = true)
 |-- target: long (nullable = true)



Для удобства подчистим немного. Обрежем время у даты, вытащим у специальности код.

In [11]:
df = df.withColumn('date', F.to_date('date')) \
       .withColumn('job', F.regexp_replace('job', 'Job code: ', ''))

In [12]:
df.show(3)

+------+-------+-------+---+----------+-------+------+
|  var1|   var2|   var3|job|      date| salary|target|
+------+-------+-------+---+----------+-------+------+
|1.0171| 0.3642| 1.1831|Q-8|2022-10-10|45682.0|     1|
|0.7311|-1.0879|-1.3944|T-7|2022-11-23|53476.0|     1|
|0.9668|-0.2888|-0.3218|R-6|2022-04-30|39997.0|     1|
+------+-------+-------+---+----------+-------+------+
only showing top 3 rows



In [13]:
df.pandas_api().isna().sum()

  "'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to "


var1       0
var2       0
var3       0
job        0
date       0
salary    50
target     0
dtype: int64

Есть пропущенные значения в столбце зарплаты.  
Заполним их средним значением по специальности:

In [14]:
w = Window.partitionBy('job')

df = df.withColumn('avg_salary', F.avg(F.col('salary').cast('int')).over(w)) \
       .withColumn('salary', F.when(F.isnan('salary'), F.col('avg_salary')).otherwise(F.col('salary'))) \
       .drop('avg_salary')

In [15]:
df.pandas_api().describe()

                                                                                

Unnamed: 0,var1,var2,var3,salary,target
count,10000.0,10000.0,10000.0,10000.0,10000.0
mean,0.39399,0.024355,-0.010211,50009.253514,0.657
std,1.110487,1.004017,1.270407,7032.571292,0.474735
min,-2.5664,-3.7152,-3.8349,22536.0,0.0
25%,-0.7698,-0.6582,-1.0315,45266.0,0.0
50%,0.7226,0.0268,-0.0014,50071.0,1.0
75%,1.1682,0.7113,1.0052,54726.0,1.0
max,4.6181,4.248,4.2337,78039.0,1.0


#### Посмотрим распределения

**Целевой**

In [16]:
pie_data = df.select('target').toPandas()
fig = px.pie(pie_data, values=pie_data.value_counts().values, names=['0','1'])
fig.show()

**Признаки: зарплата**

In [17]:
fig = px.histogram(df.select('salary','target').toPandas(), x='salary', color='target', marginal="box")
fig.show()

**Закодированные признаки**   
На примере одного


In [18]:
fig = px.box(df.select('var1','target').toPandas(), x='var1', color='target')
fig.show()

In [19]:
fig = px.scatter(df.select('salary','var1', 'target').toPandas(), x='salary', y='var1', color='target')
fig.show()

**Специальности**

In [20]:
fig = px.histogram(df.select('job','target').toPandas(), x="job", color='target')
fig.show()

Обнаружен дисбаланс в целевом признаке, распределение примерно 2 к 3.  
Зарплата: нормальное распредление. Нет выделяющихся моментов в зависимости от целевого.  
Закодированный признак: можно предположить зависимость целевого признака.
Специальности: распределение близко к равномерному. 


**Признак даты**  
В явном виде не особо полезен. Предположим, что база актуальна "на текущую дату". Значит, можно посчитать, сколько месяцев человек находится в компании.


In [21]:
df = df.withColumn('date', F.round(F.months_between(F.current_date(), 'date'), 2))

Посмотрим корреляцию числовых признаков:

In [22]:
df.toPandas().corr()

Unnamed: 0,var1,var2,var3,date,salary,target
var1,1.0,0.001177,0.069805,0.005602,0.00132,0.627544
var2,0.001177,1.0,0.009302,-0.016715,6.1e-05,-0.002743
var3,0.069805,0.009302,1.0,-0.000552,-0.004087,-0.005269
date,0.005602,-0.016715,-0.000552,1.0,-0.017943,-0.000142
salary,0.00132,6.1e-05,-0.004087,-0.017943,1.0,-0.005154
target,0.627544,-0.002743,-0.005269,-0.000142,-0.005154,1.0


В теории, на целевой признак имеет влияние только закодированная переменная 1. Но т.к. данный ноутбук является просто демонстрацией PySpark на сгенерированных данных - для построения моделей используем все.

### Моделирование. Random Forest

Есть категориальная переменная. Закодируем ее StringIndexer.  
Для леса нет необходимости изменять числовые признаки, но для примера работы пайплайна сделаем MinMaxScaler для признака salary.


In [23]:
indexer = StringIndexer(inputCol='job', outputCol='job_cat') 
assembler = VectorAssembler(inputCols=['salary'],outputCol='salary_vec')
minmax = MinMaxScaler(inputCol='salary_vec', outputCol='salary_norm')

In [24]:
features = ['var1', 'var2', 'var3', 'date', 'job_cat', 'salary_norm']
finalfeature = VectorAssembler(inputCols=features, outputCol='features')
pipeline = Pipeline(stages=[indexer, assembler, minmax, finalfeature])

Отделим тестовую выборку и обучим модель

In [25]:
data = pipeline.fit(df).transform(df)
train, test = data.randomSplit([0.7, 0.3], seed=RAND) 

                                                                                

In [83]:
rfc_model = RandomForestClassifier(featuresCol='features', 
                                   labelCol='target', 
                                   maxDepth=7,
                                   seed=RAND)
rfc_model = rfc_model.fit(train)

In [84]:
pred = rfc_model.transform(test)

Метрики:

In [85]:
auc = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction").evaluate(pred)
f1 = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1").evaluate(pred)
print(f'AUC = { auc }')
print(f'F1 score = { f1 }')

AUC = 0.8371862844606394
F1 score = 0.8642308541949738


Получилась вполне неплохая моделька)