<a href="https://colab.research.google.com/github/Xeesto/UEP/blob/dev/BigData_notatki_zaliczenie_MATI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Wczytanie Sparka

In [None]:
# Ustaw wersję jako parametr
SPARK_VERSION="3.5.6"

# Instalacja OpenJDK 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Pobranie Apache Spark z określoną wersją
!wget -q http://www.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop3.tgz

# Rozpakowanie archiwum Spark
!tar xf spark-$SPARK_VERSION-bin-hadoop3.tgz

# Instalacja findspark i pyspark
!pip install -q findspark==1.3.0
!pip install -q pyspark==$SPARK_VERSION

# Ustalamy zmienne środowiskowe.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{SPARK_VERSION}-bin-hadoop3"

In [1]:
import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *

findspark.init(f"spark-{SPARK_VERSION}-bin-hadoop3")
sc = pyspark.SparkContext('local[*]')
spark = SparkSession.builder.appName('abc').getOrCreate()

ModuleNotFoundError: No module named 'findspark'

# ETL | podstawowa obróbka danych | pierwsze kroki w dataframie

In [None]:
df = spark.read.csv('all_weekly_excess_deaths.csv', header=True, inferSchema=True, sep=';')

In [None]:
df.show()

Tworzenie własnej schema

In [None]:
schema = StructType([
    StructField("CUST_ID", IntegerType(), True),
    StructField("BALANCE", DoubleType(), True),
    StructField("BALANCE_FREQUENCY", DoubleType(), True),
    StructField("PURCHASES", DoubleType(), True),
    StructField("ONEOFF_PURCHASES", DoubleType(), True),
    StructField("INSTALLMENTS_PURCHASES", DoubleType(), True),
    StructField("CASH_ADVANCE", DoubleType(), True),
    StructField("PURCHASES_FREQUENCY", DoubleType(), True),
    StructField("ONEOFF_PURCHASES_FREQUENCY", DoubleType(), True),
    StructField("PURCHASES_INSTALLMENTS_FREQUENCY", DoubleType(), True),
    StructField("CASH_ADVANCE_FREQUENCY", DoubleType(), True),
    StructField("CASH_ADVANCE_TRX", IntegerType(), True),
    StructField("PURCHASES_TRX", IntegerType(), True),
    StructField("CREDIT_LIMIT", IntegerType(), True),
    StructField("PAYMENTS", DoubleType(), True),
    StructField("MINIMUM_PAYMENTS", DoubleType(), True),
    StructField("PRC_FULL_PAYMENT", DoubleType(), True),
    StructField("TENURE", IntegerType(), True)
])

# Load the data with the defined schema
cc_gen = spark.read.csv("CC_GENERAL.csv", header=True, schema=schema)

Podzielenie tekstu tak aby otrzymać nowa kolumnę do spacji

In [None]:
# Zmiana typu kolumny ze STRINGA na INT
df = df.withColumn("total_deaths_number", col("total_deaths_number").cast("int"))

📊 Struktura i podgląd danych

| Komenda                | Opis                                         |
| ---------------------- | -------------------------------------------- |
| `df.show(n)`           | Wyświetla pierwsze `n` wierszy               |
| `df.printSchema()`     | Pokazuje strukturę DataFrame                 |
| `df.columns`           | Zwraca listę nazw kolumn                     |
| `df.dtypes`            | Zwraca listę typów danych                    |
| `df.describe().show()` | Statystyki opisowe (średnia, min, max itd.)  |
| `df.head(n)`           | Zwraca pierwsze `n` wierszy jako obiekty Row |
| `df.limit(n)`          | Zwraca nowy DataFrame z `n` wierszami        |


🔄 Łączenie i transformacje

| Komenda                      | Opis                   |
| ---------------------------- | ---------------------- |
| `df.groupBy("col").agg(...)` | Grupowanie i agregacje |
| `df.groupBy("col").count()`  | Liczy wystąpienia      |
| `df.agg({...})`              | Agregacje globalne     |
| `df.orderBy("col")`          | Sortowanie             |


🔄 Łączenie i transformacje

| Komenda                      | Opis                   |
| ---------------------------- | ---------------------- |
| `df.groupBy("col").agg(...)` | Grupowanie i agregacje |
| `df.groupBy("col").count()`  | Liczy wystąpienia      |
| `df.agg({...})`              | Agregacje globalne     |
| `df.orderBy("col")`          | Sortowanie             |


🔎 Filtrowanie i wybieranie

| Komenda                               | Opis                                         |
| ------------------------------------- | -------------------------------------------- |
| `df.select("col1", "col2")`           | Wybiera kolumny                              |
| `df.filter(condition)`                | Filtrowanie wierszy (alias: `df.where(...)`) |
| `df.drop("col")`                      | Usuwa kolumnę                                |
| `df.withColumn("new", ...)`           | Dodaje lub nadpisuje kolumnę                 |
| `df.withColumnRenamed("old", "new")`  | Zmienia nazwę kolumny                        |
| `df.distinct()`                       | Usuwa duplikaty                              |
| `df.dropDuplicates(["col1", "col2"])` | Duplikaty na podstawie kolumn                |


In [None]:
# Nowa kolumna bazująca na istniejącej kolumnie z pierwszymi 4 znakami
df = df.withColumn("total_deaths_prefix", substring(col("total_deaths"), 1, 4))

In [None]:
# Podziel tekst w kolumnie total_deaths na dwie części: przed i po pierwszej spacji
split_col = split(col("total_deaths"), " ", 2)

# Nowa kolumna o nazwie "total_deaths_number" idzie na sam koniec danych i jest rozdzielona po spacji bazując na kolumnie "total_deaths"
df = df.withColumn("total_deaths_number", split_col.getItem(0))

In [None]:
# Wybranie kolumn, agregacja po kolumnie groupBy
df_total = (
       df.select('total_deaths_number', 'country', 'region')
      .groupBy('region')
      .agg(max('total_deaths_number').alias('last_week_deaths'))
      .orderBy('last_week_deaths', ascending=False)
)

df_total.show()