In [35]:
import findspark
findspark.init()

In [36]:
from pyspark.sql import SparkSession

In [37]:
from pyspark.sql.functions import count, desc, rank, avg, round, current_timestamp
from pyspark.sql.window import Window

In [38]:
spark = SparkSession \
    .builder \
    .appName("Bespot app") \
    .config("spark.executor.extraClassPath", "/path/to/postgresql-42.6.0.jar") \
    .config("spark.jars", "/path/to/postgresql-42.6.0.jar") \
    .getOrCreate()

In [39]:
df_dataset1 = spark.read.jdbc(url = "jdbc:postgresql://localhost:5432/bespot1", 
                     table = "(SELECT * FROM user_pages_process) AS pages_table",
                     properties={"user": "myprojectuser", "password": "password", "driver": 'org.postgresql.Driver'})


In [40]:
df_dataset1 = df_dataset1.withColumnRenamed("timestamp", "timestamp_page")

In [41]:
df_dataset2 = spark.read.jdbc(url = "jdbc:postgresql://localhost:5432/bespot1", 
                     table = "(SELECT * FROM user_trans_process) AS trans_table",
                     properties={"user": "myprojectuser", "password": "password", "driver": 'org.postgresql.Driver'})

In [42]:
df_dataset2 = df_dataset2.withColumnRenamed("timestamp", "timestamp_trans")  

Page popularity:

In [43]:
page_count_df = df_dataset1.groupBy("page").agg(count("page").alias("page_count"))

In [44]:
pageRankSpec = Window.orderBy(desc("page_count"))
page_popularity_df = page_count_df.withColumn("rank", rank().over(pageRankSpec))

In [45]:
page_popularity_df = page_popularity_df.withColumn("created_on", current_timestamp())

In [46]:
page_popularity_df.show()

23/05/26 21:00:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+--

Users per page:

In [47]:
page_user_session_count_df = df_dataset1.groupBy("user_id","session_id","page").agg(count("user_id").alias("page_count"))

In [48]:
user_per_page_count_df_ = page_user_session_count_df.groupBy("page").agg(count("page_count").alias("user_per_page_count"))

In [49]:
user_per_page_count_df_ = user_per_page_count_df_.withColumn("created_on", current_timestamp())

In [50]:
user_per_page_count_df_.show()

[Stage 63:>                                                         (0 + 1) / 1]

+------+-------------------+--------------------+
|  page|user_per_page_count|          created_on|
+------+-------------------+--------------------+
| Page3|               3779|2023-05-26 21:00:...|
| Page4|               3766|2023-05-26 21:00:...|
| Page6|               3738|2023-05-26 21:00:...|
| Page7|               3770|2023-05-26 21:00:...|
|Page10|               3716|2023-05-26 21:00:...|
| Page2|               3788|2023-05-26 21:00:...|
| Page5|               3795|2023-05-26 21:00:...|
| Page8|               3816|2023-05-26 21:00:...|
| Page9|               3665|2023-05-26 21:00:...|
| Page1|               3721|2023-05-26 21:00:...|
+------+-------------------+--------------------+



                                                                                

Transactions per page:

In [51]:
df_page_trans = df_dataset1.join(df_dataset2, (df_dataset1.user_id == df_dataset2.user_id) &
   (df_dataset1.session_id == df_dataset2.session_id)) \
   .select(df_dataset1.user_id, df_dataset1.session_id, df_dataset1.timestamp_page, df_dataset2.timestamp_trans, df_dataset1.page)

In [52]:
df_page_trans_edit = df_page_trans.withColumn('timestamp_delta', (df_page_trans.timestamp_trans - df_page_trans.timestamp_page))

In [53]:
timestampRankSpec = Window.partitionBy("user_id", "session_id").orderBy("timestamp_delta")
df_page_trans_delta_ranked = df_page_trans_edit.withColumn("rank", rank().over(timestampRankSpec))

In [54]:
trans_per_page_df = df_page_trans_delta_ranked.filter("rank = 1")

In [55]:
trans_per_page_count_df = trans_per_page_df.groupBy("page").agg(count("rank").alias("trans_per_page"))

In [56]:
trans_per_page_count_df = trans_per_page_count_df.withColumn("created_on", current_timestamp())

In [57]:
trans_per_page_count_df.show()

[Stage 74:>                                                         (0 + 1) / 1]

+------+--------------+--------------------+
|  page|trans_per_page|          created_on|
+------+--------------+--------------------+
| Page3|          3652|2023-05-26 21:00:...|
| Page4|          3424|2023-05-26 21:00:...|
| Page6|          3680|2023-05-26 21:00:...|
| Page7|          3584|2023-05-26 21:00:...|
|Page10|          3604|2023-05-26 21:00:...|
| Page2|          3644|2023-05-26 21:00:...|
| Page5|          3576|2023-05-26 21:00:...|
| Page8|          3652|2023-05-26 21:00:...|
| Page9|          3632|2023-05-26 21:00:...|
| Page1|          3540|2023-05-26 21:00:...|
+------+--------------+--------------------+



                                                                                

Purchase time per user:

In [58]:
timestampRankSpecDesc = Window.partitionBy("user_id", "session_id").orderBy(desc("timestamp_delta"))
df_page_trans_delta_ranked_desc = df_page_trans_edit.withColumn("rank", rank().over(timestampRankSpecDesc))

In [59]:
purchace_time_df = df_page_trans_delta_ranked_desc.filter("rank = 1")

In [60]:
purchace_time_per_user_df = purchace_time_df.groupBy("user_id").agg(round(avg("timestamp_delta"),2).alias("purchace_time_per_user"))

In [61]:
purchace_time_per_user_df = purchace_time_per_user_df.withColumn("created_on", current_timestamp())

In [62]:
purchace_time_per_user_df.show()

[Stage 83:>                                                         (0 + 1) / 1]

+-------+----------------------+--------------------+
|user_id|purchace_time_per_user|          created_on|
+-------+----------------------+--------------------+
|U560627|            -887646.65|2023-05-26 21:00:...|
|U690705|               1176.65|2023-05-26 21:00:...|
|U866891|                1027.1|2023-05-26 21:00:...|
|U137656|             595366.88|2023-05-26 21:00:...|
|U197780|               1794.43|2023-05-26 21:00:...|
|U227446|               1347.53|2023-05-26 21:00:...|
|U680488|                1126.3|2023-05-26 21:00:...|
|U216704|               1747.33|2023-05-26 21:00:...|
|U939318|               1455.87|2023-05-26 21:00:...|
|U139713|               1406.17|2023-05-26 21:00:...|
|U528371|               1208.82|2023-05-26 21:00:...|
|U496385|                 878.0|2023-05-26 21:00:...|
|U781771|               1322.53|2023-05-26 21:00:...|
|U394114|               1589.25|2023-05-26 21:00:...|
|U648815|               1001.23|2023-05-26 21:00:...|
|U437532|               1336

                                                                                

In [63]:
url = "jdbc:postgresql://localhost:5432/bespot1"

properties = {
    "user": "myprojectuser",
    "password": "password",
    "driver": "org.postgresql.Driver"
}

mode = "overwrite"

In [64]:
page_popularity_df.show()


23/05/26 21:00:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+--

In [65]:
page_popularity_df.write.jdbc(url=url, table='page_hits', mode=mode, properties=properties)

23/05/26 21:00:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 21:00:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 2

In [66]:
user_per_page_count_df_.write.jdbc(url=url, table='page_users', mode=mode, properties=properties)

                                                                                

In [67]:
trans_per_page_count_df.write.jdbc(url=url, table='page_trans', mode=mode, properties=properties)

                                                                                

In [68]:
purchace_time_per_user_df.write.jdbc(url=url, table='user_purchasing_time', mode=mode, properties=properties)

                                                                                