# Clustering

In this scenario we're tasked with determining whether two or three hackers took part in an attack. We have data regarding details of the attacks and our task is to see if clustering can help us determine the correct number of attackers.  

In [1]:
import findspark
findspark.init("/home/bryan/Documents/Code/spark-2.4.5-bin-hadoop2.7")

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('hack').getOrCreate()

# EDA

In [3]:
data = spark.read.csv("data/hack_data.csv", inferSchema=True, header=True)

In [4]:
data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)



In [5]:
data.describe().show()

+-------+-----------------------+------------------+------------------+-----------------+------------------+-----------+------------------+
|summary|Session_Connection_Time| Bytes Transferred|   Kali_Trace_Used|Servers_Corrupted|   Pages_Corrupted|   Location|  WPM_Typing_Speed|
+-------+-----------------------+------------------+------------------+-----------------+------------------+-----------+------------------+
|  count|                    334|               334|               334|              334|               334|        334|               334|
|   mean|     30.008982035928145| 607.2452694610777|0.5119760479041916|5.258502994011977|10.838323353293413|       null|57.342395209580864|
| stddev|     14.088200614636158|286.33593163576757|0.5006065264451406| 2.30190693339697|  3.06352633036022|       null| 13.41106336843464|
|    min|                    1.0|              10.0|                 0|              1.0|               6.0|Afghanistan|              40.0|
|    max|           

In [6]:
data.count()/data.groupby('Location').count().count()

1.8453038674033149

> ### It looks like we'll want to standardize our data since features with a larger magnitude will overshadow the smaller ones.

> ### Also, even though the data may not be reliable and specific countries are only occassionally repeated, we should convert 'Location' or a generate feature into a One-Hot value. There appear to be 181 unique location values, we'll try mapping them to region values.

# Pre-processing

In [7]:
import pandas as pd

In [8]:
region_df = pd.read_html('https://meta.wikimedia.org/wiki/List_of_countries_by_regional_classification', flavor='lxml')[0]
region_df.drop(columns='Global South', inplace=True)
region_df.head(3)

Unnamed: 0,Country,Region
0,Andorra,Europe
1,United Arab Emirates,Middle east
2,Afghanistan,Asia & Pacific


In [9]:
region_df = spark.createDataFrame(region_df)
data = data.join(region_df, data.Location == region_df.Country, how='left')
data.show()

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------------+-------------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|             Country|             Region|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------------+-------------------+
|                   27.0|           874.26|              0|             7.05|           14.0|            Anguilla|           45.72|            Anguilla|South/Latin America|
|                   39.0|           866.95|              0|             6.82|           14.0|            Paraguay|           44.12|            Paraguay|South/Latin America|
|                   51.0|           723.94|              1|             6.34|           13.0|            Paraguay|           44.27|    

## Convert 'Region' to index value

In [10]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler

In [11]:
location_indexer = StringIndexer(inputCol='Region', outputCol='Region_index', handleInvalid='keep')

In [12]:
data = location_indexer.fit(data).transform(data)

In [13]:
data.show(10)

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------------+-------------------+------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|             Country|             Region|Region_index|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------------+-------------------+------------+
|                   27.0|           874.26|              0|             7.05|           14.0|            Anguilla|           45.72|            Anguilla|South/Latin America|         2.0|
|                   39.0|           866.95|              0|             6.82|           14.0|            Paraguay|           44.12|            Paraguay|South/Latin America|         2.0|
|                   51.0|           723.94|              1|           

## Convert 'Region' index to one-hot value

In [14]:
onehot_encoder = OneHotEncoder(inputCol='Region_index', outputCol='Region_1hot')

In [15]:
data = onehot_encoder.transform(data)

In [16]:
data.show(3)

+-----------------------+-----------------+---------------+-----------------+---------------+--------+----------------+--------+-------------------+------------+-------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|Location|WPM_Typing_Speed| Country|             Region|Region_index|  Region_1hot|
+-----------------------+-----------------+---------------+-----------------+---------------+--------+----------------+--------+-------------------+------------+-------------+
|                   27.0|           874.26|              0|             7.05|           14.0|Anguilla|           45.72|Anguilla|South/Latin America|         2.0|(7,[2],[1.0])|
|                   39.0|           866.95|              0|             6.82|           14.0|Paraguay|           44.12|Paraguay|South/Latin America|         2.0|(7,[2],[1.0])|
|                   51.0|           723.94|              1|             6.34|           13.0|Paraguay|           44.27|P

## Create feature vector

In [17]:
assembler = VectorAssembler(
    inputCols=[
        'Session_Connection_Time',
        'Bytes Transferred',
        'Kali_Trace_Used',
        'Servers_Corrupted',
        'Pages_Corrupted',
        'WPM_Typing_Speed',
        'Region_1hot'        
    ],
    outputCol='features'
)

In [18]:
data = assembler.transform(data)

# Cluster data

In [20]:
from pyspark.ml.clustering import KMeans

In [21]:
two_clusters = KMeans(featuresCol='features',k=2,seed=7)
three_clusters = KMeans(featuresCol='features',k=3,seed=7)

In [23]:
two_fitted = two_clusters.fit(data)

In [24]:
three_fitted = three_clusters.fit(data)

In [27]:
sum_sq_dist = two_fitted.computeCost(data), three_fitted.computeCost(data)

(6915435.188680077, 4081392.6450254675)

# END