# Робота з Spark SQL
## Складна аналітика з Spark SQL

> Можна виконати або за допомогою чистого `DataFrame` API, `Pandas on Spark` API, або за допомогою чистих `SQL` запитів. Вибір за вами.

Ви розробник у компанії **BikeServe**, яка займається орендою велосипедів/скутерів. У вас є певні місця ("станції"), де зберігаються ваші велосипеди. Якщо на станції немає вільних місць для велосипедів (хтось уже зарезервував або взяв велосипед) протягом певного періоду часу (`timeslot`), це означає, що бізнес йде чудово. Однак вам потрібно покращити обслуговування клієнтів, пропонуючи користувачам велосипеди, коли та в тому місці, де це для них найважливіше.

Ваше завдання — знайти найбільш важливі ("критичні") пари станції та періоду часу `(stationId, timeslot)`, щоб ваш бізнес знав, куди і коли доставити більше велосипедів.

Ваш результат має бути відсортований за цією *критичністю* у порядку спадання.

Набір даних містить:
* `register.csv` містить інформацію з вашої IoT системи моніторингу про кількість використаних і вільних слотів на ваших станціях оренди велосипедів. Кожен рядок відповідає одному запису про ситуацію на одній станції в певний момент часу.

    Кожен рядок має такий формат:

    ```bash
    stationId\ttimestamp\tusedslots\tfreeslots
    ```
    
    де `timestamp` має формат `datetime`.

    > Перший рядок файлу містить заголовок.
    > Деякі дані в наборі даних пошкоджено через тимчасові збої мережі та/або вашої системи моніторингу. Це означає, що деякі рядки характеризуються "використаними слотами (used slots) = 0" і "вільними слотами (free slots) = 0". **Ці рядки необхідно відфільтрувати** перед виконанням будь-яких операцій.

* `input/stations.csv` містить опис станцій.

    Кожен рядок має такий формат:

    ```bash
    stationId\tlongitude\ttitude\tname
    ```
    > Перший рядок файлу містить заголовок.

### Опис завдання

Кожна пара "день тижня – година" є "часовим інтервалом" (`timeslot`) і пов’язана з усіма показаннями моніторингу, пов’язаними з цією парою, незалежно від дати. Наприклад, часовий інтервал `Wednesday - 17` відповідає всім показанням, зробленим у середу з `17:00:00` до `17:59:59`.

Станція $S_i$ знаходиться в критичному стані, якщо кількість вільних слотів дорівнює `0` (всі велосипеди на станції заброньовані).

*Критичність* станції $S_i$ у часовому інтервалі $T_j$ визначається як:

$$
\frac{\text{кількість записів із числом вільних слотів, яке дорівнює нулю, для пари}_{\left(S_i,T_j\right)}}{\text{загальна кількість записів для пари}_{\left(S_i,T_j\right)}}
$$

необхідно:
* Обчислити значення *критичності* для кожної пари $(S_i, T_j)$.
* Вибирати лише пари, у яких значення *критичності* перевищує "мінімальний поріг критичності".
    * `Мінімальний поріг критичності` має бути параметром конфігурації програми.
* Зберегти у вихідній папці вибрані записи, використовуючи файли `csv` (із заголовком). Зберегти лише такі атрибути:
    * ідентифікатор станції
    * день тижня
    * година
    * критичність
    * довгота станції
    * широта станції
* Зберегти результати за зменшення критичності. Якщо є два або більше записів, що характеризуються однаковим значенням критичності, то додатково відсортувати по ідентифікатору станції (у порядку зростання). Якщо і станція та сама, то сортувати за днем тижня (за зростанням) і, нарешті, за годиною (за зростанням).

### Поради та підказки

Мова SQL, доступна в Spark SQL, має низку попередньо визначених функцій, одна з яких, `hour(timestamp)`, може використовуватися в запитах SQL або в перетворенні `selectExpr`, щоб вибрати `hour` з заданого позначка часу. Ще одна цікава функція, `date_format(timestamp,format)`, може бути використана для отримання іншої корисної інформації зі стовпця timestamp. Наприклад, у форматі `EE` можна отримати день тижня.

```python
new_df= df.selectExpr("date_format(timestamp,'EE') as weekday hour(timestamp) as hour")
```

Щоб вказати, що роздільником вхідних файлів CSV є спеціальний символ `tab`, установіть параметр роздільника на `\\t`, викликавши `.option("delimiter", "\\t")` під час читання вхідних даних.

## Конфігурація

- `number_cores`: Кількість ядер, виділених під Spark
- `memory_gb`: Обʼєм оперативної памʼяті, виділеної під Spark (в Гб)

In [None]:
number_cores = 2
memory_gb = 4

In [None]:
from pyspark.sql import SparkSession
import warnings

warnings.filterwarnings('ignore')

spark = (SparkSession
    .builder
    .appName('Spark Bikes')
    .master(f"local[{number_cores}]")
    .config("spark.driver.memory", f"{memory_gb}g")
    .getOrCreate())

## Рішення
Прочитайте вміст вхідного файлу `register.csv` і збережіть його у DataFrame.

Вхідний файл має заголовок.

Схема даних:
* station: integer (nullable = true)
* timestamp: timestamp (nullable = true)
* used_slots: integer (nullable = true)
* free_slots: integer (nullable = true)

In [None]:
input_file_path = "register.csv"
df = spark.read.csv(input_file_path, header=True, inferSchema=True)

Видаліть рядки де одночасно `free_slots = 0` та `used_slots = 0`

In [None]:
df_filtered = df.filter((df['free_slots'] != 0) | (df['used_slots'] != 0))

Нам потрібен логічний маркер, щоб побачити, заповнена станція чи ні. Це можна зробити за допомогою UDF під назвою `full(free_slots: int)`, яка повертає
* 1, якщо `free_slots` дорівнює 0
* 0, якщо `free_slots` більше 0

> Якщо ви використовуєте Pandas on Spark API, то треба самостійно застосувати цю функцію (або переписати її)

In [None]:
def full_function(free_slots: int) -> int:
    if free_slots==0:
        return 1
    else:
        return 0

spark.udf.register("full", full_function)

Створіть DataFrame з такою схемою:
* station: integer (nullable = true)
* dayofweek: string (nullable = true)
* hour: integer (nullable = true)
* fullstatus: integer (nullable = true) - 1 = full, 0 = non-full

In [None]:
df = df.withColumn("fullstatus", udf(lambda free_slots: full_function(free_slots), IntegerType())(df["free_slots"]))

Визначте одну групу для кожної комбінації `(station, dayofweek, hour)`

In [None]:
grouped_df = df.groupBy("station", "dayofweek", "hour").count()

Обчисліть "критичність" для кожної групи `(station, dayofweek, hour)`, тобто для кожної пари `(station, timeslot)`.

Критичність дорівнює середньому `fullStatus`.

In [None]:
from pyspark.sql.functions import avg

criticality_df = df.groupBy("station", "dayofweek", "hour").agg(avg("fullstatus").alias("criticality"))

Виберіть лише рядки з `criticality > threshold`

> `threshold` є деякою бізнес-вимогою, тому візьміть випадкове число від `0.1` до `0.5`, яке вам подобається :)

In [None]:
threshold = 0.25

selected_rows = criticality_df.filter(criticality_df["criticality"] > threshold)

Прочитайте вміст вхідного файлу `stations.csv` і збережіть його у DataFrame.

Вхідний файл має заголовок.

Схема даних:
* id: integer (nullable = true)
* longitude: double (nullable = true)
* latitude: double (nullable = true)
* name: string (nullable = true)

In [None]:
input_file_path = "stations.csv"
stations_df = spark.read.csv(input_file_path, header=True, inferSchema=True)

Об’єднайте (`JOIN`) вибрані критичні часові інтервали з таблицею станцій, щоб отримати координати станцій

In [None]:
joined_df = selected_criticality_df.join(stations_df, selected_criticality_df["station"] == stations_df["id"], "inner")


Відсортуйте вміст DataFrame

In [None]:
sorted_df = joined_df.sort(["criticality", "station"], ascending=[False, True])

Write to file:

In [None]:
output_file_path = "sorted_data.csv"

sorted_df.write.csv(output_file_path, header=True)

## Зупинка Spark

In [None]:
spark.stop()