# MAPD mod. B - Final Project
##  Streaming processing of cosmic rays using Drift Tubes detectors

The goal of this project is to reproduce the real-time processing of real data collected by a particle physics detector and to publish the results on a
dashboard for live monitoring.

### Students:
    - Aidin Attar - 2048654
    - Ema Baci - 2050726
    - Mariam Hergnyan - 2040478

## Computation

In [1]:
import time
import findspark
import pyspark
import json

import numpy  as np
import pandas as pd

from kafka                import KafkaProducer
from kafka.admin          import KafkaAdminClient, NewTopic
 
from pyspark.sql           import functions as F
from pyspark.sql           import SparkSession
from pyspark.streaming     import StreamingContext
from pyspark.sql.types     import StructField, StructType, DoubleType, IntegerType
from pyspark.sql.functions import from_json, col, when, count, struct, collect_list
from pyspark               import SparkConf, SparkContext
from pyspark.sql.types     import StringType 

### Spark setup

In [2]:
# Initialise findspark with the Spark installation located at /usr/local/spark.
# NOTE(review): the path is hard-coded for the cluster this notebook was run on;
# adjust it if Spark is installed elsewhere.
findspark.init('/usr/local/spark')

In [3]:
# Create (or reuse) the Spark session: cluster master, application name
# and the configuration entries needed by this notebook.
spark = (
    SparkSession.builder
    .master("spark://pd-master:7077")
    .appName("Project_CosmicRays_Dashboard_application")
    # Arrow-based conversions, without silent fallback
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
    # temporary streaming checkpoints are always removed
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    # Kafka connector for Structured Streaming
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
    .config("spark.ui.port", "4041")
    .getOrCreate()
)

# optional: align the shuffle partitions with the default parallelism
#spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)

spark

:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/aidin/.ivy2/cache
The jars for the packages stored in: /home/aidin/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c2fe7915-89f5-432b-8a1e-63ce6008a512;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 1042ms :: artifacts dl 39ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central 

22/09/19 19:13:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Kafka Setup

In [4]:
# address of the Kafka broker used by both clients below
KAFKA_BOOTSTRAP_SERVERS = 'pd-slave3:9092'

# admin client (topic inspection) and producer (publishing the results)
kafka_admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

In [5]:
# List the topics available on the broker; this notebook reads from
# 'topic_stream' and publishes its results to 'topic_results'.
kafka_admin.list_topics()

['topic_results', 'topic_stream', '__consumer_offsets']

In [6]:
# Streaming DataFrame fed by the Kafka topic 'topic_stream':
# each row is one Kafka record (key, value, topic, partition, ...).
inputDF = (
    spark.readStream
    .format("kafka")
    .option("subscribe", "topic_stream")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .load()
)

inputDF.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# Schema of the JSON messages: every field is read as a string.
schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in (
            "HEAD",
            "FPGA",
            "TDC_CHANNEL",
            "ORBIT_CNT",
            "BX_COUNTER",
            "TDC_MEAS",
        )
    ]
)

In [8]:
# Decode the binary Kafka payload to text and parse it as JSON with the
# schema above; the parsed record is exposed as a single struct column 'value'.
raw_payload = col("value").cast("string")
jsonDF = inputDF.select(from_json(raw_payload, schema).alias("value"))

jsonDF.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- HEAD: string (nullable = true)
 |    |-- FPGA: string (nullable = true)
 |    |-- TDC_CHANNEL: string (nullable = true)
 |    |-- ORBIT_CNT: string (nullable = true)
 |    |-- BX_COUNTER: string (nullable = true)
 |    |-- TDC_MEAS: string (nullable = true)



In [9]:
# Promote the fields of the 'value' struct to top-level columns.
flatDF = jsonDF.select(
    *[
        f"value.{field_name}"
        for field_name in (
            "HEAD",
            "FPGA",
            "TDC_CHANNEL",
            "ORBIT_CNT",
            "BX_COUNTER",
            "TDC_MEAS",
        )
    ]
)

flatDF.printSchema()

root
 |-- HEAD: string (nullable = true)
 |-- FPGA: string (nullable = true)
 |-- TDC_CHANNEL: string (nullable = true)
 |-- ORBIT_CNT: string (nullable = true)
 |-- BX_COUNTER: string (nullable = true)
 |-- TDC_MEAS: string (nullable = true)



In [10]:
# Building blocks of the chamber assignment.
fpga, channel = col('FPGA'), col('TDC_CHANNEL')
first_half  = (channel >=  0) & (channel <  64)
second_half = (channel >= 64) & (channel <= 127)

# Chamber index from (FPGA, TDC_CHANNEL); value 4 marks FPGA 1 / channel 128
# (treated as the scintillator downstream). Rows matching no rule get NULL
# (a `when` chain without `otherwise` evaluates to NULL).
chamber_id = (
    when((fpga == 0) & first_half,       0)
    .when((fpga == 0) & second_half,     1)
    .when((fpga == 1) & first_half,      2)
    .when((fpga == 1) & second_half,     3)
    .when((fpga == 1) & (channel == 128), 4)
)

# data-cleansing (HEAD == 2 only), chamber assignment, and removal of the
# rows that could not be assigned to any chamber
df = (
    flatDF
    .where(col("HEAD") == 2)
    .withColumn('CHAMBER', chamber_id)
    .filter(col('CHAMBER').isNotNull())
)

df.printSchema()

root
 |-- HEAD: string (nullable = true)
 |-- FPGA: string (nullable = true)
 |-- TDC_CHANNEL: string (nullable = true)
 |-- ORBIT_CNT: string (nullable = true)
 |-- BX_COUNTER: string (nullable = true)
 |-- TDC_MEAS: string (nullable = true)
 |-- CHAMBER: integer (nullable = true)



In [11]:
# Scintillator time offset of each chamber, keyed by chamber index:
# a common 95.0 baseline plus a per-chamber correction.
# TODO compute ABSOLUTETIME and DRIFTIME
time_offset_by_chamber = {
    chamber: 95.0 + correction
    for chamber, correction in enumerate((-1.1, 6.4, 0.5, -2.6))
}

In [12]:
# Structure of the result message published for every batch:
# global fields first, then one entry per chamber (CH0..CH3), each holding
# the chamber hit count and three empty histograms.
msg_json = {
    'time'     : .0,
    'epoch_id' :  0,
    'hits'     :  0,
    **{
        f'CH{chamber}': {
            'total_hits': 0,
            **{
                histo_name: {'bin_edges': [], 'bin_counts': []}
                for histo_name in ('histo_CH', 'histo_ORB', 'histo_SC')
            },
        }
        for chamber in range(4)
    },
}

In [13]:
def _histogram_struct(edges_col, name):
    '''
    Aggregate expression packing the bins (`edges_col`) and their
    'count' values into a struct {bin_edges, counts} aliased as `name`.
    '''
    return struct(collect_list(edges_col).alias('bin_edges'),
                  collect_list('count').alias('counts')).alias(name)


def batch_proc(df, epoch_id):
    '''
    Process one micro-batch and publish the results to 'topic_results'.

    The following quantities are computed (chamber 4, i.e. the
    scintillator, is excluded from all the hit counts):
        1. total number of processed hits,
           post-cleansing (1 value per batch)
        2. total number of processed hits,
           post-cleansing, per chamber (4 values per batch)
        3. histogram of the counts of active TDC_CHANNEL,
           per chamber (4 arrays per batch)
        4. histogram of the total number of active
           TDC_CHANNEL in each ORBIT_CNT, per chamber
           (4 arrays per batch)
        5. histogram of the counts of active TDC_CHANNEL,
           per chamber, ONLY for those orbits with at least
           one scintillator signal in it (4 arrays per batch)
        6. histogram of the DRIFTIME, per chamber
           (4 arrays per batch) TODO

    The message is built from a fresh copy of the `msg_json` template,
    so a chamber without hits in this batch is reported with zero hits
    and empty histograms instead of values left over from an earlier batch.

    Inputs:
        - df: spark dataframe with the data of the batch
              (must contain the CHAMBER column)
        - epoch_id: batch index

    Returns:
        None. The JSON-encoded message is sent through `producer`.
    '''
    import copy  # local import: only needed to clone the message template

    start = time.time()

    # start from a pristine template: nothing leaks between batches
    msg = copy.deepcopy(msg_json)
    msg['epoch_id'] = epoch_id

    # the batch feeds several aggregations: cache it so that the source
    # is not re-read for each of them
    df.persist()
    try:
        # hits recorded by the drift-tube chambers (scintillator excluded)
        chamber_hits = df.filter(col('CHAMBER') != 4)

        # orbits with at least one scintillator signal
        scint_orbits = df.filter(col('CHAMBER') == 4) \
                         .select('ORBIT_CNT')

        # number of processed hits per chamber
        df_counts = chamber_hits.groupBy('CHAMBER') \
                                .count()            \
                                .withColumnRenamed('count', 'ch_hits')

        # histogram of the counts of active TDC_CHANNEL, per chamber
        df_counts_ch = chamber_hits.groupBy('TDC_CHANNEL', 'CHAMBER') \
                                   .count()                           \
                                   .groupBy('CHAMBER')                \
                                   .agg(_histogram_struct('TDC_CHANNEL', 'histo_CH'))

        # histogram of the number of active TDC_CHANNEL in each ORBIT_CNT,
        # per chamber; the distinct count is aliased explicitly so that the
        # code does not depend on the auto-generated column name
        df_counts_orb = chamber_hits.groupBy('ORBIT_CNT', 'CHAMBER')                          \
                                    .agg(F.countDistinct('TDC_CHANNEL').alias('n_channels')) \
                                    .groupBy('n_channels', 'CHAMBER')                         \
                                    .count()                                                  \
                                    .groupBy('CHAMBER')                                       \
                                    .agg(_histogram_struct('n_channels', 'histo_ORB'))

        # same as histo_CH, restricted to the orbits with a scintillator
        # signal; a semi join keeps each hit exactly once even when an orbit
        # contains several scintillator signals
        df_counts_sc = chamber_hits.join(scint_orbits, on='ORBIT_CNT', how='left_semi') \
                                   .groupBy('TDC_CHANNEL', 'CHAMBER')                   \
                                   .count()                                             \
                                   .groupBy('CHAMBER')                                  \
                                   .agg(_histogram_struct('TDC_CHANNEL', 'histo_SC'))

        # left joins: a chamber with hits is always reported, even when
        # one of its histograms is empty for this batch
        rows = df_counts.join(df_counts_ch,  on='CHAMBER', how='left') \
                        .join(df_counts_orb, on='CHAMBER', how='left') \
                        .join(df_counts_sc,  on='CHAMBER', how='left') \
                        .collect()
    finally:
        df.unpersist()

    # the chambers partition the selected hits, so the grand total is
    # the sum of the per-chamber counts (no extra Spark job needed)
    msg['hits'] = sum(row['ch_hits'] for row in rows)

    for row in rows:
        # key the entry on the chamber id, not on the row position
        chamber_msg = msg[f"CH{row['CHAMBER']}"]
        chamber_msg['total_hits'] = row['ch_hits']
        for histo_name in ('histo_CH', 'histo_ORB', 'histo_SC'):
            histo = row[histo_name]
            if histo is None:
                # no data for this histogram: keep the empty template
                continue
            chamber_msg[histo_name]['bin_edges']  = list(histo['bin_edges'])
            chamber_msg[histo_name]['bin_counts'] = list(histo['counts'])

    # execution time of the batch processing
    msg['time'] = time.time() - start

    producer.send('topic_results', json.dumps(msg).encode('utf-8'))
    producer.flush()

In [14]:
# Sanity check: `df` derives from `spark.readStream`, so it is expected
# to be a streaming DataFrame.
df.isStreaming

True

In [15]:
# call the operations for each batch of data
df.writeStream\
    .foreachBatch(batch_proc)\
    .start()\
    .awaitTermination()

22/09/19 19:13:37 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ab5f6868-16a2-4696-bfbc-1aa36eaa0ed6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/09/19 19:13:37 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


ERROR:root:KeyboardInterrupt while sending command.200][Stage 9245:(161 + 4) / 200]]
Traceback (most recent call last):
  File "/home/aidin/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/aidin/.local/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
Exception ignored in: <function JavaObject.__init__.<locals>.<lambda> at 0x7f87f84daca0>
Traceback (most recent call last):
  File "/home/aidin/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1357, in <lambda>
    lambda wr, cc=self._gateway_client, id=self._target_id:
KeyboardInterrupt: 
Exception ignored in: <function JavaObject.__init__.<locals>.<lambda> at 0x7f87fa730790>
Traceback (most recent call last):
  File "/home/a