In [1]:
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from models import Base
from utils.general import generate_uppercase_alphabets
from utils.create import add_well_type_to_plate_type, add_lib_well_to_plate, add_xtal_wells_to_plate, transfer_xtal_to_pin
from utils.read import get_or_create, get_xtal_well

engine = create_engine("sqlite:///../test/test2.db")
Session = sessionmaker(bind=engine)
session = Session()

Base.metadata.create_all(engine)

In [2]:
from models import (
    Base,
    Project,
    LibraryPlateType,
    LibraryWellType,
    LibraryPlate,
    LibraryWell,
    XtalPlate,
    XtalPlateType,
    XtalWellType,
    XtalWell,
    WellMap,
    Batch,
    DropPosition,
    EchoTransfer,
    XrayStatusEnum,
    PuckType,
    Puck,
    Pin,
)
import pandas
from sqlalchemy.exc import NoResultFound
DATABASE_SETUP_ONLY = True

## Add Project

In [3]:
project = Project(target="mpro", year=2023, cycle=2, visit=1)
session.add(project)


## Add Library Plate type

In [4]:
LIB_PLATE_ROWS = 32
LIB_PLATE_COLS = 48
lib_plate_type = LibraryPlateType(name="1536LDV", rows=LIB_PLATE_ROWS, columns=LIB_PLATE_COLS)
session.add(lib_plate_type)
a_to_af = generate_uppercase_alphabets(LIB_PLATE_ROWS)
lib_well_type_names = [
        {"name": f"{i}{str(j).zfill(2)}"} for i in a_to_af for j in range(1, LIB_PLATE_COLS+1)
    ]
add_well_type_to_plate_type(session, lib_well_type_names, lib_plate_type)



### Create library plate and add associate chemicals with it

In [5]:
lib_plate = LibraryPlate(library_plate_type=lib_plate_type, name="DSI-poised")
session.add_all([lib_plate_type, lib_plate])

# Read the vendor csv and populate the library plate
df = pandas.read_csv("../test/dsip.csv")
add_lib_well_to_plate(session, df, lib_plate)

### Create Xtal plate type and add an echo to shifter mapping to the plate type

In [6]:
# Create and add xtal plate
xtal_plate_type = XtalPlateType(name="SwissCI-MRC-2d")
session.add(xtal_plate_type)

# Create lists that map echo wells to shifter wells 
a_to_h = generate_uppercase_alphabets(8)
a_to_p = generate_uppercase_alphabets(16)
echo = [f"{i}{j}" for i in a_to_p for j in range(1, 17)]
shifter = [f"{i}{k}{j}" for i in a_to_h for j in ["a", "b"] for k in range(1, 13)]
plate_maps = [{"echo": i, "shifter": j} for i, j in zip(echo, shifter)]

# Loop through plate map list, if the shifter well name ends with a "b", add a y offset
for plate_map in plate_maps:
    x_offset = 0
    if plate_map["shifter"][-1] == "b":
        y_offset = 1350  # microns
    else:
        y_offset = 0
    well_map = WellMap(well_pos_x=x_offset, well_pos_y=y_offset, **plate_map)
    session.add(well_map)
    xtal_well_type = XtalWellType(name=plate_map["shifter"], well_map=well_map)
    session.add(xtal_well_type)


### Add drop position codes and offsets to the database

In [7]:
drop_position_codes = [
    {"name": "ul", "x_offset": -300, "y_offset": 300},
    {"name": "lu", "x_offset": -300, "y_offset": 300},
    {"name": "u", "x_offset": 0, "y_offset": 300},
    {"name": "ru", "x_offset": 300, "y_offset": 300},
    {"name": "ur", "x_offset": 300, "y_offset": 300},
    {"name": "l", "x_offset": -300, "y_offset": 0},
    {"name": "c", "x_offset": 0, "y_offset": 0},
    {"name": "r", "x_offset": 300, "y_offset": 0},
    {"name": "dl", "x_offset": -300, "y_offset": -300},
    {"name": "ld", "x_offset": -300, "y_offset": -300},
    {"name": "dr", "x_offset": 300, "y_offset": -300},
    {"name": "rd", "x_offset": 300, "y_offset": -300},
    {"name": "d", "x_offset": -300, "y_offset": 0},
]

session.add_all(
    [
        DropPosition(**drop_position_code)
        for drop_position_code in drop_position_codes
    ]
)

### The rest of the steps described here are used for reference. It shows code that populates the database during an actual experimental run. It is not part of the database setup

#### Steps to add experiment info to database: 
- Use the imaging csv from the shifter to create an instance of an Xtal plate

In [8]:
if not DATABASE_SETUP_ONLY:
    imaging_df = pandas.read_csv("../test/imaging.csv", skiprows=8)
    # shifter app leaves semi-colons in front of all header text, fix that
    # add some light spreadsheet validation (unique plate_ids)
    imaging_df.rename(columns={";PlateType": "PlateType"}, inplace=True)
    unique_plate_ids = imaging_df["PlateID"].unique()
    if len(unique_plate_ids) != 1:
        raise ValueError("inconsistent plateIDs; plateIDs should all be the same")

    xtal_plate = XtalPlate(plate_type=xtal_plate_type, name=unique_plate_ids[0])
    session.add(xtal_plate)
    add_xtal_wells_to_plate(session, imaging_df, xtal_plate)

    # create a batch
    batch = Batch()
    session.add(batch)
    # populate transfers
    # need to ensure that we go row by row through xtal plate, column
    # by column through library plate to minimize xy motions during transfer
    # added sequence column to well tables to enforce this ordering
    xtal_well_query = (
        session.query(XtalWell)
        .order_by(XtalWell.sequence)
        .filter(XtalWell.plate.has(name="pmtest"))
    )

    library_well_query = (
        session.query(LibraryWell)
        .order_by(LibraryWell.sequence)
        .filter(LibraryWell.plate.has(name="DSI-poised"))
    )

    for xtal_well, library_well in zip(xtal_well_query, library_well_query):
        transfer = EchoTransfer(
            batch=batch, from_well=library_well, to_well=xtal_well, transfer_volume=25
        )
        session.add(transfer)

    # prepare to ingest shifter harvesting csv
    # make sure there are pucks (and puck_types)
    puck_names = [{"name": name} for name in ["testpuck", "FGZ001", "FGZ002"]]
    for puck_name in puck_names:
        get_or_create(session, PuckType, **puck_name)

    harvesting_df = pandas.read_csv("../test/harvesting.csv", skiprows=8)
    harvesting_df.rename(columns={";PlateType": "PlateType"}, inplace=True)

    puck_data = [
        {"puck_type": session.query(PuckType).filter_by(name=k).one()}
        for k in harvesting_df["DestinationName"].dropna().unique()
    ]
    for puck in puck_data:
        get_or_create(session, Puck, **puck)

    for index, row in harvesting_df.iterrows():
        # this entry will be populated if something happened at the well,
        # successful or not
        if pandas.notna(row["Comment"]):
            shifter_well_pos = (
                f"{row['PlateRow']}{row['PlateColumn']}{row['PositionSubWell']}"
            )
            xtal_well = get_xtal_well(session, row["PlateID"], shifter_well_pos)
            xtal_well.harvest_comment = row["Comment"]
            xtal_well.harvesting_status = True
            xtal_well.time_arrival = pandas.to_datetime(
                row["TimeArrival"], format="%d/%m/%Y %H:%M:%S"
            )
            echo_transfer = (
                session.query(EchoTransfer)
                .filter(EchoTransfer.to_well_id == xtal_well.uid)
                .first()
            )
            if echo_transfer:
                library_well = echo_transfer.from_well
                library_well.used = True

            # this entry will only be populated if the pin made it into the puck
            if pandas.notna(row["DestinationLocation"]):
                transfer_xtal_to_pin(session, xtal_well, row["DestinationName"], 
                                    row["DestinationLocation"], row["TimeDeparture"])

In [9]:
len(session.query(XtalWell).filter(
            XtalWell.plate.has(name="pmtest"),
            #XtalWell.well_type.has(name=shifter_well_pos),
        ).all())

0

In [10]:
session.query(XtalWellType).filter_by(name="A1a").one()

<models.xtal_plate.XtalWellType at 0x7fb8f2488d90>

In [11]:
session.commit()