In [30]:
import numpy as np
import pandas as pd
import findspark
from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import avg, round
from pyspark.ml.feature import RFormula, VectorAssembler, QuantileDiscretizer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from imblearn.over_sampling import SMOTE

In [31]:
# Initialise findspark, then start (or reuse) the Spark session for this notebook.
# NOTE(review): pyspark is already imported in the imports cell, i.e. before
# this call runs -- confirm that ordering is intentional.
findspark.init()
session_builder = SparkSession.builder.appName("multilinear_regression_car_horsepower")
spark = session_builder.getOrCreate()

# The raw file is a semicolon-separated CSV with a header row; column types
# are inferred by Spark rather than declared.
csv_options = dict(format='csv', header=True, inferSchema=True, sep=';')
cars = spark.read.load('../data/raw/mtcars.csv', **csv_options)

print("Number of instances in this dataset: ", cars.count())
cars.show()

Number of instances in this dataset:  32
+--------------+---------+------------+-------------+------+---------------+--------------------+-----------------+-----+----------+---+
|MilesPerGallon|Cylinders|Displacement|RearAxleRatio|Weight|QuarterMileTime|VShapeOrStraightLine|AutomaticOrManual|Gears|Carburetor| HP|
+--------------+---------+------------+-------------+------+---------------+--------------------+-----------------+-----+----------+---+
|          21.0|        6|       160.0|          3.9|  2.62|          16.46|                   0|                1|    4|         4|110|
|          21.0|        6|       160.0|          3.9| 2.875|          17.02|                   0|                1|    4|         4|110|
|          22.8|        4|       108.0|         3.85|  2.32|          18.61|                   1|                1|    4|         1| 93|
|          21.4|        6|       258.0|         3.08| 3.215|          19.44|                   1|                0|    3|         1|110|


In [32]:
# Correlation.corr works on a single vector column, so pack every column of
# `cars` into one vector first.
assembler = VectorAssembler(inputCols=cars.columns, outputCol='corr_features')
cars_assembled = assembler.transform(cars).select('corr_features')

# The result is a one-row dataframe whose only column holds the matrix.
corr_result = Correlation.corr(cars_assembled, 'corr_features')
corr_row = corr_result.collect()[0]
corr_values = corr_row[corr_result.columns[0]].toArray()

# Rebuild the matrix as a dataframe labelled with the original column names
# (rows follow the same order as the columns).
corr_matrix = spark.createDataFrame(corr_values.tolist(), cars.columns)

# Show three decimals only. `round` here is pyspark.sql.functions.round
# (imported at the top of the notebook), not Python's builtin.
rounded_columns = [round(name, 3).alias(name) for name in corr_matrix.columns]
corr_matrix.select(rounded_columns).show()

+--------------+---------+------------+-------------+------+---------------+--------------------+-----------------+------+----------+------+
|MilesPerGallon|Cylinders|Displacement|RearAxleRatio|Weight|QuarterMileTime|VShapeOrStraightLine|AutomaticOrManual| Gears|Carburetor|    HP|
+--------------+---------+------------+-------------+------+---------------+--------------------+-----------------+------+----------+------+
|           1.0|   -0.852|      -0.848|        0.681|-0.868|          0.419|               0.664|              0.6|  0.48|    -0.551|-0.776|
|        -0.852|      1.0|       0.902|         -0.7| 0.782|         -0.591|              -0.811|           -0.523|-0.493|     0.527| 0.832|
|        -0.848|    0.902|         1.0|        -0.71| 0.888|         -0.434|               -0.71|           -0.591|-0.556|     0.395| 0.791|
|         0.681|     -0.7|       -0.71|          1.0|-0.712|          0.091|                0.44|            0.713|   0.7|    -0.091|-0.449|
|        -0.8

In [34]:
# Model HP from five predictors. The fitted RFormula produces the 'features'
# and 'label' columns previewed below.
r_formula = RFormula(
    formula="HP ~ MilesPerGallon + Cylinders"
            + " + VShapeOrStraightLine + AutomaticOrManual + Carburetor"
)
formula_model = r_formula.fit(cars)
cars_rf = formula_model.transform(cars)
cars_rf.select('features', 'label').show(10)

# 80/20 train/test split with a fixed seed.
split_weights = [0.8, 0.2]
cars_train, cars_test = cars_rf.randomSplit(split_weights, seed=10)
print("Number of training instances: ", cars_train.count())
print("Number of testing instances: ", cars_test.count())

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[21.0,6.0,0.0,1.0...|110.0|
|[21.0,6.0,0.0,1.0...|110.0|
|[22.8,4.0,1.0,1.0...| 93.0|
|[21.4,6.0,1.0,0.0...|110.0|
|[18.7,8.0,0.0,0.0...|175.0|
|[18.1,6.0,1.0,0.0...|105.0|
|[14.3,8.0,0.0,0.0...|245.0|
|[24.4,4.0,1.0,0.0...| 62.0|
|[22.8,4.0,1.0,0.0...| 95.0|
|[19.2,6.0,1.0,0.0...|123.0|
+--------------------+-----+
only showing top 10 rows

Number of training instances:  26
Number of testing instances:  6


In [35]:
# Fit a default LinearRegression and score it on the same split it was
# trained on (these are training-set metrics).
regressor = LinearRegression()
model = regressor.fit(cars_train)
pred = model.transform(cars_train)

print("Regression Evaluation Metrics: ")

# (printed label, RegressionEvaluator metric name), in report order.
metric_labels = [
    ("RMSE: ", 'rmse'),
    ("R2: ", 'r2'),
    ("MAE: ", 'mae'),
    ("Explained variance: ", 'var'),
]
evaluator = RegressionEvaluator()
for label_text, metric_name in metric_labels:
    evaluator.setMetricName(metric_name)
    print(label_text, evaluator.evaluate(pred))

Regression Evaluation Metrics: 
RMSE:  28.07246604692157
R2:  0.8376519545336089
MAE:  22.822333453164763
Explained variance:  4066.096413358052


In [36]:
# Compare four GLM error families on the training split. Each candidate is
# fitted on `cars_train` and scored on that same data, so these are
# training-set metrics used only to compare the families against each other.
gaussian_regressor = GeneralizedLinearRegression(family='gaussian')
poisson_regressor = GeneralizedLinearRegression(family='poisson')
gamma_regressor = GeneralizedLinearRegression(family='gamma')
poisson_gamma_regressor = GeneralizedLinearRegression(family='tweedie',
                                                      variancePower=1.5)

# (report title, estimator), in the order the reports are printed.
glm_candidates = [
    ("Gaussian Residual Distribution Regression", gaussian_regressor),
    ("Poisson Residual Distribution Regression", poisson_regressor),
    ("Gamma Residual Distribution Regression", gamma_regressor),
    ("Compound Poisson-Gamma Distribution Regression", poisson_gamma_regressor),
]

# (printed label, RegressionEvaluator metric name), in report order.
metric_labels = [
    ("RMSE: ", 'rmse'),
    ("R2: ", 'r2'),
    ("MAE: ", 'mae'),
    ("Explained variance: ", 'var'),
]

for position, (title, glm) in enumerate(glm_candidates):
    model = glm.fit(cars_train)
    pred = model.transform(cars_train)
    print(title)
    evaluator = RegressionEvaluator()
    for label_text, metric_name in metric_labels:
        evaluator.setMetricName(metric_name)
        print(label_text, evaluator.evaluate(pred))
    # Blank line between reports, but not after the last one.
    if position < len(glm_candidates) - 1:
        print()

Gaussian Residual Distribution Regression
RMSE:  28.07246604692157
R2:  0.8376519545336089
MAE:  22.822333453164763
Explained variance:  4066.096413358052

Poisson Residual Distribution Regression
RMSE:  20.566064069989732
R2:  0.9128658692844106
MAE:  17.45322802250631
Explained variance:  4219.737202039817

Gamma Residual Distribution Regression
RMSE:  20.65819693769698
R2:  0.9120834250363956
MAE:  17.862012885680983
Explained variance:  4677.629640761557

Compound Poisson-Gamma Distribution Regression
RMSE:  19.345049233657225
R2:  0.9229051065271985
MAE:  16.863811812773893
Explained variance:  4416.580690102612


In [37]:
# Refit the tweedie (variancePower=1.5) GLM on the training split and score
# it on the held-out test split.
poisson_gamma_regressor = GeneralizedLinearRegression(
    family='tweedie',
    variancePower=1.5,
)
model = poisson_gamma_regressor.fit(cars_train)
pred = model.transform(cars_test)
pred.select('label', 'prediction').show()

print("Compound Poisson-Gamma Distribution Regression")

# (printed label, RegressionEvaluator metric name), in report order.
metric_labels = [
    ("RMSE: ", 'rmse'),
    ("R2: ", 'r2'),
    ("MAE: ", 'mae'),
    ("Explained variance: ", 'var'),
]
evaluator = RegressionEvaluator()
for label_text, metric_name in metric_labels:
    evaluator.setMetricName(metric_name)
    print(label_text, evaluator.evaluate(pred))

+-----+------------------+
|label|        prediction|
+-----+------------------+
|205.0|242.61496340712614|
|245.0| 206.6175171302184|
|180.0|186.96148784993238|
|180.0|178.79414159005162|
|105.0|116.41304195985697|
|175.0|152.53830129571708|
+-----+------------------+

Compound Poisson-Gamma Distribution Regression
RMSE:  24.40219357085737
R2:  0.6591921488313937
MAE:  19.673255533488064
Explained variance:  1580.0039254428023


In [38]:
# Reduced model: predict HP from Cylinders and Carburetor only.
r_formula = RFormula(formula="HP ~ Cylinders + Carburetor")
formula_model = r_formula.fit(cars)
cars_rf = formula_model.transform(cars)
cars_rf.select('features', 'label').show(10)

# 80/20 train/test split with a fixed seed.
split_weights = [0.8, 0.2]
cars_train, cars_test = cars_rf.randomSplit(split_weights, seed=10)
print("Number of training instances: ", cars_train.count())
print("Number of testing instances: ", cars_test.count())

+---------+-----+
| features|label|
+---------+-----+
|[6.0,4.0]|110.0|
|[6.0,4.0]|110.0|
|[4.0,1.0]| 93.0|
|[6.0,1.0]|110.0|
|[8.0,2.0]|175.0|
|[6.0,1.0]|105.0|
|[8.0,4.0]|245.0|
|[4.0,2.0]| 62.0|
|[4.0,2.0]| 95.0|
|[6.0,4.0]|123.0|
+---------+-----+
only showing top 10 rows

Number of training instances:  26
Number of testing instances:  6


In [39]:
# Fit a default LinearRegression on the current training split and score it
# on that same split (these are training-set metrics).
regressor = LinearRegression()
model = regressor.fit(cars_train)
pred = model.transform(cars_train)

print("Regression Evaluation Metrics: ")

# (printed label, RegressionEvaluator metric name), in report order.
metric_labels = [
    ("RMSE: ", 'rmse'),
    ("R2: ", 'r2'),
    ("MAE: ", 'mae'),
    ("Explained variance: ", 'var'),
]
evaluator = RegressionEvaluator()
for label_text, metric_name in metric_labels:
    evaluator.setMetricName(metric_name)
    print(label_text, evaluator.evaluate(pred))

Regression Evaluation Metrics: 
RMSE:  30.163508459168124
R2:  0.8125654517095617
MAE:  24.72072872629369
Explained variance:  3944.3225207473242


In [40]:
# Compare four GLM error families on the current training split. Each
# candidate is fitted on `cars_train` and scored on that same data, so these
# are training-set metrics used only to compare the families.
gaussian_regressor = GeneralizedLinearRegression(family='gaussian')
poisson_regressor = GeneralizedLinearRegression(family='poisson')
gamma_regressor = GeneralizedLinearRegression(family='gamma')
poisson_gamma_regressor = GeneralizedLinearRegression(family='tweedie',
                                                      variancePower=1.5)

# (report title, estimator), in the order the reports are printed.
glm_candidates = [
    ("Gaussian Residual Distribution Regression", gaussian_regressor),
    ("Poisson Residual Distribution Regression", poisson_regressor),
    ("Gamma Residual Distribution Regression", gamma_regressor),
    ("Compound Poisson-Gamma Distribution Regression", poisson_gamma_regressor),
]

# (printed label, RegressionEvaluator metric name), in report order.
metric_labels = [
    ("RMSE: ", 'rmse'),
    ("R2: ", 'r2'),
    ("MAE: ", 'mae'),
    ("Explained variance: ", 'var'),
]

for position, (title, glm) in enumerate(glm_candidates):
    model = glm.fit(cars_train)
    pred = model.transform(cars_train)
    print(title)
    evaluator = RegressionEvaluator()
    for label_text, metric_name in metric_labels:
        evaluator.setMetricName(metric_name)
        print(label_text, evaluator.evaluate(pred))
    # Blank line between reports, but not after the last one.
    if position < len(glm_candidates) - 1:
        print()

Gaussian Residual Distribution Regression
RMSE:  30.163508459168124
R2:  0.8125654517095617
MAE:  24.72072872629369
Explained variance:  3944.3225207473242

Poisson Residual Distribution Regression
RMSE:  22.663558325502922
R2:  0.8941862441662103
MAE:  19.12408285987216
Explained variance:  4091.3889376320517

Gamma Residual Distribution Regression
RMSE:  22.04969418217305
R2:  0.8998407474756317
MAE:  18.510566322661216
Explained variance:  4631.236177009051

Compound Poisson-Gamma Distribution Regression
RMSE:  20.9652374035977
R2:  0.9094506153853563
MAE:  17.983841076644175
Explained variance:  4314.15706291506


In [41]:
# Refit the tweedie (variancePower=1.5) GLM on the current training split and
# score it on the held-out test split.
poisson_gamma_regressor = GeneralizedLinearRegression(
    family='tweedie',
    variancePower=1.5,
)
model = poisson_gamma_regressor.fit(cars_train)
pred = model.transform(cars_test)
pred.select('label', 'prediction').show()

print("Compound Poisson-Gamma Distribution Regression")

# (printed label, RegressionEvaluator metric name), in report order.
metric_labels = [
    ("RMSE: ", 'rmse'),
    ("R2: ", 'r2'),
    ("MAE: ", 'mae'),
    ("Explained variance: ", 'var'),
]
evaluator = RegressionEvaluator()
for label_text, metric_name in metric_labels:
    evaluator.setMetricName(metric_name)
    print(label_text, evaluator.evaluate(pred))

+-----+------------------+
|label|        prediction|
+-----+------------------+
|205.0|213.13725502710878|
|245.0|213.13725502710878|
|180.0|191.77920695365597|
|180.0|191.77920695365597|
|105.0|106.62712068379611|
|175.0|173.47777310114245|
+-----+------------------+

Compound Poisson-Gamma Distribution Regression
RMSE:  15.077099444461398
R2:  0.869896957143173
MAE:  11.1179602483276
Explained variance:  1313.8857074599364


In [45]:
# Oversample the dataset with SMOTE. SMOTE balances class labels, so HP is
# first binned into quantile buckets and those buckets serve as the classes.
#
# NOTE(review): every row of `cars` is resampled here, before any train/test
# split. A split made afterwards on `cars_resampled` will put synthetic rows
# derived from the same originals on both sides -- confirm this is acceptable
# or split before oversampling.
# NOTE(review): SMOTE presumably interpolates every column, so discrete
# columns (e.g. Cylinders, Gears) may get fractional values in synthetic rows
# -- verify.
NUM_BUCKETS = 5
SAMPLES_PER_BUCKET = 100

bucketizer = QuantileDiscretizer(numBuckets=NUM_BUCKETS, inputCol='HP',
                                 outputCol='Buckets')
cars_bucketized = bucketizer.fit(cars).transform(cars)

# Target row count for each bucket id (bucket ids are floats 0.0 .. 4.0).
num_samples_per_bucket = {
    float(bucket_id): SAMPLES_PER_BUCKET for bucket_id in range(NUM_BUCKETS)
}

smote_resampler = SMOTE(sampling_strategy=num_samples_per_bucket,
                        random_state=0, k_neighbors=4)

# Collect to pandas once and slice features/target from the same frame; the
# target is passed as a 1-D array rather than an (n, 1) column.
cars_bucketized_pd = cars_bucketized.toPandas()
cars_resampled_X, cars_resampled_y = smote_resampler.fit_resample(
    cars_bucketized_pd[cars.columns].to_numpy(),
    cars_bucketized_pd['Buckets'].to_numpy(),
)

cars_resampled = spark.createDataFrame(pd.DataFrame(cars_resampled_X,
                                                    columns=cars.columns))
# Seeded so the preview is the same on every run.
cars_resampled.sample(fraction=0.1, seed=0).show()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+------------------+-----------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------------------+------------------+------------------+
|    MilesPerGallon|        Cylinders|      Displacement|     RearAxleRatio|            Weight|   QuarterMileTime|VShapeOrStraightLine|  AutomaticOrManual|             Gears|        Carburetor|                HP|
+------------------+-----------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------------------+------------------+------------------+
|              16.4|              8.0|             275.8|              3.07|              4.07|              17.4|                 0.0|                0.0|               3.0|               3.0|             180.0|
|              17.3|              8.0|             275.8|              3.07|              3.73|              17.6|                 0.0|             

In [46]:
# Same five-predictor formula as the first model, applied to the oversampled
# data.
r_formula = RFormula(
    formula="HP ~ MilesPerGallon + Cylinders"
            + " + VShapeOrStraightLine + AutomaticOrManual + Carburetor"
)
formula_model = r_formula.fit(cars_resampled)
cars_resampled_rf = formula_model.transform(cars_resampled)

# NOTE(review): `cars_resampled` was oversampled before this split, so the
# test split contains synthetic rows built from the same originals as the
# training split. Metrics computed on it are likely optimistic.
split_weights = [0.8, 0.2]
cars_train, cars_test = cars_resampled_rf.randomSplit(split_weights, seed=10)
print("Number of training instances: ", cars_train.count())
print("Number of testing instances: ", cars_test.count())

Number of training instances:  399
Number of testing instances:  101


In [47]:
# Fit the tweedie (variancePower=1.5) GLM on the oversampled training split
# and score it on the oversampled test split.
# NOTE(review): both splits come from data that was oversampled before
# splitting, so these test metrics are not comparable with the earlier runs
# on the original rows.
poisson_gamma_regressor = GeneralizedLinearRegression(
    family='tweedie',
    variancePower=1.5,
)
model = poisson_gamma_regressor.fit(cars_train)
pred = model.transform(cars_test)

print("Compound Poisson-Gamma Distribution Regression")

# (printed label, RegressionEvaluator metric name), in report order.
metric_labels = [
    ("RMSE: ", 'rmse'),
    ("R2: ", 'r2'),
    ("MAE: ", 'mae'),
    ("Explained variance: ", 'var'),
]
evaluator = RegressionEvaluator()
for label_text, metric_name in metric_labels:
    evaluator.setMetricName(metric_name)
    print(label_text, evaluator.evaluate(pred))

Compound Poisson-Gamma Distribution Regression
RMSE:  16.41452715340321
R2:  0.9255694744106916
MAE:  12.87291532593791
Explained variance:  3523.6364180829496
