In [1]:
from pyspark.sql import SparkSession

In [117]:
from pyspark.sql.functions import col, count, max, desc, from_unixtime, udf

In [118]:
from pyspark.sql.types import DateType, IntegerType

In [145]:
import matplotlib
import matplotlib.pyplot as plt

In [6]:
spark = SparkSession.builder.appName("PySpark").master("local[*]").getOrCreate()

In [7]:
spark

In [8]:
df = spark.read.format("parquet").load("../data/test")

In [11]:
df.printSchema()

root
 |-- instanceId_userId: integer (nullable = true)
 |-- instanceId_objectType: string (nullable = true)
 |-- instanceId_objectId: integer (nullable = true)
 |-- audit_pos: long (nullable = true)
 |-- audit_clientType: string (nullable = true)
 |-- audit_timestamp: long (nullable = true)
 |-- audit_timePassed: long (nullable = true)
 |-- audit_experiment: string (nullable = true)
 |-- audit_resourceType: long (nullable = true)
 |-- metadata_ownerId: integer (nullable = true)
 |-- metadata_ownerType: string (nullable = true)
 |-- metadata_createdAt: long (nullable = true)
 |-- metadata_authorId: integer (nullable = true)
 |-- metadata_applicationId: long (nullable = true)
 |-- metadata_numCompanions: integer (nullable = true)
 |-- metadata_numPhotos: integer (nullable = true)
 |-- metadata_numPolls: integer (nullable = true)
 |-- metadata_numSymbols: integer (nullable = true)
 |-- metadata_numTokens: integer (nullable = true)
 |-- metadata_numVideos: integer (nullable = true)
 |-- me

In [12]:
print("The dataframe has {} rows.".format(df.count()))

The dataframe has 1181443 rows.


In [22]:
# Посмотрим на количество каждого типа объекта
df.groupby("instanceId_objectType").count().show()

+---------------------+-------+
|instanceId_objectType|  count|
+---------------------+-------+
|                Video|  32561|
|                Photo|  83493|
|                 Post|1065389|
+---------------------+-------+



## Какая группа пользовалась популярностью, для каждого типа?

Всего 3 типа объекта, найдем самый популярный объект для каждого типа

In [85]:
df.where(df["instanceId_objectType"] == "Video") \
    .groupby("instanceId_objectId") \
    .agg(count(col("instanceId_objectId")).alias("count_group")) \
    .sort(desc("count_group")) \
    .show(1)

+-------------------+-----------+
|instanceId_objectId|count_group|
+-------------------+-----------+
|            9462035|        488|
+-------------------+-----------+
only showing top 1 row



In [86]:
df.where(df["instanceId_objectType"] == "Photo") \
    .groupby("instanceId_objectId") \
    .agg(count(col("instanceId_objectId")).alias("count_group")) \
    .sort(desc("count_group")) \
    .show(1)

+-------------------+-----------+
|instanceId_objectId|count_group|
+-------------------+-----------+
|            8296214|        112|
+-------------------+-----------+
only showing top 1 row



In [87]:
df.where(df["instanceId_objectType"] == "Post") \
    .groupby("instanceId_objectId") \
    .agg(count(col("instanceId_objectId")).alias("count_group")) \
    .sort(desc("count_group")) \
    .show(1)

+-------------------+-----------+
|instanceId_objectId|count_group|
+-------------------+-----------+
|           37256411|        525|
+-------------------+-----------+
only showing top 1 row



In [88]:
# Проверим
df_01 = df.groupby("instanceId_objectType", "instanceId_objectId").count()

In [89]:
df_01.groupBy("instanceId_objectType").max("count").show()

+---------------------+----------+
|instanceId_objectType|max(count)|
+---------------------+----------+
|                Video|       488|
|                Photo|       112|
|                 Post|       525|
+---------------------+----------+



## Построите гистограммы популярности/активности по времени суток

Сначала определимся, что в сутках у нас есть 4 времени, на каждое время по 6 часов:
- Утро: с 6.00 до 12.00
- День: с 12.00 до 18.00
- Вечер: с 18.00 до 0.00
- Ночь: с 0.00 до 6.00

__Сделаем преобразование юниксового времени в формат timestamp__

In [121]:
df = df.withColumn("timestamp_mili", (col("audit_timestamp")/1000).cast("timestamp"))

__Добавим новую колонку по часам__

In [122]:
hour = udf(lambda x: x.hour, IntegerType())

In [123]:
df = df.withColumn("hour", hour("timestamp_mili"))

In [126]:
# Проверим
df[["instanceId_objectId", "timestamp_mili", "hour"]].show(5)

+-------------------+--------------------+----+
|instanceId_objectId|      timestamp_mili|hour|
+-------------------+--------------------+----+
|             549335|2018-03-27 10:56:...|  10|
|           38727626|2018-03-28 17:51:...|  17|
|           14594128|2018-03-31 10:15:...|  10|
|           35364024|2018-03-25 16:03:...|  16|
|           21498907|2018-03-31 15:58:...|  15|
+-------------------+--------------------+----+
only showing top 5 rows



### Утро

In [140]:
df[["instanceId_objectId"]].where((df["hour"] >= 6) & (df["hour"] < 12)).show()

+-------------------+
|instanceId_objectId|
+-------------------+
|             549335|
|           14594128|
|           33609321|
|           38257234|
|           30267629|
|           36814836|
|           18315321|
|           36345030|
|           27880236|
|           29765141|
|           24202818|
|           19234201|
|           37268710|
|           39142518|
|           19452612|
|           35953801|
|           37842339|
|           37631409|
|           19189239|
|           35854539|
+-------------------+
only showing top 20 rows



### День

In [141]:
df[["instanceId_objectId"]].where((df["hour"] >= 12) & (df["hour"] < 18)).show()

+-------------------+
|instanceId_objectId|
+-------------------+
|           38727626|
|           35364024|
|           21498907|
|           17284534|
|           32691036|
|           36766228|
|           10851938|
|           37414808|
|           35975141|
|            2266236|
|            4873805|
|           29688615|
|           31709743|
|           22283729|
|           23859601|
|           20865643|
|           30633900|
|            9203121|
|           28613120|
|           19265114|
+-------------------+
only showing top 20 rows



### Вечер

In [143]:
df[["instanceId_objectId"]].where((df["hour"] >= 18) & (df["hour"] < 24)).show()

+-------------------+
|instanceId_objectId|
+-------------------+
|           34296911|
|           22566334|
|           27621925|
|           20459135|
|           29112606|
|           36190811|
|           23629217|
|           20206202|
|            9131424|
|           29181935|
|           34042520|
|           18540018|
|           34320614|
|           35944517|
|           23737526|
|           38435122|
|           30989425|
|            4621525|
|           27615906|
|            4872030|
+-------------------+
only showing top 20 rows



### Ночь

In [144]:
df[["instanceId_objectId"]].where((df["hour"] >= 0) & (df["hour"] < 6)).show()

+-------------------+
|instanceId_objectId|
+-------------------+
|           38005241|
|           39101128|
|           35505331|
|           37934113|
|           35042727|
|           37491913|
|           36007840|
|           37895106|
|           21024912|
|           27790522|
|           25636500|
|           37987803|
|           15904523|
|           36752418|
|           26493917|
|           26376902|
|           19888238|
|           36272739|
|           17108103|
|           23052023|
+-------------------+
only showing top 20 rows

