In [1]:
import findspark

In [2]:
findspark.init("/home/danial/spark-3.3.2-bin-hadoop3")

In [3]:
from pyspark.sql import SparkSession

In [7]:
spark = SparkSession.builder.appName('clustering').getOrCreate()

In [12]:
path = '/home/danial/Desktop/myspark/Apache-Spark/Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Clustering/sample_kmeans_data.txt'

In [13]:
data = spark.read.format('libsvm').load(path)

23/04/11 14:50:31 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.


In [14]:
# Confirm the libsvm loader produced a double 'label' column and a vector 'features' column.
data.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [15]:
# Preview the rows; features are displayed as sparse vectors: (size, [indices], [values]).
data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [16]:
# KMeans is unsupervised, so the label column is not needed; keep only the feature vectors.

final_data = data.select(data.features)
final_data.show()

+--------------------+
|            features|
+--------------------+
|           (3,[],[])|
|(3,[0,1,2],[0.1,0...|
|(3,[0,1,2],[0.2,0...|
|(3,[0,1,2],[9.0,9...|
|(3,[0,1,2],[9.1,9...|
|(3,[0,1,2],[9.2,9...|
+--------------------+



In [19]:
from pyspark.ml.clustering import KMeans

In [46]:
kmeans = KMeans().setK(2).setSeed(1)

In [47]:
model = kmeans.fit(final_data)

In [48]:
# Within Set Sum of Squared Errors (WSSSE) 
wssse = model.summary.trainingCost
wssse

0.11999999999994547

In [24]:
# NOTE(review): this cell ran as In [24], before the fit in In [47]; re-run it after fitting
# so that `centers` belongs to the current `model`.
centers = model.clusterCenters()

In [50]:
centers

[array([9.1, 9.1, 9.1]), array([0.1, 0.1, 0.1])]

In [51]:
# Assign each row to its nearest center; the cluster index is added as a 'prediction' column.
results = model.transform(final_data)

In [52]:
results.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         1|
|(3,[0,1,2],[0.1,0...|         1|
|(3,[0,1,2],[0.2,0...|         1|
|(3,[0,1,2],[9.0,9...|         0|
|(3,[0,1,2],[9.1,9...|         0|
|(3,[0,1,2],[9.2,9...|         0|
+--------------------+----------+



In [56]:
kmeans = KMeans().setK(3).setSeed(1)

In [57]:
model = kmeans.fit(final_data)

In [58]:
wssse = model.summary.trainingCost
wssse

0.07499999999994544

In [32]:
# Centers of the 3-cluster model, one array per cluster.
centers = model.clusterCenters()
centers

[array([9.1, 9.1, 9.1]), array([0.05, 0.05, 0.05]), array([0.2, 0.2, 0.2])]

In [59]:
# Cluster assignments under k=3 (added as a 'prediction' column).
results = model.transform(final_data)
results.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         1|
|(3,[0,1,2],[0.1,0...|         1|
|(3,[0,1,2],[0.2,0...|         2|
|(3,[0,1,2],[9.0,9...|         0|
|(3,[0,1,2],[9.1,9...|         0|
|(3,[0,1,2],[9.2,9...|         0|
+--------------------+----------+



In [35]:
# Clustering code-along example: seeds dataset

In [60]:
import findspark

In [61]:
findspark.init("/home/danial/spark-3.3.2-bin-hadoop3/")

In [62]:
from pyspark.sql import SparkSession

In [63]:
spark = SparkSession.builder.appName('cluster').getOrCreate()

23/04/11 16:33:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [64]:
path = '/home/danial/Desktop/myspark/Apache-Spark/Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Clustering/seeds_dataset.csv'

In [66]:
data = spark.read.csv(path, header=True, inferSchema=True)

In [68]:
data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)



In [71]:
# First row, to sanity-check the values parsed from the CSV.
data.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22)]

In [72]:
from pyspark.ml.clustering import KMeans

In [69]:
from pyspark.ml.feature import VectorAssembler

In [70]:
# Column names; all of them are fed to the VectorAssembler below.
data.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove']

In [73]:
assembler = VectorAssembler(inputCols=data.columns, outputCol='features')

In [74]:
final_data = assembler.transform(data)

In [76]:
final_data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
# Most Spark ML estimators ignore extra columns: they only read the features column
# (plus the label column for supervised algorithms) and leave the other columns untouched,
# so the following step is not needed:

# my_final_data = final_data.select('features')

In [77]:
from pyspark.ml.feature import StandardScaler

In [78]:
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')

In [79]:
scaler_model = scaler.fit(final_data)

In [80]:
final_data = scaler_model.transform(final_data)

In [81]:
final_data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)



In [82]:
# Compare the raw 'features' vector with its StandardScaler-rescaled 'scaledFeatures' for the first row.
final_data.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22, features=DenseVector([15.26, 14.84, 0.871, 5.763, 3.312, 2.221, 5.22]), scaledFeatures=DenseVector([5.2445, 11.3633, 36.8608, 13.0072, 8.7685, 1.4772, 10.621]))]

In [83]:
kmeans = KMeans(featuresCol='scaledFeatures', k=3)

In [84]:
model = kmeans.fit(final_data)

In [85]:
print (f" wssse is equal to {model.summary.trainingCost}")

 wssse is equal to 429.07559671507244


In [86]:
# Cluster centers, expressed in the scaled feature space (the model was fit on 'scaledFeatures').
centers = model.clusterCenters()

In [88]:
centers

[array([ 4.87257659, 10.88120146, 37.27692543, 12.3410157 ,  8.55443412,
         1.81649011, 10.32998598]),
 array([ 6.31670546, 12.37109759, 37.39491396, 13.91155062,  9.748067  ,
         2.39849968, 12.2661748 ]),
 array([ 4.06105916, 10.13979506, 35.80536984, 11.82133095,  7.50395937,
         3.27184732, 10.42126018])]

In [91]:
# Cluster assignment per row; show() displays only the first 20 rows.
model.transform(final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         1|
|         1|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         2|
+----------+
only showing top 20 rows



In [92]:
# Consulting project: cluster hack_data.csv to estimate how many hackers were involved

In [1]:
import findspark

In [2]:
findspark.init('/home/danial/spark-3.3.2-bin-hadoop3')

In [8]:
from pyspark.sql import SparkSession

In [10]:
spark = SparkSession.builder.appName('hackers').getOrCreate()

23/04/13 09:57:52 WARN Utils: Your hostname, danial-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/04/13 09:57:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/13 09:57:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
path = '/home/danial/Desktop/myspark/Apache-Spark/Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Clustering/hack_data.csv'


In [15]:
data = spark.read.csv(path, header=True, inferSchema=True)



In [16]:
data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)



In [19]:
# First row, to check the inferred values (note the space in the 'Bytes Transferred' column name).
data.head(1)

[Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37)]

In [30]:
# Preview the raw data, including the string 'Location' column.
data.show()

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+
|                    8.0|           391.09|              1|             2.96|            7.0|            Slovenia|           72.37|
|                   20.0|           720.99|              0|             3.04|            9.0|British Virgin Is...|           69.08|
|                   31.0|           356.32|              1|             3.71|            8.0|             Tokelau|           70.58|
|                    2.0|           228.08|              1|             2.48|            8.0|             Bolivia|            70.8|
|                   20.0|            408.5|              0|             3.57

In [20]:
# Summary statistics; the very different means and stddevs across columns are why the features
# are scaled before running KMeans.
data.describe().show()

[Stage 4:>                                                          (0 + 1) / 1]

+-------+-----------------------+------------------+------------------+-----------------+------------------+-----------+------------------+
|summary|Session_Connection_Time| Bytes Transferred|   Kali_Trace_Used|Servers_Corrupted|   Pages_Corrupted|   Location|  WPM_Typing_Speed|
+-------+-----------------------+------------------+------------------+-----------------+------------------+-----------+------------------+
|  count|                    334|               334|               334|              334|               334|        334|               334|
|   mean|     30.008982035928145| 607.2452694610777|0.5119760479041916|5.258502994011977|10.838323353293413|       null|57.342395209580864|
| stddev|     14.088200614636158|286.33593163576757|0.5006065264451406| 2.30190693339697|  3.06352633036022|       null| 13.41106336843464|
|    min|                    1.0|              10.0|                 0|              1.0|               6.0|Afghanistan|              40.0|
|    max|           

                                                                                

#### My questions while solving this:

#### Do I need to scale the data? Yes: the describe().show() output above shows features on very different scales (e.g. Bytes Transferred has a mean of ~607, while Kali_Trace_Used is 0/1).
#### Do I need to consider Location at all, given how the hackers used it?
#### If I wanted to use it, it is a string column, so it would first need a StringIndexer (and possibly one-hot encoding). How many distinct values does it have? 181 out of 334 rows, which is far too many for a useful categorical feature, so Location is left out of the feature vector below.

In [26]:
# Number of distinct locations. Use the exact column name 'Location' so the query does not rely on
# case-insensitive column resolution (spark.sql.caseSensitive=false).
data.select('Location').distinct().count()

181

In [28]:
# Total number of rows. Selecting a column first does not change the count, and referencing the
# misspelled 'location' only resolved because column names are case-insensitive by default.
data.count()

334

In [31]:
from pyspark.ml.feature import VectorAssembler

In [29]:
# List the columns to pick the numeric ones for the feature vector.
data.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed']

In [32]:
assembler = VectorAssembler(inputCols=['Session_Connection_Time',
                                     'Bytes Transferred',
                                     'Kali_Trace_Used',
                                     'Servers_Corrupted',
                                     'Pages_Corrupted',
                                     'WPM_Typing_Speed'], outputCol='features')

In [33]:
output = assembler.transform(data)

In [34]:
output.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)



In [35]:
# Check the assembled vector for the first row (Location is not part of it).
output.head(1)

[Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37, features=DenseVector([8.0, 391.09, 1.0, 2.96, 7.0, 72.37]))]

In [36]:
final_data = output.select('features')

In [37]:
final_data.show()

+--------------------+
|            features|
+--------------------+
|[8.0,391.09,1.0,2...|
|[20.0,720.99,0.0,...|
|[31.0,356.32,1.0,...|
|[2.0,228.08,1.0,2...|
|[20.0,408.5,0.0,3...|
|[1.0,390.69,1.0,2...|
|[18.0,342.97,1.0,...|
|[22.0,101.61,1.0,...|
|[15.0,275.53,1.0,...|
|[12.0,424.83,1.0,...|
|[15.0,249.09,1.0,...|
|[32.0,242.48,0.0,...|
|[23.0,514.54,0.0,...|
|[9.0,284.77,0.0,3...|
|[27.0,779.25,1.0,...|
|[12.0,307.31,1.0,...|
|[21.0,355.94,1.0,...|
|[10.0,372.65,0.0,...|
|[20.0,347.23,1.0,...|
|[22.0,456.57,0.0,...|
+--------------------+
only showing top 20 rows



In [23]:
from pyspark.ml.feature import StandardScaler

In [38]:
scaler = StandardScaler(inputCol='features', outputCol='featuresScaled')

In [39]:
scaler_model = scaler.fit(final_data)

In [40]:
final_data = scaler_model.transform(final_data)

In [41]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- featuresScaled: vector (nullable = true)



In [42]:
# Raw 'features' vs. rescaled 'featuresScaled' for the first row.
final_data.head(1)

[Row(features=DenseVector([8.0, 391.09, 1.0, 2.96, 7.0, 72.37]), featuresScaled=DenseVector([0.5679, 1.3658, 1.9976, 1.2859, 2.2849, 5.3963]))]

In [43]:
from pyspark.ml.clustering import KMeans

In [44]:
kmeans = KMeans(k=2, featuresCol='featuresScaled')

In [45]:
model_2_k = kmeans.fit(final_data)

In [46]:
centers = model_2_k.clusterCenters()

In [47]:
centers

[array([2.99991988, 2.92319035, 1.05261534, 3.20390443, 4.51321315,
        3.28474   ]),
 array([1.26023837, 1.31829808, 0.99280765, 1.36491885, 2.5625043 ,
        5.26676612])]

In [49]:
# First 20 cluster assignments only; per-cluster counts are more informative for this question.
model_2_k.transform(final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 20 rows



In [56]:
# Number of rows assigned to each of the 2 clusters.
model_2_k.transform(final_data).groupby('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+



In [57]:
# With k=2 the rows split exactly evenly (167 / 167), which is consistent with 2 hackers under
# the assumption that the hackers trade off attacks (each carries out about the same number).

In [58]:
KMeans_3 = KMeans(k=3, featuresCol='featuresScaled')

In [59]:
model_3_k = KMeans_3.fit(final_data)

In [60]:
centers = model_3_k.clusterCenters()
centers

[array([2.93719177, 2.88492202, 0.        , 3.19938371, 4.52857793,
        3.30407351]),
 array([1.26023837, 1.31829808, 0.99280765, 1.36491885, 2.5625043 ,
        5.26676612]),
 array([3.05623261, 2.95754486, 1.99757683, 3.2079628 , 4.49941976,
        3.26738378])]

In [66]:
# Number of rows assigned to each of the 3 clusters.
model_3_k.transform(final_data).groupby('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         2|   88|
|         0|   79|
+----------+-----+



In [67]:
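# With k=3 the clusters are uneven (167 / 88 / 79). One k=2 cluster is unchanged, and the other is
# split by Kali_Trace_Used (scaled center 0 vs. ~2.0, i.e. raw 0 vs. 1). Together with the even
# 167 / 167 split at k=2, this supports 2 hackers under the trade-off-attacks assumption
# (supporting evidence, not proof).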