In [1]:
from pyspark.sql import SparkSession

warehouse_location = '/opt/spark/work-dir/lab_07/metastore_db'

spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("lab7") \
        .config("spark.memory.offHeap.enabled", "true") \
        .config("spark.memory.offHeap.size", "4g") \
        .enableHiveSupport() \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()

In [2]:
!wget https://raw.githubusercontent.com/kropiak/uwm_analiza_duzych_zbiorow/refs/heads/main/lab_07/zamowienia.txt

--2024-11-25 15:51:53--  https://raw.githubusercontent.com/kropiak/uwm_analiza_duzych_zbiorow/refs/heads/main/lab_07/zamowienia.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35312 (34K) [text/plain]
Saving to: ‘zamowienia.txt.1’


2024-11-25 15:51:54 (760 KB/s) - ‘zamowienia.txt.1’ saved [35312/35312]



In [3]:
df = spark.read.option("delimiter", ";").csv('zamowienia.txt', header=True, inferSchema=True)

df.show(5)

+------+----------+---------------+------------+-----------+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia|      Utarg|
+------+----------+---------------+------------+-----------+
|Polska|  Kowalski|     16.07.2003|       10248|  440,00 z|
|Polska|  Sowiäski|     10.07.2003|       10249|1 863,40 z|
|Niemcy|   Peacock|     12.07.2003|       10250|1 552,60 z|
|Niemcy| Leverling|     15.07.2003|       10251|  654,06 z|
|Niemcy|   Peacock|     11.07.2003|       10252|3 597,90 z|
+------+----------+---------------+------------+-----------+
only showing top 5 rows



In [4]:
from pyspark.sql.functions import col, regexp_replace
df = df.withColumn("Sprzedawca", regexp_replace(col("Sprzedawca"), "ä", "a"))

df.show(5)

+------+----------+---------------+------------+-----------+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia|      Utarg|
+------+----------+---------------+------------+-----------+
|Polska|  Kowalski|     16.07.2003|       10248|  440,00 z|
|Polska|  Sowiaski|     10.07.2003|       10249|1 863,40 z|
|Niemcy|   Peacock|     12.07.2003|       10250|1 552,60 z|
|Niemcy| Leverling|     15.07.2003|       10251|  654,06 z|
|Niemcy|   Peacock|     11.07.2003|       10252|3 597,90 z|
+------+----------+---------------+------------+-----------+
only showing top 5 rows



In [6]:
from pyspark.sql import functions as F

df = df.withColumn("Utarg", F.regexp_replace(F.col("Utarg"), " z", ""))
df = df.withColumn("Utarg", F.regexp_replace(F.col("Utarg"), r"\s+", ""))
df = df.withColumn("Utarg", F.regexp_replace(F.col("Utarg"), ",", "."))
df = df.withColumn("Utarg", F.col("Utarg").cast("float"))

df.show(5, truncate=False)

+------+----------+---------------+------------+------+
|Kraj  |Sprzedawca|Data zamowienia|idZamowienia|Utarg |
+------+----------+---------------+------------+------+
|Polska|Kowalski  |16.07.2003     |10248       |440.0 |
|Polska|Sowiaski  |10.07.2003     |10249       |1863.4|
|Niemcy|Peacock   |12.07.2003     |10250       |1552.6|
|Niemcy|Leverling |15.07.2003     |10251       |654.06|
|Niemcy|Peacock   |11.07.2003     |10252       |3597.9|
+------+----------+---------------+------------+------+
only showing top 5 rows



In [7]:
df.printSchema()

root
 |-- Kraj: string (nullable = true)
 |-- Sprzedawca: string (nullable = true)
 |-- Data zamowienia: string (nullable = true)
 |-- idZamowienia: integer (nullable = true)
 |-- Utarg: float (nullable = true)



In [10]:
from pyspark.sql.functions import to_date

df = df.withColumn("Data zamowienia", to_date(df["Data zamowienia"], "dd.mm.yyyy"))

df.printSchema()
df.show(5)

root
 |-- Kraj: string (nullable = true)
 |-- Sprzedawca: string (nullable = true)
 |-- Data zamowienia: date (nullable = true)
 |-- idZamowienia: integer (nullable = true)
 |-- Utarg: float (nullable = true)

+------+----------+---------------+------------+------+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia| Utarg|
+------+----------+---------------+------------+------+
|Polska|  Kowalski|     2003-01-16|       10248| 440.0|
|Polska|  Sowiaski|     2003-01-10|       10249|1863.4|
|Niemcy|   Peacock|     2003-01-12|       10250|1552.6|
|Niemcy| Leverling|     2003-01-15|       10251|654.06|
|Niemcy|   Peacock|     2003-01-11|       10252|3597.9|
+------+----------+---------------+------------+------+
only showing top 5 rows



In [11]:
df = df.sort("idZamowienia")

df.show(5)
df.printSchema()

+------+----------+---------------+------------+------+
|  Kraj|Sprzedawca|Data zamowienia|idZamowienia| Utarg|
+------+----------+---------------+------------+------+
|Polska|  Kowalski|     2003-01-16|       10248| 440.0|
|Polska|  Sowiaski|     2003-01-10|       10249|1863.4|
|Niemcy|   Peacock|     2003-01-12|       10250|1552.6|
|Niemcy| Leverling|     2003-01-15|       10251|654.06|
|Niemcy|   Peacock|     2003-01-11|       10252|3597.9|
+------+----------+---------------+------------+------+
only showing top 5 rows

root
 |-- Kraj: string (nullable = true)
 |-- Sprzedawca: string (nullable = true)
 |-- Data zamowienia: date (nullable = true)
 |-- idZamowienia: integer (nullable = true)
 |-- Utarg: float (nullable = true)



In [17]:
buckets = df.repartition(4, "idZamowienia")
buckets.cache()

import time
start = time.time()
df.groupBy("Kraj").sum("Utarg").show()
print(f"Czas agregacji bez wiaderkowania: {time.time() - start:.2f} sekund")

start = time.time()
buckets.groupBy("Kraj").sum("Utarg").show()
print(f"Czas agregacji na wiaderkach: {time.time() - start:.2f} sekund")

+------+-----------------+
|  Kraj|       sum(Utarg)|
+------+-----------------+
|Niemcy|894996.4916362762|
|Polska| 333330.908657074|
+------+-----------------+

Czas agregacji bez wiaderkowania: 0.51 sekund
+------+-----------------+
|  Kraj|       sum(Utarg)|
+------+-----------------+
|Niemcy|894996.4916362762|
|Polska| 333330.908657074|
+------+-----------------+

Czas agregacji na wiaderkach: 0.42 sekund


In [18]:
partitioned_by_country = df.repartition("Kraj")
partitioned_by_country.write.partitionBy("Kraj").csv("partycjonowanie_kraj", header=True)

partitioned_by_seller = df.repartition("Sprzedawca")
partitioned_by_seller.write.partitionBy("Sprzedawca").csv("partycjonowanie_sprzedawca", header=True)

In [19]:
start = time.time()
df.filter(df["Kraj"] == "Polska").groupBy("Sprzedawca").sum("Utarg").show()
print(f"Czas dla danych oryginalnych: {time.time() - start:.2f} sekund")

start = time.time()
partitioned_by_country.filter(df["Kraj"] == "Polska").groupBy("Sprzedawca").sum("Utarg").show()
print(f"Czas dla danych partycjonowanych: {time.time() - start:.2f} sekund")

+----------+------------------+
|Sprzedawca|        sum(Utarg)|
+----------+------------------+
|  Sowiaski| 72527.62940597534|
|      King|116962.98942565918|
|     Dudek| 75048.03958892822|
|  Kowalski| 68792.25023651123|
+----------+------------------+

Czas dla danych oryginalnych: 0.74 sekund
+----------+------------------+
|Sprzedawca|        sum(Utarg)|
+----------+------------------+
|  Sowiaski| 72527.62940597534|
|      King|116962.98942565918|
|     Dudek| 75048.03958892822|
|  Kowalski| 68792.25023651123|
+----------+------------------+

Czas dla danych partycjonowanych: 1.13 sekund


In [20]:
from pyspark.sql.functions import month, upper, lit

subset1 = df.sample(fraction=0.25).withColumn("month", month("Data zamowienia"))
subset1.createOrReplaceTempView("subset1")

In [21]:
subset2 = df.sample(fraction=0.25).withColumn("netto", col("Utarg") / 1.23)
subset2.write.parquet("subset2.parquet")

In [22]:
subset3 = df.sample(fraction=0.25).withColumn("Sprzedawca", upper("Sprzedawca"))
subset3.write.csv("subset3.csv", header=True)

In [37]:
subset4 = df.sample(fraction=0.25).withColumn("waluta", lit("PLN"))
subset4 = subset4.withColumn("idZamowienia", col("idZamowienia").cast("string"))
subset4.write.json("subset4.json")

In [38]:
subset2_df = spark.read.parquet("subset2.parquet")
subset3_df = spark.read.csv("subset3.csv", header=True, inferSchema=True)
subset4_df = spark.read.json("subset4.json")


df_alias = df.alias("df")
subset1_alias = subset1.alias("subset1")
subset2_alias = subset2_df.alias("subset2")
subset3_alias = subset3_df.alias("subset3")
subset4_alias = subset4_df.alias("subset4")

joined_df = df_alias.join(subset1_alias, "idZamowienia", "inner") \
                    .join(subset2_alias, "idZamowienia", "inner") \
                    .join(subset3_alias, "idZamowienia", "inner") \
                    .join(subset4_alias, "idZamowienia", "inner")

joined_df.select(
    "df.idZamowienia",
    "df.Kraj",
    "df.Sprzedawca",
    "df.Data zamowienia",
    "df.Utarg",
    "subset1.month",
    "subset2.netto",
    "subset3.Sprzedawca",
    "subset4.waluta"
).show()


+------------+------+----------+---------------+------+-----+------------------+----------+------+
|idZamowienia|  Kraj|Sprzedawca|Data zamowienia| Utarg|month|             netto|Sprzedawca|waluta|
+------------+------+----------+---------------+------+-----+------------------+----------+------+
|       10450|Niemcy|  Callahan|     2004-01-11|425.12|    1| 345.6260122903964|  CALLAHAN|   PLN|
|       10478|Niemcy|    Fuller|     2004-01-26| 471.2|    1| 383.0894408187246|    FULLER|   PLN|
|       10740|Niemcy|   Peacock|     2004-01-25|1416.0|    1| 1151.219512195122|   PEACOCK|   PLN|
|       10748|Niemcy| Leverling|     2004-01-28|2196.0|    1|1785.3658536585367| LEVERLING|   PLN|
+------------+------+----------+---------------+------+-----+------------------+----------+------+

