# Описание данных

- `instanceId_userId` — идентификатор пользователя (анонимизированный)
- `instanceId_objectType` — тип объекта
- `instanceId_objectId` — идентификатор объекта (анонимизированный)
- `feedback` — массив с типами реакций пользователя (наличие в массиве токена Liked говорит о том, что объект получил «класс» от пользователя)
- `audit_clientType` — тип платформы, с которой зашёл пользователь
- `audit_timestamp` — время, когда строилась лента
- `metadata_ownerId` — автор показанного объекта (анонимизированный)
- `metadata_createdAt` — дата создания показанного объекта
- `audit_*` — расширенная информация о контексте построения ленты;
- `metadata_*` — расширенная информация о самом объекте;
- `userOwnerCounters_*` — информация о предыдущих взаимодействиях пользователя и автора контента;
- `ownerUserCounters_*` — информация о предыдущих взаимодействиях автора контента и пользователя;
- `membership_*` — информация о членстве пользователя в группе, где опубликован контент;
- `user_*` — подробная информация о пользователе;
- `auditweights_*` — большое количество runtime-признаков, извлечённых текущей системой.

In [None]:
import $ivy.`org.apache.spark::spark-sql:3.1.1`
import $ivy.`org.apache.spark::spark-mllib:3.1.1`
import $ivy.`sh.almond::almond-spark:0.11.2`
import $ivy.`org.plotly-scala::plotly-almond:0.5.2`

In [None]:
import scala.math.Ordered._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.Row

import plotly._, plotly.element._ , plotly.layout._ , plotly.Almond._ 

import org.apache.log4j.{Logger, Level}
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getRootLogger.setLevel(Level.FATAL)
Logger.getLogger("org").setLevel(Level.WARN)

In [3]:
val DATA_PATH = "../data"

val spark=SparkSession.builder()
    .appName("Data Sources Practice")
    .config("spark.master", "local")
    .getOrCreate()

import spark.implicits._

val dfLoaded=spark.read.parquet(DATA_PATH)
val DF_COUNT=dfLoaded.count.toDouble
println(s"Data shape=(${DF_COUNT}, ${dfLoaded.columns.length})")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/05/20 14:22:40 WARN Utils: Your hostname, DESKTOP-G76NQH1 resolves to a loopback address: 127.0.1.1; using 172.25.151.111 instead (on interface eth0)
21/05/20 14:22:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/05/20 14:22:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Data shape=(1.8286575E7, 169)


[36mDATA_PATH[39m: [32mString[39m = [32m"../data"[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@3262e097
[32mimport [39m[36mspark.implicits._

[39m
[36mdfLoaded[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [instanceId_userId: int, instanceId_objectType: string ... 167 more fields]
[36mDF_COUNT[39m: [32mDouble[39m = [32m1.8286575E7[39m

In [None]:
dfLoaded.printSchema()

In [41]:
val dfRaw=dfLoaded
    .withColumn("target", when(array_contains('feedback, "Liked"), lit(1)).otherwise(lit(0)))
    .withColumn("createdTime", from_unixtime('metadata_createdAt / 1000))
    .withColumn("auditedTime", from_unixtime('audit_timestamp / 1000))
    .withColumn("timeDelta", (('audit_timestamp - 'metadata_createdAt) / 3600000 /24).cast("integer"))
    .withColumn("createdHour", hour('createdTime))
    .withColumn("auditedHour", hour('auditedTime))
    //.na.fill(0)

dfRaw
    .select("metadata_ownerId", "date", "createdTime", "auditedTime", "timeDelta", "createdHour", "auditedHour", "feedback", "target")
    .orderBy("metadata_ownerId")
    .show(10, truncate=false)
    

+----------------+----------+-------------------+-------------------+---------+-----------+-----------+-------------------+------+
|metadata_ownerId|date      |createdTime        |auditedTime        |timeDelta|createdHour|auditedHour|feedback           |target|
+----------------+----------+-------------------+-------------------+---------+-----------+-----------+-------------------+------+
|1               |2018-02-27|2018-02-18 01:15:16|2018-02-27 03:51:08|9        |1          |3          |[Disliked, Ignored]|0     |
|1               |2018-02-21|2018-02-18 01:15:16|2018-02-21 10:10:22|3        |1          |10         |[Clicked]          |0     |
|1               |2018-02-25|2018-02-18 01:15:16|2018-02-25 16:55:18|7        |1          |16         |[Clicked, Liked]   |1     |
|1               |2018-02-20|2018-02-18 01:15:16|2018-02-20 22:28:17|2        |1          |22         |[Ignored]          |0     |
|1               |2018-02-20|2018-02-18 01:15:16|2018-02-21 00:49:55|2        |1   

[36mdfRaw[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [instanceId_userId: int, instanceId_objectType: string ... 173 more fields]

# Топ групп

In [5]:
dfRaw.groupBy('metadata_ownerId).count.orderBy('count.desc).show(10)

+----------------+-----+
|metadata_ownerId|count|
+----------------+-----+
|           37463|71558|
|           76851|53090|
|           11222|49656|
|           65305|46334|
|            8225|44342|
|           38703|44308|
|           18942|42829|
|           62833|40545|
|           79619|40540|
|           19481|40370|
+----------------+-----+
only showing top 10 rows



# Гистограммы популярности/активности 
Гистограммы популярности/активности групп на портале по времени суток 

In [None]:
val (x, y) = dfRaw
    .groupBy('createdHour).count
    .orderBy('createdHour).collect
    .map(r=>(r(0).toString, r(1).toString.toInt))
    .toList.unzip

In [86]:
Bar(x, y).plot(title = "Популярность/Активность от createdHour")

[36mres85[39m: [32mString[39m = [32m"plot-1816684770"[39m

In [None]:
val (x, y) = dfRaw
    .groupBy('auditedHour).count
    .orderBy('auditedHour).collect
    .map(r=>(r(0).toString, r(1).toString.toInt))
    .toList.unzip

In [88]:
Bar(x, y).plot(title = "Популярность/Активность от auditedHour")

[36mres87[39m: [32mString[39m = [32m"plot-1342072777"[39m

# Корреляция числовых признаков с целевой переменной
Выбрана корреляция из Spark SQL, так как время исполнения было ниже

In [None]:
def correlateColumn(column:String): Option[Double] = 
    dfRaw.agg(corr(col(column), 'target)).head()(0) match {
        case n: java.lang.Number => Some(n.doubleValue())
        case _ => None
    }

val colsToExclude=Array("target","instanceId_userId","instanceId_objectId","instanceId_objectType","audit_clientType","audit_timestamp","audit_timePassed",
    "audit_experiment","audit_resourceType", "metadata_ownerType","metadata_ownerId","metadata_createdAt","metadata_authorId","metadata_applicationId",
    "metadata_platform","metadata_options","relationsMask","membership_status","membership_statusUpdateDate","membership_joinDate","membership_joinRequestDate",
    "owner_create_date","owner_birth_date", "feedback","objectId","date")
    
val cols=dfRaw.columns diff colsToExclude

val correlated:Array[(String, Option[Double])]=cols.map(x=>(x,correlateColumn(x))).sortWith(_._2 > _._2)

In [9]:
Seq(correlated.head, correlated.filter(_._2.isDefined).last).map(x=>println(x._1+f": ${x._2.get}%1.4f"))

auditweights_svd_prelaunch: 0.2855
auditweights_userOwner_USER_FEED_REMOVE: -0.1147


[36mres8[39m: [32mSeq[39m[[32mUnit[39m] = [33mList[39m((), ())

Нет явной корреляции признаков с целевой переменной

# Проверка категориальных признаков

## instanceId_objectType 

In [37]:
dfRaw
    .groupBy('instanceId_objectType)
    .agg(mean('target).as('mean),count('target).as('count))
    .orderBy('mean.desc)
    .show()

+---------------------+-------------------+--------+
|instanceId_objectType|               mean|   count|
+---------------------+-------------------+--------+
|                 Post|0.18475945252001133|16119749|
|                Photo|0.11326861322439467| 1345589|
|                Video| 0.1092741802914384|  821237|
+---------------------+-------------------+--------+



Как видно из таблицы, посты лайкают существенно чаще, нежели фото и видео. Это странно, априори я думал, что на первом месте будут фото.

## audit_resourceType

In [38]:
dfRaw
    .groupBy('audit_resourceType)
    .agg(mean('target).as('mean),count('target).as('count))
    .orderBy('mean.desc)
    .show()

+------------------+-------------------+--------+
|audit_resourceType|               mean|   count|
+------------------+-------------------+--------+
|                 8|0.19136913113271348|14618366|
|                14|0.16358282519562675|  599739|
|                 3|0.11326861322439467| 1345589|
|                 6| 0.1092741802914384|  821237|
|                 7|0.09168252658477181|  901644|
+------------------+-------------------+--------+



`№8` и `№14` определенно более популярны
## metadata_ownerType

In [39]:
dfRaw.groupBy('metadata_ownerType).agg(mean('target).as('mean),count('target).as('count)).orderBy('mean.desc).show()

+-------------------+-------------------+--------+
| metadata_ownerType|               mean|   count|
+-------------------+-------------------+--------+
|GROUP_OPEN_OFFICIAL|0.18409800425332773|15192810|
|         GROUP_OPEN|0.13687626565042918| 3093765|
+-------------------+-------------------+--------+



Активность в официально открытых группах чаще получает лайки, нежели в обычных.

## membership_status

In [40]:
dfRaw
    .groupBy('membership_status)
    .agg(mean('target).as('mean),count('target).as('count))
    .orderBy('mean.desc)
    .show()

+-----------------+-------------------+--------+
|membership_status|               mean|   count|
+-----------------+-------------------+--------+
|                M|0.34384178578421776|    5107|
|                !|0.24410933081998115|    2122|
|                I|0.21739130434782608|    5911|
|             null| 0.1996272719846519| 6451353|
|                P| 0.1906833127978267|   67363|
|                A|0.16303083088882844|11750164|
|                Y|0.11487130600571974|    4196|
|                B|0.10674157303370786|     356|
|                R|                0.0|       3|
+-----------------+-------------------+--------+



`membership_status` имеет значение! Лучший статус - `M`, статусы же `B` и `Y` лучше не иметь.

## timeDelta
Сначала проверим данные на правильность. Так как `timeDelta` считается как разница между временем создания ленты (`createdTime`) и временем показа (`auditedTime`), второе должно быть больше первого, но в разумных пределах.

In [None]:
dfRaw
    .select("metadata_ownerId", "date", "createdTime", "auditedTime", "timeDelta", "createdHour", "auditedHour", "feedback", "target")
    .orderBy("metadata_ownerId")
    .filter('timeDelta < 0)
    .count

In [67]:
dfRaw
    .select("metadata_ownerId", "date", "createdTime", "auditedTime", "timeDelta", "createdHour", "auditedHour", "feedback", "target")
    .orderBy("metadata_ownerId")
    .filter('timeDelta > 3650).show(10)

+----------------+----------+-------------------+-------------------+---------+-----------+-----------+-------------------+------+
|metadata_ownerId|      date|        createdTime|        auditedTime|timeDelta|createdHour|auditedHour|           feedback|target|
+----------------+----------+-------------------+-------------------+---------+-----------+-----------+-------------------+------+
|               4|2018-03-07|1922-02-04 04:38:31|2018-03-07 19:25:15|    35095|          4|         19|  [Clicked, Viewed]|     0|
|              27|2018-02-24|1922-02-04 04:38:31|2018-02-24 11:32:58|    35084|          4|         11|          [Ignored]|     0|
|             165|2018-02-16|1922-02-04 04:38:31|2018-02-16 19:30:04|    35076|          4|         19|          [Clicked]|     0|
|             370|2018-03-01|1922-02-04 04:38:31|2018-03-01 22:38:18|    35089|          4|         22|          [Clicked]|     0|
|             370|2018-03-11|1922-02-04 04:38:31|2018-03-11 16:32:07|    35099|    

Удаляем все строки с отрицательными значениями `timeDelta`, равно как и строки с `createdTime` от 1922 года. Разбиение производим по логарифму `timeDelta`, дабы уменьшить количество групп. 

In [69]:
dfRaw
    .filter('timeDelta>=0 && 'timeDelta<3650)
    .withColumn("log_timeDelta", round(log('timeDelta)))
    .groupBy('log_timeDelta)
    .agg(mean('target).as('mean),count('target).as('count))
    .orderBy('mean.desc)
    .show()

+-------------+-------------------+--------+
|log_timeDelta|               mean|   count|
+-------------+-------------------+--------+
|          2.0|0.22569663229240366|  645929|
|          3.0|0.21981250336463404|  130029|
|          1.0|0.20794307653892039| 2342795|
|          4.0|0.20563211970852222|   76987|
|          0.0|0.19311657570873927| 2620992|
|          5.0| 0.1856417103592015|   61414|
|          6.0| 0.1696442546340809|   44723|
|         null|0.16325962444647824|12338268|
|          7.0|0.13491280026324448|   21273|
|          8.0|                0.0|      17|
+-------------+-------------------+--------+



К сожалению, анализ времени, прошедшего между `createdTime` и `auditedTime` не показал ничего интересного.
## IDs
Если ids генерируются с автоинкрементом, то они являются прямым отражением времени регистрации пользователей. Чем меньше id, тем раньше пользователь зарегистрировался в системе. 
Применим логарифм по основанию 10 к полям `metadata_ownerId` и `metadata_authorId`:

In [70]:
val dfIds=dfRaw
    .select("target","metadata_ownerId","metadata_authorId")
    .withColumn("log_ownerId", round(log10('metadata_ownerId)))
    .withColumn("log_authorId", round(log10('metadata_authorId)))
dfIds.show(5)

+------+----------------+-----------------+-----------+------------+
|target|metadata_ownerId|metadata_authorId|log_ownerId|log_authorId|
+------+----------------+-----------------+-----------+------------+
|     1|           13680|            73356|        4.0|         5.0|
|     1|           42167|           828613|        5.0|         6.0|
|     1|           12988|          1076820|        4.0|         6.0|
|     0|            2772|           608137|        3.0|         6.0|
|     0|           20289|           565604|        4.0|         6.0|
+------+----------------+-----------------+-----------+------------+
only showing top 5 rows



[36mdfIds[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [target: int, metadata_ownerId: int ... 3 more fields]

In [59]:
dfIds
    .groupBy('log_ownerId)
    .agg(mean('target).as('mean),count('target).as('count))
    .orderBy('mean.desc)
    .show()

+-----------+-------------------+-------+
|log_ownerId|               mean|  count|
+-----------+-------------------+-------+
|        5.0|0.17830895852624556|9749684|
|        4.0|0.17528856911673626|8008480|
|        3.0|0.14866905546330528| 492019|
|        2.0|0.14345719995935097|  29521|
|        1.0|0.11572787125091441|   6835|
|        0.0| 0.1111111111111111|     36|
+-----------+-------------------+-------+



In [60]:
dfIds
    .groupBy('log_authorId)
    .agg(mean('target).as('mean),count('target).as('count))
    .orderBy('mean.desc)
    .show()

+------------+-------------------+--------+
|log_authorId|               mean|   count|
+------------+-------------------+--------+
|         6.0|0.17782806520900454|15345120|
|         5.0|0.16870395358004242| 2458253|
|         4.0|  0.163244299287682|  438147|
|         3.0|0.12431117815472709|   32483|
|         0.0| 0.1091876108040463|    9589|
|         1.0|0.10714285714285714|     112|
|         2.0|0.10344827586206896|    2871|
+------------+-------------------+--------+



Интересно, чем больше `metadata_ownerId` и `metadata_authorId`, тем выше вероятность получить лайк. Посчитаем корреляцию Пирсона, так как всё же ids это непрерывные значения:

In [72]:
val correlatedIds:Array[(String, Option[Double])]=Array("metadata_ownerId","metadata_authorId").map(x=>(x,correlateColumn(x))).sortWith(_._2 > _._2)

[36mcorrelatedIds[39m: [32mArray[39m[([32mString[39m, [32mOption[39m[[32mDouble[39m])] = [33mArray[39m(
  ([32m"metadata_ownerId"[39m, [33mSome[39m([32m0.011679487961495112[39m)),
  ([32m"metadata_authorId"[39m, [33mSome[39m([32m0.007233333042612956[39m))
)

По Пирсону корреляции нет. 