# Helper Functions, will eventually live in pascal module

In [1]:
import numpy as np
import itertools
import random

#### name parsing helper functions

def components_to_name(components, delimiter = '_'):
    composition_label = ''
    for c, n in components.items():
        if n > 0:
            composition_label += '{0}{1:.2f}{2}'.format(c, n, delimiter)

    return composition_label[:-1]

def name_to_components(name, factor = 1, delimiter = '_',):
    '''
    given a chemical formula, returns dictionary with individual components/amounts
    expected name format = 'MA0.5_FA0.5_Pb1_I2_Br1'. 
    would return dictionary with keys ['MA, FA', 'Pb', 'I', 'Br'] and values [0.5,.05,1,2,1]*factor
    '''
    components = {}
    for part in name.split(delimiter):
        species = part
        count = 1.0
        for l in range(len(part), 0, -1):
            try:
                count = float(part[-l:])
                species = part[:-l]
                break
            except:
                pass
        components[species] = count * factor
    return components

#### individual solution functions

def get_components(name, factor = 1, delimiter = '_',):
    components = {}
    for part in name.split(delimiter):
        species = part
        count = 1.0
        for l in range(len(part), 0, -1):
            try:
                count = float(part[-l:])
                species = part[:-l]
                break
            except:
                pass
        components[species] = count * factor
    return components

def calculate_mix(target, volume, solution_sources):
    '''
    given a target composition, target volume, and stock solution sources, calculates the volumes needed
    from individual stocks to achieve target composition

    target: target composition. Will be passed to name_to_components()
            Example: 'MA0.5_FA0.5_Pb1_I2_Br1'
    volume: target volume, in L
    solution_sources: dictionary with stock solution compositions, molarities, solvents, well positions, and available volumes labeled.
            Example:
                    solution_sources = {
                            'MA_Pb_I3': dict(well = 'A1', molarity = 2, solvent = dict(DMSO = 9, DMF = 1), volume = 15e-3),
                            'Cs_I': dict(well = 'A2', molarity = 3, solvent = dict(DMSO = 1), volume = 15e-3),
                            'FA_Pb_I3': dict(well = 'A3', molarity = 1.5, solvent = dict(DMSO = 9, DMF = 1), volume = 15e-3)
                        }
    '''
    target_composition = name_to_components(target)
    wells = [solution_properties['well'] for solution_name, solution_properties in solution_sources.items()]
    num_solutions = len(solution_sources)
    components = list(target_composition.keys())
    num_components = len(components)

    solution_matrix = np.zeros((num_components, num_solutions))    
    for n, (solution_name, solution_properties) in enumerate(solution_sources.items()):
        solution_components = get_components(solution_name, factor = solution_properties['molarity'])
        for m, component_name in enumerate(components):
            if component_name in solution_components:
                solution_matrix[m,n] = solution_components[component_name]
    
    target_matrix = np.zeros((num_components, ))
    for m, (component_name, component_amount) in enumerate(target_composition.items()):
        target_matrix[m] = component_amount
    
    amount_matrix = np.linalg.lstsq(solution_matrix, target_matrix, rcond = None)[0]
    amount_matrix[amount_matrix < 1e-6] = 0 #clean up values that are essentially 0. If we have a significant negative value here, should get caught downstream
    doublecheck = solution_matrix @ amount_matrix
    if np.linalg.norm((doublecheck - target_matrix))/np.linalg.norm(target_matrix) < 0.01: #check that we are within 1% error wrt target composition
        results = {}
        # for solution, solution_volume in zip(solutions, amount_matrix): 
        #     results[solution] = solution_volume * volume
        for well, solution_volume in zip(wells, amount_matrix): 
            results[well] = np.round(solution_volume * volume, 6) # round to nearest uL
    else:
        results = False
        print('Error: Unable to generate target solution with current stock solutions.')
        # raise Exception('Unable to generate target solution with current stock solutions.')
    return results
        

#### combining functions to generate experiment mesh

def compositions_spread(compositions, n):
    composition_components = [name_to_components(s) for s in compositions]
    components = []
    for s in composition_components:
        components += list(s.keys())
    components = np.unique(components)
    
    mat = np.zeros((len(compositions), len(components)))
    for sidx, s in enumerate(composition_components):
        for cidx, c in enumerate(components):
            if c in s:
                mat[sidx, cidx] = s[c]
    
    compositions = []
    for mix in itertools.combinations_with_replacement(mat, n):
        composition_amounts = np.array(mix).mean(axis = 0)
        composition_label = ''
        for c, a in zip(components, composition_amounts):
            if a > 0:
                composition_label += '{0}{1:.3f}_'.format(c, a)
        compositions.append(composition_label[:-1]) #exclude the last underscore

    return list(np.unique(compositions))

def spincoat_spread(spincoats, method, n = None):
    mat = np.array(spincoats)
    
    if method == 'interpolate':
        if n is None:
            raise Exception('If method = "interpolate", n must be specified.')
        spincoats = []
        for mix in itertools.combinations_with_replacement(mat, n):
            spincoats.append(np.array(mix).mean(axis = 0))
    elif method == 'permute':
        unique_steps = []
        for i in range(mat[0].shape[0]):
            steps = np.array([c[i] for c in mat])
            unique_steps.append(np.unique(steps, axis = 0))
        spincoats = list(np.vstack(rows) for rows in itertools.product(*unique_steps))
    else:
        raise Exception('Invalid method provided - must be "interpolate" or "permute"')
        
    return np.unique(spincoats, axis = 0)

def anneal_spread(anneals, method, n = None):
    mat = np.array(anneals)
    
    if method == 'interpolate':
        if n is None:
            raise Exception('If method = "interpolate", n must be specified.')
        anneals = []
        for mix in itertools.combinations_with_replacement(mat, n):
            anneals.append(np.array(mix).mean(axis = 0))
    elif method == 'permute':
        anneals = np.array(list(itertools.product(*[c for c in mat.T])))
    else:
        raise Exception('Invalid method provided - must be "interpolate" or "permute"')
    
    return np.unique(anneals, axis = 0)

C:\ProgramData\Anaconda3\lib\site-packages\numpy\.libs\libopenblas.GK7GX5KEQ4F6UYO3P26ULGBQYHGQO7J4.gfortran-win_amd64.dll
C:\ProgramData\Anaconda3\lib\site-packages\numpy\.libs\libopenblas.JPIJNSWNNAN3CE6LLI5FWSPHUT2VXMTH.gfortran-win_amd64.dll


In [2]:
def well_list_generator(nrows = 12, ncols = 8):
    num = 0
    col = -1
    while num < nrows*ncols:
        row = num%nrows
        if row == 0:
            col += 1
        yield f'{str.upper(chr(col+97))}{row+1}'
        num += 1

def generate_sample_list(compositions, spincoats, anneals, sample_volume = 100e-6, max_volume = 1e-3, randomize = True, repeats = 1):  
    samples = []
    wells = {}
    current_wells = {}
    well_generator = well_list_generator() 
    
    def start_new_well(composition, volume):
        this_well = well_generator.__next__()
        current_wells[c] = this_well
        wells[this_well] = {'composition': composition, 'volume': volume}  
        return this_well
        
    for c, sc, an in itertools.product(compositions, spincoats, anneals):
        if c not in current_wells: #we havent dedicated a well to mix this composition yet - lets set one
            this_well = start_new_well(c, sample_volume)
        else: 
            this_well = current_wells[c]
            if wells[this_well]['volume'] + sample_volume > max_volume:
                this_well = start_new_well(c, sample_volume)
            else:
                wells[this_well]['volume'] += sample_volume
        for r in range(repeats):
            samples.append(dict(composition = c, spincoat = sc, anneal = an, well = this_well))
    if randomize:
        random.shuffle(samples)
    return samples, wells

Recipes

In [11]:
class SpincoatRecipe:
    def __init__(self, steps: list, perovskite_droptime: float, antisolvent_droptime: float):
        """

        Args:
            steps (list): nested list of steps:
            [
                [speed, acceleration, duration],
                [speed, acceleration, duration],
                ...,
                [speed, acceleration, duration]
            ] 
            where speed = rpm, acceleration = rpm/s, duration = s (including the acceleration ramp!)

            perovskite_drop_time (float): timing (seconds) relative to first spin step start to drop precursor solution on substrate. negative values imply dropping the solution prior to spinning (static spincoat)
            antisolvent_drop_time (float): timing (seconds) relative to first spin step start to drop antisolvent on substrate.
        """
        self.steps = np.asarray(steps)
        if self.steps.shape[1] != 3:
            raise ValueError("steps must be an nx3 nested list/array where each row = [speed, acceleration, duration].")
        
        self.perovskite_droptime = perovskite_droptime
        self.antisolvent_droptime = antisolvent_droptime
        first_drop_time = min(perovskite_droptime, antisolvent_droptime)

        self.start_times = [max(0, -first_drop_time)] #push back spinning times to allow static drop beforehand
        for duration in self.steps[:-1,2]:
            self.start_times.append(self.start_times[-1] + duration)
        
        self.duration = self.steps[:,2].sum() + 10 #total duration + 10 seconds for stopping
        
    def __repr__(self):
        output = '<SpincoatingRecipe>\n'
        output += f'Perovskite drops at {self.perovskite_droptime} s\n'
        output += f'Antisolvent drops at {self.antisolvent_droptime} s\n'
        currenttime = 0
        psk_dropped = False
        as_dropped = False
        for (rpm, accel, duration) in self.steps:
            output += f'{currenttime}-{currenttime+duration}s:\t{rpm:.0f} rpm, {accel:.0f} rpm/s'
            currenttime += duration
            if not psk_dropped and self.perovskite_droptime <= currenttime:
                output += ' (perovskite dropped)'
                psk_dropped = True
            if not as_dropped and self.antisolvent_droptime <= currenttime:
                output += ' (antisolvent dropped)'
                as_dropped = True
            output += '\n'
        return output[:-1]

In [12]:
class AnnealRecipe:
    def __init__(self, duration: float, temperature: float):
        """

        Args:
            duration (float): duration (seconds) to anneal the sample
            temperature (float): temperature (C) to anneal the sample at
        """
        self.duration = duration
        self.temperature = temperature
    
    def __repr__(self):
        output = '<AnnealRecipe>\n'
        output += f'{self.temperature:.2f} C\n'
        if self.duration >= 3600:
            output += f'{self.duration/3600:.2f} hours' 
        elif self.duration >= 60:
            output += f'{self.duration/60:.2f} minutes' 
        else:
            output += f'{self.duration:.2f} seconds' 
        return output

In [14]:
import collections
storage_slot = collections.namedtuple("storage_slot", "tray slot")
move_type = collections.namedtuple("move_type", "machines duration")

class Sample:
    def __init__(self, name, spincoat_recipe: SpincoatRecipe, anneal_recipe: AnnealRecipe):
        self.name = name
        self.storage_slot = storage_slot(None, None) #tray, slot that sample is stored in. Initialized to None, will be filled when experiment starts
        self.spincoat_recipe = spincoat_recipe
        self.anneal_recipe = anneal_recipe
        self._generate_moves()
        self.status = 'not started'
        
    def _generate_moves(self):
        """
        dictionary of all steps this sample will go through for experiment
        """
        self.moves = collections.OrderedDict()
        
        self.moves['storagetospincoater'] = move_type(machines=["Gantry", "Spincoater"], duration=30)
        self.moves['spincoat'] = move_type(machines=["Spincoater"], duration=self.spincoat_recipe.duration)
        self.moves['spincoatertohotplate'] = move_type(machines=["Gantry", "Spincoater"], duration=30)
        self.moves['anneal'] = move_type(machines=["Hotplate"], duration=self.anneal_recipe.duration)
        self.moves['hotplatetostorage'] = move_type(machines=["Gantry"], duration=30)
        self.moves['cooldown'] = move_type(machines=["Storage"], duration=60*2)
        self.moves['storagetocharacterization'] = move_type(machines=["Gantry", "Characterization"], duration=30)
        self.moves['characterization'] = move_type(machines=["Characterization"], duration=60*4)
        self.moves['characterizationtostorage'] = move_type(machines=["Gantry", "Characterization"], duration=30)
    
    def __repr__(self):
        output = '<Sample>\n'
        output += f'name:\t{self.name}\n'
        output += f'status:\t{self.status}\n'
        output += f'{self.storage_slot}\n'
        output += f'\n{sc[0]}\n\n{an[0]}'
        return output

# Define Experimental Mesh

### Target Compositions

In [3]:
endpoint_compositions = [
    'Cs_Pb_I3', 
    'FA_Pb_I3', 
    'MA_Pb_I3'
]


compositions = compositions_spread(endpoint_compositions, 3)
print(f'==={len(compositions)} compositions===')
for c in compositions: print(c)

===10 compositions===
Cs0.333_FA0.333_I3.000_MA0.333_Pb1.000
Cs0.333_FA0.667_I3.000_Pb1.000
Cs0.333_I3.000_MA0.667_Pb1.000
Cs0.667_FA0.333_I3.000_Pb1.000
Cs0.667_I3.000_MA0.333_Pb1.000
Cs1.000_I3.000_Pb1.000
FA0.333_I3.000_MA0.667_Pb1.000
FA0.667_I3.000_MA0.333_Pb1.000
FA1.000_I3.000_Pb1.000
I3.000_MA1.000_Pb1.000


### Target Spincoating Conditions

In [22]:
endpoint_spincoats = [
    SpincoatRecipe(
        steps=[
            [1000,1000,10],
            [4000,2000,30]
        ],
        perovskite_droptime= -5,
        antisolvent_droptime=20),
    SpincoatRecipe(
        steps=[
            [1000,1000,10],
            [5000,2000,30]
        ],
        perovskite_droptime= -5,
        antisolvent_droptime=15),
    SpincoatRecipe(
        steps=[
            [1000,1000,10],
            [4000,2000,30]
        ],
        perovskite_droptime= -5,
        antisolvent_droptime=10)
]

In [23]:
# spincoats = spincoat_spread(endpoint_spincoats, method = 'permute', n = 2)
# print(f'==={len(spincoats)} spincoats===')
# for c in spincoats: print(f'{c}\n')
spincoats = endpoint_spincoats

### Target Annealing Conditions

In [24]:
endpoint_anneals = [
    AnnealRecipe(
        duration=10*60,
        temperature=100),
    AnnealRecipe(
        duration=15*60,
        temperature=100),
    AnnealRecipe(
        duration=20*60,
        temperature=100),
]

# endpoint_anneals = [
#     [80, 40*60], #temperature (C), duration (s)
#     [100, 20*60],
# ]

In [25]:
# anneals = anneal_spread(endpoint_anneals, method = 'permute')
# print(f'=== {len(anneals)} anneals ===')
# for c in anneals: 
#     print(f'{c}')
anneals = endpoint_anneals

In [26]:
samples, mixing_wells = generate_sample_list(compositions, spincoats, anneals, max_volume = 1000e-6)
print(f'{len(samples)} Samples')

90 Samples


In [27]:
samples[0]

{'composition': 'FA0.667_I3.000_MA0.333_Pb1.000',
 'spincoat': <SpincoatingRecipe>
 Perovskite drops at -5 s
 Antisolvent drops at 15 s
 0-10s:	1000 rpm, 1000 rpm/s (perovskite dropped)
 10-40s:	5000 rpm, 2000 rpm/s (antisolvent dropped),
 'anneal': <AnnealRecipe>
 100.00 C
 20.00 minutes,
 'well': 'A8'}

In [29]:
samples[1]

{'composition': 'Cs0.333_FA0.333_I3.000_MA0.333_Pb1.000',
 'spincoat': <SpincoatingRecipe>
 Perovskite drops at -5 s
 Antisolvent drops at 15 s
 0-10s:	1000 rpm, 1000 rpm/s (perovskite dropped)
 10-40s:	5000 rpm, 2000 rpm/s (antisolvent dropped),
 'anneal': <AnnealRecipe>
 100.00 C
 20.00 minutes,
 'well': 'A1'}

## Stock Solution Setup

In [11]:
stock_solutions = {
    'MA_Pb_I3': dict(well = 'A1', molarity = 2, solvent = dict(DMSO = 9, DMF = 1), volume = 4e-3),
    'Cs_Pb_I3': dict(well = 'A2', molarity = 3, solvent = dict(DMSO = 1), volume = 4e-3),
    'FA_Pb_I3': dict(well = 'A3', molarity = 2, solvent = dict(DMSO = 9, DMF = 1), volume = 4e-3)
}
stock_wells = {v['well']:comp for comp,v in stock_solutions.items()}

In [12]:
min_volume_to_aspirate = 4e-4 #volume below which the liquid level is too low to properly aspirate. This sets a volume floor
necessary_stock = {v['well']:min_volume_to_aspirate for v in stock_solutions.values()}

for i, s in enumerate(samples):
    this_mix = calculate_mix(s['composition'], 150e-6, stock_solutions) 
    for well, amount in this_mix.items():
        necessary_stock[well] += amount
    samples[i]['stock_mixture'] = this_mix

print('=== Stock Solution Volume Check === ')
for well, amt_needed in necessary_stock.items():
    amt_in_well = stock_solutions[stock_wells[well]]['volume']
    amt_ratio = amt_in_well/amt_needed
    
    if amt_ratio > 2:
        status = 'Not taking any chances, are you?'
    elif amt_ratio > 1.1:
        status = 'OK'
    elif amt_ratio > 1:
        status = 'Cutting it close...'
    else:
        status = '!!!!! NOT ENOUGH STOCK !!!!!'
        
    print(f'{well} ({stock_wells[well]})\t{amt_in_well*1000:.2f}/{amt_needed*1000:.2f} mL\t{status}')

=== Stock Solution Volume Check === 
A1 (MA_Pb_I3)	4.00/4.40 mL	!!!!! NOT ENOUGH STOCK !!!!!
A2 (Cs_Pb_I3)	4.00/3.07 mL	OK
A3 (FA_Pb_I3)	4.00/4.40 mL	!!!!! NOT ENOUGH STOCK !!!!!


### Stock solution per well, assuming distributing from stock -> 96 well plate -> spincoating

In [25]:
transfers = {well:dict(destination_wells = [], transfer_volumes = [])  for well in stock_wells}
for destination_well, destination_well_contents in mixing_wells.items():
    need = calculate_mix(destination_well_contents['composition'], destination_well_contents['volume'], stock_solutions)
    for stock_well, stock_volume_to_transfer in need.items():
        if stock_volume_to_transfer > 0:
            transfers[stock_well]['destination_wells'].append(destination_well)
            transfers[stock_well]['transfer_volumes'].append(stock_volume_to_transfer)

# Initializing Hardware

In [None]:
from opentrons import protocol_api
import opentrons.execute

# metadata()
metadata = {
    'protocolName': 'My Protocol',
    'author': 'Name <email@address.com>',
    'description': 'Simple protocol to get started using OT2',
    'apiLevel': '2.6'
}


# protocol run function. the part after the colon lets your editor know
# where to look for autocomplete suggestions
# def run(protocol: protocol_api.ProtocolContext):

#     # labware
#     plate = protocol.load_labware('corning_96_wellplate_360ul_flat', '2')
#     tiprack = protocol.load_labware('opentrons_96_tiprack_300ul', '1')

#     # pipettes
#     left_pipette = protocol.load_instrument(
#          'p300_single', 'left', tip_racks=[tiprack])

#     # commands
#     left_pipette.pick_up_tip()
#     left_pipette.aspirate(100, plate['A1'])
#     left_pipette.dispense(100, plate['B2'])
#     left_pipette.drop_tip()

Initialize robot/protocol control + lab hardware

In [None]:
protocol = opentrons.execute.get_protocol_api('2.6')
protocol.home()

In [None]:
#labware
stock_wells = protocol.load_labware('FRG_4ml_v0', '1') #Labware identifier, deck position
mixing_wells = protocol.load_labware('96wellplate', '2')
tiprack = protocol.load_labware('opentrons_96_tiprack_300ul', '3')

#pipettes
left_pipette = protocol.load_instrument('p300_single', 'left', tip_racks = [tiprack])

# Automation Begin

Distribute stock solutions to well plate

In [None]:
for source_well, transfer_info in transfers.items():
    pipette.transfer(
        transfer_info['transfer_volumes'],
        stock_wells[source_well],
        [mixing_wells.wells_by_name()[well_name] for well_name in ['B1', 'B2', 'B3']])

Run the experiment!