### Specify Inputs

In [1]:
# ---- User inputs -------------------------------------------------------------

# Batches to keep, written as a regex alternation that is matched against the
# sample names. Use an empty string ("") to keep every batch.
batches = "UCB007|UCB008|UCB009|UCB010"

# Barcode prefixes of the trays to search (metrology trays and thermoform trays).
tray_name = [
    "0095584-1.6",
    "0086469-1.0-1",
]

# Number of days of scan history to look back over.
LastFewDays = 2

### Import Modules and Functions

In [2]:
#%% import pandas as pd
import pandas as pd
import numpy as np
from qsdc.client import Client
import met_client as app
from datetime import datetime, timedelta
import re
from openpyxl import load_workbook
from openpyxl.styles import PatternFill, Alignment


# Create the QuantumScape data client, then ask it for the MySQL engine that
# the SQL query further down passes to pandas as its connection.
qs_client = Client()
conn = qs_client.get_mysql_engine()


### Generate List of Cells in Recent Trays

In [None]:
# Build the latest tray position of every recently scanned cell.
#
# Reads:  tray_name, batches, LastFewDays, conn
# Writes: TrayHistory - one row per sample (its most recent scan), limited to the
#         last `LastFewDays` days and to sample names containing "US".

# Join the tray barcode prefixes into one alternation for MySQL REGEXP.
# NOTE(review): the prefixes are inserted unescaped, so a "." inside a prefix matches
# any character, and the pattern is interpolated directly into the SQL text. That is
# only acceptable while `tray_name` stays a trusted constant from the inputs cell.
regexp_pattern = "|".join(tray_name)

# Query to get relevant data from the database
query = f"""
    SELECT *
    FROM production_2011_beta_1.tray_links
    WHERE barcode_data REGEXP '{regexp_pattern}'
"""

# Execute the query
data = pd.read_sql_query(query, conn)

# Keep only rows recorded for the 4x3 unit-cell automation tray type.
data = data[data['idtray'] == 'tray_unit_cell_automation_4x3_001']

# Restrict to the requested batches. A blank / whitespace-only `batches` means
# "show every batch": applying str.contains(" ") would instead keep only names that
# contain a space, so the filter is skipped entirely in that case.
if batches.strip():
    data = data[data['sample_name'].str.contains(batches, regex=True, na=False)]

# Keep only the columns needed downstream and give them their report names.
keep_columns = ['barcode_data', 'sample_name', 'row_index', 'col_index', 'modified']
newdata_group_1 = data.loc[:, keep_columns].rename(
    columns={'barcode_data': 'tray_id', 'modified': 'Scan Time'}
)

# Convert 'col_index' to integers to remove the decimal point
newdata_group_1['col_index'] = newdata_group_1['col_index'].astype(int)

# Make sure 'Scan Time' is a real datetime *before* it is used to pick the latest
# scan; comparing raw strings/objects would not reliably order the timestamps.
newdata_group_1['Scan Time'] = pd.to_datetime(newdata_group_1['Scan Time'])

# Put 'sample_name' first, then keep only the most recent scan of each sample.
newdata_group_1 = newdata_group_1[['sample_name', 'tray_id', 'row_index', 'col_index', 'Scan Time']]
latest_scan_rows = newdata_group_1.groupby('sample_name')['Scan Time'].idxmax()
newdata_group_1 = newdata_group_1.loc[latest_scan_rows]

# Newest scans first; within the same timestamp order by tray and position.
TrayHistory = newdata_group_1.sort_values(
    by=['Scan Time', 'tray_id', 'row_index', 'col_index'],
    ascending=[False, True, True, True],
)

# Compute the cutoff date
cutoff_date = datetime.now() - timedelta(days=LastFewDays)

# Keep rows scanned within the last `LastFewDays` days whose sample name contains "US".
is_recent = TrayHistory['Scan Time'] >= cutoff_date
is_unit_cell = TrayHistory['sample_name'].str.contains("US", na=False)
TrayHistory = TrayHistory[is_recent & is_unit_cell]




### Pull Cell Data from Datahub

In [4]:
## Query Data from Datahub
#
# Reads:  qs_client, TrayHistory
# Writes: dfctq, dfmr, dmlg, danc, cathode_mass, dfc, cols,
#         yielded_dfctq, yielded_dfmr (and overwrites `batches`, see the note below)

# Pull cell metrology data from datahub, both standard/auto metrology and manual review
dfctq = qs_client.data_hub.get_dataset(dataset = 'MFG-60L-UC-CTQ') ## standard metro review of unit cells
dfmr = qs_client.data_hub.get_dataset(dataset = 'MFG-60L-UC-MR') ## manual review of unit cells

# Pull genealogy/multilayer info; rows without a US_id and exact duplicates are dropped
dmlg = qs_client.data_hub.get_dataset(dataset = 'MFG-80L-ML-PRODUCTION') ## multilayer info (ML_id)
dmlg = dmlg.dropna(subset="US_id").drop_duplicates()
#dgen = qs_client.data_hub.get_dataset(dataset = 'MFG-MASTER-GENEALOGY') ##multilayer info (ML_id)
danc = qs_client.data_hub.get_dataset("MFG-UNIT-CELL-ANCESTRY")

# Pull cathode mass data and attach it to the ancestry table through PU_id.
cathode_mass = qs_client.data_hub.get_dataset("MFG-50L-CATHODE-CTQ")
cathode_mass = cathode_mass[["PU_id", "cathode_dry_mass"]]
danc = danc[["US_id","PU_id"]].dropna(subset="US_id").drop_duplicates()
# NOTE(review): if a US_id maps to several PU_ids, or a PU_id has several mass rows,
# `danc` holds several rows per cell and later merges on US_ID will repeat that cell.
danc = danc.merge(cathode_mass, on = "PU_id", how = "left").sort_values(["US_id"])
danc = danc.rename(columns={'US_id': 'US_ID'})


# Pull cell electrical test data
dfc = qs_client.data_hub.get_dataset(dataset = 'MFG-60L-UNIT-CELL-TEST-CYCLE') ## electrical test data of unit cells
# Column names of interest in the test-cycle data (defined here, not applied in this cell).
cols = [
    "US_id",
    "TestCycleStart_datetime_first",
    "TestCycleStart_datetime",
    "UCT_Version",
    "idtest_recipe",
    "RunIndex",
    "CycleIndex",
    "dvdt",
    "recipe_dvdt_range",
    "CeilingHoldTime",
    "CE",
    "CapacityChargeFraction",
    "CeilingRestVoltage",
    "AMSDcCapacity",
    "CycleFailure",
    "AnyFailure",
    "MedDcASR",
    "DischargeCapacity"
]

# Keep only the metrology rows of cells that appear in the recent tray history.
recent_cell_ids = TrayHistory['sample_name']
yielded_dfctq = dfctq[dfctq['US_id'].isin(recent_cell_ids)]
yielded_dfmr = dfmr[dfmr['US_id'].isin(recent_cell_ids)]

# NOTE(review): this overwrites the `batches` input. Re-running the tray-history cell
# without first re-running the inputs cell would then filter on "CustomList".
batches = "CustomList"

# NOTE(review): a stray, indented `return pd.read_csv(bytesIO(blob_bytes))` used to end
# this cell. It sat outside any function, so the cell could not even be parsed; it has
# been removed.


### Combine Automated and Manual Review Tiering Metrics

In [5]:
## Overwrite automated results with manual review results
#
# Reads:  yielded_dfctq (automated CTQ rows), yielded_dfmr (manual-review rows)
# Writes: merged_df, dfctq_updated (CTQ rows with manual-review values taking
#         precedence, plus a 'Tier' label derived from 'disposition')

# Manual-review columns brought across, aligned to the CTQ rows on 'US_id'.
# NOTE(review): if a cell has more than one manual-review row, the left merge
# repeats that cell's CTQ row once per review row.
mr_columns = [
    'US_id',
    'edge_thickness_tier_us_mr',
    'A1_anode_tier_top_us_mr',
    'A1_anode_tier_bottom_us_mr',
    'cathode_alignment_custom_model_tier_us_mr',
    'median_contour_catholyte_pct_us_mr',
    'max_f2f_distance_us',
    'disposition_mr',
    'failure_modes_mr',
]
merged_df = yielded_dfctq.merge(yielded_dfmr[mr_columns], on='US_id', how='left')

# automated column -> manual-review column that overrides it wherever it has a value
mr_overrides = {
    'edge_thickness_tier_us': 'edge_thickness_tier_us_mr',
    'A1_anode_tier_top_us': 'A1_anode_tier_top_us_mr',
    'A1_anode_tier_bottom_us': 'A1_anode_tier_bottom_us_mr',
    'cathode_alignment_custom_model_tier_us': 'cathode_alignment_custom_model_tier_us_mr',
    'median_contour_catholyte_pct_us': 'median_contour_catholyte_pct_us_mr',
    'disposition': 'disposition_mr',
    'failure_modes': 'failure_modes_mr',
}
for auto_col, mr_col in mr_overrides.items():
    # combine_first keeps the manual-review value and falls back to the automated one.
    merged_df[auto_col] = merged_df[mr_col].combine_first(merged_df[auto_col])

# The manual-review helper columns are no longer needed once merged in.
dfctq_updated = merged_df.drop(columns=list(mr_overrides.values()))


# Update final tier of cells from the (possibly overridden) disposition.
conditions = [
    dfctq_updated['disposition'] == 'Tier 1',
    dfctq_updated['disposition'] == 'Tier 2',
    dfctq_updated['disposition'] == 'Fail',
    dfctq_updated['disposition'] == 'Scrap',
    dfctq_updated['disposition'] == 'Missing Data',
]
choices = ['1', '2', '3','Scrapped', 'TBD']
# The choices are strings, so the fallback must be a string as well: np.select's
# implicit default is the integer 0, which newer NumPy releases refuse to combine
# with string choices. '0' is the value older releases produced for unmatched rows.
dfctq_updated['Tier'] = np.select(conditions, choices, default='0')

### Tier Cells

In [6]:
## Fill in tiering metrics based on data pulled from datahub
#
# Reads:  dfctq_updated, danc, TrayHistory
# Writes: CellTiering (per-cell tiering metrics), merged_df (working frame),
#         RecentCellTiers (tray history annotated with status and final tier)

## Start the tiering sheet: cell ID plus batch (process flow)
CellTiering = dfctq_updated[['US_id']].rename(columns={'US_id': 'Cell ID'})
CellTiering['Batch'] = dfctq_updated['US_process_flow']
# Create the (initially empty) columns of the final spreadsheet
new_columns = ["Cell Status", "Cell Tier", "Edge Wetting", "Thickness", "Alignment", "Anode","Film-to-Film Distance", "Cathode Mass"]
for col in new_columns:
    CellTiering[col] = np.nan


## Update cell tiering metrics in the final spreadsheet
# Bring the metrology values next to the sheet columns they feed.
merged_df = CellTiering.merge(
    dfctq_updated[['US_id', 'Tier', 'median_contour_catholyte_pct_us', 'edge_thickness_tier_us', 'center_normalized_0_5mm_eroded_rect_outside_median_us',
                   'cathode_alignment_custom_model_tier_us', 'A1_anode_tier_top_us','A1_anode_tier_bottom_us', 'max_f2f_distance_us']],
    left_on='Cell ID',
    right_on='US_id',
    how='left'
)
# Provisional 'Cell Tier' from the disposition label (recomputed from the metrics below)
merged_df['Cell Tier'] = merged_df['Tier']
# 'Edge Wetting': < 80 -> 3, 80..98 -> 2, > 98 -> 1; anything else (e.g. NaN) -> 0
EWconditions = [
    merged_df['median_contour_catholyte_pct_us'] < 80,
    (merged_df['median_contour_catholyte_pct_us'] >= 80) & (merged_df['median_contour_catholyte_pct_us'] <= 98),
    merged_df['median_contour_catholyte_pct_us'] > 98
]
EWchoices = [3, 2, 1]
merged_df['Edge Wetting'] = np.select(EWconditions, EWchoices)
# 'Thickness' and 'Alignment' are taken directly from their tier columns
merged_df['Thickness'] = merged_df['edge_thickness_tier_us']
merged_df['Alignment'] = merged_df['cathode_alignment_custom_model_tier_us']
# 'Anode' is the larger of the top and bottom anode tiers
merged_df['Anode'] = np.maximum(merged_df['A1_anode_tier_top_us'], merged_df['A1_anode_tier_bottom_us'])
# 'Film-to-Film Distance' is flagged 'High' above 0.40; other rows stay empty
merged_df.loc[merged_df['max_f2f_distance_us'] > 0.40, 'Film-to-Film Distance'] = 'High'
# 'Cathode Mass' comes from the ancestry table
merged_df = merged_df.merge(danc[['US_ID', 'cathode_dry_mass']], left_on='Cell ID', right_on='US_ID', how='left')
merged_df['Cathode Mass'] = merged_df['cathode_dry_mass']

# Drop the helper columns that came from dfctq_updated / danc
CellTiering = merged_df.drop(columns=['US_id', 'Tier', 'edge_thickness_tier_us','cathode_alignment_custom_model_tier_us',
                                      'A1_anode_tier_top_us','A1_anode_tier_bottom_us','max_f2f_distance_us', 'US_ID', 'cathode_dry_mass'])


## Assign the final tier to every cell
# Metrics that decide the tier; a missing value becomes 0 so the column can be integer.
columns_to_consider = ['Alignment', 'Anode', 'Thickness', 'Edge Wetting']
for col in columns_to_consider:
    CellTiering[col] = CellTiering[col].fillna(0).astype(int)

# Final tier = largest value among the metrics ...
CellTiering['Cell Tier'] = CellTiering[columns_to_consider].max(axis=1)
# ... unless any metric is still 0 (missing), in which case the cell gets tier 0.
missing_or_zero = (CellTiering[columns_to_consider] == 0).any(axis=1)
CellTiering.loc[missing_or_zero, 'Cell Tier'] = 0

## Annotate the recent tray history with measurement status and tier
RecentCellTiers = TrayHistory.merge(CellTiering, left_on='sample_name', right_on='Cell ID', how='left')
measured_tiers = [1, 2, 3]
RecentCellTiers['Matrix'] = np.where(RecentCellTiers['Alignment'].isin(measured_tiers), 'Done', 'Waiting for Matrix')
RecentCellTiers['HiFi'] = np.where(RecentCellTiers['Anode'].isin(measured_tiers), 'Done', 'Waiting for HiFi')
# Cells present in a tray but without any tiering row come out of the left merge with
# an empty tier; treat them like every other cell with missing data (tier 0).
RecentCellTiers['Cell Tier'] = RecentCellTiers['Cell Tier'].fillna(0).astype(int)
RecentCellTiers = RecentCellTiers[['sample_name', 'Batch', 'tray_id', 'row_index', 'col_index', 'Scan Time','Matrix', 'HiFi', 'Cell Tier']]


## Export the sorting list to Excel and format it
#
# Reads:  RecentCellTiers, LastFewDays
# Writes: an .xlsx file in the working directory (named below), written once with
#         pandas and then re-opened with openpyxl to add colours and column widths.

current_date = datetime.now().strftime('%Y-%m-%d')
output_name = f'{current_date}_Sorting list from last {LastFewDays} days.xlsx'
RecentCellTiers.to_excel(output_name, index=False)

# Load the workbook and worksheet
wb = load_workbook(output_name)
ws = wb.active

# Row colour for each final tier
tier_colors = {
    0: "CECECE",  # Grey
    1: "7DDA58",  # Light Green
    2: "FFECA1",  # Yellow
    3: "EFC3CA"   # Red
}
# Build each style object once and share it between cells.
tier_fills = {
    tier: PatternFill(start_color=hex_color, end_color=hex_color, fill_type="solid")
    for tier, hex_color in tier_colors.items()
}
centered = Alignment(horizontal='center', vertical='center')

# Find the (1-based) column index of the "Cell Tier" header
tier_column_index = None
for col_num, col_cells in enumerate(ws.iter_cols(min_row=1, max_row=1), start=1):
    if col_cells[0].value == "Cell Tier":
        tier_column_index = col_num
        break

# Colour and centre every row whose tier has a colour; other rows are left untouched.
if tier_column_index:
    for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        tier_fill = tier_fills.get(row[tier_column_index - 1].value)
        if tier_fill is None:
            continue
        for cell_in_row in row:
            cell_in_row.fill = tier_fill
            cell_in_row.alignment = centered

# Size each column to its longest value (header included) plus a little padding.
for col in ws.columns:
    col_letter = col[0].column_letter  # Column letter, e.g. 'A', 'B'
    max_length = max(
        (len(str(cell.value)) for cell in col if cell.value is not None),
        default=0,
    )
    ws.column_dimensions[col_letter].width = max_length + 2

# Save the workbook
wb.save(output_name)

