In [1]:
#Устанавливаем pySpark
!pip install pyspark >> None

In [27]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg, when, max, month
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F
import pandas as pd

In [28]:
df = pd.read_csv('sales_data.csv')
df.head()

Unnamed: 0,order_id,product_id,customer_id,order_date,quantity,price_per_unit,total_price,payment_method,region
0,2818,128,11577,2023-08-25,1,796.66,796.66,cash,south
1,2742,135,45498,2024-01-24,4,262.72,1050.88,card,north
2,4155,328,62904,2023-11-21,4,871.24,3484.96,credit,west
3,7955,795,82611,2023-04-26,3,477.74,1433.22,cash,ost
4,3169,378,74035,2024-03-18,6,875.12,5250.72,card,center


In [29]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 9 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   order_id        100 non-null    int64  
 1   product_id      100 non-null    int64  
 2   customer_id     100 non-null    int64  
 3   order_date      100 non-null    object 
 4   quantity        100 non-null    int64  
 5   price_per_unit  100 non-null    float64
 6   total_price     100 non-null    float64
 7   payment_method  100 non-null    object 
 8   region          100 non-null    object 
dtypes: float64(2), int64(4), object(3)
memory usage: 7.2+ KB


In [30]:
#Создаем SparkSession
spark=SparkSession.builder.appName('Practise').getOrCreate()

#загружаем csv файл
'''
    spark.read.load: Это метод PySpark для чтения данных из файла. Он используется для загрузки данных в DataFrame.
    'sales_data.csv': Это путь к файлу CSV, который вы хотите прочитать. В данном случае, файл называется sales_data.csv.
    format="csv": Указывает формат файла, который вы хотите прочитать. В данном случае, формат файла - CSV.
    sep=",": Этот параметр определяет разделитель полей в CSV-файле. В данном случае, разделителем является запятая.
    header="true": Этот параметр указывает, что первая строка CSV-файла содержит имена столбцов.
    Infer_schema=True: Этот параметр указывает PySpark автоматически определять типы данных столбцов на основе содержимого файла.
'''
df_pyspark=spark.read.load('sales_data.csv', format="csv", sep=",", header="true", Infer_schema=True)

#необходимые преобразования
'''
Изменяем тип данных столбца `order_id` в DataFrame `df_pyspark` на целочисленный (`int`).
Это достигается с помощью функции `withColumn()`, которая используется для добавления, замены или
обновления столбцов в DataFrame. В данном случае, `withColumn()` применяется для изменения типа
данных столбца `order_id` с использованием метода `cast()`, который принимает в качестве аргумента
тип данных, на который нужно преобразовать столбец. В данном случае, тип данных для преобразования
указан как `'int'`, что соответствует целочисленному типу данных в PySpark.

Пример кода:
```python
df_pyspark = df_pyspark.withColumn("order_id", df_pyspark["order_id"].cast('int'))
```
Код создает новый DataFrame, в котором тип данных столбца `order_id` изменен на `int`, не
изменяя исходный DataFrame `df_pyspark`. Функция `withColumn()` является трансформацией DataFrame,
что означает, что она возвращает новый DataFrame с указанными изменениями, не изменяя исходный DataFrame.
'''
df_pyspark = df_pyspark.withColumn("order_id", df_pyspark["order_id"].cast('int'))
df_pyspark = df_pyspark.withColumn("product_id", df_pyspark["product_id"].cast('int'))
df_pyspark = df_pyspark.withColumn("customer_id", df_pyspark["customer_id"].cast('int'))
df_pyspark = df_pyspark.withColumn("order_date", df_pyspark["order_date"].cast('date'))
df_pyspark = df_pyspark.withColumn("quantity", df_pyspark["quantity"].cast('int'))
df_pyspark = df_pyspark.withColumn("price_per_unit", df_pyspark["price_per_unit"].cast('int'))
df_pyspark = df_pyspark.withColumn("total_price", df_pyspark["total_price"].cast('int'))

#добавление колонки со значением месяца
df_pyspark = df_pyspark.withColumn("order_month", month(df_pyspark["order_date"]))

#нахождение нужных значений
'''
Выполняем агрегацию данных в DataFrame `df_pyspark` по столбцу `order_month`,
используя функции `groupBy` и `agg` из PySpark. В результате получаем новый DataFrame
`sales_analysis`, который содержит следующие агрегированные данные:

- `total_sales`: сумма значений столбца `total_price` для каждого месяца.
- `average_sales`: среднее значение столбца `total_price` для каждого месяца.
- `max_sales`: максимальное значение столбца `total_price` для каждого месяца.

Это позволяет анализировать продажи по месяцам, выявляя общую сумму продаж,
среднюю сумму продаж и максимальную сумму продаж для каждого месяца.
'''
sales_analysis = df_pyspark.groupBy("order_month").agg(
                            sum("total_price").alias("total_sales"),
                            avg("total_price").alias("average_sales"),
                            max("total_price").alias("max_sales"))

#вывод результата с сортировкой
sales_analysis.orderBy('order_month').show()

+-----------+-----------+------------------+---------+
|order_month|total_sales|     average_sales|max_sales|
+-----------+-----------+------------------+---------+
|          1|      23306|           2913.25|     6537|
|          2|      12103|2017.1666666666667|     5625|
|          3|      30389|          3798.625|     9905|
|          4|      28471| 2190.076923076923|     7821|
|          5|      20784|            3464.0|     8744|
|          6|      23232|            2323.2|     5486|
|          7|      15185|          1898.125|     5444|
|          8|      30575|            3057.5|     8444|
|          9|      26291|          3286.375|     8056|
|         10|      25513|          3189.125|     6531|
|         11|      21918|           2739.75|     6667|
|         12|      13716|1959.4285714285713|     5755|
+-----------+-----------+------------------+---------+



In [31]:
df_pyspark.schema

StructType([StructField('order_id', IntegerType(), True), StructField('product_id', IntegerType(), True), StructField('customer_id', IntegerType(), True), StructField('order_date', DateType(), True), StructField('quantity', IntegerType(), True), StructField('price_per_unit', IntegerType(), True), StructField('total_price', IntegerType(), True), StructField('payment_method', StringType(), True), StructField('region', StringType(), True), StructField('order_month', IntegerType(), True)])

### Задание 2: Вычислите количество товаров, купленных различными методами оплаты.


Код выполняет агрегацию данных в DataFrame `df_pyspark` по столбцу `payment_method`, суммируя значения в столбце `quantity` для каждой группы и присваивая результату новое имя `num_of_sales`. В результате получается новый DataFrame `sales_analysis`, в котором каждая строка соответствует уникальному значению из столбца `payment_method`, а столбец
`num_of_sales` содержит сумму значений `quantity` для каждой группы.

- `groupBy("payment_method")` группирует данные по столбцу `payment_method`, создавая группы для каждого уникального значения в этом столбце.
- `agg(sum("quantity").alias("num_of_sales"))` применяет агрегационную функцию `sum` к столбцу `quantity` для каждой группы, суммируя значения `quantity` внутри каждой группы. Результат этой операции затем переименовывается в `num_of_sales` с помощью метода `alias`.

Таким образом, код позволяет анализировать продажи по различным методам оплаты, суммируя количество продаж для каждого метода оплаты.

In [32]:
#нахождение нужных значениях
sales_analysis = df_pyspark.groupBy("payment_method").agg(sum("quantity").alias("num_of_sales"))

#вывод результата
sales_analysis.show()

+--------------+------------+
|payment_method|num_of_sales|
+--------------+------------+
|          cash|         152|
|          card|         182|
|        credit|         185|
+--------------+------------+



----------------------------------------------------------

### Задание 3: Найдите регион с самым большой суммарной стоимостью продаж


1. Группируем данные в DataFrame `df_pyspark` по столбцу `region`, а затем применяет агрегационную функцию `sum` к столбцу `total_price` для каждой группы. Результат этой операции переименовывается в `total_price_per_region`.
2. Создаем новый DataFrame `sales_analysis`, содержащий сумму `total_price` для каждой уникальной `region`.
3. Выбираем столбец `region` из DataFrame `sales_analysis`, сортирует его по убыванию значений `total_price_per_region` и выбирает первую запись.
4. Выводим результат, который представляет собой регион с наибольшей суммой `total_price`.

Таким образом, код анализирует продажи по регионам, суммирует общую стоимость продаж для каждого региона и определяет регион с наибольшей суммой продаж.

In [35]:
#нахождение нужных значениях
sales_analysis = df_pyspark.groupBy("region").agg(
    sum("total_price").alias("total_price_per_region")
)
total_price_per_region = sales_analysis.select("region").orderBy(sales_analysis.total_price_per_region.desc()).first()

#вывод результата
print(*total_price_per_region)
sales_analysis.show()

west
+------+----------------------+
|region|total_price_per_region|
+------+----------------------+
|  west|                 81691|
|   ost|                 59872|
|center|                 47258|
| north|                 30214|
| south|                 52448|
+------+----------------------+



--------------------------------------------------------------

### Задание 4: Вычислите общую сумму продаж и среднюю сумму продажи для каждого региона.


1. Группируем данные в DataFrame `df_pyspark` по столбцу `region`, а затем применяет агрегационные функции `sum` и `avg` к столбцу `total_price` для каждой группы. Результаты этих операций переименовываются в `total_price_per_region` и `avg_price_per_region` соответственно.
2. Создаем новый DataFrame `sales_analysis`, содержащий сумму `total_price` и среднее значение `total_price` для каждой уникальной `region`.
3. Выводим результаты из DataFrame `sales_analysis` с помощью метода `show()`, который отображает содержимое DataFrame в консоли.

Таким образом, код анализирует продажи по регионам, суммирует общую стоимость продаж и вычисляет среднюю стоимость продаж для каждого региона. Это позволяет получить общую картину продаж по регионам, включая как общую стоимость продаж, так и среднюю стоимость продаж в каждом регионе.

In [36]:
#нахождение нужных значениях
sales_analysis = df_pyspark.groupBy("region").agg(
    sum("total_price").alias("total_price_per_region"),
    avg("total_price").alias("avg_price_per_region")
)
#вывод результата
sales_analysis.show()

+------+----------------------+--------------------+
|region|total_price_per_region|avg_price_per_region|
+------+----------------------+--------------------+
|  west|                 81691|             4084.55|
|   ost|                 59872|              2993.6|
|center|                 47258|              2362.9|
| north|                 30214|              1510.7|
| south|                 52448|              2622.4|
+------+----------------------+--------------------+



-------------------------------------------------------------------------

### Задание 5: Вычислите общее количество и сумму товаров, проданных за наличные в 2023 году.


1. Добавляем новую колонку `order_year` в DataFrame `df_pyspark`, используя функцию `year` для извлечения года из даты в колонке `order_date`.
2. Фильтруем DataFrame `df_pyspark` по двум условиям: методу оплаты должен быть "Наличные" и год заказа должен быть 2023.
3. Агрегируем данные, суммируя количество (`quantity`) и общую стоимость (`total_price`) для всех записей, удовлетворяющих условиям фильтрации. Результаты этих операций переименовываются в `total_quantity_2023` и `total_price_2023` соответственно.
4. Создаем новый DataFrame `sales_analysis`, содержащий сумму количества и общую стоимость продаж за 2023 год, оплаченных наличными.
5. Выводим результаты из DataFrame `sales_analysis` с помощью метода `show()`, который отображает содержимое DataFrame в консоли.

Таким образом, код анализирует продажи, оплаченные наличными, за 2023 год, суммируя количество и общую стоимость продаж для этого периода.

In [42]:
#добавление колонки со значением года
df_pyspark = df_pyspark.withColumn("order_year", F.year(df_pyspark["order_date"]))

#нахождение нужных значений
sales_analysis = df_pyspark.filter((df_pyspark.payment_method == "cash") & (df_pyspark.order_year == 2023)).agg(
    sum("quantity").alias("total_quantity_2023"),
    sum("total_price").alias("total_price_2023")
)

#вывод результата с сортировкой
sales_analysis.show()

+-------------------+----------------+
|total_quantity_2023|total_price_2023|
+-------------------+----------------+
|                121|           71739|
+-------------------+----------------+



### Задание 6: Найдите уникальное количество покупателей за 2023 год


1. Импортируем функцию `countDistinct` из модуля `pyspark.sql.functions`. Эта функция используется для подсчета уникальных значений в определенных столбцах DataFrame.
2. Фильтруем исходный DataFrame `df_pyspark`, оставляя только записи за 2023 год. Это делается с помощью метода `filter`, который принимает условие для фильтрации.
3. Выбираем из отфильтрованного DataFrame количество уникальных значений в столбце `customer_id` с использованием функции `countDistinct`. Результат сохраняется в новом столбце с именем `unique_users` [1].
4. Выводим результат на экран с помощью метода `show`. Этот метод отображает данные DataFrame в удобном для чтения формате.

В итоге, код подсчитывает количество уникальных клиентов (`customer_id`), совершивших покупки в 2023 году, и выводит это количество на экран.

In [44]:
#нахождение нужных значений
sales_analysis = df_pyspark.filter(df_pyspark.order_year == 2023)
sales_analysis = sales_analysis.select(countDistinct("customer_id").alias("unique_users"))

#вывод результата с сортировкой
sales_analysis.show()

+------------+
|unique_users|
+------------+
|          78|
+------------+



### Задание 7: Вам даны данные с информацией о стоимости продуктов в различных валютах. Ваша задача состоит в том, чтобы перевести все цены в доллары, используя текущие курсы валют. Однако у вас есть ограничение: для некоторых продуктов курс валюты неизвестен и их стоимость должна остаться в исходной валюте. (Для конвертации из EUR в USD нужно умножить на 1.2)


1. Импортируем необходимые модули и функции из PySpark для работы с SparkSession и функцией `when` для условной логики.
2. Создаем сессию Spark с именем приложения "currency_conversion". Это необходимо для работы с Spark и выполнения операций с данными.
3. Загружаем данные в DataFrame `df` с помощью метода `createDataFrame`. Данные представляют собой список кортежей, где каждый кортеж содержит информацию о продукте: его идентификатор (`product_id`), цену (`price`) и валюту (`currency`).
4. Добавляем новый столбец `price_usd` в DataFrame, который содержит цену продукта в долларах США. Для этого используется функция `when` для условного преобразования цены в зависимости от валюты:
   - Если валюта равна "USD", цена остается без изменений.
   - Если валюта равна "EUR", цена умножается на курс обмена 1.2 (примерный курс EUR/USD).
   - Если валюта неизвестна, цена остается без изменений.
5. Выводим результат на экран с помощью метода `show`. Это позволяет увидеть, как изменились цены после конвертации в доллары США.

В итоге, код позволяет преобразовать цены продуктов из разных валют в доллары США, используя примерный курс обмена для EUR/USD.

In [45]:
# Создание сессии Spark
spark = SparkSession.builder.appName("currency_conversion").getOrCreate()

# Загрузка датасета
data = [(1, 100, "USD"),
        (2, 200, "EUR"),
        (3, 300, "Unknown"),
        (4, 100, "EUR"),
        (5, 200, "EUR"),
        (6, 300, "Unknown"),
        (7, 100, "Unknown"),
        (8, 200, "USD"),
        (9, 300, "USD")
]
df = spark.createDataFrame(data, ["product_id", "price", "currency"])

# Добавление столбца с конвертированной ценой
df = df.withColumn("price_usd", when(df.currency == "USD", df.price)
                                 .when(df.currency == "EUR", df.price * 1.2)  # Примерный курс EUR/USD = 1.2
                             	.otherwise(df.price))  # Если курс неизвестен, оставляем в исходной валюте

# Вывод результата
df.show()

+----------+-----+--------+---------+
|product_id|price|currency|price_usd|
+----------+-----+--------+---------+
|         1|  100|     USD|    100.0|
|         2|  200|     EUR|    240.0|
|         3|  300| Unknown|    300.0|
|         4|  100|     EUR|    120.0|
|         5|  200|     EUR|    240.0|
|         6|  300| Unknown|    300.0|
|         7|  100| Unknown|    100.0|
|         8|  200|     USD|    200.0|
|         9|  300|     USD|    300.0|
+----------+-----+--------+---------+



### Задание 8: Допустим, есть два датасета: один содержит информацию о пользователях (user_id, name, age), а другой содержит информацию о покупках пользователей (user_id, product_id, date). Необходимо найти средний возраст пользователей, совершивших покупки.


1. Инициализируем Spark сессию с именем приложения "user_purchase_join".
2. Создаем два DataFrame: `users_df` и `purchases_df`. Первый содержит информацию о пользователях (ID, имя, возраст), второй - о покупках (ID пользователя, ID продукта, дата покупки).
3. Выполняем операцию соединения (join) между этими двумя DataFrame по столбцу `user_id`. Это позволяет объединить информацию о пользователях и их покупках в одном DataFrame.
4. Группируем результат по `user_id` и вычисляет средний возраст пользователей, которые совершили покупки.
5. Выводим результат на экран.

Основная цель кода - анализировать данные о покупках пользователей, вычисляя средний возраст пользователей, совершивших покупки. Это может быть полезно для анализа поведения пользователей и оптимизации маркетинговых стратегий.

In [46]:
# Создаем Spark сессию
spark = SparkSession.builder.appName("user_purchase_join").getOrCreate()

# Загружаем датасеты
users_df = spark.createDataFrame([
	(1, "Alice", 25),
	(2, "Bob", 30),
	(3, "Charlie", 28),
	(4, "John", 56),
	(5, "Alex", 41),
	(6, "Juliya", 17)

], ["user_id", "name", "age"])

purchases_df = spark.createDataFrame([
	(1, 101, "2022-01-01"),
	(2, 102, "2022-01-02"),
	(3, 103, "2022-01-03"),
	(3, 104, "2022-01-04"),
	(6, 105, "2022-01-05")
], ["user_id", "product_id", "date"])

# Производим операцию join и вычисляем средний возраст
result_df = users_df.join(purchases_df, "user_id").groupBy("user_id").agg(avg("age").alias("average_age"))

# Выводим результат
result_df.show()

+-------+-----------+
|user_id|average_age|
+-------+-----------+
|      1|       25.0|
|      2|       30.0|
|      3|       28.0|
|      6|       17.0|
+-------+-----------+



### Задание 9: У вас есть два набора данных. Первый набор содержит информацию о продуктах: id продукта, название, категория и цена. Второй набор содержит информацию о заказах: id заказа, id продукта, количество. Ваша задача — использовать PySpark для выполнения следующих шагов:

1. Присоединить набор данных о продуктах к набору данных о заказах с помощью id продукта.
3. Рассчитать общую стоимость каждого заказа, учитывая количество продуктов и их цену.
4. Отфильтровать заказы, у которых общая стоимость больше 1000.

1. Инициализируем сессию Spark с именем приложения "aggregate-join-filter".

2. Создаем два DataFrame (`products_df` и `orders_df`) из предопределенных списков кортежей, представляющих данные о продуктах и заказах соответственно.

3. Присоединяем DataFrame `orders_df` к DataFrame `products_df` по столбцу "product_id", используя внутреннее соединение (inner join) по умолчанию.

4. Добавляем новый столбец "total_cost" в DataFrame `joined_df`, вычисляя общую стоимость каждого заказа как произведение количества товара на цену товара.

5. Отфильтровываем заказы, у которых общая стоимость больше 1000, и сохраняет результат в DataFrame `filtered_orders_df`.

6. Выводим результаты фильтрации на экран.

In [53]:
# Создание сессии Spark
spark = SparkSession.builder.appName("aggregate-join-filter").getOrCreate()

# Чтение наборов данных из CSV файлов
data_produсts = [(1, "product1", "category1", 10.0),
                (2, "product2", "category2", 15.0),
                (3, "product3", "category1", 12.5),
                (4, "product4", "category3", 20.0),
                (5, "product5", "category2", 18.0),
                (6, "product6", "category3", 25.0),
                (7, "product7", "category1", 9.0),
                (8, "product8", "category2", 16.0),
                (9, "product9", "category3", 22.0),
                (10, "product10", "category1", 11.5)]

products_df = spark.createDataFrame(data_produсts, ["product_id", "title", "category", "price"])

data_orders = [(1, 1, 5),
               (2, 3, 2),
               (3, 2, 3),
               (4, 5, 1),
               (5, 4, 4),
               (6, 7, 2),
               (7, 6, 3),
               (8, 8, 2),
               (9, 10, 1),
               (10, 9, 4)]

orders_df = spark.createDataFrame(data_orders, ["order_id", "product_id", "quantity"])

# Присоединение набора данных о продуктах к набору данных о заказах
joined_df = orders_df.join(products_df, "product_id")

# Расчет общей стоимости каждого заказа
total_cost_df = joined_df.withColumn("total_cost", F.col("quantity") * F.col("price"))

# Отфильтрование заказов с общей стоимостью больше 50
filtered_orders_df = total_cost_df.filter(total_cost_df.total_cost > 50)

# Вывод результата
filtered_orders_df.show()

+----------+--------+--------+--------+---------+-----+----------+
|product_id|order_id|quantity|   title| category|price|total_cost|
+----------+--------+--------+--------+---------+-----+----------+
|         4|       5|       4|product4|category3| 20.0|      80.0|
|         6|       7|       3|product6|category3| 25.0|      75.0|
|         9|      10|       4|product9|category3| 22.0|      88.0|
+----------+--------+--------+--------+---------+-----+----------+



### Задание 10: Найти сумму чисел в колонке


1. Создаем сессию Spark с именем приложения "sum_example".
2. Создаем DataFrame с одним столбцом "number", содержащим числа от 1 до 4.
3. Вычисляем сумму всех чисел в столбце "number" и выводит результат.

Код использует функцию `sum` из модуля `pyspark.sql.functions`, которая является агрегатной функцией и возвращает сумму всех значений в указанном столбце. В данном случае, она применяется к столбцу "number" DataFrame `df` для вычисления общей суммы чисел в этом столбце. Результат вычисления суммы затем отображается с помощью метода `show()`.

In [54]:
# Создание Spark сессии
spark = SparkSession.builder.appName("sum_example").getOrCreate()

# Создание DataFrame
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["number"])

# Найти сумму чисел в колонке "number"
sum_result = df.select(sum("number")).show()

+-----------+
|sum(number)|
+-----------+
|         10|
+-----------+



### Задание 11: Посчитать количество уникальных значений в колонке


1. Импортируем необходимые модули из библиотеки PySpark для работы с Spark и функцией `countDistinct` для подсчета уникальных значений в DataFrame.
2. Создаем экземпляр SparkSession с именем приложения "count_distinct_example". Это необходимо для инициализации Spark и подготовки среды для выполнения операций с данными.
3. Создаем DataFrame с именем `df`, используя предоставленные данные. В данном случае, DataFrame содержит одну колонку "name" с именами "Alice", "Bob", "Alice" и "Eve".
4. Выполняем подсчет количества уникальных значений в колонке "name" с помощью функции `countDistinct` и выводит результат на экран с помощью метода `show()`.

В результате выполнения кода будет выведено количество уникальных имен в колонке "name", что в данном случае равно 3, так как имена "Alice" и "Bob" встречаются дважды, а "Eve" - один раз.

In [55]:
# Создание Spark сессии
spark = SparkSession.builder.appName("count_distinct_example").getOrCreate()

# Создание DataFrame
data = [("Alice",), ("Bob",), ("Alice",), ("Eve",)]
df = spark.createDataFrame(data, ["name"])

# Посчитать количество уникальных значений в колонке "name"
count_result = df.select(countDistinct("name")).show()

+--------------------+
|count(DISTINCT name)|
+--------------------+
|                   3|
+--------------------+



### Задание 12: Выполнить фильтрацию данных по определенному условию


Данный код выполняет следующие действия:

1. Импортируем класс `SparkSession` из модуля `pyspark.sql`, который необходим для создания сессии Spark.
2. Создаем сессию Spark с именем приложения "filter_example".
3. Создаем список кортежей `data`, содержащий пары имя-возраст.
4. Создаем DataFrame `df` из списка `data`, указывая имена столбцов как "name" и "age".
5. Фильтруем данные в DataFrame `df`, оставляя только те строки, где возраст меньше 30, и выводит результат на экран с помощью метода `show()`.

Важно отметить, что метод `filter()` используется для фильтрации строк в DataFrame на основе заданного условия. В данном случае, условием является `df.age < 30`, что означает выборку строк, где значение в столбце "age" меньше 30. Результатом выполнения этого кода будет отображение на экране строк DataFrame, соответствующих этому условию.

In [56]:
 # Создание Spark сессии
spark = SparkSession.builder.appName("filter_example").getOrCreate()

# Создание DataFrame
data = [("Alice", 25), ("Bob", 30), ("Eve", 20), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# Выполнить фильтрацию данных по возрасту младше 30
filtered_data = df.filter(df.age < 30).show()

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Eve| 20|
+-----+---+



### Домашнее задание

### Условие: дана таблица с колонками (id, name, salary, managerId), студентам необходимо написать код на spark, который создаст эту таблицу (данные указаны ниже) и в результате выдаст таблицу в которой будут имена сотрудников, которые зарабатывают больше своих менеджеров.

Данные для таблицы:


In [57]:
data = {'id': [1, 2, 3, 4], 'Name': ['Joe', 'Henry', 'Sam', 'Max'],
        'Salary':['70', '80', '60', '90'], 'ManagerId':['3', '4', 'Null', 'Null']}
df = pd.DataFrame(data)
df

Unnamed: 0,id,Name,Salary,ManagerId
0,1,Joe,70,3
1,2,Henry,80,4
2,3,Sam,60,Null
3,4,Max,90,Null


Результат должен быть:
Joe (табличка с одной строкой и одним столбцом со значением Joe)


In [58]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("WindowFunctionExample").getOrCreate()

# Создание DataFrame
data = [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("B", 6)]
df = spark.createDataFrame(data, ["Group", "Value"])

# Определение окна для каждой группы
windowSpec = Window.partitionBy("Group").orderBy("Value")

# Вычисление скользящего среднего
df = df.withColumn("MovingAverage", avg("Value").over(windowSpec.rowsBetween(-2, 0)))

df.show()


+-----+-----+-------------+
|Group|Value|MovingAverage|
+-----+-----+-------------+
|    A|    1|          1.0|
|    A|    2|          1.5|
|    A|    3|          2.0|
|    B|    4|          4.0|
|    B|    5|          4.5|
|    B|    6|          5.0|
+-----+-----+-------------+



In [None]:
1/ Ранжирующие
row_number
rank
percent_rank
dense_rank
ntile

2/ Аналитические
lag
lead
cume_dist
nth_value

3/ Аггрегирущие
min
max
count
std


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import cume_dist

# Определение спецификации окна
windowSpec = Window.partitionBy("course").orderBy("income")

# Применение оконной функции cume_dist к DataFrame
df1.withColumn("cume_dist", cume_dist().over(windowSpec)).show()


In [None]:
1. PartitionBy()
2. OrderBy()
3. rowsBetweens(start, end)
4. rangeBetwen(start, end)

Window.unbdoudedPreceding, Window.unboudedFolliwing, Window.currentRow

In [None]:
from pyspark.sql.window import Window

# Определение окна с разделением на партиции по столбцу "country" и сортировкой по столбцу "date"
window = Window.partitionBy("country").orderBy("date")

# Определение окна с границами, охватывающими строки от начала до текущей строки
window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Определение окна с границами, охватывающими строки от 3 строк до текущей строки
window = Window.orderBy("date").rowsBetween(-3, Window.currentRow)

# Определение окна с границами, охватывающими строки от 3 строк до 3 строк после текущей строки
window = Window.orderBy("date").rowsBetween(-3, 3)


In [None]:
1. Гибкость в анализе данных
2. Сохранение идентичности иходнызх данных
3. Поддержка сложных аналитических запросов
4. Оптимизация по производительности
5. Поддержка различых типов окон

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Определение окна с разделением на партиции по столбцу "department" и сортировкой по столбцу "salary"
windowSpec = Window.partitionBy("department").orderBy("salary")

# Применение оконной функции avg с использованием rowsBetween
df.withColumn("avg_salary", avg("salary").over(windowSpec.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

# Применение оконной функции avg с использованием rangeBetween
df.withColumn("avg_salary_range", avg("salary").over(windowSpec.rangeBetween(Window.unboundedPreceding, 1000)))

In [None]:
rowsBetween(Window.currentRow, 1)
rangeBetween(Window.currentRow, 1)

In [None]:
from pyspark.sql import Window
from pyspark.sql import functions as func

# Определение окна с использованием rowsBetween
window1 = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)

# Определение окна с использованием rangeBetween
window2 = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)

# Применение оконных функций к DataFrame
df.withColumn("sum_rows", func.sum("id").over(window1)) \
 .withColumn("sum_range", func.sum("id").over(window2)) \
 .show()

In [None]:
!pip install pyspark >> None

In [None]:
import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Определение пользовательской функции
def add_one(x):
    return x + 1

# Преобразование функции в UDF
add_one_udf = udf(add_one, IntegerType())

# Использование UDF в DataFrame
df = pyspark.createDataFrame([(1,), (2,), (3,)], ["value"])
df.withColumn("value_plus_one", add_one_udf(df["value"])).show()

In [None]:
1. IntergerType
2. DoubleType
3. StringType
4. BooleanType
5. ArrayType
6. MapType
7. StructType

In [None]:
1. udf
2. registrUDF
3. unregistUDF

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Определение пользовательской функции
def add_one(x):
    return x + 1

# Преобразование функции в UDF
add_one_udf = udf(add_one, IntegerType())

# Использование UDF в DataFrame
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
df.withColumn("value_plus_one", add_one_udf(df["value"])).show()