In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# ścieżka do bazy danych hurtowni danych oraz plików
# należy dostosować do ścieżki względnej, w której umieszczony został bieżący notebook
warehouse_location = '/opt/spark/work-dir/L7/db/metastore_db'

# utworzenie sesji Spark, ze wskazaniem włączenia obsługi Hive oraz
# lokalizacją przechowywania hurtowni danych
spark = SparkSession\
        .builder\
        .master("local[2]")\
        .appName("Apache SQL and Hive")\
        .config("spark.memory.offHeap.enabled","true")\
        .config("spark.memory.offHeap.size","4g")\
        .enableHiveSupport()\
        .config("spark.sql.warehouse.dir", warehouse_location)\
        .getOrCreate()
spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/01 15:25:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/01 15:25:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Zadanie 1 
Pamiętacie plik zamówienia.txt ? Plik został umieszczony w folderze z labem w repozytorium.
Wczytaj ten plik za pomocą Sparka do dowolnego typu danych (RDD, Spark DataFrame) i dokonaj transformacji tak aby:
- naprawić problemy z kodowaniem znaków (replace?) w kolumnie Sprzedawca
- poprawić format danych w kolumnie Utarg
- dodać odpowiednie typy danych
- kolumna idZamowienia powinna być traktowana jako klucz (indeks)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, to_date
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType, DateType


schema = StructType([
    StructField("Kraj", StringType(), True),
    StructField("Sprzedawca", StringType(), True),
    StructField("Data zamowienia", StringType(), True),
    StructField("idZamowienia", IntegerType(), True),
    StructField("Utarg", StringType(), True)
])
df = spark.read.csv('zamowienia.txt', sep=";", header=True, schema=schema)
df.head()

Row(Kraj='Polska', Sprzedawca='Kowalski', Data zamowienia='16.07.2003', idZamowienia=10248, Utarg='440,00 z\x88')

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import unicodedata

def remove_accents(input_str):
    if input_str is None:
        return None
    # Zamiana znaków diakrytycznych na ich podstawowe odpowiedniki
    nfkd_form = unicodedata.normalize('NFKD', input_str)
    return "".join([c for c in nfkd_form if not unicodedata.combining(c)])
    
# zdefiniowanie udf do utworzenia wlasnej funkcji, ktora bedzie uzyta do przetworzenia danych 
remove_accents_udf = udf(remove_accents, StringType())
df = df.withColumn("Sprzedawca", remove_accents_udf(col("Sprzedawca")))

df = df.withColumn("Data zamowienia", to_date(col("Data zamowienia"), "dd.MM.yyyy"))
df = df.withColumn("Utarg", regexp_replace(col("Utarg"), "[^0-9,]", ""))
df = df.withColumn("Utarg", regexp_replace(col("Utarg"), ",", "."))
df = df.withColumn("Utarg", col("Utarg").cast(FloatType()))

df = df.withColumnRenamed("idZamowienia", "id")

df.show(truncate=False)

                                                                                

+------+----------+---------------+-----+-------+
|Kraj  |Sprzedawca|Data zamowienia|id   |Utarg  |
+------+----------+---------------+-----+-------+
|Polska|Kowalski  |2003-07-16     |10248|440.0  |
|Polska|Sowiaski  |2003-07-10     |10249|1863.4 |
|Niemcy|Peacock   |2003-07-12     |10250|1552.6 |
|Niemcy|Leverling |2003-07-15     |10251|654.06 |
|Niemcy|Peacock   |2003-07-11     |10252|3597.9 |
|Niemcy|Leverling |2003-07-16     |10253|1444.8 |
|Polska|Kowalski  |2003-07-23     |10254|556.62 |
|Polska|Dudek     |2003-07-15     |10255|2490.5 |
|Niemcy|Leverling |2003-07-17     |10256|517.8  |
|Niemcy|Peacock   |2003-07-22     |10257|1119.9 |
|Niemcy|Davolio   |2003-07-23     |10258|1614.88|
|Niemcy|Peacock   |2003-07-25     |10259|100.8  |
|Niemcy|Peacock   |2003-07-29     |10260|1504.65|
|Niemcy|Peacock   |2003-07-30     |10261|448.0  |
|Niemcy|Callahan  |2003-07-25     |10262|584.0  |
|Polska|Dudek     |2003-07-31     |10263|1873.8 |
|Polska|Sowiaski  |2003-08-23     |10264|695.62 |


In [5]:
df.printSchema()

root
 |-- Kraj: string (nullable = true)
 |-- Sprzedawca: string (nullable = true)
 |-- Data zamowienia: date (nullable = true)
 |-- id: integer (nullable = true)
 |-- Utarg: float (nullable = true)



# Zadanie 2 
Po wykonaniu zadania 1, wykorzystaj przykłady z laboratorium i:
2.1 wykonaj wiaderkowanie danych i wykonaj dowolne zapytanie agregujące na tych danych vs. dane nie podzielone na wiaderka - porównaj czas

In [6]:
df.createOrReplaceTempView("ZAMOWIENIA_DATA")

buckets = spark.sql("select distinct Sprzedawca from ZAMOWIENIA_DATA").count()
buckets

9

In [7]:
from datetime import datetime

data = df.write.bucketBy(buckets, 'Sprzedawca').mode('overwrite').sortBy('Utarg').saveAsTable('zamowienia_sprzedawca_bucketed')

start = datetime.now()
spark.sql("select Sprzedawca, count(Sprzedawca), avg(Utarg) from zamowienia_sprzedawca_bucketed group by Sprzedawca").show()
print(f'bucketed end: {datetime.now()-start}')

start = datetime.now()
spark.sql("select Sprzedawca, count(Sprzedawca), avg(Utarg) from ZAMOWIENIA_DATA group by Sprzedawca").show()
print(f'not bucketed end: {datetime.now()-start}')

24/12/01 15:25:53 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/12/01 15:25:53 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/12/01 15:25:59 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/12/01 15:25:59 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.18.0.2
24/12/01 15:26:00 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
                                                                                24/12/01 15:26:03 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
24/12/01 15:26:03 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
24/12/01 15:26:03 WARN HiveC

+----------+-----------------+------------------+
|Sprzedawca|count(Sprzedawca)|        avg(Utarg)|
+----------+-----------------+------------------+
|   Davolio|              117| 1559.829831897703|
|  Callahan|               99|1242.7542393616957|
| Leverling|              125|1609.5701606445311|
|  Sowiaski|               65|1115.8096831688515|
|  Kowalski|               42| 1637.910719916934|
|   Peacock|              151|1495.1237175417261|
|      King|               67|1745.7162600844654|
|    Fuller|               92| 1766.345436718153|
|     Dudek|               41|1830.4399899738592|
+----------+-----------------+------------------+

bucketed end: 0:00:01.058409
+----------+-----------------+------------------+
|Sprzedawca|count(Sprzedawca)|        avg(Utarg)|
+----------+-----------------+------------------+
|  Sowiaski|               65|1115.8096831688515|
|   Peacock|              151|1495.1237175417261|
|      King|               67|1745.7162600844654|
|     Dudek|        

2.2 wykonaj partycjonowanie danych i zapisz je w formcie csv (wypróbuj partycjonowanie wg. kraju, nazwiska

In [8]:
output_path_kraj = "zamowienia_partitioned_kraj"

df.write.partitionBy("Kraj").mode('overwrite').saveAsTable(output_path_kraj)

df.write.partitionBy("Kraj").mode('overwrite').format("csv").option("header", "true").save(output_path_kraj)

In [9]:
output_path_nazwiska = "zamowienia_partitioned_nazwiska"

df.write.partitionBy("Sprzedawca").mode('overwrite').saveAsTable(output_path_nazwiska)

df.write.partitionBy("Sprzedawca").mode('overwrite').format("csv").option("header", "true").save(output_path_nazwiska)

                                                                                

2.3 wykonaj zapytanie agregujące z filtrowanie po kolumnie, której użyłeś/-aś do partycjonowania na danych oryginalnych oraz partycjonowanych i porównaj czas wykonania

In [10]:
from datetime import datetime


start = datetime.now()
# df.filter(df.Kraj == 'Polska').groupby('Kraj').agg({'Utarg': 'avg'}).show()
spark.sql("select Kraj, avg(Utarg) from ZAMOWIENIA_DATA where Kraj='Polska' group by Kraj").show()
print(f'not partitioned end: {datetime.now()-start}')

start = datetime.now()
spark.sql("select Kraj, avg(Utarg) from zamowienia_partitioned_kraj where Kraj='Polska' group by Kraj").show()
print(f'partitioned end: {datetime.now()-start}')

+------+-----------------+
|  Kraj|       avg(Utarg)|
+------+-----------------+
|Polska|1550.376319335228|
+------+-----------------+

not partitioned end: 0:00:00.345065
+------+-----------------+
|  Kraj|       avg(Utarg)|
+------+-----------------+
|Polska|1550.376319335228|
+------+-----------------+

partitioned end: 0:00:00.428861


# Zadanie 3 
Z danych wygeneruj 4 różne podzbiory próbek (wiersze wybrane losowo) i dodaj nową kolumnę w każdym z nich, np. w jednym stwórz kolumnę month wyciągając tylko miesiąc z daty, w drugim wartość netto zamówienia (przyjmując, że vat to 23%), w kolejnym zamień nazwisko na wielkie litery, w kolejnym dodaj kolumnę waluta z wartością PLN.

In [11]:
from pyspark.sql.functions import col, month, expr, upper, lit

# Wygenerowanie czterech różnych podzbiorów danych z losowymi wierszami
subset1 = df.sample(fraction=0.5, seed=42)
subset2 = df.sample(fraction=0.5, seed=43)
subset3 = df.sample(fraction=0.5, seed=44)
subset4 = df.sample(fraction=0.5, seed=45)

# Dodanie nowych kolumn w każdym podzbiorze
subset1 = subset1.withColumn("Month", month(col("Data zamowienia")))

subset2 = subset2.withColumn("Netto", (col("Utarg") / 1.23).alias("Netto"))

subset3 = subset3.withColumn("Sprzedawca 2", upper(col("Sprzedawca")))

subset4 = subset4.withColumn("Waluta", lit("PLN"))

Następnie zapisz każdy z tych zbiorów tak, że:
- zbiór pierwszy to będzie tymczasowa tabela in-memory Sparka

In [12]:
subset1.createOrReplaceTempView("subset1_temp")

- zbiór drugi to plik(i) parquet

In [13]:
output_parquet = "subset2_parquet"
subset2.write.mode("overwrite").parquet(output_parquet)

- zbiór trzeci to plik(i) csv

In [14]:
output_csv = "subset3_csv"
subset3.write.mode("overwrite").csv(output_csv, header=True)

- zbiór czwarty to plik(i) json

In [15]:
output_json = "subset4_json"
subset4.write.mode("overwrite").json(output_json)

Wykonaj zapytanie złączające jak w przykładzie pobierając dane bezpośrednio z plików i wyświetl idZamowienia, Kraj, Sprzedawcę, Datę, Utarg oraz 4 nowo utworzone kolumny.

In [16]:
query = """
SELECT ed.id, ed.Sprzedawca, ed.Kraj, ed.`Data zamowienia`,ed.Month, json.Waluta, ed.Utarg, parquet.Netto, csv._c5 as `Sprzedawca 2`

FROM json.`subset4_json/` as json 
join subset1_temp ed on json.id=ed.id
join parquet.`subset2_parquet/` as parquet on ed.id=parquet.id
join csv.`subset3_csv/` as csv on ed.id=csv._c3
"""
df_from_json = spark.sql(query).show(10)

24/12/01 15:26:14 WARN ObjectStore: Failed to get database json, returning NoSuchObjectException
24/12/01 15:26:14 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException
24/12/01 15:26:14 WARN ObjectStore: Failed to get database csv, returning NoSuchObjectException


+-----+----------+------+---------------+-----+------+-------+------------------+------------+
|   id|Sprzedawca|  Kraj|Data zamowienia|Month|Waluta|  Utarg|             Netto|Sprzedawca 2|
+-----+----------+------+---------------+-----+------+-------+------------------+------------+
|10274|  Sowiaski|Polska|     2003-08-16|    8|   PLN|  538.6| 437.8861590129573|    SOWIASKI|
|10288|   Peacock|Niemcy|     2003-09-03|    9|   PLN|   80.1| 65.12194997896025|     PEACOCK|
|10315|   Peacock|Niemcy|     2003-10-03|   10|   PLN|  516.8| 420.1625917016006|     PEACOCK|
|10319|      King|Polska|     2003-10-11|   10|   PLN| 1191.2|  968.455244855183|        KING|
|10321| Leverling|Niemcy|     2003-10-11|   10|   PLN|  144.0|117.07317073170732|   LEVERLING|
|10330| Leverling|Niemcy|     2003-10-28|   10|   PLN| 1649.0|1340.6504065040651|   LEVERLING|
|10337|   Peacock|Niemcy|     2003-10-29|   10|   PLN| 2467.0|2005.6910569105692|     PEACOCK|
|10338|   Peacock|Niemcy|     2003-10-29|   10|   

In [17]:
# Zatrzymanie sesji Spark
spark.stop()