## Data Reader

Reads data from the file at the given location, persists it to the raw data table with a status, and publishes messages to the queue describing the records to process.

In [1]:
%%writefile data-reader-requirements.txt
pandas==2.2.1
psycopg2==2.9.9
pika==1.3.2

Overwriting data-reader-requirements.txt


In [2]:
!pip install -r data-reader-requirements.txt



In [3]:
import pandas as pd
import pika
import json
import sql_operations

In [4]:
# Open a blocking pika connection to the message broker and get a channel on it.
mq_connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.238.20'))
channel = mq_connection.channel()

# Declare the 'new_data' queue, the routing key publish_message sends to.
# Kept as the last expression so the cell still displays the declare result.
channel.queue_declare(queue='new_data')

<METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=new_data'])>"])>

In [5]:
def get_csv_name(excel_to_read):
    """Build the relative path of the CSV file for a base file name.

    Args:
        excel_to_read: Base file name as a string (no folder, no extension).

    Returns:
        The string ``'Generated/<excel_to_read>.csv'``.
    """
    # str.join, like `+` on strings, raises TypeError for a non-string part.
    return ''.join(('Generated/', excel_to_read, '.csv'))

In [6]:
def read_excel(file_name):
    """Load a CSV file into a pandas DataFrame.

    Despite its name, this function parses CSV content with ``pd.read_csv``;
    it does not read Excel workbooks.

    Args:
        file_name: Path (or any other input accepted by ``pd.read_csv``) of
            the CSV to load.

    Returns:
        The DataFrame produced by ``pd.read_csv`` with its default options.
    """
    data_frame = pd.read_csv(file_name)
    return data_frame

In [7]:
def create_tag_message(tagname, batch_id):
    """Create an empty queue message for the configuration of a tag.

    Args:
        tagname: Tag name passed to
            ``sql_operations.get_configurations_of_tag``.
        batch_id: Value stored under the message's ``'BatchId'`` key.

    Returns:
        A dict with ``'ConfigId'``, ``'BatchId'``, ``'MasterTag'`` and
        ``'ErrorTag'`` entries, where each tag entry holds ``'id'``,
        ``'name'`` and a fresh empty ``'values'`` list; or ``None`` when the
        configuration lookup returns ``None``.
    """
    config = sql_operations.get_configurations_of_tag(tagname)
    if config is None:
        return None

    # Read the configuration fields in a fixed order before assembling.
    config_id = config['ConfigId']
    master_tag = {
        'id': config['MasterTagId'],
        'name': config['MasterTagName'],
        'values': [],
    }
    error_tag = {
        'id': config['ErrorTagId'],
        'name': config['ErrorTagName'],
        'values': [],
    }
    return {
        'ConfigId': config_id,
        'BatchId': batch_id,
        'MasterTag': master_tag,
        'ErrorTag': error_tag,
    }

In [8]:
def insert_to_cfg_message(messages, tag_name, batch_id, inserted_index):
    """Record an inserted row id in the message(s) for a tag and batch.

    Every message in ``messages`` whose ``'BatchId'`` equals ``batch_id`` and
    whose master tag name (checked first) or error tag name equals
    ``tag_name`` gets ``inserted_index`` appended to that tag's ``'values'``.
    If no existing message matches, a new one is built with
    ``create_tag_message`` and appended to ``messages`` once the index has
    been recorded in it.

    Nothing is appended when ``create_tag_message`` returns ``None`` or when
    the new message's tag names do not equal ``tag_name``. The previous
    implementation retried recursively in the latter case, appending a new
    message on every call until it hit ``RecursionError``.

    Args:
        messages: List of message dicts; mutated in place.
        tag_name: Tag name compared against the master/error tag names.
        batch_id: Batch identifier compared against ``'BatchId'``.
        inserted_index: Value appended to the matching ``'values'`` list.

    Returns:
        None.
    """
    def record_in(message):
        # Append the index to the first matching tag section; report success.
        if message['BatchId'] == batch_id:
            for section in ('MasterTag', 'ErrorTag'):
                if message[section]['name'] == tag_name:
                    message[section]['values'].append(inserted_index)
                    return True
        return False

    inserted = False
    # No early exit: every matching message receives the index.
    for message in messages:
        if record_in(message):
            inserted = True
    if inserted:
        return

    new_message = create_tag_message(tag_name, batch_id)
    # Only keep the new message if the index could actually be stored in it.
    if new_message is not None and record_in(new_message):
        messages.append(new_message)

In [9]:
def process_file(excel_to_read):
    """Persist the rows of a generated CSV and collect messages for new data.

    The CSV located by ``get_csv_name(excel_to_read)`` is read row by row
    using the columns ``BATCH_ID``, ``PRODUCT_ID``, ``TAG_ID``, ``TIMESTAMP``
    and ``VALUE``. Rows for which ``sql_operations.exists_timestamp_value``
    is truthy are skipped. For the remaining rows:

    * ``VALUE == 0``: the row is inserted with status ``'IGNORE'``, the
      latest partition of the batch is finished at the row's timestamp if it
      has no ``'endTime'`` yet, and the next non-zero row will start a new
      partition.
    * otherwise: a new partition is started first when one is required or
      none exists, the row is inserted with status ``'NEW'``, and the
      returned id is recorded via ``insert_to_cfg_message`` under the
      partition's ``'id'``.

    ``sql_operations.commit()`` is called once after all rows are handled.

    NOTE(review): the "start a new partition" flag is shared by all batches
    in the file, and an already finished latest partition is reused when the
    flag is not set — confirm both are intended.

    Args:
        excel_to_read: Base file name handed to ``get_csv_name``.

    Returns:
        The list of messages built up by ``insert_to_cfg_message``.
    """
    frame = read_excel(get_csv_name(excel_to_read))
    collected = []
    start_new_partition = False

    for _, record in frame.iterrows():
        batch_name = record['BATCH_ID']
        product = record['PRODUCT_ID']
        tag_id = record['TAG_ID']
        timestamp = record['TIMESTAMP']
        value = record['VALUE']

        # The latest partition is looked up before the duplicate check.
        partition = sql_operations.get_latest_batch_partition(batch_name)

        if sql_operations.exists_timestamp_value(batch_name, tag_id, timestamp, value):
            continue  # row already stored

        if value == 0:
            # A zero value is stored as 'IGNORE' and ends the open partition.
            sql_operations.insert_new_raw_data(
                batch_name, product, tag_id, timestamp, value, 'IGNORE')
            start_new_partition = True
            partition_is_open = partition is not None and partition['endTime'] is None
            if partition_is_open:
                sql_operations.finish_batch_partition(partition['id'], timestamp)
            continue

        if start_new_partition or partition is None:
            sql_operations.start_new_batch_partition(batch_name, product, timestamp)
            # Re-read so the id of the partition just started is available.
            partition = sql_operations.get_latest_batch_partition(batch_name)
            start_new_partition = False

        new_row_id = sql_operations.insert_new_raw_data(
            batch_name, product, tag_id, timestamp, value, 'NEW')
        insert_to_cfg_message(collected, tag_id, partition['id'], new_row_id)

    sql_operations.commit()
    return collected

In [10]:
def publish_message(new_records):
    """Publish every record as a JSON message on the ``new_data`` routing key.

    Uses the notebook-level ``channel`` and the default exchange (``''``).

    Args:
        new_records: Iterable of JSON-serialisable records.

    Returns:
        None.
    """
    for record in new_records:
        payload = json.dumps(record)
        channel.basic_publish(exchange='', routing_key='new_data', body=payload)

In [11]:
# Ask for the base name of the file; get_csv_name adds the 'Generated/'
# folder prefix and the '.csv' extension.
print('File name to read under Generated Folder')
# Strip surrounding whitespace so it does not end up inside the file path.
file_to_read = input().strip()

File name to read unde Generated Folder


 ED1108_1


In [12]:
# Process the requested file and keep the messages it returns ...
messages = process_file(file_to_read)
# ... then publish each of them through the open channel.
publish_message(messages)

In [13]:
# Close the pika connection opened at the top of the notebook.
mq_connection.close()