<a href="https://colab.research.google.com/github/aleksandraqubert/GitHub/blob/main/Lab4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=2bed05f234445302407823350807dc06ac711cef6bec79aa54e7907d0a99b5be
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Конфігурація

- `number_cores`: Кількість ядер, виділених під Spark
- `memory_gb`: Обʼєм оперативної памʼяті, виділеної під Spark (в Гб)

In [4]:
number_cores = 2
memory_gb = 4

In [5]:
from pyspark.sql import SparkSession
import warnings

warnings.filterwarnings('ignore')

spark = (SparkSession
    .builder
    .appName('Spark Bikes')
    .master(f"local[{number_cores}]")
    .config("spark.driver.memory", f"{memory_gb}g")
    .getOrCreate())

## Рішення
Прочитайте вміст вхідного файлу `register.csv` і збережіть його у DataFrame.

Вхідний файл має заголовок.

Схема даних:
* station: integer (nullable = true)
* timestamp: timestamp (nullable = true)
* used_slots: integer (nullable = true)
* free_slots: integer (nullable = true)

In [65]:
# ваш код
from pyspark.sql import SparkSession

# Ініціалізуємо SparkSession
spark = SparkSession.builder \
    .appName("Read Register CSV") \
    .getOrCreate()

# Читаємо вміст вхідного файлу register.csv та зберігаємо його у DataFrame
register_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", "\t") \
    .load("register.csv")

# Виводимо  для перевірки
register_df.show()

+-------+-------------------+----------+----------+
|station|          timestamp|used_slots|free_slots|
+-------+-------------------+----------+----------+
|      1|2023-05-15 12:01:00|         0|        18|
|      1|2023-05-15 12:02:00|         0|        18|
|      1|2023-05-15 12:04:00|         0|        18|
|      1|2023-05-15 12:06:00|         0|        18|
|      1|2023-05-15 12:08:00|         0|        18|
|      1|2023-05-15 12:10:00|         0|        18|
|      1|2023-05-15 12:12:00|         0|        18|
|      1|2023-05-15 12:14:00|         0|        18|
|      1|2023-05-15 12:16:00|         0|        18|
|      1|2023-05-15 12:18:00|         0|        18|
|      1|2023-05-15 12:20:00|         2|        16|
|      1|2023-05-15 12:22:00|         3|        15|
|      1|2023-05-15 12:24:00|         3|        15|
|      1|2023-05-15 12:26:00|         3|        15|
|      1|2023-05-15 12:28:00|         4|        14|
|      1|2023-05-15 12:30:00|         0|        12|
|      1|202

Видаліть рядки де одночасно `free_slots = 0` та `used_slots = 0`

In [64]:
# ваш код
# Фільтруємо рядки, де free_slots не дорівнює 0 або used_slots не дорівнює 0
filtered_register_df = register_df.filter((register_df["free_slots"] != 0) & (register_df["used_slots"] != 0))

# Виводимо для перевірки

filtered_register_df.show()

+-------+-------------------+----------+----------+-------+----------+--------+----------+
|station|          timestamp|used_slots|free_slots|is_full| dayofweek|    hour|fullstatus|
+-------+-------------------+----------+----------+-------+----------+--------+----------+
|      1|2023-05-15 12:20:00|         2|        16|      0|2023-05-15|12:20:00|         0|
|      1|2023-05-15 12:22:00|         3|        15|      0|2023-05-15|12:22:00|         0|
|      1|2023-05-15 12:24:00|         3|        15|      0|2023-05-15|12:24:00|         0|
|      1|2023-05-15 12:26:00|         3|        15|      0|2023-05-15|12:26:00|         0|
|      1|2023-05-15 12:28:00|         4|        14|      0|2023-05-15|12:28:00|         0|
|      1|2023-05-15 12:32:00|         4|        14|      0|2023-05-15|12:32:00|         0|
|      1|2023-05-15 12:34:00|         4|        14|      0|2023-05-15|12:34:00|         0|
|      1|2023-05-15 12:36:00|         4|        14|      0|2023-05-15|12:36:00|         0|

Нам потрібен логічний маркер, щоб побачити, заповнена станція чи ні. Це можна зробити за допомогою UDF під назвою `full(free_slots: int)`, яка повертає
* 1, якщо `free_slots` дорівнює 0
* 0, якщо `free_slots` більше 0

> Якщо ви використовуєте Pandas on Spark API, то треба самостійно застосувати цю функцію (або переписати її)

In [66]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Визначаємо функцію UDF
def full_function(free_slots: int) -> int:
    if free_slots == 0:
        return 1
    else:
        return 0

# Реєструємо функцію UDF
full_udf = udf(full_function, IntegerType())

# Додаємо новий стовпець, який використовує функцію UDF
register_df = register_df.withColumn("is_full", full_udf(register_df["free_slots"]))

# Виводимо схему даних та  рядки для перевірки
register_df.show()

+-------+-------------------+----------+----------+-------+
|station|          timestamp|used_slots|free_slots|is_full|
+-------+-------------------+----------+----------+-------+
|      1|2023-05-15 12:01:00|         0|        18|      0|
|      1|2023-05-15 12:02:00|         0|        18|      0|
|      1|2023-05-15 12:04:00|         0|        18|      0|
|      1|2023-05-15 12:06:00|         0|        18|      0|
|      1|2023-05-15 12:08:00|         0|        18|      0|
|      1|2023-05-15 12:10:00|         0|        18|      0|
|      1|2023-05-15 12:12:00|         0|        18|      0|
|      1|2023-05-15 12:14:00|         0|        18|      0|
|      1|2023-05-15 12:16:00|         0|        18|      0|
|      1|2023-05-15 12:18:00|         0|        18|      0|
|      1|2023-05-15 12:20:00|         2|        16|      0|
|      1|2023-05-15 12:22:00|         3|        15|      0|
|      1|2023-05-15 12:24:00|         3|        15|      0|
|      1|2023-05-15 12:26:00|         3|

Створіть DataFrame з такою схемою:
* station: integer (nullable = true)
* dayofweek: string (nullable = true)
* hour: integer (nullable = true)
* fullstatus: integer (nullable = true) - 1 = full, 0 = non-full

In [68]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, hour,date_format, when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Описуємо схему для DataFrame
schema = StructType([
    StructField("station", IntegerType(), nullable=True),
    StructField("timestamp", StringType(), nullable=True)
])


# Розділяємо стовбець timestamp на dayofweek та hour
register_df = register_df.withColumn("dayofweek", to_date(col("timestamp"))) \
    .withColumn("hour", date_format(col("timestamp"), "HH:mm:ss")) \
    .withColumn("fullstatus", col("is_full"))



# Вибираємо тільки необхідні колонки: station, dayofweek, hour, fullstatus
selected_df = register_df.select("station", "dayofweek", "hour", "fullstatus")

# Виведення схеми створеного DataFrame
selected_df.printSchema()

# Виведення вмісту DataFrame
selected_df.show()



root
 |-- station: integer (nullable = true)
 |-- dayofweek: date (nullable = true)
 |-- hour: string (nullable = true)
 |-- fullstatus: integer (nullable = true)

+-------+----------+--------+----------+
|station| dayofweek|    hour|fullstatus|
+-------+----------+--------+----------+
|      1|2023-05-15|12:01:00|         0|
|      1|2023-05-15|12:02:00|         0|
|      1|2023-05-15|12:04:00|         0|
|      1|2023-05-15|12:06:00|         0|
|      1|2023-05-15|12:08:00|         0|
|      1|2023-05-15|12:10:00|         0|
|      1|2023-05-15|12:12:00|         0|
|      1|2023-05-15|12:14:00|         0|
|      1|2023-05-15|12:16:00|         0|
|      1|2023-05-15|12:18:00|         0|
|      1|2023-05-15|12:20:00|         0|
|      1|2023-05-15|12:22:00|         0|
|      1|2023-05-15|12:24:00|         0|
|      1|2023-05-15|12:26:00|         0|
|      1|2023-05-15|12:28:00|         0|
|      1|2023-05-15|12:30:00|         0|
|      1|2023-05-15|12:32:00|         0|
|      1|2023-05

Визначте одну групу для кожної комбінації `(station, dayofweek, hour)`

In [69]:


# Імпорт необхідних функцій
from pyspark.sql.functions import count

# Групуємо дані за комбінаціями (station, dayofweek, hour) та рахуємо кількість рядків у кожній групі
grouped_df = register_df.groupBy("station", "dayofweek", "hour").agg(count("*").alias("group_count"))

# Виводимо результат
grouped_df.show()



+-------+----------+--------+-----------+
|station| dayofweek|    hour|group_count|
+-------+----------+--------+-----------+
|      1|2023-05-15|18:46:00|          1|
|      1|2023-05-15|23:18:00|          1|
|      1|2023-05-17|08:46:00|          1|
|      1|2023-05-17|11:02:00|          1|
|      1|2023-05-17|13:16:00|          1|
|      1|2023-05-19|06:54:00|          1|
|      1|2023-05-19|08:00:00|          1|
|      1|2023-05-21|12:02:00|          1|
|      1|2023-05-21|19:46:00|          1|
|      1|2023-05-21|21:22:00|          1|
|      1|2023-05-22|06:02:00|          1|
|      1|2023-05-22|23:10:00|          1|
|      1|2023-05-23|06:08:00|          1|
|      1|2023-05-23|13:58:00|          1|
|      1|2023-05-23|19:46:00|          1|
|      1|2023-05-24|17:18:00|          1|
|      1|2023-05-24|20:36:00|          1|
|      1|2023-05-25|05:58:00|          1|
|      1|2023-05-25|15:58:00|          1|
|      1|2023-05-25|17:48:00|          1|
+-------+----------+--------+-----

Обчисліть "критичність" для кожної групи `(station, dayofweek, hour)`, тобто для кожної пари `(station, timeslot)`.

Критичність дорівнює середньому `fullStatus`.

In [70]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Згрупуємо дані за station, dayofweek та hour та обчислимо середнє значення fullstatus
criticality_df =register_df.groupBy("station", "dayofweek", "hour").agg(avg("fullstatus").alias("criticality"))

# Виведемо результат
criticality_df.show()



+-------+----------+--------+-----------+
|station| dayofweek|    hour|criticality|
+-------+----------+--------+-----------+
|      1|2023-05-15|18:46:00|        0.0|
|      1|2023-05-15|23:18:00|        0.0|
|      1|2023-05-17|08:46:00|        0.0|
|      1|2023-05-17|11:02:00|        0.0|
|      1|2023-05-17|13:16:00|        0.0|
|      1|2023-05-19|06:54:00|        0.0|
|      1|2023-05-19|08:00:00|        0.0|
|      1|2023-05-21|12:02:00|        0.0|
|      1|2023-05-21|19:46:00|        0.0|
|      1|2023-05-21|21:22:00|        0.0|
|      1|2023-05-22|06:02:00|        0.0|
|      1|2023-05-22|23:10:00|        0.0|
|      1|2023-05-23|06:08:00|        0.0|
|      1|2023-05-23|13:58:00|        0.0|
|      1|2023-05-23|19:46:00|        0.0|
|      1|2023-05-24|17:18:00|        0.0|
|      1|2023-05-24|20:36:00|        0.0|
|      1|2023-05-25|05:58:00|        0.0|
|      1|2023-05-25|15:58:00|        0.0|
|      1|2023-05-25|17:48:00|        0.0|
+-------+----------+--------+-----

Виберіть лише рядки з `criticality > threshold`

> `threshold` є деякою бізнес-вимогою, тому візьміть випадкове число від `0.1` до `0.5`, яке вам подобається :)

In [71]:
threshold = 0.25

# ваш код
# Вибираємо лише рядки, де значення критичності перевищує поріг
selected_criticality_df = criticality_df.filter(criticality_df["criticality"] > threshold)

# Виводимо результат
selected_criticality_df.show()

+-------+----------+--------+-----------+
|station| dayofweek|    hour|criticality|
+-------+----------+--------+-----------+
|      1|2023-05-26|21:02:00|        1.0|
|      1|2023-05-26|21:52:00|        1.0|
|      1|2023-06-13|23:32:00|        1.0|
|      1|2023-06-21|22:00:00|        1.0|
|      1|2023-06-25|22:56:00|        1.0|
|      1|2023-07-02|23:22:00|        1.0|
|      1|2023-07-03|01:50:00|        1.0|
|      1|2023-07-07|00:42:00|        1.0|
|      1|2023-07-07|02:00:00|        1.0|
|      1|2023-07-08|05:42:00|        1.0|
|      1|2023-08-03|02:32:00|        1.0|
|      1|2023-08-05|03:04:00|        1.0|
|      1|2023-08-07|02:06:00|        1.0|
|      1|2023-08-11|00:52:00|        1.0|
|      1|2023-08-18|07:16:00|        1.0|
|      1|2023-08-20|00:40:00|        1.0|
|      1|2023-08-20|08:58:00|        1.0|
|      1|2023-08-24|02:38:00|        1.0|
|      1|2023-08-24|08:00:00|        1.0|
|      1|2023-08-24|08:18:00|        1.0|
+-------+----------+--------+-----

Прочитайте вміст вхідного файлу `stations.csv` і збережіть його у DataFrame.

Вхідний файл має заголовок.

Схема даних:
* id: integer (nullable = true)
* longitude: double (nullable = true)
* latitude: double (nullable = true)
* name: string (nullable = true)

In [85]:
from pyspark.sql import SparkSession

# Ініціалізуємо SparkSession
spark = SparkSession.builder \
    .appName("Read Stations CSV") \
    .getOrCreate()

# Читаємо вміст вхідного файлу stations.csv та зберігаємо його у DataFrame
stations_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", "\t") \
    .load("stations.csv")

# Виведемо перші 5 рядків для перевірки
stations_df.show(5)




+---+---------+---------+--------------------+
| id|longitude| latitude|                name|
+---+---------+---------+--------------------+
|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|  2| 2.176414|41.394381|       Plaza TetuÃ¡n|
|  3| 2.181164| 41.39375|             Ali Bei|
|  4|   2.1814|41.393364|               Ribes|
|  5| 2.180214|41.391072|  Pg LluÃ­s Companys|
+---+---------+---------+--------------------+
only showing top 5 rows



Об’єднайте (`JOIN`) вибрані критичні часові інтервали з таблицею станцій, щоб отримати координати станцій

In [86]:
# Объединяем DataFrame'ы по ключу "station"
merged_df = selected_criticality_df.join(stations_df, selected_criticality_df.station == stations_df.id, "inner")

# Выводим результат объединения
merged_df.show()


+-------+----------+--------+-----------+---+---------+---------+--------------------+
|station| dayofweek|    hour|criticality| id|longitude| latitude|                name|
+-------+----------+--------+-----------+---+---------+---------+--------------------+
|      1|2023-05-26|21:02:00|        1.0|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|      1|2023-05-26|21:52:00|        1.0|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|      1|2023-06-13|23:32:00|        1.0|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|      1|2023-06-21|22:00:00|        1.0|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|      1|2023-06-25|22:56:00|        1.0|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|      1|2023-07-02|23:22:00|        1.0|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|      1|2023-07-03|01:50:00|        1.0|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|      1|2023-07-07|00:42:00|        1.0|  1| 2.180019|41.397978|Gran Via Corts Ca...|
|      1|2023-07-07|02:00:00|        1.0|  

Відсортуйте вміст DataFrame

In [89]:
columns_to_drop = ["id", "name"]
sorted_df = merged_df.drop(*columns_to_drop)
# Выводим результат объединения
sorted_df.show()

+-------+----------+--------+-----------+---------+---------+
|station| dayofweek|    hour|criticality|longitude| latitude|
+-------+----------+--------+-----------+---------+---------+
|      1|2023-05-26|21:02:00|        1.0| 2.180019|41.397978|
|      1|2023-05-26|21:52:00|        1.0| 2.180019|41.397978|
|      1|2023-06-13|23:32:00|        1.0| 2.180019|41.397978|
|      1|2023-06-21|22:00:00|        1.0| 2.180019|41.397978|
|      1|2023-06-25|22:56:00|        1.0| 2.180019|41.397978|
|      1|2023-07-02|23:22:00|        1.0| 2.180019|41.397978|
|      1|2023-07-03|01:50:00|        1.0| 2.180019|41.397978|
|      1|2023-07-07|00:42:00|        1.0| 2.180019|41.397978|
|      1|2023-07-07|02:00:00|        1.0| 2.180019|41.397978|
|      1|2023-07-08|05:42:00|        1.0| 2.180019|41.397978|
|      1|2023-08-03|02:32:00|        1.0| 2.180019|41.397978|
|      1|2023-08-05|03:04:00|        1.0| 2.180019|41.397978|
|      1|2023-08-07|02:06:00|        1.0| 2.180019|41.397978|
|      1

Write to file:

In [92]:
# Запись DataFrame в CSV-файл
sorted_df.write.csv("ready_file.csv", header=True)


## Зупинка Spark

In [93]:
spark.stop()