<a href="https://colab.research.google.com/github/Dima200206/-2/blob/main/%D0%9B%D0%913_%D0%9A%D0%A2%D0%9E%D0%92%D0%94%20_%D0%A4%D0%86%D0%A2_1-4%D0%BC_%D0%92%D0%BB%D0%B0%D1%81%D0%B5%D0%BD%D0%BA%D0%BE_%D0%94%D0%BC%D0%B8%D1%82%D1%80%D0%BE_GIT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# -*- coding: utf-8 -*-
# Лабораторна робота: Робота з RDD у PySpark

# ============================================================
# ЧАСТИНА 1. СТВОРЕННЯ RDD ТА БАЗОВІ ОПЕРАЦІЇ
# ============================================================

!pip install pyspark
from pyspark.sql import SparkSession
import random

# Ініціалізація середовища PySpark
spark = SparkSession.builder.appName("Lab3_RDD").master("local[*]").getOrCreate()
sc = spark.sparkContext
print("✅ SparkContext створено:", sc)

# ------------------------------------------------------------
# 1. Створення RDD зі списку слів
# ------------------------------------------------------------
words = ["apple", "banana", "orange", "blueberry", "blackberry", "grape", "banana", "apple", "berry"] * 12
rdd_words = sc.parallelize(words)
print("Кількість елементів у RDD:", rdd_words.count())
print("Перші 10 елементів:", rdd_words.take(10))

# ------------------------------------------------------------
# 2. Трансформації над RDD
# ------------------------------------------------------------
rdd_upper = rdd_words.map(lambda x: x.upper())
rdd_b = rdd_upper.filter(lambda x: x.startswith("B"))
rdd_distinct = rdd_b.distinct()

print("\n🔹 Слова, що починаються з B:", rdd_b.collect())
print("🔹 Унікальні слова:", rdd_distinct.collect())

# ------------------------------------------------------------
# 3. Робота з числовими даними
# ------------------------------------------------------------
numbers = [random.randint(1, 100) for _ in range(100)]
rdd_numbers = sc.parallelize(numbers)

rdd_squared = rdd_numbers.map(lambda x: x ** 2)
rdd_even = rdd_squared.filter(lambda x: x % 2 == 0)
even_count = rdd_even.count()

print("\n🔹 Кількість парних чисел:", even_count)
print("🔹 Перші 10 квадратів:", rdd_squared.take(10))

# ------------------------------------------------------------
# 4. Агрегація даних
# ------------------------------------------------------------
sum_all = rdd_numbers.reduce(lambda a, b: a + b)
product_all = rdd_numbers.reduce(lambda a, b: a * b)

print("\n🔹 Сума всіх чисел:", sum_all)
print("🔹 Добуток усіх чисел:", product_all)

# ------------------------------------------------------------
# 5. Кешування та повторне використання RDD
# ------------------------------------------------------------
rdd_numbers.cache()
print("\nКількість елементів (1-й раз):", rdd_numbers.count())
print("Кількість елементів (2-й раз із кешу):", rdd_numbers.count())

# ============================================================
# ЧАСТИНА 2. ПРАКТИЧНІ ЗАВДАННЯ
# ============================================================

# ------------------------------------------------------------
# 1. Підрахунок кількості слів у датасеті
# ------------------------------------------------------------
sample_text = ["This is an example of PySpark word count.",
               "PySpark RDD operations are very powerful.",
               "This example demonstrates map and reduceByKey in Spark."]

text_rdd = sc.parallelize(sample_text)
words = text_rdd.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda x: (x.lower(), 1)).reduceByKey(lambda a, b: a + b)
print("\n🔹 Кількість слів у датасеті:")
for w, c in word_counts.collect():
    print(f"{w}: {c}")

# ------------------------------------------------------------
# 2. Аналіз логів сервера
# ------------------------------------------------------------
logs_data = [
    "127.0.0.1 - - [10/Oct/2023:13:55:36] 'GET /index.html HTTP/1.1' 200 2326",
    "192.168.1.1 - - [10/Oct/2023:14:55:36] 'POST /login HTTP/1.1' 404 452",
] * 60  # 120 записів

logs = sc.parallelize(logs_data)

# Успішні запити (200)
success = logs.filter(lambda line: " 200 " in line)
ip_counts = success.map(lambda line: (line.split(" ")[0], 1)).reduceByKey(lambda a, b: a + b)

print("\n🔹 Топ 10 IP із успішними запитами:")
print(ip_counts.take(10))

# Групування за статусом
grouped = logs.map(lambda line: (line.split(" ")[-2], line)).groupByKey()
print("\n🔹 Групування за статусами:")
for status, group in grouped.collect():
    print(status, "→", len(list(group)))

# ------------------------------------------------------------
# 3. Аналіз тексту (імітація великої статті)
# ------------------------------------------------------------
article_text = """
PySpark is an interface for Apache Spark in Python. It allows you to write Spark applications using Python APIs.
PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) and Spark Core.
Spark is designed to cover a wide range of workloads such as batch applications, iterative algorithms, interactive queries and streaming.
""" * 300  # ~1MB тексту

article = sc.parallelize(article_text.split("\n"))

words = (article
         .flatMap(lambda line: line.lower().split())
         .filter(lambda w: w.isalpha() and len(w) > 3))

word_freq = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
top_words = word_freq.takeOrdered(20, key=lambda x: -x[1])
print("\n🔹 20 найчастіших слів у статті:")
for w, c in top_words:
    print(f"{w}: {c}")

# Найпопулярніше слово
top_word = top_words[0][0]
print("\n🔹 Найпопулярніше слово:", top_word)

# ------------------------------------------------------------
# 4. Broadcast-змінні
# ------------------------------------------------------------
transactions = sc.parallelize([101,102,101,103,102,101,104,103,104,102]*10)
products = {101: "Ноутбук", 102: "Смартфон", 103: "Планшет", 104: "Монітор"}

broadcast_products = sc.broadcast(products)
mapped = transactions.map(lambda x: (broadcast_products.value[x], 1)).reduceByKey(lambda a, b: a + b)

print("\n🔹 Підрахунок продажів за товарами:")
for k, v in mapped.collect():
    print(f"{k}: {v}")

# ------------------------------------------------------------
# 5. Акумуляторні змінні
# ------------------------------------------------------------
high_temp_acc = sc.accumulator(0)
temps = sc.parallelize([25, 31, 28, 35, 30, 33, 29, 40, 22, 31])

def check_temp(t):
    global high_temp_acc
    if t > 30:
        high_temp_acc.add(1)
    return t

temps.map(check_temp).collect()
print("\n🔹 Кількість аномально високих температур:", high_temp_acc.value)

# ============================================================
# ВИСНОВКИ
# ============================================================

print("""
✅ ВИСНОВКИ:
- Створено та проаналізовано RDD-структури в PySpark.
- Виконано базові трансформації (map, filter, distinct, reduceByKey).
- Проведено агрегацію та кешування.
- Використано broadcast- і accumulator-змінні.
- Продемонстровано роботу з текстами, логами, транзакціями та температурами.
PySpark дає змогу ефективно обробляти великі обсяги даних у розподіленому середовищі.
""")


✅ SparkContext створено: <SparkContext master=local[*] appName=Lab3_RDD>
Кількість елементів у RDD: 108
Перші 10 елементів: ['apple', 'banana', 'orange', 'blueberry', 'blackberry', 'grape', 'banana', 'apple', 'berry', 'apple']

🔹 Слова, що починаються з B: ['BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY', 'BANANA', 'BERRY']
🔹 Унікальні слова: ['BERRY', 'BANANA', 'BLUEBERRY', 'BLACKBERRY']

🔹 К