In [56]:
MASTER = "local"
NUM_PROCESSORS = "8"
NUM_EXECUTORS = "4"
NUM_PARTITIONS = 10

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
import pyspark.sql.functions as psf
import json

In [57]:
conf = SparkConf()

conf.set("spark.app.name", "one_part_data")
conf.set("spark.master", MASTER)
conf.set("spark.executor.cores", NUM_PROCESSORS)
conf.set("spark.executor.instances", NUM_EXECUTORS)
conf.set("spark.executor.memory", "6g")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.max", "2000")
conf.set("spark.executor.heartbeatInterval", "6000s")
conf.set("spark.network.timeout", "10000000s")
conf.set("spark.shuffle.spill", "true")
conf.set("spark.driver.memory", "15g")
conf.set("spark.driver.maxResultSize", "15g")

<pyspark.conf.SparkConf at 0x7f97c3d40400>

In [58]:
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
      .config(conf=conf) \
      .master("local[*]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()
spark

In [59]:
dataset = spark.read.csv("/home/yagor/Рабочий стол/mipt/lab3/notebook/nutrition_table.csv",header=True,inferSchema=True)

In [60]:
dataset.show()

23/01/01 17:32:12 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
+---+--------+------------------+-----------+-------------+---------+-----------+--------------------+-----------------+--------+--------------------+
|_c0|fat_100g|carbohydrates_100g|sugars_100g|proteins_100g|salt_100g|energy_100g|reconstructed_energy|            g_sum|exceeded|             product|
+---+--------+------------------+-----------+-------------+---------+-----------+--------------------+-----------------+--------+--------------------+
|  1|   28.57|             64.29|      14.29|         3.57|      0.0|  

In [61]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [62]:
feat_cols = [ #'_c0',
 'fat_100g',
 'carbohydrates_100g',
 'sugars_100g',
 'proteins_100g',
 'salt_100g',
 'energy_100g',
 'reconstructed_energy',
 'g_sum',
 'exceeded',
 #'product'
 ]

In [63]:
vec_assembler = VectorAssembler(inputCols = feat_cols, outputCol='features')

In [64]:
final_data = vec_assembler.transform(dataset)

In [65]:
final_data.show()

23/01/01 17:32:12 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
+---+--------+------------------+-----------+-------------+---------+-----------+--------------------+-----------------+--------+--------------------+--------------------+
|_c0|fat_100g|carbohydrates_100g|sugars_100g|proteins_100g|salt_100g|energy_100g|reconstructed_energy|            g_sum|exceeded|             product|            features|
+---+--------+------------------+-----------+-------------+---------+-----------+--------------------+-----------------+--------+--------------------+--------------------+
|  1|   

In [66]:
from pyspark.ml.feature import StandardScaler

In [67]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

In [68]:
final_data

DataFrame[_c0: int, fat_100g: double, carbohydrates_100g: double, sugars_100g: double, proteins_100g: double, salt_100g: double, energy_100g: double, reconstructed_energy: double, g_sum: double, exceeded: int, product: string, features: vector]

In [69]:
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_data)

In [70]:
# Normalize each feature to have unit standard deviation.
cluster_final_data = scalerModel.transform(final_data)

## train model

In [71]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [72]:
# for k in range(2,15):
#     kmeans = KMeans(featuresCol='scaledFeatures',k=k)
#     model = kmeans.fit(cluster_final_data)
#     predictions = model.transform(cluster_final_data)
#     evaluator = ClusteringEvaluator()
#     silhouette = evaluator.evaluate(predictions)
#     print("With K={}".format(k))
#     print("Silhouette with squared euclidean distance = " + str(silhouette))
#     print('--'*30)

In [93]:
k = 5

kmeans = KMeans(featuresCol='scaledFeatures', predictionCol='pred_kmeans_cluster', k=k)

In [94]:
modelKMeans = kmeans.fit(cluster_final_data)

In [95]:
predictions = modelKMeans.transform(cluster_final_data)

## 2. Split on train and test and analys

In [96]:
predictions.show()

23/01/01 17:32:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
+---+--------+------------------+-----------+-------------+---------+-----------+--------------------+-----------------+--------+--------------------+--------------------+--------------------+-------------------+
|_c0|fat_100g|carbohydrates_100g|sugars_100g|proteins_100g|salt_100g|energy_100g|reconstructed_energy|            g_sum|exceeded|             product|            features|      scaledFeatures|pred_kmeans_cluster|
+---+--------+------------------+-----------+-------------+---------+-----------+-----------------

In [97]:
evaluator = ClusteringEvaluator(predictionCol='pred_kmeans_cluster')

In [98]:
silhouette = evaluator.evaluate(predictions)

In [99]:
print(f"With k={k} Silhouette with squared euclidean distance = " + str(silhouette))

With k=5 Silhouette with squared euclidean distance = 0.4557467408039474


In [100]:
centers=modelKMeans.clusterCenters()
print("Cluster Centers:")
for center in centers:
    print(center)

Cluster Centers:
[0.29048637 2.53813625 1.77256495 0.61769196 0.10027311 1.96002328
 1.92998784 2.47449235 0.        ]
[0.11924658 0.40544206 0.34112048 0.27188833 0.10217509 0.42143614
 0.39140018 0.46758935 0.        ]
[1.00763071 0.6949448  0.22435894 1.83172412 0.15928124 1.507693
 1.4944646  1.47197809 0.        ]
[3.45288991 0.55140649 0.19940754 2.34815383 0.14523594 3.21660235
 3.28997534 2.53990678 0.05101977]
[1.8627923  1.94825912 1.17971266 0.83966936 0.10007308 2.7497747
 2.74990873 2.7036021  0.7327873 ]


In [101]:
modelKMeans.transform(cluster_final_data).groupBy('pred_kmeans_cluster').count().show()

+-------------------+-----+
|pred_kmeans_cluster|count|
+-------------------+-----+
|                  1|17941|
|                  3| 2315|
|                  4| 5617|
|                  2| 8807|
|                  0|10348|
+-------------------+-----+



In [102]:
cols = ["_c0", "pred_kmeans_cluster"]
result = predictions.select(*cols)

In [103]:
result.show()

23/01/01 17:32:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
+---+-------------------+
|_c0|pred_kmeans_cluster|
+---+-------------------+
|  1|                  4|
|  2|                  4|
|  3|                  3|
|  7|                  4|
| 12|                  3|
| 15|                  4|
| 16|                  3|
| 19|                  2|
| 20|                  0|
| 21|                  4|
| 22|                  0|
| 23|                  3|
| 24|                  4|
| 26|                  4|
| 27|                  3|
| 28|                  3|
| 29|                  3|
| 31|                 

In [109]:
(trainingData, testData) = predictions.randomSplit([0.7, 0.3])

In [117]:
testData.count()

23/01/01 18:12:46 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv


13611

In [110]:
trainingData.count()

23/01/01 18:10:24 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv


31417

## 3. Classification model

In [130]:
from pyspark.ml.classification import MultilayerPerceptronClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np

In [114]:
RandomForest = RandomForestClassifier(labelCol="pred_kmeans_cluster", featuresCol="scaledFeatures", predictionCol='pred_from_randomforest_class', numTrees=20, maxDepth=3)


In [115]:
modelRandomForest = RandomForest.fit(trainingData)

23/01/01 18:11:46 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
23/01/01 18:11:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
23/01/01 18:11:47 WARN CSVHeaderChecker: CSV hea

In [136]:
# Make predictions.
predictions_rf = modelRandomForest.transform(testData)

In [137]:

# Select example rows to display.
predictions_rf.select("pred_from_randomforest_class", "pred_kmeans_cluster", "scaledFeatures").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="pred_kmeans_cluster", predictionCol="pred_from_randomforest_class", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_rf)
print("Test Error = %g" % (1.0 - accuracy))

23/01/01 21:19:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
+----------------------------+-------------------+--------------------+
|pred_from_randomforest_class|pred_kmeans_cluster|      scaledFeatures|
+----------------------------+-------------------+--------------------+
|                         4.0|                  4|[1.19624216012105...|
|                         4.0|                  4|[1.25585333159404...|
|                         3.0|                  3|[2.4561142223762,...|
|                         0.0|                  2|[0.39852412389251...|
|                   

## 4. Linear regression model

In [134]:
LinRegression = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol="pred_from_randomforest_class", featuresCol="scaledFeatures")

# Fit the model
modelLinearRegression = LinRegression.fit(predictions)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(modelLinearRegression.coefficients))
print("Intercept: %s" % str(modelLinearRegression.intercept))

23/01/01 21:18:20 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
23/01/01 21:18:20 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
 Schema: _c0, fat_100g, carbohydrates_100g, sugars_100g, proteins_100g, salt_100g, energy_100g, reconstructed_energy, g_sum, exceeded, product
Expected: _c0 but found: 
CSV file: file:///home/yagor/Рабочий%20стол/mipt/lab3/notebook/nutrition_table.csv
Coefficients: [0.5820088256423173,0.0,0.0,0.0,0.

In [None]:
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

In [62]:
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True, labelCol="prediction", featuresCol="scaledFeatures", probabilityCol="lr_prob")

In [None]:
pipeline = Pipeline(stages=[assemble, scale, KMeans_algo, rf, column_dropper, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(cluster_final_data)
output = model.transform(cluster_final_data)

# Evaluate clustering by computing Silhouette score
clust_evaluator = ClusteringEvaluator(predictionCol='clust_preds', featuresCol='standardized', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')

# Select (prediction, true label) and compute test error
class_evaluator = MulticlassClassificationEvaluator(labelCol="clust_preds", predictionCol="class_preds", metricName="accuracy")
accuracy = class_evaluator.evaluate(output)
y_true = np.array(output.select("clust_preds").collect())
y_pred = np.array(output.select("class_preds").collect())

In [None]:
model.layers
[2, 5, 2]
model.weights.size
27
testDF = sqlContext.createDataFrame([
(Vectors.dense([1.0, 0.0]),),
  (Vectors.dense([0.0, 0.0]),)], ["features"])
model.transform(testDF).show()