In [9]:
import os

import findspark
import pyspark.sql.functions as F
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel

# Point the Hadoop and YARN configuration lookups at the same directory.
os.environ.update({
    'HADOOP_CONF_DIR': '/etc/hadoop/conf',
    'YARN_CONF_DIR': '/etc/hadoop/conf',
})

# Initialise findspark; the trailing find() call is left as the cell's last
# expression so the notebook displays the Spark location that was resolved.
findspark.init()
findspark.find()

'/usr/local/lib/python3.8/dist-packages/pyspark'

In [2]:
# Spark session with a local master; later cells reuse it through `spark`.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Learning DataFrames")
    .getOrCreate()
)

# Attribute (column) names, in the same order as the fields of each row tuple.
columns = ['dt', 'user_id', 'product_id', 'purchase_amount']

# Rows of the sample DataFrame.
data = [
    ('2021-01-06', 3744, 63, 322),
    ('2021-01-04', 2434, 21, 382),
    ('2021-01-04', 2434, 32, 159),
    ('2021-01-05', 3744, 32, 159),
    ('2021-01-06', 4342, 32, 159),
    ('2021-01-05', 4342, 12, 259),
    ('2021-01-06', 5677, 12, 259),
    ('2021-01-04', 5677, 23, 499),
]

# Create the DataFrame from the rows and column names above.
df = spark.createDataFrame(data=data, schema=columns)

23/08/18 13:00:25 WARN Utils: Your hostname, fhmr7tigqkf5v271ckk4 resolves to a loopback address: 127.0.1.1; using 172.16.0.31 instead (on interface eth0)
23/08/18 13:00:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/18 13:00:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


---

In [3]:
# Preview the first five rows of the sample DataFrame.
df.show(n=5)

[Stage 0:>                                                          (0 + 1) / 1]

+----------+-------+----------+---------------+
|        dt|user_id|product_id|purchase_amount|
+----------+-------+----------+---------------+
|2021-01-06|   3744|        63|            322|
|2021-01-04|   2434|        21|            382|
|2021-01-04|   2434|        32|            159|
|2021-01-05|   3744|        32|            159|
|2021-01-06|   4342|        32|            159|
+----------+-------+----------+---------------+


                                                                                

---

## 1

In [4]:
# Number the purchases from the smallest purchase_amount to the largest.
# The window has no partitionBy, so the whole frame is treated as a single
# window (Spark logs a "No Partition Defined" warning for this).
# NOTE(review): purchase_amount contains ties, and row_number() assigns numbers
# within a tie in no guaranteed order — add tie-breaker columns to the window
# ordering if stable numbering matters.
w = Window.orderBy('purchase_amount')

(
    df
    .select(
        'dt',
        'user_id',
        'purchase_amount',
        F.row_number().over(w).alias('row_number'),
    )
    # Sort the *result*: a sort placed before the windowed select does not
    # guarantee the order of the rows that show() prints.
    .orderBy('row_number')
    .show()
)

23/08/18 13:00:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/08/18 13:00:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/08/18 13:00:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+-------+---------------+----------+
|        dt|user_id|purchase_amount|row_number|
+----------+-------+---------------+----------+
|2021-01-04|   2434|            159|         1|
|2021-01-05|   3744|            159|         2|
|2021-01-06|   4342|            159|         3|
|2021-01-05|   4342|            259|         4|
|2021-01-06|   5677|            259|         5|
|2021-01-06|   3744|            322|         6|
|2021-01-04|   2434|            382|         7|
|2021-01-04|   5677|            499|         8|
+----------+-------+---------------+----------+


---

## 2

In [5]:
# Read the JSON events stored under the date=2022-05-01 directory and print
# the schema Spark inferred for them.
events = spark.read.json(path="/user/master/data/events/date=2022-05-01")
events.printSchema()

[Stage 2:>                                                          (0 + 1) / 1]

root
 |-- event: struct (nullable = true)
 |    |-- admins: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- channel_id: long (nullable = true)
 |    |-- datetime: string (nullable = true)
 |    |-- media: struct (nullable = true)
 |    |    |-- media_type: string (nullable = true)
 |    |    |-- src: string (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- message_channel_to: long (nullable = true)
 |    |-- message_from: long (nullable = true)
 |    |-- message_group: long (nullable = true)
 |    |-- message_id: long (nullable = true)
 |    |-- message_to: long (nullable = true)
 |    |-- message_ts: string (nullable = true)
 |    |-- reaction_from: string (nullable = true)
 |    |-- reaction_type: string (nullable = true)
 |    |-- subscription_channel: long (nullable = true)
 |    |-- tags: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- user: string (nullable = true)
 |-- event_type: s

                                                                                

## 3

In [6]:
# Re-bind `events` to the JSON data under the top-level events directory
# (presumably every date=... subdirectory — confirm against the storage layout).
events = spark.read.json(path="/user/master/data/events")

                                                                                

In [10]:
# Window per sender (event.message_from), ordered by event.message_ts.
w = Window.partitionBy('event.message_from').orderBy('event.message_ts')

# For each event keep the sender and the message_to value of the previous row
# in that sender's window. The result is persisted to disk because it is
# queried more than once in this notebook.
dfWithLag = (
    events
    .select(
        F.col('event.message_from').alias('message_from'),
        # NOTE(review): the alias says "lag_7", but F.lag() is called without an
        # offset and therefore looks back one row — confirm which offset is
        # intended before relying on this column.
        F.lag("event.message_to").over(w).alias('lag_7'),
    )
    .persist(StorageLevel.DISK_ONLY)
)

(
    dfWithLag
    .filter(
        dfWithLag.lag_7.isNotNull() &
        (dfWithLag.message_from <= 117522)
    )
    # Sort on the projected column: dfWithLag exposes `message_from`, not the
    # nested `event.message_from` path of the source frame.
    .orderBy(F.desc('message_from'))
    .show(20, False)
)

[Stage 11:>                                                         (0 + 1) / 1]

+------------+------+
|message_from|lag_7 |
+------------+------+
|117522      |75319 |
|117522      |111232|
|117522      |149488|
|117522      |149488|
|117522      |136603|
|117522      |82650 |
|117522      |86206 |
|117522      |33232 |
|117522      |149488|
|117522      |146134|
|117522      |609   |
|117522      |149488|
|117522      |90121 |
|117522      |108673|
|117522      |149488|
|117522      |149488|
|117522      |21265 |
|117522      |95648 |
|117522      |149488|
|117522      |5880  |
+------------+------+


                                                                                

---

In [20]:
# Spot-check: rows of the persisted frame for sender 117522 whose lagged
# message_to value equals 7815.
(
    dfWithLag
    .filter('message_from = 117522 and lag_7 == 7815')
    .show()
)

                                                                                

+------------+-----+
|message_from|lag_7|
+------------+-----+
|      117522| 7815|
|      117522| 7815|
+------------+-----+


## 4

In [22]:
# Column names, in the same order as the fields of each row tuple.
columns = ['dt', 'user_id', 'product_id', 'purchase_amount']

# Same sample purchase rows as at the top of the notebook.
data = [
    ('2021-01-06', 3744, 63, 322),
    ('2021-01-04', 2434, 21, 382),
    ('2021-01-04', 2434, 32, 159),
    ('2021-01-05', 3744, 32, 159),
    ('2021-01-06', 4342, 32, 159),
    ('2021-01-05', 4342, 12, 259),
    ('2021-01-06', 5677, 12, 259),
    ('2021-01-04', 5677, 23, 499),
]

# Re-create `df` from the sample rows.
df = spark.createDataFrame(data=data, schema=columns)

In [26]:
# One window per user_id; with no ordering, max/min are taken over all of the
# user's rows.
w = Window.partitionBy('user_id')

# Attach each user's largest and smallest purchase_amount to every row, then
# keep a single row per user_id and print up to 10 rows without truncation.
(
    df
    .select(
        'user_id',
        F.max('purchase_amount').over(w).alias('max'),
        F.min('purchase_amount').over(w).alias('min'),
    )
    .dropDuplicates(['user_id'])
    .show(n=10, truncate=False)
)

+-------+---+---+
|user_id|max|min|
+-------+---+---+
|2434   |382|159|
|3744   |322|159|
|4342   |259|159|
|5677   |499|259|
+-------+---+---+
