# Робота з Spark SQL
## Складна аналітика з Spark SQL

> Можна виконати або за допомогою чистого `DataFrame` API, `Pandas on Spark` API, або за допомогою чистих `SQL` запитів. Вибір за вами.

Ви розробник у компанії **BikeServe**, яка займається орендою велосипедів/скутерів. У вас є певні місця ("станції"), де зберігаються ваші велосипеди. Якщо на станції немає вільних місць для велосипедів (хтось уже зарезервував або взяв велосипед) протягом певного періоду часу (`timeslot`), це означає, що бізнес йде чудово. Однак вам потрібно покращити обслуговування клієнтів, пропонуючи користувачам велосипеди, коли та в тому місці, де це для них найважливіше.

Ваше завдання — знайти найбільш важливі ("критичні") пари станції та періоду часу `(stationId, timeslot)`, щоб ваш бізнес знав, куди і коли доставити більше велосипедів.

Ваш результат має бути відсортований за цією *критичністю* у порядку спадання.

Набір даних містить:
* `register.csv` містить інформацію з вашої IoT системи моніторингу про кількість використаних і вільних слотів на ваших станціях оренди велосипедів. Кожен рядок відповідає одному запису про ситуацію на одній станції в певний момент часу.

    Кожен рядок має такий формат:

    ```bash
    stationId\ttimestamp\tusedslots\tfreeslots
    ```
    
    де `timestamp` має формат `datetime`.

    > Перший рядок файлу містить заголовок.
    > Деякі дані в наборі даних пошкоджено через тимчасові збої мережі та/або вашої системи моніторингу. Це означає, що деякі рядки характеризуються "використаними слотами (used slots) = 0" і "вільними слотами (free slots) = 0". **Ці рядки необхідно відфільтрувати** перед виконанням будь-яких операцій.

* `input/stations.csv` містить опис станцій.

    Кожен рядок має такий формат:

    ```bash
    stationId\tlongitude\ttitude\tname
    ```
    > Перший рядок файлу містить заголовок.

### Опис завдання

Кожна пара "день тижня – година" є "часовим інтервалом" (`timeslot`) і пов’язана з усіма показаннями моніторингу, пов’язаними з цією парою, незалежно від дати. Наприклад, часовий інтервал `Wednesday - 17` відповідає всім показанням, зробленим у середу з `17:00:00` до `17:59:59`.

Станція $S_i$ знаходиться в критичному стані, якщо кількість вільних слотів дорівнює `0` (всі велосипеди на станції заброньовані).

*Критичність* станції $S_i$ у часовому інтервалі $T_j$ визначається як:

$$
\frac{\text{кількість записів із числом вільних слотів, яке дорівнює нулю, для пари}_{\left(S_i,T_j\right)}}{\text{загальна кількість записів для пари}_{\left(S_i,T_j\right)}}
$$

необхідно:
* Обчислити значення *критичності* для кожної пари $(S_i, T_j)$.
* Вибирати лише пари, у яких значення *критичності* перевищує "мінімальний поріг критичності".
    * `Мінімальний поріг критичності` має бути параметром конфігурації програми.
* Зберегти у вихідній папці вибрані записи, використовуючи файли `csv` (із заголовком). Зберегти лише такі атрибути:
    * ідентифікатор станції
    * день тижня
    * година
    * критичність
    * довгота станції
    * широта станції
* Зберегти результати за зменшення критичності. Якщо є два або більше записів, що характеризуються однаковим значенням критичності, то додатково відсортувати по ідентифікатору станції (у порядку зростання). Якщо і станція та сама, то сортувати за днем тижня (за зростанням) і, нарешті, за годиною (за зростанням).

### Поради та підказки

Мова SQL, доступна в Spark SQL, має низку попередньо визначених функцій, одна з яких, `hour(timestamp)`, може використовуватися в запитах SQL або в перетворенні `selectExpr`, щоб вибрати `hour` з заданого позначка часу. Ще одна цікава функція, `date_format(timestamp,format)`, може бути використана для отримання іншої корисної інформації зі стовпця timestamp. Наприклад, у форматі `EE` можна отримати день тижня.

```python
new_df= df.selectExpr("date_format(timestamp,'EE') as weekday hour(timestamp) as hour")
```

Щоб вказати, що роздільником вхідних файлів CSV є спеціальний символ `tab`, установіть параметр роздільника на `\\t`, викликавши `.option("delimiter", "\\t")` під час читання вхідних даних.

## Конфігурація

- `number_cores`: Кількість ядер, виділених під Spark
- `memory_gb`: Обʼєм оперативної памʼяті, виділеної під Spark (в Гб)

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=51e5637c028807ea6b21a264335f15c0d31aa45f98544eca055bd1993177f066
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
number_cores = 2
memory_gb = 4

In [None]:
from pyspark.sql import SparkSession
import warnings

warnings.filterwarnings('ignore')

spark = (SparkSession
    .builder
    .appName('Spark Bikes')
    .master(f"local[{number_cores}]")
    .config("spark.driver.memory", f"{memory_gb}g")
    .getOrCreate())

## Рішення
Прочитайте вміст вхідного файлу `register.csv` і збережіть його у DataFrame.

Вхідний файл має заголовок.

Схема даних:
* station: integer (nullable = true)
* timestamp: timestamp (nullable = true)
* used_slots: integer (nullable = true)
* free_slots: integer (nullable = true)

In [None]:
from google.colab import files
uploaded = files.upload()

Saving register.csv to register (2).csv


In [None]:
# ваш код
register_df = spark.read.csv("register.csv", header=True, inferSchema=True, sep="\t")

# Виведення перших декількох рядків DataFrame
register_df.show()


Видаліть рядки де одночасно `free_slots = 0` та `used_slots = 0`

In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# ваш код
# Фільтрація рядків де одночасно free_slots = 0 та used_slots = 0
# Визначення функції для UDF
def full_function(free_slots: int) -> int:
    if free_slots==0:
        return 1
    else:
        return 0

spark.udf.register("full", full_function)

# Додавання стовпця з логічним маркером
register_df.createOrReplaceTempView("register_table")
filtered_register_df = spark.sql("SELECT *, full(`free_slots`) AS full_station FROM register_table")

# Фільтрація рядків де одночасно free_slots = 0 та used_slots = 0
filtered_register_df = filtered_register_df.filter((filtered_register_df.used_slots != 0) | (filtered_register_df.full_station != 1))

# Відображення перших декількох записів
filtered_register_df.show(5)

Нам потрібен логічний маркер, щоб побачити, заповнена станція чи ні. Це можна зробити за допомогою UDF під назвою `full(free_slots: int)`, яка повертає
* 1, якщо `free_slots` дорівнює 0
* 0, якщо `free_slots` більше 0

> Якщо ви використовуєте Pandas on Spark API, то треба самостійно застосувати цю функцію (або переписати її)

In [None]:
def full_function(free_slots: int) -> int:
    if free_slots==0:
        return 1
    else:
        return 0

spark.udf.register("full", full_function)

Створіть DataFrame з такою схемою:
* station: integer (nullable = true)
* dayofweek: string (nullable = true)
* hour: integer (nullable = true)
* fullstatus: integer (nullable = true) - 1 = full, 0 = non-full

In [3]:
# ваш код
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

# Створення SparkSession
spark = SparkSession.builder \
    .appName("BikeServe") \
    .getOrCreate()

# Визначення схеми
schema = StructType([
    StructField("station", IntegerType(), nullable=True),
    StructField("dayofweek", StringType(), nullable=True),
    StructField("hour", IntegerType(), nullable=True),
    StructField("fullstatus", IntegerType(), nullable=True)
])

# Створення DataFrame
data = [
    (4, "Thursday", 10, 0),
    (1, "Monday", 18, 1),
    (5, "Friday", 8, 1),
    (3, "Wednesday", 15, 1),
]

df = spark.createDataFrame(data, schema=schema)

# Відображення DataFrame
df.show()

+-------+---------+----+----------+
|station|dayofweek|hour|fullstatus|
+-------+---------+----+----------+
|      4| Thursday|  10|         0|
|      1|   Monday|  18|         1|
|      5|   Friday|   8|         1|
|      3|Wednesday|  15|         1|
+-------+---------+----+----------+



Визначте одну групу для кожної комбінації `(station, dayofweek, hour)`

In [4]:
# ваш код

# Визначення груп
grouped_df = df.groupBy("station", "dayofweek", "hour").agg({"fullstatus": "max"})

# Перейменування стовпця з результатами групування
grouped_df = grouped_df.withColumnRenamed("max(fullstatus)", "fullstatus")

# Відображення результатів
grouped_df.show()

+-------+---------+----+----------+
|station|dayofweek|hour|fullstatus|
+-------+---------+----+----------+
|      4| Thursday|  10|         0|
|      1|   Monday|  18|         1|
|      5|   Friday|   8|         1|
|      3|Wednesday|  15|         1|
+-------+---------+----+----------+



Обчисліть "критичність" для кожної групи `(station, dayofweek, hour)`, тобто для кожної пари `(station, timeslot)`.

Критичність дорівнює середньому `fullStatus`.

In [5]:
# ваш код
from pyspark.sql.functions import avg
# Групування та обчислення середнього значення fullstatus
criticality_df = df.groupBy("station", "dayofweek", "hour") \
                   .agg(avg("fullstatus").alias("criticality"))

# Відображення результатів
criticality_df.show()

+-------+---------+----+-----------+
|station|dayofweek|hour|criticality|
+-------+---------+----+-----------+
|      4| Thursday|  10|        0.0|
|      1|   Monday|  18|        1.0|
|      5|   Friday|   8|        1.0|
|      3|Wednesday|  15|        1.0|
+-------+---------+----+-----------+



Виберіть лише рядки з `criticality > threshold`

> `threshold` є деякою бізнес-вимогою, тому візьміть випадкове число від `0.1` до `0.5`, яке вам подобається :)

In [6]:
threshold = 0.25

# ваш код
# Вибір рядків, де criticality > threshold
selected_rows = criticality_df.filter(criticality_df.criticality > threshold)

# Відображення результату
selected_rows.show()

+-------+---------+----+-----------+
|station|dayofweek|hour|criticality|
+-------+---------+----+-----------+
|      1|   Monday|  18|        1.0|
|      5|   Friday|   8|        1.0|
|      3|Wednesday|  15|        1.0|
+-------+---------+----+-----------+



Прочитайте вміст вхідного файлу `stations.csv` і збережіть його у DataFrame.

Вхідний файл має заголовок.

Схема даних:
* id: integer (nullable = true)
* longitude: double (nullable = true)
* latitude: double (nullable = true)
* name: string (nullable = true)

In [8]:
from google.colab import files
uploaded = files.upload()

Saving stations.csv to stations.csv


In [9]:
# ваш код
# Завантаження даних з файлу stations.csv
stations_df = spark.read.csv("stations.csv", header=True, inferSchema=True, sep="\t")

# Відображення схеми даних
stations_df.printSchema()

# Відображення перших декількох записів
stations_df.show()

root
 |-- id: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- name: string (nullable = true)

+---+---------+---------+--------------------+
| id|longitude| latitude|                name|
+---+---------+---------+--------------------+
|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|  2| 2.176414|41.394381|       Plaza TetuÃ¡n|
|  3| 2.181164| 41.39375|             Ali Bei|
|  4|   2.1814|41.393364|               Ribes|
|  5| 2.180214|41.391072|  Pg LluÃ­s Companys|
|  6| 2.180508|41.391272|  Pg LluÃ­s Companys|
|  7| 2.183183|41.388867|  Pg LluÃ­s Companys|
|  8| 2.183453|41.389044|Passeig lluÃ­s co...|
|  9| 2.185294|41.385006|MarquÃ¨s de l\'Ar...|
| 10| 2.185206|41.384875|Avinguda del Marq...|
| 11| 2.183622|41.385394|             ComerÃ§|
| 12| 2.193939|41.381681|            Trelawny|
| 13| 2.195661|41.384522|pg marÃ­tim barce...|
| 14| 2.195603|41.384417|     Passeig Maritim|
| 15| 2.195706|41.386811|       Avda. Litor

Об’єднайте (`JOIN`) вибрані критичні часові інтервали з таблицею станцій, щоб отримати координати станцій

In [10]:
# Об'єднання criticality_df та stations_df за допомогою поля "station"
joined_df = criticality_df.join(stations_df)

# Вибір необхідних стовпців (наприклад, координати станцій та рівень критичності)
result_df = joined_df.select("id", "latitude", "longitude", "name", "criticality")

# Відображення результатів
result_df.show()

+---+---------+---------+--------------------+-----------+
| id| latitude|longitude|                name|criticality|
+---+---------+---------+--------------------+-----------+
|  1|41.397978| 2.180019|Gran Via Corts Ca...|        0.0|
|  2|41.394381| 2.176414|       Plaza TetuÃ¡n|        0.0|
|  3| 41.39375| 2.181164|             Ali Bei|        0.0|
|  4|41.393364|   2.1814|               Ribes|        0.0|
|  5|41.391072| 2.180214|  Pg LluÃ­s Companys|        0.0|
|  6|41.391272| 2.180508|  Pg LluÃ­s Companys|        0.0|
|  7|41.388867| 2.183183|  Pg LluÃ­s Companys|        0.0|
|  8|41.389044| 2.183453|Passeig lluÃ­s co...|        0.0|
|  9|41.385006| 2.185294|MarquÃ¨s de l\'Ar...|        0.0|
| 10|41.384875| 2.185206|Avinguda del Marq...|        0.0|
| 11|41.385394| 2.183622|             ComerÃ§|        0.0|
| 12|41.381681| 2.193939|            Trelawny|        0.0|
| 13|41.384522| 2.195661|pg marÃ­tim barce...|        0.0|
| 14|41.384417| 2.195603|     Passeig Maritim|        0.

Відсортуйте вміст DataFrame

In [11]:
# ваш код
# Відсортувати DataFrame за стовпцем "id" в порядку зростання
sorted_df = result_df.orderBy("id")

# Відображення відсортованого DataFrame
sorted_df.show()

+---+---------+---------+--------------------+-----------+
| id| latitude|longitude|                name|criticality|
+---+---------+---------+--------------------+-----------+
|  1|41.397978| 2.180019|Gran Via Corts Ca...|        1.0|
|  1|41.397978| 2.180019|Gran Via Corts Ca...|        1.0|
|  1|41.397978| 2.180019|Gran Via Corts Ca...|        0.0|
|  1|41.397978| 2.180019|Gran Via Corts Ca...|        1.0|
|  2|41.394381| 2.176414|       Plaza TetuÃ¡n|        0.0|
|  2|41.394381| 2.176414|       Plaza TetuÃ¡n|        1.0|
|  2|41.394381| 2.176414|       Plaza TetuÃ¡n|        1.0|
|  2|41.394381| 2.176414|       Plaza TetuÃ¡n|        1.0|
|  3| 41.39375| 2.181164|             Ali Bei|        1.0|
|  3| 41.39375| 2.181164|             Ali Bei|        0.0|
|  3| 41.39375| 2.181164|             Ali Bei|        1.0|
|  3| 41.39375| 2.181164|             Ali Bei|        1.0|
|  4|41.393364|   2.1814|               Ribes|        0.0|
|  4|41.393364|   2.1814|               Ribes|        1.

Write to file:

In [12]:
# ваш код
sorted_df.write.csv("result.csv", header=True)

## Зупинка Spark

In [None]:
spark.stop()