# Знакомство со Spark
Используем возможности Spark для анализа данных clickstream пользователей новостного Интернет-портала.

In [1]:
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.window import Window

import datetime

In [2]:
spark = SparkSession.builder.master("local").\
                    appName("Home_work_py").\
                    config("spark.driver.bindAddress", "localhost").\
                    config("spark.ui.port", "4040").\
                    getOrCreate()

22/11/29 16:40:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### a.       Создайте схему будущего фрейма данных. Схема должна включать следующие атрибуты:
-   id -  уникальный идентификатор посетителя сайта. Тип – последовательность чисел фиксированной длины. Данное поле не является первичным ключом.
-   timestamp – дата и время события в формате unix timestamp.
-   type – тип события, значение из списка (факт посещения(visit), клик по визуальному элементу страницы(click), скролл(scroll), перед на другую страницу(move)).
-   page_id – id текущей страницы. Тип - последовательность чисел фиксированной длины.
-   tag – каждая страница с новостью размечается редакцией специальными тегами, которые отражают тематику конкретной новости со страницы. Возможный список тематик: политика, спорт, медицина и т.д.
-   sign – наличие у пользователя личного кабинета. Значения – True/False.

In [3]:
schema_web = T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("timestamp", T.LongType(), True),
                T.StructField("type", T.StringType(), True),
                T.StructField("page_id", T.IntegerType(), True),
                T.StructField("tag", T.StringType(), True),
                T.StructField("sign", T.BooleanType(), True)])

### b.       Создайте датафрейм с описанной выше схемой данных.
### c.       Наполните датафрейм данными. Пример:
(12345, 1667627426, "click", 101, "Sport”, False)

In [4]:
data_web =[(1, 1667617426, "visit", 101, 'Sport', True),
           (1, 1667627486, "scroll", 101, 'Sport', True),
           (1, 1667637500, "click", 101, 'Sport', True),
           (1, 1667647505, "visit", 102, 'Politics', True),
           (1, 1667657565, "click", 102, 'Politics', True),
           (1, 1667667586, "visit", 103, 'Sport', True),
           (2, 1667678001, "visit", 104, 'Politics', True),
           (2, 1667688101, "scroll", 104, 'Politics', True),
           (2, 1667698151, "click", 104, 'Politics', True),
           (2, 1667618200, "visit", 105, 'Business', True),
           (2, 1667628226, "click", 105, 'Business', True),
           (2, 1667628317, "visit", 106, 'Business', True),
           (2, 1667638359, "scroll", 106, 'Business', True),
           (3, 1667638422, "visit", 101, 'Sport', False),
           (3, 1667648486, "scroll", 101, 'Sport', False),
           (4, 1667648505, "visit", 106, 'Business', False),
           (5, 1667658511, "visit", 101, 'Sport', True),
           (5, 1667658901, "click", 101, 'Sport', True),
           (5, 1667658926, "visit", 102, 'Politics', True),
           (5, 1667658976, "click", 102, 'Politics', True),
           (6, 1667669359, "scroll", 106, 'Business', False),
           (6, 1667679422, "visit", 101, 'Sport', False),
           (6, 1667679486, "scroll", 101, 'Sport', False),
           (6, 1667689505, "visit", 106, 'Business', False),
           (6, 1667699511, "visit", 102, 'Politics', False),
           (7, 1667669901, "click", 101, 'Sport', True),
           (7, 1667659926, "visit", 102, 'Politics', True),
           (7, 1667649976, "click", 102, 'Politics', True)]

df_web = spark.createDataFrame(data = data_web, schema = schema_web)

# df_web.select(F.from_unixtime('timestamp').alias('ts')).head(2)

In [5]:
df_web.columns

['id', 'timestamp', 'type', 'page_id', 'tag', 'sign']

In [6]:
df_web.show(5)

+---+----------+------+-------+--------+----+
| id| timestamp|  type|page_id|     tag|sign|
+---+----------+------+-------+--------+----+
|  1|1667617426| visit|    101|   Sport|true|
|  1|1667627486|scroll|    101|   Sport|true|
|  1|1667637500| click|    101|   Sport|true|
|  1|1667647505| visit|    102|Politics|true|
|  1|1667657565| click|    102|Politics|true|
+---+----------+------+-------+--------+----+
only showing top 5 rows



In [7]:
df_web = df_web.select(*[i for i in df_web.columns if i != "timestamp"],
                    F.from_unixtime("timestamp").alias("event_time"))

In [8]:
df_web.show(5)

+---+------+-------+--------+----+-------------------+
| id|  type|page_id|     tag|sign|         event_time|
+---+------+-------+--------+----+-------------------+
|  1| visit|    101|   Sport|true|2022-11-05 03:03:46|
|  1|scroll|    101|   Sport|true|2022-11-05 05:51:26|
|  1| click|    101|   Sport|true|2022-11-05 08:38:20|
|  1| visit|    102|Politics|true|2022-11-05 11:25:05|
|  1| click|    102|Politics|true|2022-11-05 14:12:45|
+---+------+-------+--------+----+-------------------+
only showing top 5 rows



### d.       Решите следующие задачи:

-   Вывести топ-5 самых активных посетителей сайта
-   Посчитать процент посетителей, у которых есть ЛК
-   Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице
-   Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)
-   Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

##### топ-5 самых активных посетителей сайта

In [9]:
df_web.groupby("id")\
    .agg(F.count("*").alias("activity"))\
    .orderBy("activity", ascending = False)\
    .show(5)



+---+--------+
| id|activity|
+---+--------+
|  2|       7|
|  1|       6|
|  6|       5|
|  5|       4|
|  7|       3|
+---+--------+
only showing top 5 rows



                                                                                

##### Посчитать процент посетителей, у которых есть ЛК

In [10]:
r_lk = df_web.filter("sign = true").select("id").distinct().count()
r_tot = df_web.select("id").distinct().count()
print("Доля пользователей с лк: {0}%".format(round(r_lk/r_tot*100,1)))

                                                                                

Доля пользователей с лк: 57.1%


                                                                                

##### Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [11]:
df_web.filter('type = "click"')\
    .groupby("page_id")\
    .count()\
    .orderBy("count", ascending = False)\
    .show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    101|    3|
|    102|    3|
|    105|    1|
|    104|    1|
+-------+-----+



##### Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [12]:
df_web.withColumn("period", F.floor(F.hour("event_time") / F.lit(4))).show(50)

+---+------+-------+--------+-----+-------------------+------+
| id|  type|page_id|     tag| sign|         event_time|period|
+---+------+-------+--------+-----+-------------------+------+
|  1| visit|    101|   Sport| true|2022-11-05 03:03:46|     0|
|  1|scroll|    101|   Sport| true|2022-11-05 05:51:26|     1|
|  1| click|    101|   Sport| true|2022-11-05 08:38:20|     2|
|  1| visit|    102|Politics| true|2022-11-05 11:25:05|     2|
|  1| click|    102|Politics| true|2022-11-05 14:12:45|     3|
|  1| visit|    103|   Sport| true|2022-11-05 16:59:46|     4|
|  2| visit|    104|Politics| true|2022-11-05 19:53:21|     4|
|  2|scroll|    104|Politics| true|2022-11-05 22:41:41|     5|
|  2| click|    104|Politics| true|2022-11-06 01:29:11|     0|
|  2| visit|    105|Business| true|2022-11-05 03:16:40|     0|
|  2| click|    105|Business| true|2022-11-05 06:03:46|     1|
|  2| visit|    106|Business| true|2022-11-05 06:05:17|     1|
|  2|scroll|    106|Business| true|2022-11-05 08:52:39|

##### Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

In [13]:
df1 = df_web.withColumn("period", F.floor(F.hour("event_time") / F.lit(4)))\
.groupby("period").count().orderBy("count", ascending = False).select("period").show(1)


+------+
|period|
+------+
|     3|
+------+
only showing top 1 row



- Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов

    1. Id – уникальный идентификатор личного кабинета
    2. User_id – уникальный идентификатор посетителя
    3. ФИО посетителя
    4. Дату рождения посетителя 
    5. Дата создания ЛК

-   Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.
-   Выведите 10% ЛК, у которых максимальная разница между датой создания ЛК и датой последнего посещения.
-   Вывести топ-5 страниц, которые чаще всего посещают мужчины и топ-5 страниц, которые посещают чаще женщины.

In [14]:
schema_lk = T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("user_id", T.IntegerType(), True),
                T.StructField("fio", T.StringType(), True),
                T.StructField("dob", T.DateType(), True),
                T.StructField("doc", T.DateType(), True)])

data_lk = [
    (101, 2, "Иванов Иван Иванович", datetime.datetime(1990, 7, 5), datetime.datetime(2016, 8, 1)),
    (102, 5, "Александрова Александра Александровна", datetime.datetime(1995, 1, 22), datetime.datetime(2017, 10, 7)),
    (103, 1, "Тарасова Алина Владимировна", datetime.datetime(1975, 8, 12), datetime.datetime(2018, 10, 7)),
    (104, 6, "Иванов Владимир Олегович", datetime.datetime(1980, 4, 15), datetime.datetime(2019, 7, 15))
]

df_lk = spark.createDataFrame(data = data_lk, schema = schema_lk)

In [15]:
df_all = df_lk.alias("lk").join(df_web.alias("web"),
                                    on = [F.col("lk.user_id") == F.col("web.id")],
                                    how = "left")
df_all.show(10)

+---+-------+--------------------+----------+----------+---+------+-------+--------+-----+-------------------+
| id|user_id|                 fio|       dob|       doc| id|  type|page_id|     tag| sign|         event_time|
+---+-------+--------------------+----------+----------+---+------+-------+--------+-----+-------------------+
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| visit|    101|   Sport| true|2022-11-05 03:03:46|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1|scroll|    101|   Sport| true|2022-11-05 05:51:26|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| click|    101|   Sport| true|2022-11-05 08:38:20|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| visit|    102|Politics| true|2022-11-05 11:25:05|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| click|    102|Politics| true|2022-11-05 14:12:45|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| visit|    103|   Sport| true|2022-11-05 16:59:46|
|

##### Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

In [16]:
df_all.filter('tag = "Sport" AND ( type = "scroll" OR type = "visit")') \
    .select(F.split(df_all.fio,' ')[0].alias("Family")) \
    .distinct().show()

                                                                                

+------------+
|      Family|
+------------+
|      Иванов|
|    Тарасова|
|Александрова|
+------------+



##### Выведите 10% ЛК, у которых максимальная разница между датой создания ЛК и датой последнего посещения.

In [17]:
df_all.groupby('lk.id')\
    .agg(F.max(F.round(F.datediff("event_time", "doc")/365,1)).alias('Diff_in_years'))\
    .orderBy("Diff_in_years",ascending = False)\
    .show(max(1,round(df_lk.select("id").distinct().count()*0.1,0)))



+---+-------------+
| id|Diff_in_years|
+---+-------------+
|101|          6.3|
+---+-------------+
only showing top 1 row



                                                                                

##### Вывести топ-5 страниц, которые чаще всего посещают мужчины и топ-5 страниц, которые посещают чаще женщины.

In [18]:
@udf(T.StringType())
def calc_gender(fio):
    surname, name, middlename = fio.split(' ')
    if (((surname[-2:] == "ов") or (surname[-2:] == "ев")) and \
        middlename[-2:] == "ич"):
        return 'm'
    else:
        return 'w'

In [19]:
df_all.withColumn("gender", calc_gender(F.col("fio"))) \
      .select("page_id", "gender") \
      .groupby("page_id", "gender") \
      .count() \
      .withColumn("rating", F.row_number()\
                                    .over(Window.partitionBy("gender").orderBy(F.col("count").desc())))\
      .filter("rating < 6")\
      .show(10)
#      .select("page_id","gender").show(10)

                                                                                

+-------+------+-----+------+
|page_id|gender|count|rating|
+-------+------+-----+------+
|    106|     m|    4|     1|
|    104|     m|    3|     2|
|    105|     m|    2|     3|
|    101|     m|    2|     4|
|    102|     m|    1|     5|
|    101|     w|    5|     1|
|    102|     w|    4|     2|
|    103|     w|    1|     3|
+-------+------+-----+------+



### e.       Создайте в Postgres таблицы аналогичной структуры и выполните следующие задания с помощью Spark.

-    Создайте витрину данных в Postgres со следующим содержанием

    1.       Id посетителя
    2.       Возраст посетителя
    3.       Пол посетителя (постарайтесь описать логику вычисления пола в отдельной пользовательской функции)
    4.       Любимая тематика новостей
    5.       Любимый временной диапазон посещений
    6.       Id личного кабинета
    7.       Разница в днях между созданием ЛК и датой последнего посещения. (-1 если ЛК нет)
    8.       Общее кол-во посещений сайта
    9.       Средняя длина сессии(сессией считаем временной промежуток, который охватывает последовательность событий, которые происходили подряд с разницей не более 5 минут).
    10.   Среднее кол-во активностей в рамках одной сессии


In [20]:
df_all_g = df_all.withColumn("gender", calc_gender(F.col("fio")))
df_all_g.show(5)



+---+-------+--------------------+----------+----------+---+------+-------+--------+----+-------------------+------+
| id|user_id|                 fio|       dob|       doc| id|  type|page_id|     tag|sign|         event_time|gender|
+---+-------+--------------------+----------+----------+---+------+-------+--------+----+-------------------+------+
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| visit|    101|   Sport|true|2022-11-05 03:03:46|     w|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1|scroll|    101|   Sport|true|2022-11-05 05:51:26|     w|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| click|    101|   Sport|true|2022-11-05 08:38:20|     w|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| visit|    102|Politics|true|2022-11-05 11:25:05|     w|
|103|      1|Тарасова Алина Вл...|1975-08-12|2018-10-07|  1| click|    102|Politics|true|2022-11-05 14:12:45|     w|
+---+-------+--------------------+----------+----------+---+----

                                                                                

In [21]:
df_tag = df_all_g.groupby("user_id", "tag").count()\
    .withColumn("rating",F.row_number().over(Window.partitionBy("user_id").orderBy(F.col("count").desc())))\
    .filter("rating=1").select("user_id","tag")
df_period = df_all_g.withColumn("period", F.floor(F.hour("event_time") / F.lit(4)))\
    .groupby("user_id", "period").count()\
    .withColumn("rating",F.row_number().over(Window.partitionBy("user_id").orderBy(F.col("count").desc())))\
    .filter("rating=1").select("user_id","period")
df_period = df_all_g.withColumn("period", F.floor(F.hour("event_time") / F.lit(4)))\
    .groupby("user_id", "period").count()\
    .withColumn("rating",F.row_number().over(Window.partitionBy("user_id").orderBy(F.col("count").desc())))\
    .filter("rating=1").select("user_id","period")
df_diff = df_all.groupby('user_id')\
    .agg(F.max(F.round(F.datediff("event_time", "doc"),0)).alias('diff'))\
    .orderBy("diff",ascending = False)
df_visits = df_all.filter('type = "visit"')\
    .groupby('user_id')\
    .agg(F.count(F.col("user_id")).alias("visits"))

In [22]:
df_ses = df_all\
    .withColumn("Pred_T",F.lag("event_time").over(Window.partitionBy("user_id").orderBy(F.col("event_time"))))\
    .withColumn("dt",F.coalesce(F.round((F.unix_timestamp("event_time")-F.unix_timestamp("Pred_T"))/60.,2),F.lit(-1)).alias('dt'))\
    .select("user_id","event_time","dt").orderBy("user_id","event_time")\
    .withColumn("new_ses", F.when(F.col("dt") >= "5","2")
                                 .when(F.col("dt") == "-1","1")
                                 .otherwise("Cont"))\
    .filter("new_ses = 1 OR new_ses = 2")\
    .withColumn("n_ses",F.row_number().over(Window.partitionBy("user_id").orderBy("event_time")))\
    .select("user_id","event_time","n_ses").alias("ses")
df_ses = df_all.select("user_id","event_time").alias("df").join(df_ses.alias("ses"),
                                    on = [F.col("ses.user_id") == F.col("df.user_id"),
                                         F.col("ses.event_time") == F.col("df.event_time")],
                                    how = "left")\
    .orderBy("df.user_id","df.event_time")\
    .select("df.user_id","df.event_time","n_ses")

df_ses=df_ses.withColumn("n_ses2",F.max("n_ses").over(Window.partitionBy("user_id").orderBy("event_time")))
df_ses.show()

                                                                                

+-------+-------------------+-----+------+
|user_id|         event_time|n_ses|n_ses2|
+-------+-------------------+-----+------+
|      1|2022-11-05 03:03:46|    1|     1|
|      1|2022-11-05 05:51:26|    2|     2|
|      1|2022-11-05 08:38:20|    3|     3|
|      1|2022-11-05 11:25:05|    4|     4|
|      1|2022-11-05 14:12:45|    5|     5|
|      1|2022-11-05 16:59:46|    6|     6|
|      6|2022-11-05 17:29:19|    1|     1|
|      6|2022-11-05 20:17:02|    2|     2|
|      6|2022-11-05 20:18:06| null|     2|
|      6|2022-11-05 23:05:05|    3|     3|
|      6|2022-11-06 01:51:51|    4|     4|
|      5|2022-11-05 14:28:31|    1|     1|
|      5|2022-11-05 14:35:01|    2|     2|
|      5|2022-11-05 14:35:26| null|     2|
|      5|2022-11-05 14:36:16| null|     2|
|      2|2022-11-05 03:16:40|    1|     1|
|      2|2022-11-05 06:03:46|    2|     2|
|      2|2022-11-05 06:05:17| null|     2|
|      2|2022-11-05 08:52:39|    3|     3|
|      2|2022-11-05 19:53:21|    4|     4|
+-------+--

In [23]:
df_ses = df_ses.groupby("user_id","n_ses2")\
    .agg((F.max(F.unix_timestamp("event_time"))-F.min(F.unix_timestamp("event_time"))).alias("Ses_Time"),
         F.count("event_time").alias("Ses_Act"))\
    .groupby("user_id").agg(F.avg("Ses_Time").alias("avg_ses_time"),F.avg("Ses_Act").alias("avg_ses_act"))
df_ses.show()

                                                                                

+-------+------------------+------------------+
|user_id|      avg_ses_time|       avg_ses_act|
+-------+------------------+------------------+
|      1|               0.0|               1.0|
|      6|              16.0|              1.25|
|      5|              37.5|               2.0|
|      2|15.166666666666666|1.1666666666666667|
+-------+------------------+------------------+



                                                                                

In [24]:
df1 = df_all_g.withColumn("age",F.round(F.datediff(F.current_date(), "dob")/365,0)).select("user_id","age","gender","lk.id").distinct().alias("df0")
df1 = df1.join(df_tag.alias("tg"),
                           on = [F.col("tg.user_id") == F.col("df0.user_id")],
                           how = "left")
df1 = df1.join(df_period.alias("pr"),
                           on = [F.col("pr.user_id") == F.col("df0.user_id")],
                           how = "left")
df1 = df1.join(df_diff.alias("df"),
                           on = [F.col("df.user_id") == F.col("df0.user_id")],
                           how = "left")
df1 = df1.join(df_visits.alias("vs"),
                           on = [F.col("vs.user_id") == F.col("df0.user_id")],
                           how = "left")
df1 = df1.join(df_ses.alias("ses"),
                           on = [F.col("ses.user_id") == F.col("df0.user_id")],
                           how = "left")
df1.show()



+-------+----+------+---+-------+--------+-------+------+-------+----+-------+------+-------+------------------+------------------+
|user_id| age|gender| id|user_id|     tag|user_id|period|user_id|diff|user_id|visits|user_id|      avg_ses_time|       avg_ses_act|
+-------+----+------+---+-------+--------+-------+------+-------+----+-------+------+-------+------------------+------------------+
|      1|47.0|     w|103|      1|   Sport|      1|     2|      1|1490|      1|     3|      1|               0.0|               1.0|
|      6|43.0|     m|104|      6|Business|      6|     5|      6|1210|      6|     3|      6|              16.0|              1.25|
|      5|28.0|     w|102|      5|   Sport|      5|     3|      5|1855|      5|     2|      5|              37.5|               2.0|
|      2|32.0|     m|101|      2|Business|      2|     0|      2|2288|      2|     3|      2|15.166666666666666|1.1666666666666667|
+-------+----+------+---+-------+--------+-------+------+-------+----+------

                                                                                

In [25]:
df1=df1.select("df0.user_id","age","gender","tag","period","df0.id","diff","visits","avg_ses_time","avg_ses_act")
df1.show()



+-------+----+------+--------+------+---+----+------+------------------+------------------+
|user_id| age|gender|     tag|period| id|diff|visits|      avg_ses_time|       avg_ses_act|
+-------+----+------+--------+------+---+----+------+------------------+------------------+
|      1|47.0|     w|   Sport|     2|103|1490|     3|               0.0|               1.0|
|      6|43.0|     m|Business|     5|104|1210|     3|              16.0|              1.25|
|      5|28.0|     w|   Sport|     3|102|1855|     2|              37.5|               2.0|
|      2|32.0|     m|Business|     0|101|2288|     3|15.166666666666666|1.1666666666666667|
+-------+----+------+--------+------+---+----+------+------------------+------------------+



                                                                                



### f.        Редакция совместно с аналитиками хотят провести масштабную рекламную кампанию, в рамках которой на сайте будут выпущены 3 новости с новой тематикой. Рекламный бюджет позволяет охватить только 10% доступной вам аудитории посетителей сайта. Напишите запрос, который позволит вычислить целевую аудиторию для данных новостей. Постарайтесь объяснить ваше решение.   

In [26]:
# нужно дополнить данные насколько разнообразны интересы у пользователей - чем больше тематик просматривают, тем вероятнее, что новую посмотрят
df_tags = df_all.select("user_id","tag").distinct()\
    .groupby("user_id").agg(F.count("user_id").alias("tags"))
df_tags.show()

+-------+----+
|user_id|tags|
+-------+----+
|      1|   2|
|      6|   3|
|      5|   2|
|      2|   2|
+-------+----+



In [27]:
df2 = df1.alias("df")\
    .join(df_tags.alias("tgs"),
                 on = [F.col("tgs.user_id") == F.col("df.user_id")],
                 how = "left")\
    .orderBy(F.col("tags").desc(),
             F.col("avg_ses_act").desc(),
             F.col("avg_ses_time").desc())\
    .select("df.user_id","tags","avg_ses_act","avg_ses_time")

df2.show()



+-------+----+------------------+------------------+
|user_id|tags|       avg_ses_act|      avg_ses_time|
+-------+----+------------------+------------------+
|      6|   3|              1.25|              16.0|
|      5|   2|               2.0|              37.5|
|      2|   2|1.1666666666666667|15.166666666666666|
|      1|   2|               1.0|               0.0|
+-------+----+------------------+------------------+



                                                                                

In [28]:
df2.show(max(1,round(df2.select("user_id").distinct().count()*0.1,0)))



+-------+----+-----------+------------+
|user_id|tags|avg_ses_act|avg_ses_time|
+-------+----+-----------+------------+
|      6|   3|       1.25|        16.0|
+-------+----+-----------+------------+
only showing top 1 row



                                                                                

In [29]:
d1 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432") \
    .option("dbtable", "public.users_stat") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .load()

Py4JJavaError: An error occurred while calling o574.load.
: java.sql.SQLException: No suitable driver
	at java.sql.DriverManager.getDriver(DriverManager.java:315)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:105)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:105)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
