In [37]:
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler

from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np

col_names = ['tcp_flags','tcp_time_delta','tcp_len','mqtt_conack_flags','mqtt_conack_flags_reserved',
             'mqtt_conack_flags_sp','mqtt_conack_val','mqtt_conflag_cleansess','mqtt_conflag_passwd',
             'mqtt_conflag_qos','mqtt_conflag_reserved','mqtt_conflag_retain','mqtt_conflag_uname',
             'mqtt_conflag_willflag','mqtt_conflags','mqtt_dupflag','mqtt_hdrflags','mqtt_kalive',
             'mqtt_len','mqtt_msg','mqtt_msgid','mqtt_msgtype','mqtt_proto_len','mqtt_protoname',
             'mqtt_qos','mqtt_retain','mqtt_sub_qos','mqtt_suback_qos','mqtt_ver','mqtt_willmsg',
             'mqtt_willmsg_len','mqtt_willtopic','mqtt_willtopic_len','target']

nominal_cols = ['tcp_flags', 'tcp_time_delta', 'tcp_len', 'mqtt_msg', 'mqtt_protoname',\
                'mqtt_willmsg', 'mqtt_willtopic', 'target']

binary_cols = ['mqtt_conack_flags_reserved', 'mqtt_conack_flags_sp', 'mqtt_conflag_cleansess', \
               'mqtt_conflag_passwd', 'mqtt_conflag_reserved', 'mqtt_conflag_retain', 'mqtt_conflag_uname',\
               'mqtt_conflag_willflag', 'mqtt_dupflag', 'mqtt_retain']

continuous_cols = ['mqtt_conack_val', 'mqtt_conack_flags', 'mqtt_conflag_qos', 'mqtt_conflags', 'mqtt_hdrflags', 'mqtt_kalive',\
                 'mqtt_len', 'mqtt_msgid', 'mqtt_msgtype', 'mqtt_proto_len', 'mqtt_qos',  'mqtt_sub_qos',\
                  'mqtt_suback_qos', 'mqtt_ver', 'mqtt_willmsg_len', 'mqtt_willtopic_len']

corelated_cols_to_remove = []

class OutcomeCreater(Transformer): # this defines a transformer that creates the outcome column
    
    def __init__(self):
        super().__init__()

    
    def _transform(self, dataset):
        label_to_binary = udf(lambda name: 0.0 if name == 'slowite'\
                              else 2.0 if name == 'bruteforce'\
                              else 3.0 if name == 'flood'\
                              else 4.0 if name == 'malformed'\
                              else 5.0 if name == 'dos'\
                              else 5.0
                             )
        output_df = dataset.withColumn('outcome', label_to_binary(col('target'))).drop("target")  
        output_df = output_df.withColumn('outcome', col('outcome').cast(DoubleType()))
        #output_df = output_df.drop('difficulty')
        return output_df

class FeatureTypeCaster(Transformer): # this transformer will cast the columns as appropriate types  
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset
        for col_name in binary_cols + continuous_cols:
            output_df = output_df.withColumn(col_name,col(col_name).cast(DoubleType()))

        return output_df
class ColumnDropper(Transformer): # this transformer drops unnecessary columns
    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop=columns_to_drop
    def _transform(self, dataset):
        output_df = dataset
        for col_name in self.columns_to_drop:
            output_df = output_df.drop(col_name)
        return output_df

def get_preprocess_pipeline():
    # Stage where columns are casted as appropriate types
    stage_typecaster = FeatureTypeCaster()

    # Stage where nominal columns are transformed to index columns using StringIndexer
    nominal_id_cols = [x+"_index" for x in nominal_cols]
    nominal_onehot_cols = [x+"_encoded" for x in nominal_cols]
    stage_nominal_indexer = StringIndexer(inputCols = nominal_cols, outputCols = nominal_id_cols )

    # Stage where the index columns are further transformed using OneHotEncoder
    stage_nominal_onehot_encoder = OneHotEncoder(inputCols=nominal_id_cols, outputCols=nominal_onehot_cols)

    # Stage where all relevant features are assembled into a vector (and dropping a few)
    feature_cols = continuous_cols+binary_cols+nominal_onehot_cols
    
    for col_name in corelated_cols_to_remove:
        feature_cols.remove(col_name)
    stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features") # handleInvalid = 'keep'

    # Stage where we scale the columns
    stage_scaler = StandardScaler(inputCol= 'vectorized_features', outputCol= 'features')
    

    # Stage for creating the outcome column representing whether there is attack 
    stage_outcome = OutcomeCreater()

    # Removing all unnecessary columbs, only keeping the 'features' and 'outcome' columns
    stage_column_dropper = ColumnDropper(columns_to_drop = nominal_cols+nominal_id_cols+
        nominal_onehot_cols+ binary_cols + continuous_cols + ['vectorized_features'])
    # Connect the columns into a pipeline
    pipeline = Pipeline(stages=[stage_typecaster,stage_nominal_indexer,stage_nominal_onehot_encoder,
        stage_vector_assembler,stage_scaler,stage_outcome,stage_column_dropper])
    return pipeline 

In [41]:
# if you installed Spark on windows, 
# you may need findspark and need to initialize it prior to being able to use pyspark
# Also, you may need to initialize SparkContext yourself.
# Uncomment the following lines if you are using Windows!

import os
import sys

# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SystemsToolChains") \
    .getOrCreate()

mqtt_schema = StructType([
    StructField("tcp_flags", StringType(), True), 
    StructField("tcp_time_delta", DoubleType(), True),
    StructField("tcp_len", IntegerType(), True),
    StructField("mqtt_conack_flags", IntegerType(), True), 
    StructField("mqtt_conack_flags_reserved", BooleanType(), True),
    StructField("mqtt_conack_flags_sp", BooleanType(), True), 
    StructField("mqtt_conack_val", IntegerType(), True),
    StructField("mqtt_conflag_cleansess", BooleanType(), True),
    StructField("mqtt_conflag_passwd", BooleanType(), True),
    StructField("mqtt_conflag_qos", IntegerType(), True),
    StructField("mqtt_conflag_reserved", BooleanType(), True),
    StructField("mqtt_conflag_retain", BooleanType(), True),
    StructField("mqtt_conflag_uname", BooleanType(), True),
    StructField("mqtt_conflag_willflag", BooleanType(), True),
    StructField("mqtt_conflags", IntegerType(), True),
    StructField("mqtt_dupflag", BooleanType(), True),
    StructField("mqtt_hdrflags", IntegerType(), True),
    StructField("mqtt_kalive", IntegerType(), True),
    StructField("mqtt_len", IntegerType(), True),
    StructField("mqtt_msg", StringType(), True),
    StructField("mqtt_msgid", IntegerType(), True),
    StructField("mqtt_msgtype", IntegerType(), True),
    StructField("mqtt_proto_len", IntegerType(), True),
    StructField("mqtt_protoname", StringType(), True),
    StructField("mqtt_qos", IntegerType(), True),
    StructField("mqtt_retain", BooleanType(), True),
    StructField("mqtt_sub_qos", IntegerType(), True),
    StructField("mqtt_suback_qos", IntegerType(), True),
    StructField("mqtt_ver", IntegerType(), True),
    StructField("mqtt_willmsg", StringType(), True),
    StructField("mqtt_willmsg_len", IntegerType(), True),
    StructField("mqtt_willtopic", StringType(), True),
    StructField("mqtt_willtopic_len", IntegerType(), True),
    StructField("target", StringType(), True)
])

mqtt_raw = spark.read.csv('data/train70_reduced.csv',header=True, schema=mqtt_schema).toDF(*col_names)
mqtt_test_raw = spark.read.csv('data/test30_reduced.csv',header=True, inferSchema=True).toDF(*col_names)

mqtt_raw.printSchema()


root
 |-- tcp_flags: string (nullable = true)
 |-- tcp_time_delta: double (nullable = true)
 |-- tcp_len: integer (nullable = true)
 |-- mqtt_conack_flags: integer (nullable = true)
 |-- mqtt_conack_flags_reserved: boolean (nullable = true)
 |-- mqtt_conack_flags_sp: boolean (nullable = true)
 |-- mqtt_conack_val: integer (nullable = true)
 |-- mqtt_conflag_cleansess: boolean (nullable = true)
 |-- mqtt_conflag_passwd: boolean (nullable = true)
 |-- mqtt_conflag_qos: integer (nullable = true)
 |-- mqtt_conflag_reserved: boolean (nullable = true)
 |-- mqtt_conflag_retain: boolean (nullable = true)
 |-- mqtt_conflag_uname: boolean (nullable = true)
 |-- mqtt_conflag_willflag: boolean (nullable = true)
 |-- mqtt_conflags: integer (nullable = true)
 |-- mqtt_dupflag: boolean (nullable = true)
 |-- mqtt_hdrflags: integer (nullable = true)
 |-- mqtt_kalive: integer (nullable = true)
 |-- mqtt_len: integer (nullable = true)
 |-- mqtt_msg: string (nullable = true)
 |-- mqtt_msgid: integer (nul

In [39]:
# Column Rename
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType
# mqtt_conack_flags | int
# mqtt_conack_flags_reserved | boolean
# mqtt_conack_flags_sp | boolean
# mqtt_conack_val | int


renamed_columns_df = df.withColumnRenamed("rerror_rate","newly_renamed_rerror_rate")
# Casting one column type
casted_types_df = (renamed_columns_df.withColumn("new_column_srv_serror_rate", \
                    renamed_columns_df["srv_serror_rate"] \
                    .cast("integer")).drop("srv_serror_rate")
                    .distinct() # deleting duplicate rows

SyntaxError: invalid syntax (2512572328.py, line 8)

In [None]:
preprocess_pipeline = get_preprocess_pipeline()
preprocess_pipeline_model = preprocess_pipeline.fit(mqtt_raw)

mqtt_df = preprocess_pipeline_model.transform(mqtt_raw)
mqtt_df_test = preprocess_pipeline_model.transform(mqtt_test_raw)

In [16]:
mqtt_df.show()
mqtt_df.printSchema()

+--------------------+-------+
|            features|outcome|
+--------------------+-------+
|(45418,[0,1,2,3,4...|    5.0|
|(45418,[3,5,7,25,...|    5.0|
|(45418,[3,5,6,7,9...|    5.0|
|(45418,[3,5,6,7,9...|    5.0|
|(45418,[3,5,7,25,...|    5.0|
|(45418,[3,5,6,7,2...|    5.0|
|(45418,[26,41,898...|    5.0|
|(45418,[26,37,898...|    5.0|
|(45418,[3,5,7,25,...|    5.0|
|(45418,[3,5,7,25,...|    5.0|
|(45418,[3,5,6,7,2...|    5.0|
|(45418,[3,5,6,7,9...|    5.0|
|(45418,[30,34,898...|    4.0|
|(45418,[3,5,7,25,...|    5.0|
|(45418,[3,5,7,25,...|    5.0|
|(45418,[0,3,5,7,2...|    2.0|
|(45418,[3,5,6,7,2...|    5.0|
|(45418,[3,5,6,7,2...|    5.0|
|(45418,[26,877,89...|    2.0|
|(45418,[26,33,898...|    5.0|
+--------------------+-------+
only showing top 20 rows

root
 |-- features: vector (nullable = true)
 |-- outcome: double (nullable = true)



In [24]:
from pyspark.sql.functions import lit

# Read the first dataset and add a constant column with label "test"
df_train = spark.read.csv("data/train70_reduced.csv", header=True, inferSchema=True).toDF(*col_names)
df_train = df_train.withColumn("Original_Dataset", lit("train"))

# Read the second dataset and add a constant column with label "train"
df_test = spark.read.csv("data/test30_reduced.csv", header=True, inferSchema=True).toDF(*col_names)
df_test = df_test.withColumn("Original_Dataset", lit("test"))


merged_df = df_train.union(df_test)
merged_df.show(1, vertical=True)

-RECORD 0--------------------------------
 tcp_flags                  | 0x00000018 
 tcp_time_delta             | 0.998867   
 tcp_len                    | 10         
 mqtt_conack_flags          | 0          
 mqtt_conack_flags_reserved | 0.0        
 mqtt_conack_flags_sp       | 0.0        
 mqtt_conack_val            | 0.0        
 mqtt_conflag_cleansess     | 0.0        
 mqtt_conflag_passwd        | 0.0        
 mqtt_conflag_qos           | 0.0        
 mqtt_conflag_reserved      | 0.0        
 mqtt_conflag_retain        | 0.0        
 mqtt_conflag_uname         | 0.0        
 mqtt_conflag_willflag      | 0.0        
 mqtt_conflags              | 0          
 mqtt_dupflag               | 0.0        
 mqtt_hdrflags              | 0x00000030 
 mqtt_kalive                | 0.0        
 mqtt_len                   | 8.0        
 mqtt_msg                   | 32         
 mqtt_msgid                 | 0.0        
 mqtt_msgtype               | 3.0        
 mqtt_proto_len             | 0.0 

In [30]:
numeric_features = [feature[0] for feature in df_train if feature[1] in continuous_cols]
print(numeric_features)
import matplotlib.pyplot as plt

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [19]:
correlation_matrix = mqtt_df.toPandas().corr()
print(correlation_matrix)

         outcome
outcome      1.0


  correlation_matrix = mqtt_df.toPandas().corr()


In [18]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'features', labelCol = 'outcome')

lrModel = lr.fit(mqtt_df) # fit the logistic regression model to the training dataset

Py4JJavaError: An error occurred while calling o2816.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 30.0 failed 1 times, most recent failure: Lost task 2.0 in stage 30.0 (TID 83) (LAPTOP-R6MIHO76.wifi.local.cmu.edu executor driver): java.lang.RuntimeException: Vector values MUST NOT be NaN or Infinity, but got (45418,[3,5,6,7,9,25,35,9070,9711,43756,45408,45410,45411,45413],[NaN,NaN,NaN,NaN,NaN,2.012857332503923,3.533718283042206,39.57494549165696,10.271119772922047,481.29720547703164,10.268826489694721,481.29720547703164,481.29720547703164,2.0469576662564344])
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1234)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1235)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2358)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1172)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1166)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1259)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1226)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1212)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1212)
	at org.apache.spark.ml.stat.Summarizer$.getClassificationSummarizers(Summarizer.scala:233)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:517)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:497)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:287)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Vector values MUST NOT be NaN or Infinity, but got (45418,[3,5,6,7,9,25,35,9070,9711,43756,45408,45410,45411,45413],[NaN,NaN,NaN,NaN,NaN,2.012857332503923,3.533718283042206,39.57494549165696,10.271119772922047,481.29720547703164,10.268826489694721,481.29720547703164,481.29720547703164,2.0469576662564344])
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1234)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1235)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
