In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\spark\\spark-3.3.1-bin-hadoop3'

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.functions import *

In [4]:
import pyspark.sql.types as T

In [5]:
spark = SparkSession.builder.master("local"). \
                    appName("HW3.3 Spark").\
                    config("spark.driver.bindAdress", "localhost"). \
                    config("spark.ui.port", "4040"). \
                    getOrCreate()

In [6]:
schema = StructType ([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("timestamp", T.IntegerType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("page_id", T.IntegerType(), True),
    T.StructField("tag", T.StringType(), True),
    T.StructField("sign", T.BooleanType(), True)
])

In [7]:
data = [
    (1, 1662485039,"visit",112,"Good versus evil", True),
    (2, 1591291439,"click",100,"Redemption", True),
    (3, 1591291439,"scroll",111,"Courage and Heroism", False),
    (4, 1541352239,"visit",112,"Sport", True),
    (5, 1662485039,"visit",111,"Love", True),
    (3, 1662485039,"visit",112,"Good versus evil", False),
    (2, 1654363439,"scroll",100,"Redemption", True),
    (2, 1528133039,"click",111,"Love", True),
    (7, 1528133039,"click",100,"Courage and Heroism", False),
    (11, 1654363439,"move",100,"Redemption", False)
  ]

In [8]:
df = spark.createDataFrame(data=data, schema=schema).show()

5 most active visiters

In [9]:
df.groupBy("id").count().orderBy(col("count").desc()).show(5)

+---+-----+
| id|count|
+---+-----+
|  2|    3|
|  3|    2|
|  1|    1|
|  5|    1|
|  7|    1|
+---+-----+
only showing top 5 rows



Percent visiters with account

In [10]:
df.filter(col("sign") == "true").select(countDistinct("id")).first()[0] / df.select(countDistinct("id")).first()[0]

0.5714285714285714

5 most clicked pages

In [11]:
df.filter(col("type") == "click").groupBy("page_id").count().orderBy(col("count").desc()).show()

+-------+-----+
|page_id|count|
+-------+-----+
|    100|    2|
|    111|    1|
+-------+-----+



add col with a time range value within a day with a window size of 4 hours 

In [21]:
df = df.withColumn("tsDate", col("timestamp").cast(T.TimestampType()))

In [22]:
df = df.withColumn('time', date_format('tsDate', 'HH:mm:ss'))

In [23]:
df = df.withColumn('time', split(df['time'], ':').getItem(0).cast(T.IntegerType()))

In [24]:
df = df.withColumn("time_range", when((df.time >= 0) & (df.time <4) ,"0-4")
                                 .when((df.time >= 4) & (df.time <8) ,"4-8")
                                 .when((df.time >= 8) & (df.time <12) ,"8-12")
                                 .when((df.time >= 12) & (df.time <16) ,"12-16")
                                 .when((df.time >= 16) & (df.time <20) ,"16-20")
                                 .when((df.time >= 20) & (df.time <24) ,"20-24")
                                 .otherwise(df.time))
df.show()

+---+----------+------+-------+-------------------+-----+-------------------+----+----------+
| id| timestamp|  type|page_id|                tag| sign|             tsDate|time|time_range|
+---+----------+------+-------+-------------------+-----+-------------------+----+----------+
|  1|1662485039| visit|    112|   Good versus evil| true|2022-09-06 20:23:59|  20|     20-24|
|  2|1591291439| click|    100|         Redemption| true|2020-06-04 20:23:59|  20|     20-24|
|  3|1591291439|scroll|    111|Courage and Heroism|false|2020-06-04 20:23:59|  20|     20-24|
|  4|1541352239| visit|    112|              Sport| true|2018-11-04 20:23:59|  20|     20-24|
|  5|1662485039| visit|    111|               Love| true|2022-09-06 20:23:59|  20|     20-24|
|  3|1662485039| visit|    112|   Good versus evil|false|2022-09-06 20:23:59|  20|     20-24|
|  2|1654363439|scroll|    100|         Redemption| true|2022-06-04 20:23:59|  20|     20-24|
|  2|1528133039| click|    111|               Love| true|201

most active time range

In [25]:
df.groupBy("time_range").count().orderBy(col("count").desc()).show(1)

+----------+-----+
|time_range|count|
+----------+-----+
|     20-24|   10|
+----------+-----+



new data frame with user's account info

In [26]:
account_schema = StructType ([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("user_id", T.IntegerType(), True),
    T.StructField("name", T.StringType(), True),
    T.StructField("dob", T.StringType(), True),
    T.StructField("create_acc_date", T.StringType(), True),
])

In [27]:
account_data = [
    (1, 1,"Austen Bierd","1977-03-06","2005-06-06"),
    (2, 2,"Baron Barrington","1984-04-03","2008-03-18"),
    (3, 4,"Blake Burkhardt","2001-02-06","2015-03-06"),
    (4, 5,"Emma Finley","2001-09-03","2020-01-12")
  ]

In [28]:
account_df = spark.createDataFrame(data=account_data, schema=account_schema)

users, who read the sport news

In [30]:
df.join(account_df, df.id == account_df.user_id).filter(col("tag") == "Sport").select(col("name")).distinct().show()

+---------------+
|           name|
+---------------+
|Blake Burkhardt|
+---------------+

