## Setup of Kafka

__In the terminal__:

Connect via SSH to the _kafka_ host (IP: 10.67.22.185),

then start the ZooKeeper and Kafka servers and close the connection:

Required packages and the __broker__ address:

In [1]:
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

# Address (host:port) of the Kafka broker; shared by the producer, the admin
# client and the Spark Kafka source further down.
KAFKA_BOOTSTRAP_SERVERS = 'kafka:9092'

# Producer used by `analysis` to publish the per-batch results.
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

In [2]:
# Admin client used below to delete, create and list topics on the broker.
kafka_admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

In [3]:
# Reset the pipeline topics so that each run starts from empty logs.
# Only topics that currently exist are deleted: asking the broker to delete a
# missing topic makes kafka-python raise, which would break this cell on a
# fresh cluster or when it is run twice in a row.
existing_topics = set(kafka_admin.list_topics())
stale_topics = [name for name in ('stream', 'results') if name in existing_topics]

# Last expression of the cell: shows the broker response when something was deleted.
kafka_admin.delete_topics(stale_topics) if stale_topics else None

DeleteTopicsResponse_v3(throttle_time_ms=0, topic_error_codes=[(topic='stream', error_code=0), (topic='results', error_code=0)])

In [4]:
# Check which topics are left on the broker after the deletion.
kafka_admin.list_topics()

['__consumer_offsets']

In [5]:
# Both topics share the same layout: a single partition, no replication.
# 'stream' carries the raw input messages, 'results' the per-batch summaries.
stream_topic, results_topic = (
    NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
    for topic_name in ('stream', 'results')
)

kafka_admin.create_topics(new_topics=[stream_topic, results_topic])

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='stream', error_code=0, error_message=None), (topic='results', error_code=0, error_message=None)])

In [6]:
# Confirm that the 'stream' and 'results' topics now exist.
kafka_admin.list_topics()

['stream', 'results', '__consumer_offsets']

## Spark

In [7]:
import findspark
# Point findspark at the local Spark installation before pyspark is imported below.
findspark.init('/usr/local/spark/')

In [8]:
# Show the Spark-related environment variables visible to the notebook kernel.
!env | grep -i spark

SPARK_HOME=/usr/local/spark/
PYSPARK_PYTHON=/usr/bin/python3.6
PATH=/usr/bin:/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/usr/local/spark/bin:/root/bin


In [9]:
# Start the standalone Spark master and all workers with Spark's start-all.sh script.
! $SPARK_HOME/sbin/start-all.sh

starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark//logs/spark-root-org.apache.spark.deploy.master.Master-1-mapd-b-gr12-1.novalocal.out
master: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-mapd-b-gr12-1.novalocal.out
slave01: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-mapd-b-gr12-2.novalocal.out
slave03: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-mapd-b-gr12-5.novalocal.out
slave02: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-mapd-b-gr12-3.novalocal.out


In [10]:
from pyspark.sql import SparkSession
  
# Session on the standalone cluster started above.
# Settings: Arrow-based pandas conversion with fallback disabled, forced cleanup
# of temporary streaming checkpoints, and the Kafka source package for Spark SQL.
spark = (
    SparkSession.builder
    .master("spark://master:7077")
    .appName("Project_MAPDB_application")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
    .getOrCreate()
)

In [11]:
# Keep a handle on the underlying SparkContext (stopped explicitly at the end)
# and display it as the cell output.
sc = spark.sparkContext
sc

In [12]:
# Streaming DataFrame that subscribes to the 'stream' topic on the Kafka broker.
inputDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option('subscribe', 'stream')
    .load()
)

In [13]:
# Inspect the columns delivered by the Kafka source (key/value arrive as binary).
inputDF.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [14]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType, LongType

# Schema of the JSON payload carried by each message, listed as
# (field name, Spark type) pairs.
schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in (
        ("HEAD", IntegerType),
        ("FPGA", StringType),
        ("TDC_CHANNEL", IntegerType),
        ("ORBIT_CNT", StringType),
        ("BX_COUNTER", StringType),
        ("TDC_MEAS", StringType),
    )
])

# Decode the binary Kafka payload as text and parse it with the schema above;
# the result is a DataFrame with a single struct column named 'value'.
jsonDF = inputDF.select(
    from_json(col("value").alias('value').cast("string"), schema).alias('value')
)

In [15]:
# Inspect the parsed layout: one struct column 'value' holding the JSON fields.
jsonDF.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- HEAD: integer (nullable = true)
 |    |-- FPGA: string (nullable = true)
 |    |-- TDC_CHANNEL: integer (nullable = true)
 |    |-- ORBIT_CNT: string (nullable = true)
 |    |-- BX_COUNTER: string (nullable = true)
 |    |-- TDC_MEAS: string (nullable = true)



In [16]:
# Promote every field of the 'value' struct to a top-level column.
flatDF = jsonDF.selectExpr(*[
    "value." + field_name
    for field_name in (
        "HEAD",
        "FPGA",
        "TDC_CHANNEL",
        "ORBIT_CNT",
        "BX_COUNTER",
        "TDC_MEAS",
    )
])

In [17]:
# Inspect the flattened layout: one top-level column per JSON field.
flatDF.printSchema()

root
 |-- HEAD: integer (nullable = true)
 |-- FPGA: string (nullable = true)
 |-- TDC_CHANNEL: integer (nullable = true)
 |-- ORBIT_CNT: string (nullable = true)
 |-- BX_COUNTER: string (nullable = true)
 |-- TDC_MEAS: string (nullable = true)



In [18]:
import json
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import concat, col, lit, count, desc, asc , to_json
from itertools import chain
import pandas as pd

In [19]:
# (FPGA id, first TDC channel, last TDC channel) of chambers 0-3, in output order.
_CHAMBER_CUTS = (
    (0, 0, 63),
    (0, 64, 127),
    (1, 0, 63),
    (1, 64, 127),
)


def _histogram(sdf, key):
    """Group `sdf` by `key` and return ``[keys, counts]`` as two plain Python lists."""
    counts = sdf.groupBy(key).count().toPandas()
    return [counts[key].tolist(), counts['count'].tolist()]


def _active_channels_histogram(chamber):
    """Histogram of the number of distinct TDC channels seen per orbit in `chamber`."""
    # The aggregate is aliased explicitly so the code does not depend on the
    # column name Spark generates for countDistinct.
    per_orbit = chamber.groupBy('ORBIT_CNT').agg(
        F.countDistinct('TDC_CHANNEL').alias('n_channels')
    )
    return _histogram(per_orbit, 'n_channels')


def analysis(df, epoch_id):
    """Summarise one micro-batch and publish the summary to the 'results' topic.

    Intended as the callback passed to ``writeStream.foreachBatch``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Rows of the current micro-batch, with the columns HEAD, FPGA,
        TDC_CHANNEL and ORBIT_CNT used below.
    epoch_id : int
        Batch identifier supplied by Spark; not used.

    The JSON message sent through the module-level `producer` contains:

    * ``tot_import``          - number of rows in the batch;
    * ``hits``                - number of rows with HEAD == 2;
    * ``hitsPerChamber``      - hits in each of the four chambers;
    * ``histo_ch<i>``         - ``[channels, counts]`` of hits per TDC channel;
    * ``histo_orbit_ch<i>``   - ``[n_channels, counts]``: how many orbits have a
      given number of distinct active channels;
    * ``histo_scin_ch<i>``    - like ``histo_ch<i>``, restricted to orbits that
      also contain a hit on FPGA 1 / TDC channel 128 (presumably the
      scintillator signal, going by the key name - confirm).
    """
    # Many independent actions run on this batch; caching it avoids re-reading
    # the same Kafka offsets for every count()/toPandas() call.
    df.persist()
    try:
        # total events
        tot = df.count()

        # clean: keep only rows flagged HEAD == 2
        hits = df.where(col('HEAD') == 2)

        # point 1
        tot_hits = hits.count()

        # chambers: one filtered view per (FPGA, channel range) entry
        chambers = [
            hits.where(
                (col('FPGA') == fpga)
                & (col('TDC_CHANNEL') >= first_channel)
                & (col('TDC_CHANNEL') <= last_channel)
            )
            for fpga, first_channel, last_channel in _CHAMBER_CUTS
        ]

        # point Extra 1: orbits with a hit on FPGA 1 / TDC channel 128.
        # Only the distinct orbit ids are brought to the driver.
        trigger_rows = (
            hits
            .where((col('FPGA') == 1) & (col('TDC_CHANNEL') == 128))
            .select('ORBIT_CNT')
            .distinct()
            .collect()
        )
        trigger_orbits = [row['ORBIT_CNT'] for row in trigger_rows]

        outputJson = {
            'tot_import': tot,
            'hits': tot_hits,
            # point 2
            'hitsPerChamber': [chamber.count() for chamber in chambers],
        }

        # Separate loops keep the key order of the message stable:
        # all histo_ch*, then all histo_orbit_ch*, then all histo_scin_ch*.

        # point 3
        for index, chamber in enumerate(chambers):
            outputJson['histo_ch%d' % index] = _histogram(chamber, 'TDC_CHANNEL')

        # point 4
        for index, chamber in enumerate(chambers):
            outputJson['histo_orbit_ch%d' % index] = _active_channels_histogram(chamber)

        # point Extra 1
        for index, chamber in enumerate(chambers):
            in_trigger_orbits = chamber.where(col('ORBIT_CNT').isin(trigger_orbits))
            outputJson['histo_scin_ch%d' % index] = _histogram(in_trigger_orbits, 'TDC_CHANNEL')
    finally:
        # Release the cached batch even if one of the actions failed.
        df.unpersist()

    producer.send('results', json.dumps(outputJson).encode('utf-8'))
    # Block until the message has actually been handed to the broker.
    producer.flush()

In [20]:
# Sanity check: flatDF must still be a streaming DataFrame before writeStream is used on it.
flatDF.isStreaming

True

In [None]:
# Run `analysis` on every micro-batch; this cell blocks until the query terminates.
(
    flatDF.writeStream
    .foreachBatch(analysis)
    .start()
    .awaitTermination()
)

## Stop Spark

In [22]:
# Shut down the SparkContext and the SparkSession before the cluster daemons are stopped.
sc.stop()
spark.stop()

In [23]:
# Stop the Spark workers and the master with Spark's stop-all.sh script.
! $SPARK_HOME/sbin/stop-all.sh

master: stopping org.apache.spark.deploy.worker.Worker
slave01: stopping org.apache.spark.deploy.worker.Worker
slave02: stopping org.apache.spark.deploy.worker.Worker
slave03: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master


## Stop Kafka

Stop the Kafka and ZooKeeper servers:

In [66]:
# Run the stop scripts on the `kafka` host over ssh: the Kafka broker first, then ZooKeeper.
! ssh kafka kafka_2.13-2.7.0/bin/kafka-server-stop.sh 
! ssh kafka kafka_2.13-2.7.0/bin/zookeeper-server-stop.sh