# Filter messages to the CPU load window and save as parquet

In [82]:
from pyspark.sql import SparkSession

# JVM option passed to both the driver and the executors.
# NOTE(review): presumably required on newer JDKs where the security manager
# is disallowed by default — TODO confirm against the JDK in use.
JAVA_SECURITY_OPTS = "-Djava.security.manager=allow"

# Local Spark session bound to localhost.
spark = (
    SparkSession.builder
    .config("spark.driver.host", "localhost")
    .config("spark.driver.extraJavaOptions", JAVA_SECURITY_OPTS)
    .config("spark.executor.extraJavaOptions", JAVA_SECURITY_OPTS)
    .getOrCreate()
)

In [83]:
run = "low_v1"

# Root directory holding the exported data for all runs.
STORAGE_ROOT = "/media/kakn/storage"

# Prometheus CPU export and egress message export for the selected run.
cpu_load = f"{STORAGE_ROOT}/prometheus_export/{run}/processor_cpu_full.json"
message_folder = f"{STORAGE_ROOT}/message_export/{run}/egress"

In [84]:
import json, os

# Load-window detection parameters applied to the CPU series.
CPU_WINDOW = 20             # number of consecutive samples that must all satisfy a threshold
CPU_START_THRESHOLD = 0.5   # run starts at the first window entirely below this value
CPU_IDLE_THRESHOLD = 0.02   # run ends just before the first window entirely below this value


def first_window_below(samples, threshold, window=CPU_WINDOW):
    """Return the index of the first run of `window` consecutive samples whose
    values are all < `threshold`, or None if no such run exists.

    `samples` is a list of (timestamp, value) tuples.
    """
    for i in range(len(samples) - window + 1):
        if all(value < threshold for _, value in samples[i:i + window]):
            return i
    return None


# Parse into a separate name: `cpu_load` keeps holding the file path, so this
# cell can be re-run without re-running the config cell first.
with open(cpu_load, "r") as f:
    cpu_samples = [(ts, float(value)) for ts, value in json.load(f)["data"]["result"][0]["values"]]

start_idx = first_window_below(cpu_samples, CPU_START_THRESHOLD)
idle_idx = first_window_below(cpu_samples, CPU_IDLE_THRESHOLD)

# Fail loudly rather than falling back to a [0, 0] range, which silently
# filters every message out in the following cells.
if start_idx is None or idle_idx is None:
    raise ValueError(
        f"Could not detect the load window in {cpu_load!r}: "
        f"start window found={start_idx is not None}, idle window found={idle_idx is not None}"
    )
# Every idle window is also a start window, so start_idx <= idle_idx. When they
# are equal there is no active sample before the idle period; in particular
# idle_idx == 0 would otherwise make `idle_idx - 1` wrap to the last sample.
if start_idx >= idle_idx:
    raise ValueError(
        f"No active samples between load start (index {start_idx}) and idle start (index {idle_idx})"
    )

# Scale by 1000 to integer units comparable with `kafkaTime`
# (presumably Prometheus seconds -> milliseconds).
range_start = int(cpu_samples[start_idx][0] * 1000)
range_end = int(cpu_samples[idle_idx - 1][0] * 1000)

In [85]:
# Manually filtered message exports for the selected run, one folder per direction.
input_path = f"/media/kakn/storage/filtered/manual/{run}"
input_egress_path = f"{input_path}/egress"
input_ingress_path = f"{input_path}/ingress"


In [86]:
import json
import pyspark.sql.functions as F
import pandas as pd

ingress_frame = spark.read.json(input_ingress_path)
egress_frame = spark.read.json(input_egress_path)

# Payload fields that are not needed for the latency analysis.
drop_cols = ["firmwareVersion", "temperature", "powerState", "coolerState", "doorOpenCount", "doorCloseCount", "doorOpenTime", "batteryLevel", "latitude", "longitude", "locationType", "locationConfidence", "wifiCount", "mobileCellId", "mobileCellType", "mobileMNC", "mobileMCC", "mobileRSSI", "mobileLac", "messageType", "plausibilityState"]

# Keep only messages whose kafkaTime lies inside the detected load window.
ingress_frame = ingress_frame.filter((ingress_frame.kafkaTime >= range_start) & (ingress_frame.kafkaTime <= range_end))
ingress_frame = ingress_frame.drop_duplicates()
ingress_frame = ingress_frame.drop(*drop_cols)

egress_frame = egress_frame.drop(*drop_cols)
egress_frame = egress_frame.filter((egress_frame.kafkaTime >= range_start) & (egress_frame.kafkaTime <= range_end))

# Pair each egress message with its ingress counterpart.
combined_frame = egress_frame.join(
    other=ingress_frame,
    on=["traceId", "cng_deviceId", "timestamp", "timePeriodStart", "messageTimestamp"],
    how="inner",
)

# Columns that were only needed to match the two sides.
join_only_cols = ["timestamp", "timePeriodStart", "messageTimestamp", "deduplicationHash"]
combined_frame = combined_frame.drop(*join_only_cols)

# NOTE(review): both sides contribute a `kafkaTime` column, so the joined frame
# carries two columns with that name. They are told apart here through the
# parent frames. Selecting "kafkaTime" by name, or writing this frame to
# parquet, will likely fail on the duplicate name — rename before saving.
combined_frame = combined_frame.withColumn("latency", egress_frame.kafkaTime - ingress_frame.kafkaTime)
combined_frame = combined_frame.filter(combined_frame.latency > 0)

# The frame is evaluated three times below (toPandas and two counts). Cache it
# so the JSON read and the join are not recomputed for each action.
combined_frame = combined_frame.cache()


# Outlier threshold via the 1.5 * IQR rule on the positive latencies.
latency = combined_frame.select("latency").toPandas()["latency"]
if latency.empty:
    # Without this check, the quantiles below are NaN and the counts are 0,
    # and nothing reports the failure.
    raise ValueError(
        f"No matched messages with positive latency in [{range_start}, {range_end}]; "
        "check the load-window detection and the input paths."
    )
Q1 = latency.quantile(0.25)
Q3 = latency.quantile(0.75)
IQR = Q3 - Q1
outlier_threshold = Q3 + 1.5 * IQR

print("Outlier threshold: ", outlier_threshold)
print("Q1: ", Q1)
print("Q3: ", Q3)
print("IQR: ", IQR)

# Number of messages whose latency exceeds the threshold.
outliers = combined_frame.filter(combined_frame.latency > outlier_threshold)
print("Number of outliers: ", outliers.count())

# Total number of matched messages.
print("Number of messages: ", combined_frame.count())


#histogram of latency
#combined_frame.select("latency").toPandas().hist(bins=100)

#graph of latency
#combined_frame.select("latency").toPandas().plot()



# combined_frame.select(F.mean("latency"), F.median("latency"), F.std("latency"), F.min("latency"), F.max("latency"), F.percentile("latency", 0.95), F.percentile("latency", 0.99)).show(vertical=True)

                                                                                

Outlier threshold:  166.0
Q1:  21.0
Q3:  79.0
IQR:  58.0


                                                                                

Number of outliers:  11




Number of messages:  196512


                                                                                

25/02/02 19:24:42 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 78518443 ms exceeds timeout 120000 ms
25/02/02 19:24:42 WARN SparkContext: Killing executors is not supported by current scheduler.
25/02/02 19:24:44 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint