## Initializing Spark

In [1]:
from pyspark import SparkConf, SparkContext

# Build the Spark configuration: application name and a local, single-JVM master.
conf = SparkConf().setAppName("appName").setMaster("local")
# Pass the configuration explicitly; a bare SparkContext() ignores `conf`.
# getOrCreate reuses an already-running context, so re-running this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

22/11/02 15:00:54 WARN Utils: Your hostname, Alexs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.131 instead (on interface en0)
22/11/02 15:00:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/02 15:00:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/11/02 15:00:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
import os

# Fix Python's string-hash seed to maintain reproducibility across clusters.
# PYTHONHASHSEED must be "random" or a decimal integer in [0, 4294967295];
# an arbitrary string such as 'myseed' is rejected by the interpreter.
# NOTE(review): PySpark appears to read this variable when the SparkContext is
# created, so to affect the workers this should run before `sc` is built — confirm.
os.environ["PYTHONHASHSEED"] = "0"

## Loading data

In [3]:
# Load the CSV as an RDD of raw text lines (each element is one unparsed line).
data_rdd = sc.textFile("./data/botnet_tot_syn_l.csv")

In [4]:
# Number of lines in the input file.
data_rdd.count()

                                                                                

1000000

In [5]:
# Peek at the first raw line to see the format: comma-separated floats with the label last.
data_rdd.take(1)

['9.012784269851088936e+00,1.672999976689183313e+03,2.199998846107087047e+01,9.999997452701503420e-01,6.199988768910407089e+01,6.999980788311222568e+01,1.300000023299318563e+01,2.999999978593413275e+00,1.990000019239598430e+02,2.468369573014893532e+09,2.468372549224571228e+09 ,1']

## Create RDDs

In [6]:
def create_rdd(csv_line):
    """Parse one CSV line into ``[features, label]``.

    Spaces are stripped from the line, every comma-separated field is
    converted to ``float``, and the last field is split off from the rest.

    Args:
        csv_line: A string of comma-separated numeric fields.

    Returns:
        A two-element list: the list of all fields but the last, and the
        last field, all as floats.

    Raises:
        ValueError: If any field cannot be converted to ``float``.
    """
    fields = [float(token) for token in csv_line.replace(" ", "").split(",")]
    # Everything but the trailing field is a feature; the trailing field is the label.
    *features, label = fields
    return [features, label]

In [7]:
# Parse every raw line into [features, label] using create_rdd.
separated_rdd = data_rdd.map(create_rdd)
# Preview the first parsed record.
separated_rdd.take(1)

[[[9.012784269851089,
   1672.9999766891833,
   21.99998846107087,
   0.9999997452701503,
   61.99988768910407,
   69.99980788311223,
   13.000000232993186,
   2.9999999785934133,
   199.00000192395984,
   2468369573.0148935,
   2468372549.224571],
  1.0]]

### Calculate mean by column

In [8]:
# Let's use a subset in order to code faster.
SAMPLE_FRACTION = 0.00001  # expected share of rows kept
SAMPLE_SEED = 42           # fixed seed so the same subset is drawn on every run

# Sample from the freshly parsed data rather than from `separated_rdd` itself,
# so re-running this cell does not sample an already-sampled RDD.
# cache() keeps the subset in memory: several later actions reuse it and would
# otherwise re-read and re-parse the whole file each time.
separated_rdd = (
    data_rdd.map(create_rdd)
    .sample(False, SAMPLE_FRACTION, seed=SAMPLE_SEED)
    .cache()
)
row_num = separated_rdd.count()
# Later cells divide by row_num; fail early with a clear message on an empty sample.
assert row_num > 0, "Sample is empty; increase SAMPLE_FRACTION or change SAMPLE_SEED"
row_num

                                                                                

8

In [9]:
# Preview two records from the sampled subset.
separated_rdd.take(2)

                                                                                

[[[2692.4681590519417,
   3558.186754897253,
   442.9999919600367,
   0.9999997452701503,
   61.99988768910407,
   69.99980788311223,
   12.99999995866449,
   -2.35688187855132e-08,
   8.00000498340475,
   2468369572.9931583,
   3645423789.1408367],
  1.0],
 [[0.15721432914011757,
   40802.09045176012,
   53.000260213743786,
   507008.67686660576,
   5100.095715924603,
   82.99988759205826,
   -1.653359582576286e-07,
   2.999999985175615,
   186.99999400676145,
   2468370084.563616,
   2468368394.7593513],
  0.0]]

In [10]:
# Column-wise mean: add up the feature vectors element by element, then
# divide each total by the number of rows.
column_totals = (
    separated_rdd
    .map(lambda row: row[0])
    .reduce(lambda left, right: [a + b for a, b in zip(left, right)])
)
# Broadcast the means so the lambdas in later cells can read them via means.value.
means = sc.broadcast([total / row_num for total in column_totals])
means.value[0:10]

                                                                                

[1665.7478158707154,
 22011.995103410634,
 12728.159328252657,
 126448.59168116396,
 17347161.44429345,
 10306.36265129139,
 9.499999942180999,
 1.6249999936165969,
 104.99999911448208,
 2084039885.8918612]

### Calculate stdev by column

In [16]:
import math

In [17]:
# Column-wise population standard deviation: sqrt(sum((x - mean)^2) / n).
squared_dev_totals = (
    separated_rdd
    .map(lambda row: [(value - means.value[idx]) ** 2 for idx, value in enumerate(row[0])])
    .reduce(lambda left, right: [a + b for a, b in zip(left, right)])
)
# Broadcast the result so the normalization lambda can read it via stdevs.value.
stdevs = sc.broadcast([math.sqrt(total / row_num) for total in squared_dev_totals])
stdevs.value[0:10]

                                                                                

[1684.3453639470222,
 20551.577746042276,
 21377.156083722966,
 219014.49903268623,
 45879790.38488994,
 27019.39556724539,
 5.500000060923014,
 2.1758619192876982,
 99.38561017301706,
 669916979.4014223]

### Normalize

In [18]:
# Standardize each feature: (value - column mean) / column stdev; the label is kept as-is.
# A column that is constant in the sample has stdev 0 (and value == mean), so it
# is mapped to 0.0 instead of raising ZeroDivisionError.
normalized_rdd = separated_rdd.map(
    lambda x: [
        [
            (a - means.value[it]) / stdevs.value[it] if stdevs.value[it] else 0.0
            for it, a in enumerate(x[0])
        ],
        x[1],
    ]
)
normalized_rdd.take(1)

                                                                                

[[[0.6095664019730812,
   -0.8979266008940422,
   -0.5746863281616216,
   -0.5773480396955243,
   -0.3780989254501664,
   -0.3788523994895519,
   0.6363636323116911,
   -0.7468304871650056,
   -0.9759963636809526,
   0.5736974862835984,
   0.7396902364820193],
  1.0]]

### Check means and standard deviations

In [19]:
# Sanity check: the column-wise means of the normalized features should be ~0.
normalized_totals = normalized_rdd.map(lambda row: row[0]).reduce(
    lambda left, right: [a + b for a, b in zip(left, right)]
)
normalized_means = sc.broadcast([total / row_num for total in normalized_totals])
normalized_means.value[0:10]

                                                                                

[-1.1102230246251565e-16,
 5.551115123125783e-17,
 5.854691731421724e-17,
 -5.551115123125783e-17,
 2.7755575615628914e-17,
 4.163336342344337e-17,
 2.498001805406602e-16,
 -5.551115123125783e-17,
 8.326672684688674e-17,
 2.220446049250313e-16]

In [20]:
# Sanity check: the column-wise population stdevs of the normalized features should be ~1.
normalized_sq_dev_totals = (
    normalized_rdd
    .map(lambda row: [(value - normalized_means.value[idx]) ** 2 for idx, value in enumerate(row[0])])
    .reduce(lambda left, right: [a + b for a, b in zip(left, right)])
)
normalized_stdevs = [math.sqrt(total / row_num) for total in normalized_sq_dev_totals]
normalized_stdevs[0:10]

                                                                                

[0.9999999999999999, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]