In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [12]:
!pip install pyngrok

Collecting pyngrok
  Downloading pyngrok-7.2.1-py3-none-any.whl.metadata (8.3 kB)
Downloading pyngrok-7.2.1-py3-none-any.whl (22 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.2.1


In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
from pyngrok import ngrok

In [4]:
# Створення Spark-сесії
spark = SparkSession.builder \
    .appName("Data Processing with Spark") \
    .getOrCreate()

nuek_vuh3_data_path = '/content/drive/MyDrive/GoIT/DataEngineering/lesson4/nuek-vuh3.csv'

# Завантаження даних
nuek_vuh3_df = spark.read.csv(nuek_vuh3_data_path, header=True, inferSchema=True)

In [5]:
nuek_vuh3_df.show(10, truncate=False)

+-----------+-------+---------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------+----------------------+-------------------+----------------------+----+-------------------+---------+------------+----+-----------------+--------+--------------+--------+---------------+----------------+---------+------------------------------+------------------------+-------------------+---------------------------------+-------------+-----------------------------+-------------------+-------------------+---------------------------+
|call_number|unit_id|incident_number|call_type       |call_date          |watch_date         |received_dttm      |entry_dttm         |dispatch_dttm      |response_dttm      |on_scene_dttm      |transport_dttm|hospital_dttm|call_final_disposition|available_dttm     |address               |city|zipcode_of_incident|battalion|station_area|box 

**Task 1**

In [14]:
# Створюємо сесію Spark
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "2") \
    .appName("MyGoitSparkSandbox") \
    .getOrCreate()

# Завантажуємо датасет
nuek_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(nuek_vuh3_data_path)

nuek_repart = nuek_df.repartition(2)

nuek_processed = nuek_repart \
    .where("final_priority < 3") \
    .select("unit_id", "final_priority") \
    .groupBy("unit_id") \
    .count()

# Ось ТУТ додано рядок
nuek_processed = nuek_processed.where("count>2")

# nuek_processed.show()

nuek_processed.collect()

input("Press Enter to continue...5")

# Закриваємо сесію Spark
# spark.stop()


Press Enter to continue...5


''

**Отримання публічного URL до локального Spark UI на Google Colab**

In [None]:
# авторизаційний токен ngrock для доступу до локального Spark UI в Google Colab
!ngrok config add-authtoken <Authtoken>


Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [17]:
# Запускаємо тунель для порту 4040
public_url = ngrok.connect(4040)
print(f"Spark UI доступний за адресою: {public_url}")




Spark UI доступний за адресою: NgrokTunnel: "https://1b28-34-70-241-250.ngrok-free.app" -> "http://localhost:4040"


In [20]:
spark.stop()

**Task 2**

In [21]:
# Створюємо сесію Spark
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "2") \
    .appName("MyGoitSparkSandbox") \
    .getOrCreate()

# Завантажуємо датасет
nuek_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(nuek_vuh3_data_path)

nuek_repart = nuek_df.repartition(2)

nuek_processed = nuek_repart \
    .where("final_priority < 3") \
    .select("unit_id", "final_priority") \
    .groupBy("unit_id") \
    .count()

# Проміжний action: collect
nuek_processed.collect()

# Ось ТУТ додано рядок
nuek_processed = nuek_processed.where("count>2")

nuek_processed.collect()

input("Press Enter to continue...5")

# Закриваємо сесію Spark
# spark.stop()


Press Enter to continue...5


''

In [22]:
spark.stop()

# Чому додавання `.collect()` призвело до збільшення кількості Jobs?

## Основна причина
Spark використовує **ліниву обчислювальну модель**, де трансформації додаються в Directed Acyclic Graph (DAG) і виконуються тільки при виклику **action** (наприклад, `.collect()`). Кожен виклик `.collect()` змушує Spark виконати весь DAG до цього моменту, створюючи Jobs.

---

## Причина збільшення Jobs
1. **Spark працює ліниво (lazy execution):**
   - Трансформації (наприклад, `.where`, `.select`, `.groupBy`) лише додають операції до DAG і не виконуються негайно.
   - Обчислення запускаються лише під час виклику **action** (наприклад, `.collect()`).

2. **Два виклики `.collect()` запускають обчислення двічі:**
   - Перший виклик `.collect()` виконує весь DAG до цього моменту.
   - Після додаткової фільтрації (`where("count>2")`) створюється новий DAG.
   - Другий виклик `.collect()` змушує Spark виконати новий DAG.

3. **Розподілення на Jobs:**
   - Кожна складна трансформація (наприклад, `groupBy`) викликає **shuffle**, що створює нові Jobs.
   - Кожен виклик `.collect()` викликає обчислення DAG, додаючи Jobs.

---

## Аналіз кількості Jobs

### 1. До першого `.collect()`
Трансформації додаються до DAG:
- `repartition(2)` — розподіляє дані між двома партиціями.
- `where("final_priority < 3")` — фільтрує рядки.
- `select` — вибирає потрібні стовпці.
- `groupBy("unit_id").count()` — групує дані, викликаючи **shuffle**.

Коли викликається перше `.collect()`, Spark:
- Виконує весь DAG, створюючи кілька Jobs.

### 2. Додаткове фільтрування та друге `.collect()`
- Фільтрація (`where("count>2")`) створює новий DAG.
- Друге `.collect()` викликає обчислення нового DAG, створюючи ще кілька Jobs.

---

## Чому саме 3 додаткові Jobs?
1. Перше `.collect()` запускає Jobs для обчислення всього попереднього DAG.
2. Додане фільтрування створює новий DAG.
3. Друге `.collect()` запускає Jobs для нового DAG.
4. Spark не використовує кешування між викликами `.collect()` (якщо не вказати це явно), тому всі операції виконуються повторно.



**Task 3**

In [24]:
# Створюємо сесію Spark
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "2") \
    .appName("MyGoitSparkSandbox") \
    .getOrCreate()

# Завантажуємо датасет
nuek_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(nuek_vuh3_data_path)

nuek_repart = nuek_df.repartition(2)

nuek_processed_cached = nuek_repart \
    .where("final_priority < 3") \
    .select("unit_id", "final_priority") \
    .groupBy("unit_id") \
    .count() \
    .cache()  # Додано функцію cache

# Проміжний action: collect
nuek_processed_cached.collect()

# Ось ТУТ додано рядок
nuek_processed = nuek_processed_cached.where("count>2")

nuek_processed.collect()

input("Press Enter to continue...5")

# Звільняємо пям'ять від Dataframe
nuek_processed_cached.unpersist()

# Закриваємо сесію Spark
# spark.stop()


Press Enter to continue...5


DataFrame[unit_id: string, count: bigint]

In [25]:
spark.stop()

# Зменшення кількості Jobs

## Без кешування:
- Кожен виклик `.collect()` змушує Spark виконати всі трансформації з самого початку.
- Це призводить до повторного виконання DAG і додаткових Jobs.

## З кешуванням:
- При першому виконанні Spark обчислює весь DAG і кешує результат.
- Усі подальші виклики **action** використовують кешовані дані, зменшуючи обсяг роботи та кількість Jobs.
