In [1]:
import cv2
import numpy as np
import pandas as pd
#import matplotlib as plt
import matplotlib.pyplot as plt
import tensorflow as tf
import os
from facenet_pytorch import InceptionResnetV1, MTCNN
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from moviepy.editor import VideoFileClip, concatenate_videoclips
import networkx as nx
import netwulf
import urllib.request

# Data initialization

## Data manipulation functions defined

In [3]:
def flip_fam(fam): #possibly unnecessary
    """function for flipping family string symbol

    Args:
        fam (str): a string of two or five letters with family relation : FS, MS, GM-GS etc.

    Returns:
        str : the inverted version of fam : FS to SF, GM-GS to GS-GM
    """
    
    
    if not isinstance(fam,str):
        print("Converted to string")
        fam = str(fam)
    if len(fam) == 2:
        flipped_family = fam[1] + fam[0] 
        return flipped_family
    else:
        fam1,fam2 = fam.split("-")
        flipped_family == str(fam2) + "-" + str(fam1)
        return(flipped_family)

def ret_edges_fam(fam):
    """Returns all edges of type fam

    Args:
        fam (str) : a string of two or five letters with family relation : FS, MS, GM-GS etc.
        
    Returns:
        list[edges] : a list of edges of type fam
    """
    if isinstance(fam,str):    
        return [(u, v) for u, v, attr in G.edges(data=True) if attr['label']==fam]
    else:
        print("Converted to string")
        return [(u, v) for u, v, attr in G.edges(data=True) if attr['label']==str(fam)]

## Dictionary for video names

In [2]:
# Specify the directory you want to list files from
directory_path = 'C:/DTU/3 - Tredje Aar/6 - Semester/Bachelor Projekt/Bachelors code/Videos Clipped'

# List all files in the directory
families = os.listdir(directory_path)
all_data = {}                                                           # A dictionary with each family member and the names of their videos

for family in families:
    persons = os.listdir(directory_path+"/"+str(family))
    temp_list = []
    for person in persons:
        temp_list.append([person,os.listdir(directory_path+"/"+str(family)+"/"+str(person))])
        
    all_data[family] = temp_list

## Dictionary of familial connections

In [4]:


relations_path = "Metadata - Gathered data/relationsheet_kinship_detection (version 1).xlsb.xlsx"
relations_csv = pd.read_excel(relations_path, sheet_name="Datasheet")
relations = ["FS", "FD", "MS", "MD", "BB", "BS", "GM-GS", "GM-GD", "GF-GS", "GF-GD"]
relations_dict = {}                                                                                              # Dictionary containing all relationships (Including pairs)
fam_pers_ids = list(relations_csv["Path"])

for i in range(len(fam_pers_ids)):
    for fam in relations:
        item = list(relations_csv[fam])[i]
        if item == 0 or item == "0":
            continue
        if not isinstance(item, str):
            continue
        elif len(item) < 5:
            relations_dict.setdefault(fam_pers_ids[i], []).append([item.replace(" ", ""), fam])
        else:
            relations_dict.setdefault(fam_pers_ids[i], []).append([item.replace(" ", "").split(","), fam])

## Creating a graph using the links
This is done as it is by far the easiest way to manipulate this set of connections, especially when only considering kin of any kind, rather than kin of a specific nature.

In [1]:

# Create an empty directed graph
G = nx.DiGraph()

#Creating nodes (family members)
for person in relations_dict.keys():
    G.add_node(person)
    for relation in relations_dict[person]:
        fam = person.split("/")[0]
        if isinstance(relation[0], list):
            for relation_more in relation[0]:
                G.add_edge(fam+"/"+relation_more[0],fam+"/"+relation_more[-1], label=relation[-1])
        else:
            G.add_edge(fam+"/"+relation[0][0],fam+"/"+relation[0][-1], label=relation[-1])


NameError: name 'nx' is not defined

In [18]:
netwulf.visualize(G) # Here you can visualize the graph if desired

(None, None)

# Model Implementations

## Convolutional Neural Network

### Facenet Pretrained Model

In [2]:
# Download the pre-trained FaceNet model

url = 'https://drive.google.com/uc?id=1EXPBSXwTaqrSC0OhUdXNmKSh9qJUQ55-&export=download'
model_filename = '20180402-114759.pb'

if not os.path.exists(model_filename):
    urllib.request.urlretrieve(url, model_filename)

### Feature Extraction and face alignment

In [3]:
video_path = 'Videos Clipped/40/1/40-1-2022-1-1.mp4.mp4'

In [14]:
#Load pre-trained FaceNet model for face detection and alignment
base_model = InceptionResnetV1(pretrained='vggface2').eval()
mtcnn = MTCNN()

# Open the video file
cap = cv2.VideoCapture(video_path)

#Extract frame
frame_number = 10
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)# Set the video file's position to the desired frame number
ret, frame = cap.read()# Read the frame from the video file
cap.release()# Close the video file

#Convert to PIL.
if ret:#ret is False if no frame present
    # Convert the OpenCV frame (BGR) to a PIL image (RGB)
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
else:
    print("Failed to read frame.")

#Align face image for better training
aligned_image = mtcnn(image)
#Create face embeddings using CNN model
face_embedding = base_model(aligned_image.unsqueeze(0))

### Model Training

#### feedforward neural network using PyTorch

In [17]:
class SimpleClassifier(nn.Module):
    def __init__(self, input_size):
        super(SimpleClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 1)
    
    def forward(self, x):
        x = nn.functional.relu(self.fc1(x))
        x = nn.functional.sigmoid(self.fc2(x))
        return x

In [None]:
class SimpleClassifier(nn.Module):
    def __init__(self, input_size):
        super(SimpleClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 1)
        
    def forward(self, x):
        x = nn.functional.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x

# Load the dataset
dataframe = pd.read_csv("your_data.csv")  # assuming your data is stored in a CSV file

# Preprocess the dataset
X = np.array([list(map(float, x.split(","))) for x in dataframe["minus_product"]])
y = np.array(dataframe["label"]).reshape(-1, 1)

# Split the dataset into training and validation sets
train_size = int(0.8 * len(X))
X_train, X_val = X[:train_size], X[train_size:]
y_train, y_val = y[:train_size], y[train_size:]

# Convert the dataset to PyTorch tensors
X_train, X_val = torch.from_numpy(X_train), torch.from_numpy(X_val)
y_train, y_val = torch.from_numpy(y_train), torch.from_numpy(y_val)

# Define the neural network and optimizer
model = SimpleClassifier(input_size=2)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model on the training set and evaluate on the validation set
num_epochs = 10
batch_size = 32
for epoch in range(num_epochs):
    # Shuffle the training data
    perm = torch.randperm(X_train.size(0))
    X_train, y_train = X_train[perm], y_train[perm]
    
    # Train the model in batches
    for i in range(0, X_train.size(0), batch_size):
        # Get a batch of data
        X_batch = X_train[i:i+batch_size]
        y_batch = y_train[i:i+batch_size]
        
        # Zero the gradients and compute the forward pass
        optimizer.zero_grad()
        outputs = model(X_batch.float())
        
        # Compute the loss and gradients
        loss = criterion(outputs, y_batch.float())
        loss.backward()
        
        # Update the weights
        optimizer.step()
        
    # Evaluate the model on the validation set
    with torch.no_grad():
        outputs = model(X_val.float())
        preds = (outputs > 0.5).float()
        accuracy = (preds == y_val).float().mean()
        print(f"Epoch {epoch+1}: Validation accuracy = {accuracy:.3f}")

In [78]:
# Replace this with one of the video file paths from your list
video_path = 'Videos Clipped/40/1/40-1-2022-1-1.mp4.mp4'

# Read the video file
video = cv2.VideoCapture(video_path)

# Check if the video file was opened successfully
if not video.isOpened():
    print(f"Error: Could not open video file {video_path}")
else:
    ret,frame = video.read()
    print(ret)

True


"\nelse:\n    # Loop through the video frames\n    while True:\n        # Read the next frame\n        ret, frame = video.read()\n\n        # Break the loop if we have reached the end of the video\n        if not ret:\n            break\n\n        # Process the frame using a convolutional neural network\n        # Your CNN code here\n\n        # Show the frame\n        cv2.imshow('Video Frame', frame)\n\n        # Wait for a key press and break the loop if the 'q' key is pressed\n        if cv2.waitKey(1) & 0xFF == ord('q'):\n            break\n\n    # Release the video file\n    video.release()\n\n    # Close any OpenCV windows\n    cv2.destroyAllWindows()\n"

# Results