In [1]:
import pandas as pd

In [3]:
def create_agilent_prep(spreadsheet_fn, batch_id, proposal_id=None, run_id=None, current_cycle=None, sheet_name='Samples'):
    """
    Create an Agilent sequence table from the sample spreadsheet.

    Rows whose ``batchID`` equals ``batch_id`` are collected, the spreadsheet
    columns are copied to the column names used in the sequence table, and the
    result is written to ``sequence_table.csv`` in the Agilent sequence directory.

    Args:
        spreadsheet_fn (str): File path of the spreadsheet.
        batch_id (str): Identifier for the batch.
        proposal_id (str, optional): Identifier for the proposal. Defaults to None.
        run_id (str, optional): Identifier for the run. Defaults to None.
        current_cycle (str, optional): Identifier for the current cycle. Defaults to None.
        sheet_name (str, optional): Name of the sheet in the spreadsheet. Defaults to 'Samples'.

    Returns:
        dict, dict: Samples dictionary keyed by sample name, and valve position
        dictionary keyed by spreadsheet row index.

    Raises:
        ValueError: If the injection volume of a matching row is not numeric.
    """
    # Spreadsheet column -> sequence-table column
    column_mapping = {
        "Vial": "Vial",
        "Sample Name": "Sample Name",
        "Injection Volume": "Volume",
        "Buffer": "Buffer",
        "Proc. method": "Proc. method",
        "Data file": "Data File",
        # Add more mappings as needed
    }
    # Only these source columns must hold numbers; names, buffers and methods
    # are text, so a blanket numeric check would reject every valid row.
    # NOTE(review): extend this set if "Vial" is also expected to be numeric.
    numeric_columns = {"Injection Volume"}

    # Read spreadsheet into a dictionary of {column: {row_index: value}}
    spreadsheet_data = parse_spreadsheet(spreadsheet_fn, sheet_name=sheet_name, return_dataframe=False)

    # Autofill the spreadsheet
    autofill_spreadsheet(spreadsheet_data, fields=["batchID"])

    # Check if login is required
    if proposal_id is None or run_id is None:
        print("Login is required...")
        login()
        # NOTE(review): login() is not visible here; proposal_id / run_id are
        # not refreshed from it, so the data file path below may still contain
        # "None" components -- confirm where these IDs should come from.

    # Get indices of rows with matching batch ID
    matching_indices = [i for i, value in spreadsheet_data['batchID'].items() if value == batch_id]
    print(f"Matching indices: {matching_indices}")

    samples = {}
    valve_positions = {}

    # Define data file path
    data_file_path = f"{current_cycle}/{proposal_id}/{run_id}/<S>"

    for i in matching_indices:
        # Iterate over the mapping (not over spreadsheet_data) so that creating
        # a missing target column does not mutate the dict being iterated.
        for source, target in column_mapping.items():
            if source not in spreadsheet_data:
                continue
            value = spreadsheet_data[source].get(i)
            if source in numeric_columns and not isinstance(value, (int, float)):
                raise ValueError(f"Not a numeric value for {source}: {value}, replace with a number")
            spreadsheet_data.setdefault(target, {})[i] = value

        # Get valve position
        valve_positions[i] = spreadsheet_data.get("Valve Position", {}).get(i)

        # Rows without a sample name are written to the CSV but not reported in `samples`
        sample_name = spreadsheet_data.get('Sample Name', {}).get(i)
        if sample_name:
            samples[sample_name] = {
                "acq time": spreadsheet_data.get('Run Time', {}).get(i),
                "valve_position": valve_positions[i],
                "md": {
                    "Column type": spreadsheet_data.get('Column type', {}).get(i),
                    "Injection Volume (ul)": spreadsheet_data.get('Volume', {}).get(i),
                    "Flow Rate (ml_min)": spreadsheet_data.get('Flow Rate', {}).get(i),
                    "Sample buffer": spreadsheet_data.get('Buffer', {}).get(i),
                    "Valve Position": valve_positions[i]
                }
            }

        # Set data file path (creates the column if the spreadsheet lacks it)
        spreadsheet_data.setdefault("Data File", {})[i] = data_file_path

    # Define the sequence path
    sequence_path = "/nsls2/data/lix/legacy/HPLC/Agilent/"

    # Convert spreadsheet data to DataFrame
    df = pd.DataFrame.from_dict(spreadsheet_data, orient='columns')

    # Write only the rows of this batch, restricted to the sequence-table columns
    df[df['batchID'] == batch_id].to_csv(f"{sequence_path}sequence_table.csv", index=False, encoding="ASCII",
                                         columns=["Vial", "Sample Name", "Sample Type", "Volume", "Inj/Vial", "Acq. method", "Proc. method", "Data File"])

    return samples, valve_positions

In [None]:
### Send individual runs over to Agilent
def send_hplc_run():
    """
    Placeholder for sending a single run to the HPLC; not implemented yet.

    Intended behavior: gather the necessary parameters, send them to the HPLC
    to start the run, prepare the beamline, and wait for the injection signal.

    Returns:
        None
    """
    return None

In [1]:
from time import sleep
from enum import Enum
import yaml
import pandas as pd

In [99]:
def get_sec_exp_param(experiment_type, column_type_name=None, column_position=None, buffer_position=None, config=None):
    """
    Validate an SEC experiment request against the sec_experiment_parameters YAML file.

    Args:
        experiment_type (str): Name of the experiment type (obtained from the
            spreadsheet). Must be a key of the YAML ``experiment_types`` section.
        column_type_name (str, optional): Name of the column. Must be a key of
            the YAML ``column_type`` section. When None, defaults to
            "Superdex 200 Increase 5/150 GL".
        column_position (int): Column position; must be 1 or 2.
        buffer_position (int): Buffer position; must be an integer from 1 to 6.
        config (dict, optional): Already-parsed contents of the YAML file. When
            None (default) the file is read from its fixed location.

    Returns:
        dict or None: Dictionary with the keys ``buffer_position``,
        ``column_position``, ``experiment_type`` and ``column_type`` when every
        value is valid; otherwise None (a message describing the problem is printed).
    """
    default_column_type = "Superdex 200 Increase 5/150 GL"

    if config is None:
        yaml_file = '/nsls2/data/lix/shared/config/bluesky/profile_collection/startup/devices/sec_experiment_parameters.yaml'  # Specify the fixed YAML file path here
        try:
            with open(yaml_file, 'r') as file:
                config = yaml.safe_load(file)
        except FileNotFoundError:
            print("Error: YAML file not found.")
            return None
    # An empty YAML document parses to None; treat it as "no sections".
    config = config or {}

    # Membership in range() rejects None and non-integer values without raising.
    if column_position not in (1, 2) or buffer_position not in range(1, 7):
        print(f"Error: column_position must be 1 or 2 and buffer_position must be 1-6 "
              f"(got column_position={column_position}, buffer_position={buffer_position}).")
        return None

    print("you chose correctly")
    sec_exp_param = {"buffer_position": buffer_position, "column_position": column_position}
    print(sec_exp_param)

    # Look the experiment up directly instead of returning from inside a loop,
    # so every configured experiment type can match, not only the first one.
    experiment_types = config.get('experiment_types') or {}
    if experiment_type not in experiment_types:
        print(f"Error: Experiment type '{experiment_type}' not found in YAML file.")
        return None
    sec_exp_param["experiment_type"] = experiment_type

    # Choose the column type from the same config rather than a notebook global.
    column_types = config.get('column_type') or {}
    if column_type_name is None:
        print("NO column specified: Default to Superdex 200 Increase 5/150GL (small)")
        sec_exp_param["column_type"] = default_column_type
        print(sec_exp_param)
    elif column_type_name in column_types:
        sec_exp_param["column_type"] = column_type_name
    else:
        print(f"column name {column_type_name} is not in the list of approved columns! Approved columns are {list(column_types)}")
        return None

    return sec_exp_param

                

In [103]:
get_sec_exp_param("X-ray_only", "Phenomenex dSEC2", column_position=2, buffer_position=6)

you chose correctly
{'buffer_position': 6, 'column_position': 2}


{'buffer_position': 6,
 'column_position': 2,
 'experiment_type': 'X-ray_only',
 'column_type': 'Phenomenex dSEC2'}

In [25]:
yaml_file = '/nsls2/data/lix/shared/config/bluesky/profile_collection/startup/devices/sec_experiment_parameters.yaml'  # Specify the fixed YAML file path here
try:
    with open(yaml_file, 'r') as file:
        data = yaml.safe_load(file)
except FileNotFoundError:
    print("Error: YAML file not found.")
    # Re-raise: continuing would leave `data` undefined (or stale from an
    # earlier run), and the cells below would fail or use outdated values.
    raise

In [26]:
# Pull the two top-level sections out of the parsed YAML contents.
experiment_types, column_types = data.get('experiment_types'), data.get('column_type')

In [27]:
experiment_types

{'X-ray_only': {'buffer_position': [1, 2, 3, 4, 5, 6],
  'column_position': [1, 2],
  'valve_position': ['A', 'B']},
 'X-ray_UV_MALS_RID': {'buffer_position': [1, 2, 3, 4, 5, 6],
  'column_position': [1, 2],
  'valve_position': ['A', 'B']},
 'X-ray_Regen': {'buffer_position': [1, 2, 3, 4, 5, 6],
  'column_position': [1, 2],
  'valve_position': ['A', 'B']},
 'UV_MALS_RID': {'buffer_position': [1, 2, 3, 4, 5, 6],
  'column_position': [1, 2],
  'valve_position': ['A', 'B']}}

In [28]:
column_types

{'Superdex 200 Increase 5/150 GL': {'pressure_limit': [45],
  'flowrate': [0.35],
  'run_time': [12]},
 'Superdex 200 Increase 10/300 GL': {'pressure_limit': [45],
  'flowrate': [0.5],
  'run_time': [50]},
 'Phenomenex dSEC2': {'pressure_limit': [125],
  'flowrate': [0.35],
  'run_time': [30]}}

In [30]:
# Print every non-empty experiment type name found in the config.
for experiment in experiment_types:
    if not experiment:
        continue
    print(experiment)

X-ray_only
X-ray_UV_MALS_RID
X-ray_Regen
UV_MALS_RID


In [33]:
# Show the Python type of every non-empty column type key in the config.
for column in column_types:
    if not column:
        continue
    print(type(column))

<class 'str'>
<class 'str'>
<class 'str'>


In [54]:
# NOTE(review): get_experiment_and_column_type is not defined in any visible cell;
# presumably it is an earlier name of get_sec_exp_param -- confirm, since this
# cell would raise NameError after "Restart Kernel -> Run All".
sec_exper_column=get_experiment_and_column_type(experiment_type="X-ray_only", column_type_name="Phenomenex dSEC2")

In [57]:
sec_exper_column.keys()

dict_keys(['experiment_type', 'column_type'])

In [64]:
def prepare_hplc_flowpath(experiment_type, column_position, buffer_position, columntype=None):
    """
    Validate the requested experiment and switch the HPLC flow path accordingly.

    This will also need to send the proper arguments to Agilent so that it
    pulls from the correct pump line (not implemented here).

    Args:
        experiment_type (str): One of "X-ray_only", "X-ray_UV_MALS_RID",
            "X-ray_Regen" or "UV_MALS_RID".
        column_position (int): Column position forwarded to ``change_flowpath``.
        buffer_position (int): Buffer position forwarded to ``change_flowpath``.
        columntype (str, optional): Column type name; when None,
            ``get_sec_exp_param`` chooses its default column.

    Returns:
        The value returned by ``change_flowpath``.

    Raises:
        ValueError: If the experiment type is not supported, or the parameters
            are rejected by ``get_sec_exp_param``.
    """
    # "UV_MALS_RID" (not "UV_MALS_RID_only") is the key used in the YAML config.
    supported_experiments = ("X-ray_only", "X-ray_UV_MALS_RID", "X-ray_Regen", "UV_MALS_RID")
    if experiment_type not in supported_experiments:
        raise ValueError(f"Unsupported experiment type '{experiment_type}'; expected one of {supported_experiments}")

    # Forward the caller's positions so they are actually validated.
    sec_exp_param = get_sec_exp_param(experiment_type, column_type_name=columntype,
                                      column_position=column_position, buffer_position=buffer_position)
    if not sec_exp_param or "column_type" not in sec_exp_param:
        raise ValueError(
            f"Invalid SEC parameters: experiment_type={experiment_type!r}, columntype={columntype!r}, "
            f"column_position={column_position!r}, buffer_position={buffer_position!r}"
        )

    # NOTE(review): change_flowpath is not defined in the visible cells; the
    # resolved column type name is passed as its column-info argument --
    # confirm this matches what change_flowpath expects.
    column_info = sec_exp_param["column_type"]

    # Every supported experiment type uses the same flow-path call.
    flowpath = change_flowpath(column_position, column_info, buffer_position)
    print(flowpath)
    return flowpath

In [69]:
get_experiment_and_column_type

NO column specified: Default to Superdex 200 Increase 5/150GL (small)
{'buffer_position': None, 'column_position': None, 'experiment_type': 'X-ray_only', 'column_type': 'Superdex 200 Increase 5/150 GL'}


NameError: name 'change_flowpath' is not defined