## Management and Analysis of Physics Dataset - mod.B

## Final project: Streaming processing of cosmic rays using Drift Tubes detectors

The goal of this project is to reproduce a real-time processing of real data collected in a particle physics detector and publish the results in a dashboard for live monitoring.

### Students:
* Conforto Filippo (2021856)
* Domenichetti Lorenzo (2011653)
* Faorlin Tommaso (2021857)

# Computation notebook

In [1]:
#import libraries cell

import json
import time
import findspark
import numpy as np
from numpy import arange
from numpy import linspace 
import pandas as pd
from kafka import KafkaProducer
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.streaming import StreamingContext
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType
from pyspark.sql.functions import from_json, col, max, min

## Spark context and configuration

In [2]:
#initialisation of spark from the packages folder
findspark.init('/usr/local/spark')

In [3]:
#start session - specify port, application name, and configuration settings.

spark = SparkSession.builder\
    .master("spark://master:7077")\
    .appName("MAPD Final Project session")\
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")\
    .getOrCreate()

#default parallelism setting to shuffle different partitions between workers (for join operation).
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism) #15 partitions

In [4]:
#check whether the session is running.
spark

## Kafka streaming setup

In [5]:
#define the kafka server from IP and Port
KAFKA_BOOTSTRAP_SERVERS = "slave04:9092" 

#define the input dataframe and its source. Define subscription to topic_stream - one of the two topics in kafka
inputDF = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)\
        .option('subscribe', 'topic_stream')\
        .load()

In [6]:
#define the schema of the rows that will be read. double are used to overcome overflow issues
schema = StructType(
        [StructField("HEAD",        IntegerType()),
         StructField("FPGA",         IntegerType()),
         StructField("TDC_CHANNEL",  IntegerType()),
         StructField("ORBIT_CNT",    DoubleType()),
         StructField("BX_COUNTER",   DoubleType()), 
         StructField("TDC_MEAS",    DoubleType() )]
)

In [7]:
#convert input_Df to json by casting columns into the predefined schema.
jsonDF = inputDF.select(from_json(col("value").alias('value').cast("string"), schema).alias('value'))

In [8]:
#flattening the dataframe
flatDF = jsonDF.selectExpr("value.HEAD", 
                           "value.FPGA", 
                           "value.TDC_CHANNEL",
                           "value.ORBIT_CNT",
                           "value.BX_COUNTER",
                           "value.TDC_MEAS")

##  First cleanup of streaming dataframe

In [9]:
#clean dataframe, removing ancillary hits
df = flatDF.where(col("HEAD")==2)

## Output streaming service setup

In [10]:
#scintillator time offset by Chamber
time_offset_by_chamber = {
0: 95.0 - 1.1, # Ch 0
1: 95.0 + 6.4, # Ch 1
2: 95.0 + 0.5, # Ch 2
3: 95.0 - 2.6, # Ch 3
}

#bins definition for histograms - they will be shared among all iterations.
binning = list(linspace(0, 4e8, 100))

binning_drift = list(linspace(0, 800, 40))

In [11]:
def batch_proc(batch_df, epoch_id):
    
    #repartition the df DataFrame to 105 parts - and persist in cache to speedup calculations
    batch_df.coalesce(15)
    batch_df.persist()
    
    #Dividing the dataframe between chambers
    batch_df_ch0 = batch_df.filter('(FPGA==0) AND (TDC_CHANNEL >= 0) AND (TDC_CHANNEL < 64)')
    batch_df_ch1 = batch_df.filter('(FPGA==0) AND (TDC_CHANNEL >= 64) AND (TDC_CHANNEL < 128)')
    batch_df_ch2 = batch_df.filter('(FPGA==1) AND (TDC_CHANNEL >= 0) AND (TDC_CHANNEL < 64)')
    batch_df_ch3 = batch_df.filter('(FPGA==1) AND (TDC_CHANNEL >= 64) AND (TDC_CHANNEL < 128)')
    
    #coalesce + persist for each filtered batch
    batch_df_ch0.coalesce(15)
    batch_df_ch1.coalesce(15)
    batch_df_ch2.coalesce(15)
    batch_df_ch3.coalesce(15)
    batch_df_ch0.persist()
    batch_df_ch1.persist()
    batch_df_ch2.persist()
    batch_df_ch3.persist()    
       
    #list of dfs_ handy for loops
    batch_dfs = [batch_df_ch0, batch_df_ch1, batch_df_ch2, batch_df_ch3]
    
    # Counting hits for each chamber
    hits_ch0 = batch_df_ch0.count()
    hits_ch1 = batch_df_ch1.count()
    hits_ch2 = batch_df_ch2.count()
    hits_ch3 = batch_df_ch3.count()
    
    #total number of hits as the sum of the previous 4 operations
    hits = hits_ch0 + hits_ch1 + hits_ch2 + hits_ch3
    
    if hits!=0: 

        #Dataframe containing only informations about scintillator events
        batch_df_scint = (batch_df.filter('(FPGA==1) AND (TDC_CHANNEL == 128)')
                          .select(['ORBIT_CNT', 'BX_COUNTER',"TDC_MEAS"])
                          .groupBy('ORBIT_CNT')
                          .min()
                          .withColumnRenamed("min(BX_COUNTER)", 'BX_COUNTER_SCINT')
                          .withColumnRenamed("min(TDC_MEAS)", 'TDC_MEAS_SCINT')
                          .drop('min(ORBIT_CNT)')
                         )
        
        #optional persist also for the batch_df_scint DataFrame - no need to cache performance-wise
        #batch_df_scint.persist()
        
        #Total active channels histogram
        hist1 = {}
        
        #Channels per orbit histogram 
        hist2 = {}

        #Active channels in orbits in which the scintillator is active histogram
        hist3 = {}

        #Drifttime histogram
        hist4 = {}
        
        #loop over the four chambers
        for chamber in [0,1,2,3]:
            #create an empty dictionary for each type of histogram corresponding to the selected chamber
            hist1[chamber] = {}
            hist2[chamber] = {}
            hist3[chamber] = {}
            hist4[chamber] = {}

            #TDC_channel simple histogram - adaptive binning depending on the corresponding channels
            bins, counts = (
                batch_dfs[chamber].select('TDC_CHANNEL')
                .rdd.map(lambda x: x.TDC_CHANNEL)
                .histogram(list(arange((chamber % 2)*64,(chamber % 2 +1)*64,1)))
            )
            
            #Count number of active channels per orbit
            bins2, counts2 = (
                batch_dfs[chamber].groupBy("ORBIT_CNT","TDC_CHANNEL").count()
                .select('ORBIT_CNT')
                .rdd.map(lambda x: x.ORBIT_CNT)
                .histogram(binning)
            )
            
            #Filtering only useful hits (avoid scintillators) and use inner join to consider coincident events
            batch_dfs[chamber] = batch_dfs[chamber].join(batch_df_scint, ["ORBIT_CNT"], "inner")
            
            #Creating driftime and select only positive values
            #Values smaller than zero should be artifacts - only a few percentage
            batch_dfs[chamber] = batch_dfs[chamber].withColumn('DRIFTIME', 25*((col('BX_COUNTER')-col('BX_COUNTER_SCINT'))+(col('TDC_MEAS')-col('TDC_MEAS_SCINT'))/30)+time_offset_by_chamber[chamber])\
                                .where(col('DRIFTIME')>0) 
            
            #optional --- persist is not needed also in this case
            #batch_dfs[chamber].persist()
            
            #TDC_CHANNEL histogram after scintillator selection
            bins3, counts3 = (
                batch_dfs[chamber].select('TDC_CHANNEL')
                .rdd.map(lambda x: x.TDC_CHANNEL)
                .histogram(list(arange((chamber % 2)*64,(chamber % 2 +1)*64,1)))
            )

            #histogram for drifttime - a "box" is expected
            bins4, counts4 = (
                batch_dfs[chamber].select('DRIFTIME')
                .rdd.map(lambda x: x.DRIFTIME)
                .histogram(binning_drift)
            )
            
            
            #convert to python integers both bins and counts 
            hist1[chamber]['bins'] = list(map(float,bins)) 
            hist1[chamber]['counts'] = list(map(int,counts))

            hist2[chamber]['bins'] = list(map(float,bins2))
            hist2[chamber]['counts'] = list(map(int,counts2))

            hist3[chamber]['bins'] = list(map(float,bins3))
            hist3[chamber]['counts'] = list(map(int,counts3))

            hist4[chamber]['bins'] = list(map(float,bins4))
            hist4[chamber]['counts'] = list(map(int,counts4))

        #producing the results dictionary
        result = {
            "hits" : hits,
            "hits_per_chamber": [hits_ch0, hits_ch1, hits_ch2, hits_ch3],
            "hist_1": hist1,
            "hist_2": hist2,
            "hist_3": hist3,
            "hist_4": hist4
        }

        #sending the json to the producer
        producer.send('topic_results', json.dumps(result).encode('utf-8'))

        #unpersist DataFrames and free resources
        batch_df.unpersist()
        batch_df_ch0.unpersist()
        batch_df_ch1.unpersist()
        batch_df_ch2.unpersist()
        batch_df_ch3.unpersist() 
        
    else: 
        pass

## Streaming process

In [12]:
#producer definition from IP address given before
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

In [None]:
#process each batch as a WriteStream - 5 seconds batch - rate 1kHz 
df.writeStream\
    .foreachBatch(batch_proc)\
    .trigger(processingTime='5 second')\
    .start()\
    .awaitTermination()