In [94]:
import requests
import csv
import time
from datetime import datetime
import json
import pandas as pd
import os

# URL template for a single bioreactor resource; the placeholder receives a
# batch id, optionally followed by a sub-resource path such as "/input-valve".
DETAILS_API_URL = "http://minim-phxap-h8qgv5ltdcsr-1721285542.us-east-1.elb.amazonaws.com/bioreactor/{}"
# Entry-point resource ("0") polled by fetch_id() to learn the current batch id.
API_URL = DETAILS_API_URL.format(0)
# Pause between two polling cycles of the main loop, in seconds.
TIME_INTERVAL = 10

def fetch_id(timeout=10):
    """Fetch the entry-point bioreactor record from ``API_URL``.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body on HTTP 200 (the caller reads its ``'id'``
        key), or an empty list when the request fails, the status code is
        not 200, or the body is not valid JSON.
    """
    try:
        response = requests.get(API_URL, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors and timeouts follow the same "no data" path as a
        # non-200 answer instead of killing the polling loop with a traceback.
        print(f"Failed to fetch data. Error: {exc}")
        return []

    if response.status_code != 200:
        print(f"Failed to fetch data. Status code: {response.status_code}")
        return []

    try:
        return response.json()
    except ValueError as exc:
        # A 200 response with a non-JSON body is treated as a failed fetch.
        print(f"Failed to fetch data. Invalid JSON: {exc}")
        return []

def fetch_details(batch_id, timeout=10):
    """Fetch the JSON document for one bioreactor resource.

    Args:
        batch_id: Value substituted into ``DETAILS_API_URL``. Callers pass
            either a plain batch id or an id followed by a sub-resource path
            (for example ``"<id>/input-valve"``).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body on HTTP 200, or ``None`` when the request
        fails, the status code is not 200, or the body is not valid JSON.
    """
    details_url = DETAILS_API_URL.format(batch_id)
    try:
        response = requests.get(details_url, timeout=timeout)
    except requests.RequestException as exc:
        # Network failures are reported the same way as a bad status code.
        print(f"Failed to fetch details for ID {batch_id}. Error: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch details for ID {batch_id}. Status code: {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError as exc:
        print(f"Failed to fetch details for ID {batch_id}. Invalid JSON: {exc}")
        return None
    
def create_csv_file(file_name, header):
    """Create ``file_name`` with a single header row unless it already exists.

    Args:
        file_name: Path of the CSV file to create.
        header: Sequence of column names written as the first row.
    """
    # An existing file is left untouched so earlier rows are never clobbered.
    if os.path.exists(file_name):
        return
    with open(file_name, 'a', newline='') as handle:
        csv.writer(handle).writerow(header)
            
def update_status(id_update_url, status, timeout=10):
    """Send a new status to a bioreactor resource with an HTTP PUT.

    Args:
        id_update_url: Value substituted into ``DETAILS_API_URL``; callers
            pass a batch id followed by a valve path such as
            ``"<id>/input-valve"``.
        status: Status value sent as ``{"status": status}`` in the JSON body.
        timeout: Seconds to wait for the server before giving up.

    The outcome is only reported on stdout; nothing is returned or raised for
    a failed update.
    """
    status_data = {"status": status}
    status_update_url = DETAILS_API_URL.format(id_update_url)
    try:
        response = requests.put(status_update_url, json=status_data, timeout=timeout)
    except requests.RequestException as exc:
        # A dropped connection must not abort the caller's polling loop.
        print(f"Failed to update status for Batch {id_update_url}. Error: {exc}")
        return

    if response.status_code == 200:
        print(f"Status updated for Batch {id_update_url}")
    else:
        print(f"Failed to update status for Batch {id_update_url}. Status code: {response.status_code}")

def raw_data_validation(target_list):
    """Check that one raw reading row is well formed.

    The row layout is ``[timestamp, batch_id, fill_percent, temperature,
    ph_value, pressure, input_state, output_state]``.

    Args:
        target_list: Sequence holding the eight fields listed above.

    Returns:
        ``True`` when the batch id converts to a non-negative integer, the
        four measurements are non-negative numbers, and both valve states
        are ``'closed'`` or ``'open'``. ``False`` otherwise, including when
        a field is missing, ``None`` or not numeric.
    """
    valid_states = ('closed', 'open')
    try:
        batch_id, *measurements = target_list[1:6]
        input_state, output_state = target_list[6], target_list[7]
        if int(batch_id) < 0:
            return False
        # float() rather than int(): int(-0.5) truncates to 0 and would let a
        # negative reading pass the non-negativity check.
        if not all(float(value) >= 0 for value in measurements):
            return False
    except (TypeError, ValueError, IndexError):
        # Malformed rows are invalid data, not a reason to crash the caller.
        return False
    return input_state in valid_states and output_state in valid_states

def data_transformation(df, batch_id=None, details=None):
    """Aggregate the readings of one batch into summary values.

    Args:
        df: DataFrame of raw readings with at least the columns
            ``timestamp``, ``batch_id``, ``fill_percent``, ``temperature``,
            ``ph_value`` and ``pressure``.
        batch_id: Batch to aggregate. Defaults to the batch of the last row
            of ``df`` (the most recently appended reading).
        details: Mapping with the latest ``'fill_percent'`` and
            ``'temperature'`` readings. Defaults to the values of the last
            row belonging to the batch.

    Returns:
        ``[temperature_range, ph_range, presure_range, total_time,
        met_fill_level, met_temperature_level]`` where the three ranges are
        floats (max minus min), ``total_time`` is the span between the first
        and last timestamp in seconds, and the two flags are ``'Y'``/``'N'``.

    Raises:
        ValueError: If ``df`` holds no reading for the requested batch.
    """
    if df is None or len(df) == 0:
        raise ValueError("data_transformation requires at least one reading")

    # The function previously relied on names that only exist inside main();
    # fall back to the newest reading so a bare call with just `df` works.
    if batch_id is None:
        batch_id = df['batch_id'].iloc[-1]

    batch_rows = df[df['batch_id'] == batch_id]
    if batch_rows.empty:
        raise ValueError(f"no readings found for batch {batch_id!r}")

    def value_range(column):
        # Columns may be object-typed when the frame was built row by row.
        values = pd.to_numeric(batch_rows[column])
        return float(values.max() - values.min())

    temperature_range = value_range('temperature')
    ph_range = value_range('ph_value')
    presure_range = value_range('pressure')

    # Timestamps are stored as strings; parse them before subtracting.
    timestamps = pd.to_datetime(batch_rows['timestamp'])
    total_time = (timestamps.max() - timestamps.min()).total_seconds()

    if details is None:
        latest = batch_rows.iloc[-1]
        details = {
            'fill_percent': latest['fill_percent'],
            'temperature': latest['temperature'],
        }

    met_fill_level = 'Y' if 68 <= float(details['fill_percent']) <= 72 else 'N'
    met_temperature_level = 'Y' if 79 <= float(details['temperature']) <= 81 else 'N'
    return [temperature_range, ph_range, presure_range, total_time,
            met_fill_level, met_temperature_level]

def aggregated_data_validation(agg_list):
    """Check that an aggregated batch row contains only plausible values.

    Every field after the leading batch id must be either a ``'Y'``/``'N'``
    flag or a non-negative number.

    Args:
        agg_list: Aggregated row whose first element is the batch id.

    Returns:
        ``True`` when all fields after the batch id are valid, ``False``
        otherwise (including when there are no such fields).
    """
    fields = agg_list[1:]
    if not fields:
        return False

    for item in fields:
        # Flags and numbers are mixed in the row, so validate by value type
        # instead of by position; int('Y') used to raise here.
        if isinstance(item, str) and item in ('Y', 'N'):
            continue
        try:
            number = float(item)
        except (TypeError, ValueError):
            return False
        # Written as "not >=" so that NaN is rejected as well.
        if not number >= 0:
            return False
    return True

def _record_batch_outcome(batch_id, success, details, df, dataload_time, agg_file_header):
    """Build, validate and append the aggregated CSV row of a finished batch."""
    met_presure_level = 'Y' if details['pressure'] < 200 else 'N'
    # Column order follows agg_file_header: the fill level sits between the
    # success flag and the computed ranges.
    agg_list = [batch_id, success, details['fill_percent']]
    agg_list.extend(data_transformation(df))
    agg_list.append(met_presure_level)

    if not aggregated_data_validation(agg_list):
        label = 'success' if success == 'Y' else 'aborted'
        print(f'{label} aggregated data is not valid')
        return

    # One aggregation file per day; the header is written only on creation.
    mes_agg_file = f"mes_agg_{dataload_time}.csv"
    create_csv_file(mes_agg_file, agg_file_header)
    with open(mes_agg_file, 'a', newline='') as agg_csv_file:
        csv.writer(agg_csv_file).writerow(agg_list)


def main():
    """Poll the bioreactor, log raw readings, aggregate finished batches.

    Each cycle:
      1. Fetches the current batch id, its details and both valve states.
      2. Validates the reading and appends it to the daily raw CSV file.
      3. Classifies the batch as successful (fill 68-72, temperature 79-81,
         pressure below 200), aborted (pressure of 200 or more) or still
         running.
      4. For a finished batch, writes one aggregated row to the daily
         aggregation CSV file and drops its readings from memory.
      5. Sets the valves: input closed / output open for a finished batch,
         input open / output closed while the batch is running.

    The loop ends when ``fetch_id`` returns nothing.
    """
    # Headers are not strictly required but make the files self-describing.
    raw_file_header = ['timestamp', 'batch_id', 'fill_percent', 'temperature', 'ph_value',
                       'pressure', 'input_state', 'output_state']
    agg_file_header = ['batch_id', 'success', 'fill_level', 'temperature_range', 'ph_range',
                       'presure_range', 'total_time', 'met_fill_level',
                       'met_temperature_level', 'met_presure_level']

    # Readings of batches still in progress. Kept as plain rows and turned
    # into a DataFrame only when an aggregation is needed, so readings
    # accumulate across cycles without relying on DataFrame.append.
    readings = []

    while True:
        data = fetch_id()
        if not data:
            print("This vessel is emptied and the batch process is done.")
            break
        batch_id = data['id']
        input_id = f"{batch_id}/input-valve"
        output_id = f"{batch_id}/output-valve"

        details = fetch_details(batch_id)
        input_info = fetch_details(input_id)
        output_info = fetch_details(output_id)
        if details is None or input_info is None or output_info is None:
            # A failed request leaves nothing to record; retry next cycle.
            print(f"Skipping this cycle: incomplete data for Batch {batch_id}")
            time.sleep(TIME_INTERVAL)
            continue

        # Raw data is split into one file per day because a long-running
        # stream can grow very large.
        dataload_time = datetime.now().date()
        mes_raw_file = f"mes_raw_{dataload_time}.csv"
        create_csv_file(mes_raw_file, raw_file_header)

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        target_list = [timestamp, batch_id, details['fill_percent'], details['temperature'],
                       details['pH'], details['pressure'], input_info['state'],
                       output_info['state']]
        if not raw_data_validation(target_list):
            # Invalid readings are neither stored nor used to drive valves.
            print('raw data is not properly generated')
            time.sleep(TIME_INTERVAL)
            continue

        with open(mes_raw_file, 'a', newline='') as csv_file:
            csv.writer(csv_file).writerow(target_list)
        readings.append(target_list)

        fill_ok = 68 <= details['fill_percent'] <= 72
        temperature_ok = 79 <= details['temperature'] <= 81
        pressure_ok = details['pressure'] < 200

        if not pressure_ok:
            success = 'N'       # batch aborted
        elif fill_ok and temperature_ok:
            success = 'Y'       # batch finished successfully
        else:
            success = None      # batch still running

        if success is not None:
            df = pd.DataFrame(readings, columns=raw_file_header)
            _record_batch_outcome(batch_id, success, details, df,
                                  dataload_time, agg_file_header)
            # Drop the completed batch so the in-memory data stays small.
            readings = [row for row in readings if row[1] != batch_id]
            inport_status, outport_status = 'closed', 'open'
        else:
            inport_status, outport_status = 'open', 'closed'

        update_status(input_id, inport_status)
        # The output valve gets its own status; it used to receive the input
        # valve's value.
        update_status(output_id, outport_status)

        time.sleep(TIME_INTERVAL)

# Start the polling loop only when this file is executed as a script (or run
# as a notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


  result = result.union(other)


Status updated for Batch 24762/input-valve
Status updated for Batch 24762/output-valve
Status updated for Batch 84730/input-valve
Status updated for Batch 84730/output-valve
Status updated for Batch 10845/input-valve
Status updated for Batch 10845/output-valve
Status updated for Batch 70064/input-valve
Status updated for Batch 70064/output-valve
Status updated for Batch 21982/input-valve
Status updated for Batch 21982/output-valve
Status updated for Batch 93874/input-valve
Status updated for Batch 93874/output-valve
Status updated for Batch 61306/input-valve
Status updated for Batch 61306/output-valve


KeyboardInterrupt: 