## NEWARE TCP connect

In [1]:
from tqdm import tqdm
import socket
import time

sockethost = "127.0.0.1"
socketport = 502

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((sockethost, socketport))
client_socket.settimeout(2)

def newaresend(cmd: str, default_len = 768):
    cmd_len = len(cmd)
    sendmsg = ' '*(default_len-cmd_len) + cmd
    client_socket.send(sendmsg.encode("utf-8"))

def newarerecv():
    return client_socket.recv(20000).decode("utf-8")

def neware_start(subid = 1, chlid = 1, file = "pause test", 
                 savedir = "C:/NewareData", savename = "TEST",
                 backup = "C:/NewareData/backup", type = 0):
    """
    subid: cycling device id (ex. 1 for 1-1 ~ 1-8 channel)
    chlid: channel id (ex. 1 for 1-1, 2-1, ... channel)
    file: cycling protocol file name (ex. pause test)
    backup: backup data file save path
    type: type of backup file (0: NDA, 1: excel)
    """
    newaresend('''<?xml version="1.0" encoding="UTF-8" ?> 
<bts version="1.0"> 
 <cmd>start</cmd> 
 <list count = "1"> 
  <start ip="192.168.1.2" devtype="25" devid="40" subdevid="{}" chlid="{}" >{}</start> 
<backup backupdir="{}" remotedir="{}" filenametype="2" customfilename="{}" addtimewhenrepeat="0" filetype="{}" />
 </list> 
</bts>'''.format(subid, chlid, file, savedir, backup, savename, type))
    time.sleep(1)
    returnvalue = newarerecv()
    return returnvalue

def neware_continue(subid = 1, chlid = 1):
    """
    subid: cycling device id (ex. 1 for 1-1 ~ 1-8 channel)
    chlid: channel id (ex. 1 for 1-1, 2-1, ... channel)
    """
    newaresend('''<?xml version="1.0" encoding="UTF-8" ?> 
<bts version="1.0"> 
<cmd>continue</cmd> 
<list count = "1"> 
<continue ip="192.168.1.2" devtype="25" devid="40" subdevid="{}" chlid="{}" >true</continue> 
</list> 
</bts>'''.format(subid, chlid))
    time.sleep(1)
    returnvalue = newarerecv()
    return returnvalue

def neware_check(subid = 1, chlid = 1):
    """
    subid: cycling device id (ex. 1 for 1-1 ~ 1-8 channel)
    chlid: channel id (ex. 1 for 1-1, 2-1, ... channel)
    """
    newaresend('''<?xml version="1.0" encoding="UTF-8" ?> 
<bts version="1.0"> 
<cmd>getchlstatus</cmd> 
<list count = "1"> 
<status ip="192.168.1.2" devtype="25" devid="40" subdevid="{}" chlid="{}" >true</status> 
</list> 
</bts>'''.format(subid, chlid))
    time.sleep(1)
    returnvalue = newarerecv()
    if "finish" in returnvalue:
        return "finish"
    elif "working" in returnvalue:
        return "working"
    elif "stop" in returnvalue:
        return "stop"
    elif "pause" in returnvalue:
        return "pause"
    else:
        return returnvalue

In [2]:
neware_check(2, 5)

'finish'

## PLC OPCUA connect

In [3]:
from opcua import Client, ua
opc_client = Client("opc.tcp://192.168.1.100:4840")
opc_client.connect()

def opc_send(node: str, value: bool or int or list):
    time.sleep(0.2)
    if type(value) == bool:
        opc_client.get_node("ns=4;s={}".format(node)).set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Boolean)))
    elif type(value) == int:
        opc_client.get_node("ns=4;s={}".format(node)).set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Int16)))
    elif type(value) == list:
        # for our case, list is composed of float or string
        try:
            opc_client.get_node("ns=4;s={}".format(node)).set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Float)))
        except:
            opc_client.get_node("ns=4;s={}".format(node)).set_value(ua.DataValue(ua.Variant(value, ua.VariantType.String)))
    time.sleep(0.2)

def opc_read(node: str):
    time.sleep(0.2)
    return opc_client.get_node("ns=4;s={}".format(node)).get_value()

def jigjob(job, index:int=0):
    """
    Job for the coincell jig
    job: Jig_Origin, Jig_Readypos, Jig_Insert, Jig_Retrieve, EIS_Insert, EIS_Retrieve
    * Jig: 0 ~ 47 (48 ch.) / EIS: 0 ~ 1 (2 ch.)
    index: position for Insert and Retrieve
    """
    opc_send("JOB", [job, str(index)])

Requested session timeout to be 3600000ms, got 60000ms instead


In [4]:
import pandas as pd

jig = pd.read_excel("C:/Users/HOME/Desktop/코인셀 지그.xlsx")
jig_position = jig.iloc[:, 1:7].values.tolist()
opc_send("Jigmove_csv", jig_position)
EIS_position = jig.iloc[0:2, 7:11].values.tolist()
opc_send("EISmove_csv", EIS_position)

## Biologics connect

In [5]:
import sys
sys.path.append("C:/Users/HOME/easy-biologic/src")

import easy_biologic as ebl
import easy_biologic.base_programs as blp

bl = ebl.BiologicDevice('192.168.1.128')
bl.connect()

In [6]:
# eis parameter
params = {
    'voltage': -3.56, # initial potential [Volts] will be changed to measured OCV
    'amplitude_voltage': 0.01, # Sinus amplitude [Volts]
    'initial_frequency': 200000, # [Hz]
    'final_frequency': 0.1,# [Hz]
    'frequency_number': 39, #39
    'duration': 0,
    'vs_initial': False, #default:False
    'repeat': 2, #default:1
    #'time_interval': 2, #default:1
    #'current_interval': 0.5, #default:0.001
    #'sweep': 1.0, #default:'log'
    #'correction': 1.0, #default:F}alse
    'wait': 0.1, #default:0
    } 

# ocv parameter
params_ocv = {'time':2, 'time_interval':1}

In [7]:
def ocv_eis(save_path, channel = 1):
    ocv = blp.OCV(bl, params_ocv, channels = [channel])
    ocv.run()
    voc = {
        ch: [datum.voltage for datum in data]
        for ch, data in ocv.data.items()
    }
    voc = {
        ch: sum(ch_voc) / len(ch_voc)
        for ch, ch_voc in voc.items()
    }

    params['voltage'] = list(voc.values())[0]
    peis = blp.PEIS(bl, params, channels = [channel])
    peis.run()
    peis.save_data(save_path)

## Check Status and automate EIS (자동 진행)

In [8]:
import datetime

def cycle_server(check_ch: list, eisauto = True, slack = True):
    checklist = []
    for i in check_ch:
        sub_id = i//8+1
        chl_id = i%8+1
        temp = [sub_id, chl_id]
        checklist.append(temp)
    
    while True:
        time.sleep(1)
        for n, i in enumerate(checklist):
            if neware_check(i[0], i[1]) == 'pause':
                
                jigjob("Jig_Retrieve", check_ch[n])
                jigjob("EIS_Insert", 0)
                
                while True:
                    time.sleep(1)
                    gantry_check = opc_read("DataJig.Gantry")
                    if gantry_check[0] == "Jig_Retrieve" and gantry_check[1] == str(check_ch[n]):
                        break

                while True:
                    time.sleep(1)
                    gantry_check = opc_read("DataJig.Gantry")
                    if gantry_check[0] == "EIS_Insert" and gantry_check[1] == str(0):
                        break
                time.sleep(75) # time for linear motor to go to the EIS channel
                    
                # filename
                today = datetime.datetime.now().strftime("%y%m%d%H%M%S")
                save_path = "C://EIS/{}_{}({}-{}).csv".format(today, check_ch[n], i[0], i[1])
                
                # python EIS
                print("Start OCV/EIS TEST")
                ocv_eis(save_path)
                
                # return cell to original position
                jigjob("EIS_Retrieve", 0)
                jigjob("Jig_Insert", check_ch[n])

                while True:
                    time.sleep(1)
                    gantry_check = opc_read("DataJig.Gantry")
                    if gantry_check[0] == "Jig_Insert" and gantry_check[1] == str(check_ch[n]):
                        break

                while True:
                    time.sleep(1)
                    gantry_check = opc_read("DataJig.Gantry")
                    if gantry_check[0] != "Jig_Insert" or gantry_check[1] != str(check_ch[n]):
                        break
                        
                neware_continue(i[0], i[1])

# Main

In [1]:
# start EIS automation process
cycle_server(range(48))

## Disconnect

In [2]:
bl.disconnect()

In [11]:
client_socket.close()

In [12]:
opc_client.disconnect()