# ETL

Het extract - transform - load concept is een veel voorkomend begrip in (big) data toepassingen en geeft het stappenplan weer van de levenscyclus van de data binnen je toepassing.
Het concept bestaat uit drie stappen:
* extract: zoeken van data, inlezen en validatie
* transform: verwerken van data, data cleaning, aggregatie, groupering, filtering, ...
* load: opslaan van de getransformeerde data in een file, database, datawarehouse, datalake, ...

In de rest van deze notebook gaan we bestuderen hoe deze stappen uit te voeren met Spark.
Hiervoor gaan we een csv gebruiken als bronbestand.

## Extract

In deze directory staat een zip file waarin deze csv is opgeslaan. 
Unzip deze file eerst en upload het naar het hdfs

In [1]:
import pydoop.hdfs as hdfs
    
localFs = hdfs.hdfs(host='')
clientFs = hdfs.hdfs(host='localhost', port=9000)

if not clientFs.exists('ML'):
    clientFs.create_directory('ML')
    
for f in clientFs.list_directory('ML'):
    clientFs.delete(f['name'], True)

localFs.copy('../Week 5/cars.csv', clientFs, 'ML/cars.csv')

2023-03-23 16:43:47,570 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


0

Maak nu een locale sparkcontext aan en lees dit bestand in

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]') \
                            .config("spark.driver.memory", "8g") \
                            .config('spark.executor.memory', '2g') \
                            .appName('ML_cars') \
                            .getOrCreate()

# extract gedeelte
df = spark.read.csv('ML/cars.csv', header=True, sep=',')
print('Total rows = {}'.format(df.count()))
df.show(1)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-03-23 16:43:52,994 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-03-23 16:43:54,779 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

Total rows = 38531


2023-03-23 16:44:04,993 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----------------+----------+------------+------+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+-----+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------------+
|manufacturer_name|model_name|transmission| color|odometer_value|year_produced|engine_fuel|engine_has_gas|engine_type|engine_capacity|body_type|has_warranty|state|drivetrain|price_usd|is_exchangeable|location_region|number_of_photos|up_counter|feature_0|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|feature_9|duration_listed|
+-----------------+----------+------------+------+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+-----+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+----

De datastructuur van het csv is als volgt:

In [3]:
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: string (nullable = true)
 |-- year_produced: string (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: string (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: string (nullable = true)
 |-- up_counter: string (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)


## Transform

De transform stap is de meest complexe stap van de drie en kan uit een grote verscheidenheid van bewerkingen bestaan, zoals:
* Dataformaten aanpassen
* Vertalingen van tekst
* Geencodeerde waarden aanpassen: 0/1 vs true/false of m/f vs male/female
* Allerhande data-cleaning stappen
* Encoderen (Ordinal of One-hot) van categorieke kolommen
* Groeperen van data
* Uitvoeren van berekeningen 
* ...

Schrijf hieronder eerst zelf de code om de volgende stappen uit te voeren:
* Omzetten naar integer van de kolommen: odometer_value, year_produced, engine_capacity, price_usd, number_of_photos, up_counter, duration_listed
* Omzetten naar boolean van de kolommen: engine_has_gas, has_warranty, is_exchangeable, feature_0 tot en met 9
* Bereken het aantal null en nan waarden per kolom

In [4]:
df_backup = df

In [5]:
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType, BooleanType

df = df_backup

cols = ['odometer_value', 'year_produced', 'engine_capacity', 'price_usd', 'number_of_photos', 'up_counter', 'duration_listed']
for c in cols:
    #df = df.withColumn(c, f.col(c).cast(IntegerType()))
    df = df.withColumn(c, f.col(c).cast('int'))
    
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: integer (nullable = true)
 |-- year_produced: integer (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: integer (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: integer (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: integer (nullable = true)
 |-- up_counter: integer (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = 

In [6]:
cols = ['engine_has_gas', 'has_warranty', 'is_exchangeable', 'feature_0', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5', 'feature_6', 'feature_7', 'feature_8', 'feature_9']
for c in cols:
    #df = df.withColumn(c, f.col(c).cast(BooleanType()))
    df = df.withColumn(c, f.col(c).cast('boolean'))
    
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: integer (nullable = true)
 |-- year_produced: integer (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: boolean (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: integer (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: boolean (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: integer (nullable = true)
 |-- is_exchangeable: boolean (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: integer (nullable = true)
 |-- up_counter: integer (nullable = true)
 |-- feature_0: boolean (nullable = true)
 |-- feature_1: boolean (nullable = true)
 |-- feature_2: boolean (nullable = true)
 |-- feature_3: boolean (null

In [7]:
nulls = df.select([f.count(f.when(f.col(c).isNull(), 1)).alias(c) for c in df.columns])  # die 1 dient om iets te hebben dat je kan tellen
nulls.show()



+-----------------+----------+------------+-----+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+-----+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------------+
|manufacturer_name|model_name|transmission|color|odometer_value|year_produced|engine_fuel|engine_has_gas|engine_type|engine_capacity|body_type|has_warranty|state|drivetrain|price_usd|is_exchangeable|location_region|number_of_photos|up_counter|feature_0|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|feature_9|duration_listed|
+-----------------+----------+------------+-----+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+-----+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+-------

                                                                                

In bovenstaande code kan je zien dat er slechts een aantal null-waarden in de dataset aanwezig zijn.
Deze kunnen ingevuld worden door middel van een [imputer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html).
Hier laten we deze rijen echter gewoon vallen voor de eenvoud:

In [8]:
df = df.na.drop()
df.count()  # bovenaan hadden we er 38531 dus zijn er inderdaad 10 weg

                                                                                

38521

De oefening om de waarden in te vullen met een imputer (bvb door het gemiddelde) kan je hieronder doen.

In [9]:
# oefening

Bereken nu de volgende waarden van de beschikbare data:
* Aantal autos per merk
* Welke verschillende types van transmissie zijn er?
* Marktaandeel (percentage) van de verschillende types motor?
* Maximum prijs van elk merk
* Wat zijn de vijf goedkoopste voertuigen met een automatische transmissie?

In [10]:
# autos per merk
df.groupby('manufacturer_name').count().show()



+-----------------+-----+
|manufacturer_name|count|
+-----------------+-----+
|       Volkswagen| 4243|
|            Lexus|  213|
|           Jaguar|   53|
|            Rover|  235|
|           Lancia|   92|
|             Jeep|  107|
|       Mitsubishi|  887|
|              ГАЗ|  200|
|              Kia|  912|
|             Mini|   68|
|        Chevrolet|  435|
|            Volvo|  721|
|            Lifan|   47|
|          Hyundai| 1116|
|             LADA|  146|
|        SsangYong|   79|
|             Audi| 2468|
|             Seat|  303|
|         Cadillac|   43|
|          Pontiac|   42|
+-----------------+-----+
only showing top 20 rows



                                                                                

In [11]:
# types transmissie
df.groupby('transmission').count().show()
df.select('transmission').distinct().show()

                                                                                

+------------+-----+
|transmission|count|
+------------+-----+
|   automatic|12888|
|  mechanical|25633|
+------------+-----+





+------------+
|transmission|
+------------+
|   automatic|
|  mechanical|
+------------+



                                                                                

In [12]:
# marktaandeel
aantal_autos = df.count()
df_aandeel = df.groupby('engine_type').count()
df_aandeel.withColumn('marktaandeel', f.col('count')/aantal_autos).show()

+-----------+-----+-------------------+
|engine_type|count|       marktaandeel|
+-----------+-----+-------------------+
|   gasoline|25647| 0.6657926845097479|
|     diesel|12874|0.33420731549025207|
+-----------+-----+-------------------+



In [13]:
# maximum prijs per merk
df.groupby('manufacturer_name').max('price_usd').show()

+-----------------+--------------+
|manufacturer_name|max(price_usd)|
+-----------------+--------------+
|       Volkswagen|         43999|
|            Lexus|         48610|
|           Jaguar|         50000|
|            Rover|          9900|
|           Lancia|          9500|
|             Jeep|         43000|
|       Mitsubishi|         31400|
|              ГАЗ|         30000|
|              Kia|         44700|
|             Mini|         39456|
|        Chevrolet|         49900|
|            Volvo|         48200|
|            Lifan|         15750|
|          Hyundai|         45954|
|             LADA|         13800|
|        SsangYong|         15900|
|             Audi|         46750|
|             Seat|         18350|
|         Cadillac|         25750|
|          Pontiac|         10000|
+-----------------+--------------+
only showing top 20 rows



In [14]:
# goedkoopste voertuigen met automatische transmissie
df_cheapest = df.filter(f.col('transmission') == 'automatic').sort(f.col('price_usd').asc()).limit(5)

df_cheapest.show()

+-----------------+----------+------------+------+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+---------+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------------+
|manufacturer_name|model_name|transmission| color|odometer_value|year_produced|engine_fuel|engine_has_gas|engine_type|engine_capacity|body_type|has_warranty|    state|drivetrain|price_usd|is_exchangeable|location_region|number_of_photos|up_counter|feature_0|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|feature_9|duration_listed|
+-----------------+----------+------------+------+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+---------+----------+---------+---------------+---------------+----------------+----------+---------+---------+--

## Load

In deze stap veronderstellen we dat we enkel de 5 goedkoopste auto's willen bewaren.
Schrijf hieronder de benodigde code om de informatie van deze autos op te slaan in een json.

In [15]:
df_cheapest.write.format('json').save('ML/result.json')

Dit is een voorbeeld waarbij de resultaten worden opgeslaan in een bestand.
Andere mogelijkheden zijn om het op te slaan in een SQL-database.
Demo-code om dit te bereiken kan je [hier](https://kontext.tech/column/spark/395/save-dataframe-to-sql-databases-via-jdbc-in-pyspark) bekijken.
Later in dit vak zullen we ook NoSQL-databases bekijken.
Op dat moment zullen we zien hoe we de resultaten kunnen bewaren in dit type database beheersystemen (DBMS).

# Machine Learning

Maak nu een ML-pipeline waarbij geprobeerd wordt om de verkoopsprijs zo goed mogelijk te voorspellen.
Bereken voor de pipelijn op te stellen de aantal unieke waarden in de string kolommen.
Maak nu drie lijsten aan:
* De kolommen met meer dan 100 unieke waarden (gaan we droppen)
* De kolommen met meer dan 10 unieke waarden (gaan we encoderen met ordinal encoding)
* De overige kolommen gaan we met one-hot encoding doen

In [20]:
for c in [item[0] for item in df.dtypes if item[1].startswith('string')]:
    print(c, df.select(c).distinct().count())
    
# drop cols with more than 100 options
cols_to_drop = ['model_name']
# cols with more than 10 options:
cols_ordinal = ['manufacturer_name', 'color', 'body_type']
# cols with less than 10 options
cols_onehot = ['transmission', 'engine_fuel', 'engine_type', 'state', 'drivetrain', 'location_region']

cols_int = [item[0] for item in df.dtypes if item[1].startswith('int')]
cols_int.remove('price_usd')
print('int', cols_int)
cols_bool = [item[0] for item in df.dtypes if item[1].startswith('bool')]
print('bool', cols_bool)

manufacturer_name 55
model_name 1116
transmission 2
color 12
engine_fuel 5
engine_type 2
body_type 12
state 3
drivetrain 3
location_region 6
int ['odometer_value', 'year_produced', 'engine_capacity', 'number_of_photos', 'up_counter', 'duration_listed']
bool ['engine_has_gas', 'has_warranty', 'is_exchangeable', 'feature_0', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5', 'feature_6', 'feature_7', 'feature_8', 'feature_9']


Doe dit aan de hand van een pipeline waarbij je minstens de volgende stappen uitvoert:
* Gebruik de vector assembler om alle integer kolommen te groeperen in een kolom met de naam 'intfeatures' (let op dat je niet de prijs kolom mee neemt in de berekening)
* Voeg 3de-graads polynomiale uitbreiding toe van de numerieke kolommen
* Voer normalisatie uit van elke feature in deze kolom zodat het gemiddelde 0 is en en de stddev 1
* Voeg ordinal encoding van de kolommen met tussen de 10 en 100 unieke waarden toe
* Voeg onehot encoding van de kolommen met minder dan 10 unieke waarden toe
* Gebruik opnieuw de vectorassembler om alle kolommen (intfeatures, boolean, onehot en ordinal kolommen) samen te voegen in 1 kolom 'features'
* Train een regressor (zie de api voor de mogelijkheden) en train het met de 'features' kolom als features en price_usd als targets
* Evalueer het model door de root mean squared error en verklaarde variabiliteit te berekenen. Gebruik hiervoor de random_split functie om de dataset te splitsen in train en test data
* Gebruik een cross-validator om op zoek te gaan naar de beste parameters

In [43]:
from pyspark.ml.feature import VectorAssembler, PolynomialExpansion, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# splitsen in train en test (stap 8)
(train,test) = df.randomSplit([0.8, 0.2])

# stap 1
stap1 = VectorAssembler(inputCols=cols_int, outputCol='intfeatures')

# stap 2
stap2 = PolynomialExpansion(degree=3, inputCol='intfeatures', outputCol='exp_int_features')

# stap 3 -> gemiddelde 0 is en stddev = 1
stap3 = StandardScaler(inputCol='exp_int_features', outputCol='scaled_int_features', withMean=True)
# 2 features x en y en graad 3
# features berekenen x, x^2, x^3, y, y^2, y^3, x*y, x^2*y, y^2*x

# stap 4
cols_ordinal_out = [x + '_out' for x in cols_ordinal]
stap4 = StringIndexer(inputCols=cols_ordinal, outputCols=cols_ordinal_out)
# in de fit stap wordt hier berekend welk woord met welk getal overeenkomt -> met de labels attribuut kan je de omgekeerde operatie doen

# stap5
#onehot encoder verwacht iets dat geordinalencoded is
cols_onehot_out1 = [x + '_tmp' for x in cols_onehot]
cols_onehot_out2 = [x + '_out' for x in cols_onehot]
stap5_1 = StringIndexer(inputCols=cols_onehot, outputCols=cols_onehot_out1)
stap5_2 = OneHotEncoder(inputCols=cols_onehot_out1, outputCols=cols_onehot_out2)

# stap 6
cols_features = cols_onehot_out2 + cols_ordinal_out + ['scaled_int_features'] + cols_bool  # lijsten extenden
stap6 = VectorAssembler(inputCols=cols_features, outputCol='features')

# stap7
regressor = RandomForestRegressor(featuresCol='features', labelCol='price_usd', predictionCol='preds', maxBins=60)

pipeline = Pipeline(stages=[stap1, stap2, stap3, stap4, stap5_1, stap5_2, stap6, regressor])

# fit the pipeline
model = pipeline.fit(train)  # in het geval van niet splitsen stond hier en de lijn eronder df ipv train en test
preds = model.transform(test)
#preds.show(1,truncate=False)

evaluator1 = RegressionEvaluator(labelCol='price_usd',predictionCol='preds', metricName='rmse')
evaluator2 = RegressionEvaluator(labelCol='price_usd',predictionCol='preds', metricName='var')

print(evaluator1.evaluate(preds))
print(evaluator2.evaluate(preds))


                                                                                

2398.835009943527




30737373.27843988


                                                                                

In [47]:
# iets oudere manier voor evaluatie te doen (rdd-based)

from pyspark.mllib.evaluation import RegressionMetrics
true_and_pred = preds.select('price_usd', 'preds').rdd.map(lambda x: (float(x[0]), float(x[1]))) 
true_and_pred.collect()
metrics = RegressionMetrics(true_and_pred)

print("RMSE =", metrics.rootMeanSquaredError)
print("Explained variance =",metrics.explainedVariance)  # dit geeft niet exact dezelfde waarde als hierboven



RMSE = 2398.835009943527
Explained variance = 37692558.83460832


                                                                                

In [48]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

# grid met opties voor parameters
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees, [5, 10]).build()
# aan het grid kan je ook parameters van de andere stappen toevoegen
#paramGrid = ParamGridBuilder().addGrid(regressor.numTrees, [5, 10]).addGrid(stap2.degree,[2,3]).build()

# dit is wat we gaan optimaliseren
evaluator = RegressionEvaluator(labelCol='price_usd',predictionCol='preds', metricName='rmse')

tvs = TrainValidationSplit(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, parallelism=1)

cvModel = tvs.fit(train)

                                                                                

In [None]:
# beste parameters van het beste model
cvModel.bestModel.extractParamMap()

In [None]:
cvModel.bestModel.save("ML/model")