# MAPD-B distributed processing exam
## Project 4: Streaming processing of cosmic rays using Drift Tubes detectors


The goal of this project is to reproduce the real-time processing of real data collected by a
particle physics detector and to publish the results in a dashboard for live monitoring.


### Students:
+ **Capettini Hilario** (2013031)

+ **Carmona Gerardo** (2005005)

+ **Monaco Saverio** (2012264)

## Introduction

Extremely brief introduction, the elements and data flow.

## The cluster 

Cluster design, what was installed.

In [None]:
# Imports
import json
import time

import numpy as np
import findspark
import pyspark.sql.functions as F
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when, sum as ssum
from pyspark.sql.types import StructField, StructType, DoubleType, IntegerType

## Streaming with Kafka and Spark

This section implements a basic pipeline for the project, connecting Kafka with Spark.

## Get Kafka and Spark ready

We can now initialize the required environment variables with `findspark.init()`, passing the path to the Spark folder we downloaded previously.

In [None]:
findspark.init('/usr/local/spark')

In [None]:
#%%script bash --no-raise-error
#$SPARK_HOME/sbin/start-all.sh
#$SPARK_HOME/sbin/start-master.sh

# # start master 
# $SPARK_HOME/sbin/start-master.sh --host localhost \
#     --port 7077 --webui-port 8080
    
# # start worker
# $SPARK_HOME/sbin/start-worker.sh spark://localhost:7077 \
#     --cores 8 --memory 6g

## Create the Spark session

We can now create the Spark session. The following command asks the master (which also acts as resource manager) to create an application with the required resources and configuration. Apart from loading the Kafka connector package and setting the number of shuffle partitions to 8, we keep the default options.

In [None]:
spark = SparkSession.builder \
    .master("spark://master:7077")\
    .appName("Spark Streaming")\
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")\
    .config("spark.sql.shuffle.partitions",8)\
    .getOrCreate()


In [None]:
spark

## Kafka

In [None]:
KAFKA_HOME = '/usr/local/kafka'
KAFKA_BOOTSTRAP_SERVERS = 'slave01:9092'
#KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

In [None]:
# For some reason these commands cannot be launched from here through os.system, so we open
# terminals in the KAFKA_HOME folder and launch the ZooKeeper and Kafka server commands manually


# Start Zookeeper
# bin/zookeeper-server-start.sh config/zookeeper.properties 
#os.system('{0}/bin/zookeeper-server-start.sh {0}/config/zookeeper.properties'.format(KAFKA_HOME)) 
    
# Start one Kafka Broker
#bin/kafka-server-start.sh config/server.properties
#os.system('{0}/bin/kafka-server-start.sh {0}/config/server.properties'.format(KAFKA_HOME)) 

### Create the topics for Kafka

In [None]:
kafka_admin = KafkaAdminClient(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    )

#Here we will inject the data
new_topic_a = NewTopic(name='Experiment_measurements', 
                       num_partitions=16, 
                       replication_factor=1)

#Here we inject the number of processed hits, post cleaning
new_topic_b = NewTopic(name='results', 
                       num_partitions=1, 
                       replication_factor=1)

kafka_admin.create_topics(new_topics=[new_topic_a,new_topic_b])


In [None]:
kafka_admin.list_topics()

## Kafka - Spark INTEGRATION

### Read the data from the Kafka topic (define the consumer)

In [None]:
inputDF = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)\
    .option('subscribe', 'Experiment_measurements')\
    .load()


In [None]:
## The schema of the json data format used to create the messages
schema = StructType(
        [
                StructField("HEAD",        IntegerType()),
                StructField("FPGA",        IntegerType()),
                StructField("TDC_CHANNEL", IntegerType()),
                StructField("ORBIT_CNT",   DoubleType()),
                StructField("BX_COUNTER",  IntegerType()),
                StructField("TDC_MEAS",    DoubleType())
        ]  
)

## A new DataFrame can be created from the previous one with the pyspark.sql functions:
## the binary Kafka "value" is cast to a string and parsed as JSON according to the schema
jsonDF = inputDF.select(from_json(col("value").cast("string"), schema).alias('value'))

In [None]:
flatDF = jsonDF.selectExpr("value.HEAD", 
                           "value.FPGA", 
                           "value.TDC_CHANNEL",
                           "value.ORBIT_CNT",
                           "value.BX_COUNTER",
                           "value.TDC_MEAS")

In [None]:
flatDF.printSchema()
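
To make the message format concrete, here is a minimal pure-Python sketch of what the parsing step does to a single Kafka message. The field names and types are taken from the schema above; the helper `parse_hit` and the sample values are hypothetical and only serve as an illustration.

```python
import json

# Field names and target types copied from the Spark schema above.
FIELDS = {
    "HEAD": int,
    "FPGA": int,
    "TDC_CHANNEL": int,
    "ORBIT_CNT": float,
    "BX_COUNTER": int,
    "TDC_MEAS": float,
}

def parse_hit(raw: bytes) -> dict:
    """Decode one message value and coerce every field to its schema type.

    Hypothetical helper: fields that are absent become None, similar to the
    nulls that from_json produces for missing fields.
    """
    record = json.loads(raw.decode("utf-8"))
    return {
        name: (cast(record[name]) if record.get(name) is not None else None)
        for name, cast in FIELDS.items()
    }

# Invented sample message, for illustration only.
sample = (b'{"HEAD": 2, "FPGA": 0, "TDC_CHANNEL": 45, '
          b'"ORBIT_CNT": 600123, "BX_COUNTER": 1200, "TDC_MEAS": 15}')
hit = parse_hit(sample)
print(hit)
```

Each parsed record corresponds to one row of `flatDF`, with one column per field.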

## Spark processing

In [None]:
## FILTERING OF THE DATA
## we only keep the events with "HEAD" = 2 and "TDC_CHANNEL" <= 128

cleanDF = flatDF.where((col('HEAD')==2) & (col('TDC_CHANNEL') <= 128))

In [None]:
## Collection of functions for the main computation

def chamber_assignment(df):
    '''Assign chamber number and leave the scintillator carriers with chamber == null'''
    
    return(df.withColumn('CHAMBER',when(col("FPGA") == 0, 
                                                when(col("TDC_CHANNEL")<=63,1).\
                                                otherwise(when(col("TDC_CHANNEL")<128,2))).\
                                           otherwise(when(col("TDC_CHANNEL")<=63,3).\
                                                     otherwise(when(col("TDC_CHANNEL")<128,4))
                                           )).\
                                           select([ col('TDC_CHANNEL'), col('ORBIT_CNT'),
                                           col('BX_COUNTER'),col('TDC_MEAS'),
                                           col('CHAMBER')])
          )

def scintillator_data(df):
    '''Define a dataframe containing the relevant information for 
    the scintillator analysis''' 
    
    #First we filter the events encoding the passage time,
    #then we add the PASSAGETIME for each event.
    #Finally, if we have two scintillator hits within the same orbit we keep
    #the one with the smaller time
    return(df.filter((col("CHAMBER").isNull())).\
                          withColumn("PASSAGETIME", 25 * (col("ORBIT_CNT") * 3564 + col("BX_COUNTER") + col("TDC_MEAS")/30)).\
                          drop("TDC_CHANNEL").drop("BX_COUNTER").\
                          drop("TDC_MEAS").drop("CHAMBER").\
                          groupBy("ORBIT_CNT").min("PASSAGETIME").\
                          withColumnRenamed("ORBIT_CNT","ORBIT_CNT_sci").\
                          withColumnRenamed("min(PASSAGETIME)","PASSAGETIME")
          )


def histogram_1(df,min_v_1,max_v_1,inc_1):
    '''Return the bin edges and the per-chamber counts for the first requested histogram'''
    hist_1_bins = np.arange(min_v_1,max_v_1,inc_1)
    # The upper bound is exclusive so that every BIN index fits inside hist_1_bins
    hist_1 = df\
        .filter((min_v_1<=F.col('TDC_CHANNEL')) & (F.col('TDC_CHANNEL')<max_v_1))\
        .withColumn('BIN', F.floor((F.col('TDC_CHANNEL')-min_v_1)/inc_1))\
        .groupBy('CHAMBER','BIN')\
        .count().select('CHAMBER','BIN', col('count').alias('COUNT'))
    return (hist_1_bins, hist_1)
    

def histogram_2(df,min_v_2,max_v_2,inc_2):
    '''Return the bin edges and the per-chamber counts for the second requested histogram'''
    hist_2_bins = np.arange(min_v_2,max_v_2,inc_2)
    # Count the distinct active channels per orbit, then sum them inside each ORBIT_CNT bin
    hist_2 = df\
        .groupBy('CHAMBER','ORBIT_CNT')\
        .agg(F.countDistinct('TDC_CHANNEL').alias('ACTIVE_CHANNELS'))\
        .filter((min_v_2<=F.col('ORBIT_CNT'))&(F.col('ORBIT_CNT')<max_v_2))\
        .withColumn('BIN',F.floor((F.col('ORBIT_CNT')-min_v_2)/inc_2))\
        .groupBy('CHAMBER','BIN')\
        .agg(F.sum('ACTIVE_CHANNELS').alias('COUNT'))#.collect()
    return(hist_2_bins, hist_2)

def histogram_3(df,min_v_3,max_v_3,inc_3):
    '''Return the bin edges and the per-chamber counts for the third requested histogram'''
    hist_3_bins = np.arange(min_v_3,max_v_3,inc_3)
    hist_3 = df\
        .filter((min_v_3<=F.col('TDC_CHANNEL')) & (F.col('TDC_CHANNEL')<max_v_3))\
        .withColumn('BIN', F.floor((F.col('TDC_CHANNEL')-min_v_3)/inc_3))\
        .groupBy('CHAMBER','BIN')\
        .count().select('CHAMBER','BIN', col('count').alias('COUNT')) 
    return(hist_3_bins, hist_3)

def histogram_4(df,min_v_4,max_v_4,inc_4):
    '''Return the bin edges and the per-chamber counts for the fourth requested histogram'''
    hist_4_bins = np.arange(min_v_4,max_v_4,inc_4)
    hist_4 = df\
        .filter((min_v_4<=F.col('DRIFTIME')) & (F.col('DRIFTIME')<max_v_4))\
        .withColumn('BIN', F.floor((F.col('DRIFTIME')-min_v_4)/inc_4))\
        .groupBy('CHAMBER','BIN')\
        .count().select('CHAMBER','BIN', col('count').alias('COUNT')) 
    return(hist_4_bins, hist_4)


   
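def numpify(bins, pos_count):
    '''Convert a {bin index: count} map into a dense array with one entry per bin'''
    # bins holds the left bin edges, so there is exactly one count per entry
    counter = np.zeros(len(bins))
    # An explicit integer dtype keeps the indexing valid even for an empty map
    positions = np.array(list(pos_count.keys()), dtype=int)
    counts = np.array(list(pos_count.values()))
    counter[positions] = counts
    return counter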


def prepare_results(chamber_hits,hist_1,hist_2,hist_3,hist_4,hist_1_bins,hist_2_bins,hist_3_bins,hist_4_bins):
    '''COLLECTING RESULTS'''
    _chamber_hits = chamber_hits.collect()
    
    _hist_1 = hist_1.groupBy('CHAMBER').agg(
    F.map_from_entries(
        F.collect_list(
            F.struct("BIN", "COUNT"))).alias("COUNT")
        ).collect()

    _hist_2 = hist_2.groupBy('CHAMBER').agg(
    F.map_from_entries(
        F.collect_list(
            F.struct("BIN","COUNT"))).alias("COUNT")
        ).collect()

    _hist_3 = hist_3.groupBy('CHAMBER').agg(
    F.map_from_entries(
        F.collect_list(
            F.struct("BIN", "COUNT"))).alias("COUNT")
        ).collect()
    
    _hist_4 = hist_4.groupBy('CHAMBER').agg(
    F.map_from_entries(
        F.collect_list(
            F.struct("BIN", "COUNT"))).alias("COUNT")
        ).collect()
    

    # JSON FORMATING OF RESULTS
    _hist_1_dict = {row.CHAMBER: {
        'Bins': list(hist_1_bins), 'Counts': list(numpify(hist_1_bins,row.COUNT))
    } for row in _hist_1}

    _hist_2_dict = {row.CHAMBER: {
        'Bins': list(hist_2_bins), 'Counts': list(numpify(hist_2_bins,row.COUNT))
    } for row in _hist_2}
    
    _hist_3_dict = {row.CHAMBER: {
        'Bins': list(hist_3_bins), 'Counts': list(numpify(hist_3_bins,row.COUNT))
    } for row in _hist_3}
        
    _hist_4_dict = {row.CHAMBER: {
        'Bins': list(hist_4_bins), 'Counts': list(numpify(hist_4_bins,row.COUNT))
    } for row in _hist_4}
    
    return(_chamber_hits,_hist_1_dict,_hist_2_dict,_hist_3_dict,_hist_4_dict)
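
The chamber mapping and the binning rule used by the helper functions above can be checked without a cluster. The following is a minimal pure-Python sketch, not the Spark implementation: `chamber_of`, `bin_index` and `dense_counts` are hypothetical names, and the sketch assumes a half-open range (`min_v <= value < max_v`) so that every bin index fits in the list of left bin edges.

```python
import math
from collections import Counter

def chamber_of(fpga, tdc_channel):
    """Mirror of chamber_assignment: 1-4 for chamber hits, None otherwise."""
    if tdc_channel >= 128:
        return None                      # left as null in the Spark version
    if fpga == 0:
        return 1 if tdc_channel <= 63 else 2
    return 3 if tdc_channel <= 63 else 4

def bin_index(value, min_v, inc):
    """Same rule as F.floor((value - min_v) / inc) in the histogram functions."""
    return math.floor((value - min_v) / inc)

def dense_counts(values, min_v, max_v, inc):
    """Count values per bin and return one entry per left bin edge."""
    # Same length as np.arange(min_v, max_v, inc)
    n_bins = math.ceil((max_v - min_v) / inc)
    sparse = Counter(
        bin_index(v, min_v, inc) for v in values if min_v <= v < max_v
    )
    # Bins without entries are filled with zeros, as numpify does
    return [sparse.get(i, 0) for i in range(n_bins)]

# Invented channel numbers, for illustration only.
channels = [0, 4, 5, 12, 127]
counts = dense_counts(channels, min_v=0, max_v=170, inc=5)
print(len(counts), counts[:3])
```

The sparse `{bin: count}` map plays the role of the `map_from_entries` column collected in `prepare_results`, and the final list comprehension corresponds to `numpify`.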

In [None]:
def computations(df, epoch):
    '''This is the main function of the code, it requires a dataframe as input. The dataframe is analysed
       and the results are published in the kafka topic "results" '''
    
    #start=time.time()
    main_df = chamber_assignment(df)

    scintillator_df = scintillator_data(main_df)
    
    #Drop the rows with a null CHAMBER (the scintillator hits) from main_df
    hit_df = main_df.na.drop(subset=["CHAMBER"])#.show()
    
    ## TOTAL NUMBER OF PROCESSED HITS
    total_hits = hit_df.count()
    if not total_hits: return

    ## TOTAL NUMBER OF PROCESSED HITS PER CHAMBER
    chamber_hits = hit_df\
        .groupBy('CHAMBER').count()\
        .select(col('CHAMBER'),col('count').alias('COUNT'))#.collect()
    
    ## ACTIVE TDC_CHANNEL PER CHAMBER
    min_v_1 = 0
    max_v_1 = 170
    inc_1 = 5
    hist_1_bins, hist_1 = histogram_1(hit_df,min_v_1,max_v_1,inc_1)
    
    ## ACTIVE TDC_CHANNEL PER CHAMBER PER ORBIT_CNT
    min_v_2 = 6.e5 #main_df.agg(F.min(F.col('ORBIT_CNT')).alias('min')).collect()[-1].min
    max_v_2 = 1.e7 #main_df.agg(F.max(F.col('ORBIT_CNT')).alias('max')).collect()[-1].max
    inc_2 = 0.5e6
    hist_2_bins, hist_2 = histogram_2(hit_df,min_v_2,max_v_2,inc_2)
    
    
    #Keep only the hits with a scintillator signal within the same orbit
    chamber_sci = hit_df.join(scintillator_df, hit_df.ORBIT_CNT == scintillator_df.ORBIT_CNT_sci, "inner")

    ## ADD TIME CORRECTION BY CHAMBER
    chamber_sci = chamber_sci.withColumn('TIME_OFFSET',when(col("CHAMBER") == 1, 93.9).\
                                                       when(col("CHAMBER") == 2, 101.4).\
                                                       when(col("CHAMBER") == 3, 95.5).\
                                                       when(col("CHAMBER") == 4, 92.4))

    #Add the ABSOLUTETIME and DRIFTIME
    chamber_sci = chamber_sci.withColumn("ABSOLUTETIME",
                             25 * (col("ORBIT_CNT") * 3564 + col("BX_COUNTER") + col("TDC_MEAS")/30)).\
                              withColumn("DRIFTIME",col("ABSOLUTETIME")-col("PASSAGETIME") + col("TIME_OFFSET"))
   

    ## ACTIVE TDC_CHANNEL PER CHAMBER WITHIN SCINTILLATOR SIGNAL
    min_v_3 = 0
    max_v_3= 170
    inc_3 = 5
    hist_3_bins, hist_3 = histogram_3(chamber_sci,min_v_3,max_v_3,inc_3)
    

    ## HISTOGRAM OF DRIFTIME, PER CHAMBER
    min_v_4 = 0
    max_v_4= 1000
    inc_4 = 10
    hist_4_bins, hist_4 = histogram_4(chamber_sci,min_v_4,max_v_4,inc_4)
    
    #PREPARE THE RESULTS
    _chamber_hits,_hist_1_dict,_hist_2_dict,_hist_3_dict,_hist_4_dict = prepare_results(chamber_hits,hist_1,hist_2,hist_3,hist_4,hist_1_bins,hist_2_bins,hist_3_bins,hist_4_bins)
    
    
    # A chamber missing from a histogram gets all-zero counts, with one entry per bin
    # so that the length matches the arrays returned by numpify
    results = {f'Chamber_{row.CHAMBER}': {
        'Count': int(row.COUNT),
        'Hist_1': _hist_1_dict.get(row.CHAMBER, {'Bins': list(hist_1_bins), 'Counts': [0]*len(hist_1_bins)}),
        'Hist_2': _hist_2_dict.get(row.CHAMBER, {'Bins': list(hist_2_bins), 'Counts': [0]*len(hist_2_bins)}),
        'Hist_3': _hist_3_dict.get(row.CHAMBER, {'Bins': list(hist_3_bins), 'Counts': [0]*len(hist_3_bins)}),
        'Hist_4': _hist_4_dict.get(row.CHAMBER, {'Bins': list(hist_4_bins), 'Counts': [0]*len(hist_4_bins)})} for row in _chamber_hits}

    results.update({
        'Index': time.time(),
        'Total Count': int(total_hits)
    })

    producer.send(topic="results", value= str(results).encode('utf-8'))
    #producer.flush()
    #end = time.time()

    #print("Time =",end-start)
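
The time arithmetic inside `computations` can be followed on a single hit. The sketch below reuses the formula and the per-chamber offsets from the cell above; it assumes the usual interpretation of the constants (25 ns per bunch crossing, 3564 bunch crossings per orbit, 30 TDC counts per bunch crossing), and the counter values are invented for illustration.

```python
BX_NS = 25            # assumed: duration of one bunch crossing, in ns
BX_PER_ORBIT = 3564   # assumed: bunch crossings per orbit
TDC_PER_BX = 30       # assumed: TDC counts per bunch crossing

# Per-chamber time corrections, copied from the cell above.
TIME_OFFSET = {1: 93.9, 2: 101.4, 3: 95.5, 4: 92.4}

def absolute_time(orbit_cnt, bx_counter, tdc_meas):
    """Same expression as ABSOLUTETIME and PASSAGETIME in the Spark code."""
    return BX_NS * (orbit_cnt * BX_PER_ORBIT + bx_counter + tdc_meas / TDC_PER_BX)

def drift_time(hit_time, passage_time, chamber):
    """DRIFTIME = ABSOLUTETIME - PASSAGETIME + TIME_OFFSET."""
    return hit_time - passage_time + TIME_OFFSET[chamber]

# Invented example: a scintillator signal and a chamber-1 hit in the same orbit.
passage = absolute_time(orbit_cnt=100, bx_counter=10, tdc_meas=0)
hit_time = absolute_time(orbit_cnt=100, bx_counter=12, tdc_meas=15)
print(drift_time(hit_time, passage, chamber=1))
```

Here the hit arrives 2.5 bunch crossings (62.5 ns) after the scintillator signal, so the drift time is that difference plus the chamber-1 offset, which falls inside the 0 to 1000 range used for the fourth histogram.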

In [None]:
#Send the results to the kafka topic
#Initialize the producer
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
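
Note that `computations` sends `str(results)`, which is the Python representation of the dictionary rather than JSON. If the dashboard expects JSON, a serializer along the following lines could be used instead. This is only a sketch under that assumption: `to_plain`, `serialize_results` and `deserialize_results` are hypothetical helpers, and the `.item()` fallback is meant for NumPy scalars such as the ones stored in the bin lists.

```python
import json

def to_plain(obj):
    """Fallback for json.dumps: unwrap NumPy-style scalars through .item()."""
    if hasattr(obj, "item"):
        return obj.item()
    raise TypeError(f"Cannot serialize object of type {type(obj).__name__}")

def serialize_results(results: dict) -> bytes:
    """Encode the results dictionary as UTF-8 JSON bytes."""
    return json.dumps(results, default=to_plain).encode("utf-8")

def deserialize_results(payload: bytes) -> dict:
    """Inverse operation, as a consumer of the 'results' topic could apply it."""
    return json.loads(payload.decode("utf-8"))

# Invented payload with the same layout as the results dictionary.
example = {
    "Chamber_1": {"Count": 3, "Hist_1": {"Bins": [0, 5], "Counts": [2.0, 1.0]}},
    "Index": 1.5,
    "Total Count": 3,
}
payload = serialize_results(example)
print(deserialize_results(payload) == example)
```

With kafka-python, such a function can be passed to the producer through its `value_serializer` argument, in which case `producer.send` receives the dictionary directly.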


In [None]:
#Trigger the processing
cleanDF.writeStream\
    .foreachBatch(computations)\
    .trigger(processingTime='5 seconds')\
    .start()\
    .awaitTermination()

In [None]:
spark.stop()

If you also want to delete any data of your local Kafka environment including any events you have created along the way, run the command:

`` $ rm -rf /tmp/kafka-logs /tmp/zookeeper `` 

##  Results

### Vertical scalability

### Horizontal scalability

### Scaling with amount of data