<div style="border: 4px solid white; padding: 20px; background-color: #2596be; color: white;">

# <b>Explorative Datenanalyse mit Sparky</b>

#### <i>CAS Information Engineering - Modul: Big data - FS 2024</i>

<b> Autoren: </b> Hassler Robin, Tschanz Daniel, Tsiantas Theofanis (Gruppe 10)

</div>

# Teil 2 - Map/Reduce Anylse

In [None]:
# Installation der notwendigen Bibliothecken
%pip install seaborn

In [None]:
# Notwendige Packete
import sparky
import pyspark
import pyspark.sql
from pyspark.sql.functions import trim, col, to_date, when
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import time

In [None]:
# Accountdefinition für die Verbindung mit sparky
zhawaccount = "tsianthe"

In [None]:
sc = sparky.connect(f"sparknotebook-{zhawaccount}", 4)
spark = pyspark.sql.SparkSession.builder.getOrCreate()

## Dateien einlesen

In [None]:
# CSV-Dateien einlesen
load_time_start = time.time()
df_customer = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("./cleanedData/Customers.csv")
df_items = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("./cleanedData/Items.csv")
df_orders = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("./cleanedData/Orders.csv")
df_currency = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("./cleanedData/Exchangerates.csv")
load_time_finish = time.time()
print(f"Time taken to save: {load_time_finish - load_time_start:.2f} seconds")

In [None]:
# Parquet-Dateien einlesen (zur Vergleich der Ladenzeit)
load_time_start = time.time()
df_customer_p = spark.read.parquet("./cleanedData/Customers.parquet", header=True, inferSchema=True)
df_items_p = spark.read.parquet("./cleanedData/Items.parquet", header=True, inferSchema=True)
df_orders_p = spark.read.parquet("./cleanedData/Orders.parquet", header=True, inferSchema=True)
df_currency_p = spark.read.parquet("./cleanedData/Exchangerates.parquet", header=True, inferSchema=True)
load_time_finish = time.time()
print(f"Time taken to save: {load_time_finish - load_time_start:.2f} seconds")

## Map/Reduce

### Monatlich neu akquirierte Kunden

In [None]:
# Das Schema überprüfen
df_customer_p.printSchema()

In [None]:
# Berechnung der neu akquierten Kunden pro Jahrestag
rdd_customer = df_customer_p.rdd.map(lambda x:x[3]).map(lambda x:[x, 1]).reduceByKey(lambda x,y:x+y).sortByKey()
# rdd_customer.collect()

In [None]:
# RDD nur auf Monatbasis mappen
rdd_customer_month = rdd_customer.map(lambda x: ((x[0].year, x[0].month), x[1]))
# rdd_customer_month.collect()

In [None]:
# Die gleichen Monaten desselben Jahres addieren
rdd_aggregated = rdd_customer_month.reduceByKey(lambda a, b: a + b).sortByKey()
# rdd_aggregated.collect()

In [None]:
# Variablen für Plot definieren
dates = [row[0] for row in rdd_aggregated.collect()]
events = [row[1] for row in rdd_aggregated.collect()]

In [None]:
# x-Achse als String definieren
x_labels = [f"{year}-{month:02d}" for year, month in dates]

In [None]:
# Plot erstellen und konfigurieren
plt.figure(figsize=(10, 6))
sns.lineplot(x=x_labels, y=events, sort=False)

plt.title('Neue Kunden pro Monat')
plt.xlabel('Jahr-Monat')
plt.ylabel('Anzahl neuer Kunden')
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()

# Abstand der x-Achsenbeschriftungen definieren
plt.xticks(ticks=range(0, 220, 20), rotation=45)

plt.show()

### Monatlicher Umsatz pro Objekt

In [None]:
# Das Schema überprüfen
# df_orders_p.printSchema()

In [None]:
# Bestellungen RDD definieren 
# Schema: Customer Number | Status | Item number | Order quantity | Net price | Registration date
orders_reduced = df_orders_p.rdd.map(lambda x:(x[2], x[6], x[9], x[10], x[17], x[22]))
# orders_reduced.take(2)

In [None]:
# Umsatz pro Bestellung 
# Schema: Customer Number | Status | Item number | Umsatz | Registration date
orders_reduced =orders_reduced.map(lambda x:(x[0], x[1], x[2], x[3]*x[4], x[5]))
# orders_reduced.take(2)

In [None]:
# Nur gültige Bestellungen berücksichtigen (Status 5: Nur Angebot, Status > 90: Angebot abgesagt)
# Schema: Customer Number | Item number | Umsatz | Registration date
orders_reduced = orders_reduced.filter(lambda x: 5 < x[1] < 90)
orders_reduced = orders_reduced.map(lambda x:(x[0], (x[2], x[3], x[4])))
# orders_reduced.take(2)

In [None]:
# Das Schema überprüfen
# df_customer_p.printSchema()

In [None]:
# Kunden RDD definieren 
# Schema: Customer Number | Currency
df_customer_reduced = df_customer_p.rdd.map(lambda x:(x[0], x[2]))
# df_customer_reduced.take(2)

In [None]:
# Anzahl der Bestellungen
before_join_count = orders_reduced.count()

In [None]:
# RDD mit der Währnug pro Bestellung definieren 
# Schema: Customer Number | Item number | Umsatz | Registration date | Currency
combined_rdd = orders_reduced.join(df_customer_reduced)
# combined_rdd.collect()

In [None]:
# Überprüfen, wie viele Bestellungen aufgrund der Join entfernt wurden
print(f"{abs(combined_rdd.count()-before_join_count)} Bestellungen wurden entfernt (keine gültige Kundennummer)") 

In [None]:
# Nach Datum sortieren
combined_rdd = combined_rdd.sortBy(lambda x: x[1][0][2])
# combined_rdd.collect()

In [None]:
# Kunden entfernen von RDD und Objektnummer als Key definieren
combined_rdd = combined_rdd.map(lambda x: (x[1][0][0], (x[1][0][1], x[1][0][2], x[1][1])))
# combined_rdd.collect()

In [None]:
# Das Schema überprüfen
# df_items_p.printSchema()

In [None]:
# Objekte RDD definieren 
# Schema: Item Number | Itemgroup
df_items_reduced = df_items_p.rdd.map(lambda x:(x[0], x[2]))
# df_items_reduced.take(2)

In [None]:
# Anzahl der Bestellungen
before_join_count = combined_rdd.count()

In [None]:
# RDD mit der Objektgruppe (Item group) pro Bestellung definieren 
# Schema: Item number | Umsatz | Registration date | Currency | Itemgroup
combined_rdd = combined_rdd.join(df_items_reduced)
# combined_rdd.collect()

In [None]:
# Überprüfen, wie viele Bestellungen aufgrund der Join entfernt wurden
print(f"{abs(combined_rdd.count()-before_join_count)} Bestellungen wurden entfernt (keine gültige Itemgruppe)") 

In [None]:
# Objektnummer (Item number) von RDD entfernen. Erneut gemäss Datum sortieren 
# Schema: Umsatz | Registration date | Currency | Itemgroup
combined_rdd = combined_rdd.map(lambda x: (x[1][0][0], x[1][0][1], x[1][0][2], x[1][1])).sortBy(lambda x: x[1])
# combined_rdd.collect()

In [None]:
# Datum-Währung kombinieren. Datum nur auf Monatbasis ausdrücken
# Schema: Datum-Währung Umsatz | Itemgroup
combined_rdd = combined_rdd.map(lambda x:(f"{x[1].year}-{x[1].month}-{x[2]}", x[0], x[3]))
#combined_rdd.collect()

In [None]:
# Datum-Währung als Schlüssel setzten
combined_rdd = combined_rdd.map(lambda x:(x[0], (x[1], x[2])))
#combined_rdd.collect()

In [None]:
# Das Schema überprüfen
# df_currency_p.printSchema()

In [None]:
# Objekte RDD definieren 
# Schema: Datum | Währung | Kurs
df_currency_reduced = df_currency_p.rdd.map(lambda x:(x[0], x[2], x[3]))
# df_currency_reduced.collect()

In [None]:
# Ungültige Daten entfernen (filter)
# Nur den Jahr-Monat behalten. 
# Neuer Schlüssel als Jahr-Monat-Währung definieren
df_currency_reduced = df_currency_reduced.filter(lambda x: x[0] is not None)
df_currency_reduced = df_currency_reduced.map(lambda x:(f"{x[0].year}-{x[0].month}-{x[1]}", x[2]))
# df_currency_reduced.collect()

In [None]:
# Anzahl der Bestellungen
before_join_count = combined_rdd.count()

In [None]:
# RDD mit der Wechselkurs pro Bestellung definieren. Nur die Bestellungen berücksichtigen,
# für die ein gültiger Wechselkurs existiert.
# Schema: Jahr-Monat-Währung | Umsatz | Itemgroup | Wechselkurs
combined_rdd = combined_rdd.join(df_currency_reduced)

In [None]:
# Überprüfen, wie viele Bestellungen aufgrund der Join entfernt wurden
print(f"{abs(combined_rdd.count()-before_join_count)} Bestellungen wurden entfernt (keine gültige Itemgruppe)") 

In [None]:
# Die Währung vom Schlüssel löschen
# Als erstes die Währung löschen
combined_rdd=combined_rdd.map(lambda x: ((x[0][:7]), x[1]))
# Dann "-" entfernen (problematisch für die Monate mit einem Ziffer!)
combined_rdd=combined_rdd.map(lambda x: (x[0].replace('-', ''), x[1]))
# Schlüssel als datetime-Objekt definieren und nur Jahr und Monat behalten
combined_rdd = combined_rdd.map(lambda x: (datetime.strptime(x[0], "%Y%m").strftime("%Y-%m"), x[1]))
#combined_rdd.takeSample(False,200)

In [None]:
# RDD umformen
# Schema: Jahr-Monat | Itemgroup | Umsatz | Wechselkurs
combined_rdd = combined_rdd.map(lambda x:(x[0], x[1][0][1], x[1][0][0], x[1][1]))
#combined_rdd.collect()

In [None]:
# RDD umformen
# Schema: Jahr-Monat | Itemgroup | Umsatz in CHF
combined_rdd = combined_rdd.map(lambda x:(x[0],x[1], x[2]*x[3]/1000000))
#combined_rdd.takeSample(False,50)

In [None]:
# RDDs für jede Objektgruppe definieren
item_1000_rdd = combined_rdd.filter(lambda x:x[1]==1000).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1100_rdd = combined_rdd.filter(lambda x:x[1]==1100).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1200_rdd = combined_rdd.filter(lambda x:x[1]==1200).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1300_rdd = combined_rdd.filter(lambda x:x[1]==1300).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1400_rdd = combined_rdd.filter(lambda x:x[1]==1400).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1500_rdd = combined_rdd.filter(lambda x:x[1]==1500).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1600_rdd = combined_rdd.filter(lambda x:x[1]==1600).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1700_rdd = combined_rdd.filter(lambda x:x[1]==1700).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1800_rdd = combined_rdd.filter(lambda x:x[1]==1800).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_1900_rdd = combined_rdd.filter(lambda x:x[1]==1900).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
item_2000_rdd = combined_rdd.filter(lambda x:x[1]==2000).map(lambda x:(x[0], x[2])).reduceByKey(lambda x,y:x+y).sortByKey()
#item_1000_rdd.collect()

In [None]:
# X- und Y-Achse für jedes RDD-Objeckt
x1000, y1000 = zip(*item_1000_rdd.collect())
x1100, y1100 = zip(*item_1100_rdd.collect())
x1200, y1200 = zip(*item_1200_rdd.collect())
x1300, y1300 = zip(*item_1300_rdd.collect())
x1400, y1400 = zip(*item_1400_rdd.collect())
x1500, y1500 = zip(*item_1500_rdd.collect())
x1600, y1600 = zip(*item_1600_rdd.collect())
x1700, y1700 = zip(*item_1700_rdd.collect())
x1800, y1800 = zip(*item_1800_rdd.collect())
x1900, y1900 = zip(*item_1900_rdd.collect())
x2000, y2000 = zip(*item_2000_rdd.collect())

# Umwandlung zu strings für das Plot
x1000_labels = [str(date) for date in x1000]
x1100_labels = [str(date) for date in x1100]
x1200_labels = [str(date) for date in x1200]
x1300_labels = [str(date) for date in x1300]
x1400_labels = [str(date) for date in x1400]
x1500_labels = [str(date) for date in x1500]
x1600_labels = [str(date) for date in x1600]
x1700_labels = [str(date) for date in x1700]
x1800_labels = [str(date) for date in x1800]
x1900_labels = [str(date) for date in x1900]
x2000_labels = [str(date) for date in x2000]

In [None]:
# Plotting
plt.figure(figsize=(10, 6))

# Plot RDDs
plt.plot(x1000_labels, y1000, linestyle='-', color='blue', label='Item: 1000')
plt.plot(x1100_labels, y1100, linestyle='-', color='green', label='Item: 1100')
plt.plot(x1200_labels, y1200, linestyle='-', color='red', label='Item: 1200')
plt.plot(x1300_labels, y1300, linestyle='-', color='orange', label='Item: 1300')
plt.plot(x1400_labels, y1400, linestyle='-', color='black', label='Item: 1400')
plt.plot(x1500_labels, y1500, linestyle='-', color='yellow', label='Item: 1500')
plt.plot(x1600_labels, y1600, linestyle='-.', color='blue', label='Item: 1600')
plt.plot(x1700_labels, y1700, linestyle='-.', color='green', label='Item: 1700')
plt.plot(x1800_labels, y1800, linestyle='-.', color='red', label='Item: 1800')
plt.plot(x1900_labels, y1900, linestyle='-.', color='orange', label='Item: 1900')
plt.plot(x2000_labels, y2000, linestyle='-.', color='black', label='Item: 2000')

# Ploteigenschaften
plt.xlabel('Datum')
plt.ylabel('CHF-Mio')
plt.title('Umsatz pro Object und Monat')
plt.xticks(rotation=45)  

plt.legend()

In [None]:
sc.stop()