# DataFrame

DataFrame w Spark to rozproszony zbiór danych zorganizowany w kolumny (jak tabela w bazie danych). Jest to podstawowa abstrakcja danych w Spark SQL i opiera się na koncepcji schematu danych + RDD. DataFrame to:

- zbiór wierszy z nazwanymi kolumnami,

- pozwala na operacje podobne do SQL: select, filter, groupBy, join, itd.,

- umożliwia optymalizację przez Catalyst Optimizer i Tungsten Engine,

- może być ładowany z różnych źródeł: CSV, Parquet, JSON, Hive, JDBC, itd.

#### Tworzenie DataFrame z RDD z wykorzystaniem klasy Row.

In [None]:
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("CreateDF_Row").getOrCreate()

rdd = spark.sparkContext.parallelize([
    Row(jury="Julia Wieniawa", age=23),
    Row(jury="Agnieszka Chylińska", age=49),
    Row(jury="Marcin Prokop", age=48),
])

df = spark.createDataFrame(rdd)
df.show()

#### Z listy słowników (Python dicts)

In [None]:
data = [
    {"jury": "Julia Wieniawa", "wiek": 27},
    {"jury": "Agnieszka Chylińska", "wiek": 49},
    {"jury": "Marcin Prokop", "wiek": 48}
]

df = spark.createDataFrame(data)
df.show()

#### Z listy tuple + podanie nazw kolumn

In [None]:
data = [("Marcin Prokop", 48), ("Julia Wieniawa", 27),("Agnieszka Chylińska",49)]

columns = ["name", "age"]

df = spark.createDataFrame(data, schema=columns)
df.show()

#### Z RDD i StructType + StructField (pełne typowanie)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [("Marcin Prokop", 48), ("Julia Wieniawa", 27),("Agnieszka Chylińska",49)]
rdd = spark.sparkContext.parallelize(data)

#Programistyczny sposób definiowania schematu -> Pozmieniajmy nieco schemat i dane
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.createDataFrame(rdd, schema=schema)
df.printSchema()
df.show()

In [None]:
# wykorzystanie ciągu tekstowego DDL do definiowania schematu
schema = 'name STRING, age INT not null'

In [None]:
df = spark.createDataFrame(rdd, schema=schema)
df.printSchema()
df.show()

####  Z plików (CSV, JSON, Parquet)

##### Skrócona wersja: csv()

- Wyraźnie określa format pliku (csv, json, parquet, itd.)

- Pozwala ustawić więcej opcji (delimiter, nullValue, inferSchema, encoding, itd.)

- Używana często w produkcji lub do bardziej złożonego ładowania

In [None]:
df = spark.read.option("header", "true").option("infeSchema", "true").csv("data/flights/summary-data/csv/*")
df.show()

In [None]:
df.schema

##### Rozszerzona wersja: format("csv").load()

- Wyraźnie określa format pliku (csv, json, parquet, itd.)

- Pozwala ustawić więcej opcji (delimiter, nullValue, inferSchema, encoding, itd.)

- Używana często w produkcji lub do bardziej złożonego ładowania

In [None]:
df_flights_csv = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("data/flights/summary-data/csv/*")

In [None]:
df_flights_csv.schema.simpleString()

In [None]:
df_flights_json = (spark.read
        .format("json")
        .option("inferSchema", "true")
        .load("data/flights/summary-data/json/*"))

In [None]:
df_flights_json.schema.simpleString()

In [None]:
# Parquet to kolumnowy format pliku przechowujący dane tabelaryczne, zaprojektowany z myślą o efektywnym przechowywaniu i szybkim dostępie do dużych zbiorów danych. 
# W przeciwieństwie do formatów wierszowych (jak CSV), przechowuje dane według kolumn, co pozwala na optymalizację zapytań, kompresję i przyspieszenie odczytu tylko potrzebnych kolumn
df_flights_parquet = (spark.read
        .format("parquet")
        .option("inferSchema", "true")
        .load("data/flights/summary-data/parquet/*"))

In [None]:
df_flights_parquet.schema.simpleString()

#### Użycie toDF() z RDD lub listy - mniej elastyczne niz  funkcja powyzej (nie podajemy schematu)

In [None]:
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(["name", "age"])
df.show()

#### Zadanie

> Stówrz dataframe o kolumnach `daty`,`temat`,`godziny` dla zajęć ze sparkiem

**Hint** Dla daty wykorzystaj `datetime.datetime`

In [None]:
import datetime


**Przykładowy wynik**

```
+----------+------------------+-------+
|      date|             temat|godziny|
+----------+------------------+-------+
|2025-05-30|       Spark - RDD|      3|
|2025-05-30|   Spark-Dataframe|      3|
|2025-05-30|Spark-kolejny krok|      3|
+----------+------------------+-------+

```

## Podstawowe metody do przeglądania struktury DF

In [None]:
#Wyświetla schemat DataFrame w formacie drzewa.
df_flights_parquet.printSchema()

In [None]:
#Zwraca obiekt StructType, który reprezentuje schemat DataFrame.
df_flights_parquet.schema

In [None]:
#Zwraca listę krotek (nazwa_kolumny, typ_danych).
df_flights_parquet.dtypes

In [None]:
#Zwraca listę nazw kolumn.
df_flights_parquet.columns

To **atrybut obiektu**, czyli bezpośredni dostęp do kolumny tylko wtedy, gdy jej nazwa jest poprawnym identyfikatorem języka Python (np. bez spacji, nie zaczyna się od cyfry itd.).

In [None]:
df_flights_parquet.DEST_COUNTRY_NAME

To **indeksowanie kolumny po nazwie** – zalecana metoda, ponieważ:

- Działa zawsze – niezależnie od tego, jaką nazwę ma kolumna.

- Jest bardziej bezpieczna i odporna na błędy.

In [None]:
df_flights_parquet['DEST_COUNTRY_NAME']

In [None]:
df_flights_parquet[0]

Metoda .show() służy do wyświetlania danych z DataFrame'a w Spark w czytelnej, konsolowej formie tabeli. Jest to jedna z najczęściej używanych metod do szybkiego podglądu danych.

**Składnia:**

`df.show(n=20, truncate=True, vertical=False)
`

In [None]:
df_flights_parquet.show()

In [None]:
#  Nie przycinaj zawartości kolumn:
df_flights_parquet.show(truncate=False)

In [None]:
# przydatne przy długich/nested danych
df_flights_parquet.show(4,vertical=True)

### select()

Metoda .select() w PySpark służy do wybierania jednej lub wielu kolumn z DataFrame’a. Można jej używać także do obliczeń, aliasów i wyrażeń z funkcji (F.col(), F.expr(), itp.).

**Składnia:**
```
df.select("col1", "col2", ...)
df.select(df.col1, df.col2)
df.select(F.col("col1").alias("new_name"))
df.selectExpr("col1", "col2 + 1 as col2_plus_1")

```

In [None]:
df_flights_parquet.select("DEST_COUNTRY_NAME").show()

In [None]:
#  Wybór wielu kolumn:
df_flights_parquet.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show()

In [None]:
df_flights_parquet.select("*").show()

In [None]:
# Zmiana nazwy kolumny
from pyspark.sql.functions import col
df_flights_parquet.select(col("DEST_COUNTRY_NAME").alias("kraj_docelowy")).show(2)

In [None]:
from pyspark.sql.functions import concat_ws
df_flights_parquet.select(df_flights_parquet.DEST_COUNTRY_NAME,
                           df_flights_parquet.ORIGIN_COUNTRY_NAME,
                           concat_ws(" -> ", "ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").alias("połączenie")
                         ).show(5,truncate=False)

In [None]:
df_flights_parquet.selectExpr(
    "ORIGIN_COUNTRY_NAME",
    "DEST_COUNTRY_NAME",
    "concat(ORIGIN_COUNTRY_NAME, ' -> ', DEST_COUNTRY_NAME) as route"
).show()

| Cecha               | `select()`                              | `selectExpr()`                        |
| ------------------- | --------------------------------------- | ------------------------------------- |
| Styl                | Pythonic (`col()`, `concat_ws()`, itp.) | SQL-like (stringi z wyrażeniami SQL)  |
| Sprawdzanie składni | Błąd w czasie kompilacji                | Błąd w czasie wykonania (string eval) |
| Złożone operacje    | Czasem dłuższe                          | Krócej, jeśli znasz SQL               |
| Przykład aliasu     | `.alias("nazwa")`                       | `"kolumna as nazwa"`                  |


### withColumn() - dodawanie lub modyfikowanie kolumn

In [None]:
df_flights_parquet.withColumn("kraj_wylotu", col("ORIGIN_COUNTRY_NAME")).show(5)

In [None]:
from pyspark.sql.functions import upper
#Modyfikację istniejącej kolumny:
(df_flights_parquet
.withColumn("ORIGIN_COUNTRY_NAME", upper(col("ORIGIN_COUNTRY_NAME")))
.show(5))

### withColumnRenamed()

Metoda withColumnRenamed() w PySpark służy do zmiany nazwy istniejącej kolumny w DataFrame. Jest bardzo przydatna, gdy chcesz ustandaryzować nazwy kolumn lub przygotować dane do dalszych operacji.

- `withColumnRenamed()` można wywołać w łańcuchu wielokrotnie, ale nie przyjmuje listy nazw (nie zmienia wielu kolumn na raz)

In [None]:
(df_flights_parquet.withColumnRenamed("ORIGIN_COUNTRY_NAME", "Wylot").withColumnRenamed("DEST_COUNTRY_NAME","Przylot")).show(5)

### filter() where()

W PySpark możesz filtrować wiersze w DataFrame za pomocą metod `filter()` oraz `where()` – i co ważne: działają one identycznie. To tylko różne sposoby zapisu tej samej operacji.

In [None]:
df_flights_parquet.filter(df_flights_parquet["DEST_COUNTRY_NAME"] == "United States").show(5)

In [None]:
df_flights_parquet.where(df_flights_parquet["DEST_COUNTRY_NAME"] == "United States").show(5)

Funkcja `isin()` w PySpark służy do sprawdzania, czy wartość w kolumnie należy do określonego zbioru wartości — odpowiednik SQL-owego IN

In [None]:
df_flights_parquet.filter(df_flights_parquet["ORIGIN_COUNTRY_NAME"].isin("Portugal", "Costa Rica")).show()

Łączenie warunków działa z operatorami:

- & – logiczne AND

- | – logiczne OR

- ~ – NOT

In [None]:
#Złożone warunki (AND, OR)
df_flights_parquet.filter((df_flights_parquet["count"] > 10) & (df_flights_parquet["ORIGIN_COUNTRY_NAME"] == "Canada")).show()

In [None]:
df_flights_parquet.filter((col("count") > 10) & (col("ORIGIN_COUNTRY_NAME") == "Canada")).show()

In [None]:
from pyspark.sql.functions import rlike

# LIKE: nazwę kraju zaczynającego się od "Uni"
df_flights_parquet.filter(df_flights_parquet["DEST_COUNTRY_NAME"].like("Uni%")).show()

# RLIKE (regex): nazwy zawierające "a" i kończące się na "a"
df_flights_parquet.filter(df_flights_parquet["ORIGIN_COUNTRY_NAME"].rlike("a.*a$")).show()

#### Zadanie

Wyświetl tylko te rekordy, w których:

- kraj pochodzenia `ORIGIN_COUNTRY_NAME` to Kanada lub Stany Zjednoczone
ORAZ
- liczba lotów (count) jest większa niż 1000

**Przykładowy wynik**

 ```
+------------------+-------------------+------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+------------------+-------------------+------+
|            Mexico|      United States|  6200|
|     United States|      United States|348113|
|           Germany|      United States|  1392|
|            Canada|      United States|  8271|
|Dominican Republic|      United States|  1109|
|             Japan|      United States|  1383|
|    United Kingdom|      United States|  1629|
|     United States|             Canada|  8305|
+------------------+-------------------+------+

```

#### Zadanie

Pokaż loty, gdzie kraj pochodzenia `ORIGIN_COUNTRY_NAME` NIE jest  z "Russia"

### lit()

Funkcja **lit()** w PySpark służy do tworzenia kolumny zawierającej stałą (literalną) wartość. Jest to szczególnie przydatne, gdy chcesz dodać kolumnę z tą samą wartością dla wszystkich wierszy lub użyć stałej w wyrażeniach kolumnowych.

In [None]:
from pyspark.sql.functions import lit
df_flights_parquet.withColumn("stała_kolumna", lit(1)).show(5)

In [None]:
df_flights_parquet.withColumn("nowa_kolumna", lit(None)).show(5)

### Manipulacji ciągami znaków

- `upper(col)`: Zamienia wszystkie znaki na wielkie litery.

- `lower(col)`: Zamienia wszystkie znaki na małe litery.

- `initcap(col)`: Zamienia pierwszy znak każdego słowa na wielką literę.

In [None]:
from pyspark.sql.functions import upper, lower, initcap

In [None]:
df_flights_parquet.select(
    upper("DEST_COUNTRY_NAME").alias("kraj_docelowy_upper"),
    lower("DEST_COUNTRY_NAME").alias("kraj_docelowy_lower"),
    initcap("DEST_COUNTRY_NAME").alias("kraj_docelowy_initcap")
).show()

**Łączenie ciągów znaków**

- `concat(col1, col2, ...)`: Łączy kolumny bez separatora.

- `concat_ws(sep, col1, col2, ...)`: Łączy kolumny z podanym separatorem.

- `format_string(format, *cols)`: Formatuje ciąg znaków w stylu printf.

In [None]:
from pyspark.sql.functions import concat_ws, format_string

df_flights_parquet.select(
    concat_ws(" ", "DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME").alias("pełna_nazwa")
).show(truncate=False)


In [None]:
df_flights_parquet.select(format_string('%s => %s',
                                        df_flights_parquet.ORIGIN_COUNTRY_NAME,
                                        df_flights_parquet.DEST_COUNTRY_NAME)
                          .alias("Wylot => Przylot")).show(5,truncate=False)


**Operacje na podciągach**

- `substring(col, pos, len)`: Zwraca podciąg od pozycji pos o długości len.

- `instr(col, substr)`: Zwraca pozycję pierwszego wystąpienia substr w col.

- `length()`:  Zwraca długości (liczby znaków) w stringu (tekście) w kolumnie DataFrame.


In [None]:
from pyspark.sql.functions import substring, instr,length
df_flights_parquet.select(
    substring(df_flights_parquet.DEST_COUNTRY_NAME, 1, 5).alias("prefix"),
    instr(df_flights_parquet.DEST_COUNTRY_NAME, "ed").alias("ed_position"),
    length(df_flights_parquet.DEST_COUNTRY_NAME).alias("długość")
).show()

**Czyszczenie i przycinanie**

- `trim(col)`: Usuwa białe znaki z początku i końca ciągu.

- `ltrim(col)`, `rtrim(col)`: Usuwają białe znaki z lewej lub prawej strony.

- `lpad(col, len, pad)`, `rpad(col, len, pad)`: Dopełniają ciąg do długości len znakiem pad

In [None]:
dane = [("   Spark.    ",) ]
df = spark.createDataFrame(dane).toDF("test")
df.show()

In [None]:
from pyspark.sql.functions import col, ltrim, rtrim, trim
df.withColumn("ltrim", ltrim(col("test"))). \
  withColumn("rtrim", rtrim(col("test"))). \
  withColumn("trim", trim(col("test"))). \
  show()

In [None]:
from pyspark.sql.functions import concat, lpad,rpad

df_flights_parquet.select(concat(
    rpad(df_flights_parquet.ORIGIN_COUNTRY_NAME, 40, '-'),
    lpad(df_flights_parquet.DEST_COUNTRY_NAME, 20,'-')
).alias("Wylot => Przylot")).show(truncate=False)

**Operacje z wyrażeniami regularnymi**

- `regexp_replace(col, pattern, replacement)`: Zamienia wszystkie wystąpienia wzorca pattern na replacement.

- `regexp_extract(col, pattern, group)`: Wyodrębnia część ciągu pasującą do wzorca pattern.

- `rlike(col, pattern)`: Sprawdza, czy ciąg pasuje do wyrażenia regularnego.

In [None]:
from pyspark.sql.functions import regexp_extract

# Wyciągnij tylko pierwszy wyraz z ORIGIN_COUNTRY_NAME.
df_flights_parquet.withColumn("first_word", regexp_extract("ORIGIN_COUNTRY_NAME", r"^\w+", 0)).show()

In [None]:
from pyspark.sql.functions import regexp_replace

#Zamień spacje na podkreślniki w DEST_COUNTRY_NAME
df_flights_parquet.withColumn("dest_clean", regexp_replace("DEST_COUNTRY_NAME", " ", "_")).show()

### dzielenia tekstu na części

- `split(...)` – zwraca kolumnę typu array (tablica stringów),

- `getItem(n)` – wybiera n-ty element (indeks od 0).

In [None]:
from pyspark.sql.functions import split

# rozdziel nazwy krajów
df_flights_parquet.withColumn("pierwszy_człon", split("DEST_COUNTRY_NAME", " ").getItem(0)).show()

In [None]:
df_flights_parquet.withColumn("drugi_człon", split("DEST_COUNTRY_NAME", " ").getItem(1)).show()

### explode()

Funkcja `explode()` w PySpark jest używana do **"rozpakowywania" tablicy lub mapy** – tworzy osobny wiersz **dla każdego elementu w tablicy** w kolumnie.

In [None]:
from pyspark.sql.functions import explode

df_flights_parquet.select(
    "DEST_COUNTRY_NAME",
    explode(split("DEST_COUNTRY_NAME", " ")).alias("word")
).show()

Jeśli **DEST_COUNTRY_NAME** to "United States", to powstaną 2 wiersze: "United" i "States"

In [None]:
from pyspark.sql import Row

data = [Row(imie="Tomek", hobby=["sport", "gaming"]),
        Row(imie="Ola", hobby=["podróże", "taniec"])]

df = spark.createDataFrame(data)

df.select("imie", explode("hobby").alias("hobby")).show()

### funkcje daty/czasu w PySpark

| Funkcja                             | Opis                                          |
| ----------------------------------- | --------------------------------------------- |
| `current_date()`                    | Zwraca bieżącą datę                           |
| `current_timestamp()`               | Zwraca bieżący znacznik czasu                 |
| `to_date()`                         | Konwertuje string lub timestamp na `DateType` |
| `to_timestamp()`                    | Konwertuje string na `TimestampType`          |
| `date_format()`                     | Zwraca sformatowaną datę jako string          |
| `datediff(date1, date2)`            | Liczba dni między datami (date1 - date2)      |
| `months_between()`                  | Różnica miesięcy między datami                |
| `year()`, `month()`, `dayofmonth()` | Ekstrakcja części daty                        |
| `hour()`, `minute()`, `second()`    | Ekstrakcja części czasu                       |
| `add_months(date, n)`               | Dodaje n miesięcy do daty                     |
| `date_add(date, days)`              | Dodaje dni do daty                            |
| `date_sub(date, days)`              | Odejmuje dni od daty                          |
| `next_day(date, 'Mon')`             | Najbliższy podany dzień tygodnia              |
| `trunc(date, 'MM')`                 | Ucina datę do początku miesiąca               |


In [None]:
from pyspark.sql.functions import current_date, current_timestamp, to_date, year, month, weekofyear,dayofyear, dayofweek,hour, minute, second,to_timestamp,lit

df = spark.range(1).select(
    current_date().alias("dzisiaj"),
    current_timestamp().alias("teraz")
)

(df.withColumn("year", year("dzisiaj"))
.withColumn("month", month("dzisiaj"))
.withColumn("weekofyear", weekofyear("dzisiaj"))
.withColumn("dayofyear", dayofyear("dzisiaj"))
.withColumn("weekday", dayofweek("dzisiaj"))
.withColumn("hour", hour("teraz"))
.withColumn("minute", minute("teraz"))
.withColumn("second", minute("teraz"))).show(truncate=False)

In [None]:
df_datetime = spark.range(1).select(
    to_date(lit('20250601'), 'yyyyMMdd').alias('to_date'),
    to_timestamp(lit('20250601 1000'), 'yyyyMMdd HHmm').alias('to_timestamp'))
df_datetime.show()

In [None]:
from pyspark.sql.functions import date_add, date_sub

(df_datetime.withColumn("date_add_date", date_add("to_date", 10)).withColumn("date_add_time", date_add("to_timestamp", 10)).withColumn("date_sub_date", date_sub("to_date", 10)).withColumn("date_sub_time", date_sub("to_timestamp", 10)).show())

**date_format()**
- data_lub_timestamp – kolumna typu DateType lub TimestampType

- "format" – ciąg formatujący (np. "yyyy-MM-dd", "MM/yyyy", "EEEE")

| Symbol | Znaczenie            | Przykład |
| ------ | -------------------- | -------- |
| `yyyy` | rok                  | 2025     |
| `MM`   | miesiąc (01-12)      | 05       |
| `MMM`  | skrót miesiąca       | May      |
| `MMMM` | pełna nazwa miesiąca | May      |
| `dd`   | dzień miesiąca       | 26       |
| `EEEE` | dzień tygodnia       | Monday   |
| `HH`   | godzina (24h)        | 13       |
| `mm`   | minuty               | 09       |
| `ss`   | sekundy              | 47       |


In [None]:
from pyspark.sql.functions import date_format

In [None]:
df = spark.createDataFrame([("2025-05-26",)], ["date_str"])
df = df.withColumn("date", to_date("date_str"))

df.withColumn("formatted", date_format("date", "MMMM yyyy")).show()

### Zadanie

> Utwórz nową kolumnę z nazwa dnia tygodnia dla dzisiejszej daty

In [None]:
df = spark.range(1).select(current_date().alias("today"))
df.show()

**Przykładowy wynik**

```
+----------+---------+
|     today|  weekday|
+----------+---------+
|2025-05-28|Wednesday|
+----------+---------+
```

| Funkcja        | Typ  | Działa na       | Obcina do                                          | Przykład                                            |
| -------------- | ---- | --------------- | -------------------------------------------------- | --------------------------------------------------- |
| `trunc()`      | Data | `DateType`      | dzień / miesiąc / rok                              | `trunc(date_col, 'MM')` → pierwszy dzień miesiąca   |
| `date_trunc()` | Czas | `TimestampType` | sekunda, minuta, godzina, dzień, miesiąc, rok itd. | `date_trunc('hour', timestamp_col)` → pełna godzina |


`trunc()` działa tylko na daty (DateType), nie timestampy!

`date_trunc()` wymaga timestamp (albo to_timestamp() przed użyciem).

trunc(date, format)
- "MM" – początek miesiąca

- "YYYY" lub "YY" – początek roku

date_trunc(unit, timestamp)
- "second", "minute", "hour", "day", "month", "year", "week" itd.

In [None]:
df = spark.createDataFrame([("2025-05-26",)], ["date_str"])
df = df.withColumn("date", to_date("date_str"))

# Obetnij do pierwszego dnia miesiąca
df.withColumn("month_start", trunc("date", "MM")).withColumn("year_start", trunc("date", "YYYY")).show()

In [None]:
from pyspark.sql.functions import current_timestamp, date_trunc

df = spark.range(1).select(current_timestamp().alias("now"))

# Obetnij do pełnej godziny
df.withColumn("rounded_week", date_trunc("week", "now")).withColumn("rounded_hour", date_trunc("hour", "now")).withColumn("rounded_min", date_trunc("minute", "now")).show(truncate=False)

### drop()  – Usuwanie kolumn

In [None]:
df_flights_parquet.drop("DEST_COUNTRY_NAME").show()

### dropna() - Usuwanie wierszy z null

Jeśli chcesz usunąć wiersze, które zawierają null

- `df.dropna(how="any")` - domyślnie: usuń wiersz, jeśli którakolwiek kolumna ma null
- `df.dropna(how="all")` - usuń wiersz, jeśli wszystkie kolumny mają null
- `df.dropna(subset=["col1"])` - tylko jeśli null w konkretnych kolumnach


### fillna() / fill() — Wypełnianie null


```
df.fillna(value)
df.fillna(value, subset=["col1", "col2"])
```

Można też podać słownik z wartościami dla konkretnych kolumn:


```
df.fillna({"col1": "brak danych", "col2": 0})
```

```
df.fill(...)      # skrót
df.fillna(...)    # pełna nazwa```


In [None]:
data = [("Marcin Prokop", None), (None, 27),("Agnieszka Chylińska",None)]

columns = ["name", "age"]

df = spark.createDataFrame(data, schema=columns)
df.show()

In [None]:
df.dropna(subset=["age"]).show()

In [None]:
df.printSchema()

In [None]:
df.fillna(6, subset = ["age"]).show()

In [None]:
df.fillna({"name": "brak danych", "age": 0}).show()

### Złącza - jonis

W PySpark funkcja join() służy do łączenia dwóch DataFrame'ów — dokładnie tak jak JOIN w SQL. Jest to jedna z najważniejszych operacji w przetwarzaniu danych.

| Typ           | Opis                                  |
| ------------- | ------------------------------------- |
| `"inner"`     | tylko wspólne wiersze                 |
| `"left"`      | wszystkie z `df1` + dopas. z `df2`    |
| `"right"`     | wszystkie z `df2` + dopas. z `df1`    |
| `"outer"`     | pełne złączenie (oba df-y)            |
| `"left_semi"` | jak filtr – tylko `df1`, jeśli pasuje |
| `"left_anti"` | tylko `df1`, jeśli **nie** pasuje     |


**Składnia:**
```
df1.join(df2, on=warunek_lączenia, how="typ_join")
```

In [None]:
airportsna = (spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("sep", "\t")
        .load("data/flights/airport-codes-na.txt"))

In [None]:
departureDelays =(spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("data/flights/departuredelays.csv"))


In [None]:
airportsna.show()

In [None]:
departureDelays.show()

In [None]:
df = airportsna.join(departureDelays,on=airportsna["IATA"] == departureDelays["origin"], how = 'inner')

In [None]:
df.show()

`left_semi` 
- Zwraca tylko te wiersze z lewego DataFrame’a, dla których istnieje dopasowanie w prawym.

- Nie zwraca kolumn z prawego DataFrame’a.

- Działa jak filtr .filter() na podstawie istnienia wartości.

In [None]:
users = spark.createDataFrame([
    ("Alice", "admin"),
    ("Bob", "user"),
    ("Charlie", "guest"),
    ("Diana", "user")
], ["name", "role"])

# Tabela uprawnień – dozwolone role
# Pokaz przyklad  z duplikatami
permissions = spark.createDataFrame([
    ("admin",),
    ("user",)
], ["role"])

result = users.join(permissions, on="role", how="left_semi")
result.show()

`left_semi` działa jak:
```
SELECT * FROM users WHERE role IN (SELECT role FROM permissions)
```

| JOIN        | Zwraca kolumny z obu?   | Filtrowanie? | Szybkość                  |
| ----------- | ----------------------- | ------------ | ------------------------- |
| `inner`     | Tak                     | Nie          | wolniejszy                |
| `left_semi` | Nie (tylko lewa tabela) | Tak          | szybszy (brak duplikacji) |


Kiedy `left_semi` 

- Chcesz sprawdzić istnienie (czy coś pasuje)

- Potrzebujesz tylko lewe dane

- Robisz filtrowanie na podstawie innej tabeli

`left_anti`

Zwraca tylko te wiersze z lewej tabeli, które nie mają dopasowania w prawej tabeli.

In [None]:
no_permission = users.join(permissions, on="role", how="left_anti")
no_permission.show()

`Charlie` to jedyny użytkownik, którego rola "guest" nie znajduje się w permissions, więc został zwrócony.

| JOIN typ    | Zwraca jeśli dopasowanie? | Dane z prawej? | Przykład                   |
| ----------- | ------------------------- | -------------- | -------------------------- |
| `left_semi` | ✅ Tak                     | ❌ Nie          | „Kto ma uprawnienia”       |
| `left_anti` | ❌ Nie                     | ❌ Nie          | „Kto **nie** ma uprawnień” |


#### Zadanie

Wykonaj operację złączenia inner left right outer na poniższym dataframe

In [None]:
df1 = spark.createDataFrame([
    (1, ),
    (1, ),
    (1, ),
    (2, ),
    (3, ),
    (3, ),
    (3, ),
], ["nr"])

df2 = spark.createDataFrame([
    (1, ),
    (1, ),
    (2, ),
    (2, ),
    (4, ),
    (None, ),
], ["nr"])

In [None]:
# -- inner; 3x2 -> 6 jedynek


In [None]:
# -- left


In [None]:
# -- right


In [None]:
# -- left_semi


In [None]:
# -- left_semi


### broadcast join

- To technika, w której mała tabela (np. lookup, słownik, kody) jest rozsyłana (broadcastowana) do wszystkich węzłów klastra.

- Spark może wtedy uniknąć shuffle (przemieszczania danych między węzłami) – działa szybciej.

**Kiedy używać**:

- Gdy jedna z tabel jest mała (np. < 10 MB).

- Gdy masz join między:

    - Dużym DataFrame (df_large)

    - Małym DataFrame (df_small) z kluczami

In [None]:
# Duża tabela – np. dane transakcyjne
df_large = spark.createDataFrame([
    (1, "A"),
    (2, "B"),
    (3, "C"),
    (4, "D"),
], ["id", "value"])

# Mała tabela – słownik opisów
df_small = spark.createDataFrame([
    (1, "Jan"),
    (2, "Anna"),
], ["id", "name"])

# Broadcast join
from pyspark.sql.functions import broadcast

joined = df_large.join(broadcast(df_small), on="id", how="left")
joined.show()

Zamiast przemieszczać dane obu stron **(shuffle join)**, Spark rozsyła małą tabelę do każdego węzła, więc duża tabela może być przetwarzana lokalnie.

- Spark sam może wybrać broadcast join, jeśli spełnione są warunki (mały rozmiar, **spark.sql.autoBroadcastJoinThreshold**).

- Ale możesz wymusić **broadcast()** funkcją, jak powyżej.

- Możesz zobaczyć plan zapytania **.explain()** – tam pojawi się **BroadcastHashJoin**.

In [None]:
joined.explain()

### Union intersect subtract

`union`

- Łączy dwa DataFrame’y dokładając wiersze jeden pod drugi.

- Zwraca wszystkie wiersze z obu DataFrame’ów.

- Struktury obu DataFrame’ów muszą być zgodne (takie same kolumny i typy).

In [None]:
df1 = spark.createDataFrame([
    (1, "A"),
    (2, "B")
], ["id", "value"])

df2 = spark.createDataFrame([
    (2, "B"),
    (3, "C")
], ["id", "value"])

union_df = df1.union(df2)
union_df.show()

**Uwaga**: union nie usuwa duplikatów. Jeśli chcesz je usunąć, użyj union_df.distinct().

In [None]:
union_df.distinct().show()

`intersect`

- Zwraca przecięcie dwóch DataFrame’ów — wiersze, które występują w obu.

- Struktura też musi być zgodna.

In [None]:
intersect_df = df1.intersect(df2)
intersect_df.show()

`subtract`

- Zwraca wiersze z pierwszego DataFrame, które nie występują w drugim.

- Usuwa duplikaty (wynik unikalny).

In [None]:
subtract_df = df1.subtract(df2)
subtract_df.show()

| Metoda      | Co zwraca                       | Duplikaty?      |
| ----------- | ------------------------------- | --------------- |
| `union`     | Wszystkie wiersze z obu tabel   | Nie (zachowuje) |
| `intersect` | Wspólne wiersze (unikalne)      | Usuwa           |
| `subtract`  | Wiersze z pierwszej bez drugiej | Usuwa           |


### distinct() vs dropDuplicates()

`distinct()`

- Usuwa wszystkie duplikaty z DataFrame — zwraca unikalne wiersze.

- Dotyczy wszystkich kolumn.

- Nie pozwala na wskazanie konkretnych kolumn.

In [None]:
df = spark.createDataFrame([
    (1, "A"),
    (1, "A"),
    (2, "B"),
    (2, "C")
], ["id", "val"])

df.distinct().show()

`dropDuplicates()`

- Usuwa duplikaty na podstawie wskazanych kolumn (jeśli podasz listę kolumn).

- Jeśli nie podasz kolumn, działa jak distinct().

- Pozwala zachować unikalność wg określonych kolumn, ignorując pozostałe.

In [None]:
df.dropDuplicates().show()

In [None]:
df.dropDuplicates(["id"]).show()

| Metoda             | Co robi?                             | Możliwość wskazania kolumn? |
| ------------------ | ------------------------------------ | --------------------------- |
| `distinct()`       | Usuwa duplikaty wg wszystkich kolumn | Nie                         |
| `dropDuplicates()` | Usuwa duplikaty wg podanych kolumn   | Tak                         |


### orderBy()

Metoda orderBy() służy do sortowania wierszy w DataFrame według jednej lub wielu kolumn — rosnąco (domyślnie) lub malejąco.

```
df.orderBy("col1")                          # Rosnąco (domyślnie)
df.orderBy(df.col1.asc())                   # Jawnie rosnąco
df.orderBy(df.col1.desc())                  # Malejąco
df.orderBy("col1", "col2")                  # Po kilku kolumnach

```

In [None]:
df_flights_parquet.orderBy("count", ascending=False).show()

In [None]:
df_flights_parquet.orderBy(col("count").asc()).show()

### groupBy

In [None]:
(df_flights_parquet.groupBy("ORIGIN_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "total_flights")
  .orderBy("total_flights", ascending=False)
  .show())

In [None]:
(df_flights_parquet.groupBy("ORIGIN_COUNTRY_NAME")
  .agg({"DEST_COUNTRY_NAME": "count"})
  .withColumnRenamed("count(DEST_COUNTRY_NAME)", "num_destinations")
  .orderBy("num_destinations", ascending=False)
  .show())

In [None]:
from pyspark.sql.functions import avg

(df_flights_parquet.groupBy("ORIGIN_COUNTRY_NAME")
  .agg(avg("count").alias("avg_flights"))
  .orderBy("avg_flights", ascending=False)
  .show())

In [None]:
import pyspark.sql.functions as f

(df_flights_parquet.groupBy("ORIGIN_COUNTRY_NAME")
  .agg(
    f.sum("count").alias("total_flights"),
    f.avg("count").alias("avg_flights"),
    f.count("DEST_COUNTRY_NAME").alias("num_destinations")
  )
  .orderBy(f.desc("total_flights"))
  .show())

#### Zadanie 

Z którego kraju jest najwięcej wylotów ?

**Przykładowy wynik:**

```
+--------------------+----------------+
| ORIGIN_COUNTRY_NAME|num_destinations|
+--------------------+----------------+
|       United States|             125|
|              Russia|               1|
|            Anguilla|               1|
|             Senegal|               1|
|              Sweden|               1|
|            Kiribati|               1|
|              Guyana|               1|
|         Philippines|               1|
|           Singapore|               1|
|            Malaysia|               1|
|                Fiji|               1|
|              Turkey|               1|
|             Germany|               1|
|         Afghanistan|               1|
|              Jordan|               1|
|               Palau|               1|
|Turks and Caicos ...|               1|
|              France|               1|
|              Greece|               1|
|British Virgin Is...|               1|
+--------------------+----------------+
```

### describe()

- Zwraca podstawowe statystyki opisowe dla wybranych kolumn liczbowych (i tekstowych) DataFrame.

- Statystyki obejmują:

    - count — liczba niepustych (non-null) wartości

    - mean — średnia arytmetyczna
    
    - stddev — odchylenie standardowe
    
    - min — minimalna wartość (dla tekstu, alfabetycznie)
    
    - max — maksymalna wartość (dla tekstu, alfabetycznie)
 
```
df.describe()           # dla wszystkich kolumn
df.describe("col1", "col2")  # dla wybranych kolumn
```

In [None]:
df_flights_parquet.describe("count").show()

### CASE WHEN

- Pozwala tworzyć nową kolumnę na podstawie warunków logicznych.

- W Spark SQL używamy do tego funkcji when z pyspark.sql.functions.

In [None]:
from pyspark.sql.functions import when, col
df_flights_parquet.withColumn(
    "traffic_category",
    when(col("count") > 100, "Wysokie obciążenie")
    .when(col("count") > 50, "Średnie obciążenie")
    .otherwise("Niskie obciążenie")
).show()

### Funkcje oknowe

Funkcje oknowe (ang. window functions) w PySpark pozwalają wykonywać operacje analityczne na grupach wierszy, z zachowaniem pełnego kontekstu danych (nie agregują danych do jednej wartości).


- Działają w kontekście "okna" danych (czyli zdefiniowanego zbioru wierszy wokół aktualnego wiersza).

- Nie redukują liczby wierszy (jak groupBy), tylko dodają kolumny z wynikami analizy.

- Przykłady użycia:

    - Rangi (row_number, rank)

    - Agregaty (sum, avg, max, min)

    - Operacje na sąsiednich wierszach (lead, lag)

In [None]:
from pyspark.sql.window import Window
df = spark.createDataFrame([
    ("A", 100, "2025-01-01"),
    ("A", 100, "2025-01-01"),
    ("A", 100, "2025-01-01"),
    ("A", 200, "2025-01-02"),
    ("A",  50, "2025-01-03"),
    ("B",  30, "2025-01-01"),
    ("B",  70, "2025-01-02"),
    ("C",  10, "2025-01-01")
], ["group", "value", "date"])

# Konwersja na typ DateType
df = df.withColumn("date", f.to_date("date", "yyyy-MM-dd"))

df.show()

In [None]:
# Definicja okna – partycjonowanie po kraju
window_spec = Window.partitionBy("group")

df.withColumn("sum_in_group", f.sum("value").over(window_spec)).show()

In [None]:
window_spec = Window.partitionBy("group").orderBy("date")
df_ranked = df \
    .withColumn("row_number", f.row_number().over(window_spec)) \
    .withColumn("rank", f.rank().over(window_spec)) \
    .withColumn("dense_rank", f.dense_rank().over(window_spec))

df_ranked.orderBy("group", "date").show()

| Funkcja        | Opis                                                                            |
| -------------- | ------------------------------------------------------------------------------- |
| `row_number()` | Liczy unikalnie wiersze w kolejności. Bez powtórzeń.                            |
| `rank()`       | Przy tych samych wartościach – przypisuje tę samą rangę, ale przeskakuje numer. |
| `dense_rank()` | Jak `rank()`, ale bez przeskoków w numeracji.                                   |


In [None]:
window_spec = Window.partitionBy("group").orderBy("date")
df_lag_lead = df \
    .withColumn("prev_value", f.lag("value", 1).over(window_spec)) \
    .withColumn("next_value", f.lead("value", 1).over(window_spec))

df_lag_lead.orderBy("group", "date").show()

| Funkcja        | Co robi                                            |
| -------------- | -------------------------------------------------- |
| `lag(col, 1)`  | Zwraca wartość z **poprzedniego wiersza** w grupie |
| `lead(col, 1)` | Zwraca wartość z **następnego wiersza** w grupie   |


In [None]:
# moving average (średnia krocząca) i moving sum (suma krocząca)

window_spec = (
    Window.partitionBy("group").orderBy("date")
    .rowsBetween(Window.unboundedPreceding, 0) 
)
df_moving = df \
    .withColumn("moving_sum", f.sum("value").over(window_spec)) \
    .withColumn("moving_avg", f.avg("value").over(window_spec))

df_moving.orderBy("group", "date").show()

- Tylko poprzedni wiersz: .rowsBetween(-1, -1)

- Poprzedni + bieżący + następny: .rowsBetween(-1, 1)

- Od początku grupy do teraz: .rowsBetween(Window.unboundedPreceding, 0)

#### Zadanie

**Analiza sprzedaży**
Masz dane dziennej sprzedaży produktów:

In [None]:
df = spark.createDataFrame([
    ("prodA", "2023-01-01", 100),
    ("prodA", "2023-01-02", 120),
    ("prodA", "2023-01-03", 90),
    ("prodA", "2023-01-04", 140),
    ("prodB", "2023-01-01", 200),
    ("prodB", "2023-01-02", 180),
    ("prodB", "2023-01-03", 190),
    ("prodB", "2023-01-04", 210),
], ["product", "date", "sales"])

##### 1. Dodaj kolumnę row_number wg daty (dla każdego produktu).

```
+-------+----------+-----+----------+
|product|      date|sales|row_number|
+-------+----------+-----+----------+
|  prodA|2023-01-01|  100|         1|
|  prodA|2023-01-02|  120|         2|
|  prodA|2023-01-03|   90|         3|
|  prodA|2023-01-04|  140|         4|
|  prodB|2023-01-01|  200|         1|
|  prodB|2023-01-02|  180|         2|
|  prodB|2023-01-03|  190|         3|
|  prodB|2023-01-04|  210|         4|
+-------+----------+-----+
```----------+

##### 2. Dodaj kolumnę prev_sales z poprzednim dniem (funkcja lag).

```

+-------+----------+-----+----------+
|product|      date|sales|prev_sales|
+-------+----------+-----+----------+
|  prodA|2023-01-01|  100|      NULL|
|  prodA|2023-01-02|  120|       100|
|  prodA|2023-01-03|   90|       120|
|  prodA|2023-01-04|  140|        90|
|  prodB|2023-01-01|  200|      NULL|
|  prodB|2023-01-02|  180|       200|
|  prodB|2023-01-03|  190|       180|
|  prodB|2023-01-04|  210|       190|
+-------+----------+-----+----------+


```

##### 3. Dodaj kolumnę next_sales z następnym dniem (lead).

```

+-------+----------+-----+----------+
|product|      date|sales|next_sales|
+-------+----------+-----+----------+
|  prodA|2023-01-01|  100|       120|
|  prodA|2023-01-02|  120|        90|
|  prodA|2023-01-03|   90|       140|
|  prodA|2023-01-04|  140|      NULL|
|  prodB|2023-01-01|  200|       180|
|  prodB|2023-01-02|  180|       190|
|  prodB|2023-01-03|  190|       210|
|  prodB|2023-01-04|  210|      NULL|
+-------+----------+-----+----------+

```

##### 4. Dodaj kolumnę moving_avg_3 – średnia z 3 dni: bieżący i 2 poprzednie.

```

+-------+----------+-----+------------------+
|product|      date|sales|      moving_avg_3|
+-------+----------+-----+------------------+
|  prodA|2023-01-01|  100|             100.0|
|  prodA|2023-01-02|  120|             110.0|
|  prodA|2023-01-03|   90|103.33333333333333|
|  prodA|2023-01-04|  140|             112.5|
|  prodB|2023-01-01|  200|             200.0|
|  prodB|2023-01-02|  180|             190.0|
|  prodB|2023-01-03|  190|             190.0|
|  prodB|2023-01-04|  210|             195.0|
+-------+----------+-----+------------------+

```

##### 5. Dodaj kolumnę diff_from_prev = różnica sales - prev_sales.

```
+-------+----------+-----+--------------+
|product|      date|sales|diff_from_prev|
+-------+----------+-----+--------------+
|  prodA|2023-01-01|  100|          NULL|
|  prodA|2023-01-02|  120|            20|
|  prodA|2023-01-03|   90|           -30|
|  prodA|2023-01-04|  140|            50|
|  prodB|2023-01-01|  200|          NULL|
|  prodB|2023-01-02|  180|           -20|
|  prodB|2023-01-03|  190|            10|
|  prodB|2023-01-04|  210|            20|
+-------+----------+-----+--------------+

```




##### 6. Wyznacz datę z największą sprzedażą per produkt (użyj rank()).

```
+-------+----------+-----+---------+
|product|      date|sales|rank_desc|
+-------+----------+-----+---------+
|  prodA|2023-01-04|  140|        1|
|  prodA|2023-01-02|  120|        2|
|  prodA|2023-01-01|  100|        3|
|  prodA|2023-01-03|   90|        4|
|  prodB|2023-01-04|  210|        1|
|  prodB|2023-01-01|  200|        2|
|  prodB|2023-01-03|  190|        3|
|  prodB|2023-01-02|  180|        4|
+-------+----------+-----+---------+

```

### Zapisywanie DataFrame w PySpark

`csv`

Przydatne opcje:
- `header=True` — dodaje nagłówki kolumn do pliku CSV

- `mode` — tryb zapisu:

    - `"overwrite"` — nadpisuje pliki w katalogu

    - `"append"` — dopisuje do istniejących plików

    - `"ignore"` — ignoruje zapis, jeśli katalog istnieje

    - `"error"` lub "errorifexists" — domyślny, wyrzuca błąd jeśli katalog istnieje

- `sep=","` — separator kolumn (domyślnie przecinek, można zmienić np. na tabulator \t)

- `quote` — znak cudzysłowu (np. ")

- `escape` — znak ucieczki (np. \)

In [None]:
df.show()

In [None]:
(df.write
  .option("header", "true")
  .option("sep", ";")
  .mode("overwrite")
  .csv("output/csv_folder/"))

`parquet`

Parquet to kolumnowy, skompresowany format, wydajny i popularny w big data.


- `mode` — analogicznie jak w CSV (`overwrite`, `append` itd.)

- `compression` — typ kompresji (`snappy` — domyślna, `gzip`, `none`, `brotli`, `lz4` itd.)

In [None]:
df.write \
  .mode("overwrite") \
  .option("compression", "gzip") \
  .parquet("output/parquet_folder")

In [None]:
df.rdd.getNumPartitions()

#### Zapis do pojedynczego pliku:

Domyślnie Spark zapisuje wiele plików (po jednym na partycję). Aby wymusić pojedynczy plik:

In [None]:
(df.coalesce(1)
     .write
    .option("header", "true")
    .option("sep", ";")
    .mode("overwrite")
    .csv("output/csv_folder/"))

In [None]:
(df.coalesce(1)
    .write
    .mode("overwrite")
    .option("compression", "gzip")
    .parquet("output/parquet_folder"))

### UDF vs Pandas UDF

W PySpark istnieją dwa główne podejścia do definiowania funkcji użytkownika: tradycyjne funkcje użytkownika (UDF) oraz funkcje użytkownika oparte na Pandas (pandas UDF). Poniżej przedstawiam porównanie obu metod:



- **Definicja**: Tworzone za pomocą dekoratora @pandas_udf, operują na kolumnach jako obiektach pandas.Series.

- **Wydajność**: Bardziej wydajne dzięki wykorzystaniu wektorowych operacji Pandas i Apache Arrow do szybkiego transferu danych między JVM a Pythonem.

- **Zastosowanie**: Idealne do operacji, które można wyrazić jako operacje wektorowe na danych.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [None]:
data = [(1,), (2,), (3,)]
df = spark.createDataFrame(data, ["value"])

In [None]:
# Zdefiniowanie funkcji cubed()
@udf(IntegerType())
def cubed(s):
    return s*s*s

In [None]:
df_udf = df.withColumn("cubed_value", cubed(df["value"]))
df_udf.show()

In [None]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("int")
def cubed_pandas(a: pd.Series) -> pd.Series:
    return a**3


df_pandas_udf = df.withColumn("cubed_value", cubed_pandas(df["value"]))
df_pandas_udf.show()

| Cecha                       | Tradycyjne UDF        | pandas UDF                 |                                                                     |
| --------------------------- | --------------------- | -------------------------- | ------------------------------------------------------------------- |
| **Typ operacji**            | Wiersz po wierszu     | Wektorowe (kolumnowe)      |                                                                     |
| **Wydajność**               | Niższa                | Wyższa dzięki wektoryzacji |                                                                     |
| **Wsparcie dla Arrow**      | Nie                   | Tak                        |                                                                     |
| **Złożoność implementacji** | Prosta                | Wymaga znajomości Pandas   |                                                                     |
| **Zastosowanie**            | Niestandardowa logika | Operacje wektorowe         | ([Dokumentacja Databricks][1], [en.wikipedia.org][2], [Brainly][3]) |

[1]: https://docs.databricks.com/aws/en/udf/pandas?utm_source=chatgpt.com "A pandas user-defined function (UDF) - Databricks Documentation"
[2]: https://en.wikipedia.org/wiki/User-defined_function?utm_source=chatgpt.com "User-defined function"
[3]: https://brainly.com/question/47363257?utm_source=chatgpt.com "What is the difference between a built-in function and a user-defined ..."
