In [1]:
import os


os.environ['SPARK_NAME'] = "/opt/spark"
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/spark/work-dir/venv/bin/python3'
os.environ['PYSPARK_PYTHON'] = '/opt/spark/work-dir/venv/bin/python3'

In [2]:
from pyspark.sql import SparkSession


spark = SparkSession\
        .builder\
        .master("local[2]")\
        .appName("Create-DataFrame")\
        .config("spark.memory.offHeap.enabled","true")\
        .config("spark.memory.offHeap.size","4g")\
        .getOrCreate()
spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/01 15:26:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**Zadanie 1**  
Pamiętacie plik zamówienia.txt ?
Plik został umieszczony w folderze z labem w repozytorium.

Wczytaj ten plik za pomocą Sparka do dowolnego typu danych (RDD, Spark DataFrame) i dokonaj transformacji tak aby:
* naprawić problemy z kodowaniem znaków (replace?) w kolumnie Sprzedawca
* poprawić format danych w kolumnie Utarg
* dodać odpowiednie typy danych
* kolumna idZamowienia powinna być traktowana jako klucz (indeks)

In [3]:
sc = spark.sparkContext
df = spark.read.csv('./data/zamowienia.txt', header=True, sep=';', encoding='UTF-8')
df.printSchema()

                                                                                

root
 |-- Kraj: string (nullable = true)
 |-- Sprzedawca: string (nullable = true)
 |-- Data zamowienia: string (nullable = true)
 |-- idZamowienia: string (nullable = true)
 |-- Utarg: string (nullable = true)



In [4]:
from pyspark.sql.functions import col, regexp_replace, to_date
from pyspark.sql.types import IntegerType, DoubleType

df = df.withColumn("Sprzedawca", regexp_replace(col("Sprzedawca"), "[^\\w\\s]", ""))

df = df.withColumn("Utarg", regexp_replace(col("Utarg"), "[^0-9,]", ""))
df = df.withColumn("Utarg", regexp_replace(col("Utarg"), ",", "."))

df = df.withColumn("idZamowienia", col("idZamowienia").cast(IntegerType()))
df = df.withColumn("Data zamowienia", to_date(col("Data zamowienia"), "dd.MM.yyyy"))
df = df.withColumn("Utarg", col("Utarg").cast(DoubleType()))

In [5]:
df.createOrReplaceTempView("ORDERS_DATA")
spark.catalog.listTables()

[Table(name='ORDERS_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [6]:
df.printSchema()

root
 |-- Kraj: string (nullable = true)
 |-- Sprzedawca: string (nullable = true)
 |-- Data zamowienia: date (nullable = true)
 |-- idZamowienia: integer (nullable = true)
 |-- Utarg: double (nullable = true)



In [7]:
spark.sql("Select * from ORDERS_DATA limit 10").show()

+------+----------+---------------+------------+------+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia| Utarg|
+------+----------+---------------+------------+------+
|Polska|  Kowalski|     2003-07-16|       10248| 440.0|
|Polska|   Sowiski|     2003-07-10|       10249|1863.4|
|Niemcy|   Peacock|     2003-07-12|       10250|1552.6|
|Niemcy| Leverling|     2003-07-15|       10251|654.06|
|Niemcy|   Peacock|     2003-07-11|       10252|3597.9|
|Niemcy| Leverling|     2003-07-16|       10253|1444.8|
|Polska|  Kowalski|     2003-07-23|       10254|556.62|
|Polska|     Dudek|     2003-07-15|       10255|2490.5|
|Niemcy| Leverling|     2003-07-17|       10256| 517.8|
|Niemcy|   Peacock|     2003-07-22|       10257|1119.9|
+------+----------+---------------+------------+------+



**Zadanie 2**  
Po wykonaniu zadania 1, wykorzystaj przykłady z laboratorium i:
* 2.1 wykonaj wiaderkowanie danych i wykonaj dowolne zapytanie agregujące na tych danych vs. dane nie podzielone na wiaderka - porównaj czas

In [8]:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import avg

utarg_splits = [0, 500, 1000, 2000, 5000, float("inf")]

bucketizer = Bucketizer(
    splits=utarg_splits,
    inputCol="Utarg",
    outputCol="Utarg_Bucket"
)

bucketed_df = bucketizer.transform(df)

In [9]:
%%time
result_bucketed = bucketed_df.groupBy("Utarg_Bucket").agg(avg("Utarg").alias("Average_Utarg"))
result_bucketed.show()

[Stage 4:>                                                          (0 + 1) / 1]

+------------+------------------+
|Utarg_Bucket|     Average_Utarg|
+------------+------------------+
|         0.0| 266.4040454545455|
|         1.0| 730.6321808510637|
|         4.0| 8745.641999999996|
|         3.0| 3032.618066666667|
|         2.0|1493.3539810426544|
+------------+------------------+

CPU times: user 13.6 ms, sys: 3.68 ms, total: 17.3 ms
Wall time: 2.16 s


                                                                                

In [10]:
%%time
result_original = df.groupBy().agg(avg("Utarg").alias("Average_Utarg"))
result_original.show()

+------------------+
|     Average_Utarg|
+------------------+
|1537.3309136420523|
+------------------+

CPU times: user 0 ns, sys: 13.8 ms, total: 13.8 ms
Wall time: 744 ms


* 2.2 wykonaj partycjonowanie danych i zapisz je w formcie csv (wypróbuj partycjonowanie wg. kraju, nazwiska)

In [11]:
partitioned_path = "data/partitioned_zamowienia"
df.write.partitionBy("Kraj", "Sprzedawca").mode("overwrite").csv(partitioned_path, header=True)

                                                                                

* 2.3 wykonaj zapytanie agregujące z filtrowaniem po kolumnie, której użyłeś/-aś do partycjonowania na danych oryginalnych oraz partycjonowanych i porównaj czas wykonania

In [12]:
%%time
result_original = df.filter(col("Kraj") == "Polska").groupBy("Kraj").agg(avg("Utarg").alias("Average_Utarg"))
result_original.show()

+------+-----------------+
|  Kraj|    Average_Utarg|
+------+-----------------+
|Polska|1550.376325581395|
+------+-----------------+

CPU times: user 6.36 ms, sys: 9.75 ms, total: 16.1 ms
Wall time: 889 ms


In [13]:
%%time
partitioned_df = spark.read.csv(partitioned_path, header=True, inferSchema=True)
result_partitioned = partitioned_df.filter(col("Kraj") == "Polska").groupBy("Kraj").agg(avg("Utarg").alias("Average_Utarg"))
result_partitioned.show()

                                                                                

+------+------------------+
|  Kraj|     Average_Utarg|
+------+------------------+
|Polska|1550.3763255813956|
+------+------------------+

CPU times: user 5.86 ms, sys: 12.7 ms, total: 18.5 ms
Wall time: 3.85 s


**Zadanie 3**  
Z danych wygeneruj 4 różne podzbiory próbek (wiersze wybrane losowo) i dodaj nową kolumnę w każdym z nich, np. w jednym stwórz kolumnę month wyciągając tylko miesiąc z daty, w drugim wartość netto zamówienia (przyjmując, że vat to 23%), w kolejnym zamień nazwisko na wielkie litery, w kolejnym dodaj kolumnę waluta z wartością PLN.

Następnie zapisz każdy z tych zbiorów tak, że:
* zbiór pierwszy to będzie tymczasowa tabela in-memory Sparka
* zbiór drugi to plik(i) parquet
* zbiór trzeci to plik(i) csv
* zbiór czwarty to plik(i) json

Wykonaj zapytanie złączające jak w przykładzie pobierając dane bezpośrednio z plików i wyświetl idZamowienia, Kraj, Sprzedawcę, Datę, Utarg oraz 4 nowo utworzone kolumny.

In [14]:
splits = df.sample(fraction=0.5, seed=19)

In [15]:
from pyspark.sql.functions import month, upper, lit, round

subset1 = splits.withColumn("Month", month(col("Data zamowienia")))
subset2 = splits.withColumn("Netto", round(col("Utarg") / 1.23, 2))
subset3 = splits.withColumn("SprzedawcaUpper", upper(col("Sprzedawca")))
subset4 = splits.withColumn("Waluta", lit("PLN"))

In [16]:
subset1.createOrReplaceTempView("ZAMOWIENIA_SUBSET_1")
subset2.write.parquet("data/zamowienia_subset_2.parquet", mode="overwrite")
subset3.write.csv("data/zamowienia_subset_3.csv", header=True, mode="overwrite")
subset4.write.json("data/zamowienia_subset_4.json", mode="overwrite")

                                                                                

In [17]:
query = """
SELECT s1.idZamowienia, s1.Kraj, s1.Sprzedawca, s1.`Data zamowienia`, s1.Utarg, 
       s1.Month, s2.Netto, s3._c5, s4.Waluta
FROM ZAMOWIENIA_SUBSET_1 s1
JOIN parquet.`data/zamowienia_subset_2.parquet` s2 ON s1.idZamowienia = s2.idZamowienia
JOIN csv.`data/zamowienia_subset_3.csv` s3 ON s1.idZamowienia = s3._c3
JOIN json.`data/zamowienia_subset_4.json` s4 ON s1.idZamowienia = s4.idZamowienia
"""

df_from_json = spark.sql(query).show(10)

+------------+------+----------+---------------+------+-----+-------+---------+------+
|idZamowienia|  Kraj|Sprzedawca|Data zamowienia| Utarg|Month|  Netto|      _c5|Waluta|
+------------+------+----------+---------------+------+-----+-------+---------+------+
|       10248|Polska|  Kowalski|     2003-07-16| 440.0|    7| 357.72| KOWALSKI|   PLN|
|       10249|Polska|   Sowiski|     2003-07-10|1863.4|    7|1514.96|  SOWISKI|   PLN|
|       10250|Niemcy|   Peacock|     2003-07-12|1552.6|    7|1262.28|  PEACOCK|   PLN|
|       10251|Niemcy| Leverling|     2003-07-15|654.06|    7| 531.76|LEVERLING|   PLN|
|       10253|Niemcy| Leverling|     2003-07-16|1444.8|    7|1174.63|LEVERLING|   PLN|
|       10255|Polska|     Dudek|     2003-07-15|2490.5|    7| 2024.8|    DUDEK|   PLN|
|       10257|Niemcy|   Peacock|     2003-07-22|1119.9|    7| 910.49|  PEACOCK|   PLN|
|       10261|Niemcy|   Peacock|     2003-07-30| 448.0|    7| 364.23|  PEACOCK|   PLN|
|       10262|Niemcy|  Callahan|     2003-0