<a href="https://colab.research.google.com/github/DePono/IS/blob/main/%D0%9A%D0%BE%D0%BF%D0%B8%D1%8F_%D0%B1%D0%BB%D0%BE%D0%BA%D0%BD%D0%BE%D1%82%D0%B0_%22%D0%9F%D0%BE%D0%BD%D0%B0%D0%BC%D0%B0%D1%80%D0%B5%D0%BD%D0%BA%D0%BE_%D0%94_%D0%98_%D0%9B%D0%B0%D0%B1%D0%BE%D1%80%D0%B0%D1%82%D0%BE%D1%80%D0%BD%D0%B0%D1%8F_%D0%B4%D0%B8%D0%B0%D0%B1%D0%B5%D1%82%D1%8B%22load_diabetes_ipynb%22%22%22.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### 1. Установка PySpark & Spark

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=eee99644e5b2d0304e0991ee5b7cbd36858a872a1964939a6ee34a2491ddcf44
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indi

##### Импорт библиотек

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

##### Инициализация SparkContext

In [None]:
sc = pyspark.SparkContext()
spark = SparkSession.builder.getOrCreate()

##### Датасет

Дан набор данных, содержащих информацию о [диабете]((https://archive.ics.uci.edu/ml/datasets/Diabetes) . \
\
 20 вещественных атрибутов.

##### Загрузка датасета

In [None]:
from sklearn.datasets import load_diabetes
diabetes = load_diabetes()

In [None]:
len(diabetes.data)

##### Преобразование данных

Так как датасет небольшой, воспользуемся pandas для преобразования данных. \
После получения pandas DataFrame, воспользуемся [Spark DataFrame](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#DataFrame-Creation)

In [None]:
pd_df = pd.DataFrame(diabetes.data, columns=diabetes.feature_names)
df = spark.createDataFrame(pd_df)

df.show()

In [None]:
def map_to_nullable(spark, df, column_list, nullable=False):
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

df = map_to_nullable(spark, df, df.columns)
df = df.withColumn('features', array(df.columns))
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()

Построим 2 следующих датафрейма

* ```features``` - датафрейм, содержащий информацию о признаках диабета;
* ```labels``` - бинарная серия с информацией о том

In [None]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(diabetes.target)

In [None]:
features.show()

#### 2. Бинарная классификация методом K-means

Для решения задачи воспользуемся методом [K-means](https://spark.apache.org/docs/latest/ml-clustering.html) из пакета SparkML.
Так как количество классов равно двум, установите параметр ```k=2```.

Вычислим точность полученной K-means модели. Сравним выходные данные модели с выходными даннными исходного датасета.\
Изначально K-Means является алгоритмом кластеризации.\
Это означает, что он возвращает номера кластеров. Данные номера необязательно равны выходной переменной.\
Задача - понять, в какой кластер попали те или иные опухоли. И уже на основе этого оценивать accuracy метода.\
$accuracy = \frac {S} {N}$;\
где $S$ - количество опухолей, отнесённых к правильному классу;\
$N$ - общий размер датасета.


In [None]:
# базовая модель
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(features)

predictions = model.transform(features)
predictions.show()

Кластер 1 -> признак 0\
Кластер 0 -> признак 1

In [None]:
# функция подсчета точности модели
def getAccuracy(modelRows, datasetLabels):
  coincidencesCount = 0;
  for i, label in enumerate(datasetLabels):
      if label != modelRows[i].prediction:
          coincidencesCount = coincidencesCount + 1;

  labelsLength = len(datasetLabels);
  print(coincidencesCount, '/', labelsLength)

  return coincidencesCount/labelsLength;

In [None]:
# подсчет точности базовой модели
rows = predictions.select('prediction').collect();
baseAccuracy = getAccuracy(rows, labels)
print('Точность базовой модели: ', baseAccuracy * 100, '%')

In [None]:
# модель с улучшенной точностью

# seed: 1 -> 100
# distanceMeasure: 'euclidean' (default) -> 'cosine'

kmeans = KMeans(k=2, seed=100, distanceMeasure='cosine')
model = kmeans.fit(features)

updPredictions = model.transform(features)
updPredictions.show()

In [None]:
# подсчет точности улучшенной модели
updRows = updPredictions.select('prediction').collect();
updAccuracy = getAccuracy(updRows, labels)
print('Точность улучшенной модели: ', updAccuracy * 100, '%')

In [None]:
# изменение точности
print(baseAccuracy * 100, '% ->', updAccuracy * 100, '%')

#### 3. Понижение размерности методом PCA
Проведем понижение размерности при [помощи метода главных компонент](http://infolab.stanford.edu/~ullman/mmds/ch11.pdf).\
Более подробное изложение можно найти [по ссылке](https://arxiv.org/pdf/1404.1100.pdf).\
Данный метод также доступен в пакете [MLlib](https://spark.apache.org/docs/latest/ml-features.html#pca).

Установим параметр ```k = 2 ```, понизив размерность входного пространства в **15** раз.


In [None]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(features)

pcaFeatures = model.transform(features).select("pcaFeatures")
pcaFeatures.show(truncate=False)


#### 4. Бинарная классификация метдом K-means на данных меньшей размерности.  
Обучим модель KMeans также как в задании 2, но уже на признаках меньшей размерности.
Оценим результат классификации.

Выведем точечную диаграмму, отображающую зависимость первого признака от второго. Точки на диаграмме должны быть покрашены в зависимости от принадлежности тому или иному классу. \


In [None]:
# базовая PCA модель

kmeans = KMeans(featuresCol="pcaFeatures", k=2, seed=1)
model = kmeans.fit(pcaFeatures)

pcaPredictions = model.transform(pcaFeatures)
pcaPredictions.show()


In [None]:
# подсчет точности базовой PCA модели
pcaRows = pcaPredictions.select('prediction').collect();
basePcaAccuracy = getAccuracy(pcaRows, labels)
print('Точность PCA модели: ', basePcaAccuracy * 100, '%')

In [None]:
from sklearn.manifold import TSNE

rows = pcaPredictions.collect();
plt.figure(figsize=(10,10))

for row in rows:
  color = 'green' if row.prediction == 1 else 'red';
  plt.plot(row.pcaFeatures[0], row.pcaFeatures[1], 'o', color=color);

plt.xlabel('PCA1')
plt.ylabel('PCA2')
plt.title('Точечная диаграмма с результатами классификации')
plt.show()

In [None]:
# PCA модель с улучшенной точностью

# seed: 1 -> 10
# distanceMeasure: 'euclidean' (default) -> 'cosine'
# maxIter: 20 (default) -> 10
# initSteps: 2 (default) -> 100

kmeans = KMeans(featuresCol="pcaFeatures", k=2, seed=10, distanceMeasure='cosine', maxIter=10, initSteps=100)
model = kmeans.fit(pcaFeatures)

updPcaPredictions = model.transform(pcaFeatures)
updPcaPredictions.show()

In [None]:
# подсчет точности улучшенной PCA модели
updPcaRows = updPcaPredictions.select('prediction').collect();
updPcaAccuracy = getAccuracy(updPcaRows, labels)
print('Точность улучшенной PCA модели: ', updPcaAccuracy * 100, '%')

In [None]:
# изменение точности
print(basePcaAccuracy * 100, '% ->', updPcaAccuracy * 100, '%')

In [None]:
rows = updPcaPredictions.collect();
plt.figure(figsize=(10,10))

for row in rows:
  color = 'green' if row.prediction == 1 else 'red';
  plt.plot(row.pcaFeatures[0], row.pcaFeatures[1], 'o', color=color);

plt.xlabel('PCA1')
plt.ylabel('PCA2')
plt.title('Точечная диаграмма с результатами классификации (улучшенная модель)')
plt.show()