# Machine Learning

* ``spark.mllib``, ``spark.ml``

* Input:
    * 1 column of arrays. Each array contains all features from each sample
    * 1 column with the target y.

* Entities
    * Transformer
    * Estimator
    * Pipeline
    * Parameters

* Syntax
    * R-like

## Regression

In [3]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExampleML") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/06 07:49:36 WARN Utils: Your hostname, WIN-NJTBBD8GS0T, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/06 07:49:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/06 07:49:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
cars_tmp = spark.read.csv(
    "/home/ubuntu/obenkyo/raw_data/spark_course_udemy/Carros.csv",
    inferSchema=True,
    header=True,
    sep=';'
)

                                                                                

In [None]:
vect_features = VectorAssembler(inputCols=[("Consumo"), ("Cilindros"), ("Cilindradas")], outputCol="features")

cars = vect_features.transform(cars_tmp.select(["Consumo", "Cilindros", "Cilindradas", "HP"]))

cars_train, cars_test = cars.randomSplit([0.7, 0.3])

In [None]:
linreg = LinearRegression(featuresCol="features", labelCol="HP")
model = linreg.fit(cars_train)
pred = model.transform(cars_test)

25/11/06 07:59:27 WARN Instrumentation: [045e274b] regParam is zero, which might cause numerical instability and overfitting.
25/11/06 07:59:28 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/06 07:59:28 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [25]:
pred.show()

+-------+---------+-----------+---+------------------+------------------+
|Consumo|Cilindros|Cilindradas| HP|          features|        prediction|
+-------+---------+-----------+---+------------------+------------------+
|     21|        6|        160|110|  [21.0,6.0,160.0]|176.47484408798923|
|    143|        8|        360|245| [143.0,8.0,360.0]|214.53107046754377|
|    147|        8|        440|230| [147.0,8.0,440.0]|213.00371317767423|
|    152|        8|        304|150| [152.0,8.0,304.0]|213.87791462447709|
|    164|        8|       2758|180|[164.0,8.0,2758.0]|183.18379486107372|
|    173|        8|       2758|180|[173.0,8.0,2758.0]|181.87017168193992|
|    181|        6|        225|105| [181.0,6.0,225.0]|152.35492925338994|
|    187|        8|        360|175| [187.0,8.0,360.0]|208.10891270288968|
|    197|        6|        145|175| [197.0,6.0,145.0]|150.96312392296215|
|    228|        4|        108| 93| [228.0,4.0,108.0]| 88.65287178779501|
|    273|        4|         79| 66|  [

In [28]:
eval_reg = RegressionEvaluator(predictionCol="prediction", labelCol="HP", metricName="rmse")
eval_reg.evaluate(pred)

35.592841438105346

In [30]:
rf_model = RandomForestRegressor(featuresCol="features", labelCol="HP").fit(cars_train)
rf_pred = rf_model.transform(cars_test)

eval_reg.evaluate(rf_pred)

25/11/06 08:04:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 21 (= number of training instances)


32.00073904460288

## Classification (using RFormula)

In [33]:
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [50]:
churn = spark.read.csv("/home/ubuntu/obenkyo/raw_data/spark_course_udemy/Churn.csv", sep=';', header=True, inferSchema=True)

In [53]:
formula = RFormula(
    formula="Exited ~ .",
    featuresCol="features",
    labelCol="label",
    handleInvalid="skip"
)

churn_trans = formula.fit(churn).transform(churn)

In [54]:
churn_trans.show(truncate=False)

+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+----------------------------------------------------------------+-----+
|CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|features                                                        |label|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+----------------------------------------------------------------+-----+
|619        |France   |Female|42 |2     |0       |1            |1        |1             |10134888       |1     |[619.0,1.0,0.0,0.0,42.0,2.0,0.0,1.0,1.0,1.0,1.0134888E7]        |1.0  |
|608        |Spain    |Female|41 |1     |8380786 |1            |0        |1             |11254258       |0     |[608.0,0.0,0.0,0.0,41.0,1.0,8380786.0,1.0,0.0,1.0,1.1254258E7]  |0.0  |
|502        |France   |Female|42 |8     |1596608 |3            |1        |0     

In [55]:
churn_train, churn_test = churn_trans.randomSplit([0.7, 0.3])

In [57]:
print(churn_train.count(), churn_test.count())

[Stage 204:>                                                        (0 + 1) / 1]

6988 3012


                                                                                

In [58]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features").fit(churn_train)
pred = dt.transform(churn_test)

25/11/06 08:18:42 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [61]:
pred.select("features", "label", "rawPrediction", "probability", "prediction").show(truncate=False)

[Stage 226:>                                                        (0 + 1) / 1]

+----------------------------------------------------------------+-----+--------------+----------------------------------------+----------+
|features                                                        |label|rawPrediction |probability                             |prediction|
+----------------------------------------------------------------+-----+--------------+----------------------------------------+----------+
|[350.0,1.0,0.0,0.0,40.0,0.0,1.1109885E7,1.0,1.0,1.0,1.7232121E7]|1.0  |[4356.0,489.0]|[0.8990712074303405,0.10092879256965945]|0.0       |
|[350.0,0.0,0.0,1.0,54.0,1.0,1.5267748E7,1.0,1.0,1.0,1.9197349E7]|1.0  |[328.0,120.0] |[0.7321428571428571,0.26785714285714285]|0.0       |
|[358.0,0.0,0.0,0.0,52.0,8.0,1.4354236E7,3.0,1.0,0.0,1.4195911E7]|1.0  |[41.0,233.0]  |[0.14963503649635038,0.8503649635036497]|1.0       |
|[359.0,1.0,0.0,0.0,44.0,6.0,1.2874769E7,1.0,1.0,0.0,1.4695571E7]|1.0  |[270.0,148.0] |[0.645933014354067,0.35406698564593303] |0.0       |
|[363.0,0.0,0.0,0.0,

                                                                                

In [62]:
eval_cls = BinaryClassificationEvaluator(
    rawPredictionCol="prediction",
    labelCol="label",
    metricName="areaUnderROC"
)

print(eval_cls.evaluate(pred))

                                                                                

0.6693669565566289


## Pipelines

In [67]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

cars = spark.read.csv("/home/ubuntu/obenkyo/raw_data/spark_course_udemy/Carros.csv", inferSchema=True, header=True, sep=';').select("Consumo", "Cilindros", "Cilindradas", "HP")

cars_features = VectorAssembler(
    inputCols=[("Consumo"), ("Cilindros"), ("Cilindradas")], outputCol="features"
).transform(cars)

cars_train, cars_test = cars_features.randomSplit([0.7, 0.3])

linreg = LinearRegression(featuresCol="features", labelCol="HP").fit(cars_train)
pred = linreg.transform(cars_test)

25/11/06 08:29:03 WARN Instrumentation: [d3d5bbe3] regParam is zero, which might cause numerical instability and overfitting.


In [68]:
from pyspark.ml import Pipeline

In [74]:
pipeline = Pipeline(stages=[
    VectorAssembler(
        inputCols=[("Consumo"), ("Cilindros"), ("Cilindradas")], outputCol="features"
    ),
    LinearRegression(featuresCol="features", labelCol="HP")
])

pipelineModel = pipeline.fit(cars)
pred_pipe = pipelineModel.transform(cars)

25/11/06 08:32:28 WARN Instrumentation: [47fde7ad] regParam is zero, which might cause numerical instability and overfitting.


In [75]:
pred_pipe.show()

+-------+---------+-----------+---+------------------+------------------+
|Consumo|Cilindros|Cilindradas| HP|          features|        prediction|
+-------+---------+-----------+---+------------------+------------------+
|     21|        6|        160|110|  [21.0,6.0,160.0]|162.32154816816646|
|     21|        6|        160|110|  [21.0,6.0,160.0]|162.32154816816646|
|    228|        4|        108| 93| [228.0,4.0,108.0]| 82.51715587712931|
|    214|        6|        258|110| [214.0,6.0,258.0]|141.86680518718754|
|    187|        8|        360|175| [187.0,8.0,360.0]|202.93528239714834|
|    181|        6|        225|105| [181.0,6.0,225.0]| 145.4980634611832|
|    143|        8|        360|245| [143.0,8.0,360.0]|   207.41448530972|
|    244|        4|       1467| 62|[244.0,4.0,1467.0]| 69.69282676584851|
|    228|        4|       1408| 95|[228.0,4.0,1408.0]| 71.80767356085781|
|    192|        6|       1676|123|[192.0,6.0,1676.0]|132.42483285541724|
|    178|        6|       1676|123|[17