# APACHE SPARK

### Day 2

#### Spark SQL + Spark ML

<br><br>
**ML**
- Transformer - an algorithm that converts an input DataFrame into another DataFrame, e.g. a trained ML model that produces a new DataFrame containing predictions (`transform`)
- Estimator - an algorithm that builds a transformer from a DataFrame (`fit`)
- Pipeline - a sequence of transformers and estimators chained together into a workflow
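
The fit/transform contract described above can be illustrated without Spark. The following is a minimal plain-Python sketch, not the Spark implementation: the class names `MeanCenterer`, `MeanCentererModel` and `ToyPipeline` are hypothetical, and a plain list of floats stands in for a DataFrame.

```python
class MeanCentererModel:
    """Transformer: holds a fitted value and maps data to new data."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class MeanCenterer:
    """Estimator: learns from data and returns a transformer."""

    def fit(self, data):
        return MeanCentererModel(sum(data) / len(data))


class ToyPipeline:
    """Chains stages; fitting yields a list of transformers."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; plain transformers are used as-is.
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)
            fitted.append(model)
        return fitted


models = ToyPipeline([MeanCenterer()]).fit([1.0, 2.0, 3.0])
centered = models[0].transform([1.0, 2.0, 3.0])
print(centered)  # [-1.0, 0.0, 1.0]
```

Fitting the pipeline turns every estimator into its fitted transformer, which is why a fitted pipeline can itself be used as a transformer.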

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [None]:
spark = SparkSession.builder.appName('my_app').master("local[*]").getOrCreate()

### Vectors

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import feature

**Two vector types:**
- sparse - most values are zero, so to save memory only the indices of the non-zero entries are stored (together with their values)
- dense - every value is stored
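
To make the two representations concrete, here is a small plain-Python sketch of converting between them. The helper names `sparse_to_dense` and `dense_to_sparse` are hypothetical and only mirror the `(size, [(index, value), ...])` layout used with `Vectors.sparse` in this notebook.

```python
def sparse_to_dense(size, pairs):
    """Expand (index, value) pairs into a full list of `size` values."""
    dense = [0.0] * size
    for index, value in pairs:
        dense[index] = value
    return dense


def dense_to_sparse(values):
    """Keep only the non-zero entries as (index, value) pairs."""
    return len(values), [(i, v) for i, v in enumerate(values) if v != 0.0]


dense = sparse_to_dense(4, [(0, 1.0), (3, -2.0)])
print(dense)                   # [1.0, 0.0, 0.0, -2.0]
print(dense_to_sparse(dense))  # (4, [(0, 1.0), (3, -2.0)])
```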

In [None]:
data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),"A",1),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),"B",6),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),"A",3),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),"B",2), 
        (Vectors.sparse(4, [(1, 1.0), (2, 2.0)]),"C",4)]
dummy_df = spark.createDataFrame(data, ["features", "categ", "num"])
dummy_df.show()

In [None]:
dummy_df.printSchema()

**Going from a categorical column to a vector**

In [None]:
indexer = feature.StringIndexer(inputCol="categ", outputCol="categIndex")

In [None]:
IDXmodel = indexer.fit(dummy_df)

In [None]:
dummy_df1 = IDXmodel.transform(dummy_df)
dummy_df1.show()

In [None]:
# in Spark 2.x this class was called OneHotEncoderEstimator
OHencoder = feature.OneHotEncoder(inputCols=["categIndex"], outputCols=["categVect"])

In [None]:
OHmodel = OHencoder.fit(dummy_df1)

In [None]:
dummy_df2 = OHmodel.transform(dummy_df1)
dummy_df2.show()
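
The two steps above (label to index, index to 0/1 vector) can be sketched in plain Python. This is an illustration under assumptions, not Spark's code: it assumes the default settings, i.e. indices assigned by descending frequency and the last category dropped from the encoded vector, and it assumes ties in frequency are broken alphabetically. The helper names `index_labels` and `one_hot` are hypothetical.

```python
from collections import Counter


def index_labels(labels):
    """Map labels to indices: most frequent first, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]


labels = ["A", "B", "A", "B", "C"]  # the `categ` column of dummy_df
mapping = index_labels(labels)
encoded = [one_hot(mapping[label], len(mapping)) for label in labels]
print(mapping)     # {'A': 0, 'B': 1, 'C': 2}
print(encoded[4])  # [0.0, 0.0]
```

Dropping the last category keeps the encoded columns linearly independent, which is why three categories yield vectors of length two.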

**Combining variables into vectors**

In [None]:
vectAssembler = feature.VectorAssembler(inputCols = ["features", "categVect", "num"], outputCol = "featuresFull")
dummy_df3 = vectAssembler.transform(dummy_df2)
dummy_df3.show(truncate=False)

**Alternative approach**

In [None]:
rf = feature.RFormula(formula="~ features + categ + num", featuresCol='featuresFull')

In [None]:
rfModel = rf.fit(dummy_df)

In [None]:
rfModel.transform(dummy_df).show(truncate=False)

**Scaling variables**

In [None]:
scaler = feature.StandardScaler(inputCol="featuresFull", outputCol="featuresScal")

In [None]:
scalerModel = scaler.fit(dummy_df3)

In [None]:
dummy_df3.select("featuresFull").show(truncate=False)
scalerModel.transform(dummy_df3).select("featuresScal").show(truncate=False)
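
What the scaler does to a single feature can be sketched in plain Python. The sketch assumes the `StandardScaler` defaults (`withStd=True`, `withMean=False`) and the sample standard deviation; the function name `standard_scale` and its keyword arguments are hypothetical stand-ins for those parameters.

```python
from statistics import mean, stdev


def standard_scale(column, with_mean=False, with_std=True):
    """Scale one feature column; stdev() is the sample (n - 1) standard deviation."""
    center = mean(column) if with_mean else 0.0
    spread = stdev(column) if with_std else 1.0
    if spread == 0.0:
        # A constant column has no spread; map it to zeros instead of dividing by 0.
        return [0.0 for _ in column]
    return [(x - center) / spread for x in column]


column = [1.0, 6.0, 3.0, 2.0, 4.0]  # the `num` column of dummy_df
scaled = standard_scale(column)                    # unit standard deviation only
centered = standard_scale(column, with_mean=True)  # zero mean as well
print(scaled)
print(centered)
```

Without centering, zeros stay zeros, so sparse vectors remain sparse; subtracting the mean generally turns them into dense ones.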

Min-max scaling *(the transformation produces a DenseVector)*

In [None]:
MMscaler = feature.MinMaxScaler(inputCol="featuresFull", outputCol="featuresScal")

In [None]:
MMscalerModel = MMscaler.fit(dummy_df3)

In [None]:
dummy_df3.select("featuresFull").show(truncate=False)
MMscalerModel.transform(dummy_df3).select("featuresScal").show(truncate=False)
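
A plain-Python sketch of min-max scaling for one feature follows. The function name `min_max_scale` and its `low`/`high` arguments are hypothetical; the handling of a constant column (placing it in the middle of the target range) is an assumption about the intended behaviour.

```python
def min_max_scale(column, low=0.0, high=1.0):
    """Linearly rescale one feature column to the range [low, high]."""
    col_min, col_max = min(column), max(column)
    if col_max == col_min:
        # Constant column: place every value at the middle of the target range.
        return [0.5 * (low + high) for _ in column]
    span = col_max - col_min
    return [(x - col_min) / span * (high - low) + low for x in column]


scaled = min_max_scale([1.0, 6.0, 3.0, 2.0, 4.0])  # the `num` column of dummy_df
print(scaled)  # [0.0, 1.0, 0.4, 0.2, 0.6]
```

A zero input maps to a non-zero output whenever the column minimum is not zero, which is why the result is stored as a dense vector.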

**PCA**

In [None]:
pca = feature.PCA(k=3, inputCol="featuresFull", outputCol="featuresPCA")

In [None]:
PCAmodel = pca.fit(dummy_df3)

In [None]:
PCAmodel.transform(dummy_df3).select("featuresFull", "featuresPCA").show(truncate=False)

In [None]:
PCAmodel.explainedVariance

### Pipeline

In [None]:
from pyspark.ml import Pipeline

In [None]:
# prepare the estimators/transformers
indexer = feature.StringIndexer(inputCol="categ", outputCol="categIndex")
OHencoder = feature.OneHotEncoder(inputCols=["categIndex"], outputCols=["categVect"])
vectAssembler = feature.VectorAssembler(inputCols = ["features", "num", "categVect"], outputCol = "featuresFull")
scaler = feature.StandardScaler(inputCol="featuresFull", outputCol="featuresScal")

In [None]:
# create the estimator
pipeline = Pipeline(stages=[indexer, OHencoder, vectAssembler, scaler])

In [None]:
# create the transformer
pipelineModel = pipeline.fit(dummy_df)

In [None]:
# transformation
pipelineModel.transform(dummy_df).select("featuresScal").show(truncate=False)

In [None]:
# overwrite() lets the cell be re-run when the "pipe" directory already exists
pipelineModel.write().overwrite().save("pipe")

In [None]:
from pyspark.ml import PipelineModel

In [None]:
pip = PipelineModel.load("pipe")

In [None]:
pip.transform(dummy_df).select("featuresScal").show(truncate=False)

> **TASK:**
- prepare the data below
- drop rows with missing values
- create a column holding the month extracted from the `start_time` column
- create a column holding the hour at which the rental started
- create a column holding the renter's age group (ranges: <20, 20-40, 40-60, 60<)
- round the values in the `start_station_longitude` and `start_station_latitude` columns to one decimal place
- drop the columns: `start_time`, `end_time`, `start_station_name`, `start_station_id`, `end_station_id`, `end_station_name`, `end_station_latitude`, `end_station_longitude`, `member_birth_year`, `bike_id`
- rename the `duration_sec` column to `label`
- build a `features` column of vectors from the remaining variables
- name the resulting DataFrame `goBike_processed`
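
The age-group step can be prototyped in plain Python before translating it to DataFrame operations. This is only a sketch: the function name `age_bucket` is hypothetical, and treating each lower bound as inclusive is an assumption, since the ranges above do not say which group the boundary ages belong to.

```python
def age_bucket(age):
    """Assign an age in years to one of four labelled ranges (lower bound inclusive)."""
    if age < 20:
        return "<20"
    if age < 40:
        return "20-40"
    if age < 60:
        return "40-60"
    return "60<"


# 2017 is the year of the dataset; the birth years are made-up sample values.
buckets = [age_bucket(2017 - year) for year in (2000, 1985, 1960, 1950)]
print(buckets)  # ['<20', '20-40', '40-60', '60<']
```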

In [None]:
goBike = spark.read.csv("./2017-fordgobike-tripdata.csv", header=True, inferSchema=True)

In [None]:
goBike.printSchema()

### Classification

In [None]:
from pyspark.ml import classification

#### Data

https://archive.ics.uci.edu/ml/datasets/adult

In [None]:
col_names = ["age", "workclass", "fnlwgt", "education", "education-num","marital-status", "occupation", 
             "relationship", "race", "sex", "capital-gain", "capital-loss", "hours-per-week", 
             "native-country", "earnings"]

In [None]:
df = spark.read.csv("./adult.data", header=False, inferSchema=True, ignoreLeadingWhiteSpace=True)

In [None]:
#df.toDF(*col_names).show()
df = df.select(*[f.col(old).alias(new) for old, new in zip(df.columns, col_names)]).drop("fnlwgt").dropna("any")

In [None]:
df.show(3, vertical=True)

> **TASK:**
- prepare the data
- split `df` into a training set and an evaluation set
- from the `earnings` column create the target variable `label`, encoded as 0 and 1
- create a (scaled) `features` column of vectors built from the remaining columns
- name the resulting DataFrames `df_train` and `df_eval`

**Final preparations**

In [None]:
df_train = df_train.select("label", "features")
df_eval = df_eval.select("label", "features")

In [None]:
df_train.cache()
df_eval.cache()

In [None]:
print("Train:")
df_train.groupBy("label").count().show()
print("Eval:")
df_eval.groupBy("label").count().show()

#### Logistic regression

In [None]:
lr = classification.LogisticRegression(maxIter=100)

In [None]:
lrModel = lr.fit(df_train)

In [None]:
lrModel.coefficients

In [None]:
lrModel.intercept

In [None]:
trainingSummary = lrModel.summary
type(trainingSummary)

In [None]:
trainingSummary.roc.show(120)

In [None]:
trainingSummary.roc.toPandas()

In [None]:
trainingSummary.pr.show(120)

In [None]:
trainingSummary.areaUnderROC

In [None]:
trainingSummary.accuracy

In [None]:
trainingSummary.predictions.show()

In [None]:
# predictions
lrModel.transform(df_eval).show()

#### SVM

In [None]:
svm = classification.LinearSVC(maxIter=100)

In [None]:
svmModel = svm.fit(df_train)

In [None]:
svmModel.coefficients

In [None]:
svmModel.intercept

In [None]:
# predictions
svmModel.transform(df_eval).show()

#### Decision tree

In [None]:
tree = classification.DecisionTreeClassifier()

In [None]:
treeModel = tree.fit(df_train)

In [None]:
treeModel.depth

In [None]:
treeModel.numNodes

In [None]:
print(treeModel.toDebugString)

In [None]:
# predictions
treeModel.transform(df_eval).show()

#### Random forest

In [None]:
forest = classification.RandomForestClassifier()

In [None]:
forestModel = forest.fit(df_train)

In [None]:
forestModel.featureImportances

In [None]:
print(forestModel.toDebugString)

In [None]:
# predictions
forestModel.transform(df_eval).show()

#### Gradient-Boosted Trees

In [None]:
gbt = classification.GBTClassifier()

In [None]:
gbtModel = gbt.fit(df_train)

In [None]:
gbtModel.featureImportances

In [None]:
print(gbtModel.toDebugString)

In [None]:
# predictions
gbtModel.transform(df_eval).show()

#### Naive Bayes

In [None]:
bayes = classification.NaiveBayes()

In [None]:
bayesModel = bayes.fit(df_train)

In [None]:
# predictions
bayesModel.transform(df_eval).show()

#### MLP

In [None]:
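# layers[0] must equal the length of the `features` vectors (99 here), layers[-1] the number of classes
mlp = classification.MultilayerPerceptronClassifier(maxIter=100, layers=[99, 40, 2])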

In [None]:
mlpModel = mlp.fit(df_train)

In [None]:
mlpModel.weights

In [None]:
# predictions
mlpModel.transform(df_eval).show()

#### Evaluation

In [None]:
from pyspark.ml import evaluation

In [None]:
evaluator = evaluation.BinaryClassificationEvaluator()

In [None]:
# AUC - logistic regression
evaluator.evaluate(lrModel.transform(df_eval))

In [None]:
# AUC - SVM
evaluator.evaluate(svmModel.transform(df_eval))

In [None]:
# AUC - decision tree
evaluator.evaluate(treeModel.transform(df_eval))

In [None]:
# AUC - random forest
evaluator.evaluate(forestModel.transform(df_eval))

In [None]:
# AUC - gbt
evaluator.evaluate(gbtModel.transform(df_eval))

In [None]:
# AUC - NB
evaluator.evaluate(bayesModel.transform(df_eval))

In [None]:
# AUC - MLP
evaluator.evaluate(mlpModel.transform(df_eval))
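
The AUC values above are areas under the ROC curve, the default metric of `BinaryClassificationEvaluator`. One way to read that number is as the share of (positive, negative) pairs that the model orders correctly. The plain-Python sketch below computes it that way; the function name `auc` is hypothetical, and because Spark derives the area from the curve itself, its result may differ slightly.

```python
def auc(labels, scores):
    """Area under the ROC curve as the share of correctly ordered (pos, neg) pairs."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half
    return wins / (len(positives) * len(negatives))


example_auc = auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(example_auc)  # 0.75
```

A value of 0.5 corresponds to random ordering and 1.0 to a perfect ranking; the pairwise loop is quadratic, so it is only meant for small illustrative inputs.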

> **TASK:**
- write a function that computes `accuracy`
- compute the `accuracy` of the models above

In [None]:
def calculate_acc(df, label="label", prediction="prediction"):
    pass
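
As a reference for the quantity the function should return, here is a plain-Python sketch that works on (label, prediction) pairs rather than on a DataFrame. The function name `accuracy` and the sample pairs are made up for illustration; the Spark version still has to be written against the `label` and `prediction` columns.

```python
def accuracy(pairs):
    """Share of (label, prediction) pairs where the two values agree."""
    pairs = list(pairs)
    if not pairs:
        raise ValueError("accuracy is undefined for an empty input")
    correct = sum(1 for label, prediction in pairs if label == prediction)
    return correct / len(pairs)


example_acc = accuracy([(1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 1.0)])
print(example_acc)  # 0.75
```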

> **TASK:**
- improve the `accuracy` of two models

### Regression

In [None]:
from pyspark.ml import regression

https://archive.ics.uci.edu/ml/datasets/wine+quality

In [None]:
wine_red = spark.read.csv("./winequality-red.csv", header=True, inferSchema=True, sep=";") \
.withColumn("type", f.lit(0))
wine_white = spark.read.csv("./winequality-white.csv", header=True, inferSchema=True, sep=";") \
.withColumn("type", f.lit(1))

In [None]:
wine = wine_red.union(wine_white)

In [None]:
cols = [col.replace(" ", "_") for col in wine.columns]
cols

In [None]:
wine = wine.toDF(*cols)

In [None]:
wine.printSchema()

> **TASK:**
- prepare the data
- split `wine` into a training set and an evaluation set
- drop rows with missing values
- rename the `quality` column to `label`
- build a (scaled) `features` column of vectors from the remaining variables
- name the resulting DataFrames `wine_train` and `wine_eval`

**Final preparations**

In [None]:
wine_train = wine_train.select("label", "features")
wine_eval = wine_eval.select("label", "features")

In [None]:
wine_train.cache()
wine_eval.cache()

In [None]:
print("Train:")
wine_train.describe("label").show()
print("Eval:")
wine_eval.describe("label").show()

#### Linear regression

In [None]:
reg = regression.LinearRegression(maxIter=500)

In [None]:
regModel = reg.fit(wine_train)

In [None]:
regModel.coefficients

In [None]:
regModel.intercept

In [None]:
trainSummary = regModel.summary
type(trainSummary)

In [None]:
trainSummary.meanAbsoluteError

In [None]:
trainSummary.meanSquaredError

In [None]:
trainSummary.r2

In [None]:
# predictions
regModel.transform(wine_eval).show()

#### Regression tree

In [None]:
tree_reg = regression.DecisionTreeRegressor()

In [None]:
tree_regModel = tree_reg.fit(wine_train)

In [None]:
print(tree_regModel.toDebugString)

In [None]:
# predictions
tree_regModel.transform(wine_eval).show()

#### Random forest regression

In [None]:
forest_reg = regression.RandomForestRegressor()

In [None]:
forest_regModel = forest_reg.fit(wine_train)

In [None]:
forest_regModel.featureImportances

In [None]:
print(forest_regModel.toDebugString)

In [None]:
# predictions
forest_regModel.transform(wine_eval).show()

#### Gradient-Boosted Trees regression

In [None]:
gbt_reg = regression.GBTRegressor()

In [None]:
gbt_regModel = gbt_reg.fit(wine_train)

In [None]:
gbt_regModel.featureImportances

In [None]:
print(gbt_regModel.toDebugString)

In [None]:
# predictions
gbt_regModel.transform(wine_eval).show()

#### Evaluation

In [None]:
evaluator_reg = evaluation.RegressionEvaluator()

In [None]:
# RMSE - linear regression
evaluator_reg.evaluate(regModel.transform(wine_eval))

In [None]:
# RMSE - regression tree
evaluator_reg.evaluate(tree_regModel.transform(wine_eval))

In [None]:
# RMSE - random forest
evaluator_reg.evaluate(forest_regModel.transform(wine_eval))

In [None]:
# RMSE - GBT
evaluator_reg.evaluate(gbt_regModel.transform(wine_eval))
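
The values above are RMSE, the default metric of `RegressionEvaluator`. The plain-Python sketch below shows how RMSE relates to MSE and R^2 on a handful of made-up (label, prediction) values; the function name `regression_metrics` is hypothetical.

```python
from math import sqrt


def regression_metrics(labels, predictions):
    """Return MSE, RMSE and R^2 for paired labels and predictions."""
    n = len(labels)
    residual = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    mean_label = sum(labels) / n
    total = sum((y - mean_label) ** 2 for y in labels)
    if total == 0.0:
        raise ValueError("R^2 is undefined when all labels are identical")
    mse = residual / n
    return {"mse": mse, "rmse": sqrt(mse), "r2": 1.0 - residual / total}


metrics = regression_metrics([5.0, 6.0, 7.0], [5.0, 6.0, 6.0])
print(metrics)
```

RMSE is expressed in the units of the label, while R^2 compares the model's squared error with that of always predicting the mean label.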

> **TASK:**
- compute `MSE` and `R^2` for the models above

> **TASK:**
- improve the `R^2` of one model

### Selecting the best parameters

In [None]:
from pyspark.ml import tuning

In [None]:
reg2 = regression.LinearRegression()

In [None]:
grid = tuning.ParamGridBuilder() \
.addGrid(reg2.maxIter, [100, 500, 1000]) \
.addGrid(reg2.regParam, [0.0, 0.1, 0.2]).build()
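
The grid built above is the Cartesian product of the listed values. The plain-Python sketch below reproduces that idea with `itertools.product`; the helper name `build_grid` is hypothetical, and plain strings stand in for Spark's parameter objects.

```python
from itertools import product


def build_grid(options):
    """Return one dict per combination of the listed parameter values."""
    names = list(options)
    return [dict(zip(names, values)) for values in product(*options.values())]


grid_sketch = build_grid({"maxIter": [100, 500, 1000], "regParam": [0.0, 0.1, 0.2]})
print(len(grid_sketch))  # 9
print(grid_sketch[0])    # {'maxIter': 100, 'regParam': 0.0}
```

Each additional parameter multiplies the number of combinations, so grids grow quickly.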

In [None]:
reg_eval = evaluation.RegressionEvaluator(metricName='r2')

In [None]:
cv = tuning.CrossValidator(estimator=reg2, estimatorParamMaps=grid, evaluator=reg_eval, parallelism=2)

In [None]:
cvModel = cv.fit(wine_train)

In [None]:
cvModel.avgMetrics
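
`avgMetrics` holds one value per parameter combination, averaged over the cross-validation folds. The plain-Python sketch below shows the underlying k-fold idea; the helper names are hypothetical, `num_folds=3` is assumed to match the `CrossValidator` default, and the folds here are assigned deterministically for readability, whereas Spark assigns rows to folds at random.

```python
def k_fold_indices(n_rows, num_folds):
    """Yield (train, validation) index lists; each row is validated exactly once."""
    folds = [list(range(i, n_rows, num_folds)) for i in range(num_folds)]
    for i, validation in enumerate(folds):
        train = [row for j, fold in enumerate(folds) if j != i for row in fold]
        yield train, validation


def average_metric(evaluate, n_rows, num_folds=3):
    """Average a user-supplied evaluate(train, validation) score over the folds."""
    scores = [evaluate(tr, va) for tr, va in k_fold_indices(n_rows, num_folds)]
    return sum(scores) / len(scores)


splits = list(k_fold_indices(6, 3))
print(splits[0])  # ([1, 4, 2, 5], [0, 3])
```

With 9 parameter combinations and 3 folds this amounts to 27 model fits, followed by one final fit of the best combination on the full training set.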

Parameters of the best model

In [None]:
cvModel.bestModel._java_obj.getMaxIter()

In [None]:
cvModel.bestModel._java_obj.getRegParam()

In [None]:
reg_eval.evaluate(cvModel.transform(wine_eval))

Saving and loading the model

In [None]:
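# overwrite() lets the cell be re-run when the "model" directory already exists
cvModel.write().overwrite().save("model")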

In [None]:
readInModel = tuning.CrossValidatorModel.load("model")

In [None]:
reg_eval.evaluate(readInModel.transform(wine_eval))

### Pipeline + selecting the best parameters

In [None]:
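# wine_t / wine_e are not defined in this notebook; they are assumed to be the raw
# (not yet assembled) train/eval splits of `wine`
cols = [x for x in wine_t.columns if x != "quality"]
# column parameters are left unset here and supplied through the parameter grid in the next cell
vectA = feature.VectorAssembler()  # inputCols=cols, outputCol="featuresRaw"
scal = feature.StandardScaler()  # inputCol="featuresRaw", outputCol="features"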
forestReg = regression.RandomForestRegressor()
pipe = Pipeline(stages=[vectA, scal, forestReg])

In [None]:
# baseOn - a way to pass a fixed parameter value to every combination
paramGrid = tuning.ParamGridBuilder() \
.baseOn([vectA.inputCols, cols]) \
.baseOn([vectA.outputCol, 'featuresRaw']) \
.baseOn([scal.inputCol, 'featuresRaw']) \
.baseOn([scal.outputCol, 'features']) \
.addGrid(scal.withMean, [False, True]) \
.baseOn([forestReg.labelCol, 'quality']) \
.addGrid(forestReg.maxDepth, [5, 6, 7, 8]) \
.build()

In [None]:
regr_eval = evaluation.RegressionEvaluator(labelCol= "quality", metricName='r2')

In [None]:
# create the estimator
crossval = tuning.CrossValidator(estimator=pipe,
                                 estimatorParamMaps=paramGrid,
                                 evaluator=regr_eval,
                                 numFolds=4, 
                                 parallelism=2)

In [None]:
# create the transformer
crossvalModel = crossval.fit(wine_t)

In [None]:
crossvalModel.avgMetrics

In [None]:
crossvalModel.bestModel.stages[2]._java_obj.getMaxDepth()

In [None]:
crossvalModel.bestModel.stages[1]._java_obj.getWithMean()

In [None]:
# transformation (prediction)
crossvalModel.transform(wine_e).drop(*cols).show()

In [None]:
regr_eval.evaluate(crossvalModel.transform(wine_e))

> **TASK:**
- build a model that predicts the number of rings (the age) of abalone as accurately as possible
- the problem can be treated as either regression or classification
- anything goes

http://archive.ics.uci.edu/ml/datasets/Abalone

In [None]:
colNames = ["Sex", "Length", "Diameter", "Height", "Whole_weight", "Shucked_weight", 
            "Viscera_weight", "Shell_weight", "Rings"]

In [None]:
abalone = spark.read.csv("./abalone.data", header=False, inferSchema=True)

In [None]:
abalone = abalone.select(*[f.col(old).alias(new) for old, new in zip(abalone.columns, colNames)])

In [None]:
abalone.show()