In [1]:
import os
import subprocess
import cv2
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

In [75]:
anime = 'fma'
ep_path = f"data/{anime}/episodes/"
abs_ep_path = os.path.abspath(ep_path)
episodes = [os.path.join(ep_path, file) for file in os.listdir(ep_path)][2]

In [63]:
import subprocess
import cv2

# Iframes and Pframes
filename = episodes
save_path = f"data/{anime}/frames/ep4"

def get_frame_types(video_fn):
	command = 'ffprobe -v error -show_entries frame=pict_type -of default=noprint_wrappers=1'.split()
	out = subprocess.check_output(command + [video_fn]).decode()
	frame_types = out.replace('pict_type=','').split()
	return zip(range(len(frame_types)), frame_types)

def save_keyframes(video_fn, out_path):
	frame_types = get_frame_types(video_fn)
	i_frames = [x[0] for x in frame_types if x[1]=='P']
	if i_frames:
		basename = os.path.splitext(os.path.basename(video_fn))[0]
		cap = cv2.VideoCapture(video_fn)
		for frame_no in i_frames:
			cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
			ret, frame = cap.read()
			outname = os.path.join(out_path, f"{basename}_p_frame_{str(frame_no)}.jpg")
			frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
			cv2.imwrite(outname, frame)
		cap.release()
	else:
		print ('No I-frames in '+video_fn)

if __name__ == '__main__':
	save_keyframes(filename, save_path)
# Pframes: 8m, Iframes: 1m

In [59]:
# Fastest way to get Iframes
import av

content = episodes
with av.open(content) as container:
	# Signal that we only want to look at keyframes.
	stream = container.streams.video[0]
	stream.codec_context.skip_frame = "NONREF"
	stream.codec_context.thread_type = "FRAME"

	for frame in container.decode(stream):

		# We use `frame.pts` as `frame.index` won't make must sense with the `skip_frame`.
		save_path = os.path.join(f"mta_data/{anime}/frames/ep3", f"fma_key_frame_{frame.pts}.jpg")
		# frame.to_image().save(save_path)
		
		 # Convert the frame to grayscale.
		gray_frame = frame.to_image().convert("L")
		gray_frame.save(save_path)


In [5]:
# Image Hashing, not really used
import imagehash
from PIL import Image

def with_ztransform_preprocess(hashfunc, hash_size=8):
	def function(path):
		image = Image.open(path)
		image = image.convert("L").resize((hash_size, hash_size), Image.Resampling.LANCZOS)
		data = image.getdata()
		quantiles = np.arange(100)
		quantiles_values = np.percentile(data, quantiles)
		zdata = (np.interp(data, quantiles_values, quantiles) / 100 * 255).astype(np.uint8)
		image.putdata(zdata)
		return hashfunc(image)
	return function

dhash_z_transformed = with_ztransform_preprocess(imagehash.dhash, hash_size = 8)
out_folder = f"mta_data/frames/{anime}/Pframes_filtered"
pics = [os.path.join(out_folder, file) for file in os.listdir(out_folder)]
hashes = [dhash_z_transformed(pic) for pic in pics]

df = pd.DataFrame({"image_ids": pics, "hash_values": hashes, })
df_clean = df[df.duplicated(['hash_values'], keep=False)]
for image in df_clean["image_ids"].values:
	os.remove(image)

### Panel segmentation

In [None]:
from skimage.transform import resize
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from skimage import exposure
from PIL import Image
import cv2

def remove_text(model_path, original_img, debug_display=False):
	tf.data.experimental.enable_debug_mode()
	# Load the model
	model = tf.keras.models.load_model(model_path, compile=False)

	# Pre-process the image
	height, width = original_img.shape[:2]

	img = np.expand_dims(original_img, axis=-1)  # add an extra dimension for the channel
	img = np.repeat(img, 3, axis=-1)  # repeat the image along the channel dimension
	img = resize(img, (768,512), anti_aliasing=True, preserve_range=True)
	img = np.expand_dims(img, axis=0)
	img = img/255

	# Use the model to make a prediction
	p = model.predict(img)

	mask = p[0,:,:,0]

	mask = np.round(mask, 0)

	# Return the image to its original dimensions
	mask_img = Image.fromarray(mask)
	mask_img = mask_img.resize((width,height), resample=Image.Resampling.BICUBIC)
	mask = np.asarray(mask_img)
	
	# Convert the mask to a boolean array
	mask = mask == 1

	no_text = original_img.copy()
	# Set the pixels in the original image to 1 wherever the mask is 0
	no_text[mask] = 255
	# no_text = np.where(mask, mask, no_text)

	# Increase the contrast of the image
	no_text = exposure.rescale_intensity(no_text, out_range=(0, 255))
	no_text = cv2.normalize(src=no_text, dst=None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)

	if debug_display == True:
		# Display the resulting image
		plt.imshow(mask, cmap='gray')
		plt.show()
		plt.imshow(original_img, cmap='gray')
		plt.show()
		plt.imshow(no_text, cmap='gray')
		plt.show()
	else:
		return no_text
		
# img_path = test[3]
# img = cv2.imread("data/x5.png", cv2.IMREAD_GRAYSCALE)
# remove_text(model_path="0207_e500_std_model_4.h5", original_img=img, debug_display=True)

In [51]:
from kumiko import kumikolib
import re

def enough_info(image, thresh=95):
	image = cv2.equalizeHist(image)
	_, image = cv2.threshold(image, 128, 255, cv2.THRESH_BINARY)
	total_pix = image.shape[0] * image.shape[1]
	white = np.sum(image >= 254)
	white_percent = round((white / total_pix) * 100)
	black = np.sum(image <= 1)
	black_percent = round((black / total_pix) * 100)
	# print(black_percent, white_percent)
	if (white_percent < thresh and black_percent < thresh):
		return True
	else:
		return False

def panel_extractor(im_path, save_path):
	k = kumikolib.Kumiko()
	# Open the input image
	# img = cv2.imread(im_path, cv2.IMREAD_GRAYSCALE)
	img = Image.open(im_path)
	# Check the mode of the image
	if img.mode == 'RGB':
		return
	img = img.convert('L')
	img = np.array(img)
	img = remove_text(model_path="0207_e500_std_model_4.h5", original_img=img)
	im_size = img.shape[0] * img.shape[1]

	if enough_info(img, thresh=95):
		info = k.parse_image(im_path, image=img)
		panels = info["panels"]

		panel_percent = 0.05
		for index, panel in enumerate(panels):
			crop_img = img[panel[1]:panel[1]+panel[3], panel[0]:panel[0]+panel[2]]
			if (crop_img.shape[0] * crop_img.shape[1] > panel_percent * im_size):
				output_path = os.path.join(save_path, 'panel_{}-{}.png'.format(re.search(r'\D*(\d+)\.\D*', im_path).group(1), index))
				cv2.imwrite(output_path, crop_img)
				# pil_img = Image.fromarray(crop_img)
				# res = pil_resize(pil_img)
				# res.save(output_path)

In [36]:
from natsort import natsorted

anime = 'fma'
manga_path = f"data/{anime}/manga"
panel_path = f"data/{anime}/panels"
manga_chapters = natsorted(os.listdir(manga_path))
manga = {}
for chapter in manga_chapters:
	ch_path = os.path.join(manga_path, chapter)
	manga[chapter] = natsorted([os.path.join(ch_path, file) for file in os.listdir(ch_path)])

In [None]:
# save_path = f"data"
chapter = "ch5"
save_path = os.path.join(panel_path, chapter)
for mng in manga[chapter]:
	panel_extractor(mng, save_path)

### panel-anime correlation

In [14]:
anime = 'fma'
frame_path = f"data/fma_Pframes_resnet_ep3"
anime_frames = [os.path.join(frame_path, file) for file in os.listdir(frame_path)]

In [15]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from PIL import Image

def calculate_mean_std(image_paths, my_batch_size=32):
	# Define a dataset that reads the images one at a time
	class ImageDataset(torch.utils.data.Dataset):
		def __init__(self, image_paths, transform=None):
			self.image_paths = image_paths
			self.transform = transform

		def __getitem__(self, index):
			image = Image.open(self.image_paths[index])
			if self.transform:
				image = self.transform(image)
			return image

		def __len__(self):
			return len(self.image_paths)

	# Define a transform that converts the images to tensors
	transform = transforms.Compose([
		transforms.ToTensor(),
	])

	# Create a dataset and a dataloader
	dataset = ImageDataset(image_paths, transform=transform)
	dataloader = DataLoader(dataset, batch_size=my_batch_size, num_workers=4)

	# Calculate mean and std
	mean = 0.0
	std = 0.0
	count = 0
	for images in dataloader:
		images = images.view(images.size(0), -1)
		mean += images.mean(dim=1).sum()
		std += images.std(dim=1).sum()
		count += images.size(0)
	mean /= count
	std /= count

	return mean, std

image_paths = anime_frames
mean, std = calculate_mean_std(image_paths)
print(mean, std)

tensor(0.4062) tensor(0.1713)


In [2]:
from PIL import Image
from skimage import exposure
import numpy as np
import cv2

def pil_resize(img, width, height):
	# Get the original aspect ratio
	original_aspect = img.width / img.height

	# Determine the new width and height
	new_width = width
	new_height = int(new_width / original_aspect)

	# Resize the image
	resized_im = img.resize((new_width, new_height), resample=Image.Resampling.BICUBIC)

	# Check if the image is larger than width*height
	if img.width > width or img.height > height:
		# Resize the image using thumbnail method
		resized_im.thumbnail((width, height), resample=Image.Resampling.BICUBIC)

	# Create a new image with white background (255 represents white)
	new_im = Image.new("L", (width, height), 255)

	# Paste the resized image on center of the new image
	left = (width - resized_im.width) // 2
	top = (height - resized_im.height) // 2
	new_im.paste(resized_im, (left, top))
	
	return new_im

def my_norm(img):
	new_im = np.array(img)
	new_im = exposure.rescale_intensity(new_im, out_range=(0, 255))
	new_im = cv2.normalize(src=new_im, dst=None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)
	# new_im = Image.fromarray(new_im)
	return new_im

In [16]:
import torch
import torchvision.models as Models
import torchvision.transforms as transforms
from torch.multiprocessing import Pool
from multiprocessing import cpu_count
import numpy as np
from PIL import Image

# Define the mean and standard deviation values for ImageNet
# mean = [0.485, 0.456, 0.406]
# std = [0.229, 0.224, 0.225]
# Define the mean and standard deviation values from my images
mean_std = (mean, std)

# Create the normalization transform
transform = transforms.Compose([
	# transforms.Grayscale(),
	# transforms.Resize((224, 224)),
	transforms.Lambda(lambda x: pil_resize(x, 224, 224)),
	transforms.Lambda(lambda x: my_norm(x)),
	transforms.ToTensor(),
	transforms.Normalize(mean=mean, std=std),
	transforms.Lambda(lambda x: x.expand(1, 3, -1, -1).view(1, 3, 224, 224))
])

def extract_features(frame_path):
	frame = Image.open(frame_path)
	img_tensor = transform(frame)
	with torch.no_grad():
		frame_features = model(img_tensor).detach().numpy()
	return frame_features

# Load the ResNet50 model from PyTorch's model zoo
model = Models.resnet50(weights=Models.ResNet50_Weights.DEFAULT)
model.eval()

# Extract the features from the model's second-to-last fully-connected layer
model = torch.nn.Sequential(*list(model.children())[:-1])

# Read the anime frame images
frame_paths = anime_frames

# Extract the features for all anime frames
torch.set_num_threads(1)
with Pool(processes=cpu_count()) as p:
	frame_features_list = list(p.imap(extract_features, frame_paths))

frame_features_list_squeezed = [np.squeeze(f) for f in frame_features_list]

In [77]:
# Test some different pretrained models
import torch
import torchvision.models as Models
import torchvision.transforms as transforms
from torch.multiprocessing import Pool
from multiprocessing import cpu_count
import numpy as np
from PIL import Image

# Define the mean and standard deviation values from my images
mean_std = (mean, std)

# Create the normalization transform
transform = transforms.Compose([
	# transforms.Grayscale(),
	# transforms.Resize((224, 224)),
	transforms.Lambda(lambda x: pil_resize(x, 299, 299)),
	transforms.Lambda(lambda x: my_norm(x)),
	transforms.ToTensor(),
	transforms.Normalize(mean=mean, std=std),
	transforms.Lambda(lambda x: x.expand(1, 3, -1, -1).view(1, 3, 299, 299))
])

def extract_features(frame_path):
	frame = Image.open(frame_path)
	img_tensor = transform(frame)
	with torch.no_grad():
		frame_features = model(img_tensor).detach().numpy()
	return frame_features

# Load the ResNet50 model from PyTorch's model zoo
model = Models.densenet201(weights=Models.DenseNet201_Weights.DEFAULT)
model.eval()

# Read the anime frame images
frame_paths = anime_frames

# Extract the features for all anime frames
torch.set_num_threads(1)
with Pool(processes=cpu_count()) as p:
	frame_features_list = list(p.imap(extract_features, frame_paths))

frame_features_list_squeezed = [np.squeeze(f) for f in frame_features_list]
# Densenet201, resnet152 > Inception_v3, Xception, EfficientNet

In [87]:
# Choose most significant frames, by deleting the rest in the frame_path folder
import annoy
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import math

frame_path = f"mta_data/frames/{anime}/Pframes_resnet"
frame_paths = [os.path.join(frame_path, file) for file in os.listdir(frame_path)]

def calculate_similarity(vector1, vector2):
	# Convert the vectors to PyTorch tensors
	vector1 = torch.tensor(vector1)
	vector2 = torch.tensor(vector2)
	
	# Flatten the tensors to 1D
	vector1 = vector1.view(-1)
	vector2 = vector2.view(-1)
	
	# Calculate the dot product and norms of the two feature vectors
	dot_product = torch.dot(vector1, vector2)
	norm1 = torch.norm(vector1)
	norm2 = torch.norm(vector2)
	
	# Calculate the cosine similarity between the two feature vectors
	similarity = dot_product / (norm1 * norm2)
	return similarity.tolist()

def calculate_optimal_trees(n_samples, accuracy):
	# Calculate the optimal number of trees based on the size of the dataset and the desired accuracy
	n_trees = int(np.round(np.log(n_samples) / np.log(2) / accuracy))
	return n_trees

# Build the Annoy index from the frame feature vectors
annoy_index = annoy.AnnoyIndex(len(frame_features_list_squeezed[0]), metric='angular')
for i, frame_features in enumerate(frame_features_list_squeezed):
	annoy_index.add_item(i, frame_features)

# Calculate the sample length of the dataset
n_samples = len(frame_features_list_squeezed)

# Set the desired accuracy of the search
accuracy = 0.9
# Set the similarity threshold
threshold = 0.7

# Calculate the optimal number of trees for a dataset with n_samples and an accuracy of accuracy
n_trees = calculate_optimal_trees(n_samples, accuracy)
print("n_trees", n_trees)
annoy_index.build(n_trees)

# Set the number of nearest neighbors to search for
num_neighbors = int(n_samples * math.log(1 / accuracy) * 0.5)
print("log num_neighbors", num_neighbors)

# Create a list to store the indexes of the frames to delete
frames_to_delete = []

# Iterate through the list of frame feature vectors
for i, frame_features in enumerate(frame_features_list_squeezed):
	# Check if the current frame has already been added to the list of frames to delete
	if i in frames_to_delete:
		continue
	# Find the nearest neighbors of the current frame in the Annoy index
	indices = annoy_index.get_nns_by_item(i, num_neighbors)
	# Iterate through the nearest neighbors
	for j in indices:
		# Skip the current frame
		if i == j:
			continue
		# Calculate the similarity between the current frame and the next nearest neighbor
		similarity = calculate_similarity(frame_features, frame_features_list_squeezed[j])
		# If the similarity is above the threshold, add the nearest neighbor to the list of frames to delete
		if similarity > threshold:
			frames_to_delete.append(j)

# Create a list to store the paths of the images to delete
image_paths_to_delete = [frame_paths[i] for i in range(len(frame_paths)) if i in frames_to_delete]

# Create a ThreadPoolExecutor with a fixed number of threads
with ThreadPoolExecutor(max_workers=4) as executor:
	 # Submit the delete tasks to the executor
	 for image_path in image_paths_to_delete:
		 executor.submit(os.remove, image_path)


n_trees 14
log num_neighbors 443


In [18]:
def calculate_similarity(vector1, vector2):
	# Convert the vectors to PyTorch tensors
	vector1 = torch.tensor(vector1)
	vector2 = torch.tensor(vector2)
	
	# Flatten the tensors to 1D
	vector1 = vector1.view(-1)
	vector2 = vector2.view(-1)
	
	# Calculate the dot product and norms of the two feature vectors
	dot_product = torch.dot(vector1, vector2)
	norm1 = torch.norm(vector1)
	norm2 = torch.norm(vector2)
	
	# Calculate the cosine similarity between the two feature vectors
	similarity = dot_product / (norm1 * norm2)
	return similarity.tolist()

# Read the manga panel image
panel_path = 'data/panel_3.png'
panel = Image.open(panel_path)
panel = transform(panel)
panel_features = model(panel).detach().numpy()

# Initialize the most similar frame and its similarity score
most_similar_frame = None
highest_similarity = -1

# Compare the manga panel with all anime frames
for i, frame_features in enumerate(frame_features_list):
	similarity = calculate_similarity(panel_features, frame_features)
	if similarity > highest_similarity:
		highest_similarity = similarity
		most_similar_frame = frame_paths[i]

# Print the most similar frame and its similarity score
print(f"Most similar frame for panel {panel_path}, {most_similar_frame} (similarity: {highest_similarity:.4f})")

Most similar frame for panel data/panel_3.png, data/fma_Pframes_resnet_ep3/fma_04_i_frame_6977.jpg (similarity: 0.6283)
