# Технологии обработки больших данных

## Занятие 6. Машинное обучение в PySpark. Регрессия 

1. Поставновка задачи регрессии и отличие от классификации
2. Линейная регрессия в pyspark
3. Метрики качества регрессии
4. Домашнее задание 
  
**Рекомендованная литература:** 
- Learning Spark 2 edition
- [Spark Regression](https://spark.apache.org/docs/latest/ml-classification-regression.html#regression)
- [Open Data Science Mlcourse ч.4](https://habr.com/ru/company/ods/blog/323890/)
- [Метрики качества регрессии](https://neerc.ifmo.ru/wiki/index.php?title=%D0%9E%D1%86%D0%B5%D0%BD%D0%BA%D0%B0_%D0%BA%D0%B0%D1%87%D0%B5%D1%81%D1%82%D0%B2%D0%B0_%D0%B2_%D0%B7%D0%B0%D0%B4%D0%B0%D1%87%D0%B0%D1%85_%D0%BA%D0%BB%D0%B0%D1%81%D1%81%D0%B8%D1%84%D0%B8%D0%BA%D0%B0%D1%86%D0%B8%D0%B8_%D0%B8_%D1%80%D0%B5%D0%B3%D1%80%D0%B5%D1%81%D1%81%D0%B8%D0%B8#.D0.9E.D1.86.D0.B5.D0.BD.D0.BA.D0.B8_.D0.BA.D0.B0.D1.87.D0.B5.D1.81.D1.82.D0.B2.D0.B0_.D1.80.D0.B5.D0.B3.D1.80.D0.B5.D1.81.D1.81.D0.B8.D0.B8)

## Установка pyspark в изолированой среде venv / conda env !

! pip install pyspark

In [None]:
! pip install pyspark
import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 54.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=4b5aa075280d4babf80b0c55002ca6709980dd562d3c01b58e37b9dd2013f455
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## 1. Поставновка задачи регрессии и отличие от классификации

![Classification](https://github.com/balezz/learning_spark/blob/master/img/classification.png?raw=1)

![sigmoid](https://github.com/balezz/learning_spark/blob/master/img/sigmoid.jpg?raw=1)

![Regression](https://github.com/balezz/learning_spark/blob/master/img/regression.jpg?raw=1)

In [None]:
data_path = 'sample_data/california_housing_test.csv'

In [None]:
df = spark.read.format('csv').\
        options(header='true', inferschema='true').load(data_path,header=True)

df.show(200)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

In [None]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



## 2. Линейная регрессия в pyspark

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [None]:
feature_vec = VectorAssembler(inputCols=['housing_median_age'], outputCol='Features')
vec_df = feature_vec.transform(df)

lr = LinearRegression(featuresCol='Features', labelCol='median_house_value', predictionCol='predict')

# Fit the model
lrModel = lr.fit(vec_df)

In [None]:
# Print the coefficients and intercept for linear regression
print(f"median_house_value = {int(lrModel.coefficients.values)} * housing_median_age + {int(lrModel.intercept)}")

median_house_value = 823 * housing_median_age + 182090


## 3. Метрики качества регрессии

$MSE = \dfrac{1}{n}\sum \limits_{i=1}^{n}(model(x_i) - y_i)^2$

MSE применяется в ситуациях, когда нам надо подчеркнуть большие ошибки и выбрать модель, которая дает меньше больших ошибок прогноза. Грубые ошибки становятся заметнее за счет того, что ошибку прогноза мы возводим в квадрат. И модель, которая дает нам меньшее значение среднеквадратической ошибки, можно сказать, что что у этой модели меньше грубых ошибок.

$RMSE = \sqrt{\dfrac{1}{n}\sum \limits_{i=1}^{n}(model(x_i) - y_i)^2}$

RMSE получается путем извлечения корня из MSE, в результате размерность ошибки в тех же величинах что и целевая переменная.

$MAE = \dfrac{1}{n}\sum \limits_{i=1}^{n}|model(x_i) - y_i|$

Среднеквадратичный функционал сильнее штрафует за большие отклонения по сравнению со среднеабсолютным, и поэтому более чувствителен к выбросам. При использовании любого из этих двух функционалов может быть полезно проанализировать, какие объекты вносят наибольший вклад в общую ошибку — не исключено, что на этих объектах была допущена ошибка при вычислении признаков или целевой величины.

Среднеквадратичная ошибка подходит для сравнения двух моделей или для контроля качества во время обучения, но не позволяет сделать выводов о том, на сколько хорошо данная модель решает задачу. Например, MSE = 10 является очень плохим показателем, если целевая переменная принимает значения от 0 до 1, и очень хорошим, если целевая переменная лежит в интервале (10000, 100000). В таких ситуациях вместо среднеквадратичной ошибки полезно использовать коэффициент детерминации — R2

$R^2 = 1 - \dfrac{\sum \limits_{i=1}^{n}(model(x_i) - y_i)^2}{\sum \limits_{i=1}^{n}(y_i - \overline{y})^2}$

Коэффициент детерминации измеряет долю дисперсии, объясненную моделью, в общей дисперсии целевой переменной. Фактически, данная мера качества — это нормированная среднеквадратичная ошибка. Если она близка к единице, то модель хорошо объясняет данные, если же она близка к нулю, то прогнозы сопоставимы по качеству с константным предсказанием.

In [None]:
trainingSummary = lrModel.summary
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)



+-------------------+
|          residuals|
+-------------------+
| 140373.47606918076|
| -41003.55632255983|
|  66173.47606918076|
| 124849.91154469695|
|-116038.00773494894|
|-145562.16917565712|
|-150503.55632255983|
|-30838.007734948944|
| -43.74963701382512|
| -43420.78202875439|
| -94150.68537152742|
|-154262.16917565712|
| 40861.395348826656|
| 35932.685838502395|
|  18173.47606918076|
| -63579.99179807605|
|-13143.749637013825|
| -66002.95940633546|
|  96497.04059366454|
| 203384.95987331046|
+-------------------+
only showing top 20 rows

RMSE: 112627.326567
r2: 0.008356


## 4. Домашнее задание 

Добейтесь значения R2 > 0.6

Датасет - оценка медианной стоимости домовладения штата Калифорния в зависимости от характеристик квартала: 
- longitude - Долгота положения центра квартала - чем меньше, тем западнее находится квартал (блок);
- latitude - Широта положения центра квартала - чем больше, тем севернее находится квартал (блок);
- housingMedianAge - средний возраст дома, чем больше, тем старше;
- totalRooms - Общее количество комнат в квартале;
- totalBedrooms - Общее количество спален в квартале;
- population - Общее количество людей, проживающих в пределах квартала;
- households - Общее число домашних хозяйств (группа людей или семья), проживающих в пределах квартала;
- medianIncome - Средний доход домашних хозяйств в пределах квартала (в десятках тысяч долларов США);
- medianHouseValue - Средняя стоимость дома в пределах квартала (в долларах США) - целевая переменная.

In [None]:
feature_vec = VectorAssembler(inputCols=['longitude','latitude','housing_median_age','total_rooms','total_bedrooms','households','median_income'], outputCol='Features')
vec_df = feature_vec.transform(df)

lr = LinearRegression(featuresCol='Features', labelCol='median_house_value', predictionCol='predict')

# Fit the model
lrModel = lr.fit(vec_df)
trainingSummary = lrModel.summary
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)



+-------------------+
|          residuals|
+-------------------+
| -4517.669437347911|
|-34625.190834094305|
| 4415.4836245551705|
|  18736.95001714863|
|-41440.283632481005|
|-13886.287007053383|
| -16119.17096871417|
|-53166.140846768394|
|-14680.627194450237|
|  8583.281268767081|
| -39043.72821099684|
|-63424.139911985025|
| -22167.32595799584|
| 16343.884550412651|
| -53963.58787771361|
|  4386.860894971527|
| 108314.30194335198|
|  39352.62627354264|
|-27038.132013320923|
|  91789.31497968407|
+-------------------+
only showing top 20 rows

RMSE: 71494.391224
r2: 0.600412


In [None]:
data_path_train = 'sample_data/california_housing_train.csv'
data_path_test = 'sample_data/california_housing_test.csv'
df_train = spark.read.format('csv').\
        options(header='true', inferschema='true').load(data_path_train,header=True)
df_test = spark.read.format('csv').\
        options(header='true', inferschema='true').load(data_path_test,header=True)

feature_vec = VectorAssembler(inputCols=['longitude','latitude','housing_median_age','total_rooms','total_bedrooms','households','median_income'], outputCol='Features')
vec_df = feature_vec.transform(df_train)

lr = LinearRegression(featuresCol='Features', labelCol='median_house_value', predictionCol='predict')


vec_df_ = feature_vec.transform(df_test)
# Fit the model
lrModel = lr.fit(vec_df)
trainingSummary = lrModel.summary
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

+-------------------+
|          residuals|
+-------------------+
| 54902.391296958085|
|-29356.544632575475|
|  125075.2739721709|
|  40253.15741754556|
|  80842.70972344372|
| 33084.808692584746|
| 51870.541347248945|
|  108680.9795669429|
|  14826.30175885465|
|  76327.74028678285|
|  74330.47837340645|
|  84131.36958740698|
|  78428.15859704185|
|  78268.75906461664|
|  69511.37238099473|
|  131536.4391086353|
| 57657.395815684926|
| 59191.713495355565|
|   48414.3471908425|
|  96629.24449086795|
+-------------------+
only showing top 20 rows

RMSE: 71579.494734
r2: 0.619102


