In [24]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
import datetime

In [25]:
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) the Spark session used by every cell below.
# The master is "local", the driver binds to localhost, and the UI port is 4040.
spark = (
    SparkSession.builder
    .master("local")
    .appName("PySpark_Tutorial")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.ui.port", "4040")
    .getOrCreate()
)


In [26]:
# Schema for the web event rows; every field is declared nullable.
schema_web = T.StructType([
    T.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("id", T.StringType()),
        ("timestamp", T.LongType()),
        ("type", T.StringType()),
        ("page_id", T.IntegerType()),
        ("tag", T.StringType()),
        ("sign", T.BooleanType()),
    ]
])

# Schema for the user-account rows; every field is declared nullable.
schema_lk = T.StructType([
    T.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("id", T.StringType()),
        ("user_id", T.IntegerType()),
        ("fio", T.StringType()),
        ("dob", T.DateType()),
        ("doc", T.DateType()),
    ]
])

In [27]:
# Sample web events. Tuple layout follows schema_web:
#   (id, timestamp, type, page_id, tag, sign)
# NOTE(review): the first row's timestamp (1669986321) is roughly four weeks
# later than every other row -- confirm this is intentional.
data_web = [
    (1, 1669986321, "visit", 101, "Sport", False),
    (1, 1667622486, "scroll", 101, "Sport", False),
    (1, 1667627500, "click", 101, "Sport", False),
    (1, 1667627505, "visit", 102, "Politics", False),
    (1, 1667627565, "click", 102, "Politics", False),
    (1, 1667627586, "visit", 103, "Sport", False),
    (2, 1667628001, "visit", 104, "Politics", True),
    (2, 1667628101, "scroll", 104, "Politics", True),
    (2, 1667628151, "click", 104, "Politics", True),
    (2, 1667628200, "visit", 105, "Business", True),
    (2, 1667628226, "click", 105, "Business", True),
    (2, 1667628317, "visit", 106, "Business", True),
    (2, 1667628359, "scroll", 106, "Business", True),
    (3, 1667628422, "visit", 101, "Sport", False),
    (3, 1667628486, "scroll", 101, "Sport", False),
    (4, 1667628505, "visit", 106, "Business", False),
    (5, 1667628511, "visit", 101, "Sport", True),
    (5, 1667628901, "click", 101, "Sport", True),
    (5, 1667628926, "visit", 102, "Politics", True),
    (5, 1667628976, "click", 102, "Politics", True),
]

# Sample user accounts. Tuple layout follows schema_lk:
#   (id, user_id, fio, dob, doc)
data_lk = [
    (
        101,
        2,
        "Иванов Иван Иванович",
        datetime.datetime(1990, 7, 5),
        datetime.datetime(2016, 8, 1),
    ),
    (
        102,
        5,
        "Александрова Александра александровна",
        datetime.datetime(1995, 1, 22),
        datetime.datetime(2017, 10, 7),
    ),
]

# Turn the in-memory sample rows into DataFrames using the explicit schemas.
df_web = spark.createDataFrame(data_web, schema_web)
df_lk = spark.createDataFrame(data_lk, schema_lk)

In [28]:
# List the column names of df_web.
df_web.columns

['id', 'timestamp', 'type', 'page_id', 'tag', 'sign']

In [29]:
# Replace the epoch-seconds `timestamp` column with `event_time`, keeping the
# remaining columns in their original order and appending `event_time` last.
# F.from_unixtime renders the value as a formatted date-time string.
#
# This cell rebinds `df_web`, so the guard makes re-running it a no-op.
# Without it, a second run fails because `timestamp` has already been dropped.
if "timestamp" in df_web.columns:
    df_web = (
        df_web
        .withColumn("event_time", F.from_unixtime("timestamp"))
        .drop("timestamp")
    )

In [30]:
# Print df_web as a text table (show() prints at most 20 rows by default).
df_web.show()

+---+------+-------+--------+-----+-------------------+
| id|  type|page_id|     tag| sign|         event_time|
+---+------+-------+--------+-----+-------------------+
|  1| visit|    101|   Sport|false|2022-12-02 13:05:21|
|  1|scroll|    101|   Sport|false|2022-11-05 04:28:06|
|  1| click|    101|   Sport|false|2022-11-05 05:51:40|
|  1| visit|    102|Politics|false|2022-11-05 05:51:45|
|  1| click|    102|Politics|false|2022-11-05 05:52:45|
|  1| visit|    103|   Sport|false|2022-11-05 05:53:06|
|  2| visit|    104|Politics| true|2022-11-05 06:00:01|
|  2|scroll|    104|Politics| true|2022-11-05 06:01:41|
|  2| click|    104|Politics| true|2022-11-05 06:02:31|
|  2| visit|    105|Business| true|2022-11-05 06:03:20|
|  2| click|    105|Business| true|2022-11-05 06:03:46|
|  2| visit|    106|Business| true|2022-11-05 06:05:17|
|  2|scroll|    106|Business| true|2022-11-05 06:05:59|
|  3| visit|    101|   Sport|false|2022-11-05 06:07:02|
|  3|scroll|    101|   Sport|false|2022-11-05 06

In [31]:
# Per user id: number of events and the largest page_id seen,
# ordered from the most to the least active user; print up to 10 rows.
(
    df_web
    .groupBy("id")
    .agg(
        F.count("*").alias("event_cnt"),
        F.max("page_id").alias("max_page_id"),
    )
    .orderBy(F.col("event_cnt").desc())
    .show(10)
)

+---+---------+-----------+
| id|event_cnt|max_page_id|
+---+---------+-----------+
|  2|        7|        106|
|  1|        6|        103|
|  5|        4|        102|
|  3|        2|        101|
|  4|        1|        106|
+---+---------+-----------+



In [66]:
# Per-user summary: event count plus a single `sign` flag per user.
# max() over the boolean `sign` column yields true when any of the user's
# events has sign = true.
df_web2 = (
    df_web
    .groupby("id")
    .agg(
        F.count("*").alias("event_cnt"),
        F.max("sign").alias("sign"),
    )
    .orderBy("event_cnt", ascending=False)
)
df_web2.show()

# Number of users for each value of the `sign` flag.
x = df_web2.groupby("sign").count()
x.show()

# Percentage of users whose flag is true.
# The flag is compared as a boolean (not the string "true"), the count is read
# by column name rather than position, and both "no user has sign = true" and
# "no users at all" are handled instead of raising IndexError or
# ZeroDivisionError.
signed_rows = x.filter(F.col("sign") == F.lit(True)).collect()
y = signed_rows[0]["count"] if signed_rows else 0
total_users = df_web2.count()
y * 100 / total_users if total_users else 0.0


+---+---------+-----+
| id|event_cnt| sign|
+---+---------+-----+
|  2|        7| true|
|  1|        6|false|
|  5|        4| true|
|  3|        2|false|
|  4|        1|false|
+---+---------+-----+



                                                                                

+-----+-----+
| sign|count|
+-----+-----+
| true|    2|
|false|    3|
+-----+-----+



                                                                                

40.0

In [33]:
# Keep only click events, print them, then print click counts per page
# (at most 5 rows).
df_web3 = df_web.where(F.col("type") == "click")
df_web3.show()
(
    df_web3
    .groupBy("page_id")
    .count()
    .show(5)
)

+---+-----+-------+--------+-----+-------------------+
| id| type|page_id|     tag| sign|         event_time|
+---+-----+-------+--------+-----+-------------------+
|  1|click|    101|   Sport|false|2022-11-05 05:51:40|
|  1|click|    102|Politics|false|2022-11-05 05:52:45|
|  2|click|    104|Politics| true|2022-11-05 06:02:31|
|  2|click|    105|Business| true|2022-11-05 06:03:46|
|  5|click|    101|   Sport| true|2022-11-05 06:15:01|
|  5|click|    102|Politics| true|2022-11-05 06:16:16|
+---+-----+-------+--------+-----+-------------------+

+-------+-----+
|page_id|count|
+-------+-----+
|    101|    2|
|    102|    2|
|    105|    1|
|    104|    1|
+-------+-----+



In [44]:
# Bucket each event into a 4-hour slot of the day:
# time_interval = floor(hour(event_time) / 4), e.g. hour 13 -> 3, hour 5 -> 1.
df_web4 = df_web.withColumn(
    "time_interval",
    F.floor(F.hour(F.col("event_time")) / 4),
)
df_web4.show(5)

# Number of events that fall into each slot.
(
    df_web4
    .groupBy("time_interval")
    .count()
    .show()
)

+---+------+-------+--------+-----+-------------------+-------------+
| id|  type|page_id|     tag| sign|         event_time|time_interval|
+---+------+-------+--------+-----+-------------------+-------------+
|  1| visit|    101|   Sport|false|2022-12-02 13:05:21|            3|
|  1|scroll|    101|   Sport|false|2022-11-05 04:28:06|            1|
|  1| click|    101|   Sport|false|2022-11-05 05:51:40|            1|
|  1| visit|    102|Politics|false|2022-11-05 05:51:45|            1|
|  1| click|    102|Politics|false|2022-11-05 05:52:45|            1|
+---+------+-------+--------+-----+-------------------+-------------+
only showing top 5 rows

+-------------+-----+
|time_interval|count|
+-------------+-----+
|            1|   19|
|            3|    1|
+-------------+-----+



In [45]:
# Left join: every account row from df_lk, paired with the web events whose
# `id` equals the account's `user_id`. Both inputs are aliased ("lk", "web")
# because each side has its own `id` column.
df_all = (
    df_lk.alias("lk")
    .join(
        df_web.alias("web"),
        on=F.col("lk.user_id") == F.col("web.id"),
        how="left",
    )
)

In [48]:
# Print the joined rows whose event tag is "Sport".
df_all.where(F.col("tag") == "Sport").show()

+---+-------+--------------------+----------+----------+---+-----+-------+-----+----+-------------------+
| id|user_id|                 fio|       dob|       doc| id| type|page_id|  tag|sign|         event_time|
+---+-------+--------------------+----------+----------+---+-----+-------+-----+----+-------------------+
|102|      5|Александрова Алек...|1995-01-22|2017-10-07|  5|visit|    101|Sport|true|2022-11-05 06:08:31|
|102|      5|Александрова Алек...|1995-01-22|2017-10-07|  5|click|    101|Sport|true|2022-11-05 06:15:01|
+---+-------+--------------------+----------+----------+---+-----+-------+-----+----+-------------------+

