Das Skript dient zur Synchronisierung der Daten aus der Datenbank mit dem Signal der Magnet-Temperatur.
Schritte:
1- Magnet-Temperatur Upsampling
2- Aggregate Abfrage zwischen beiden Dataframes

Load libraries

In [None]:
import pandas as pd
import os
import glob
import numpy as np
import duckdb as ddb
import pymongo
from pymongo import MongoClient
import plotly.express as ex
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from datetime import datetime
from datetime import timedelta
import plotly.io as pio

In [None]:
# Input: CSV file with the magnet-temperature measurement.
file_path = "G:\\Innovations@HELLER\\DN\\KI\\Zollern-FH-MillTrunMotor\\Datasets\\rough_data\\Magnet_temperature\\2023_07_19_rotor_hochtemp.csv"
# Output: file name used when the synchronized result is saved as CSV.
put_File_name = "Versuch_19072023_Sync.csv"

Set the time period to fetch from the database (UTC time)

In [None]:
# Query window for the database. The "+0000" suffix is matched literally by
# the format string, so both values are naive datetime objects.
start = datetime.strptime(
    "2023-09-06T00:00:00.000+0000",
    "%Y-%m-%dT%H:%M:%S.%f+0000",
)
end = datetime.strptime(
    "2023-09-07T13:00:00.000+0000",
    "%Y-%m-%dT%H:%M:%S.%f+0000",
)

In [None]:
# --- Database configuration ---
host = "m53763edge"  # MongoDB host name
port = 27017  # MongoDB port
collection = "thermal_data"  # collection the events are read from
# NOTE(review): all_data and get_preprocessed_data are not referenced by the
# cells visible in this notebook - confirm whether they are still needed.
all_data = True
get_preprocessed_data = False
time_shift = +2  # hours added to every record timestamp read from the DB

Parameters for upsampling

In [None]:
# Upsampling parameters (both values are sample periods in seconds).
current_sample_rate = 5  # period of the existing samples
new_sample_rate = 0.1  # period after upsampling
# Number of rows to insert between two neighbouring samples. round() is used
# instead of int() because a float quotient can land just below the integer
# (e.g. 0.3 / 0.1 == 2.9999999999999996), which int() would truncate.
count_insertions = round(current_sample_rate / new_sample_rate) - 1

In [None]:
def create_block(start_date, period: float, count_insertions: int):
    """Build the placeholder rows that are inserted after one original sample.

    Args:
        start_date: Timestamp of the original sample that precedes the block.
        period: Spacing between consecutive inserted rows, in seconds.
        count_insertions: Number of rows to create.

    Returns:
        DataFrame with ``count_insertions`` rows and two columns: ``date``
        (``start_date`` plus 1..count_insertions periods) and
        ``magnet_temperature`` (all NaN, to be filled by interpolation).
    """
    steps = range(1, count_insertions + 1)
    insertion_block = pd.DataFrame({
        'date': [start_date + timedelta(seconds=step * period) for step in steps],
        # np.nan instead of the np.NaN alias, which NumPy 2.0 removed.
        'magnet_temperature': [np.nan for _ in steps],
    })
    return insertion_block

In [None]:
def insert_block(original_data: pd.DataFrame, period: float, count_insertions: int):
    """Insert placeholder rows between consecutive rows of ``original_data``.

    After every row except the last one, a block produced by ``create_block``
    (``count_insertions`` rows spaced ``period`` seconds apart, starting from
    that row's ``date``) is inserted.

    Args:
        original_data: Frame with a ``date`` column; rows are taken in their
            existing order.
        period: Spacing of the inserted rows in seconds.
        count_insertions: Rows inserted per gap; nothing is inserted when
            this is 0 or negative.

    Returns:
        A new DataFrame with a fresh 0..n-1 index. ``original_data`` is not
        modified.
    """
    row_count = len(original_data)
    if row_count == 0:
        return pd.DataFrame(columns=original_data.columns.to_list())

    # Collect all pieces and concatenate once: DataFrame.append was removed
    # in pandas 2.0 and copied the whole frame on every call.
    pieces = []
    for position in range(row_count):
        # A one-row slice keeps the column dtypes. Working by position also
        # makes the "last row" test independent of the frame's index labels.
        row_frame = original_data.iloc[[position]]
        pieces.append(row_frame)
        if position < row_count - 1 and count_insertions > 0:
            start_date = row_frame['date'].iloc[0]
            pieces.append(create_block(start_date=start_date, period=period,
                                       count_insertions=count_insertions))
    output_df = pd.concat(pieces, ignore_index=True)
    return output_df

Read the magnet temperature and add 1 hour to the timestamp to convert it to local time
- Apply upsampling

In [None]:
# Load the magnet-temperature CSV, parse the 'date' strings and shift them by
# one hour, then drop the 'No' column.
df = pd.read_csv(file_path)
df['date'] = df['date'].apply(
    lambda text: datetime.strptime(text, "%Y-%m-%d %H:%M:%S") + timedelta(hours=1)
)
df.drop(columns=['No'], inplace=True)
# Insert the NaN placeholder rows between the measured samples.
to_up_sampled_data = insert_block(
    df,
    period=new_sample_rate,
    count_insertions=count_insertions,
)

Interpolation of the upsampled file

In [None]:
# Fill the NaN placeholders by linear interpolation between the measured
# samples. The former `order=5` argument is dropped: pandas only uses `order`
# for the 'polynomial' and 'spline' methods, so it had no effect here.
# The column is cast to float first so the interpolation does not depend on
# the dtype the upsampling step produced.
to_up_sampled_data['magnet_temperature'] = (
    to_up_sampled_data['magnet_temperature'].astype(float).interpolate(method='linear')
)
to_up_sampled_data.head(10)

Read Data source from database

Get Data from database

In [None]:
## Read raw and preprocessing data
# One dict per DB record is collected and the DataFrame is built once at the
# end: DataFrame.append was removed in pandas 2.0 and re-copied the whole
# frame for every record.
signal_rows = []
prev_time = None
current_time = None
# The context manager closes the MongoDB connection once the cursor has been
# consumed (also when an exception interrupts the loop).
with MongoClient(host=host, port=port) as client:
    db = client.h4ai
    event_list = db[collection].find({"date": {'$gte': start, '$lt': end}}).sort('date', 1)
    ### Go through events in DB
    for event in event_list:
        ### Go through records in each event
        for record in event['content']:
            ### Get right data between start / end
            if record['date'] < start or record['date'] > end:
                continue
            current_time = record['date'] + timedelta(hours=time_shift)
            # Report gaps of more than 5.5 s between consecutive records.
            if prev_time is not None and (current_time - prev_time).total_seconds() > 5.5:
                print('Period= ', (current_time - prev_time).total_seconds())
                print('prev_time', prev_time)
                print('current_time', current_time)
                print('#########################################################')
            # Raw signal values are rounded to two decimals.
            row = {
                name: np.double(np.round(value, decimals=2))
                for name, value in record['raw_data'].items()
            }
            row['date'] = current_time
            row['given2model'] = record['given2model']
            row['prediction'] = record['prediction']
            signal_rows.append(row)
            prev_time = current_time

if not signal_rows:
    # Previously this case surfaced as an AttributeError on None.
    raise ValueError(
        "No records found in collection '{}' between {} and {}".format(collection, start, end)
    )
signals = pd.DataFrame(signal_rows)
# Adds the 'index' column that the following cell drops again.
signals.reset_index(inplace=True)

Remove unneeded Columns

In [None]:
# Remove the helper 'index' column and 'given2model' in place, then show the
# remaining column names.
signals.drop(['index', 'given2model'], axis='columns', inplace=True)
signals.columns

In [None]:
def get_field_list(df: pd.DataFrame, df_name: str):
    """Return the columns of ``df`` as a comma-separated SQL field list.

    Every column name is qualified with ``df_name``, e.g.
    ``"signals.date, signals.T_MOTOR"``. A frame without columns yields an
    empty string.
    """
    qualifier = "{df_name}.".format(df_name=df_name)
    return ", ".join(qualifier + column for column in df.columns.to_list())

Generate field list for each dataframe

In [None]:
# Table names used inside the SQL query, and the matching qualified column
# lists for both DataFrames.
table_signals = 'signals'
table_to_up_sampled_data = 'to_up_sampled_data'
signals_fields = get_field_list(signals, table_signals)
to_up_sampled_data_fields = get_field_list(to_up_sampled_data, table_to_up_sampled_data)

In [None]:
# Preview the first ten rows read from the database.
signals.head(10)

In [None]:
# Preview the first ten rows of the upsampled magnet-temperature data.
to_up_sampled_data.head(10)

Apply Query to combine data sources

In [None]:
#### Apply Query
# Combines both data sources in one SQL statement:
#   1. every `signals` row gets a window [date, date + 5 s) exposed as start/end,
#   2. the upsampled rows whose timestamp falls inside that window are joined
#      and the earliest one (min ts) is kept per `signals` row,
#   3. the result is joined back on that timestamp to pull in the full
#      upsampled row, ordered by window start.
# NOTE(review): the table names in the query are the Python variable names of
# the two DataFrames; presumably DuckDB resolves them from the notebook
# namespace - confirm against the DuckDB Python API documentation.
query= """
select * from 
    (select {t_a}.start as start, {t_a}.end as end,{table_a_fields}, min({t_b}.ts)  as ts from 
        (
            (select  {table_a_fields}, {t_a}.date as start, ({t_a}.date + interval 5 Second) as end from {t_a})  {t_a}
            join
            (select {table_b_fields}, {t_b}.date as ts from {t_b})  {t_b}
            on ({t_b}.ts >= {t_a}.start) and ( {t_b}.ts < {t_a}.end)
        )
    group by {t_a}.start, {t_a}.end, {table_a_fields}) combined_logs
join
    {t_b}
on (combined_logs.ts = {t_b}.date)
order by start Asc
""".format(t_a= table_signals, t_b= table_to_up_sampled_data, table_a_fields= signals_fields, table_b_fields= to_up_sampled_data_fields)
# Run the query and materialize the result as a pandas DataFrame.
results= ddb.query(query)
result_df= results.to_df()

In [None]:
# Columns of the combined result that are shown and plotted below.
selected_colums = [
    'date', 'start', 'end', 'ts',
    'T_KLEMMUNG', 'T_LAGER', 'T_MOTOR',
    'V_LAENGS', 'DRZ5', 'Analytic_sol',
    'I_MOMENT_G', 'I_FELD', 'DRZ', 'I_IST_BETR', 'V_QUER',
    'T_BETT', 'prediction', 'magnet_temperature',
]

Resulting DataFrame after aggregation

In [None]:
# Display the selected columns of the combined result.
result_df[selected_colums]

Visualize the data of the resulting DataFrame

In [None]:
# Plot the synchronized data in six stacked panels that share the time axis.
scatter_mode = 'lines'  # alternatives: 'lines+markers', 'markers'
# NOTE(review): selected_Columns is not referenced below; the plotted frame is
# built from selected_colums.
selected_Columns = ['date', 'DRZ5', 'T_KLEMMUNG', 'T_LAGER', 'T_MOTOR',
                    'T_BETT', 'Analytic_sol', 'prediction', 'magnet_temperature']
df = result_df[selected_colums]
fig = make_subplots(rows=6, cols=1, shared_xaxes=True, print_grid=True, vertical_spacing=0.02)

# Panels 1-5 plot result columns directly: (row, y-axis title, columns).
# Every trace is named after its column; the DRZ5 trace was previously
# labelled 'given2model' in the legend although it plots DRZ5.
panels = [
    (1, 'DRZ (RPM)', ['DRZ5']),
    (2, 'T_KLEMMUNG C°', ['T_KLEMMUNG']),
    (3, 'T_LAGER C°', ['T_LAGER']),
    (4, 'T_MOTOR C°', ['T_MOTOR']),
    (5, 'Rotor Temperature C°', ['magnet_temperature', 'prediction', 'Analytic_sol']),
]
for panel_row, axis_title, columns in panels:
    for column in columns:
        fig.add_trace(
            go.Scatter(x=df['date'], y=df[column], name=column, mode=scatter_mode),
            row=panel_row, col=1,
        )
    fig.update_yaxes(title_text=axis_title, row=panel_row, col=1)

# Panel 6: residuals of both estimates against the measured magnet temperature.
residuals = [
    (' Error-Analytic', 'Analytic_sol'),
    (' Error-Data-Driven', 'prediction'),
]
for trace_name, column in residuals:
    fig.add_trace(
        go.Scatter(x=df['date'], y=df['magnet_temperature'] - df[column],
                   name=trace_name, mode=scatter_mode),
        row=6, col=1,
    )
fig.update_yaxes(title_text='Error in Rotor Temperature C°', row=6, col=1)

fig.update_layout(height=1200, width=1400, title_text='M57002 Machine Data')
fig.show()


Save it as .CSV file

In [None]:
# Target path of the synchronized CSV; the file name comes from put_File_name.
save_path = "G:\\Innovations@HELLER\\DN\\KI\\Zollern-FH-MillTrunMotor\\Datasets\\rough_data\\database_with_sensor\\Synchronization_results\\{file_name}".format(file_name=put_File_name)
# Columns written to the file (the window columns 'start'/'end' are left out).
selected_colums = [
    'date', 'ts',
    'T_KLEMMUNG', 'T_LAGER', 'T_MOTOR',
    'V_LAENGS', 'DRZ5', 'Analytic_sol',
    'I_MOMENT_G', 'I_FELD', 'DRZ', 'I_IST_BETR', 'V_QUER',
    'T_BETT', 'prediction', 'magnet_temperature',
]
result_df[selected_colums].to_csv(save_path)

In [None]:
import onnx
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
import onnxruntime as ort

In [None]:
# Open an ONNX Runtime inference session on the exported preprocessor model.
preprocessor_path = "G:\\Innovations@HELLER\\DN\\KI\\Zollern-FH-MillTrunMotor\\Datasets\\workspace\\assets\\v0.0.10\\preprocessor.onnx"
sess = ort.InferenceSession(preprocessor_path)

In [None]:
# Names of the model's first input and first output.
first_input = sess.get_inputs()[0]
first_output = sess.get_outputs()[0]
input_name = first_input.name
label_name = first_output.name

In [None]:
# Show the shape declared for the model's first input.
sess.get_inputs()[0].shape

In [None]:
import numpy as np
# Dummy input: a single row of four zeros in float32.
data = np.zeros((1, 4), dtype=np.float32)
data.shape

In [None]:

# Run the model on the dummy input and keep its first output.
# NOTE(review): the feed key is hard-coded as 'signals' although input_name
# was looked up from the session earlier - confirm that both agree.
onnx_feed = {'signals': data}
onnx_outputs = sess.run(None, onnx_feed)
onnx_output = onnx_outputs[0]

In [None]:
# Display the first model output.
onnx_output