In [1]:
import os
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'

import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .master("yarn") \
                    .config("spark.executor.memory", "2g") \
                    .config("spark.executor.cores", 2) \
                    .config("spark.driver.cores", 2) \
                    .appName("Learning DataFrames") \
                    .config("spark.ui.port", "4051") \
                    .getOrCreate()

spark = SparkSession.builder.getOrCreate()


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2024-02-20 18:09:41,507 WARN util.Utils: Your hostname, fhmbmgti0ul88u8innkd resolves to a loopback address: 127.0.1.1; using 172.16.0.39 instead (on interface eth0)
2024-02-20 18:09:41,508 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-02-20 18:09:43,340 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update 

In [2]:
events = spark.read.json("/user/master/data/events/date=2022-05-01")

                                                                                

In [3]:
events.printSchema()

root
 |-- event: struct (nullable = true)
 |    |-- admins: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- channel_id: long (nullable = true)
 |    |-- datetime: string (nullable = true)
 |    |-- media: struct (nullable = true)
 |    |    |-- media_type: string (nullable = true)
 |    |    |-- src: string (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- message_channel_to: long (nullable = true)
 |    |-- message_from: long (nullable = true)
 |    |-- message_group: long (nullable = true)
 |    |-- message_id: long (nullable = true)
 |    |-- message_to: long (nullable = true)
 |    |-- message_ts: string (nullable = true)
 |    |-- reaction_from: string (nullable = true)
 |    |-- reaction_type: string (nullable = true)
 |    |-- subscription_channel: long (nullable = true)
 |    |-- tags: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- user: string (nullable = true)
 |-- event_type: s

In [13]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = Window().partitionBy('event.message_from').orderBy('event.datetime')

dfWithLag = events.withColumn("lag_7",F.lag("event.message_to", 7).over(window))

dfWithLag.select("event.message_from", "lag_7") \
.filter(F.col("lag_7").isNotNull()) \
.orderBy(F.col("event.message_from").desc()) \
.show(10, False)



+------------+------+
|message_from|lag_7 |
+------------+------+
|155747      |98478 |
|155747      |23058 |
|155747      |32788 |
|155747      |121581|
|155058      |37570 |
|155058      |11144 |
|155058      |139528|
|155058      |94655 |
|155058      |111506|
|155058      |116979|
+------------+------+
only showing top 10 rows



                                                                                

In [14]:
# Определение оконной спецификации: группировка по отправителю и сортировка по времени
window = Window.partitionBy('event.message_from').orderBy('event.datetime')

# Создание колонки lag_7 для определения, с кем пользователь взаимодействовал семь сообщений назад
dfWithLag = events.withColumn("lag_7", F.lag("event.message_to", 7).over(window))

# Фильтрация записей, где получатель (lag_7) не NULL, и сортировка по отправителю (message_from) в порядке убывания
dfWithLag_filtered = dfWithLag.filter(F.col("lag_7").isNotNull())

dfWithLag_filtered.select(
    F.col("event.message_from"), 
    F.col("lag_7")
).orderBy(F.col("event.message_from").desc()) \
.show(10, False)



+------------+------+
|message_from|lag_7 |
+------------+------+
|155747      |98478 |
|155747      |23058 |
|155747      |32788 |
|155747      |121581|
|155058      |37570 |
|155058      |11144 |
|155058      |139528|
|155058      |94655 |
|155058      |111506|
|155058      |116979|
+------------+------+
only showing top 10 rows



                                                                                

In [25]:
!hdfs dfs -ls "/user/timefor/data"

Found 1 items
drwxr-xr-x   - timefor timefor          0 2024-02-20 18:58 /user/timefor/data/events


In [20]:
events = spark.read.json("/user/master/data/events")

                                                                                

In [24]:
events.write.option("header",True) \
        .partitionBy("event_type") \
        .mode('overwrite') \
        .parquet("/user/timefor/data/events") 

                                                                                

In [26]:
from pyspark.sql import functions as F

# Чтение данных из Parquet файла
df = spark.read.parquet('/user/timefor/data/events')

# Сортировка данных по времени в порядке убывания и вывод последних 10 строк
df.orderBy(F.col('event.datetime').desc()).show(10)



+--------------------+----------+------------+
|               event|      date|  event_type|
+--------------------+----------+------------+
|[[19342], 987160,...|2022-05-31|     message|
|[,, 2022-05-31 23...|2022-05-31|subscription|
|[[26358], 247511,...|2022-05-31|     message|
|[[79792], 748847,...|2022-05-31|     message|
|[,, 2022-05-31 23...|2022-05-31|subscription|
|[,, 2022-05-31 23...|2022-05-31|subscription|
|[[151897], 396845...|2022-05-31|     message|
|[,, 2022-05-31 23...|2022-05-31|subscription|
|[,, 2022-05-31 23...|2022-05-31|subscription|
|[,, 2022-05-31 23...|2022-05-31|subscription|
+--------------------+----------+------------+
only showing top 10 rows





In [12]:
spark.stop()