<h1 style="font-size: 50px; color: black;">The goal of this notebook is to play with the data; the solution is in the original notebook</h1>

In [None]:
import pandas as pd
import zipfile
from datetime import datetime

In [None]:
# Open the data archive read-only; the handle is kept open because the
# following cells read the train/test CSV members from it.
zf = zipfile.ZipFile(file='data.zip', mode='r')

In [None]:
# Load the training wafers from the archive. The member stream is opened in a
# `with` block so it is closed as soon as pandas has parsed it.
with zf.open('wafers_train.csv') as train_csv:
    df_wafers = pd.read_csv(train_csv)
df_wafers.head()

In [None]:
# Load the test wafers from the archive. The member stream is opened in a
# `with` block so it is closed as soon as pandas has parsed it.
with zf.open('wafers_test.csv') as test_csv:
    df_wafers_test = pd.read_csv(test_csv)
df_wafers_test.head()

In [None]:
def plot_wafer_maps(wafer_df_list, figsize, labels = True):
    """
    Plot wafer maps for a list of per-wafer DataFrames.

    Every wafer gets an 'Input' map (all dies in green, bad dies over-plotted in
    red). When ``labels`` is True a second row of 'Label' maps is drawn on which
    dies flagged by 'IsScratchDie' are over-plotted as well: bad scratch dies in
    blue and good scratch dies ("ink" dies) in yellow.

    Each DataFrame must provide the columns 'WaferName', 'DieX', 'DieY' and
    'IsGoodDie', plus 'IsScratchDie' when ``labels`` is True.

    :param wafer_df_list: list, The list of df's of the wafers (one df per wafer)
    :param figsize: int, the height (and per-wafer width) of each map, in inches
    :param labels: bool, Whether to show the layer of labels (based on column 'IsScratchDie')

    :return: None
    :raises ValueError: if ``wafer_df_list`` is empty
    """
    n_wafers = len(wafer_df_list)
    if n_wafers == 0:
        raise ValueError("wafer_df_list must contain at least one wafer DataFrame")

    # Imported locally: this function is called before the notebook-level
    # matplotlib import cell has run.
    import matplotlib.pyplot as plt

    def plot_wafer_map(wafer_df, ax, map_type):
        """Draw one wafer on ``ax``; ``map_type`` is 'Input' or 'Label'."""
        wafer_size = len(wafer_df)
        # Marker area shrinks as the die count grows so the squares keep tiling the map.
        s = 2**17/(wafer_size)
        if map_type == 'Label':
            mes = 'Scratch Wafer' if (wafer_df['IsScratchDie'] == True).sum()>0 else 'Non-Scratch Wafer'
        else:
            mes = 'Yield: ' + str(round((wafer_df['IsGoodDie']).sum()/(wafer_df['IsGoodDie']).count(), 2))

        ax.set_title(f'{map_type} | Wafer Name: {wafer_df["WaferName"].iloc[0]}, \nSum: {len(wafer_df)} dies. {mes}', fontsize=20)

        # Paint every die green first, then over-plot the more specific classes.
        ax.scatter(wafer_df['DieX'], wafer_df['DieY'], color='green', marker='s', s=s, label='Good Die')

        bad_bins = wafer_df.loc[wafer_df['IsGoodDie'] == False]
        ax.scatter(bad_bins['DieX'], bad_bins['DieY'], color='red', marker='s', s=s, label='Bad Die')

        if map_type == 'Label':
            scratch_bins = wafer_df.loc[(wafer_df['IsScratchDie'] == True) & (wafer_df['IsGoodDie'] == False)]
            ax.scatter(scratch_bins['DieX'], scratch_bins['DieY'], color='blue', marker='s', s=s, label='Scratch Die')

            ink_bins = wafer_df.loc[(wafer_df['IsScratchDie'] == True) & (wafer_df['IsGoodDie'] == True)]
            ax.scatter(ink_bins['DieX'], ink_bins['DieY'], color='yellow', marker='s', s=s, label='Ink Die')

        ax.legend(fontsize=8)

    map_types = ['Input', 'Label'] if labels else ['Input']

    # squeeze=False always returns a 2-D axes array, so a single wafer (or a
    # single row) is indexed exactly like the general case.
    fig, axes = plt.subplots(
        len(map_types),
        n_wafers,
        figsize=(figsize*n_wafers, figsize*len(map_types)),
        squeeze=False,
    )
    for col, wafer_df in enumerate(wafer_df_list):
        for row, map_type in enumerate(map_types):
            plot_wafer_map(wafer_df, axes[row][col], map_type)

    plt.show()

In [None]:
# Draw a reproducible random sample of training wafers and plot them with labels.
n_samples = 4
# Group once and reuse it, instead of re-grouping the whole frame per sampled wafer.
train_groups = df_wafers.groupby('WaferName')
sampled_train_names = df_wafers['WaferName'].value_counts().sample(n_samples, random_state=20).index
list_sample_train = [train_groups.get_group(name) for name in sampled_train_names]
plot_wafer_maps(list_sample_train, figsize = 8, labels = True)

In [None]:
# Same sampling for the test set (reuses n_samples from the previous cell);
# the test maps are drawn without the label row.
# Group once and reuse it, instead of re-grouping the whole frame per sampled wafer.
test_groups = df_wafers_test.groupby('WaferName')
sampled_test_names = df_wafers_test['WaferName'].value_counts().sample(n_samples, random_state=20).index
list_sample_test = [test_groups.get_group(name) for name in sampled_test_names]
plot_wafer_maps(list_sample_test, figsize = 8, labels = False)

In [None]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.ndimage import binary_dilation, binary_erosion
from scipy.ndimage import convolve
from scipy.ndimage import label, find_objects
from sklearn.linear_model import RANSACRegressor
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
from scipy.spatial import cKDTree

In [None]:
# Count the distinct wafers (unique 'WaferName' values) in the training set.
train_wafer_names = df_wafers['WaferName']
num_train_wafers = train_wafer_names.nunique()
print("Number of wafers in training data:", num_train_wafers)


In [None]:
# Count the distinct wafers (unique 'WaferName' values) in the test set.
test_wafer_names = df_wafers_test['WaferName']
num_test_wafers = test_wafer_names.nunique()
print("Number of wafers in test data:", num_test_wafers)


In [None]:
# Edge clean-up experiment on one training wafer: bad dies lying in the outer
# rim of the wafer are flipped to good, then the untouched and the cleaned
# wafer are plotted side by side.
wafer_name = '8PFrAN'
wafer_df_original = df_wafers[df_wafers['WaferName'] == wafer_name].copy()
wafer_df = wafer_df_original.copy()

# Wafer centre, estimated as the mean die coordinate.
center_x = wafer_df['DieX'].mean()
center_y = wafer_df['DieY'].mean()
print(f"Estimated center of wafer {wafer_name}: ({center_x:.2f}, {center_y:.2f})")

# Euclidean distance of every die from that centre.
wafer_df['distance_from_center'] = np.sqrt(
    (wafer_df['DieX'] - center_x)**2 + (wafer_df['DieY'] - center_y)**2
)

# Wafer radius, estimated as the distance of the farthest die.
radius_estimate = wafer_df['distance_from_center'].max()

# A bad die counts as an "edge" die when it lies at or beyond this fraction
# of the radius.
radius_threshold = 0.91
is_bad_die = wafer_df['IsGoodDie'] == False
is_in_rim = wafer_df['distance_from_center'] >= radius_estimate * radius_threshold
edge_mask = is_bad_die & is_in_rim

wafer_df.loc[edge_mask, 'IsGoodDie'] = True
edge_dies = wafer_df.loc[edge_mask, ['DieX', 'DieY']]
print(edge_dies)

print(f"Estimated radius of wafer {wafer_name}: {radius_estimate:.2f}")

# Coordinate extent of the wafer.
min_x = wafer_df['DieX'].min()
max_x = wafer_df['DieX'].max()
min_y = wafer_df['DieY'].min()
max_y = wafer_df['DieY'].max()

print(f"DieX range: {min_x} to {max_x}")
print(f"DieY range: {min_y} to {max_y}")

print(wafer_df.columns)

# Left: wafer as loaded. Right: wafer after the edge clean-up.
plot_wafer_maps([wafer_df_original, wafer_df], figsize=8, labels=True)

In [None]:
# Choose a wafer. This starts again from the raw training data, so the edge
# clean-up applied to wafer_df in the previous cell is not carried over.
wafer_name = '8PFrAN'
wafer_df = df_wafers[df_wafers['WaferName'] == wafer_name].copy()

# The grid is indexed directly by (DieY, DieX); negative coordinates would
# silently wrap around to the far side of the array.
assert (wafer_df[['DieX', 'DieY']] >= 0).all().all(), "die coordinates must be non-negative"

# Create a grid of shape (max_y+1, max_x+1)
max_x = wafer_df['DieX'].max()
max_y = wafer_df['DieY'].max()

# Binary grid of bad dies: True where the die at (DieY, DieX) is bad.
bad_die_grid = np.zeros((max_y + 1, max_x + 1), dtype=bool)

# Fill all bad dies in one vectorized assignment instead of a Python loop over rows.
bad_die_rows = wafer_df.loc[wafer_df['IsGoodDie'] == False, ['DieY', 'DieX']]
bad_die_grid[bad_die_rows['DieY'].values, bad_die_rows['DieX'].values] = True

# Sanity check
print(f"Original bad dies: {np.sum(bad_die_grid)}")

In [None]:
# 3x3 kernel of ones with a zero centre: convolving with it counts, for every
# cell, how many of its 8 neighbours are bad dies.
neighbor_kernel = np.ones((3, 3), dtype=int)
neighbor_kernel[1, 1] = 0

# Cells outside the grid are treated as 0 (not bad).
neighbor_count = convolve(bad_die_grid.astype(int), neighbor_kernel, mode='constant', cval=0)

# Isolated = a bad die none of whose 8 neighbours is bad.
isolated_red = bad_die_grid & (neighbor_count == 0)

# Cleaned grid: the bad dies that are not isolated.
cleaned_grid = bad_die_grid & ~isolated_red

# Sanity check
print(f"Bad dies after removing isolated: {np.sum(cleaned_grid)} (removed {np.sum(isolated_red)})")

In [None]:
# Add cleaned result to DataFrame. cleaned_grid is True for bad dies, so the
# per-die lookup is inverted; one vectorized index replaces the row-wise apply.
wafer_df['IsGoodDie_Cleaned'] = ~cleaned_grid[wafer_df['DieY'].values, wafer_df['DieX'].values]

# Plot Original vs Cleaned
fig, ax = plt.subplots(1, 2, figsize=(14, 7))
titles = ['Original Wafer', 'After Removing Isolated Bad Dies']
columns = ['IsGoodDie', 'IsGoodDie_Cleaned']

for axis, title, column in zip(ax, titles, columns):
    good = wafer_df[wafer_df[column] == True]
    bad = wafer_df[wafer_df[column] == False]

    axis.scatter(good['DieX'], good['DieY'], color='green', marker='s', s=15, label='Good Die')
    axis.scatter(bad['DieX'], bad['DieY'], color='red', marker='s', s=15, label='Bad Die')
    axis.set_title(title, fontsize=16)
    axis.invert_yaxis()
    axis.set_xlabel("DieX")
    axis.set_ylabel("DieY")
    axis.legend()

plt.tight_layout()
plt.show()

In [None]:
# Snapshot of the wafer with its original 'IsGoodDie' column.
wafer_original = wafer_df.copy()

# Same wafer, with 'IsGoodDie' replaced by the isolated-dies-removed version.
wafer_cleaned = wafer_df.assign(IsGoodDie=wafer_df['IsGoodDie_Cleaned'])

# Side-by-side comparison with the shared plotting helper.
plot_wafer_maps([wafer_original, wafer_cleaned], figsize=8, labels=True)

In [None]:
# Full 3x3 structuring element: dilation grows every bad die into its 8 neighbours.
structure = np.ones((3, 3), dtype=bool)

# Perform dilation to expand the bad die clusters
dilated_grid = binary_dilation(cleaned_grid, structure=structure)

# Add dilated result to DataFrame. dilated_grid is True for bad dies, so the
# per-die lookup is inverted; one vectorized index replaces the row-wise apply.
wafer_df['IsGoodDie_Dilated'] = ~dilated_grid[wafer_df['DieY'].values, wafer_df['DieX'].values]

# Sanity check
print(f"Bad dies after dilation: {np.sum(~wafer_df['IsGoodDie_Dilated'])}")

In [None]:
# Diagnostic figure (not used by the rest of the analysis): the cleaned map,
# the dilated map, and an overlay of both.
fig, ax = plt.subplots(1, 3, figsize=(21, 7))
titles = ['After Removing Isolated', 'After Dilation', 'Overlay (Cleaned + Dilated)']
columns = ['IsGoodDie_Cleaned', 'IsGoodDie_Dilated']

# Panels 1 and 2: good dies in green, bad dies in red, for each column.
for axis, title, column in zip(ax[:2], titles[:2], columns):
    good = wafer_df[wafer_df[column]]
    bad = wafer_df[~wafer_df[column]]
    axis.scatter(good['DieX'], good['DieY'], color='green', s=15, marker='s')
    axis.scatter(bad['DieX'], bad['DieY'], color='red', s=15, marker='s')
    axis.set_title(title)
    axis.invert_yaxis()

# Panel 3: overlay. The dilated set contains the cleaned set, so it is drawn
# first and the cleaned dies are drawn on top; the other order hides them.
bad_cleaned = wafer_df[~wafer_df['IsGoodDie_Cleaned']]
bad_dilated = wafer_df[~wafer_df['IsGoodDie_Dilated']]
ax[2].scatter(wafer_df['DieX'], wafer_df['DieY'], color='lightgrey', s=15, marker='s', alpha=0.3)
ax[2].scatter(bad_dilated['DieX'], bad_dilated['DieY'], color='orange', s=15, marker='s', label='Dilated Bad')
ax[2].scatter(bad_cleaned['DieX'], bad_cleaned['DieY'], color='blue', s=15, marker='s', label='Cleaned Bad')
ax[2].set_title(titles[2])
ax[2].invert_yaxis()
ax[2].legend()

plt.tight_layout()
plt.show()

In [None]:
# Define 3x3 structuring element for erosion
structure = np.ones((3, 3), dtype=bool)

# Perform erosion on the dilated grid
eroded_grid = binary_erosion(dilated_grid, structure=structure)

# Add eroded result to DataFrame. eroded_grid is True for bad dies, so the
# per-die lookup is inverted; one vectorized index replaces the row-wise apply.
wafer_df['IsGoodDie_Eroded'] = ~eroded_grid[wafer_df['DieY'].values, wafer_df['DieX'].values]

# Sanity check
print(f"Bad dies after erosion: {np.sum(~wafer_df['IsGoodDie_Eroded'])}")

In [None]:
# Three panels, one per good/bad column: the original flags, the flags after
# removing isolated bad dies, and the flags after dilation followed by erosion.
fig, ax = plt.subplots(1, 3, figsize=(21, 7))
titles = ['Original Wafer', 'After Removing Isolated Bad Dies', 'After Erosion']
columns = ['IsGoodDie', 'IsGoodDie_Cleaned', 'IsGoodDie_Eroded']

for axis, title, column in zip(ax, titles, columns):
    flags = wafer_df[column]
    good = wafer_df[flags == True]
    bad = wafer_df[flags == False]

    axis.scatter(good['DieX'], good['DieY'], color='green', marker='s', s=15, label='Good Die')
    axis.scatter(bad['DieX'], bad['DieY'], color='red', marker='s', s=15, label='Bad Die')
    axis.set_title(title, fontsize=16)
    axis.invert_yaxis()
    axis.set_xlabel("DieX")
    axis.set_ylabel("DieY")
    axis.legend()

plt.tight_layout()
plt.show()

In [None]:
# Snapshot of the wafer with its original 'IsGoodDie' column.
wafer_original = wafer_df.copy()

# One copy per processing stage, each with 'IsGoodDie' swapped for that
# stage's result so the shared plotting helper can draw it.
wafer_cleaned = wafer_df.assign(IsGoodDie=wafer_df['IsGoodDie_Cleaned'])
wafer_er = wafer_df.assign(IsGoodDie=wafer_df['IsGoodDie_Eroded'])
wafer_dil = wafer_df.assign(IsGoodDie=wafer_df['IsGoodDie_Dilated'])

# Original, cleaned, dilated and eroded wafers side by side.
plot_wafer_maps([wafer_original, wafer_cleaned, wafer_dil , wafer_er], figsize=8, labels=True)

In [None]:
# Per-wafer yield on the training set: good dies divided by total dies.
yield_per_wafer = df_wafers.groupby('WaferName')['IsGoodDie'].agg(
    total_dies='count',
    good_dies='sum'
)
yield_per_wafer['yield'] = yield_per_wafer['good_dies'] / yield_per_wafer['total_dies']

# Sorted ascending by yield; tail(10) therefore shows the ten highest-yield wafers.
low_yield_wafers = yield_per_wafer.sort_values(by='yield')
print(low_yield_wafers.tail(10))  # change number as needed

In [None]:
# Inspect a single training wafer by name; it is passed twice, so the same
# wafer is drawn in two columns.
wafer_n = 'yTBVIw'
is_selected_wafer = df_wafers['WaferName'] == wafer_n
wafer_e = df_wafers[is_selected_wafer].copy()
plot_wafer_maps([wafer_e, wafer_e], figsize=8, labels=True)

In [None]:
# Names of the training wafers whose yield falls below the threshold.
threshold = 0.9
is_below_threshold = yield_per_wafer['yield'] < threshold
bad_wafer_names = yield_per_wafer.loc[is_below_threshold].index.tolist()

print(f"Wafers with yield below {threshold*100}%: {bad_wafer_names}")

In [None]:
# Label connected components in the cleaned bad die grid.
# scipy's default structure only joins dies that share an edge. The isolation
# filter and the dilation/erosion cells all use the full 3x3 neighbourhood, so
# the same 8-connectivity is used here; otherwise a diagonal run of bad dies
# would be split into single-die components.
connectivity_8 = np.ones((3, 3), dtype=int)
labeled_grid, num_features = label(cleaned_grid, structure=connectivity_8)

print(f"Found {num_features} connected components.")

In [None]:
# Keep only the connected components whose bounding box is elongated.
# label_id, area and aspect_ratio stay defined after the loop and are read by
# a later cell.
scratch_mask = np.zeros(labeled_grid.shape, dtype=bool)
component_slices = find_objects(labeled_grid)

for label_id, slc in enumerate(component_slices, start=1):
    if slc is not None:
        # Boolean footprint of this component inside its bounding box.
        component = labeled_grid[slc] == label_id
        height, width = component.shape
        area = component.sum()

        # Minimum-area filter (with a bound of 1 every component passes).
        if area >= 1:
            # Bounding-box elongation: 1 for a square box, larger otherwise.
            aspect_ratio = max(height / width, width / height)

            if aspect_ratio > 1:  # any non-square bounding box is kept as scratch-like
                # scratch_mask[slc] is a view, so this writes into scratch_mask.
                scratch_mask[slc][component] = True


In [None]:
# Add scratch prediction to DataFrame: one vectorized lookup of every die's
# cell in scratch_mask replaces the row-wise apply.
wafer_df['IsPredictedScratch'] = scratch_mask[wafer_df['DieY'].values, wafer_df['DieX'].values]

num_predicted_scratch = wafer_df['IsPredictedScratch'].sum()
print(f"Number of dies predicted as part of a scratch: {num_predicted_scratch}")

# label_id / area / aspect_ratio are left over from the component loop in the
# previous cell, so this line describes only the LAST component examined. They
# do not exist when no component was found, hence the guard.
if component_slices:
    print(f"Component {label_id}: Area={area}, Aspect Ratio={aspect_ratio:.2f}")


In [None]:
# Show the shape-based scratch prediction through the shared plotting helper.
print(wafer_df.columns)

# Copy of the wafer with the predicted scratch flag in place of the label and
# the cleaned good/bad flag in place of the original one.
wafer_cleaned = wafer_df.assign(
    IsScratchDie=wafer_df['IsPredictedScratch'],
    IsGoodDie=wafer_df['IsGoodDie_Cleaned'],
)

# The same wafer is passed twice, so it is drawn in two columns.
plot_wafer_maps([wafer_cleaned, wafer_cleaned], figsize=8, labels=True)

<h1 style="font-size: 20px; color: black;">Scratch detection with RANSAC</h1>

In [None]:
# Fit a straight line through the bad dies with RANSAC; the inliers of that
# line are the scratch candidates.

# Coordinates of the bad dies as rows of (DieY, DieX).
# NOTE(review): `bad_dies` was not defined anywhere earlier in this notebook;
# it is built here from the dies still bad after removing isolated ones
# ('IsGoodDie_Cleaned') - confirm this is the intended candidate set.
bad_dies = wafer_df.loc[~wafer_df['IsGoodDie_Cleaned'], ['DieY', 'DieX']].values

# Defaults so the following cells still run when RANSAC is skipped.
inlier_mask = np.zeros(len(bad_dies), dtype=bool)
scratch_coords = bad_dies[inlier_mask]

# PolynomialFeatures(degree=1) outputs a bias column plus x (2 features), and
# RANSACRegressor's default min_samples is n_features + 1, so at least 3
# points are needed - with only 2 the fit raises instead of being skipped.
min_ransac_points = 3

if bad_dies.shape[0] >= min_ransac_points:
    # random_state pins the random sampling so re-running the cell reproduces the result.
    model = make_pipeline(
        PolynomialFeatures(degree=1),
        RANSACRegressor(max_trials=2000, residual_threshold=2, random_state=0),
    )
    X = bad_dies[:, 1].reshape(-1, 1)  # DieX
    y = bad_dies[:, 0]                # DieY
    model.fit(X, y)

    # Bad dies within residual_threshold of the fitted line.
    inlier_mask = model.named_steps['ransacregressor'].inlier_mask_
    scratch_coords = bad_dies[inlier_mask]

else:
    print("No scratch candidates (bad dies) found on wafer. Skipping RANSAC.")

In [None]:
# Create binary mask from the inlier coordinates (rows of (DieY, DieX)).
# Vectorized indexing avoids a Python loop that reused the names `y` and `x`,
# overwriting the RANSAC target vector `y` from the previous cell.
scratch_mask_ransac = np.zeros_like(cleaned_grid, dtype=bool)
scratch_mask_ransac[scratch_coords[:, 0], scratch_coords[:, 1]] = True

# Add to DataFrame: one vectorized lookup replaces the row-wise apply.
wafer_df['IsPredictedScratch_RANSAC'] = scratch_mask_ransac[wafer_df['DieY'].values, wafer_df['DieX'].values]

# Count detected scratch dies
print(f"Detected {np.sum(wafer_df['IsPredictedScratch_RANSAC'])} scratch dies via RANSAC.")

In [None]:
# Die subsets used by both panels.
original_bad = wafer_df[wafer_df['IsGoodDie'] == False]
ransac_scratch = wafer_df[wafer_df['IsPredictedScratch_RANSAC'] == True]
good = wafer_df[wafer_df['IsGoodDie'] == True]

# Left panel: wafer as given. Right panel: same wafer with the RANSAC inliers
# over-plotted in blue.
fig, axs = plt.subplots(1, 2, figsize=(14, 7))
panel_titles = ["Original Wafer", "RANSAC Scratch Prediction"]

for panel_idx, panel_title in enumerate(panel_titles):
    panel = axs[panel_idx]
    panel.scatter(good['DieX'], good['DieY'], color='green', marker='s', s=15, label='Good Die')
    panel.scatter(original_bad['DieX'], original_bad['DieY'], color='red', marker='s', s=15, label='Bad Die')
    if panel_idx == 1:
        panel.scatter(ransac_scratch['DieX'], ransac_scratch['DieY'], color='blue', marker='s', s=15, label='Scratch (RANSAC)')
    panel.set_title(panel_title)
    panel.invert_yaxis()
    panel.set_xlabel("DieX")
    panel.set_ylabel("DieY")
    panel.legend()

plt.tight_layout()
plt.show()


In [None]:
# Wafer with its original 'IsGoodDie' column.
wafer_original = wafer_df.copy()

# Wafer with 'IsGoodDie' replaced by the isolated-dies-removed version.
wafer_cleaned = wafer_df.assign(IsGoodDie=wafer_df['IsGoodDie_Cleaned'])

# Cleaned wafer whose scratch flag is the union of the RANSAC prediction and
# the existing 'IsScratchDie' label.
ransac_or_label = wafer_df['IsPredictedScratch_RANSAC'] | wafer_df['IsScratchDie']
wafer_ransac = wafer_cleaned.assign(IsScratchDie=ransac_or_label)

# All three side by side with the shared plotting helper.
plot_wafer_maps([wafer_original, wafer_cleaned, wafer_ransac], figsize=8, labels=True)


In [None]:
# Flag good dies that lie on the fitted RANSAC line, within the DieX span
# covered by the inliers.
x_inliers = bad_dies[inlier_mask][:, 1]  # DieX values
min_x_ransac = x_inliers.min()
max_x_ransac = x_inliers.max()

# Get good dies in range of RANSAC
good_dies_in_range = wafer_df[
    (wafer_df['IsGoodDie_Cleaned']) &
    (wafer_df['DieX'] >= min_x_ransac) &
    (wafer_df['DieX'] <= max_x_ransac)
].copy()

# Predict the line's y-value at each of these good dies
X_good = good_dies_in_range['DieX'].values.reshape(-1, 1)
y_actual = good_dies_in_range['DieY'].values
y_pred = model.predict(X_good)

# Vertical distance of each die from the predicted line
residuals = np.abs(y_actual - y_pred)
print(residuals.min())
residual_threshold = 0.5  # Try 0.3 or 0.5 to tighten or loosen
good_dies_in_range['IsPredictedScratch_Local'] = residuals < residual_threshold

# Write the flags back by index label instead of merging on (DieX, DieY).
# A merge rebuilds wafer_df with a fresh 0..n-1 index, which no longer lines up
# with wafer_cleaned / wafer_original and misaligns the assignment below.
# Assigning the column from scratch also makes the cell safe to re-run and
# avoids an in-place fillna on a column selection.
wafer_df['IsPredictedScratch_Local'] = False
wafer_df.loc[good_dies_in_range.index, 'IsPredictedScratch_Local'] = good_dies_in_range['IsPredictedScratch_Local']

# Cleaned wafer whose scratch flag is the union of the local prediction and
# the existing 'IsScratchDie' label.
wafer_local_ransac = wafer_cleaned.copy()
wafer_local_ransac['IsScratchDie'] = wafer_df['IsPredictedScratch_Local'] | wafer_df['IsScratchDie']

plot_wafer_maps([wafer_original, wafer_cleaned, wafer_local_ransac], figsize=8, labels=True)


In [None]:
# Per-wafer yield on the test set: good dies divided by total dies.
yield_per_wafer_test = df_wafers_test.groupby('WaferName')['IsGoodDie'].agg(
    total_dies='count',
    good_dies='sum'
)
yield_per_wafer_test['yield'] = yield_per_wafer_test['good_dies'] / yield_per_wafer_test['total_dies']

# Sorted ascending by yield; tail(10) therefore shows the ten highest-yield wafers.
low_yield_wafers_test = yield_per_wafer_test.sort_values(by='yield')
print(low_yield_wafers_test.tail(10))  # change number as needed

# Names of the test wafers whose yield falls below the threshold.
threshold = 0.8
is_below_threshold_test = low_yield_wafers_test['yield'] < threshold
bad_wafer_names_test = low_yield_wafers_test.loc[is_below_threshold_test].index.tolist()

print(f"\nWafers with yield below {threshold*100}%: {bad_wafer_names_test}")