
<center><font size="6"><b>Комп'ютерний практикум 4.

 Основи PySpark: обробка даних DataFrame</b></font></center>


***

<center><img src="https://media.licdn.com/dms/image/v2/C4E12AQEb6oxAxtYD-Q/article-cover_image-shrink_600_2000/article-cover_image-shrink_600_2000/0/1620420835464?e=2147483647&v=beta&t=EWVtIY_UWl1XNucKkGMrn6ooEuwOBaQ313Z1TarbZgk" width="400"></center>


**PySpark** — це інтерфейс для використання **Apache Spark** з мовою програмування **Python**.

**Apache Spark** — це потужний фреймворк для розподіленої обробки пакетних даних у кластері. PySpark надає Python API для інтеграції цих можливостей.

### Основні особливості PySpark:

1. **Обробка великих даних**: PySpark дозволяє обробляти великі набори даних розподілено на кількох вузлах у кластері, використовуючи потужні обчислювальні кластери, що пришвидшує виконання операцій.
   
2. **API для Python**: PySpark надає простий API для роботи з даними, який включає операції над RDD (Resilient Distributed Datasets), DataFrames, SQL, та Machine Learning.

3. **Інструменти для обробки даних у реальному часі**: PySpark підтримує обробку даних у потоковому режимі через модуль **Spark Streaming**.

4. **Машинне навчання**: PySpark включає бібліотеку для машинного навчання (MLlib), що дозволяє виконувати завдання класифікації, кластеризації, регресії та зниження розмірності.

### Використання:
- PySpark активно використовується для ___ETL-процесів___, де необхідно обробляти великі обсяги даних.
- Використовується для ___аналітики___ та ___машинного навчання___ на масштабних даних.
- Є популярним інструментом в середовищах ___Big Data___, таких як Amazon EMR, Google Dataproc та Microsoft Azure HDInsight.

PySpark робить можливим використання простоти Python для роботи з великими даними в Apache Spark, що забезпечує гнучкість і масштабованість обчислень.




> __Spark SQL__ - це модуль Spark для обробки структурованих даних. Він призначений для запитів до структурованих даних у програмах Spark, використовуючи або SQL, або знайомий API DataFrame.


In [1]:
# Встановлюємо необхідні пакети
! pip install pyspark
! pip install findspark
! pip install pyarrow



In [2]:
# імпорт допоміжної бібліотеки для організації правильного шляху до Spark
import findspark
findspark.init()

Для роботи з **Apache Spark** у Python через бібліотеку **PySpark** необхідно імпортувати наступні модулі

**`SparkContext`**:
   - Це об'єкт, який є точкою входу в функціональні можливості Spark. Він представляє з'єднання програми зі **Spark кластером** (або локальним екземпляром Spark).
   - `SparkContext` використовується для створення **RDD (Resilient Distributed Dataset)**, а також для взаємодії зі Spark API. Він керує роботою і розподілом завдань між вузлами кластера.
   - У сучасних версіях Spark, `SparkContext` зазвичай створюється автоматично всередині **`SparkSession`**, тому його часто не потрібно ініціалізувати окремо, якщо використовується `SparkSession`.

**`SparkConf`**:
   - `SparkConf` дозволяє налаштувати параметри роботи Spark-додатка, такі як ім'я додатка, кількість ядер, кількість пам'яті тощо. Об'єкт `SparkConf` при створенні передається `SparkContext` або `SparkSession`, щоб визначити, як додаток повинен працювати.
   - Це спосіб налаштувати специфічні параметри роботи Spark на рівні додатку.

**`SparkSession`**:
   - Це вищий рівень абстракції, який був введений у Spark 2.0. `SparkSession` об'єднує всі попередні функціональні можливості **SparkContext**, **SQLContext** і **HiveContext** в одному об'єкті. Це єдина точка входу для роботи з різними компонентами Spark (RDD, DataFrame, SQL-запити).
   - `SparkSession` також використовується для роботи з **DataFrame**, SQL-запитами та інтеграції з іншими компонентами, такими як **Spark SQL**, **Streaming**, **MLlib** та **GraphX**.
> **Основні можливості `SparkSession`**:
   - Робота з `DataFrame` та SQL-запитами.
   - Використання `Spark SQL` для взаємодії з даними у стилі SQL.
   - Створення і керування RDD.
   - Інтеграція з різними джерелами даних (HDFS, Cassandra, S3 та ін.).

### Коли використовувати?

- **`SparkContext`**: Якщо ви працюєте зі **старими версіями Spark** або хочете безпосередньо взаємодіяти з RDD.
- **`SparkSession`**: У більшості сучасних додатків, починаючи з **Spark 2.0**, рекомендовано використовувати **SparkSession** для створення та керування всіма обчислювальними компонентами (DataFrame, SQL-запити та RDD).


In [3]:
#імпорт бібліотек та модулів

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Створення Spark сесії

- `.builder` - це спосіб налаштувати і створити `SparkSession`, який є головним об'єктом у сучасних версіях Spark
- `.appName("name")` задає ім'я програми Spark для ідентифікації додатку під час його роботи в кластері або в локальному середовищі
- `.config()` -  метод дозволяє передавати додаткові конфігураційні параметри для Spark
- `.getOrCreate()` - метод або створює новий об'єкт `SparkSession`, або повертає вже існуючий, якщо такий є.

In [7]:
# Створення об'єкту spark context class (не обов'язково)
SPARK_LOCAL_IP="127.0.0.1"
sc = SparkContext()

# Створення spark session

spark = SparkSession \
    .builder \
    .appName("My_Spark_basic") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

24/10/26 13:47:37 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:485)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.ja

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
	at java.base/javax.security.auth.Subject.getSubject(Subject.java:347)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:577)
	at org.apache.spark.util.Utils$.$anonfun$getCurrentUserName$1(Utils.scala:2416)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2416)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:329)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:485)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1575)


In [10]:
# Для роботи з датафреймами необхідно переконатися, що екземпляр сеансу spark створено.
spark

NameError: name 'spark' is not defined

## Завантаження даних

> Для завантаження даних спочатку імпортується CSV-файл у таблицю даних `Pandas`, а потім передається в таблицю даних `Spark`

Для створення фрейму даних `Spark` завантажуємо зовнішній фрейм даних, який називається `mtcars`. Цей фрейм даних містить 32 спостереження та 11 змінних:

| colIndex | colName | units/description |
| :---: | :--- | :--- |
|[, 1] | mpg |Miles per gallon  |
|[, 2] | cyl | Number of cylinders  |
|[, 3] | disp | Displacement (cu.in.) |  
|[, 4] | hp  | Gross horsepower  |
|[, 5] | drat | Rear axle ratio  |
|[, 6] | wt | Weight (lb/1000)  |
|[, 7] | qsec | 1/4 mile time  |
|[, 8] | vs  | V/S  |
|[, 9] | am | Transmission (0 = automatic, 1 = manual)  |
|[,10] | gear | Number of forward gears  |
|[,11] | carb | Number of carburetors |


In [None]:
# Використаємо `read_csv` з pandas для завантаження датасету
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

In [None]:
# Перегляд датасету
mtcars.head()

In [None]:
# переіменуємо перший стовпчик
mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )

> Функція `createDataFrame` завантажує дані в spark dataframe


In [None]:
sdf = spark.createDataFrame(mtcars)

> Метод **`printSchema()`** використовується для виведення **схеми** (структури) **DataFrame** у Spark.

**Схема** — це структура **DataFrame**, яка визначає назви стовпців і типи даних у цих стовпцях (наприклад, `StringType`, `IntegerType`, `DoubleType` і т.д.). Ця структура дозволяє знати, які дані містить DataFrame та в якому форматі вони зберігаються.

Використовується щоб перевірити структуру даних у DataFrame перед виконанням операцій та для верифікації того, що типи даних і назви стовпців вірні, особливо якщо дані із зовнішніх джерел (файли CSV, бази даних тощо).

In [None]:
sdf.printSchema()

> Функція `withColumnRenamed()` перейменовує існуючі назви стовпців.  


In [None]:
#Переіменовуємо назву стовпця «vs» на «versus» і зберігаємо зміни в новий DataFrame  «sdf_new».
sdf_new = sdf.withColumnRenamed("vs", "versus")

In [None]:
sdf_new.head()

## Створення тимчасової таблиці (Table View/табличне представлення)
Створення табличного представлення в Spark SQL необхідне для програмного запуску SQL запитів для DataFrame. Представлення - це тимчасова таблиця для виконання SQL запитів, яке забезпечує локальну область видимості в межах поточного сеансу Spark за допомогою функції `createTempView()`.

In [None]:
sdf.createTempView("cars")

У `PySpark` для роботи з `DataFrame` часто використовують методи, схожі на SQL-запити.

__<center>Таблиця основних SQL-запитів та їх еквівалентів у PySpark:</center>__

| **SQL Запит**               | **PySpark (DataFrame API)**                                              | **Опис**                                      |
|-----------------------------|---------------------------------------------------------------------------|-----------------------------------------------|
| **SELECT**                   | `df.select("column1", "column2")`                                         | Вибір конкретних колонок                     |
| **WHERE**                    | `df.filter(df["column"] == value)`                                        | Фільтрація рядків за умовою                   |
| **AND / OR**                 | `df.filter((df["col1"] == val1) & (df["col2"] == val2))`                  | Використання логічних операторів              |
| **ORDER BY**                 | `df.orderBy("column", ascending=False)`                                   | Сортування за колонкою                       |
| **GROUP BY**                 | `df.groupBy("column").agg({"column": "sum"})`                             | Групування даних і агрегація                  |
| **HAVING**                   | `df.groupBy("column").agg({"column": "sum"}).filter("sum(column) > value")` | Фільтрація результатів після групування       |
| **JOIN**                     | `df1.join(df2, df1["id"] == df2["id"], "inner")`                          | Об'єднання двох DataFrame                     |
| **LIMIT**                    | `df.limit(10)`                                                           | Обмеження кількості рядків                   |
| **DISTINCT**                 | `df.select("column").distinct()`                                          | Вибір унікальних значень                     |
| **COUNT**                    | `df.count()`                                                             | Підрахунок кількості рядків                  |
| **SUM**                      | `df.groupBy().sum("column")`                                              | Підсумовування значень у колонці             |
| **AVG (Середнє значення)**   | `df.groupBy().avg("column")`                                              | Обчислення середнього значення               |
| **MAX (Максимум)**           | `df.groupBy().max("column")`                                              | Визначення максимального значення            |
| **MIN (Мінімум)**            | `df.groupBy().min("column")`                                              | Визначення мінімального значення             |
| **WITH ALIAS (Псевдоніми)**  | `df.select(df["column"].alias("new_name"))`                               | Присвоєння псевдонімів для колонок           |
| **INNER JOIN**               | `df1.join(df2, df1["id"] == df2["id"], "inner")`                          | Внутрішнє об'єднання                         |
| **LEFT JOIN**                | `df1.join(df2, df1["id"] == df2["id"], "left")`                           | Ліве об'єднання                              |
| **RIGHT JOIN**               | `df1.join(df2, df1["id"] == df2["id"], "right")`                          | Праве об'єднання                             |
| **FULL OUTER JOIN**          | `df1.join(df2, df1["id"] == df2["id"], "outer")`                          | Повне зовнішнє об'єднання                    |
| **UNION**                    | `df1.union(df2)`                                                         | Об'єднання двох DataFrame (без дублікатів)   |
| **UNION ALL**                | `df1.unionAll(df2)`                                                      | Об'єднання двох DataFrame (з дублікатами)    |



> `spark.sql` — це метод у **PySpark**, який дозволяє виконувати **SQL-запити** безпосередньо на **DataFrame**. Він використовується для інтеграції SQL з PySpark, дозволяючи працювати з великими даними у знайомому форматі SQL-запитів.

1. **Створення тимчасової таблиці**: Це дозволяє звертатися до цього DataFrame в SQL-запитах, як до таблиці в базі даних.
   
2. **Виконання SQL-запитів**: Після створення тимчасової таблиці можна виконувати SQL-запити з використанням методу `spark.sql()`. Цей метод повертає результат як новий DataFrame.

*Синтаксис*
```python
spark.sql("SQL-запит")
```
### Переваги використання `spark.sql`:
1. **Знайомий синтаксис**: SQL-запити.
2. **Масштабованість**: SQL-запити, виконані через `spark.sql`, можуть працювати з великими наборами даних, розподіленими в кластері.
3. **Гнучкість**: Можна комбінувати SQL із методами DataFrame API для виконання складних обчислень.

`spark.sql` — це потужний інструмент у PySpark, який дозволяє використовувати SQL-запити для обробки великих даних. Він надає гнучкість у використанні стандартного SQL-синтаксису для вибірок, агрегацій та інших операцій на великих наборах даних, що зберігаються в Spark DataFrame.

In [None]:
# Вивід всієї таблиці даних
spark.sql("SELECT * FROM cars").show()

In [None]:
# Вибір змінної mpg
spark.sql("SELECT mpg FROM cars").show(5)

In [None]:
# Запит для визначення автомобілів з великим пробігом і малою кількістю циліндрів
spark.sql("SELECT * FROM cars where mpg > 15 AND cyl < 6").show(5)

In [None]:
# Авто, які мають витрати палива менше 15 миль на галон
sdf.where(sdf['mpg'] < 15).show()

In [None]:
# Агрегація даних та групування за циліндрами
spark.sql("SELECT count(*), cyl from cars GROUP BY cyl").show()

## Створення Pandas UDF

**Pandas UDF (User Defined Functions)** в **PySpark** — це користувацькі функції, які використовують бібліотеку **Pandas** для обробки даних. Pandas UDF дозволяють застосовувати **функції на рівні Python** з використанням можливостей векторизації Pandas і при цьому підтримують **розподілені обчислення** у Spark.

**UDF (User Defined Function)** — це користувацька функція, яку можна застосувати до даних в Spark для виконання специфічних операцій. Стандартні UDF у PySpark використовують звичайні Python-функції, що може бути повільним для великих даних, оскільки обробка йде рядок за рядком.
  
**Pandas UDF** (також називають **Vectorized UDF**) працюють на рівні **векторів** (або серій Pandas), що дозволяє Spark працювати з ними значно швидше, завдяки векторизації та оптимізації через **Apache Arrow**.

### Основні переваги **Pandas UDF**:
- **Швидкість**: Використання векторизації та інтеграція з **Apache Arrow** дозволяє суттєво прискорити обробку даних, порівняно зі звичайними UDF.
- **Простота**: Pandas UDF використовують знайомі інтерфейси Pandas, що робить їх простими у використанні для тих, хто знайомий із Pandas.
- **Масштабованість**: Хоча Pandas зазвичай використовується для обробки даних у пам'яті, використання Pandas UDF дозволяє масштабувати ці операції для обробки великих обсягів даних у кластері.

### Типи Pandas UDF:

1. **Scalar UDF**:
   - Використовується для обробки поелементно (аналог звичайного UDF).
   - Функція отримує та повертає серію Pandas (колонку).
   
2. **Grouped Map UDF**:
   - Використовується для обробки груп даних.
   - Функція отримує і повертає **DataFrame Pandas** для кожної групи.
   
3. **Grouped Aggregate UDF**:
   - Використовується для обробки та агрегації даних у групах.
   - Функція повертає одне значення для кожної групи.
   
   

> `pandas_udf` і `PandasUDFType` — це ключові компоненти в **PySpark**, що дозволяють створювати і використовувати **Pandas UDF** (векторизовані користувацькі функції), які значно прискорюють обробку даних шляхом векторизації та використання **Apache Arrow**.

### 1. **`pandas_udf`**
**`pandas_udf`** — це декоратор, який використовується для визначення **Pandas UDF**. За допомогою цього декоратора ви можете вказати тип функції і тип даних, які вона буде обробляти або повертати.

*Синтаксис*
```python
from pyspark.sql.functions import pandas_udf

@pandas_udf(returnType)
def my_udf(col: pd.Series) -> pd.Series:
    return col * 2
```
- **`returnType`** — це тип даних, який повертає функція. Ви можете вказати тип, наприклад, `StringType`, `DoubleType`, або безпосередньо `"double"`, `"int"`.
- Функція може приймати і повертати **Pandas Series**, що дозволяє виконувати операції векторно.

### 2. **`PandasUDFType`**
**`PandasUDFType`** — це перелік типів Pandas UDF, що визначають, як саме функція обробляє дані. У новіших версіях PySpark тип UDF можна просто передати як параметр до **`pandas_udf`**, але `PandasUDFType` може бути використано для уточнення типу.

#### Основні типи `PandasUDFType`:
1.  **`SCALAR` (поелементна обробка)**:
   - Обробляє кожен елемент колонки (векторно).
   - Вхід і вихід — це **Pandas Series**.
2. **`GROUPED_MAP` (обробка груп рядків)**:
   - Обробляє кожну групу як **Pandas DataFrame**, повертаючи DataFrame.
   - Використовується з функціями типу **`groupBy().apply()`**.
3. **`GROUPED_AGG` (агрегація груп)**:
   - Використовується для виконання агрегацій (підрахунку, середнього, суми тощо) для групованих даних.
   - Вхід — це **Pandas Series**, вихід — одне значення.

*Синтаксис*
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

# Визначення функції Grouped Map UDF
@pandas_udf("name string, avg_age double", PandasUDFType.GROUPED_MAP)
def average_age(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.assign(avg_age=pdf["age"].mean())

# Використання цієї функції
df.groupby("name").apply(average_age).show()
```



In [None]:
# імпорт функцій Pandas UDF
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [None]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # Формула для перерахунку в метричну систему (1 фунт ≈ 0.45 кг)
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)

In [None]:
# застосуємо створену функцію до таблиці cars
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()

##<center>__Самостійні завдання__</center>

> Скопіювати блок самостійних завдань в окремий файл ***LastName_CP4.ipynb***

### Завдання №1
1. Інсталювати та імпортувати необхідні бібліотеки та модулі
2. Створити Spark Session
2. Завантажити та перетворити датасет `mtcars` з лекційної частини у DataFrame PySpark

In [None]:
# МІСЦЕ ДЛЯ КОДУ



### Завдання №2
Використовуючи Spark SQL виконати наступні запити
1. Обчислити середнє значення витрат палива (`mpg`) для кожного типу передач (`am`) (0 = автоматична, 1 = ручна передача)
2. Знайдіть три автомобілі з найбільшим значенням потужності `hp`
3. Підрахувати кількість автомобілів з різними кількостями циліндрів (`cyl`)
4. Визначте середню вагу `wt` автомобілів з ручною та автоматичною передачею
5. Виберіть автомобілі, у яких `hp` більше 150 та `mpg` більше 20.
6. Створіть запит на свій розсуд


In [None]:
# МІСЦЕ ДЛЯ КОДУ



### Завдання №3

Створіть користувацькі функції Pandas UDF (User Defined Functions):
1. Виведіть квадрат значення потужності (`hp`) для кожного автомобіля та виведіть результат як новий стовпчик таблиці
2. Створіть UDF, яка нормалізує вагу автомобілів `wt`, використовуючи мінімаксний принцип нормалізації та виведіть результат як новий стовпчик таблиці
3. Створіть UDF, яка обчислює коефіцієнт потужність/вага для кожного автомобіля та виведіть результат як новий стовпчик таблиці
4. Створіть UDF, яка присвоює автомобілю категорію (малий, середній або великий) на основі кількості циліндрів (`cyl`) та виведіть результат як новий стовпчик таблиці
5. Створіть UDF, яка визначає, чи є автомобіль економічним (якщо `mpg > 25`) і приймає бінарне значення та виведіть результат як новий стовпчик таблиці
6. Створіть UDF на свій розсуд та виведіть результат як новий стовпчик таблиці

In [None]:
# МІСЦЕ ДЛЯ КОДУ

