In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [3]:
num_words = 128 # Number of hits per message
rdd = sc.binaryRecords("data/data_000001.dat", 8*num_words)

In [4]:
from pyspark.sql import Row
import struct

def hit_unpacker(word):
    # hit masks
    hmaskTDC_MEAS     = 0x1F
    hmaskBX_COUNTER   = 0xFFF
    hmaskORBIT_CNT    = 0xFFFFFFFF
    hmaskTDC_CHANNEL  = 0x1FF
    hmaskFPGA         = 0xF
    hmaskHEAD         = 0x3

    hfirstTDC_MEAS    = 0
    hfirstBX_COUNTER  = 5
    hfirstORBIT_CNT   = 17
    hfirstTDC_CHANNEL = 49
    hfirstFPGA        = 58
    hfirstHEAD        = 62
    
    TDC_MEAS     =      (( word >> hfirstTDC_MEAS    ) & hmaskTDC_MEAS   )
    BX_COUNTER   =      (( word >> hfirstBX_COUNTER  ) & hmaskBX_COUNTER )
    ORBIT_CNT    =      (( word >> hfirstORBIT_CNT   ) & hmaskORBIT_CNT  )
    TDC_CHANNEL  =  1 + (( word >> hfirstTDC_CHANNEL ) & hmaskTDC_CHANNEL)
    FPGA         =      (( word >> hfirstFPGA        ) & hmaskFPGA       )
    HEAD         =      (( word >> hfirstHEAD        ) & hmaskHEAD       )
    
    if((TDC_CHANNEL!=137) and (TDC_CHANNEL!=138)):
            TDC_MEAS -= 1

    unpacked  = {
        'HEAD': HEAD,
        'FPGA': FPGA,
        'TDC_CHANNEL': TDC_CHANNEL,
        'ORBIT_CNT': ORBIT_CNT,
        'BX_COUNTER': BX_COUNTER,
        'TDC_MEAS': TDC_MEAS,
        'TRG_QUALITY': np.NaN
    }
    print(unpacked)
    return Row(**unpacked)

def trigger_unpacker(word):
    # Trigger masks
    tmaskQUAL    = 0x0000000000000001
    tmaskBX      = 0x0000000000001FFE
    tmaskTAGBX   = 0x0000000001FFE000
    tmaskTAGORB  = 0x01FFFFFFFE000000
    tmaskMCELL   = 0x0E00000000000000
    tmaskSL      = 0x3000000000000000
    tmaskHEAD    = 0xC000000000000000

    tfirstQUAL   = 0
    tfirstBX     = 1
    tfirstTAGBX  = 13
    tfirstTAGORB = 25
    tfirstMCELL  = 57
    tfirstSL     = 60
    tfirstHEAD   = 62
    
    storedTrigHead     = ( word & tmaskHEAD   ) >> tfirstHEAD
    storedTrigMiniCh   = ( word & tmaskSL     ) >> tfirstSL
    storedTrigMCell    = ( word & tmaskMCELL  ) >> tfirstMCELL
    storedTrigTagOrbit = ( word & tmaskTAGORB ) >> tfirstTAGORB
    storedTrigTagBX    = ( word & tmaskTAGBX  ) >> tfirstTAGBX
    storedTrigBX       = ( word & tmaskBX     ) >> tfirstBX
    storedTrigQual     = ( word & tmaskQUAL   ) >> tfirstQUAL
    
    unpacked = {
        'HEAD': storedTrigHead,
        'FPGA': storedTrigMiniCh,
        'TDC_CHANNEL': storedTrigMCell,
        'ORBIT_CNT': storedTrigTagOrbit,
        'BX_COUNTER': storedTrigTagBX,
        'TDC_MEAS': storedTrigBX,
        'TRG_QUALITY': storedTrigQual
    }
        
    return Row(**unpacked)

def unpacker(hit):
    
    word_counter = 0
    word_size = 8
    
    rows = []
    
    for i in range(0, num_words*word_size, word_size):      
        buffer = struct.unpack('<Q', hit[i:i+word_size])[0]
        head = (buffer >> 62) & 0x3

        if head <= 2:
            rows.append(hit_unpacker(buffer))

        elif head == 3:
            rows.append(trigger_unpacker(buffer))
        
    return rows

In [5]:
unpacked_rdd = rdd.map(lambda hit: unpacker(hit)).flatMap(lambda x: x)
spark_df = unpacked_rdd.toDF()
spark_df = spark_df.limit(20000).cache() # work on a subset of 20000 message

In [6]:
# remove tdc_channel = 139 since they are not physical events
spark_df = spark_df.where(col('TDC_CHANNEL')!=139)

In [7]:
allhits = spark_df.filter('HEAD == 1').drop('TRG_QUALITY')
triggershits = spark_df.filter('HEAD > 2')

In [8]:
# Check triggerhits
triggershits.where(col('ORBIT_CNT')==673213665).show()

+----------+----+----+---------+-----------+--------+-----------+
|BX_COUNTER|FPGA|HEAD|ORBIT_CNT|TDC_CHANNEL|TDC_MEAS|TRG_QUALITY|
+----------+----+----+---------+-----------+--------+-----------+
|      2465|   0|   3|673213665|          6|    2456|       null|
|      2465|   0|   3|673213665|          5|    2456|       null|
|      2465|   0|   3|673213665|          4|    4095|       null|
|      2465|   0|   3|673213665|          3|    4095|       null|
|      2465|   0|   3|673213665|          2|    4095|       null|
|      2465|   0|   3|673213665|          1|    4095|       null|
|      2465|   0|   3|673213665|          0|    4095|       null|
|      2465|   0|   3|673213665|          6|    2459|       null|
|      2465|   0|   3|673213665|          5|    2459|       null|
|      2465|   0|   3|673213665|          4|    4095|       null|
|      2465|   0|   3|673213665|          3|    4095|       null|
|      2465|   0|   3|673213665|          2|    4095|       null|
|      246

In [9]:
# find triggers
triggers = triggershits.groupby('ORBIT_CNT')

In [10]:
from pyspark.sql.functions import when, col, min

# Create T0 column
triggers_table = triggers.agg(min('TDC_MEAS').alias('T0'))

In [11]:
hits = allhits.join(triggers_table, on='ORBIT_CNT')
hits.show()

+---------+----------+----+----+-----------+--------+----+
|ORBIT_CNT|BX_COUNTER|FPGA|HEAD|TDC_CHANNEL|TDC_MEAS|  T0|
+---------+----------+----+----+-----------+--------+----+
|673213665|      2457|   0|   1|         45|      22|2456|
|673213665|      2456|   0|   1|         67|      24|2456|
|673213665|      2464|   0|   1|         66|      17|2456|
|673213665|      2463|   0|   1|         68|      18|2456|
|673213665|      2463|   0|   1|         47|      16|2456|
|673213665|      2468|   0|   1|         50|      21|2456|
|673213665|      2468|   0|   1|         69|      27|2456|
|673213665|      2465|   0|   1|         48|      27|2456|
|673213781|       333|   0|   1|         21|       0|1772|
|673213781|      1777|   0|   1|        123|      23|1772|
|673213781|      1774|   0|   1|         12|      12|1772|
|673213781|      1778|   0|   1|         13|      15|1772|
|673213781|      1777|   0|   1|        124|      24|1772|
|673213781|      1785|   0|   1|         11|       0|177

In [12]:
## Parameters

DURATION = {                         
    'orbit:bx': 3564,
    'orbit': 3564*25,
    'bx': 25.,
    'tdc': 25./30
}

TIME_WINDOW = (-50, 500)

### Chamber configuration
NCHANNELS  = 64    # channels per SL
NSL        = 4     # number of SL
### Cell parameters
XCELL     = 42.                      # cell width in mm
ZCELL     = 13.                      # cell height in mm
ZCHAMB    = 550.                     # spacing betweeen chambers in mm

TDRIFT    = 15.6*DURATION['bx']    # drift time in ns
VDRIFT    = XCELL*0.5 / TDRIFT     # drift velocity in mm/ns 

layer_z     = [  1,            3,            2,            4,         ]
chanshift_x = [  0,            -1,           0,            -1,        ]
pos_z       = [  ZCELL*3.5,    ZCELL*1.5,    ZCELL*2.5,    ZCELL*0.5, ]
posshift_x  = [  0,            0,            0.5,          0.5,       ]

In [13]:
# Create column TDRIFT
hits = hits.withColumn('TDRIFT', 
                (hits['BX_COUNTER']-hits['T0'])*DURATION['bx'] + 
                 hits['TDC_MEAS']*DURATION['tdc']).drop('T0')

# Find events
events = hits.where((hits['TDRIFT']>TIME_WINDOW[0]) & (hits['TDRIFT']<TIME_WINDOW[1]))

In [14]:
events = events.withColumn('X_POSSHIFT', 
                   when(col('TDC_CHANNEL') % 4 == 1, posshift_x[0])
                  .when(col('TDC_CHANNEL') % 4 == 2, posshift_x[1])
                  .when(col('TDC_CHANNEL') % 4 == 3, posshift_x[2])
                  .when(col('TDC_CHANNEL') % 4 == 0, posshift_x[3])
                  .otherwise(0.0)
                 )

In [15]:
events = events.withColumn('SL',
                      when((col('FPGA') == 0) & (col('TDC_CHANNEL') <= NCHANNELS) , 0)
                     .when((col('FPGA') == 0) & (col('TDC_CHANNEL') > NCHANNELS) & (col('TDC_CHANNEL') <= 2*NCHANNELS), 1)
                     .when((col('FPGA') == 1) & (col('TDC_CHANNEL') <= NCHANNELS) , 2)
                     .when((col('FPGA') == 1) & (col('TDC_CHANNEL') > NCHANNELS) & (col('TDC_CHANNEL') <= 2*NCHANNELS), 3) 
                     .otherwise(-1)
                    )

In [16]:
events = events.withColumn('TDC_CHANNEL_NORM', col('TDC_CHANNEL') - NCHANNELS*(col('SL')%2))

In [17]:
events = events.withColumn('X_POS_LEFT', 
                  (((col('TDC_CHANNEL_NORM')-0.5)/4).cast('integer') + col('X_POSSHIFT'))*XCELL + 
                  XCELL/2 - greatest(col('TDRIFT'), lit(0))*VDRIFT)
events = events.withColumn('X_POS_RIGHT', 
                  (((col('TDC_CHANNEL_NORM')-0.5)/4).cast('integer') + col('X_POSSHIFT'))*XCELL + 
                  XCELL/2 + greatest(col('TDRIFT'), lit(0))*VDRIFT)

In [18]:
events = events.withColumn('Z_POS', 
                           when(col('TDC_CHANNEL') % 4 == 1, pos_z[0])
                          .when(col('TDC_CHANNEL') % 4 == 2, pos_z[1])
                          .when(col('TDC_CHANNEL') % 4 == 3, pos_z[2])
                          .when(col('TDC_CHANNEL') % 4 == 0, pos_z[3])
                          .otherwise(0.0)
                          )

In [19]:
events.select('ORBIT_CNT', 'X_POS_LEFT', 'X_POS_RIGHT', 'Z_POS', 'TDRIFT').show(8)

+---------+------------------+------------------+-----+------------------+
|ORBIT_CNT|        X_POS_LEFT|       X_POS_RIGHT|Z_POS|            TDRIFT|
+---------+------------------+------------------+-----+------------------+
|673213665| 480.6666666666667| 485.3333333333333| 45.5|43.333333333333336|
|673213665| 40.92307692307692| 43.07692307692308| 32.5|              20.0|
|673213665| 9.467948717948717|32.532051282051285| 19.5|214.16666666666666|
|673213665|31.769230769230766| 52.23076923076923|  6.5|             190.0|
|673213665|493.85897435897436| 514.1410256410256| 32.5|188.33333333333334|
|673213665|507.90384615384613| 542.0961538461538| 19.5|             317.5|
|673213665| 45.63461538461539| 80.36538461538461| 45.5|             322.5|
|673213665| 490.6730769230769| 517.3269230769231|  6.5|             247.5|
+---------+------------------+------------------+-----+------------------+
only showing top 8 rows

