In [2]:
#!/usr/bin/env python3
#
# I. Abritta and G. Mazzitelli March 2022
# Middelware online recostruction 
# Modify by ... in date ...
#

from matplotlib import pyplot as plt
import numpy as np
import os
import stat
from datetime import datetime
# import pre_reconstruction as pr
import time
import pandas as pd
import base64
import io
import json
import struct

import midas
import midas.client

import mysql.connector

import sys
import cygno as cy
import multiprocess

from json import dumps
from kafka import KafkaProducer
import cv2
import ctypes

MAX_CPU_AVAILABLE   = multiprocess.cpu_count()
DAQ_ROOT            = os.environ['DAQ_ROOT']
DEFAULT_PATH_ONLINE = DAQ_ROOT+'/online/'

def image_jpg(image, vmin, vmax, event_number, event_time, producer):
    
    im = plt.imshow(image, cmap='gray', vmin=vmin, vmax=vmax)
    plt.title ("Event: {:d} at {:s}".format(event_number, event_time))
    plt.savefig(DEFAULT_PATH_ONLINE+'custom/tmp.png')

    with open(DEFAULT_PATH_ONLINE+'custom/tmp.png', 'rb') as f:
        img_bytes = f.read()
    
    #image_to_kafka = base64.b64encode(cv2.imencode('.jpg', img)[1]).decode()
    img_base64 = base64.b64encode(img_bytes).decode('utf-8')
    producer.send('image-topic', value=img_base64.encode('utf-8'))
    
    return 

def push_panda_table_sql(connection, table_name, df):
    try:
        mycursor=connection.cursor()
        mycursor.execute("SHOW TABLES LIKE '"+table_name+"'")
        result = mycursor.fetchone()
        if not result:
            cols = "`,`".join([str(i) for i in df.columns.tolist()])
            db_to_crete = "CREATE TABLE `"+table_name+"` ("+' '.join(["`"+x+"` REAL," for x in df.columns.tolist()])[:-1]+")"
            print ("[Table {:s} created into SQL Server]".format(table_name))
            mycursor = connection.cursor()
            mycursor.execute(db_to_crete)

        cols = "`,`".join([str(i) for i in df.columns.tolist()])

        for i,row in df.iterrows():
            sql = "INSERT INTO `"+table_name+"` (`" +cols + "`) VALUES (" + "%s,"*(len(row)-1) + "%s)"
            mycursor.execute(sql, tuple(row.astype(str)))
            connection.commit()

        mycursor.close()
        return 0 
    except:
        return -1

def init_sql():
    import os
    import mysql.connector
    # init sql variabile and connector
    try:
        connection = mysql.connector.connect(
          host=os.environ['MYSQL_IP'],
          user=os.environ['MYSQL_USER'],
          password=os.environ['MYSQL_PASSWORD'],
          database=os.environ['MYSQL_DATABASE'],
          port=int(os.environ['MYSQL_PORT'])
        )
        return connection
    except:
        return -1


In [3]:
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)
producerb = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    #value_serializer=lambda m: json.dumps(m).encode('ascii'),
    #value_serializer=lambda v: dumps(v).encode('utf-8'),
    max_request_size= 31457280, #15728640,
    max_block_ms=300000
)


client = midas.client.MidasClient("middleware")
buffer_handle = client.open_event_buffer("SYSTEM",None,1000000000)
request_id = client.register_event_request(buffer_handle, sampling_type = 2) #midas.GET_SOME)

# init program variables #####
t0 = time.time()
vmin         = 95
vmax         = 130
connection   =-1
max_try      = 3
header_event = [
    'timestamp',
    'serial_number',
    'event_id']
#AG         = os.environ['TAG']
###############################

# global run useful varibles
header_environment = client.odb_get("/Equipment/Environment/Settings/Names Input")
header_environment = header_event + header_environment
#
n_try=0
while connection == -1 and n_try<=max_try:
    connection = init_sql()
    time.sleep(1)
    n_try+=1
if connection == -1:
    print (int(time.time()), "ERROR SQL connaction fail...")
    exit(-1)

while 5:
    start = time.time()
    event = client.receive_event(buffer_handle, async_flag=True)
#        state         = client.odb_get("/Runinfo/State")

    odb_json = dumps(client.odb_get("/"))
    producer.send('midas-odb', value=odb_json)
    end = time.time()
    print("ODB elapsed: {:.2f}, payload size {:.1f} kb".format(end-start, len(odb_json.encode('utf-8'))/1024))

    start = time.time()

    if event is not None:
        if event.header.is_midas_internal_event():
            if verbose:
                print("Saw a special event")
            continue

        pyload       = event.pack()
        #packed_event = event.pack()
        
        binary_data = io.BytesIO()
        binary_data.write(pyload)
        binary_data.seek(0)
        encoded_data = binary_data.read()
        
        #bytes_object = bytes(ctypes.cast(packed_event, ctypes.POINTER(ctypes.c_char * len(packed_event))).contents)
        
        producerb.send('midas-event', value=(encoded_data))
        producerb.flush()


        bank_names    = ", ".join(b.name for b in event.banks.values())
        if 'CAM0' in bank_names:
            payload = list(event.banks['CAM0'].data)#[0:2500000]


            #producerb.send('midas-event', value=(payload))
            #producerb.flush()

            end = time.time()
        print("EVENT elapsed: {:.2f}, payload size {:.1f} Mb".format(end-start, np.size(encoded_data)/1024/1024))
        #print("EVENT elapsed: {:.2f}, payload size {:.1f} Mb encoded {:.1f} Mb".format(end-start, np.size(pyload)/1024/1024, np.size(encoded_data)/1024/1024))


        # global event useful variables
        bank_names    = ", ".join(b.name for b in event.banks.values())
        event_info    = [event.header.timestamp, event.header.serial_number, event.header.event_id]
        run_number    = client.odb_get("/Runinfo/Run number")
        event_number  = event.header.serial_number
        event_time    = datetime.fromtimestamp(event.header.timestamp).strftime('%Y-%m-%d %H:%M:%S')


        if 'CAM0' in bank_names: # CAM image
            image, _, _ = cy.daq_cam2array(event.banks['CAM0']) # matrice delle imagine
            image_update_time = client.odb_get("/middleware/image_update_time")
            if int(time.time())%image_update_time==0:
                print("test")
                #image_jpg(image, vmin, vmax, event_number, event_time, producerb)

#             if 'INPT' in bank_names:                
#                 value = [event_info + list(event.banks['INPT'].data)]
#                 try:
#                     producer.send('slow_control', value=value)
#                 except:
#                     print (int(time.time()), "KAFKA ERROR...")

#                 de = pd.DataFrame(value, columns = header_environment)
#                 table_name_sc = "SlowControl"
#                 n_try=0
#                 while push_panda_table_sql(connection,table_name_sc, de) == -1 and n_try<=max_try:
#                     time.sleep(1)
#                     connection = init_sql()
#                     print (int(time.time()), "ERROR SQL push_panda_table_sql fail...", n_try, connection)
#                 n_try+=1
#                 if n_try==max_try:
#                     exit(-1)
#             if event_number%100==0:
#                 print ("midware alive", event_number)

    client.communicate(10)
    time.sleep(0.1)


client.deregister_event_request(buffer_handle, request_id)

client.disconnect()

ODB elapsed: 0.09, payload size 75.9 kb
EVENT elapsed: 4.05, payload size 0.0 Mb
ODB elapsed: 0.14, payload size 75.9 kb
EVENT elapsed: 4.11, payload size 0.0 Mb
ODB elapsed: 0.12, payload size 75.9 kb
EVENT elapsed: 4.17, payload size 0.0 Mb
test
ODB elapsed: 0.12, payload size 75.9 kb
EVENT elapsed: 4.15, payload size 0.0 Mb
ODB elapsed: 0.05, payload size 75.9 kb
EVENT elapsed: -0.00, payload size 0.0 Mb
ODB elapsed: 0.09, payload size 75.9 kb
EVENT elapsed: 4.11, payload size 0.0 Mb
test
ODB elapsed: 0.16, payload size 75.9 kb
EVENT elapsed: 4.08, payload size 0.0 Mb
Midas shutdown


NameError: name 'exit' is not defined

In [20]:
type(encoded_data)

bytes

In [20]:
len(packed_event)

10616868

In [22]:
from midas import event
event2 = midas.event.Event()
event2.unpack(packed_event, use_numpy=False)

In [25]:
event2.header.timestamp

1680277098

In [3]:
pyload = event.pack()
binary_data = io.BytesIO()
binary_data.write(pyload)
binary_data.seek(0)
encoded_data = binary_data.read()
producerb.send('midas-event', value=(encoded_data))
producerb.flush()


ODB elapsed: 0.12, payload size 75.9 kb
EVENT elapsed: 4.00, payload size 0.0 Mb
ODB elapsed: 0.12, payload size 75.9 kb
EVENT elapsed: 4.03, payload size 0.0 Mb
Midas shutdown


NameError: name 'exit' is not defined

In [12]:
client.disconnect()

In [26]:
binary_stream = io.BytesIO()
# Binary data and strings are different types, so a str
# must be encoded to binary using ascii, utf-8, or other.
binary_stream.write("Hello, world!\n".encode('ascii'))
#binary_stream.write("Hello, world!\n".encode('utf-8'))

# Move cursor back to the beginning of the buffer
binary_stream.seek(0)

# Read all data from the buffer
stream_data = binary_stream.read()

# The stream_data is type 'bytes', immutable
print(type(stream_data))
print(stream_data)

# To modify the actual contents of the existing buffer
# use getbuffer() to get an object you can modify.
# Modifying this object updates the underlying BytesIO buffer
mutable_buffer = binary_stream.getbuffer()
print(type(mutable_buffer))  # class 'memoryview'
mutable_buffer[0] = 0xFF

# Re-read the original stream. Contents will be modified
# because we modified the mutable buffer
binary_stream.seek(0)
print(binary_stream.read())

<class 'bytes'>
b'Hello, world!\n'
<class 'memoryview'>
b'\xffello, world!\n'


In [28]:
bytes_object = bytes(ctypes.cast(packed_event, ctypes.POINTER(ctypes.c_char * len(packed_event))).contents)

NameError: name 'ctypes' is not defined