# Описание данных

- `instanceId_userId` — идентификатор пользователя (анонимизированный)
- `instanceId_objectType` — тип объекта
- `instanceId_objectId` — идентификатор объекта (анонимизированный)
- `feedback` — массив с типами реакций пользователя (наличие в массиве токена Liked говорит о том, что объект получил «класс» от пользователя)
- `audit_clientType` — тип платформы, с которой зашёл пользователь
- `audit_timestamp` — время, когда строилась лента
- `metadata_ownerId` — автор показанного объекта (анонимизированный)
- `metadata_createdAt` — дата создания показанного объекта
- `audit_*` — расширенная информация о контексте построения ленты;
- `metadata_*` — расширенная информация о самом объекте;
- `userOwnerCounters_*` — информация о предыдущих взаимодействиях пользователя и автора контента;
- `ownerUserCounters_*` — информация о предыдущих взаимодействиях автора контента и пользователя;
- `membership_*` — информация о членстве пользователя в группе, где опубликован контент;
- `user_*` — подробная информация о пользователе;
- `auditweights_*` — большое количество runtime-признаков, извлечённых текущей системой.

In [None]:
import $ivy.`org.apache.spark::spark-sql:3.1.1`
import $ivy.`org.apache.spark::spark-mllib:3.1.1`
import $ivy.`sh.almond::almond-spark:0.11.2`
import $ivy.`org.plotly-scala::plotly-almond:0.5.2`

import scala.math.Ordered._
import scala.reflect.runtime.universe._

import org.apache.spark.sql.{DataFrame, SparkSession,Row}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

import org.apache.spark.ml.linalg.{Matrix, Vector}
import org.apache.spark.ml.stat.{Correlation, Summarizer}
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,OneHotEncoder,MinMaxScaler,Normalizer}
import org.apache.spark.ml.Pipeline

import plotly._, plotly.element._ , plotly.layout._ , plotly.Almond._ 

import org.apache.log4j.{Logger, Level}
// Keep the notebook output readable by raising log thresholds:
// the root logger only passes FATAL events, while loggers under the "org" hierarchy
// (which includes org.apache.*) are set to WARN.
// The root level is set once; setting ERROR right before FATAL had no effect.
Logger.getRootLogger.setLevel(Level.FATAL)
Logger.getLogger("org").setLevel(Level.WARN)

In [None]:
// Data path to sna dataset
val DATA_PATH_IN = "../otus-bigdataml/data/sna"

// Data path where to store an intermediate data
val DATA_PATH_OUT= "/tmp/sna.parquet"

// Data path where to store the names of features
val FEATURES_PATH= "data/features.txt"

// Local Spark session ("spark.master" = "local", i.e. a single worker thread).
val spark=SparkSession.builder()
    .appName("Data Sources Practice")
    .config("spark.master", "local")
    .getOrCreate()

import spark.implicits._

val dfLoaded=spark.read.parquet(DATA_PATH_IN)
// Row count of the raw dataset, kept as a Double.
// NOTE(review): presumably Double so later cells can use it as a ratio denominator — confirm.
val DF_COUNT=dfLoaded.count.toDouble
// Print the count as an integer: interpolating the Double directly renders large counts
// in scientific notation (e.g. "1.2345678E7").
println(s"Data shape=(${DF_COUNT.toLong}, ${dfLoaded.columns.length})")

// Event-level frame with two derived columns:
//  - activity:  0 when the "feedback" array contains the token "Ignored", 1 otherwise.
//               NOTE(review): a NULL "feedback" array also yields 1 (array_contains returns NULL,
//               so the `otherwise` branch is taken) — confirm that missing feedback should count
//               as activity.
//  - timeDelta: whole days between feed construction and object creation; the divisor
//               3600000 * 24 implies both timestamps are epoch milliseconds.
// Rows with a negative age or an age of 10 years (3650 days) or more are discarded; rows whose
// timeDelta is NULL are discarded too, because the comparisons evaluate to NULL.
val dfRaw=dfLoaded
    .withColumn("activity", when(array_contains(col("feedback"), "Ignored"), lit(0)).otherwise(lit(1)))
    .withColumn("timeDelta", ((col("audit_timestamp") - col("metadata_createdAt")) / 3600000 /24).cast("integer"))
    .filter(col("timeDelta")>=0 && col("timeDelta")<3650)

/** One-hot encodes each column of `columns` in turn and collects a readable name for every vector slot.
  *
  * For every column `c` (processed in the given order):
  *  - a `StringIndexer` is fitted on the current frame and writes the index to `c_num`
  *    (`handleInvalid = "keep"`, so invalid/unseen values get an extra index instead of failing);
  *  - a `OneHotEncoder` is fitted on that output and writes the vector to `c_vec`
  *    (its `dropLast` default is left untouched);
  *  - one name `"<c>_<category>"` per indexer label is appended to `labels`, in label-index order.
  *
  * Works for any `Seq` of columns. The previous `head :: tail` match only recognized `List`, so other
  * sequences silently fell through and encoded nothing.
  *
  * NOTE(review): per the Spark docs, the "keep" indexer adds one extra category after the real labels
  * and `dropLast` removes the last category, so the `c_vec` size should equal the number of labels.
  * This keeps the generated names aligned with the vector slots — confirm if either setting changes.
  *
  * @param df      frame to encode
  * @param columns categorical column names to encode
  * @param labels  names accumulated so far (pass an empty array for a fresh run)
  * @return the encoded frame, after `na.fill(0)` is applied to it, paired with `labels` extended by
  *         the category names of every encoded column
  */
def transformCategoricalValues(df:DataFrame, columns:Seq[String], labels:Array[String]):(DataFrame,Array[String]) = {
    val (encoded, allLabels) = columns.foldLeft((df, labels)) { case ((current, names), column) =>
        val indexer = new StringIndexer()
            .setInputCol(column)
            .setOutputCol(column + "_num")
            .setHandleInvalid("keep")
            .fit(current)

        val indexed = indexer.transform(current)

        val encoder = new OneHotEncoder()
            .setInputCol(column + "_num")
            .setOutputCol(column + "_vec")
            //.setDropLast(false)  // would change the vector size and break the name alignment above
            .fit(indexed)

        // A single-input indexer has exactly one label array; `labels` is deprecated since Spark 3.0.
        val categoryNames = indexer.labelsArray(0).map(category => s"${column}_$category")
        (encoder.transform(indexed), names ++ categoryNames)
    }
    (encoded.na.fill(0), allLabels)
}

// Column roles are picked by name prefix, which is a judgment call rather than a derived rule:
//  - numerical:   every "membership_*" column except "membership_status", plus every "auditweights_*";
//  - categorical: the four columns listed explicitly below (one-hot encoded).
val numericalColumns = dfRaw.columns.filter { name =>
  val isMembershipNumber = name.startsWith("membership_") && name != "membership_status"
  isMembershipNumber || name.startsWith("auditweights_")
}

val categoricalColumns = Seq(
  "instanceId_objectType",
  "audit_resourceType",
  "metadata_ownerType",
  "membership_status"
)

// dfEnc: dfRaw plus the *_num / *_vec encoding columns; categoricalLabels: one name per one-hot slot.
val (dfEnc, categoricalLabels) =
  transformCategoricalValues(dfRaw, categoricalColumns, Array.empty[String])

// Registered for the SQL cells that follow.
dfEnc.createOrReplaceTempView("EncView")

Well, here we have:
- dfEnc - dataset with one-hot encoded categorical features
- categoricalLabels - all categorical features names

It is difficult to choose the right criterion for labeling churned users. I think something meaningful can be found in the time gaps between activities.

So let's simply calculate the time gaps (in days) between consecutive activities of every user; the time elapsed since the user's last appearance may also be important.

In [12]:
// Reference ("today") date used to measure how long ago each user was last seen.
// NOTE(review): presumably the day after the last date in the dataset — confirm against max(date).
val REFERENCE_DATE = "2018-03-21"

// Largest gap, in days, between consecutive activity dates of each user.
// LAG(...) is NULL on each user's first row and max() ignores NULLs, so ddiff_max is NULL only for
// users with a single row in EncView. Users with several rows on one date get 0.
// NOTE(review): the later feature join calls `.na.drop()`, which removes those single-row users
// from the labeled set — confirm that is intended.
val dfDateDiff = spark.sql("""WITH DateDiff as (
                              SELECT instanceId_userId, 
                              datediff(date, LAG(date, 1) OVER (PARTITION BY instanceId_userId ORDER BY date)) as ddiff
                              FROM EncView)
                              select instanceId_userId, max(ddiff) as ddiff_max from DateDiff group by instanceId_userId
                         """)
dfDateDiff.createOrReplaceTempView("DateDiffView")

// Days between each user's last activity date and REFERENCE_DATE.
val dfLastActivity = spark.sql(s""" select instanceId_userId, 
                                   datediff('$REFERENCE_DATE', max(date)) as last_ddiff
                                   from EncView 
                                   group by instanceId_userId
                         """)
dfLastActivity.createOrReplaceTempView("LastActivityView")

[36mdfDateDiff[39m: [32mDataFrame[39m = [instanceId_userId: int, ddiff_max: int]
[36mdfLastActivity[39m: [32mDataFrame[39m = [instanceId_userId: int, last_ddiff: int]

# User Activity

In [13]:
/** Builds a plotly bar trace from a two-column result.
  *
  * The frame is collected to the driver. For every row, in row order, column 0 becomes the x label
  * (via `toString`) and column 1 becomes the bar height (its string form parsed as an `Int`).
  */
def dfBar(df: DataFrame): Bar = {
  val pairs =
    for (row <- df.collect.toList)
      yield (row(0).toString, row(1).toString.toInt)
  val (labels, heights) = pairs.unzip
  Bar(labels, heights)
}

// Distribution of the largest in-history gap between activities (users, per gap length).
dfBar(spark.sql(""" select ddiff_max, count(ddiff_max)
               from DateDiffView 
               where ddiff_max is not null
               group by ddiff_max
               order by ddiff_max
        """))
  .plot(title = "Activity date diff")

// Distribution of days since the last activity (users, per day count).
dfBar(spark.sql(""" select last_ddiff, count(last_ddiff)
               from LastActivityView 
               where last_ddiff is not null
               group by last_ddiff
               order by last_ddiff
        """))
  .plot(title = "Last Activity date diff")

defined [32mfunction[39m [36mdfBar[39m
[36mres12_1[39m: [32mString[39m = [32m"plot-1097138054"[39m
[36mres12_2[39m: [32mString[39m = [32m"plot-2146386214"[39m

### My intuition points to the 23rd day as the key threshold: if a user has had no activity for 23+ days, I suppose there is a high probability that they will never return.

In [14]:
// Churn label: 1 when a user's last activity is CHURN_THRESHOLD_DAYS or more days before the
// reference date (last_ddiff >= threshold), otherwise 0.
val CHURN_THRESHOLD_DAYS = 23

// Built from the `dfLastActivity` DataFrame rather than by name from "LastActivityView".
// The next statement replaces that view with the labels, so reading the view back would make
// a second run of this cell fail: the replaced view no longer has a `last_ddiff` column.
val dfLabeled = dfLastActivity.select(
    $"instanceId_userId",
    when($"last_ddiff" >= CHURN_THRESHOLD_DAYS, 1).otherwise(0).as("label")
)
// The view name is reused on purpose: the feature join reads `label` from "LastActivityView".
dfLabeled.createOrReplaceTempView("LastActivityView")
//println(f" There is ${spark.sql("select sum(label)/count(label) from LastActivityView").first()(0).toString.toDouble*100}%.1f%% of users are churned")

[36mdfLabeled[39m: [32mDataFrame[39m = [instanceId_userId: int, label: int]

# Prepare Features
I don't think this task requires complicated feature engineering (time windows, dynamics, etc.).
As features, I calculated for every user:
- the means of the numerical columns and of the one-hot encoded categorical columns;
- the sum and mean of activities (an event counts as 1 if the user didn't ignore the post);
- the maximum time gap between the user's activities, in days.

There should be no target leakage here, because the label is based on the lag since the last activity (`last_ddiff`), not on the in-history gap `ddiff_max` that is used as a feature.

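Scaling with MinMaxScaler was planned, but the scaler stage is currently commented out, so the stored `features_comb` vector is not scaled.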

In [15]:
// NOTE(review): `summarizer` is not used below (the mean is taken via Summarizer.mean directly).
val summarizer = Summarizer.metrics("mean")

// Event-level vector: every one-hot "<categorical>_vec" column followed by the numerical columns.
val assemblerCol = "assemblerCol"
val assembler = new VectorAssembler()
  .setInputCols((categoricalColumns.map(name => s"${name}_vec") ++ numericalColumns).toArray)
  .setOutputCol(assemblerCol)

// Per-user aggregates:
//  - features_mean: element-wise mean of the event vectors,
//  - activity_mean / activity_sum: share and count of events flagged as activity.
val dfAggregated = {
  val assembled = assembler.transform(dfEnc)
  assembled
    .groupBy($"instanceId_userId")
    .agg(
      Summarizer.mean(col(assemblerCol)).as("features_mean"),
      avg($"activity").as("activity_mean"),
      sum($"activity").as("activity_sum")
    )
}
dfAggregated.createOrReplaceTempView("AggregatedView")

// Aggregates + largest activity gap + churn label, one row per user.
// NOTE(review): `.na.drop()` removes every row with a NULL, in particular users whose ddiff_max is
// NULL (a single row in the event data) — confirm that excluding them is intended.
val dfCombined = spark.sql(""" SELECT a.instanceId_userId,
                                   a.features_mean, 
                                   a.activity_mean,
                                   a.activity_sum,
                                   b.ddiff_max,
                                   c.label
                              FROM AggregatedView a
                              JOIN DateDiffView b on a.instanceId_userId = b.instanceId_userId
                              JOIN LastActivityView c on a.instanceId_userId = c.instanceId_userId
                         """)
  .na.drop()

dfCombined.createOrReplaceTempView("CombinedView")

// Slot names of the final assembled vector, in the same order as its inputs:
// one-hot names, numerical columns, then the three per-user scalars.
val featuresNames: Array[String] =
  categoricalLabels ++ numericalColumns ++ Seq("activity_mean", "activity_sum", "ddiff_max")

[36msummarizer[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mml[39m.[32mstat[39m.[32mSummaryBuilder[39m = org.apache.spark.ml.stat.SummaryBuilderImpl@5bff98a9
[36massemblerCol[39m: [32mString[39m = [32m"assemblerCol"[39m
[36massembler[39m: [32mVectorAssembler[39m = VectorAssembler: uid=vecAssembler_0e283d1baf06, handleInvalid=error, numInputCols=65
[36mdfAggregated[39m: [32mDataFrame[39m = [instanceId_userId: int, features_mean: vector ... 2 more fields]
[36mdfCombined[39m: [32mDataFrame[39m = [instanceId_userId: int, features_mean: vector ... 4 more fields]
[36mfeaturesNames[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"instanceId_objectType_Post"[39m,
  [32m"instanceId_objectType_Photo"[39m,
  [32m"instanceId_objectType_Video"[39m,
  [32m"audit_resourceType_8"[39m,
  [32m"audit_resourceType_3"[39m,
  [32m"audit_resourceType_7"[39m,
  [32m"audit_resourceType_6"[39m,
  [32m"audit_resourceType_14"[39m,
  [32m"meta

In [16]:
// Output location for the feature slot names.
// NOTE(review): despite the ".txt" suffix, saveAsObjectFile writes a directory of SequenceFiles with
// Java-serialized values; read it back with `sc.objectFile[String](FEATURES_PATH)`, not as text.
val FEATURES_PATH= "data/features.txt"

// saveAsObjectFile refuses to write into an existing output path, so re-running this cell failed.
// Remove the previous output first (mirrors the `mode("overwrite")` used for the parquet output).
val featuresOutputPath = new org.apache.hadoop.fs.Path(FEATURES_PATH)
featuresOutputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(featuresOutputPath, true)

spark.sparkContext.parallelize(featuresNames).saveAsObjectFile(FEATURES_PATH)

[36mFEATURES_PATH[39m: [32mString[39m = [32m"data/features.txt"[39m

In [18]:
// Concatenates the per-user columns into a single vector column "features_comb" and stores the
// result as parquet at DATA_PATH_OUT, replacing any previous output.
// NOTE(review): the MinMaxScaler stage is disabled, so "features_comb" is not scaled here.
val pipeline = {
  val combinedAssembler = new VectorAssembler()
    .setInputCols(Array("features_mean", "activity_mean", "activity_sum", "ddiff_max"))
    .setOutputCol("features_comb")
  // Disabled scaling stage, kept for reference:
  // new MinMaxScaler().setInputCol("features_comb").setOutputCol("features")
  new Pipeline().setStages(Array(combinedAssembler))
}

val dfAssembled = pipeline
  .fit(dfCombined)
  .transform(dfCombined)

dfAssembled.write.mode("overwrite").parquet(DATA_PATH_OUT)

[36mpipeline[39m: [32mPipeline[39m = pipeline_87b7817ed398
[36mdfAssembled[39m: [32mDataFrame[39m = [instanceId_userId: int, features_mean: vector ... 5 more fields]