In [1]:
from pyspark.sql import SparkSession
# Session handle used by every cell below: getOrCreate() hands back the
# already-running SparkSession if there is one, otherwise it builds a new one.
spark = SparkSession.builder.getOrCreate()

In [3]:
# Normalize columns in range 0 to 1

In [2]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [6]:
# Toy input for the MinMaxScaler demo: each row is (id, 3-element dense vector).
# NOTE(review): id 1 is used for both the first and the second row -- confirm
# whether the second row was meant to be id 2.
features_df = spark.createDataFrame(
    [
        (1, Vectors.dense([10.0, 10000.0, 1.0])),
        (1, Vectors.dense([20.0, 30000.0, 2.0])),
        (3, Vectors.dense([30.0, 400000, 4.0])),
    ],
    ("id", "features"),
)

In [7]:
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [8]:
# Estimator that rescales every dimension of the "features" vector column and
# writes the result to a new "sfeatures" column (no min/max given, so the
# scaler's default target range applies).
featureScaler = MinMaxScaler(
    inputCol="features",
    outputCol="sfeatures",
)

In [9]:
# Fit the scaler on features_df; the fitted model (smodel) is what performs the
# rescaling when transform() is called in the following cell.
smodel = featureScaler.fit(features_df)

In [11]:
# Apply the fitted scaler: the result keeps "id" and "features" and gains the
# scaled "sfeatures" column. take(1) returns just the first Row for inspection.
sfeatures_df = smodel.transform(features_df)
sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([0.0, 0.0, 0.0]))]

In [12]:
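#Standardize numeric data: center each feature on mean 0 and scale it to unit standard deviation. The result is not bounded to
#a fixed range such as -1 to 1. Models like SVM tend to work well when the features are standardized this way.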

In [13]:
from pyspark.ml.feature import StandardScaler

In [14]:
# Toy input for the StandardScaler demo. This rebinds features_df, replacing
# the frame built for the MinMaxScaler example (last vector ends in 3.0 here).
# NOTE(review): id 1 appears twice -- confirm whether that is intentional.
features_df = spark.createDataFrame(
    [
        (1, Vectors.dense([10.0, 10000.0, 1.0])),
        (1, Vectors.dense([20.0, 30000.0, 2.0])),
        (3, Vectors.dense([30.0, 400000, 3.0])),
    ],
    ("id", "features"),
)

In [15]:
# Rebinds featureScaler to a StandardScaler. Both centring (withMean) and
# scaling by the standard deviation (withStd) are switched on explicitly;
# input is "features", output is "sfeatures".
featureScaler = StandardScaler(
    inputCol="features",
    outputCol="sfeatures",
    withStd=True,
    withMean=True,
)

In [16]:
# Fit the StandardScaler on the rebuilt features_df. This overwrites the
# MinMaxScaler model previously stored in smodel.
smodel = featureScaler.fit(features_df)

In [17]:
# Apply the fitted StandardScaler (adds "sfeatures") and inspect the first Row.
# sfeatures_df is rebound here, replacing the min-max scaled frame.
sfeatures_df = smodel.transform(features_df)
sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -0.6223, -1.0]))]

In [18]:
#Bucketizer 
from pyspark.ml.feature import Bucketizer

In [19]:
# Bucket boundaries handed to Bucketizer: five edges define four buckets,
# open-ended on both sides. In the bucketed output further down 0.0 lands in
# bucket 2 and -10.5 in bucket 0, i.e. each lower edge is inclusive.
splits = [float("-inf"), -10.0, 0.0, 10.0, float("inf")]

In [20]:
# Sample values to bucketize, one single-element tuple per row.
# NOTE(review): the name looks like a typo for "b_data"; it is kept because the
# DataFrame-building cell refers to b_date.
b_date = [(value,) for value in (-800.0, -10.5, -1.7, 0.0, 8.2, 90.1)]

In [22]:
# One-column DataFrame named "features". Unlike the scaler examples, the
# column holds plain numbers here rather than vectors (see the output below).
b_df = spark.createDataFrame(b_date,["features"])
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [23]:
# Transformer that maps each "features" value to the index of the bucket
# (defined by splits) it falls into, written to "bfeatures".
bucketizer = Bucketizer(
    splits=splits,
    inputCol="features",
    outputCol="bfeatures",
)

In [24]:
# Bucketizer is used directly via transform() -- no fit step is involved here.
bucketed_df = bucketizer.transform(b_df)

In [25]:
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



In [26]:
## Tokenize data
from pyspark.ml.feature import Tokenizer

In [27]:
# Three short example documents, each an (id, sentence) pair, used by the
# tokenizer and TF-IDF cells.
sentences_df = spark.createDataFrame(
    [
        (1, "This is an introduction to Spark MLib"),
        (2, "Mlib includes librares for regression and classification"),
        (3, "It also contains supporting tools for pipelines"),
    ],
    ["id", "sentence"],
)

In [28]:
sentences_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|This is an introd...|
|  2|Mlib includes lib...|
|  3|It also contains ...|
+---+--------------------+



In [29]:
# Splits the "sentence" string column into a list of tokens stored in "words";
# the output of the next cell shows the tokens come back lower-cased.
tokenizer = Tokenizer(inputCol="sentence",outputCol="words")

In [30]:
# Tokenizer is a plain transformer (no fit step); the result keeps "id" and
# "sentence" and adds the "words" column.
sent_df =tokenizer.transform(sentences_df)
sent_df.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|
|  2|Mlib includes lib...|[mlib, includes, ...|
|  3|It also contains ...|[it, also, contai...|
+---+--------------------+--------------------+



In [32]:
# TF-IDF
from pyspark.ml.feature import HashingTF, IDF

In [33]:
# Term-frequency stage: hashes every token in "words" into a vector of
# numFeatures=20 slots, written to "rawFeatures".
# NOTE(review): 20 slots is tiny -- presumably chosen to keep the demo output
# readable; distinct words can share a slot at this size.
hashing_tf = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=20,
)

In [34]:
# Adds the hashed term-frequency vectors ("rawFeatures") to the tokenized frame.
sent_HTF_df = hashing_tf.transform(sent_df)

In [35]:
sent_HTF_df.show()

+---+--------------------+--------------------+--------------------+
| id|            sentence|               words|         rawFeatures|
+---+--------------------+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|(20,[1,5,6,8,9,12...|
|  2|Mlib includes lib...|[mlib, includes, ...|(20,[1,9,12,13,15...|
|  3|It also contains ...|[it, also, contai...|(20,[0,8,10,12,15...|
+---+--------------------+--------------------+--------------------+



In [36]:
# Inverse-document-frequency stage: reads the raw term-frequency vectors and
# writes the re-weighted vectors to "idf_features".
idf = IDF(
    inputCol="rawFeatures",
    outputCol="idf_features",
)

In [37]:
# IDF is an estimator: it has to be fitted on the term-frequency frame before
# it can transform anything.
idf_model = idf.fit(sent_HTF_df)

In [38]:
# Apply the fitted IDF model to the same frame it was fitted on; the result
# carries both "rawFeatures" and the new "idf_features" column.
sent_IDF_df = idf_model.transform(sent_HTF_df)
sent_IDF_df.show()

+---+--------------------+--------------------+--------------------+--------------------+
| id|            sentence|               words|         rawFeatures|        idf_features|
+---+--------------------+--------------------+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|(20,[1,5,6,8,9,12...|(20,[1,5,6,8,9,12...|
|  2|Mlib includes lib...|[mlib, includes, ...|(20,[1,9,12,13,15...|(20,[1,9,12,13,15...|
|  3|It also contains ...|[it, also, contai...|(20,[0,8,10,12,15...|(20,[0,8,10,12,15...|
+---+--------------------+--------------------+--------------------+--------------------+



In [39]:
#KMeans

In [40]:
from pyspark.ml.feature import VectorAssembler

In [41]:
from pyspark.ml.clustering import KMeans

In [42]:
# Load the clustering data from a CSV in the working directory (relative path).
# header=True takes column names from the first line; inferSchema=True lets
# Spark infer column types instead of reading everything as strings.
cluster_df = spark.read.csv('clustering_dataset.csv',header=True, inferSchema=True)

In [43]:
# Packs the three input columns into a single vector column named "features",
# which is what the clustering estimators below consume.
vector_assembler = VectorAssembler(
    inputCols=["col1", "col2", "col3"],
    outputCol="features",
)

In [44]:
# Frame with the assembled "features" vector; used by both clustering models.
vcluster_df = vector_assembler.transform(cluster_df)

In [46]:
# K-means estimator configured for three clusters; every other parameter is
# left at its default.
kmeans = KMeans(k=3)

In [47]:
# Pin the random seed so the clustering is repeatable between runs.
# The estimator is reassigned to the value returned by its own setter.
kmeans =kmeans.setSeed(1)

In [48]:
# Fit k-means on the assembled feature vectors.
# NOTE(review): the generic name "model" is reused later for the classifier
# fits, so this KMeans model is lost once those cells run.
model = kmeans.fit(vcluster_df)

In [49]:
# One centre per cluster; the displayed result is a list of three
# 3-element arrays (one coordinate per assembled input column).
centers = model.clusterCenters()

In [50]:
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([5.12, 5.84, 4.84]),
 array([80.        , 79.20833333, 78.29166667])]

In [51]:
#Hierarchical Clusters

In [52]:
from pyspark.ml.clustering import BisectingKMeans

In [53]:
# Bisecting k-means estimator, configured for three clusters like the plain
# KMeans run above.
bkmeans = BisectingKMeans(k=3)

In [54]:
# Same seed (1) as the KMeans run, for repeatability.
bkmeans =bkmeans.setSeed(1)

In [55]:
# Fit bisecting k-means on the same assembled frame used for KMeans.
bmodel = bkmeans.fit(vcluster_df)

In [56]:
# Cluster centres from the bisecting model. In the recorded output they are
# the same three centres KMeans found, listed in a different order.
bcenters = bmodel.clusterCenters()

In [57]:
bcenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

In [3]:
# Classification

In [4]:
from pyspark.sql import *
from pyspark.ml.feature import VectorAssembler

In [5]:
from pyspark.ml.feature import StringIndexer

In [10]:
from pyspark.sql.functions import col

In [7]:
# Read the iris file from the working directory. No header option is passed,
# so the columns come back with the positional names _c0.._c4 (see output);
# inferSchema=True asks Spark to infer the column types.
iris_data = spark.read.csv("iris.data",inferSchema=True)
iris_data.show()

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3|        _c4|
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
|5.4|3.9|1.7|0.4|Iris-setosa|
|4.6|3.4|1.4|0.3|Iris-setosa|
|5.0|3.4|1.5|0.2|Iris-setosa|
|4.4|2.9|1.4|0.2|Iris-setosa|
|4.9|3.1|1.5|0.1|Iris-setosa|
|5.4|3.7|1.5|0.2|Iris-setosa|
|4.8|3.4|1.6|0.2|Iris-setosa|
|4.8|3.0|1.4|0.1|Iris-setosa|
|4.3|3.0|1.1|0.1|Iris-setosa|
|5.8|4.0|1.2|0.2|Iris-setosa|
|5.7|4.4|1.5|0.4|Iris-setosa|
|5.4|3.9|1.3|0.4|Iris-setosa|
|5.1|3.5|1.4|0.3|Iris-setosa|
|5.7|3.8|1.7|0.3|Iris-setosa|
|5.1|3.8|1.5|0.3|Iris-setosa|
+---+---+---+---+-----------+
only showing top 20 rows



In [11]:
# Give the positional _c0.._c4 columns descriptive names. Column order is
# preserved and no other transformation is applied.
iris_df = iris_data.select(
    *[
        col(source_name).alias(friendly_name)
        for source_name, friendly_name in (
            ("_c0", "sepal_length"),
            ("_c1", "sepal_width"),
            ("_c2", "petal_length"),
            ("_c3", "petal_width"),
            ("_c4", "species"),
        )
    ]
)
iris_df.show(5)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows



In [12]:
# Combine the four numeric measurement columns into one "features" vector;
# "species" is left out because it becomes the label.
assembler = VectorAssembler(
    inputCols=[
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
    ],
    outputCol="features",
)

In [13]:
# Adds the 4-element "features" vector column alongside the original columns.
vec_df = assembler.transform(iris_df)
vec_df.show()

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|
+------------+-----------+------------+-----------+-----------+-----------------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|[4.4,2.9,1.4,0.2]|
|         4.9|  

In [15]:
# Encode the string "species" column as a numeric "label" column (in the
# displayed rows Iris-setosa maps to 0.0). The indexer is fitted and applied
# on the same frame.
indexer = StringIndexer(inputCol="species", outputCol="label")
indexer_model = indexer.fit(vec_df)
index_df = indexer_model.transform(vec_df)
index_df.show()

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|  0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|  0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|  0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|  0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|  0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|  0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|  0.0|
|         4.4|        2.9|      

In [17]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [18]:
# 60/40 random split of the labelled iris frame with a fixed seed of 1.
# Note: this rebinds "splits", which earlier held the Bucketizer boundaries.
splits = index_df.randomSplit(weights=[0.6, 0.4], seed=1)

In [19]:
# randomSplit was given two weights, so splits holds exactly two frames:
# the first (weight 0.6) is used for training, the second (0.4) for testing.
train_df, test_df = splits

In [20]:
# Naive Bayes classifier using the multinomial model; feature/label columns
# are left at the estimator defaults.
# NOTE(review): the iris features are continuous measurements, and this model
# scores 0.586 accuracy below versus 0.93-0.95 for the MLP and decision tree.
# The multinomial variant is normally aimed at count-like features -- consider
# a Gaussian variant if the installed Spark version offers one (confirm).
nb = NaiveBayes(modelType="multinomial")

In [21]:
# Train on the 60% training split only.
nb_model = nb.fit(train_df)

In [23]:
# Score the held-out 40% split; the evaluator below reads the resulting
# "prediction" column.
predictions = nb_model.transform(test_df)

In [24]:
# Accuracy evaluator comparing "label" with "prediction". Despite its name,
# "score" holds the evaluator object, not a number -- the bare expression
# below only displays the evaluator's identifier.
score = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
score

MulticlassClassificationEvaluator_4bf987e7cfcb0ede00f3

In [26]:
# Accuracy of the Naive Bayes predictions on the held-out test split.
acc = score.evaluate(predictions)
acc

0.5862068965517241

In [28]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [29]:
# Network shape for the multilayer perceptron, input to output:
#   4 inputs (one per assembled iris feature), two hidden layers of 5 units,
#   3 outputs (presumably one per iris species -- confirm class count).
layers = [4] + [5] * 2 + [3]

In [30]:
# Multilayer perceptron with the layer sizes defined above; seed=1 fixes the
# random seed for repeatable runs.
mlp = MultilayerPerceptronClassifier(layers=layers, seed = 1)

In [31]:
# Train the MLP on the iris training split.
# NOTE(review): this rebinds "model", which previously held the fitted KMeans
# model from the clustering section.
model = mlp.fit(train_df)

In [32]:
# MLP predictions for the held-out iris test split.
mlp_pred = model.transform(test_df)

In [33]:
# Accuracy of the multilayer-perceptron predictions on the held-out test split.
# labelCol / predictionCol are spelled out (they equal the evaluator's
# defaults, so the result is unchanged) so that this cell reads the same as
# the other two accuracy evaluations in this notebook.
score = MulticlassClassificationEvaluator(labelCol='label', predictionCol="prediction", metricName="accuracy")
acc = score.evaluate(mlp_pred)
acc

0.9482758620689655

In [34]:
from pyspark.ml.classification import DecisionTreeClassifier

In [36]:
# Decision-tree classifier reading the assembled "features" vector and the
# indexed "label" column; tree hyper-parameters are left at their defaults.
dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
)

In [37]:
# Train the decision tree on the iris training split ("model" is rebound
# again, replacing the fitted MLP).
model = dt.fit(train_df)

In [38]:
# Decision-tree predictions for the held-out iris test split.
dt_pred = model.transform(test_df)

In [39]:
# Accuracy of the decision-tree predictions on the held-out test split.
score = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
acc = score.evaluate(dt_pred)
acc

0.9310344827586207

In [41]:
# Regression
from pyspark.ml.regression import LinearRegression

In [42]:
# Load the power-plant data from a CSV in the working directory. The header
# row supplies the column names (AT, V, AP, RH, PE in the output below) and
# inferSchema=True asks Spark to infer the column types.
pp_df = spark.read.csv('powerplant.csv', header=True, inferSchema = True)
pp_df.show()

+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    PE|
+-----+-----+-------+-----+------+
| 8.34|40.77|1010.84|90.01|480.48|
|23.64|58.49| 1011.4| 74.2|445.75|
|29.74| 56.9|1007.15|41.91|438.76|
|19.07|49.69|1007.22|76.79|453.09|
| 11.8|40.66|1017.13| 97.2|464.43|
|13.97|39.16|1016.05| 84.6|470.96|
| 22.1|71.29| 1008.2|75.38|442.35|
|14.47|41.76|1021.98|78.41| 464.0|
|31.25|69.51|1010.25|36.83|428.77|
| 6.77|38.18| 1017.8|81.13|484.31|
|28.28|68.67|1006.36| 69.9|435.29|
|22.99|46.93|1014.15|49.42|451.41|
| 29.3|70.04|1010.95|61.23|426.25|
| 8.14|37.49|1009.04|80.33|480.66|
|16.92| 44.6|1017.34|58.75|460.17|
|22.72|64.15|1021.14|60.34|453.13|
|18.14|43.56|1012.83| 47.1|461.71|
|11.49|44.63|1020.44|86.04|471.08|
| 9.94|40.46| 1018.9|68.51|473.74|
|23.54| 41.1|1002.05|38.05|448.56|
+-----+-----+-------+-----+------+
only showing top 20 rows



In [44]:
# Assemble the four predictor columns into "features". PE is left out because
# it is the regression target. This rebinds "assembler" from the iris section.
assembler = VectorAssembler(
    inputCols=["AT", "V", "AP", "RH"],
    outputCol="features",
)

In [45]:
# Power-plant frame with the assembled "features" vector added.
vpp_df = assembler.transform(pp_df)

In [46]:
# Ordinary linear regression predicting PE from the assembled feature vector;
# regularisation and solver settings are left at their defaults.
lr = LinearRegression(
    featuresCol="features",
    labelCol="PE",
)

In [47]:
# Fitted on the whole of vpp_df: the train/test split is only created further
# down and is used for the decision-tree regressor, not for this model.
lr_model = lr.fit(vpp_df)

In [48]:
lr_model.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [49]:
lr_model.intercept

454.6092741526312

In [50]:
# RMSE taken from the model's fit summary. The model was fitted on all of
# vpp_df, so this is an in-sample (training) error.
# NOTE(review): it is therefore not directly comparable with the decision-tree
# RMSE below, which is measured on a held-out 30% split.
lr_model.summary.rootMeanSquaredError

4.557126016749477

In [51]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [52]:
# 70/30 random split of the assembled power-plant frame, seed fixed at 1.
# Rebinds "splits" (previously the iris split).
splits = vpp_df.randomSplit(weights=[0.7, 0.3], seed=1)

In [53]:
# Two weights were given to randomSplit, so splits holds exactly two frames:
# 70% for training, 30% for testing. These rebind the iris train/test names.
train_df, test_df = splits

In [54]:
# Decision-tree regressor predicting PE from "features", tree settings at
# their defaults. Rebinds "dt", which earlier held the iris classifier.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="PE",
)

In [55]:
# Train the regression tree on the 70% training split only.
dt_model = dt.fit(train_df)

In [56]:
# Predictions for the held-out 30% split; the evaluator below compares the
# "prediction" column against PE.
dt_predict = dt_model.transform(test_df)

In [62]:
# RMSE evaluator comparing the true PE values with the "prediction" column.
# "score" is rebound here from the classification evaluator to a regression one.
score = RegressionEvaluator(
    labelCol="PE",
    predictionCol="prediction",
    metricName="rmse",
)

In [63]:
# Root-mean-squared error of the decision-tree regressor on the held-out
# test split (out-of-sample, unlike the linear-regression RMSE above).
rmse = score.evaluate(dt_predict)
rmse

4.400189094399873