<a href="https://colab.research.google.com/github/MartynaaP/MartynaaP.github.io/blob/main/zajecia_powtorzeniowe_zaawansowane.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Zajęcia powtórkowe: Big Data ze Sparkiem (wersja podstawowa)
Ten notebook zawiera rozszerzone zadania, wprowadzenie do dodatkowych funkcji Sparka oraz wyjaśnienia kluczowych metod, które pomogą w utrwaleniu materiału.


In [None]:
# Ustaw wersję jako parametr
SPARK_VERSION="3.5.5"

# Instalacja OpenJDK 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Pobranie Apache Spark z określoną wersją
!wget -q http://www.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop3.tgz

# Rozpakowanie archiwum Spark
!tar xf spark-$SPARK_VERSION-bin-hadoop3.tgz

# Instalacja findspark i pyspark
!pip install -q findspark==1.3.0
!pip install -q pyspark==$SPARK_VERSION

# Ustalamy zmienne środowiskowe.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{SPARK_VERSION}-bin-hadoop3"

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.2/317.2 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext

findspark.init(f"spark-{SPARK_VERSION}-bin-hadoop3")
sc = pyspark.SparkContext('local[*]')
spark = SparkSession.builder.appName('abc').getOrCreate()

## Zadanie 1: Agregacja z użyciem Spark DataFrame API i Window Functions

**Opis:** Wykorzystaj DataFrame API, aby załadować plik `pracownicy.csv`, a następnie obliczyć dla każdego działu (`department`):
1. Liczbę pracowników.
2. Średnią stawkę godzinową (`hourly_rate`).
3. Dla każdego pracownika dodaj kolumnę z odchyleniem stawki godzinowej od średniej w dziale.

**Dodatkowo:**
- Zcache'uj (cache) DataFrame przed obliczeniami, żeby pokazać jak działa pamięć podręczna Sparka.
- Użyj Window Functions z modułu `pyspark.sql.window`.

**Wybrane funkcje do utrwalenia:**
- `DataFrame.cache()`: oznacza, że dane zostaną wczytane do pamięci i ponowne operacje będą szybsze.
- `Window.partitionBy()`: definiuje, jak dzielimy dane na okna.
- `avg`, `col`, `round`: wyrażenia SQL do obliczeń w DataFrame.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType

df = spark.read.csv("pracownicy.csv", header=True, inferSchema=True)

df.createOrReplaceTempView("cc")

def classify_credit_limit(credit_limit):
    if credit_limit is None:
        return None  # albo np. 0, lub inna wartość domyślna
    return (credit_limit // 1000) * 1000


classify_credit_limit_udf = udf(classify_credit_limit, DoubleType())

# Register the UDF with Spark
spark.udf.register("classify_credit_limit_udf", classify_credit_limit_udf)

df_classified = df.withColumn("CREDIT_LIMIT_RANGE", classify_credit_limit_udf(df["CREDIT_LIMIT"]))

df_classified.select("CUST_ID", "CREDIT_LIMIT", "CREDIT_LIMIT_RANGE").show()

df_classified.createOrReplaceTempView("cc")

spark.sql("""
SELECT SUM(INSTALLMENTS_PURCHASES), CREDIT_LIMIT_RANGE
FROM cc
GROUP BY CREDIT_LIMIT_RANGE
""").show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `CREDIT_LIMIT` cannot be resolved. Did you mean one of the following? [` Imie;Nazwisko ;stawka godzinowa w dolarach;rok urodzenia`].

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Zadanie 2: Zapytania SQL z UDF i przedziałami

**Opis:** Załaduj plik `CC_General.csv` jako DataFrame i zarejestruj tymczasowy widok SQL o nazwie `cc`.
1. Zdefiniuj User-Defined Function (UDF), która zaklasyfikuje `credit_limit` do przedziału co 1000.
2. Napisz zapytanie SQL, które obliczy sumę `INSTALLMENTS_PURCHASES` dla każdego przedziału.
3. Posortuj wynik rosnąco wg przedziału.

**Dodatkowo:**
- Użyj UDF aby pokazać możliwość rozszerzenia SQL o niestandardowe funkcje.

**Wybrane funkcje i moduły:**
- `pyspark.sql.functions.udf`: do tworzenia funkcji użytkownika.
- `spark.udf.register()`: rejestracja UDF w kontekście SQL.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType

df = spark.read.csv("CC.csv", header=True, inferSchema=True)

df.createOrReplaceTempView("cc")

def classify_credit_limit(credit_limit):
    if credit_limit is None:
        return None  # albo np. 0, lub inna wartość domyślna
    return (credit_limit // 1000) * 1000


classify_credit_limit_udf = udf(classify_credit_limit, DoubleType())

df_classified = df.withColumn("CREDIT_LIMIT_RANGE", classify_credit_limit_udf(df["CREDIT_LIMIT"]))

df_classified.select("CUST_ID", "CREDIT_LIMIT", "CREDIT_LIMIT_RANGE").show()

df_classified.createOrReplaceTempView("cc")

spark.sql("""
SELECT SUM(INSTALLMENTS_PURCHASES), CREDIT_LIMIT_RANGE
FROM cc
GROUP BY CREDIT_LIMIT_RANGE
""").show()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/CC_General.csv.

## Zadanie 3: Transformacje RDD z Accumulators i Broadcast

**Opis:** Wczytaj plik `pracownicy.csv` jako DataFrame i skonwertuj go na RDD.
1. Użyj Broadcast Variable, aby przenieść na wszystkie węzły listę działów, które analizujemy.
2. Policzyć liczbę pracowników urodzonych po roku 1990 w wybranych działach.
3. Użyj Accumulatora, aby zliczyć łączną liczbę przefiltrowanych rekordów.

**Wybrane funkcje i moduły:**
- `sc.broadcast()`: tworzenie zmiennej broadcast.
- `sc.accumulator()`: tworzenie akumulatora do sumowania po stronie sterującej.


In [None]:
# TODO: Zadanie 3 - Broadcast i Accumulator
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Zadanie3_Advanced').getOrCreate()
sc = spark.sparkContext

# Lista działów do analizy
departments = [] # do uzupełnienia
dep_broadcast = sc.broadcast(departments)

# Accumulator do zliczania rekordów
count_acc = sc.accumulator(0)

# Wczytaj dane
df = spark.read.option('header', True).csv('pracownicy.csv')
rdd = df.rdd

def process(row):
    # Zwiększ akumulator
    count_acc.add(1)
    birth_year = int(row['birth_year'])
    if birth_year > 1990 and row['department'] in dep_broadcast.value:
        return (row['gender'], 1)
    else:
        return None

# Filtruj i licz
pairs = rdd.map(process).filter(lambda x: x is not None)
result = pairs.reduceByKey(lambda a, b: a + b)
print('Wynik:', result.collect())
print('Przetworzono rekordów:', count_acc.value)