# **Desarrollo de la prueba técnica para HCR**

### Desarrollador: Luis Arturo Cruz Cruz
- Email: info@meluiscruz.com
- Puesto: Data Engineer para Seguritech
- Reclutador: Alberto Ramirez
- Fecha: 1 de marzo de 2021.

#### 1) Definimos la cabecera del proyecto

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, FloatType, TimestampType
from pyspark.sql.types import Row
from pyspark.sql.window import Window

In [2]:
# Local Spark session used throughout the exercise (master "local").
spark = (
    SparkSession.builder
    .master("local")
    .appName("Technical_Test")
    .getOrCreate()
)

In [3]:
# Reuse the SparkContext owned by the session created above.
# SQLContext is kept only because later cells reference `sqlContext`;
# it is deprecated in favour of SparkSession.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

#### 2) Declaramos la ruta del archivo
Nota: El archivo fue preprocesado previamente sobre una copia (`clean_dataSyntheticMoloch.csv`) para acondicionar y depurar los datos; el archivo original es `dataSyntheticMoloch.csv`.

In [4]:
!ls data_sources/

clean_dataSyntheticMoloch.csv  cleaning_dataset.ipynb  dataSyntheticMoloch.csv


In [5]:
# Directory (relative to the notebook) that holds the source CSV files.
path = "./data_sources/"

In [6]:
#previsualizar las primeras 4 lineas del archivo
!head -n 4 ./data_sources/clean_dataSyntheticMoloch.csv

 data||timestamp||id||index||partition 
"{\id\"": 1","\""dstDataBytes\"": 84"," \""dstBytes\"": 92"," \""packetLen\"": [0",104, 108]," \""srcPort\"": 41906"," \""totPackets\"": 2"," \""packetPos\"": [-20456,12452575424, 12452575528]," \""srcPayload8\"": \""302c02010104066e\"""," \""segmentCnt\"": 1"," \""srcPackets\"": 1"," \""protocol\"": [\""udp\"""," \""snmp\""]"," \""lastPacket\"": 1574403347879"," \""dstPort\"": 161"," \""communityId\"": \""1:iMBcVXr7c4KmnKeSnA/rTTFPvSo=\"""," \""dstPayload8\"": \""303002010104066e\"""," \""timestamp\"": 1574403378042"," \""srcBytes\"": 88"," \""dstMacCnt\"": 1"," \""srcIp\"": \""10.33.14.35\"""," \""firstPacket\"": 1574403347879"," \""srcDataBytes\"": 80"," \""dstMac\"": [\""40:ce:24:8a:ea:80\""]"," \""length\"": 0"," \""srcMacCnt\"": 1"," \""totDataBytes\"": 164"," \""ipProtocol\"": 17"," \""node\"": \""localhost\"""," \""dstPackets\"": 1"," \""protocolCnt\"": 2"," \""totBytes\"": 180"," \""srcMac\"": [\""d0:27:88:42:f4:8a\""]"," \""dstIp\"": 

#### 3) Leemos el archivo csv para poder crear un RDD
Nota: El separador de campos se cambió previamente a una doble barra vertical (`||`).

In [7]:
# Read the cleaned file line by line and split every line on the "||" separator,
# giving an RDD of field lists (the header line is still included).
cleanMolochDataRDD = (
    sc.textFile(path + "clean_dataSyntheticMoloch.csv")
    .map(lambda raw_line: raw_line.split("||"))
)

## Solución del requerimiento 1.

In [8]:
# Peek at the header and the first data row.
cleanMolochDataRDD.take(num=2)

[[' data', 'timestamp', 'id', 'index', 'partition '],
 ['"{\\id\\"": 1","\\""dstDataBytes\\"": 84"," \\""dstBytes\\"": 92"," \\""packetLen\\"": [0",104, 108]," \\""srcPort\\"": 41906"," \\""totPackets\\"": 2"," \\""packetPos\\"": [-20456,12452575424, 12452575528]," \\""srcPayload8\\"": \\""302c02010104066e\\"""," \\""segmentCnt\\"": 1"," \\""srcPackets\\"": 1"," \\""protocol\\"": [\\""udp\\"""," \\""snmp\\""]"," \\""lastPacket\\"": 1574403347879"," \\""dstPort\\"": 161"," \\""communityId\\"": \\""1:iMBcVXr7c4KmnKeSnA/rTTFPvSo=\\"""," \\""dstPayload8\\"": \\""303002010104066e\\"""," \\""timestamp\\"": 1574403378042"," \\""srcBytes\\"": 88"," \\""dstMacCnt\\"": 1"," \\""srcIp\\"": \\""10.33.14.35\\"""," \\""firstPacket\\"": 1574403347879"," \\""srcDataBytes\\"": 80"," \\""dstMac\\"": [\\""40:ce:24:8a:ea:80\\""]"," \\""length\\"": 0"," \\""srcMacCnt\\"": 1"," \\""totDataBytes\\"": 164"," \\""ipProtocol\\"": 17"," \\""node\\"": \\""localhost\\"""," \\""dstPackets\\"": 1"," \\""protocolCnt\

In [9]:
# Number of lines read from the file, header line included.
cleanMolochDataRDD.count()

11

In [10]:
# Helper used with RDD.mapPartitionsWithIndex to remove the CSV header line.
def dropHeader(index, iterator):
    """Drop the header record from an RDD partition.

    The header is only the first record of the FIRST partition (index 0);
    every other partition is returned untouched. Skipping the first record of
    every partition would silently lose one data row per extra partition.

    :param index: partition index supplied by ``mapPartitionsWithIndex``.
    :param iterator: records of that partition (any iterable is accepted).
    :return: an iterator over the partition's records without the header.
    """
    rows = iter(iterator)
    if index == 0:
        # Consume the header lazily; the default avoids StopIteration on an
        # empty partition. The rest of the partition is never copied to a list.
        next(rows, None)
    return rows

In [11]:
# Remove the header line. Despite the "DF" in its name, this is still an RDD
# of field lists.
DF_from_RDD_Moloch = cleanMolochDataRDD.mapPartitionsWithIndex(
    dropHeader, preservesPartitioning=False
)

In [12]:
# Check that the first rows no longer include the header.
DF_from_RDD_Moloch.take(num=2)

[['"{\\id\\"": 1","\\""dstDataBytes\\"": 84"," \\""dstBytes\\"": 92"," \\""packetLen\\"": [0",104, 108]," \\""srcPort\\"": 41906"," \\""totPackets\\"": 2"," \\""packetPos\\"": [-20456,12452575424, 12452575528]," \\""srcPayload8\\"": \\""302c02010104066e\\"""," \\""segmentCnt\\"": 1"," \\""srcPackets\\"": 1"," \\""protocol\\"": [\\""udp\\"""," \\""snmp\\""]"," \\""lastPacket\\"": 1574403347879"," \\""dstPort\\"": 161"," \\""communityId\\"": \\""1:iMBcVXr7c4KmnKeSnA/rTTFPvSo=\\"""," \\""dstPayload8\\"": \\""303002010104066e\\"""," \\""timestamp\\"": 1574403378042"," \\""srcBytes\\"": 88"," \\""dstMacCnt\\"": 1"," \\""srcIp\\"": \\""10.33.14.35\\"""," \\""firstPacket\\"": 1574403347879"," \\""srcDataBytes\\"": 80"," \\""dstMac\\"": [\\""40:ce:24:8a:ea:80\\""]"," \\""length\\"": 0"," \\""srcMacCnt\\"": 1"," \\""totDataBytes\\"": 164"," \\""ipProtocol\\"": 17"," \\""node\\"": \\""localhost\\"""," \\""dstPackets\\"": 1"," \\""protocolCnt\\"": 2"," \\""totBytes\\"": 180"," \\""srcMac\\"": [\\

In [13]:
# Field order of the file header (note the stray spaces in the first and last names):
#' data', 'timestamp', 'id', 'index', 'partition '

#### 4) Creamos un DF a partir del RDD del paso anterior.

In [14]:
# Cast the five fields of every record (data, timestamp, id, index, partition)
# to str so the RDD can later be turned into a DataFrame.
DF_from_RDD_Moloch = DF_from_RDD_Moloch.map(
    lambda fields: tuple(str(fields[pos]) for pos in range(5))
)

In [15]:
def _timestamp_first(rec):
    """Return the 5-field record with the timestamp moved before the data field."""
    return (rec[1], rec[0], rec[2], rec[3], rec[4])


RDD_raw_json_data = DF_from_RDD_Moloch.map(_timestamp_first)

In [16]:
# Number of data records once the header has been removed.
RDD_raw_json_data.count()

10

In [17]:
# Inspect one reordered record (timestamp first, still-escaped JSON second).
RDD_raw_json_data.take(num=1)

[('2019-11-22T00:16:18.042-06:00',
  '"{\\id\\"": 1","\\""dstDataBytes\\"": 84"," \\""dstBytes\\"": 92"," \\""packetLen\\"": [0",104, 108]," \\""srcPort\\"": 41906"," \\""totPackets\\"": 2"," \\""packetPos\\"": [-20456,12452575424, 12452575528]," \\""srcPayload8\\"": \\""302c02010104066e\\"""," \\""segmentCnt\\"": 1"," \\""srcPackets\\"": 1"," \\""protocol\\"": [\\""udp\\"""," \\""snmp\\""]"," \\""lastPacket\\"": 1574403347879"," \\""dstPort\\"": 161"," \\""communityId\\"": \\""1:iMBcVXr7c4KmnKeSnA/rTTFPvSo=\\"""," \\""dstPayload8\\"": \\""303002010104066e\\"""," \\""timestamp\\"": 1574403378042"," \\""srcBytes\\"": 88"," \\""dstMacCnt\\"": 1"," \\""srcIp\\"": \\""10.33.14.35\\"""," \\""firstPacket\\"": 1574403347879"," \\""srcDataBytes\\"": 80"," \\""dstMac\\"": [\\""40:ce:24:8a:ea:80\\""]"," \\""length\\"": 0"," \\""srcMacCnt\\"": 1"," \\""totDataBytes\\"": 164"," \\""ipProtocol\\"": 17"," \\""node\\"": \\""localhost\\"""," \\""dstPackets\\"": 1"," \\""protocolCnt\\"": 2"," \\""totBy

In [18]:
# Repair the escaped "data" column so it becomes a valid JSON string.
# The replacements are applied strictly in this order (each one sees the output
# of the previous one), so the sequence must not be reordered.
JSON_FIXES = (
    ("\"{\\", "{\""),          # "{\    -> {"
    ('}\"\"\"', "}"),          # }"""   -> }
    ("\\\"\"", "\""),          # \""    -> "
    ("\",\"\"", ",\""),        # ",""   -> ,"
    ("\",\" \"", ",\""),       # "," "  -> ,"
    ("[0\",", "[0,"),          # [0",   -> [0,
    (",\" \"", ",\""),         # ," "   -> ,"
    ("\",\" ", ", "),          # ","<sp> -> ,<sp>
)


def _fix_json_text(raw):
    """Apply every (old, new) pair of JSON_FIXES to `raw`, in order."""
    for old, new in JSON_FIXES:
        raw = raw.replace(old, new)
    return raw


RDD_clean_json_data = RDD_raw_json_data.map(
    lambda rec: (rec[0], _fix_json_text(rec[1]), rec[2], rec[3], rec[4])
)

In [19]:
# Compare against the raw record above to check the effect of the replacements.
RDD_clean_json_data.take(num=1)

[('2019-11-22T00:16:18.042-06:00',
  '{"id": 1,"dstDataBytes": 84,"dstBytes": 92,"packetLen": [0,104, 108],"srcPort": 41906,"totPackets": 2,"packetPos": [-20456,12452575424, 12452575528],"srcPayload8": "302c02010104066e","segmentCnt": 1,"srcPackets": 1,"protocol": ["udp","snmp"],"lastPacket": 1574403347879,"dstPort": 161,"communityId": "1:iMBcVXr7c4KmnKeSnA/rTTFPvSo=","dstPayload8": "303002010104066e","timestamp": 1574403378042,"srcBytes": 88,"dstMacCnt": 1,"srcIp": "10.33.14.35","firstPacket": 1574403347879,"srcDataBytes": 80,"dstMac": ["40:ce:24:8a:ea:80"],"length": 0,"srcMacCnt": 1,"totDataBytes": 164,"ipProtocol": 17,"node": "localhost","dstPackets": 1,"protocolCnt": 2,"totBytes": 180,"srcMac": ["d0:27:88:42:f4:8a"],"dstIp": "10.33.191.126","fileId": [20456]}',
  '191122-cLA1a4d6dN1NP6_r80RW_gke',
  'sessions2-*',
  '01/11/2019')]

In [20]:
# Explicit schema for the DataFrame built from the RDD: five non-nullable string columns.
schemaMoloch = StructType([
    StructField(column_name, StringType(), False)
    for column_name in ("time_stamp", "data_json", "id", "index", "partition")
])

In [21]:
# Build the DataFrame from the cleaned RDD using the explicit schema.
# SparkSession.createDataFrame is used instead of the deprecated SQLContext entry point.
DF_Moloch = spark.createDataFrame(RDD_clean_json_data, schemaMoloch)

In [22]:
# Row count of the DataFrame; it should match the RDD count above.
DF_Moloch.count()

10

## Solución del requerimiento 2.

In [23]:
# Display the DataFrame (default of 20 rows, long values truncated).
DF_Moloch.show(n=20)

+--------------------+--------------------+--------------------+-----------+----------+
|          time_stamp|           data_json|                  id|      index| partition|
+--------------------+--------------------+--------------------+-----------+----------+
|2019-11-22T00:16:...|{"id": 1,"dstData...|191122-cLA1a4d6dN...|sessions2-*|01/11/2019|
|2019-11-22T20:52:...|{"id": 2,"dstData...|191123-cLAIOqWgxI...|sessions2-*|01/11/2019|
|2019-11-22T01:30:...|{"id": 3,"dstData...|191122-cLAoQvjqLs...|sessions2-*|01/11/2019|
|2019-11-22T14:45:...|{"id": 4,"dstData...|191122-cLAKOAk2Fo...|sessions2-*|01/11/2019|
|2019-11-22T08:31:...|{"id": 5,"dstData...|191122-cLBr8zFauv...|sessions2-*|01/11/2019|
|2019-11-22T00:16:...|{"id": 6,"dstData...|191122-cLA1a4d6dN...|sessions2-*|01/11/2019|
|2019-11-22T18:41:...|{"id": 7,"rootId"...|191123-cLA335QX0z...|sessions2-*|01/11/2019|
|2019-11-22T16:57:...|{"id": 8,"dstData...|191122-cLB0C_rAhV...|sessions2-*|01/11/2019|
|2019-11-22T13:56:...|{"id": 9,"

#### 5) Creamos un RDD que contenga solo las strings en formato JSON.
Nota: Este RDD nos permitirá crear un DF solo para esos datos

In [24]:
# Keep only the repaired JSON string (second field) of every record.
RDD_OnlyJsonData = RDD_clean_json_data.map(lambda record: record[1])

In [25]:
# Inspect one JSON string before parsing it.
RDD_OnlyJsonData.take(num=1)

['{"id": 1,"dstDataBytes": 84,"dstBytes": 92,"packetLen": [0,104, 108],"srcPort": 41906,"totPackets": 2,"packetPos": [-20456,12452575424, 12452575528],"srcPayload8": "302c02010104066e","segmentCnt": 1,"srcPackets": 1,"protocol": ["udp","snmp"],"lastPacket": 1574403347879,"dstPort": 161,"communityId": "1:iMBcVXr7c4KmnKeSnA/rTTFPvSo=","dstPayload8": "303002010104066e","timestamp": 1574403378042,"srcBytes": 88,"dstMacCnt": 1,"srcIp": "10.33.14.35","firstPacket": 1574403347879,"srcDataBytes": 80,"dstMac": ["40:ce:24:8a:ea:80"],"length": 0,"srcMacCnt": 1,"totDataBytes": 164,"ipProtocol": 17,"node": "localhost","dstPackets": 1,"protocolCnt": 2,"totBytes": 180,"srcMac": ["d0:27:88:42:f4:8a"],"dstIp": "10.33.191.126","fileId": [20456]}']

In [26]:
import json

In [27]:
# Draft of a schema-builder loop, left commented out.
# NOTE(review): it refers to `dataJsonFields`, which is not defined in the visible
# notebook; the JSON schema is inferred by spark.read.json in the next section instead.
#jsonSchema =  StructType([
  #StructField(field, StringType(), False) for field in dataJsonFields
#])

#### 6) Creamos el DF a partir del RDD de strings en formato JSON.

In [28]:
# Let Spark infer the schema from the JSON strings.
# In PERMISSIVE mode, malformed records are not dropped. While inferring the schema,
# Spark adds a "_corrupt_record" column if any record fails to parse, so check
# printSchema() below for it.
# NOTE(review): multiLine presumably has no effect here, because every RDD element is
# parsed as one JSON document. Confirm before relying on it.
onlyJsonData_DF = spark.read.json(
    RDD_OnlyJsonData,
    mode="PERMISSIVE",
    multiLine="true",
)

In [29]:
# Inspect the inferred schema to confirm the expected fields were parsed.
onlyJsonData_DF.printSchema()

root
 |-- cert: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- alt: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- altCnt: long (nullable = true)
 |    |    |-- hash: string (nullable = true)
 |    |    |-- issuerCN: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- issuerON: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- notAfter: long (nullable = true)
 |    |    |-- notBefore: long (nullable = true)
 |    |    |-- remainingDays: long (nullable = true)
 |    |    |-- serial: string (nullable = true)
 |    |    |-- subjectCN: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- subjectON: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- validDays: long (nullable = true)
 |-- certCnt: long (nullable = true)
 |--

#### 7) Declaramos el DF srcIP_DF a partir del DF onlyJsonData_DF.

In [30]:
# Per-session metrics used for requirement 3.
# The JSON field is spelled "srcIp". Referencing it exactly keeps this cell working
# when spark.sql.caseSensitive=true; the alias preserves the "srcIP" column name.
srcIP_DF = onlyJsonData_DF.select(
    col("srcIp").alias("srcIP"), "srcBytes", "totBytes", "length"
)

## Solución del requerimiento 3.

In [31]:
# Show the first two sessions.
srcIP_DF.show(n=2)

+--------------------+--------+--------+------+
|               srcIP|srcBytes|totBytes|length|
+--------------------+--------+--------+------+
|         10.33.14.35|      88|     180|     0|
|fe80::b1cb:ab14:b...|     178|     178|   414|
+--------------------+--------+--------+------+
only showing top 2 rows



## Promedios de los metadatos srcBytes, totBytes, length

In [32]:
# Mean of each metric over all sessions.
srcIP_DF.agg(*[mean(metric) for metric in ("srcBytes", "totBytes", "length")]).show()

+-------------+-------------+-----------+
|avg(srcBytes)|avg(totBytes)|avg(length)|
+-------------+-------------+-----------+
|       1179.0|       4091.0|    65430.5|
+-------------+-------------+-----------+



## Desviaciones estándar de los metadatos srcBytes, totBytes, length

In [33]:
# Sample standard deviation of each metric over all sessions.
srcIP_DF.agg(*[stddev(metric) for metric in ("srcBytes", "totBytes", "length")]).show()

+---------------------+---------------------+-------------------+
|stddev_samp(srcBytes)|stddev_samp(totBytes)|stddev_samp(length)|
+---------------------+---------------------+-------------------+
|   1667.4001719243447|    6843.912737283808| 206615.38485217193|
+---------------------+---------------------+-------------------+



#### 8) Declaramos la variable groupByProtocol_DF a partir del DF onlyJsonData_DF.

In [34]:
# One row per (session, protocol) pair: explode() emits one row per element of the
# "protocol" array (sessions whose array is null or empty produce no rows).
# "srcIp" is the field's exact spelling in the JSON; the alias keeps the "srcIP" name
# and makes the cell independent of spark.sql.caseSensitive.
groupByProtocol_DF = onlyJsonData_DF.select(
    col("srcIp").alias("srcIP"),
    explode("protocol").alias("hierarchy"),
    "srcBytes",
    "totBytes",
    "length",
)

In [35]:
# Show the first four exploded rows.
groupByProtocol_DF.show(n=4)

+--------------------+---------+--------+--------+------+
|               srcIP|hierarchy|srcBytes|totBytes|length|
+--------------------+---------+--------+--------+------+
|         10.33.14.35|      udp|      88|     180|     0|
|         10.33.14.35|     snmp|      88|     180|     0|
|fe80::b1cb:ab14:b...|      udp|     178|     178|   414|
|fe80::b1cb:ab14:b...|    llmnr|     178|     178|   414|
+--------------------+---------+--------+--------+------+
only showing top 4 rows



In [36]:
# Attach a consecutive 0-based row index ("idx") for the requirement-4 parity filter.
# monotonically_increasing_id() alone is unique and increasing but NOT consecutive:
# the partition id is encoded in its upper bits, so its parity stops being meaningful
# once the data spans more than one partition. row_number() over a window ordered by
# that id yields 0, 1, 2, ... in the same row order (identical to the original values
# when there is a single partition).
# NOTE: a window without partitionBy gathers all rows into one partition, which is
# fine for this small dataset but does not scale.
indexedDF = groupByProtocol_DF.withColumn(
    "idx",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
)

#### 9) Hacemos un filtro del DF por jerarquía de protocolos

In [37]:
# Requirement-4 filter: keep rows at even positions of "idx" and drop rows whose
# protocol is "tcp" or "udp". Rows with a null protocol are dropped as well.
# Assumes "idx" is a consecutive 0-based row index (TODO confirm with the cell above).
secondHierarchy_DF = indexedDF.filter(
    (indexedDF["idx"] % 2 == 0) & ~indexedDF["hierarchy"].isin("tcp", "udp")
)

## Solución del requerimiento 4.

In [38]:
# Display the filtered rows (default of 20 rows).
secondHierarchy_DF.show(n=20)

+--------------------+---------+--------+--------+------+---+
|               srcIP|hierarchy|srcBytes|totBytes|length|idx|
+--------------------+---------+--------+--------+------+---+
|         10.10.18.35|     http|    1632|    1632|     0|  6|
|       10.33.176.205|     http|    5160|   21120|653468| 12|
|       10.33.128.161|      tls|    1380|    7122|   400| 14|
|fe80::b429:1e0a:1...|     http|    2756|    9434|    11| 16|
+--------------------+---------+--------+--------+------+---+



## Promedios de los metadatos srcBytes, totBytes, length

In [39]:
# Mean of each metric over the filtered protocol rows.
secondHierarchy_DF.agg(*[mean(metric) for metric in ("srcBytes", "totBytes", "length")]).show()

+-------------+-------------+-----------+
|avg(srcBytes)|avg(totBytes)|avg(length)|
+-------------+-------------+-----------+
|       2732.0|       9827.0|  163469.75|
+-------------+-------------+-----------+



## Desviaciones estándar de los metadatos srcBytes, totBytes, length

In [40]:
# Sample standard deviation of each metric over the filtered protocol rows.
secondHierarchy_DF.agg(*[stddev(metric) for metric in ("srcBytes", "totBytes", "length")]).show()

+---------------------+---------------------+-------------------+
|stddev_samp(srcBytes)|stddev_samp(totBytes)|stddev_samp(length)|
+---------------------+---------------------+-------------------+
|   1725.6558173633582|    8208.965586479213|  326665.5529665114|
+---------------------+---------------------+-------------------+



## Fin de la prueba técnica.

In [42]:
# Show the active SparkSession object.
spark

In [41]:
# Uncomment to stop the SparkContext and release its resources when finished.
#sc.stop()