In [1]:
import sys
sys.path.append('../../30_data_tools/')

from helper import load_dotenv
from pathlib import Path
from PIL import Image, ImageOps
import numpy as np
import math
import plotly.express as px
from scipy.ndimage import gaussian_filter
from random import randrange, choices, shuffle
from tqdm.auto import tqdm
import json
import pickle
from file_interaction import download_blob
import pandas as pd
from skimage.metrics import structural_similarity as ssim
from scipy import stats

In [2]:
from PIL import Image
from PIL import ImageFont, ImageDraw  
from pathlib import Path

In [3]:
import shutil

In [4]:
# Environment-specific settings; the cells below read directory entries such as
# TEMP_PROCESSING_DIR, TILE_DATASET_DIR and MODEL_DIR from this mapping.
dotenv = load_dotenv()
# Model and dataset identifiers; joined as "<model_name>_<dataset_name>" further down
# to name the stored model results, and dataset_name is also a folder below TILE_DATASET_DIR.
model_name = '2024-05-02_Resnet50_002'
dataset_name = '24-05-02_001_tile_dataset'

In [5]:
# Folder that receives randomly drawn tiles. Tiles whose name already exists in the
# sibling 'moire' / 'no_moire' folders are skipped when drawing (presumably those
# folders are filled by sorting the tiles manually — confirm).
target_dir = dotenv['TEMP_PROCESSING_DIR'] / 'tile_dataset_fourierGain_check' / 'unassigned'

# parents=True also creates the 'tile_dataset_fourierGain_check' folder when it does
# not exist yet; exist_ok=True keeps the cell safe to re-run.
target_dir.mkdir(parents=True, exist_ok=True)

In [6]:
# Paths of all .jpg tiles in the 'train/moire' folder of the selected tile dataset.
all_tiles = list((dotenv['TILE_DATASET_DIR'] / dataset_name / 'train' / 'moire').glob('./*.jpg'))

In [None]:
# Top up the 'unassigned' folder to 200 randomly chosen moire tiles, skipping tiles
# that already exist in the 'moire' or 'no_moire' check folders.
check_root = dotenv['TEMP_PROCESSING_DIR'] / 'tile_dataset_fourierGain_check'
wanted_tile_count = 200

# Tiles already waiting in the target folder count towards the quota.
unassigned_names = {path.name for path in target_dir.glob('./*.jpg')}

# Shuffling once and walking the list draws without replacement: every tile is
# considered at most once, the loop always terminates, and an empty tile list
# is simply a no-op.
candidates = list(all_tiles)
shuffle(candidates)

for in_path in candidates:
    if len(unassigned_names) >= wanted_tile_count:
        break

    already_sorted = (
        (check_root / 'moire' / in_path.name).exists()
        or (check_root / 'no_moire' / in_path.name).exists()
    )
    if already_sorted or in_path.name in unassigned_names:
        continue

    shutil.copy(in_path, target_dir / in_path.name)
    unassigned_names.add(in_path.name)

# Funktionen

In [7]:
def get_tile_result_grid( data, img_size=224, col_count=5, label_type='both' ):
    """Render the tiles listed in ``data`` as one labelled contact sheet.

    Each row of ``data`` becomes one grid cell of ``img_size`` x ``img_size`` pixels.
    The tile image is read from
    ``dotenv['TILE_DATASET_DIR'] / dataset_name / <dataset> / <label> / <tile_name>``
    (``dotenv`` and ``dataset_name`` are notebook globals). A coloured bar covering
    the lower 10 % of the cell carries a text label.

    Parameters
    ----------
    data : pandas.DataFrame
        Needs the columns ``dataset``, ``label``, ``tile_name`` and
        ``classification_correct``; depending on ``label_type`` also
        ``predicted_label``, ``result_moire``, ``frequency_gain`` and ``ssim_value``.
    img_size : int
        Edge length of one grid cell in pixels.
    col_count : int
        Number of cells per grid row.
    label_type : str
        ``'label_name'``         -> "<predicted_label>/<label>"
        ``'moire_value'``        -> rounded ``result_moire``
        ``'description_values'`` -> "<frequency_gain>/<ssim_value>" (rounded)
        anything else            -> "<predicted_label>/<label> - <result_moire>"

    Returns
    -------
    PIL.Image.Image
        RGB image of size ``(img_size * col_count, img_size * row_count)``.
    """
    row_count = math.ceil(data.shape[0] / col_count)

    out_img = Image.new(
        'RGB',
        (img_size * col_count, img_size * row_count),
        color="white"
    )
    draw = ImageDraw.Draw(out_img)
    correct_color = (0,200,0)
    wrong_color = (200,0,50)

    for i in range(data.shape[0]):
        result = data.iloc[i]

        # Cell origin follows the img_size grid the canvas was created with, so the
        # layout stays aligned even if a tile file is not exactly img_size pixels.
        left = (i % col_count) * img_size
        top = (i // col_count) * img_size

        tile_path = dotenv['TILE_DATASET_DIR'] / dataset_name / result.dataset / result.label / result.tile_name
        # The context manager closes the file handle once the pixels are pasted.
        with Image.open( tile_path ) as tile_img:
            out_img.paste( tile_img, ( left, top ) )

        # Status bar: first colour for a correct classification, second otherwise.
        draw.rectangle(
            (
                ( left, top + round(img_size * 0.9) ),
                ( left + img_size, top + img_size )
            ),
            outline=None,
            fill=correct_color if result.classification_correct else wrong_color
        )

        if label_type == 'label_name':
            label_text = f'{ result.predicted_label }/{ result.label }'
        elif label_type == 'moire_value':
            label_text = str( round(result.result_moire, 5) )
        elif label_type == 'description_values':
            label_text = f'{ round(result.frequency_gain, 5) }/{ round(result.ssim_value, 5) }'
        else:
            label_text = f'{ result.predicted_label }/{ result.label } - {round(result.result_moire, 5)}'

        # anchor='ms' centres the text horizontally on the given point (baseline anchored).
        draw.text(
            ( left + round(img_size * 0.5), top + round(img_size * 0.97) ),
            label_text,
            anchor='ms',
            font_size=15
        )

    return out_img

In [8]:
def load_data( model_results_name ):
    """Load stored model results and add derived classification columns.

    Downloads ``model_results/<model_results_name>.pkl`` via ``download_blob`` and
    unpickles it (the code below treats the result as a pandas DataFrame with the
    columns ``category``, ``result_moire``, ``result_no_moire`` and ``tile_name``).

    Added / changed columns:
      * ``category`` is renamed to ``label``
      * ``predicted_label``: ``'no_moire'`` where ``result_moire < result_no_moire``,
        otherwise ``'moire'``
      * ``classification_correct``: ``predicted_label == label``
      * ``mask_id`` / ``dpi``: string parts extracted from ``tile_name`` matching
        ``<mask_id>_<dpi>.<digits>.jpg`` (NaN where the name does not match)

    Returns
    -------
    pandas.DataFrame
    """
    # NOTE(review): unpickling executes arbitrary code embedded in the payload —
    # only load result blobs from trusted storage.
    data = pickle.loads( download_blob(f'model_results/{ model_results_name }.pkl').getbuffer() )
    data = data.rename(columns={'category':'label'})

    data['predicted_label'] = np.where(
        data.result_moire < data.result_no_moire,
        'no_moire',
        'moire'
    )
    data['classification_correct'] = data.predicted_label == data.label

    # Raw string: '\d' and '\.' are regex escapes, not Python string escapes
    # (the non-raw literal triggers an invalid-escape warning on newer Pythons).
    name_parts = data.tile_name.str.extract(r'(.+)_(\d+)\.\d+\.jpg').rename(columns={0:'mask_id',1:'dpi'})

    data = pd.merge(
        data,
        name_parts,
        left_index=True,
        right_index=True
    )

    return data

In [9]:
def calc_metrics( data ):
    """Summarise binary classification results with 'moire' as the positive class.

    ``data`` must provide the columns ``label`` (ground truth) and
    ``predicted_label``, both holding ``'moire'`` / ``'no_moire'``.

    Returns a dict with the row count, the confusion-matrix counts
    (TP, TN, FP, FN) and the derived TN-Rate, TP-Rate, accuracy, precision
    and recall. A rate whose denominator is zero is reported as 0.
    """
    actual = data.label
    predicted = data.predicted_label

    def count( actual_label, predicted_label ):
        # Number of rows with the given truth / prediction combination.
        return int( ((actual == actual_label) & (predicted == predicted_label)).sum() )

    def safe_ratio( numerator, denominator ):
        # Plain division, falling back to 0 for an empty denominator.
        return numerator / denominator if denominator > 0 else 0

    true_pos = count('moire', 'moire')
    true_neg = count('no_moire', 'no_moire')
    false_pos = count('no_moire', 'moire')
    false_neg = count('moire', 'no_moire')
    total = data.shape[0]

    return {
        "count_data" : total,
        "TP" : true_pos,
        "TN" : true_neg,
        "FP" : false_pos,
        "FN" : false_neg,
        "TN-Rate" : safe_ratio(true_neg, false_pos + true_neg),
        "TP-Rate" : safe_ratio(true_pos, false_neg + true_pos),
        "accuracy" : safe_ratio(true_pos + true_neg, total),
        "precision" : safe_ratio(true_pos, true_pos + false_pos),
        "recall" : safe_ratio(true_pos, true_pos + false_neg)
    }

In [10]:
def get_fft( input_img ):
    """Return the centred 2-D Fourier transform of ``input_img``.

    The input is converted with ``np.array``, passed through ``ifftshift`` before
    the transform and through ``fftshift`` afterwards, so the zero-frequency
    component ends up in the middle of the returned complex array.
    """
    pixels = np.array(input_img)
    return np.fft.fftshift( np.fft.fft2( np.fft.ifftshift(pixels) ) )

def limit_frequencies( fft, inner_limit=None, outer_limit=None, fill_value=1 ):
    """Overwrite spectrum entries outside a radial band around the array centre.

    The radius of an entry ``[y, x]`` is its Euclidean distance to
    ``(x, y) = (width / 2, height / 2)``. Entries with ``radius > outer_limit``
    and entries with ``radius < inner_limit`` are set to ``fill_value``; a limit
    of ``None`` disables that side of the band.

    Parameters
    ----------
    fft : numpy.ndarray
        Array with at least two dimensions. It is modified IN PLACE.
    inner_limit, outer_limit : float or None
        Band limits in array cells.
    fill_value : scalar
        Value written into the masked entries (default 1, the previously
        hard-coded value).

    Returns
    -------
    numpy.ndarray
        The same array object that was passed in.
    """
    if inner_limit is None and outer_limit is None:
        return fft

    height, width = fft.shape[:2]

    # Open grids broadcast to a (height, width) radius map without a Python loop.
    ys, xs = np.ogrid[:height, :width]
    radius = np.sqrt( (xs - width / 2) ** 2 + (ys - height / 2) ** 2 )

    masked = np.zeros((height, width), dtype=bool)
    if outer_limit is not None:
        masked |= radius > outer_limit
    if inner_limit is not None:
        masked |= radius < inner_limit

    fft[masked] = fill_value

    return fft


def get_frequency_gain( orig_img, synthetic_img, additional=.00001, outer_limit=70, sigma=3 ):
    """Smoothed log ratio of the power spectra of two images.

    Computes ``log((|F(orig)|^2 + additional) / (|F(synthetic)|^2 + additional))``
    on the centred spectra from ``get_fft``, sets every entry farther than
    ``outer_limit`` cells from the centre to 1 (see ``limit_frequencies``) and
    smooths the result with a Gaussian filter.

    Parameters
    ----------
    orig_img, synthetic_img
        Array-likes accepted by ``np.array`` (e.g. PIL images); the element-wise
        ratio requires both spectra to have the same shape.
    additional : float
        Small constant added to both power spectra, keeping the ratio and the
        logarithm finite where a spectrum is zero.
    outer_limit : float
        Radius (in cells) beyond which the log ratio is overwritten; default 70
        is the previously hard-coded value.
    sigma : float
        Standard deviation of the Gaussian smoothing; default 3 is the previously
        hard-coded value.

    Returns
    -------
    numpy.ndarray
        Real-valued array with the shape of the input spectra.
    """
    power_orig = np.abs( get_fft( orig_img ) ) ** 2
    power_synthetic = np.abs( get_fft( synthetic_img ) ) ** 2

    res = np.log( (power_orig + additional) / (power_synthetic + additional) )
    res = limit_frequencies( res, outer_limit=outer_limit )
    res = gaussian_filter(res, sigma=sigma)

    return res


def get_diff_img_frequency_gain( orig_img, synthetic_img ):
    """Smoothed magnitude spectrum of the blurred difference ``synthetic - orig``.

    Steps: signed pixel difference -> Gaussian blur (sigma 3) -> centred FFT ->
    entries closer than 5 cells to the centre set to 1 -> magnitude ->
    Gaussian blur (sigma 3).

    Parameters
    ----------
    orig_img, synthetic_img
        Array-likes of identical shape (e.g. PIL images).

    Returns
    -------
    numpy.ndarray
        Real-valued array with the shape of the inputs.
    """
    # Subtract in floating point: with unsigned 8-bit pixel arrays a negative
    # difference would otherwise wrap around (e.g. 3 - 5 -> 254) and the blur
    # would be truncated back to integers.
    diff = np.asarray(synthetic_img, dtype=np.float64) - np.asarray(orig_img, dtype=np.float64)
    diff = gaussian_filter(diff, sigma=3)

    # get_fft converts its input with np.array, so the array can be passed directly.
    fft = np.abs( limit_frequencies( get_fft(diff), inner_limit=5 ) )
    fft = gaussian_filter(fft, sigma=3)

    return fft

# Auswertung

In [None]:
# Every .jpg below the check folder except those still waiting in 'unassigned'.
tiles = [
    tile_path
    for tile_path in (dotenv['TEMP_PROCESSING_DIR'] / 'tile_dataset_fourierGain_check').glob('./**/*.jpg')
    if tile_path.parent.name != 'unassigned'
]

In [None]:
# One row per checked tile; the name of the containing folder is used as its label.
# The plain constructor is the documented way to build a frame from a list of
# records, and the explicit column list keeps both columns present even when no
# tile was found.
check_data = pd.DataFrame(
    [
        {
            'tile_name' : tile.name,
            'label' : tile.parent.name
        } for tile in tiles
    ],
    columns=['tile_name', 'label']
)

In [None]:
# For every checked tile compare it with the tile of the same name in the
# dataset's 'train/no_moire' folder and record two features:
# the maximum of the frequency-gain map and the SSIM of both greyscale tiles.
frequency_gains = []
ssim_values = []

for row in tqdm(check_data.itertuples(index=False), total=check_data.shape[0]):
    moire_tile_path = dotenv['TEMP_PROCESSING_DIR'] / 'tile_dataset_fourierGain_check' / row.label / row.tile_name
    non_moire_tile_path = dotenv['TILE_DATASET_DIR'] / dataset_name / 'train' / 'no_moire' / row.tile_name

    # convert() returns an independent, fully loaded image, so the file handles
    # can be closed right away.
    with Image.open( moire_tile_path ) as moire_file, Image.open( non_moire_tile_path ) as non_moire_file:
        moire_tile = moire_file.convert('L')
        non_moire_tile = non_moire_file.convert('L')

    frequency_gains.append( get_frequency_gain( moire_tile, non_moire_tile ).max() )
    ssim_values.append( ssim( np.array(moire_tile), np.array(non_moire_tile) ) )

# Assign whole columns once instead of enlarging the frame row by row.
check_data['frequency_gain'] = frequency_gains
check_data['ssim_value'] = ssim_values

In [None]:
# Passing the frame plus column names lets plotly label the axes and the legend
# with the column names instead of the generic "x" / "y".
px.scatter(
    check_data,
    x='ssim_value',
    y='frequency_gain',
    color='label',
    title='Frequency gain vs. SSIM per checked tile'
)

In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [None]:
# Shuffle the rows. The fixed seed keeps the row order — and with it the seeded
# train/test split below — reproducible across kernel restarts.
check_data = check_data.sample(frac=1, random_state=42)

In [None]:
# Encode the folder labels as integers and split features / targets
# into a training part and a 33 % hold-out part (fixed seed).
le = LabelEncoder().fit(check_data.label)

y = le.transform(check_data.label)
X = check_data[['frequency_gain','ssim_value']].to_numpy()

X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.33,
    random_state=42
)

In [None]:
# Pipeline of a StandardScaler followed by an SVC(gamma='auto'),
# fitted on the training split of the two features.
clf = make_pipeline(StandardScaler(), SVC(gamma='auto'))
clf.fit(X_train, y_train)

In [None]:
# Score of the fitted pipeline on the training split and on the hold-out split (in that order).
clf.score(X_train,y_train), clf.score(X_test,y_test)

In [None]:
# Predict all rows in a single call and map the integer classes back through the
# fitted LabelEncoder instead of a hard-coded name list, so the label names always
# match the encoding the SVM was trained with.
check_data['predicted_label'] = le.inverse_transform(
    clf.predict( check_data[['frequency_gain','ssim_value']].to_numpy() )
)

In [None]:
# Confusion-matrix counts and rates of the SVM predictions over ALL rows of check_data.
# NOTE(review): this includes the rows the SVM was trained on, so it is not a hold-out estimate.
calc_metrics( check_data )

In [None]:
# Persist the fitted scaler + SVM pipeline as a pickle file in MODEL_DIR.
with (dotenv['MODEL_DIR'] / 'svm_tile_postprocessing.pkl').open('wb') as pkl_file:
    pickle.dump( clf, pkl_file )

# Rest

In [11]:
# Load the stored model results and prepare empty columns for the values
# computed in the cells below.
data = load_data(f"{ model_name }_{ dataset_name }")
data['frequency_gain'] = np.nan
data['ssim_value'] = np.nan
# Object dtype: this column later receives label strings; a float NaN column would
# be upcast on assignment, which newer pandas versions deprecate.
data['svm_prediction'] = pd.Series(index=data.index, dtype='object')

In [None]:
# Training tiles labelled 'moire' that have no frequency gain computed yet.
data_selection = data.loc[
    data.dataset.eq('train')
    & data.label.eq('moire')
    & data.frequency_gain.isna()
]

In [None]:
# Draw at most 1000 rows: capping n avoids the ValueError pandas raises when fewer
# rows are available, and the fixed seed makes the draw reproducible.
sample = data_selection.sample(n=min(1000, data_selection.shape[0]), random_state=42)

In [None]:
# For every sampled moire tile compare it with the tile of the same name in the
# sibling 'no_moire' folder and store the maximum frequency gain and the SSIM.
frequency_gains = []
ssim_values = []

for dataset_split, label, tile_name in tqdm(
    zip(sample.dataset, sample.label, sample.tile_name),
    total=sample.shape[0]
):
    # The tiles live below the dataset folder, as in the other cells of this
    # notebook; the path previously skipped dataset_name.
    moire_tile_path = dotenv['TILE_DATASET_DIR'] / dataset_name / dataset_split / label / tile_name
    non_moire_tile_path = moire_tile_path.parent.parent / 'no_moire' / moire_tile_path.name

    # convert() returns an independent, fully loaded image, so the file handles
    # can be closed right away.
    with Image.open( moire_tile_path ) as moire_file, Image.open( non_moire_tile_path ) as non_moire_file:
        moire_tile = moire_file.convert('L')
        non_moire_tile = non_moire_file.convert('L')

    frequency_gains.append( get_frequency_gain( moire_tile, non_moire_tile ).max() )
    ssim_values.append( ssim( np.array(moire_tile), np.array(non_moire_tile) ) )

# Values are in the row order of `sample`, so they line up with sample.index.
data.loc[sample.index, 'frequency_gain'] = frequency_gains
data.loc[sample.index, 'ssim_value'] = ssim_values

In [None]:
# The column receives label strings, so make sure it is not a float column.
data['svm_prediction'] = data['svm_prediction'].astype(object)

# Predict all sampled rows in one call and translate the integer classes back
# through the LabelEncoder that produced the SVM's training targets.
data.loc[
    sample.index,
    'svm_prediction'
] = le.inverse_transform(
    clf.predict( data.loc[sample.index, ['frequency_gain','ssim_value']].to_numpy() )
)

In [None]:
# Rows the SVM predicted as 'moire'; rows without a prediction never equal
# 'moire', so no separate missing-value filter is needed.
data.loc[ data.svm_prediction.eq('moire') ]

In [None]:
# Ratio of 25 to the number of rows the SVM predicted as 'no_moire'.
# NOTE(review): 25 is a hard-coded count — presumably taken from a manual look at the
# exported grid image; confirm. Raises ZeroDivisionError when no row has that prediction.
25 / data.loc[
    (pd.isna(data.svm_prediction) == False) &
    (data.svm_prediction == 'no_moire')
].shape[0]

In [None]:
# Ratio of 17 to the number of rows the SVM predicted as 'moire'.
# NOTE(review): 17 is a hard-coded count — presumably taken from a manual look at the
# exported grid image; confirm. Raises ZeroDivisionError when no row has that prediction.
17 / data.loc[
    (pd.isna(data.svm_prediction) == False) &
    (data.svm_prediction == 'moire')
].shape[0]

In [None]:
# Export a contact sheet of all tiles the SVM predicted as 'moire', labelled with
# frequency gain / SSIM. The file goes to the current user's Downloads folder
# instead of a hard-coded absolute path of one specific machine.
get_tile_result_grid(
    data.loc[ data.svm_prediction == 'moire' ],
    label_type='description_values',
    col_count=10
).save( Path.home() / 'Downloads' / 'moire_tile.jpg' )

In [None]:
# Export a contact sheet of all tiles the SVM predicted as 'no_moire', labelled with
# frequency gain / SSIM. The file goes to the current user's Downloads folder
# instead of a hard-coded absolute path of one specific machine.
get_tile_result_grid(
    data.loc[ data.svm_prediction == 'no_moire' ],
    label_type='description_values',
    col_count=10
).save( Path.home() / 'Downloads' / 'no_moire_tile.jpg' )