In [1]:
spark

In [2]:
sc

In [3]:
df = spark.read.format("com.databricks.spark.csv").option("header","false").option("inferSchema", "true").load("oasis/netflow_day-11")

In [4]:
df.printSchema


<bound method DataFrame.printSchema of DataFrame[_c0: int, _c1: int, _c2: string, _c3: string, _c4: int, _c5: string, _c6: string, _c7: int, _c8: int, _c9: bigint, _c10: bigint]>

In [5]:
df1 = df.selectExpr("_c0 as Time","_c1 as Duration","_c2 as SrcDevice","_c3 as DstDevice","_c4 as Protocol","_c5 as SrcPort","_c6 as DstPort","_c7 as SrcPackets","_c8 as DstPackets","_c9 as SrcBytes","_c10 as DstBytes")

In [6]:
df1.printSchema

<bound method DataFrame.printSchema of DataFrame[Time: int, Duration: int, SrcDevice: string, DstDevice: string, Protocol: int, SrcPort: string, DstPort: string, SrcPackets: int, DstPackets: int, SrcBytes: bigint, DstBytes: bigint]>

In [7]:
df2 = df1.withColumn("Duration", df1["Duration"].cast("double"))

In [8]:
df2 = df2.withColumn("SrcPackets",df2["SrcPackets"].cast("double"))

In [9]:
df2 = df2.withColumn("DstPackets",df2["DstPackets"].cast("double"))

In [10]:
df2 = df2.withColumn("SrcBytes",df2["SrcBytes"].cast("double"))

In [11]:
df2 = df2.withColumn("DstBytes",df2["DstBytes"].cast("double"))

In [12]:
df2.printSchema

<bound method DataFrame.printSchema of DataFrame[Time: int, Duration: double, SrcDevice: string, DstDevice: string, Protocol: int, SrcPort: string, DstPort: string, SrcPackets: double, DstPackets: double, SrcBytes: double, DstBytes: double]>

In [13]:
df3 = df2.select("Duration","SrcPackets","DstPackets","SrcBytes","DstBytes")

In [14]:
df3.printSchema

<bound method DataFrame.printSchema of DataFrame[Duration: double, SrcPackets: double, DstPackets: double, SrcBytes: double, DstBytes: double]>

In [15]:
df3.show(5)

+--------+----------+----------+--------+--------+
|Duration|SrcPackets|DstPackets|SrcBytes|DstBytes|
+--------+----------+----------+--------+--------+
|     0.0|       1.0|       0.0|    64.0|     0.0|
|     0.0|       1.0|       0.0|    75.0|     0.0|
|     0.0|       1.0|       0.0|    69.0|     0.0|
|     0.0|       5.0|       0.0|   400.0|     0.0|
|     0.0|       1.0|       0.0|    58.0|     0.0|
+--------+----------+----------+--------+--------+
only showing top 5 rows



In [16]:
from pyspark.ml.feature import VectorAssembler

In [17]:
assembler = VectorAssembler(
    inputCols=["Duration","SrcPackets","DstPackets","SrcBytes","DstBytes"],
    outputCol="feature Vector")

In [18]:
output = assembler.transform(df3)

In [19]:
output.show(5)

+--------+----------+----------+--------+--------+--------------------+
|Duration|SrcPackets|DstPackets|SrcBytes|DstBytes|      feature Vector|
+--------+----------+----------+--------+--------+--------------------+
|     0.0|       1.0|       0.0|    64.0|     0.0|(5,[1,3],[1.0,64.0])|
|     0.0|       1.0|       0.0|    75.0|     0.0|(5,[1,3],[1.0,75.0])|
|     0.0|       1.0|       0.0|    69.0|     0.0|(5,[1,3],[1.0,69.0])|
|     0.0|       5.0|       0.0|   400.0|     0.0|(5,[1,3],[5.0,400...|
|     0.0|       1.0|       0.0|    58.0|     0.0|(5,[1,3],[1.0,58.0])|
+--------+----------+----------+--------+--------+--------------------+
only showing top 5 rows



In [20]:
from pyspark.ml.clustering import KMeans

In [21]:
kmeans = KMeans(k=5,featuresCol="feature Vector",predictionCol="cluster")
model = kmeans.fit(output)
centers = model.clusterCenters()
for x in centers:
    print(x)

[  2962.18356814    169.5142439     164.6317104   46816.85182762
 127952.78854591]
[3.05003077e+05 1.10199183e+09 3.96508103e+08 2.04425469e+11
 5.12937822e+10]
[2.83887000e+05 5.62064529e+07 1.13553349e+08 2.70663682e+09
 1.52686383e+11]
[1.25190476e+05 1.23688736e+07 3.18227680e+07 8.58889649e+08
 2.58732086e+10]
[2.79597705e+05 2.28936993e+08 9.97122548e+07 3.49151919e+10
 1.39353598e+10]


In [22]:
from pyspark.ml.clustering import BisectingKMeans

In [23]:
bkm = BisectingKMeans(k=5,featuresCol="feature Vector",predictionCol="cluster")
model = bkm.fit(output)
centers = model.clusterCenters()
for x in centers:
    print(x)

[ 2961.70742148   131.7293426    102.06339279 39807.05962848
 86766.22687638]
[6.87764776e+04 6.85910329e+06 7.90474261e+06 1.17666230e+09
 5.21702711e+09]
[1.39294048e+05 3.21155469e+07 3.67006748e+07 5.01199114e+09
 2.14358246e+10]
[2.44530316e+05 1.06348163e+09 4.32713369e+08 1.66971306e+11
 4.86456341e+10]
[2.53803404e+05 5.03489389e+07 1.01633320e+08 2.41869228e+09
 1.36434739e+11]


In [24]:
json = spark.read.json("oasis/wls_day-11")

In [25]:
json.printSchema

<bound method DataFrame.printSchema of DataFrame[AuthenticationPackage: string, Destination: string, DomainName: string, EventID: bigint, FailureReason: string, LogHost: string, LogonID: string, LogonType: bigint, LogonTypeDescription: string, ParentProcessID: string, ParentProcessName: string, ProcessID: string, ProcessName: string, ServiceName: string, Source: string, Status: string, SubjectDomainName: string, SubjectLogonID: string, SubjectUserName: string, Time: bigint, UserName: string]>

In [26]:
json.count()

83183265

In [27]:
json.createGlobalTempView("event")

In [28]:
spark.sql("select EventID,count(*) as number from global_temp.event where EventID = 4625 group by EventID").show()

+-------+------+
|EventID|number|
+-------+------+
|   4625|356966|
+-------+------+



In [29]:
spark.sql("select EventID, count(*) as number from global_temp.event where EventID = 4609 group by EventID").show()

+-------+------+
|EventID|number|
+-------+------+
|   4609|    21|
+-------+------+

