In [1]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from random import randint, choice
from datetime import datetime

In [2]:
spark = SparkSession.builder.master("local").\
appName("Word Count").\
config("spark.drive.bindAddress", "localhost").\
config("spark.ui.port", "4040").\
getOrCreate()

spark.sql('set spark.sql.caseSensitive=true')

DataFrame[key: string, value: string]

# Создайте схему будущего фрейма данных.

In [3]:
schema = T.StructType ([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("timestamp", T.IntegerType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("page_id", T.IntegerType(), True),
    T.StructField("tag", T.StringType(), True),
    T.StructField("sign", T.BooleanType(), True),
])

 # Создайте датафрейм с описанной выше схемой данных.

In [4]:
df = spark.createDataFrame([],
    schema=schema)
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- page_id: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- sign: boolean (nullable = true)



 # Наполните датафрейм данными.

In [5]:
data = []
users, articles = dict(), dict()
for _ in range(1000):
    id_ = randint(12345, 12385)
    timestamp = randint(1667627426, 1667627426+3600*24)
    type_ = choice(['click', 'visit', 'scroll', 'move'])
    page_id = randint(101, 121)
    
    if page_id in articles.keys():
        tag = articles[page_id]
    else:        
        tag = choice(['Sport', 'Politics', 'Medicine', 'Movie'])
        articles[page_id] = tag
    
    if id_ in users.keys():
        sign = users[id_]
    else:        
        sign = bool(randint(0, 1))
        users[id_] = sign
        
    data.append((id_, timestamp, type_, page_id, tag, sign))

df = spark.createDataFrame(data=data, schema=schema)    

In [6]:
df.show()

+-----+----------+------+-------+--------+-----+
|   id| timestamp|  type|page_id|     tag| sign|
+-----+----------+------+-------+--------+-----+
|12364|1667682874|scroll|    113|   Movie| true|
|12381|1667697061|  move|    101|   Sport|false|
|12365|1667693267|scroll|    121|Politics|false|
|12372|1667674887|  move|    105|Politics| true|
|12359|1667707763|scroll|    111|Politics|false|
|12375|1667627752|  move|    120|   Movie| true|
|12382|1667688045| visit|    118|   Movie| true|
|12347|1667705717| visit|    103|Medicine|false|
|12351|1667638998| click|    115|   Movie| true|
|12360|1667639666|scroll|    119|   Movie|false|
|12353|1667699485| visit|    121|Politics| true|
|12363|1667681706| click|    109|Politics|false|
|12355|1667628651| visit|    106|   Movie|false|
|12369|1667677245| click|    106|   Movie| true|
|12355|1667644555| click|    111|Politics|false|
|12366|1667702344|scroll|    120|   Movie|false|
|12382|1667680120| visit|    111|Politics| true|
|12381|1667697916| c

 # Вывести топ-5 самых активных посетителей сайта

In [7]:
top_users = df[df['type'] == 'visit'].groupBy('id').count()
top_users.sort(top_users['count'].desc()).show(5)

+-----+-----+
|   id|count|
+-----+-----+
|12362|   11|
|12346|   11|
|12382|   10|
|12363|   10|
|12345|   10|
+-----+-----+
only showing top 5 rows



# Посчитать процент посетителей, у которых есть ЛК

In [8]:
float(df[df['sign']].select('id').distinct().count()) * 100 / df.select('id').distinct().count()

56.09756097560975

  # Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [9]:
top_pages = df[df['type'] == 'click'].groupBy('page_id').count()
top_pages.sort(top_pages['count'].desc()).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    115|   15|
|    108|   15|
|    107|   15|
|    119|   15|
|    117|   15|
+-------+-----+
only showing top 5 rows



# Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [10]:
@F.udf(returnType=T.StringType())
def interval_it(timestamp,
               ints = {
                   0: '0-4',
                   1: '4-8',
                   2: '8-12',
                   3: '12-16',
                   4: '16-20',
                   5: '20-24',
                    }
               ):
    id = datetime.fromtimestamp(timestamp).hour // 4
    return ints[id]

df = df.withColumn('interval', interval_it('timestamp')) 

In [11]:
df.show(5)

+-----+----------+------+-------+--------+-----+--------+
|   id| timestamp|  type|page_id|     tag| sign|interval|
+-----+----------+------+-------+--------+-----+--------+
|12364|1667682874|scroll|    113|   Movie| true|     0-4|
|12381|1667697061|  move|    101|   Sport|false|     4-8|
|12365|1667693267|scroll|    121|Politics|false|     0-4|
|12372|1667674887|  move|    105|Politics| true|   20-24|
|12359|1667707763|scroll|    111|Politics|false|     4-8|
+-----+----------+------+-------+--------+-----+--------+
only showing top 5 rows



 # Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте. 

In [12]:
top_int = df.groupBy('interval').count()
top_int.sort(top_int['count'].desc()).show(1)

+--------+-----+
|interval|count|
+--------+-----+
|     0-4|  188|
+--------+-----+
only showing top 1 row



# Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта

In [13]:
schema = T.StructType ([
    T.StructField("Id", T.IntegerType(), True),
    T.StructField("gender", T.StringType(), True),
    T.StructField("User_id", T.IntegerType(), True),
    T.StructField("ФИО", T.StringType(), True),
    T.StructField("dob", T.DateType(), True),
    T.StructField("creation_date", T.DateType(), True),
])

In [14]:
data = []
for key, value in users.items():
    if value:
        id_ = randint(0, 100000000)
        gender = choice(['M', 'F'])
        dob = datetime(randint(1960, 2000), randint(1, 12),randint(1, 28)).date()
        creation_date = datetime(randint(2020, 2021), randint(1, 12),randint(1, 28)).date()
        data.append((id_, gender, key, 'name_'+str(id_), dob, creation_date))
df_lk = spark.createDataFrame(data=data, schema=schema)            

In [15]:
df_lk.show(5)

+--------+------+-------+-------------+----------+-------------+
|      Id|gender|User_id|          ФИО|       dob|creation_date|
+--------+------+-------+-------------+----------+-------------+
|15882360|     M|  12364|name_15882360|1994-08-01|   2021-12-28|
|12823211|     F|  12372|name_12823211|1974-10-01|   2021-08-13|
|17671183|     M|  12375|name_17671183|1975-03-17|   2021-10-22|
|51209904|     F|  12382|name_51209904|1975-03-13|   2020-07-26|
| 4599397|     M|  12351| name_4599397|1969-10-01|   2021-11-15|
+--------+------+-------+-------------+----------+-------------+
only showing top 5 rows



# Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

In [16]:
df_joined = df.join(df_lk, df.id == df_lk.User_id, 'inner')

In [17]:
df_joined[df_joined['tag'] == 'Sport'].select('ФИО').distinct().show()

+-------------+
|          ФИО|
+-------------+
|name_26689758|
|name_76003725|
|name_90232334|
| name_4599397|
|name_51209904|
|name_19236785|
|name_13476839|
|name_17763432|
|name_28008070|
|name_96185390|
|name_57503129|
|name_57494273|
|name_12823211|
|name_11804110|
| name_6666339|
|name_88420324|
|name_15882360|
|name_12505146|
|name_77384750|
|name_38315514|
+-------------+
only showing top 20 rows



 # Выведите 10% ЛК, у которых максимальная разница между датой создания ЛК и датой последнего посещения. 

In [18]:
df_joined.select('Id', 'creation_date', 'timestamp')\
.withColumn('intv', F.col('timestamp') - F.unix_timestamp('creation_date', 'yyyy-MM-dd'))\
.groupBy('Id').agg(F.max('intv').alias('intv'))\
.sort(F.col('intv').desc())\
.select('Id').show(int(0.1 * df_joined.select('Id').distinct().count()))

+--------+
|      Id|
+--------+
|57503129|
|12505146|
+--------+
only showing top 2 rows



# Вывести топ-5 страниц, которые чаще всего посещают мужчины и топ-5 страниц, которые посещают чаще женщины

In [19]:
print('ТОП-5 страниц женщины')
df_joined[df_joined.gender == 'F']\
.groupBy('page_id').agg(F.count('id').alias('count'))\
.sort(F.col('count').desc())\
.show(5)

ТОП-5 страниц женщины
+-------+-----+
|page_id|count|
+-------+-----+
|    103|   23|
|    112|   19|
|    113|   19|
|    107|   19|
|    111|   17|
+-------+-----+
only showing top 5 rows



In [20]:
print('ТОП-5 страниц мужчины')
df_joined[df_joined.gender == 'M']\
.groupBy('page_id').agg(F.count('id').alias('count'))\
.sort(F.col('count').desc())\
.show(5)

ТОП-5 страниц мужчины
+-------+-----+
|page_id|count|
+-------+-----+
|    112|   19|
|    101|   16|
|    107|   16|
|    115|   16|
|    119|   16|
+-------+-----+
only showing top 5 rows

