In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.4` // Or use any other 2.x version here
import $ivy.`org.apache.spark::spark-mllib:2.4.4` // Or use any other 2.x version here
// import $ivy.`sh.almond::almond-spark:0.10.9` 

[32mimport [39m[36m$ivy.$                                   // Or use any other 2.x version here
[39m
[32mimport [39m[36m$ivy.$                                     // Or use any other 2.x version here
// import $ivy.`sh.almond::almond-spark:0.10.9` [39m

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)


[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

In [3]:
import org.apache.spark.sql._

[32mimport [39m[36morg.apache.spark.sql._[39m

In [4]:
val spark = {
  SparkSession.builder()
    .master("local[*]")
    .config("spark.testing.memory", 471859200)
    .config("spark.executor.instances", "4")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
}

import spark.implicits._

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@194f9b39
[32mimport [39m[36mspark.implicits._[39m

In [5]:
var df = spark.read.format("parquet").load("data/train")

[36mdf[39m: [32mDataFrame[39m = [instanceId_userId: int, instanceId_objectType: string ... 167 more fields]

In [6]:
df.createOrReplaceTempView("trainDF")

# Сформировать критерии оттекших/удержавшихся пользователей

In [7]:
// посмотрим на список колонок
for (col <- df.columns){
    println(col)
}

instanceId_userId
instanceId_objectType
instanceId_objectId
audit_pos
audit_clientType
audit_timestamp
audit_timePassed
audit_experiment
audit_resourceType
metadata_ownerId
metadata_ownerType
metadata_createdAt
metadata_authorId
metadata_applicationId
metadata_numCompanions
metadata_numPhotos
metadata_numPolls
metadata_numSymbols
metadata_numTokens
metadata_numVideos
metadata_platform
metadata_totalVideoLength
metadata_options
relationsMask
userOwnerCounters_USER_FEED_REMOVE
userOwnerCounters_USER_PROFILE_VIEW
userOwnerCounters_VOTE_POLL
userOwnerCounters_USER_SEND_MESSAGE
userOwnerCounters_USER_DELETE_MESSAGE
userOwnerCounters_USER_INTERNAL_LIKE
userOwnerCounters_USER_INTERNAL_UNLIKE
userOwnerCounters_USER_STATUS_COMMENT_CREATE
userOwnerCounters_PHOTO_COMMENT_CREATE
userOwnerCounters_MOVIE_COMMENT_CREATE
userOwnerCounters_USER_PHOTO_ALBUM_COMMENT_CREATE
userOwnerCounters_COMMENT_INTERNAL_LIKE
userOwnerCounters_USER_FORUM_MESSAGE_CREATE
userOwnerCounters_PHOTO_MARK_CREATE
userOwnerCoun

In [8]:
// посмотрим минимальное, максимальное значение даты и разницу между ними
spark.sql("""
    select min(date), max(date), datediff(max(date), min(date))
    from trainDF
""").show()

+----------+----------+------------------------------+
| min(date)| max(date)|datediff(max(date), min(date))|
+----------+----------+------------------------------+
|2018-02-01|2018-03-21|                            48|
+----------+----------+------------------------------+



#### будем считать, что если пользователь не совершал никаких действий более трех недель, то он уже и не вернется

In [9]:
val df_tgt = spark.sql("""
with cte_last_action as (
    select instanceId_userId, max(date) as md from trainDF group by instanceId_userId
)
, cte_target as (
    select 
        instanceId_userId
        , case when datediff('2018-03-21', md) > 21 then 1 else 0 end as target
    from cte_last_action
) select e.*, t.target from trainDF e
    join cte_target t on e.instanceId_userId = t.instanceId_userId
""")

df_tgt.createOrReplaceTempView("df_tgt")

[36mdf_tgt[39m: [32mDataFrame[39m = [instanceId_userId: int, instanceId_objectType: string ... 168 more fields]

In [10]:
// посмотрим сколько у нас получилось оттёкших пользователей
spark.sql("""
    select target, count(*) from df_tgt group by target
""").show()

+------+--------+
|target|count(1)|
+------+--------+
|     1| 1925038|
|     0|16361537|
+------+--------+



# Подготовить признаки, обучить модель

Выдвинем гипотезу, что отток напрямую зависит либо от общего кол-ва взаимодействий с пабликами в течение последних Х недель, или же от каких-то конкретных взаимодействий в течение последних Х недель

In [11]:
// расширим наш врейм, вытащив список реакций в каждую отдельную строку

var df_exp_tgt = spark.sql("""
    select 
        instanceId_userId
        , explode(feedback) as fb_exp 
        , date
        , target
    from df_tgt t 
""")
df_exp_tgt.createOrReplaceTempView("df_exp_tgt")

[36mdf_exp_tgt[39m: [32mDataFrame[39m = [instanceId_userId: int, fb_exp: string ... 2 more fields]

In [12]:
// разобьем разницу между датой действия и последней даты на группы с шагом 7
// , чтобы на выходе можно было посчитать кол-во действий в ретроспективе
// , сгруппировав их по этим "лагам времени" и типам реакций

val df_bucket = spark.sql("""
    select 
        instanceId_userId
        , explode(feedback) as fb_exp 
        , floor(datediff('2018-03-21',date) / 7) date_diff_bucket
        , target
    from df_tgt t 
    order by date
""")
df_bucket.createOrReplaceTempView("df_bucket")



[36mdf_bucket[39m: [32mDataFrame[39m = [instanceId_userId: int, fb_exp: string ... 2 more fields]

In [13]:
// сделаем фрейм с разбивкой по типу действия + недельному лагу
val df_act_type_lag = spark.sql("""
select 
    instanceId_userId
    , concat(fb_exp, '_', date_diff_bucket) as act_type
    , target
from df_bucket
""")

val df_pivot_act_type = df_act_type_lag
    .groupBy("instanceId_userId")
    .pivot("act_type")
    .count()
    .na.fill(0)

df_pivot_act_type.createOrReplaceTempView("df_pivot_act_type")

// сделаем фрейм с разбивкой по недельному лагу
val df_lag = spark.sql("""
select 
    instanceId_userId
    , concat('total_', date_diff_bucket) as total_lag
    , target
from df_bucket
""")

val df_pivot_lag = df_lag
    .groupBy("instanceId_userId")
    .pivot("total_lag")
    .count()
    .na.fill(0)

df_pivot_lag.createOrReplaceTempView("df_pivot_lag")

[36mdf_act_type_lag[39m: [32mDataFrame[39m = [instanceId_userId: int, act_type: string ... 1 more field]
[36mdf_pivot_act_type[39m: [32mDataFrame[39m = [instanceId_userId: int, Clicked_0: bigint ... 62 more fields]
[36mdf_lag[39m: [32mDataFrame[39m = [instanceId_userId: int, total_lag: string ... 1 more field]
[36mdf_pivot_lag[39m: [32mDataFrame[39m = [instanceId_userId: int, total_0: bigint ... 6 more fields]

In [14]:
val df_for_split = spark.sql("""
    select 
        pa.*
        , la.total_0
        , la.total_1
        , la.total_2
        , la.total_3
        , la.total_4
        , la.total_5
        , la.total_6
        , t.target
    from df_tgt t
    left join df_pivot_act_type pa on t.instanceId_userId = pa.instanceId_userId
    left join df_pivot_lag la on t.instanceId_userId = la.instanceId_userId
""")

[36mdf_for_split[39m: [32mDataFrame[39m = [instanceId_userId: int, Clicked_0: bigint ... 70 more fields]

Таким образом, получили разряженную матрицу с кол-ом реакций, разбитых по неделям и по типам + неделям. Теперь выборку можно разбить на трейн и тест, обучить модель и посмотреть что из этого вышло

In [27]:
// для начала нам необходимо объединить все фичи в один вектор, чтобы потом скормить его модели
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.VectorAssembler

// получим массив колонок с фичами
var f_arr = Array[String]()
for (col <- df_for_split.columns){
    if(col != "instanceId_userId" && col != "target" && col.endsWith("_0") != true)
        // срезали последнюю неделю, чтобы моделе было интереснее =)
        f_arr = f_arr :+ col
}

val assembler = new VectorAssembler()
  .setInputCols(f_arr)
  .setOutputCol("features")

val df_features = assembler.transform(df_for_split)

// разделим набор данных на трейн и тест
val Array(trainDF, testDF) = df_features.randomSplit(Array(0.7, 0.3), seed = 42)

[32mimport [39m[36morg.apache.spark.mllib.linalg.Vectors
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.VectorAssembler

// получим массив колонок с фичами
[39m
[36mf_arr[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"Clicked_1"[39m,
  [32m"Clicked_2"[39m,
  [32m"Clicked_3"[39m,
  [32m"Clicked_4"[39m,
  [32m"Clicked_5"[39m,
  [32m"Clicked_6"[39m,
  [32m"Commented_1"[39m,
  [32m"Commented_2"[39m,
  [32m"Commented_3"[39m,
  [32m"Commented_4"[39m,
  [32m"Commented_5"[39m,
  [32m"Commented_6"[39m,
  [32m"Complaint_1"[39m,
  [32m"Complaint_2"[39m,
  [32m"Complaint_3"[39m,
  [32m"Complaint_4"[39m,
  [32m"Complaint_5"[39m,
  [32m"Complaint_6"[39m,
  [32m"Disliked_1"[39m,
  [32m"Disliked_2"[39m,
  [32m"Disliked_3"[39m,
  [32m"Disliked_4"[39m,
  [32m"Disliked_5"[39m,
  [32m"Disliked_6"[39m,
  [32m"Ignored_1"[39m,
  [32m"Ignored_2"[39m,
  [32m"Ignored_3"[39m,
  [32m"Ignored_4"[39m,
  [32m"Ignored_5"[39m,
 

In [28]:
// Попробуем обучить модель градиентного бустинга

import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
val gbt = new GBTClassifier()
  .setLabelCol("target")
  .setFeaturesCol("features")
  .setMaxIter(10)

val gbtModel = gbt.fit(trainDF)


[32mimport [39m[36morg.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
// Train a GBT model.
[39m
[36mgbt[39m: [32mGBTClassifier[39m = gbtc_cec363ded67e
[36mgbtModel[39m: [32mGBTClassificationModel[39m = GBTClassificationModel (uid=gbtc_cec363ded67e) with 10 trees

In [29]:
val df_gbt_pred = gbtModel.transform(testDF)
df_gbt_pred.createOrReplaceTempView("df_gbt_pred")

spark.sql("""
    select target, prediction, count(*)
    from df_gbt_pred
    group by target, prediction
""").show()

+------+----------+--------+
|target|prediction|count(1)|
+------+----------+--------+
|     1|       0.0|   97997|
|     0|       0.0| 4651170|
|     1|       1.0|  480112|
|     0|       1.0|  255063|
+------+----------+--------+



[36mdf_gbt_pred[39m: [32mDataFrame[39m = [instanceId_userId: int, Clicked_0: bigint ... 74 more fields]

In [31]:
// посчитаем F1Score для модели бустинга
val TP = 480112.0
val TN = 4651170.0
val FP = 255063.0
val FN = 97997.0

val PPV = TP / (TP + FP) // Precision (Positive Predictive Value)
val TPR = TP / (TP + FN) // Recall (True Positive Rate)
val F1 = (2 * PPV * TPR) / (PPV + TPR)


[36mTP[39m: [32mDouble[39m = [32m480112.0[39m
[36mTN[39m: [32mDouble[39m = [32m4651170.0[39m
[36mFP[39m: [32mDouble[39m = [32m255063.0[39m
[36mFN[39m: [32mDouble[39m = [32m97997.0[39m
[36mPPV[39m: [32mDouble[39m = [32m0.6530581154146972[39m
[36mTPR[39m: [32mDouble[39m = [32m0.83048698428843[39m
[36mF1[39m: [32mDouble[39m = [32m0.7311624903676585[39m

In [34]:
// Попробуем обучить модель наивного байеса

import org.apache.spark.ml.classification.NaiveBayes

val nbModel = new NaiveBayes()
    .setLabelCol("target")
    .setFeaturesCol("features")
    .fit(trainDF)

[32mimport [39m[36morg.apache.spark.ml.classification.NaiveBayes

[39m
[36mnbModel[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mml[39m.[32mclassification[39m.[32mNaiveBayesModel[39m = NaiveBayesModel (uid=nb_09faf60a372e) with 2 classes

In [35]:
val df_nb_pred = nbModel.transform(testDF)
df_nb_pred.createOrReplaceTempView("df_nb_pred")

spark.sql("""
    select target, prediction, count(*)
    from df_nb_pred
    group by target, prediction
""").show()

+------+----------+--------+
|target|prediction|count(1)|
+------+----------+--------+
|     1|       0.0|  242299|
|     0|       0.0| 4670866|
|     1|       1.0|  335810|
|     0|       1.0|  235367|
+------+----------+--------+



[36mdf_nb_pred[39m: [32mDataFrame[39m = [instanceId_userId: int, Clicked_0: bigint ... 74 more fields]

In [36]:
// посчитаем F1Score для модели бустинга
val TP = 335810.0
val TN = 4670866.0
val FP = 255063.0
val FN = 235367.0

val PPV = TP / (TP + FP) // Precision (Positive Predictive Value)
val TPR = TP / (TP + FN) // Recall (True Positive Rate)
val F1 = (2 * PPV * TPR) / (PPV + TPR)



[36mTP[39m: [32mDouble[39m = [32m335810.0[39m
[36mTN[39m: [32mDouble[39m = [32m4670866.0[39m
[36mFP[39m: [32mDouble[39m = [32m255063.0[39m
[36mFN[39m: [32mDouble[39m = [32m235367.0[39m
[36mPPV[39m: [32mDouble[39m = [32m0.5683285579134603[39m
[36mTPR[39m: [32mDouble[39m = [32m0.587926334568794[39m
[36mF1[39m: [32mDouble[39m = [32m0.5779613613872037[39m

Показазатели конечно не фонтан, но будем считать, что нас это устраивает т.к. по заданию перед нами не стоит цель получить максимально точную модель

# Интерпретируем модель, сделать сегментацию пользователей.

А теперь вспоминаем, что с интерпретацией моделей в скале всё очень плохо, страдаем какое-то время. Вспоминаем, что есть PravdaML и пробуем запилить модель, используюя PravdaML, чтобы можно было интрпретировать модель

In [43]:
import $ivy.`ru.odnoklassniki::pravda-ml:0.6.2`
// import org.apache.spark.ml.odkl.MatrixLBFGS
import org.apache.spark.ml.classification.odkl.XGBoostClassifier

[32mimport [39m[36m$ivy.$                                  
// import org.apache.spark.ml.odkl.MatrixLBFGS
[39m
[32mimport [39m[36morg.apache.spark.ml.classification.odkl.XGBoostClassifier[39m

In [45]:
val pmlXGBModel = new XGBoostClassifier()
    .setLabelCol("target")
    .fit(trainDF)

: 

In [46]:
trainDF.sqlContext

[36mres45[39m: [32mSQLContext[39m = org.apache.spark.sql.SQLContext@75eddd19