In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null


In [2]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [None]:
!pip install findspark

In [150]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, \
                              StringType, IntegerType,BooleanType, \
                              LongType, DateType
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.functions import when
import datetime
from pyspark.sql.window import Window

In [6]:
spark = SparkSession.builder\
                    .master("local[*]")\
                    .getOrCreate()

In [303]:
schema_web = StructType([ 
                StructField("id", IntegerType(), True),
                StructField("timestamp", LongType(), True),
                StructField("type", StringType(), True),
                StructField("page_id", IntegerType(), True),
                StructField("tag", StringType(), True),
                StructField("sign", BooleanType(), True)
])

schema_lk = StructType([
                StructField("id", IntegerType(), True),
                StructField("user_id", IntegerType(), True),
                StructField("fio", StructType([
                      StructField('lastname', StringType(), True),
                      StructField('firstname', StringType(), True),
                      StructField('patronymic', StringType(), True)])),
                StructField("dob", DateType(), True),
                StructField("doc", DateType(), True)
])

Generate Data_web

In [304]:
id = [int(x) for x in np.random.randint(1, 11, 1000)]
timestamp = [int(x) for x in np.random.randint(1670198400 , 1670803200, 1000)]
Type = [str(x) for x in np.random.choice(["visit", "click", "scroll", "move"], size=1000)]
page_id = [int(x) for x in np.random.randint(100, 111, 1000)]
tag = []

for i in page_id:
  if  i in [100,101]:
     tag.append("medicine"),
  elif  i in [102,103,104]:
     tag.append("sport"),
  elif  i in [105,106,107]:
     tag.append("finance"),  
  else:
     tag.append("politic")

sign = []

for i in id:
  if  i in [1,3,5,7]:
     sign.append(True),
  else:  
     sign.append(False) 



In [308]:
data_lk = [
    (101, 1, ("Иванов", "Иван", "Иванович"), datetime.datetime(1990, 7, 5), datetime.datetime(2016, 8, 1)),
    (102, 3, ("Александрова", "Александра", "Александровна"), datetime.datetime(1995, 1, 22), datetime.datetime(2017, 10, 7)), 
    (103, 5, ("Петров", "Петр", "Петрович"), datetime.datetime(1999, 9, 11), datetime.datetime(2016, 10, 16)),
    (104, 7, ("Сидоров", "Антон", "Николаевич"), datetime.datetime(1993, 3, 10), datetime.datetime(2018, 2, 22)),
]

In [309]:
df_web = spark.createDataFrame(list(zip(id, timestamp, Type, page_id, tag, sign)), schema_web)
df_lk = spark.createDataFrame(data_lk, schema_lk)

In [312]:
df_web.groupBy("id")\
      .agg(F.count("*").alias("event_cnt"))\
      .orderBy("event_cnt", ascending = False)\
      .show(5)

+---+---------+
| id|event_cnt|
+---+---------+
|  9|      118|
|  3|      106|
|  6|      104|
|  4|      104|
| 10|      103|
+---+---------+
only showing top 5 rows



In [313]:
df_web.groupBy("sign").agg(F.countDistinct("id").alias('count_users'))\
                      .withColumn('percent', F.col('count_users')*100/F.sum('count_users')\
                                              .over(Window.partitionBy()))\
                      .show()

+-----+-----------+-------+
| sign|count_users|percent|
+-----+-----------+-------+
| true|          4|   40.0|
|false|          6|   60.0|
+-----+-----------+-------+



In [314]:
df_web.groupBy("sign").agg(F.countDistinct("id").alias('count_users'))\
                      .withColumn('percent', F.col('count_users')*100/F.sum('count_users')\
                                              .over(Window.partitionBy()))\
                      .filter(df_web.sign == True)\
                      .show()

+----+-----------+-------+
|sign|count_users|percent|
+----+-----------+-------+
|true|          4|   40.0|
+----+-----------+-------+



In [315]:
df_web.groupBy("page_id", "type")\
      .agg(F.count("*").alias("click_cnt"))\
      .orderBy("click_cnt", ascending = False)\
      .filter(df_web.type == 'click')\
      .show(5)

+-------+-----+---------+
|page_id| type|click_cnt|
+-------+-----+---------+
|    106|click|       33|
|    108|click|       32|
|    109|click|       29|
|    105|click|       28|
|    103|click|       27|
+-------+-----+---------+
only showing top 5 rows



In [316]:
df_web = df_web.select(*[i for i in df_web.columns if i != "timestamp"],
                    F.from_unixtime("timestamp").alias("event_time"))

df_web = df_web.withColumn("new", F.floor(F.hour("event_time") / F.lit(4)))

df_web = df_web.withColumn("time_period", when(df_web.new == "0","0-4")\
                                         .when(df_web.new == "1","4-8")\
                                         .when(df_web.new == "2","8-12")\
                                         .when(df_web.new == "3","12-16")\
                                         .when(df_web.new == "4","16-20")\
                                         .when(df_web.new == "5","20-24")\
                                         )
df_web = df_web.drop("new")

In [317]:
df_web.groupBy("time_period")\
      .agg(F.count("*").alias("activity"))\
      .orderBy("activity", ascending = False)\
      .show()

+-----------+--------+
|time_period|activity|
+-----------+--------+
|       8-12|     186|
|        0-4|     172|
|      12-16|     170|
|        4-8|     170|
|      20-24|     166|
|      16-20|     136|
+-----------+--------+



In [318]:
df_all = df_lk.alias("lk").join(df_web.alias("web"),
                                    on = [F.col("lk.user_id") == F.col("web.id")],
                                    how = "left")

In [344]:
df_all.select('fio.lastname', 'tag').where(F.col('tag') == "sport").distinct().show()

+------------+-----+
|    lastname|  tag|
+------------+-----+
|      Петров|sport|
|Александрова|sport|
|      Иванов|sport|
|     Сидоров|sport|
+------------+-----+

