In [1]:
import findspark
findspark.init()

# Procesamiento de datos

import math
import pandas as pd
import numpy as np
import random 
from datetime import datetime


import pyspark.sql.functions as F
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, CountVectorizer, MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import ClusteringEvaluator

# Modelos de clustering 

from pyspark.ml.clustering import KMeans 

# Pyspark

# Create (or reuse) a local Spark session. `getOrCreate()` makes this cell safe
# to re-run: constructing `SparkContext('local')` a second time raises because
# only one active context is allowed per JVM.
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [2]:
# Root directory for the output files written by later cells (model and thresholds).
base_path = '.'

# Preprocesamiento

In [3]:
# Schema of the raw RM JSON records.

# One element of the `data` array.
_rm_entry = (
    StructType()
    .add('time', StringType(), True)
    .add('imei', StringType(), True)
    .add('imsi', StringType(), True)
    .add('rat', StringType(), True)
)

# Envelope fields plus the (nullable) array of entries.
schema = (
    StructType()
    .add('version', StringType(), True)
    .add('time', StringType(), True)
    .add('id', StringType(), True)
    .add('type', StringType(), True)
    .add('event', StringType(), True)
    .add('data', ArrayType(_rm_entry, True), True)
)

In [4]:
# Read the raw dataset with the explicit schema defined above.
# The former `sep`, `header` and `inferSchema` options were dropped: they are
# CSV reader options, and schema inference is moot when a schema is supplied.
df = spark.read.schema(schema).json("../../datasets/rm_dataset_v1/dataset_rm_v1.json")
df.printSchema()

root
 |-- version: string (nullable = true)
 |-- time: string (nullable = true)
 |-- id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- event: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- time: string (nullable = true)
 |    |    |-- imei: string (nullable = true)
 |    |    |-- imsi: string (nullable = true)
 |    |    |-- rat: string (nullable = true)



In [5]:
# Preview the loaded records.
df.show()

+-------+----------+--------------------+----+-----+--------------------+
|version|      time|                  id|type|event|                data|
+-------+----------+--------------------+----+-----+--------------------+
|    1.0|1596800148|de539085-315f-466...|  RM| DATA|[[1596800108, 359...|
|    1.0|1596797290|de539085-315f-466...|  RM| DATA|[[1596796932, 359...|
|    1.0|1596294961|de539085-315f-466...|  RM| DATA|[[1596294959, 359...|
|    1.0|1596296581|de539085-315f-466...|  RM| DATA|[[1596296538, 352...|
|    1.0|1596297481|de539085-315f-466...|  RM| DATA|[[1596297428, 358...|
|    1.0|1596297781|de539085-315f-466...|  RM| DATA|[[1596297758, 359...|
|    1.0|1596298021|de539085-315f-466...|  RM| DATA|[[1596297972, 356...|
|    1.0|1596044533|de539085-315f-466...|  RM| DATA|[[1596039825, 355...|
|    1.0|1596295981|de539085-315f-466...|  RM| DATA|[[1596294995, 353...|
|    1.0|1596296521|de539085-315f-466...|  RM| DATA|[[1596296508, 013...|
+-------+----------+------------------

In [6]:
# Drop the envelope columns that are not used; the per-entry `time` inside
# `data` is still available and is extracted in the next step.

_unused_columns = ["event", "id", "type", "version", "time"]
df = df.drop(*_unused_columns)

In [7]:
# Explode the `data` array (one row per entry) and promote the entry fields
# to top-level columns.

df = df.withColumn("data", F.explode("data"))
_entry_fields = ("imei", "imsi", "rat", "time")
df = df.select("*", *[F.col("data")[_field].alias(_field) for _field in _entry_fields])

# The struct column is no longer needed once its fields are expanded.

df = df.drop("data")

In [9]:
# Cambio de formato de time

def timeFormat(time_string):
    """Format a Unix epoch value (seconds) as a UTC 'YYYY-MM-DD HH:MM:SS' string.

    Args:
        time_string: Epoch seconds as a string or number (anything `int()`
            accepts), or None.

    Returns:
        The formatted UTC date-time string, or None when `time_string` is
        None (so a null cell yields a null instead of failing the Spark job).

    Raises:
        ValueError: If `time_string` cannot be converted with `int()`.
    """
    # Local import: the notebook only imports `datetime` at top level.
    from datetime import timezone

    if time_string is None:
        return None
    # Timezone-aware replacement for the deprecated `datetime.utcfromtimestamp`.
    utc_moment = datetime.fromtimestamp(int(time_string), tz=timezone.utc)
    return utc_moment.strftime('%Y-%m-%d %H:%M:%S')

# Expose the Python formatter as a Spark UDF returning a string.
timeFormatUDF = F.udf(timeFormat, StringType())

# Epoch string -> timestamp -> ISO-like text with millisecond and offset fields.
# NOTE(review): after this step `time` is a string column again; the date-part
# extraction in the following cell relies on Spark implicitly casting this text
# back to a timestamp — confirm that works on the Spark version in use.
_time_as_timestamp = timeFormatUDF(F.col("time")).cast(TimestampType())
df = df.withColumn("time", F.date_format(_time_as_timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))

In [None]:
# Derive year, month, day, hour, minute and second columns from `time`.

_time_parts = [
    ("year", F.year),
    ("month", F.month),
    ("day", F.dayofmonth),
    ("hour", F.hour),
    ("minute", F.minute),
    ("second", F.second),
]
for _part_name, _extract in _time_parts:
    df = df.withColumn(_part_name, _extract(F.col("time")))

In [None]:
# Split the IMSI into MCC, MNC and MSIN by fixed positions (1-based start, length).
# NOTE(review): the MNC is taken as exactly two characters — confirm this holds
# for every operator present in the dataset.

_imsi_parts = [('mcc', 1, 3), ('mnc', 4, 2), ('msin', 6, 10)]
for _part_name, _start, _length in _imsi_parts:
    df = df.withColumn(_part_name, F.col('imsi').substr(_start, _length))

In [None]:
""" Formato de los IMEI: TAC -- Serial_Number -- CD (15 digitos) """
# Split the IMEI into TAC (first 8 characters) and SNR (next 6 characters).
# The trailing CD character mentioned in the format note is not extracted here.

_imei = F.col('imei')
df = df.withColumn('tac', _imei.substr(1, 8)).withColumn('snr', _imei.substr(9, 6))

In [None]:
# Min-max scale the `year` column using fixed bounds (0 and 3000) rather than
# bounds fitted from the data.

min_year, max_year = 0, 3000
_year_as_float = F.col("year").cast(FloatType())
df = df.withColumn("year", (_year_as_float - min_year) / (max_year - min_year))

In [None]:
# Min-max normalise the calendar columns with their fixed (lowest, highest) values.

_calendar_bounds = [
    ("month", 1, 12),
    ("day", 1, 31),
    ("hour", 0, 23),
    ("minute", 0, 59),
    ("second", 0, 59),
]
for _column, _low, _high in _calendar_bounds:
    df = df.withColumn(_column, (F.col(_column) - _low) / (_high - _low))

In [None]:
# Index the categorical columns with StringIndexer.

def _make_indexer(column):
    """Build a StringIndexer writing `<column>_index`, with handleInvalid="keep"."""
    return StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")

stringIndexerRat = _make_indexer("rat")
stringIndexerMcc = _make_indexer("mcc")
stringIndexerMnc = _make_indexer("mnc")
stringIndexerMsin = _make_indexer("msin")
stringIndexerTac = _make_indexer("tac")
stringIndexerSnr = _make_indexer("snr")

# Run all indexers as one pipeline, fitted on and applied to the same frame.
pipeline = Pipeline(stages=[
    stringIndexerRat,
    stringIndexerMcc,
    stringIndexerMnc,
    stringIndexerMsin,
    stringIndexerTac,
    stringIndexerSnr,
])

String_Indexer_Model = pipeline.fit(df)
df = String_Indexer_Model.transform(df)

In [None]:
# Assemble the scaled time parts and the indexed categoricals into `features`.

_feature_columns = [
    'year', 'month', 'day', 'hour', 'minute', 'second',
    'rat_index', 'mcc_index', 'mnc_index', 'msin_index',
    'tac_index', 'snr_index',
]
vector_assembler = VectorAssembler(inputCols=_feature_columns, outputCol="features")

df = vector_assembler.transform(df)

In [None]:
# Preview two rows of the fully transformed frame.
df.show(n=2)

In [None]:
# Show the assembled feature vectors without truncating them.
df.select(F.col('features')).show(truncate=False)

# DistanceKmeans

In [None]:
# Schema of the StringIndexer-based training dataset.
# The trailing yes/no marks are carried over from the original annotations;
# presumably they flag whether the field is used downstream — TODO confirm.

schema = (
    StructType()
    .add('label', StringType(), True)                  # no
    .add('features', ArrayType(DoubleType()), True)    # yes
)

# Rows that do not match the schema are dropped (DROPMALFORMED mode).
df_train = (
    spark.read
    .schema(schema)
    .option("mode", "DROPMALFORMED")
    .json('../../deteccion_anomalias/preprocesamiento/KMeansStringIndexer.json')
)

In [None]:
# Schema of the Word2Vec-based training dataset.
# The trailing yes/no marks are carried over from the original annotations;
# presumably they flag whether the field is used downstream — TODO confirm.
# NOTE(review): this cell rebinds `schema` and `df_train`, so when the notebook
# is run top to bottom it replaces the StringIndexer dataset loaded by the
# previous cell, while later cells save under StringIndexer file names —
# confirm which dataset is intended before running.

_w2v_fields = [
    ('rf_id', StringType()),                # no
    ('time', TimestampType()),              # yes
    ('rat', StringType()),                  # yes
    ('imei', StringType()),                 # yes
    ('imsi', StringType()),                 # yes
    ('label', StringType()),                # no
    ('year', DoubleType()),                 # yes
    ('month', DoubleType()),                # yes
    ('day', DoubleType()),                  # yes
    ('hour', DoubleType()),                 # yes
    ('minute', DoubleType()),               # yes
    ('second', DoubleType()),               # yes
    ('mcc', StringType()),                  # yes
    ('mnc', StringType()),                  # yes
    ('msin', StringType()),                 # yes
    ('tac_a', StringType()),                # no
    ('tac_b', StringType()),                # no
    ('snr', StringType()),                  # no
    ('cd', StringType()),                   # yes
    ('rat_ohe', ArrayType(DoubleType())),   # yes
    ('cat_imsi', ArrayType(StringType())),  # yes
    ('cat_imei', ArrayType(StringType())),  # yes
    ('imei_vec', ArrayType(DoubleType())),  # yes
    ('imsi_vec', ArrayType(DoubleType())),  # yes
    ('features', ArrayType(DoubleType())),  # yes
]
# Every field is nullable.
schema = StructType([StructField(_name, _dtype, True) for _name, _dtype in _w2v_fields])

# Rows that do not match the schema are dropped (DROPMALFORMED mode).
df_train = (
    spark.read
    .schema(schema)
    .option("mode", "DROPMALFORMED")
    .json('../../deteccion_anomalias/preprocesamiento/KMeansWord2Vec.json')
)

In [None]:
# Look up the coordinates of a cluster centre by its index.

def centroid(k, centers):
    """Return the `k`-th entry of `centers` converted to a plain Python list.

    Args:
        k: Index into `centers`.
        centers: Indexable collection whose items provide `.tolist()`.

    Returns:
        The result of `centers[k].tolist()`.
    """
    selected_center = centers[k]
    return selected_center.tolist()

In [None]:
# Euclidean distance from a data point to a centroid.

def distToCentroid(datapt, centroid):
    """Return the Euclidean distance between `datapt` and `centroid`.

    Both arguments are passed to `Vectors.squared_distance`; the square root
    of that value is taken with `math.sqrt`.
    """
    squared = Vectors.squared_distance(datapt, centroid)
    return math.sqrt(squared)

In [None]:
# Identity UDF whose declared return type is an array of doubles with
# containsNull=False; it re-types a column without changing its values.
# NOTE(review): presumably needed so the `features` array is accepted by the
# clustering stage — confirm.
new_schema = ArrayType(elementType=DoubleType(), containsNull=False)
udf_foo = F.udf(f=lambda x: x, returnType=new_schema)

In [None]:
# Keep only the rows labelled 'Normal' and re-type their `features` column
# through the identity UDF.

_is_normal = F.col('label') == 'Normal'
df_normal = df_train.where(_is_normal)
df_normal = df_normal.withColumn("features", udf_foo(F.col("features")))

In [None]:
# Preview one untruncated row of the normal subset.
df_normal.show(n=1, truncate=False)

In [None]:
# Fit K-means on the feature column of the normal rows.

_kmeans_params = dict(k=30, maxIter=100, tol=1e-4, seed=319869)
kmeans = KMeans(**_kmeans_params)
_training_features = df_normal.select('features')
model = kmeans.fit(_training_features)

In [None]:
# Persist the fitted model under `base_path`, replacing any previous save.
# The commented-out line is the alternative target for the Word2Vec variant.
model_output_path = "{}/data/distanceKmeansRmSIModel.bin".format(base_path)
# model_output_path = "{}/data/distanceKmeansRmW2VModel.bin".format(base_path)
_model_writer = model.write().overwrite()
_model_writer.save(model_output_path)

In [None]:
# Apply the fitted model to the normal rows; later cells read the resulting
# `prediction` column.
df_normal = model.transform(df_normal)

In [None]:
# Cluster centres of the fitted model; `centroid()` indexes into this by cluster number.
centers = model.clusterCenters()

In [None]:
# UDF: cluster index -> centre coordinates (array of doubles), using `centers`.
vectorCent = F.udf(lambda cluster_id: centroid(cluster_id, centers), ArrayType(DoubleType()))
# UDF: (point, centre) -> Euclidean distance, declared as a float column.
euclDistance = F.udf(distToCentroid, FloatType())

In [None]:
# Attach the coordinates of the centre assigned to each row.

_assigned_centroid = vectorCent(df_normal['prediction'])
df_normal = df_normal.withColumn('centroid', _assigned_centroid)

In [None]:
# Distance from each row's features to its assigned centre.

_distance_to_centroid = euclDistance(df_normal['features'], df_normal['centroid'])
df_normal = df_normal.withColumn('distance', _distance_to_centroid)

In [None]:
# Per cluster: collect all distances, sorted in descending order; one row per
# cluster, ordered by cluster index.
_distances_desc = F.sort_array(F.collect_list('distance'), asc=False).alias('distances')
threshold = (
    df_normal
    .groupBy('prediction')
    .agg(_distances_desc)
    .orderBy('prediction')
)

In [None]:
# Pull the per-cluster distance lists to the driver as a NumPy array.
# Note: `threshold` is rebound from a Spark DataFrame to an array here.
_distances_pdf = threshold.select('distances').toPandas()
threshold = _distances_pdf['distances'].values

In [None]:
# Save the per-cluster distance lists under `base_path`.
# The commented-out line is the alternative target for the Word2Vec variant.
# NOTE(review): `threshold` is presumably an object array of lists, which
# np.save stores via pickle; loaders would then need allow_pickle=True — confirm.
threshold_path = '{}/data/thresholdStringIndexer.npy'.format(base_path)
# threshold_path = '{}/data/thresholdWord2Vec.npy'.format(base_path)
np.save(file=threshold_path, arr=threshold)