# CAKE

## Audio bestand maken

In [None]:
import time

# Wall-clock timestamp (seconds since the epoch) taken when this cell runs.
# NOTE(review): presumably used later to report the total runtime — no reader
# of start_time is visible in this part of the file.
start_time = time.time()

In [None]:
import os
from pydub import AudioSegment

In [None]:
def convert_mp4_to_wav(input_mp4_path, output_wav_path):
    """Read the audio of an MP4 file and export it as a WAV file.

    Args:
        input_mp4_path: Path of the MP4 file to read.
        output_wav_path: Path the WAV file is written to.
    """
    print(f"Converting '{input_mp4_path}' to WAV format...")
    # Load with pydub and export in one chained expression.
    AudioSegment.from_file(input_mp4_path, format="mp4").export(output_wav_path, format="wav")
    print(f"Conversion complete! WAV file saved to: {output_wav_path}")

In [None]:
# Absolute, machine-specific paths: the source video and the WAV file that
# convert_mp4_to_wav() writes from it.
vid = r"C:\Users\maxim\OneDrive\01-Opleidingen\03-MAAI\Afstuderen\Data\Video's\machine cover.mp4"
aud = r"C:\Users\maxim\OneDrive\01-Opleidingen\03-MAAI\Afstuderen\Cake\Data_machine_cover\machine_cover.wav"

In [None]:
# Extract the audio of `vid` into the WAV file at `aud`.
convert_mp4_to_wav(vid, aud)

## Transcribe

In [None]:
import whisperx
import gc
import torch
import json
import os


In [None]:
# Input
# Folder that is scanned for WAV files to transcribe.
wav_folder = r"C:\Users\maxim\OneDrive\01-Opleidingen\03-MAAI\Afstuderen\Cake\Data_machine_cover"
# Folder in which transcribe() writes one JSON result per audio file.
output_folder = "transcriptions_machine_cover"
# Folder that receives audio files whose detected language is not supported.
unsupported_folder = "unsupported_language_machine_cover"
# Model download location; transcribe() checks whether this folder exists.
model_dir = "whisper-models"
# Checkpoint passed to whisperx.load_model as `vad_model_fp`.
vad_model_path = r"C:\Users\maxim\OneDrive\01-Opleidingen\03-MAAI\Afstuderen\Cake\CAKE\VAD\pytorch_model.bin"
# NOTE(review): the WAV-processing loop further down reassigns audio_path for
# every file; this initial value does not appear to be read in the visible code.
audio_path = r"C:\Users\maxim\OneDrive\01-Opleidingen\03-MAAI\Afstuderen\Cake\Data_machine_cover\machine_cover.wav"

# Ensure output folders exist
os.makedirs(output_folder, exist_ok=True)
os.makedirs(unsupported_folder, exist_ok=True)

In [None]:
def transcribe(audio_file):
    """Transcribe one audio file with WhisperX and save the result as JSON.

    The language is detected automatically. Files whose language is not one of
    en/fr/de/es/nl are moved to the module-level ``unsupported_folder`` and no
    JSON is written. Otherwise the transcript is word-aligned (when an
    alignment model can be loaded) and written to
    ``<output_folder>/<audio base name>.json``.

    Reads the module-level names ``model_dir``, ``vad_model_path``,
    ``output_folder`` and ``unsupported_folder``.

    Args:
        audio_file: Path of the audio file to transcribe.

    Returns:
        None.
    """
    import shutil  # local import: only needed for the unsupported-language move

    # Device auto-detection (CUDA: float16 / batch 16, otherwise int8 with a
    # smaller batch) is disabled; the CPU settings below are forced.
    print("CPU gebruikt")
    device = "cpu"
    compute_type = "int8"
    batch_size = 4

    if not os.path.exists(model_dir):
        # First run: let WhisperX download the model into model_dir.
        model = whisperx.load_model("large-v2", device, compute_type=compute_type, download_root=model_dir)
    else:
        # Later runs: load the already downloaded snapshot together with the local VAD model.
        model = whisperx.load_model("./whisper-models/models--Systran--faster-whisper-large-v2/snapshots/f0fe81560cb8b68660e564f55dd99207059c092e", device, compute_type=compute_type, vad_model_fp=vad_model_path)

    audio = whisperx.load_audio(audio_file)

    # Perform transcription with automatic language detection
    result = model.transcribe(audio, batch_size=batch_size)
    detected_language = result.get("language", "en")

    # Check if detected language is supported, otherwise move file to unsupported folder
    if detected_language not in ["en", "fr", "de", "es", "nl"]:
        print(f"Language detected as {detected_language}, moving to unsupported folder.")
        # shutil.move also works when source and target are on different
        # drives/filesystems, where os.rename raises OSError.
        shutil.move(audio_file, os.path.join(unsupported_folder, os.path.basename(audio_file)))
        return

    print(f"Detected language: {detected_language}")

    try:
        model_a, metadata = whisperx.load_align_model(language_code=detected_language, device=device)
        result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)
        # Drop the reference first; collecting while it is still bound frees nothing.
        del model_a
        gc.collect()
        torch.cuda.empty_cache()
    except ValueError as e:
        # Best effort: keep the unaligned transcription result.
        print(f"Skipping alignment due to error: {e}")

    # Save as JSON (explicit UTF-8, matching how the later steps read the file)
    base_filename = os.path.splitext(os.path.basename(audio_file))[0]
    output_json_path = os.path.join(output_folder, f"{base_filename}.json")
    with open(output_json_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2)

    print(f"Results saved to {output_json_path}")

In [None]:
# Transcribe every file in wav_folder whose name ends in ".wav".
for filename in os.listdir(wav_folder):
    if not filename.endswith(".wav"):
        continue
    audio_path = os.path.join(wav_folder, filename)
    transcribe(audio_path)

## Full text of transcription

In [None]:
import os
import json

# Input directory containing JSON files
json_folder = 'transcriptions_machine_cover'
output_dir = 'individual_texts_machine_cover'

# Create the output directory if it does not exist yet
os.makedirs(output_dir, exist_ok=True)
print(f"Output directory '{output_dir}' is ready.")

# Process each JSON file individually: concatenate the text of all its
# segments into one plain-text file with the same base name.
for json_file in os.listdir(json_folder):
    if not json_file.endswith('.json'):
        continue

    json_path = os.path.join(json_folder, json_file)
    print(f"Processing file: {json_file}")

    # Make sure the JSON file can be read and parsed
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (json.JSONDecodeError, FileNotFoundError) as e:
        print(f"Error: Failed to process '{json_file}'. Details: {e}")
        continue

    segments = data.get('segments', [])
    if not segments:
        print(f"Warning: No segments found in '{json_file}'.")
        continue

    # Create output text file for the individual transcript.
    # splitext strips only the final extension; str.replace would rewrite
    # every ".json" occurring in the name and break the name-based pairing
    # with the JSON file in the chunking step.
    individual_output_path = os.path.join(output_dir, os.path.splitext(json_file)[0] + '.txt')

    with open(individual_output_path, 'w', encoding='utf-8') as individual_file:
        for i, segment in enumerate(segments, start=1):
            text = segment.get('text', '').strip()
            if not text:
                print(f"Warning: Segment {i} in '{json_file}' is empty.")
                continue
            # Each segment is followed by a single space as separator.
            individual_file.write(f"{text} ")

    print(f"Transcript saved to '{individual_output_path}'.")

## Output chonkie chunks with timestamp (in json)

In [None]:
import os
import json
from chonkie import SDPMChunker

def load_document(file_path: str) -> str:
    """Return the complete contents of a UTF-8 text file.

    Args:
        file_path: Path of the file to read.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as handle:
            return handle.read()
    raise FileNotFoundError(f"The file '{file_path}' does not exist.")

def load_json(file_path: str) -> dict:
    """Load and parse a UTF-8 encoded JSON file.

    Args:
        file_path: Path of the JSON file.

    Returns:
        The parsed JSON content.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file is not valid JSON; the original
            ``json.JSONDecodeError`` is attached as ``__cause__``.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Error: The JSON file '{file_path}' does not exist.")
    with open(file_path, 'r', encoding='utf-8') as f:
        try:
            return json.load(f)
        except json.JSONDecodeError as e:
            # Chain explicitly so the traceback shows the decode error as the cause.
            raise ValueError(f"Error: Failed to decode JSON file. Details: {e}") from e

def create_chunker(embedding_model="minishlab/potion-base-8M", chunk_size=512, min_sentences=1):
    """Build an ``SDPMChunker`` with the given settings.

    Args:
        embedding_model: Passed to the chunker as ``embedding_model``.
        chunk_size: Passed to the chunker as ``chunk_size``.
        min_sentences: Passed to the chunker as ``min_sentences``.

    Returns:
        The configured ``SDPMChunker`` instance.
    """
    chunker_options = {
        "embedding_model": embedding_model,
        "chunk_size": chunk_size,
        "min_sentences": min_sentences,
    }
    return SDPMChunker(**chunker_options)

# def process_text_and_json(text_folder: str, json_folder: str, output_folder: str):

#     if not os.path.exists(output_folder):
#         os.makedirs(output_folder)

#     for text_file in os.listdir(text_folder):
#         if text_file.endswith(".txt"):
#             base_name = os.path.splitext(text_file)[0]
#             text_path = os.path.join(text_folder, text_file)
#             json_path = os.path.join(json_folder, base_name + ".json")

#             if not os.path.exists(json_path):
#                 print(f"Warning: No matching JSON file for {text_file}")
#                 continue

#             text_content = load_document(text_path)
#             json_data = load_json(json_path)
#             segments = json_data.get('word_segments', [])

#             if not segments:
#                 raise ValueError(f"Error: No segments found in the JSON file {json_path}.")

#             word_list = [[seg.get('word', '').strip(), seg.get('start', ''), seg.get('end', '')] for seg in segments if seg.get('word', '').strip()]
#             chunker = create_chunker()
#             chunks = chunker.chunk(text_content)

#             final_chunks = []
#             current_word_index = 0
#             for chunk in chunks:
#                 chunk_text = chunk.text
#                 chunk_words = chunk_text.split()
#                 chunk_word_data = []
#                 chunk_start = None
#                 chunk_end = None

#                 for chunk_word in chunk_words:
#                     if current_word_index < len(word_list):
#                         word_info = word_list[current_word_index]
#                         if chunk_word == word_info[0]:
#                             chunk_word_data.append({
#                                 "word": word_info[0],
#                                 "start": word_info[1],
#                                 "end": word_info[2]
#                             })
#                             if chunk_start is None:
#                                 chunk_start = word_info[1]
#                             chunk_end = word_info[2]
#                             current_word_index += 1
#                         else:
#                             raise ValueError(f"Word mismatch at chunk '{chunk_text}': Expected '{word_info[0]}', found '{chunk_word}'.")
#                     else:
#                         raise IndexError("Ran out of words in word_data to match with chunks.")

#                 final_chunks.append({
#                     "text": chunk_text,
#                     "start": chunk_start,
#                     "end": chunk_end,
#                     "words": chunk_word_data
#                 })

#             output_json_path = os.path.join(output_folder, base_name + "_chunks.json")
#             with open(output_json_path, 'w', encoding='utf-8') as f:
#                 json.dump({"chunks": final_chunks}, f, ensure_ascii=False, indent=4)
#                 print(f"Processed {text_file} and saved to {output_json_path}")

def is_valid_time(value):
    """Tell whether ``value`` is usable as a non-negative timestamp.

    ``None``, ``''`` and ``' '`` are rejected outright; anything else must be
    convertible with ``float()`` and be greater than or equal to zero.

    Returns:
        True for a usable timestamp, False otherwise (including values that
        ``float()`` rejects with ValueError or TypeError).
    """
    try:
        if value in (None, '', ' '):
            return False
        return float(value) >= 0
    except (ValueError, TypeError):
        return False

def process_text_and_json(text_folder: str, json_folder: str, output_folder: str):
    """Chunk every transcript text and attach word-level timestamps to each chunk.

    For each ``*.txt`` file in ``text_folder`` the matching ``<name>.json`` in
    ``json_folder`` is loaded, the text is split with the chunker returned by
    ``create_chunker()``, and the result is written to
    ``<output_folder>/<name>_chunks.json`` as ``{"chunks": [...]}``. Text files
    without a matching JSON file are skipped with a warning.

    Words are assigned to chunks purely by position: a chunk with N
    whitespace-separated tokens consumes the next N non-empty entries of
    ``word_segments``. Entries without a usable start/end time still advance
    that position, so later chunks stay in step with the text, but they are
    left out of the chunk's ``words`` list and do not influence its
    ``start``/``end``. A chunk without any timed word gets ``None`` for both.

    Args:
        text_folder: Folder with the plain-text transcripts.
        json_folder: Folder with the transcription JSON files.
        output_folder: Folder that receives the ``*_chunks.json`` files
            (created when missing).

    Raises:
        ValueError: If a matching JSON file has no ``word_segments``.
    """
    os.makedirs(output_folder, exist_ok=True)

    for text_file in os.listdir(text_folder):
        if not text_file.endswith(".txt"):
            continue

        base_name = os.path.splitext(text_file)[0]
        text_path = os.path.join(text_folder, text_file)
        json_path = os.path.join(json_folder, base_name + ".json")

        if not os.path.exists(json_path):
            print(f"Warning: No matching JSON file for {text_file}")
            continue

        text_content = load_document(text_path)
        json_data = load_json(json_path)
        segments = json_data.get('word_segments', [])

        if not segments:
            raise ValueError(f"Error: No segments found in the JSON file {json_path}.")

        # Keep every non-empty word, flagged with whether its timestamps are
        # usable. Dropping the untimed words here would make each following
        # chunk pick up words that belong to a later part of the text.
        word_list = []
        valid_segments = 0
        for seg in segments:
            word = seg.get('word', '').strip()
            if not word:
                continue
            start, end = seg.get('start'), seg.get('end')
            has_times = is_valid_time(start) and is_valid_time(end)
            if has_times:
                valid_segments += 1
            word_list.append((word, start, end, has_times))

        skipped_segments = len(segments) - valid_segments
        if skipped_segments > 0:
            print(f"Waarschuwing: {skipped_segments} ongeldige segmenten overgeslagen in {json_path}")

        chunks = create_chunker().chunk(text_content)

        final_chunks = []
        current_word_index = 0

        for chunk in chunks:
            chunk_text = chunk.text.strip()
            approx_num_words = len(chunk_text.split())

            # Slicing past the end simply yields fewer (or no) words.
            consumed = word_list[current_word_index:current_word_index + approx_num_words]
            current_word_index += len(consumed)

            chunk_word_data = [
                {"word": word, "start": start, "end": end}
                for word, start, end, has_times in consumed
                if has_times
            ]
            chunk_start = chunk_word_data[0]["start"] if chunk_word_data else None
            chunk_end = chunk_word_data[-1]["end"] if chunk_word_data else None

            final_chunks.append({
                "text": chunk_text,
                "start": chunk_start,
                "end": chunk_end,
                "words": chunk_word_data
            })

        output_json_path = os.path.join(output_folder, base_name + "_chunks.json")
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump({"chunks": final_chunks}, f, ensure_ascii=False, indent=4)
            print(f"Processed {text_file} and saved to {output_json_path}")


# Run the chunking step for the machine-cover data when executed directly.
if __name__ == "__main__":
    # NOTE(review): these assignments create module-level names. `output_folder`
    # is also the global that transcribe() reads, so when everything runs in one
    # namespace, calling transcribe() again after this point would write into
    # 'processed_json_machine_cover' — confirm that this is intended.
    text_folder = 'individual_texts_machine_cover'
    json_folder = 'transcriptions_machine_cover'
    output_folder = 'processed_json_machine_cover'
    process_text_and_json(text_folder, json_folder, output_folder)

## Video segmentation chonkie

In [None]:
import json
import os
import subprocess

def create_segments(video_file, result):
    """Cut a video into one clip per chunk listed in a chunks JSON file.

    Clips are written with ffmpeg (re-encoded with libx264/aac) to
    ``video_segments_machine_cover/<video base name>/`` relative to the
    current working directory, named
    ``segment_<index>_<int(start)>_<int(end)>.mp4``.

    Chunks without a start or end time are skipped with a warning instead of
    aborting the run, and a non-zero ffmpeg exit code is reported instead of
    being announced as a saved segment.

    Args:
        video_file: Path of the source video.
        result: Path of the JSON file holding a ``"chunks"`` list whose items
            carry ``start`` and ``end`` times in seconds.

    Raises:
        FileNotFoundError: If ``result`` does not exist.
        ValueError: If the JSON file has no chunks.
    """
    if not os.path.exists(result):
        raise FileNotFoundError(f"Error: The result file '{result}' does not exist.")

    with open(result, 'r', encoding='utf-8') as f:
        data = json.load(f)

    segments = data.get('chunks', [])
    if not segments:
        raise ValueError("No segments found in the JSON file.")

    video_name = os.path.splitext(os.path.basename(video_file))[0]
    output_dir = os.path.join('video_segments_machine_cover', video_name)
    os.makedirs(output_dir, exist_ok=True)
    print(f"Output directory '{output_dir}' is ready.")

    for i, segment in enumerate(segments, start=1):
        start = segment.get('start')
        end = segment.get('end')
        if start is None or end is None:
            # A chunk that received no timed words cannot be cut; the index
            # still advances so the other clips keep their numbering.
            print(f"Warning: Segment {i} has no start/end time; skipping.")
            continue

        output_filename = f"segment_{i}_{int(start)}_{int(end)}.mp4"
        output_path = os.path.join(output_dir, output_filename)

        command = [
            "ffmpeg",
            "-y",
            "-i", video_file,
            "-ss", str(start),
            "-to", str(end),
            "-c:v", "libx264",
            "-c:a", "aac",
            output_path
        ]
        print(f"Creating segment {i}: {start} to {end} seconds for {video_file}.")
        completed = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if completed.returncode != 0:
            # ffmpeg output is discarded, so the exit code is the only failure signal.
            print(f"Error: ffmpeg exited with code {completed.returncode} for segment {i} ('{output_filename}').")
            continue
        print(f"Segment {i} saved as '{output_filename}'.")

    print(f"All segments for {video_file} have been processed.")

def process_videos_in_directory(video_directory, json_directory):
    """Cut every video in a folder using the chunk files of another folder.

    The ``.mp4`` files and the ``.json`` files are each sorted by name and
    paired by position; every pair is handed to ``create_segments``. Because
    the pairing is positional, a warning is printed when the two folders hold
    a different number of files, and each pairing is logged so a mismatch is
    visible.

    Args:
        video_directory: Folder containing the ``.mp4`` files.
        json_directory: Folder containing the chunk ``.json`` files.

    Raises:
        NotADirectoryError: If either argument is not an existing directory.
    """
    if not os.path.isdir(video_directory):
        raise NotADirectoryError(f"Error: The directory '{video_directory}' does not exist.")
    if not os.path.isdir(json_directory):
        raise NotADirectoryError(f"Error: The directory '{json_directory}' does not exist.")

    video_files = sorted(f for f in os.listdir(video_directory) if f.endswith('.mp4'))
    json_files = sorted(f for f in os.listdir(json_directory) if f.endswith('.json'))

    # zip() stops at the shorter list, which would drop files without a trace.
    if len(video_files) != len(json_files):
        paired = min(len(video_files), len(json_files))
        print(
            f"Warning: found {len(video_files)} video file(s) but {len(json_files)} JSON file(s); "
            f"only the first {paired} pair(s) will be processed."
        )

    for video_file, json_file in zip(video_files, json_files):
        print(f"Pairing '{video_file}' with '{json_file}'.")
        video_path = os.path.join(video_directory, video_file)
        json_path = os.path.join(json_directory, json_file)
        create_segments(video_path, json_path)

# Cut the videos into per-chunk clips when executed directly.
if __name__ == "__main__":
    video_directory = r"C:\Users\maxim\OneDrive\01-Opleidingen\03-MAAI\Afstuderen\Cake\Video_machine_cover"  # Replace with your video directory
    json_directory = "processed_json_machine_cover"  # Replace with your JSON directory
    process_videos_in_directory(video_directory, json_directory)

## Frame Extraction
### 1 frame per 3 seconds

In [None]:
import cv2
import os

def extract_frames(video_path, output_dir, interval_seconds=3):
    """Save one frame of a video every ``interval_seconds`` seconds as JPEG.

    Frames are written to ``<output_dir>/<parent folder>/<video file name>/``
    (the last two components of ``video_path``) and are named
    ``<video file name with dots replaced by underscores>_frame_<n>.jpg``,
    with ``n`` counting the saved frames from 0.

    Args:
        video_path: Path of the video to read.
        output_dir: Root folder for the extracted frames.
        interval_seconds: Time between two saved frames, in seconds.

    Returns:
        None. Nothing is extracted when the frame rate cannot be read.
    """
    video_name = os.path.basename(video_path).replace(".", "_")
    segment_output_dir = os.path.join(output_dir, *video_path.split(os.sep)[-2:])
    os.makedirs(segment_output_dir, exist_ok=True)

    vidcap = cv2.VideoCapture(video_path)
    try:
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            print(f"Kan de FPS voor {video_path} niet ophalen.")
            return

        # At least 1: a product below one frame would truncate to 0 and make
        # the modulo below divide by zero.
        frame_interval = max(1, int(fps * interval_seconds))
        count = 0
        frame_count = 0

        success, image = vidcap.read()
        while success:
            if frame_count % frame_interval == 0:
                frame_path = os.path.join(segment_output_dir, f"{video_name}_frame_{count}.jpg")
                cv2.imwrite(frame_path, image)
                count += 1
            success, image = vidcap.read()
            frame_count += 1
    finally:
        # Release the capture on every path, including the early return above.
        vidcap.release()

    print(f"Geëxtraheerd {count} frames uit {video_path} naar {segment_output_dir}")

def process_videos_in_directory(base_directory, output_directory, interval_seconds=3):
    """Extract frames from every video found one folder level below a base folder.

    Each sub-folder of ``base_directory`` is scanned (in sorted order) for
    files ending in .mp4/.avi/.mov/.mkv (case-insensitive) and every match is
    passed to ``extract_frames``. Files directly inside ``base_directory`` are
    ignored.

    Args:
        base_directory: Folder whose sub-folders contain the videos.
        output_directory: Root folder for the extracted frames (created when missing).
        interval_seconds: Forwarded to ``extract_frames``.

    Raises:
        NotADirectoryError: If ``base_directory`` is not an existing directory.
    """
    if not os.path.isdir(base_directory):
        raise NotADirectoryError(f"Error: De map '{base_directory}' bestaat niet.")

    os.makedirs(output_directory, exist_ok=True)
    print(f"Output directory '{output_directory}' is gereed.")

    video_suffixes = ('.mp4', '.avi', '.mov', '.mkv')
    for entry in sorted(os.listdir(base_directory)):
        folder = os.path.join(base_directory, entry)
        if not os.path.isdir(folder):
            continue
        for name in sorted(os.listdir(folder)):
            if not name.lower().endswith(video_suffixes):
                continue
            extract_frames(os.path.join(folder, name), output_directory, interval_seconds)

# Extract one frame every 3 seconds from all video segments when executed directly.
if __name__ == "__main__":
    video_directory = "video_segments_machine_cover"  # Replace this with your base video-segments directory
    output_directory = "frames_machine_cover"         # Directory in which the extracted frames are stored
    process_videos_in_directory(video_directory, output_directory, interval_seconds=3)

## Object segmentation

In [None]:
import torch
import numpy as np
import cv2
import os
import time
from dataclasses import dataclass
from typing import List, Tuple
from PIL import Image
from segment_anything import sam_model_registry, SamAutomaticMaskGenerator

@dataclass
class BoxPrompt:
    """Four float coordinates describing a box.

    NOTE(review): not referenced anywhere else in the visible code; presumably
    (x1, y1) and (x2, y2) are opposite corners of a box prompt — confirm.
    """
    x1: float
    y1: float
    x2: float
    y2: float

class AdvancedAutoSAM:
    """Run Segment Anything's automatic mask generator and save its results.

    The model is loaded once on construction and moved to CUDA when
    ``torch.cuda.is_available()`` is true, otherwise it stays on the CPU.
    """

    def __init__(self, model_type="vit_l", model_path="./models/sam_vit_l_0b3195.pth"):
        """Store the settings, choose the device and load the model.

        Args:
            model_type: Key used to look up the builder in ``sam_model_registry``.
            model_path: Checkpoint file handed to that builder.
        """
        self.model_type = model_type
        self.model_path = model_path
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self._load_model()

    def _load_model(self):
        """Build the SAM model, move it to the device and create the mask generator."""
        print(f"Loading SAM model: {self.model_type}...")
        self.sam = sam_model_registry[self.model_type](checkpoint=self.model_path)
        self.sam.to(self.device)
        self.mask_generator = SamAutomaticMaskGenerator(self.sam)
        print("SAM model loaded.")

    @staticmethod
    def _read_image(image_path: str) -> np.ndarray:
        """Read an image with OpenCV, raising ValueError when it cannot be loaded."""
        image = cv2.imread(image_path)
        # cv2.imread signals failure by returning None rather than raising.
        if image is None:
            raise ValueError(f"Failed to load {image_path}")
        return image

    def auto_generate_masks(self, image_path: str) -> List[np.ndarray]:
        """Generate masks for one image.

        Args:
            image_path: Path of the image to segment.

        Returns:
            One uint8 array per mask, with 255 inside the mask and 0 outside.

        Raises:
            ValueError: If the image cannot be loaded.
        """
        image = self._read_image(image_path)
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        masks = self.mask_generator.generate(image_rgb)
        binary_masks = [m['segmentation'].astype(np.uint8) * 255 for m in masks]
        print(f"Generated {len(binary_masks)} masks.")
        return binary_masks

    def save_masks_as_color_overlay(self, masks: List[np.ndarray], image_path: str, output_path: str):
        """Blend every mask at 50% with a random colour over the image and save it.

        Args:
            masks: Masks as returned by ``auto_generate_masks``.
            image_path: Path of the image the masks belong to.
            output_path: Path of the overlay image to write.

        Raises:
            ValueError: If the image cannot be loaded.
        """
        overlay = self._read_image(image_path).copy()
        for mask in masks:
            color = np.random.randint(0, 255, size=3, dtype=np.uint8)
            mask_indices = mask > 0
            overlay[mask_indices] = (0.5 * overlay[mask_indices] + 0.5 * color).astype(np.uint8)
        cv2.imwrite(output_path, overlay)
        print(f"Overlay saved to {output_path}")

    def save_masks_individually(self, masks: List[np.ndarray], output_folder: str, base_name: str = "mask"):
        """Write each mask as ``<base_name>_<n>.png`` (n starting at 1) into ``output_folder``."""
        os.makedirs(output_folder, exist_ok=True)
        for idx, mask in enumerate(masks, 1):
            path = os.path.join(output_folder, f"{base_name}_{idx}.png")
            cv2.imwrite(path, mask)
            print(f"Saved mask to {path}")

    def save_segments_individually(self, masks: List[np.ndarray], image_path: str, output_folder: str, base_name: str = "segment"):
        """Write the image region selected by each mask as ``<base_name>_<n>.png``.

        Args:
            masks: Masks as returned by ``auto_generate_masks``.
            image_path: Path of the image the masks belong to.
            output_folder: Folder that receives the segment images (created when missing).
            base_name: File name prefix.

        Raises:
            ValueError: If the image cannot be loaded.
        """
        os.makedirs(output_folder, exist_ok=True)
        image = self._read_image(image_path)
        for idx, mask in enumerate(masks, 1):
            # Keep only the pixels where the mask is non-zero.
            segment = cv2.bitwise_and(image, image, mask=mask)
            path = os.path.join(output_folder, f"{base_name}_{idx}.png")
            cv2.imwrite(path, segment)
            print(f"Saved segment to {path}")


In [None]:
def main():
    """Segment every frame image below the input folder and save the results.

    For each image an overlay, the individual masks and the individual
    segments are written to sub-folders of ``generated_results_machine_cover``.
    A failure on one image is printed and the run continues with the next.
    """
    auto_sam = AdvancedAutoSAM()

    input_dir = "frames_machine_cover/machine_cover/"
    output_dir = "generated_results_machine_cover"
    overlay_dir, masks_dir, segments_dir = (
        os.path.join(output_dir, sub) for sub in ("overlays", "masks", "segments")
    )
    for folder in (overlay_dir, masks_dir, segments_dir):
        os.makedirs(folder, exist_ok=True)

    supported_extensions = ('.jpg', '.jpeg', '.png', '.bmp')

    for root, _, files in os.walk(input_dir):
        for file in files:
            if not file.lower().endswith(supported_extensions):
                continue

            image_path = os.path.join(root, file)
            stem = os.path.splitext(file)[0]
            overlay_path = os.path.join(overlay_dir, f"{stem}_overlay.png")
            mask_folder = os.path.join(masks_dir, f"{stem}_masks")
            segment_folder = os.path.join(segments_dir, f"{stem}_segments")

            print(f"\nProcessing {image_path}...")
            try:
                masks = auto_sam.auto_generate_masks(image_path)
                if not masks:
                    continue
                auto_sam.save_masks_as_color_overlay(masks, image_path, overlay_path)
                auto_sam.save_masks_individually(masks, mask_folder)
                auto_sam.save_segments_individually(masks, image_path, segment_folder)
                torch.cuda.empty_cache()
            except Exception as e:
                print(f"Error processing {file}: {e}")

if __name__ == "__main__":
    main()


## Image captioning on the frames

In [None]:
import os
import json
from PIL import Image
from transformers import VisionEncoderDecoderModel, ViTImageProcessor, AutoTokenizer
import torch


# Path to the root directory with the extracted frames (all video segments)
frames_dir = "frames_machine_cover/machine_cover/"  # Adjust this to your frames root directory

# Output JSON file
output_json_path = "combined_frames_machine_cover.json"


def initialize_captioning_model():
    """Load the image-captioning model with its image processor and tokenizer.

    All three are loaded from the ``nlpconnect/vit-gpt2-image-captioning``
    checkpoint.

    Returns:
        Tuple ``(model, feature_extractor, tokenizer)``.
    """
    checkpoint = "nlpconnect/vit-gpt2-image-captioning"

    print("Initialiseren van het image captioning model...")
    model = VisionEncoderDecoderModel.from_pretrained(checkpoint)
    feature_extractor = ViTImageProcessor.from_pretrained(checkpoint)
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    print("Model geïnitieerd.")
    return model, feature_extractor, tokenizer


def generate_caption(image_path, model, feature_extractor, tokenizer, device):
    """Generate a caption for one image.

    Args:
        image_path: Path of the image file.
        model: Captioning model; its ``generate`` is called with
            ``max_length=16`` and ``num_beams=4``.
        feature_extractor: Image processor producing ``pixel_values``.
        tokenizer: Tokenizer used to decode the generated ids.
        device: Device the pixel values are moved to.

    Returns:
        The caption text, or an empty string when anything goes wrong (the
        error is printed; captioning is best effort per frame).
    """
    try:
        # Close the file handle as soon as the RGB copy exists; without the
        # context manager it stays open until the image object is collected.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        pixel_values = feature_extractor(images=image, return_tensors="pt").pixel_values.to(device)

        output_ids = model.generate(pixel_values, max_length=16, num_beams=4)
        return tokenizer.decode(output_ids[0], skip_special_tokens=True)
    except Exception as e:
        print(f"Error generating caption for {image_path}: {e}")
        return ""

def process_all_frames(frames_dir, model, feature_extractor, tokenizer, device):
    """Caption every frame image in every segment folder below ``frames_dir``.

    Segment folders and the frames inside them are visited in natural order
    (numbers compared as numbers), so ``..._frame_2`` comes before
    ``..._frame_10`` and ``frame_number`` follows the numbering in the file
    names instead of their lexicographic order.

    Args:
        frames_dir: Folder whose sub-folders each hold the frames of one video segment.
        model, feature_extractor, tokenizer, device: Forwarded to ``generate_caption``.

    Returns:
        A list of dicts with the keys ``video_segment``, ``frame_number``
        (0-based position within the segment), ``frame_filename`` and
        ``caption``. Empty when ``frames_dir`` does not exist.
    """
    import re  # local import: `re` is not among the imports of this section

    def natural_key(name):
        # re.split with one capture group alternates text, digits, text, ...,
        # so odd positions are always digit runs and compare as integers.
        parts = re.split(r'(\d+)', name)
        key = [int(part) if index % 2 else part.lower() for index, part in enumerate(parts)]
        return key, name

    combined_data = []
    supported_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')

    if not os.path.isdir(frames_dir):
        print(f"Error: Frames directory {frames_dir} bestaat niet.")
        return combined_data

    # Iterate over all sub-directories (video segments)
    video_segments = [d for d in os.listdir(frames_dir) if os.path.isdir(os.path.join(frames_dir, d))]
    total_segments = len(video_segments)
    print(f"Found {total_segments} video segments in '{frames_dir}'.")

    for seg_idx, segment in enumerate(sorted(video_segments, key=natural_key), start=1):
        segment_path = os.path.join(frames_dir, segment)
        frame_files = [f for f in os.listdir(segment_path) if f.lower().endswith(supported_extensions)]

        if not frame_files:
            print(f"error: Geen frames gevonden in segment '{segment}'.")
            continue

        total_frames = len(frame_files)
        print(f"Processing segment {seg_idx}/{total_segments}: '{segment}' with {total_frames} frames.")

        for idx, frame_file in enumerate(sorted(frame_files, key=natural_key), start=1):
            frame_path = os.path.join(segment_path, frame_file)

            # Generate a caption for the frame
            caption = generate_caption(frame_path, model, feature_extractor, tokenizer, device)

            # Add the combined record to the list
            combined_data.append({
                "video_segment": segment,  # name of the video segment folder
                "frame_number": idx - 1,  # 0-based position within the segment
                "frame_filename": frame_file,
                "caption": caption
            })

            # Progress report every 100 frames and at the end of the segment
            if idx % 100 == 0 or idx == total_frames:
                print(f"Segment '{segment}': Processed {idx}/{total_frames} frames.")

    return combined_data


def save_combined_data(combined_data, output_json_path):
    """Write the caption records to a JSON file (UTF-8, 4-space indent).

    Any error while writing is printed instead of raised.

    Args:
        combined_data: JSON-serialisable data to store.
        output_json_path: Path of the file to write.
    """
    try:
        with open(output_json_path, mode='w', encoding='utf-8') as out_file:
            json.dump(combined_data, out_file, indent=4)
        print(f"Combined JSON saved to '{output_json_path}'.")
    except Exception as e:
        print(f"Error saving JSON file: {e}")


def main():
    """Caption all extracted frames and store the result as one JSON file.

    Uses the module-level ``frames_dir`` and ``output_json_path``. Nothing is
    written when no frame produced a record.
    """
    # Pick the device and set up the captioning model on it
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print(f"Using device: {device}")
    model, feature_extractor, tokenizer = initialize_captioning_model()
    model.to(device)

    # Generate captions for all frames
    combined_data = process_all_frames(frames_dir, model, feature_extractor, tokenizer, device)

    # Store the records, or report that there is nothing to store
    if combined_data:
        save_combined_data(combined_data, output_json_path)
    else:
        print("Geen gecombineerde data om op te slaan.")

if __name__ == "__main__":
    main()

## Connect segments and captions to chunks

In [None]:
import json
import os
from pathlib import Path
from typing import List, Dict, Any
import re

def load_json(file_path: str) -> Any:
    """Read a UTF-8 encoded JSON file and return the parsed content."""
    with open(file_path, 'r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)

def save_json(data: Any, file_path: str):
    """Write ``data`` as JSON (UTF-8, 4-space indent) and print a confirmation."""
    with open(file_path, mode='w', encoding='utf-8') as handle:
        json.dump(data, handle, indent=4)
    print(f"JSON opgeslagen als '{file_path}'.")

def parse_segment_filename(filename: str) -> Dict[str, Any]:
    """Extract index, start and end from a ``segment_<i>_<start>_<end>.mp4`` name.

    The pattern is matched at the beginning of ``filename``.

    Returns:
        ``{"index": int, "start": float, "end": float}``, or an empty dict
        when the name does not match.
    """
    match = re.match(r"segment_(\d+)_(\d+)_(\d+)\.mp4", filename)
    if match is None:
        return {}
    index_text, start_text, end_text = match.groups()
    return {
        "index": int(index_text),
        "start": float(start_text),
        "end": float(end_text)
    }

def find_video_segments(chunks: List[Dict[str, Any]], video_segments_dir: str) -> Dict[tuple, str]:
    """Map ``(start, end)`` of every segment file in a folder to its path.

    File names are interpreted with ``parse_segment_filename``; names that do
    not fit the pattern are reported and left out.

    Args:
        chunks: Not used by this function.
        video_segments_dir: Folder containing the segment video files.

    Returns:
        Dict from ``(start, end)`` (floats taken from the file name) to the file path.
    """
    segment_map = {}
    for segment_file in os.listdir(video_segments_dir):
        parsed = parse_segment_filename(segment_file)
        if parsed:
            times = (parsed['start'], parsed['end'])
            segment_map[times] = os.path.join(video_segments_dir, segment_file)
        else:
            print(f" file '{segment_file}' not same as pattern")
    return segment_map

def find_frames_for_segment(frames_dir: str, segment_filename: str) -> List[str]:
    """List the frame images stored for one video segment.

    Args:
        frames_dir: Root folder of the extracted frames.
        segment_filename: Name of the segment's sub-folder inside ``frames_dir``.

    Returns:
        Paths of all .jpg/.jpeg/.png/.bmp/.tiff files (case-insensitive) in
        that sub-folder, sorted by path; an empty list when the sub-folder
        does not exist.
    """
    segment_frame_dir = os.path.join(frames_dir, segment_filename)
    if not os.path.isdir(segment_frame_dir):
        print(f"error: Frames directory '{segment_frame_dir}' not exist.")
        return []

    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff')
    frames = []
    for entry in os.listdir(segment_frame_dir):
        if entry.lower().endswith(image_suffixes):
            frames.append(os.path.join(segment_frame_dir, entry))
    frames.sort()  # order frames by name
    return frames

def find_object_segments_for_frame(generated_segments_dir: str, segment_filename: str, frame_filename: str) -> List[str]:
    """List the object-segment PNGs that were generated for one frame.

    The files are looked up in
    ``<generated_segments_dir>/<frame name without extension>_segments``.

    Args:
        generated_segments_dir: Root folder of the generated object segments.
        segment_filename: Not used by this function.
        frame_filename: File name (or path) of the frame.

    Returns:
        Sorted paths of the ``.png`` files in that folder; an empty list when
        the folder does not exist.
    """
    # e.g. frame "segment_1_0_32_mp4_frame_3.jpg" -> folder "segment_1_0_32_mp4_frame_3_segments"
    frame_stem, _ = os.path.splitext(os.path.basename(frame_filename))
    segments_folder = os.path.join(generated_segments_dir, f"{frame_stem}_segments")
    if not os.path.isdir(segments_folder):
        print(f"Waarschuwing: Object segments directory '{segments_folder}' bestaat niet.")
        return []

    object_segments = []
    for entry in os.listdir(segments_folder):
        if entry.lower().endswith('.png'):
            object_segments.append(os.path.join(segments_folder, entry))
    object_segments.sort()  # order object segments by name
    return object_segments

def load_captions(captions_json_path: str) -> Dict[str, str]:
    """Load the caption records and index them by segment and frame.

    Args:
        captions_json_path: Path of the JSON file with the caption records.

    Returns:
        Dict from ``"<video_segment>/<frame_filename>"`` to the caption text.
        Records lacking either of the two key parts are left out.
    """
    caption_map = {}
    for entry in load_json(captions_json_path):
        video_segment = entry.get('video_segment')
        frame_filename = entry.get('frame_filename')
        if not (video_segment and frame_filename):
            continue
        caption_map[f"{video_segment}/{frame_filename}"] = entry.get('caption', "")
    print(f"Loaded captions for {len(caption_map)} frames.")
    return caption_map

def update_chunks_with_segments_and_captions(original_chunks: List[Dict[str, Any]],
                                segment_map: Dict[tuple, str],
                                frames_dir: str,
                                generated_segments_dir: str,
                                captions_map: Dict[str, str]) -> List[Dict[str, Any]]:
    """Attach video segment, frames, object segments and captions to each chunk.

    A chunk is linked to its video segment through
    ``(int(start), int(end))``, the same truncated times that appear in the
    segment file names. Chunks without a start or end time, and chunks for
    which no video segment is found, are reported and left out of the result.

    Args:
        original_chunks: Chunks with ``text``, ``start``, ``end`` and optionally ``words``.
        segment_map: Mapping from ``(start, end)`` to the video segment path.
        frames_dir: Root folder of the extracted frames.
        generated_segments_dir: Root folder of the generated object segments.
        captions_map: Mapping from ``"<segment file>/<frame file>"`` to caption.

    Returns:
        A new list of chunk dicts with the added keys ``video_segment`` and
        ``frames`` (each frame carrying ``frame_path``, ``object_segments``
        and ``caption``).
    """
    updated_chunks = []
    for chunk in original_chunks:
        chunk_start = chunk.get('start')
        chunk_end = chunk.get('end')
        if chunk_start is None or chunk_end is None:
            # Chunks that received no timed words have no times to match on;
            # int(None) would abort the whole run.
            print(f"error: chunk without start/end time cannot be linked to a video segment: {str(chunk.get('text', ''))[:50]!r}")
            continue

        key = (int(chunk_start), int(chunk_end))
        video_segment_path = segment_map.get(key)
        if not video_segment_path:
            print(f"error: no video segment found for chunk start {chunk_start} and end {chunk_end}.")
            continue

        segment_filename = os.path.basename(video_segment_path)
        frames_info = []
        for frame_path in find_frames_for_segment(frames_dir, segment_filename):
            frame_filename = os.path.basename(frame_path)
            object_segments = find_object_segments_for_frame(generated_segments_dir, segment_filename, frame_filename)
            # Captions are keyed by segment file name and frame file name
            caption = captions_map.get(f"{segment_filename}/{frame_filename}", "")
            frames_info.append({
                "frame_path": frame_path,
                "object_segments": object_segments,
                "caption": caption
            })

        # Add the video segment and the frame information to the chunk
        updated_chunks.append({
            "text": chunk['text'],
            "start": chunk_start,
            "end": chunk_end,
            "video_segment": video_segment_path,
            "frames": frames_info,
            "words": chunk.get('words', [])
        })

    return updated_chunks

def main():
    """Attach video segments, frames, object segments and captions to the transcript chunks.

    Validates that every input file/directory exists, loads the chunk JSON and
    the captions JSON, enriches each chunk via
    ``update_chunks_with_segments_and_captions`` and writes the result to a new
    JSON file. A missing input or an empty chunk list is reported with
    ``print`` and ends the run early.
    """
    # Input and output locations
    original_json_path = "processed_json_machine_cover/machine_cover_chunks.json"  # original JSON with chunks
    video_segments_dir = "video_segments_machine_cover/machine_cover"  # directory with video segments
    frames_dir = "frames_machine_cover/machine_cover"  # root directory with frames per segment
    generated_segments_dir = "generated_results_machine_cover/segments"  # directory with object segments
    captions_json_path = "combined_frames_machine_cover.json"  # JSON with image captions
    new_json_path = "updated_chunks_with_segments_and_captions_machine_cover.json"  # new JSON output

    # Every required input, checked in this order; the first one missing stops the run.
    required_inputs = [
        (os.path.exists, original_json_path, "Originele JSON bestand"),
        (os.path.isdir, video_segments_dir, "Video segments directory"),
        (os.path.isdir, frames_dir, "Frames directory"),
        (os.path.isdir, generated_segments_dir, "Generated segments directory"),
        (os.path.exists, captions_json_path, "Captions JSON bestand"),
    ]
    for is_present, path, label in required_inputs:
        if not is_present(path):
            print(f"Error: {label} '{path}' bestaat niet.")
            return

    # Load the original JSON
    original_data = load_json(original_json_path)
    chunks = original_data.get('chunks', [])
    if not chunks:
        print("Geen chunks gevonden in de originele JSON.")
        return

    print(f"Loaded {len(chunks)} chunks from '{original_json_path}'.")

    # Build a mapping from (start, end) times to video segment paths
    segment_map = find_video_segments(chunks, video_segments_dir)
    print(f"Found {len(segment_map)} video segments.")

    # Load the captions and build the caption lookup
    captions_map = load_captions(captions_json_path)

    # Enrich the chunks with video segments, frames, object segments and captions
    updated_chunks = update_chunks_with_segments_and_captions(
        chunks,
        segment_map,
        frames_dir,
        generated_segments_dir,
        captions_map
    )

    print(f"Updated {len(updated_chunks)} chunks with segments and captions.")

    # Build the new JSON structure and store it
    new_data = {
        "chunks": updated_chunks
    }
    save_json(new_data, new_json_path)
    print(f"new JSON with connected segments and captions saved as '{new_json_path}'.")


# Guarded like the other cells of this notebook, so the pipeline only runs
# when the cell/script is executed directly and not on import.
if __name__ == "__main__":
    main()

## Linking captions to chunks with Sentence Transformers

In [None]:
import json
import os
from typing import List, Dict, Any
from sentence_transformers import SentenceTransformer, util
import torch
from tqdm import tqdm
import re

def load_json(file_path: str) -> Any:
    """Read the UTF-8 encoded JSON document at ``file_path`` and return the decoded object."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        parsed = json.loads(handle.read())
    return parsed

def save_json(data: Any, file_path: str):
    """Write ``data`` to ``file_path`` as UTF-8 JSON indented by four spaces and report the path."""
    encoder = json.JSONEncoder(indent=4)
    with open(file_path, mode='w', encoding='utf-8') as handle:
        # Stream the encoded pieces straight to the file.
        for piece in encoder.iterencode(data):
            handle.write(piece)
    print(f"JSON opgeslagen als '{file_path}'.")

def preprocess_chunks(chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Reduce the chunks to the fields needed for linking and give each one an id.

    Chunks whose 'text' is missing or empty are dropped. The id is
    ``"<start>_<end>"``, built from the chunk's own start and end values so it
    identifies the chunk uniquely.
    """
    return [
        {
            "chunk_id": f"{entry['start']}_{entry['end']}",
            "text": body,
            "start": entry['start'],
            "end": entry['end'],
        }
        for entry in chunks
        if (body := entry.get('text', ""))
    ]

def preprocess_captions(captions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only captions that have text, reduced to segment, frame filename and caption.

    Missing 'video_segment' or 'frame_filename' values are replaced by an empty string.
    """
    kept = []
    for entry in captions:
        body = entry.get('caption', "")
        if not body:
            continue
        record = {
            "video_segment": entry.get('video_segment', ""),
            "frame_filename": entry.get('frame_filename', ""),
            "caption": body,
        }
        kept.append(record)
    return kept

def compute_embeddings(model: SentenceTransformer, texts: List[str], batch_size: int = 32) -> torch.Tensor:
    """Encode ``texts`` with ``model`` and return the embeddings as a tensor.

    Encoding runs in batches of ``batch_size`` and shows a progress bar.
    """
    return model.encode(
        texts,
        batch_size=batch_size,
        convert_to_tensor=True,
        show_progress_bar=True,
    )

def parse_segment_times(segment_name: str) -> tuple[int, int]:
    """Extract the start and end numbers from a ``segment_<index>_<start>_<end>`` name.

    Returns ``(0, 0)`` when ``segment_name`` does not contain that pattern.
    """
    found = re.search(r"segment_\d+_(\d+)_(\d+)", segment_name)
    if found is None:
        return (0, 0)
    begin, finish = (int(number) for number in found.groups())
    return begin, finish

# # This version links captions to the wrong chunks
# def link_captions_to_chunks(
#     chunks: List[Dict[str, Any]],
#     captions: List[Dict[str, Any]],
#     similarity_threshold: float = 0.3
# ) -> List[Dict[str, Any]]:

#     # Initialiseer het SentenceTransformer model
#     model = SentenceTransformer('all-mpnet-base-v2')  # Een model dat langere teksten ondersteunt
#     device = 'cuda' if torch.cuda.is_available() else 'cpu'
#     model = model.to(device)

#     # Voorbereiden van teksten
#     chunk_texts = [chunk['text'] for chunk in chunks]
#     caption_texts = [caption['caption'] for caption in captions]

#     # Compute embeddings
#     print("Computing embeddings for chunks...")
#     chunk_embeddings = compute_embeddings(model, chunk_texts).to(device)
#     print("Computing embeddings for captions...")
#     caption_embeddings = compute_embeddings(model, caption_texts).to(device)

#     # Bereken cosine similarity tussen elke caption en alle chunks
#     print("Calculating cosine similarities...")
#     cosine_similarities = util.cos_sim(caption_embeddings, chunk_embeddings)  # Shape: (num_captions, num_chunks)

#     # Voor elke caption, vind de chunk met hoogste similarity
#     print("Linking captions to chunks based on similarity...")
#     links = []
#     for idx, caption in enumerate(tqdm(captions, desc="Linking Captions")):
#         sim_scores = cosine_similarities[idx]
#         top_result = torch.argmax(sim_scores).item()
#         top_score = sim_scores[top_result].item()
#         if top_score >= similarity_threshold:
#             linked_chunk = chunks[top_result]
#             links.append({
#                 "caption_index": idx,
#                 "frame_filename": caption['frame_filename'],
#                 "video_segment": caption['video_segment'],
#                 "caption": caption['caption'],
#                 "linked_chunk_id": linked_chunk['chunk_id'],
#                 "linked_chunk_text": linked_chunk['text'],
#                 "similarity_score": top_score
#             })
#         else:
#             links.append({
#                 "caption_index": idx,
#                 "frame_filename": caption['frame_filename'],
#                 "video_segment": caption['video_segment'],
#                 "caption": caption['caption'],
#                 "linked_chunk_id": None,
#                 "linked_chunk_text": None,
#                 "similarity_score": top_score
#             })
#     return links

# def link_captions_to_chunks(
#     chunks: List[Dict[str, Any]],
#     captions: List[Dict[str, Any]],
#     similarity_threshold: float = 0.00
# ) -> List[Dict[str, Any]]:

#     model = SentenceTransformer('all-mpnet-base-v2')
#     device = 'cuda' if torch.cuda.is_available() else 'cpu'
#     model = model.to(device)

#     links = []

#     for idx, caption in enumerate(tqdm(captions, desc="Linking Captions")):
#         segment_name = caption.get("video_segment", "")
#         caption_text = caption.get("caption", "").strip()

#         if not caption_text or not segment_name:
#             continue

#         segment_start, segment_end = parse_segment_times(segment_name)

#         # Filter relevante chunks op basis van segmenttijd
#         candidate_chunks = [
#             chunk for chunk in chunks
#             if segment_start <= chunk["start"] <= segment_end or
#                segment_start <= chunk["end"] <= segment_end
#         ]

#         if not candidate_chunks:
#             print(f"Geen chunks gevonden binnen tijdsframe {segment_start}–{segment_end} voor caption {idx}")
#             links.append({
#                 "caption_index": idx,
#                 "frame_filename": caption.get("frame_filename", ""),
#                 "video_segment": segment_name,
#                 "caption": caption_text,
#                 "linked_chunk_id": None,
#                 "linked_chunk_text": None,
#                 "similarity_score": None
#             })
#             continue

#         # Bereken embeddings voor caption en chunks
#         chunk_texts = [chunk["text"] for chunk in candidate_chunks]
#         chunk_embeddings = compute_embeddings(model, chunk_texts).to(device)
#         caption_embedding = compute_embeddings(model, [caption_text]).to(device)[0]

#         # Bereken cosine similarity
#         similarities = util.cos_sim(caption_embedding, chunk_embeddings)[0]
#         top_result = torch.argmax(similarities).item()
#         top_score = similarities[top_result].item()

#         if top_score >= similarity_threshold:
#             best_chunk = candidate_chunks[top_result]
#             links.append({
#                 "caption_index": idx,
#                 "frame_filename": caption.get("frame_filename", ""),
#                 "video_segment": segment_name,
#                 "caption": caption_text,
#                 "linked_chunk_id": best_chunk['chunk_id'],
#                 "linked_chunk_text": best_chunk['text'],
#                 "similarity_score": top_score
#             })
#         else:
#             links.append({
#                 "caption_index": idx,
#                 "frame_filename": caption.get("frame_filename", ""),
#                 "video_segment": segment_name,
#                 "caption": caption_text,
#                 "linked_chunk_id": None,
#                 "linked_chunk_text": None,
#                 "similarity_score": top_score
#             })

#     return links

def link_captions_to_chunks(
    chunks: List[Dict[str, Any]],
    captions: List[Dict[str, Any]],
    similarity_threshold: float = 0.3
) -> List[Dict[str, Any]]:
    """Link each caption to the most similar transcript chunk of its own video segment.

    For every caption the segment's start/end are parsed from its
    'video_segment' name, the chunks lying inside that window (with a margin
    of 1 on both sides) become candidates, and the candidate with the highest
    cosine similarity to the caption is chosen.

    Args:
        chunks: Chunk dicts providing 'chunk_id', 'text', 'start' and 'end'.
        captions: Caption dicts providing 'video_segment', 'frame_filename'
            and 'caption'.
        similarity_threshold: Minimum cosine similarity for a link to be made.

    Returns:
        One dict per caption that has both text and a segment name. Its
        'linked_chunk_id'/'linked_chunk_text' are None when no candidate chunk
        exists or the best score is below the threshold; 'similarity_score' is
        None only when there was no candidate at all.
    """
    model = SentenceTransformer('all-mpnet-base-v2')
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)

    # Tolerance around the segment window, in the unit of the chunk times
    # (presumably seconds; confirm against the segment naming).
    margin = 1

    # The candidate chunks and their embeddings depend only on the segment
    # window, and many captions (one per frame) share a window. Cache them so
    # each candidate set is filtered and encoded once instead of per caption.
    candidates_by_window: Dict[tuple, tuple] = {}

    links = []

    for idx, caption in enumerate(tqdm(captions, desc="Linking Captions")):
        segment_name = caption.get("video_segment", "")
        caption_text = caption.get("caption", "").strip()

        if not caption_text or not segment_name:
            continue

        window = parse_segment_times(segment_name)
        if window not in candidates_by_window:
            segment_start, segment_end = window
            window_chunks = [
                chunk for chunk in chunks
                if chunk["start"] >= segment_start - margin and chunk["end"] <= segment_end + margin
            ]
            window_embeddings = None
            if window_chunks:
                window_texts = [chunk["text"] for chunk in window_chunks]
                window_embeddings = model.encode(window_texts, convert_to_tensor=True).to(device)
            candidates_by_window[window] = (window_chunks, window_embeddings)
        candidate_chunks, chunk_embeddings = candidates_by_window[window]

        # Start from the "not linked" result and fill in what is found.
        link = {
            "caption_index": idx,
            "frame_filename": caption.get("frame_filename", ""),
            "video_segment": segment_name,
            "caption": caption_text,
            "linked_chunk_id": None,
            "linked_chunk_text": None,
            "similarity_score": None
        }
        links.append(link)

        if not candidate_chunks:
            continue

        caption_embedding = model.encode([caption_text], convert_to_tensor=True).to(device)

        similarities = util.cos_sim(caption_embedding, chunk_embeddings)[0]  # shape: (num_chunks,)
        top_idx = torch.argmax(similarities).item()
        top_score = similarities[top_idx].item()

        print(f"Caption {idx}: top_score = {top_score:.4f}, threshold = {similarity_threshold}")

        link["similarity_score"] = top_score
        if top_score >= similarity_threshold:
            linked_chunk = candidate_chunks[top_idx]
            link["linked_chunk_id"] = linked_chunk['chunk_id']
            link["linked_chunk_text"] = linked_chunk['text']

    return links


def filter_linked_captions(links: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return only the links whose 'linked_chunk_id' is not None and report how many remain."""
    linked_only = []
    for entry in links:
        if entry['linked_chunk_id'] is None:
            continue
        linked_only.append(entry)
    print(f"Filtered captions: {len(linked_only)} out of {len(links)} were successfully linked.")
    return linked_only

def main():
    """Link the image captions to the transcript chunks and store the mappings.

    Writes two JSON files: one with a link record for every processed caption
    and one holding only the captions that were linked to a chunk. Missing
    input files or empty inputs are reported with ``print`` and end the run.
    """
    # Input and output locations
    chunks_json_path = "processed_json_machine_cover/machine_cover_chunks.json"        # original JSON with chunks
    captions_json_path = "combined_frames_machine_cover.json"              # JSON with image captions
    output_mapping_path = "captions_to_chunks_mapping_machine_cover.json" # new JSON output (all links)
    filtered_output_path = "filtered_captions_to_chunks_mapping_machine_cover.json" # new JSON output (linked captions)

    # Stop at the first required input file that does not exist.
    first_missing = next(
        (path for path in (chunks_json_path, captions_json_path) if not os.path.exists(path)),
        None,
    )
    if first_missing is not None:
        print(f"Error: Vereist bestand '{first_missing}' bestaat niet.")
        return

    print("Loading JSON files...")
    chunks_data = load_json(chunks_json_path)
    captions = load_json(captions_json_path)  # the captions JSON is assumed to be a list
    chunks = chunks_data.get('chunks', [])

    if not chunks:
        print("Geen chunks gevonden in de hoofd JSON.")
        return
    if not captions:
        print("Geen captions gevonden in de captions JSON.")
        return

    print(f"Loaded {len(chunks)} chunks and {len(captions)} captions.")

    # Prepare both inputs, then link captions to chunks via Sentence Transformers.
    links = link_captions_to_chunks(
        preprocess_chunks(chunks),
        preprocess_captions(captions),
        similarity_threshold=0.1,
    )
    print(f"Generated {len(links)} caption links.")

    # Full mapping: every processed caption, linked or not.
    save_json(links, output_mapping_path)
    print(f"Captions to chunks mapping saved to '{output_mapping_path}'.")

    # Filtered mapping: only the captions that were linked to a chunk.
    filtered_links = filter_linked_captions(links)
    save_json(filtered_links, filtered_output_path)
    print(f"Filtered captions to chunks mapping saved to '{filtered_output_path}'.")

if __name__ == "__main__":
    main()

# Cake
## Download model

In [None]:
from llama_cpp import Llama

# Obtain the Qwen2.5-7B-Instruct GGUF model through Llama.from_pretrained and
# keep the loaded model in `llm`. The files go to the local "models" directory,
# which is where the LlamaSingleton classes further down expect to find
# "models/Qwen2.5-7B-Instruct-Q4_K_M.gguf".
llm = Llama.from_pretrained(
    repo_id="bartowski/Qwen2.5-7B-Instruct-GGUF",
    filename="*Q4_K_M.gguf",  # wildcard selecting the file whose name ends in Q4_K_M.gguf
    verbose=False,
    local_dir="models",
)

## Knowledge extraction

In [None]:
import json
import os
import pickle
import threading
from llama_cpp import Llama
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from json import JSONDecodeError

# Llama singleton: ensures only one model instance is loaded, to limit RAM usage
class LlamaSingleton:
    """Holder that creates one shared ``Llama`` model and hands it out on every call.

    ``LlamaSingleton()`` always returns the same object; the model is available
    as its ``llm`` attribute. ``model_path`` and ``chat_format`` are only used
    by the call that actually creates the model; later calls ignore them.
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, model_path="models/Qwen2.5-7B-Instruct-Q4_K_M.gguf", chat_format="chatml"):
        # The lock keeps two threads from both creating a model.
        with cls._lock:
            if cls._instance is None:
                # Build the holder completely before publishing it. If Llama()
                # raises (e.g. missing model file), _instance stays None so the
                # next call retries instead of returning an object without `llm`.
                instance = super().__new__(cls)
                instance.llm = Llama(model_path=model_path, chat_format=chat_format, n_ctx=2048)
                cls._instance = instance
            return cls._instance

# chatbot class to extract knowledge from chunks
class Chatbot:
    """Extracts subject/predicate/object triplets from text and stores them.

    Triplets are produced by the shared LLM, persisted to ``knowledge_file``
    (JSON) and mirrored into a FAISS index that is pickled, together with the
    list of indexed triplets, to ``faiss_index_file``.
    """

    # Keys every extracted triplet must provide.
    _TRIPLET_KEYS = ("subject", "predicate", "object")

    def __init__(self,
                 messages_file='messages_machine_cover.json',
                 knowledge_file='knowledge_machine_cover.json',
                 faiss_index_file='faiss_index_machine_cover.pkl',
                 model_name='all-MiniLM-L6-v2'):
        self.messages_file = messages_file
        self.knowledge_file = knowledge_file
        self.faiss_index_file = faiss_index_file
        self.llm = LlamaSingleton().llm
        self.model = SentenceTransformer(model_name)  # embeds triplets for the FAISS index
        self.index = None
        self.knowledge_data = []
        self.initialize_files()
        self.load_faiss_index()

    def initialize_files(self):
        """Create the messages and knowledge files as empty JSON lists if they do not exist."""
        for file in [self.messages_file, self.knowledge_file]:
            if not os.path.exists(file):
                with open(file, 'w', encoding='utf-8') as f:
                    json.dump([], f)

    def load_json_data(self, file_path):
        """Return the decoded content of the JSON file at ``file_path``."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def save_json_data(self, file_path, data):
        """Write ``data`` to ``file_path`` as JSON indented by four spaces."""
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)

    # this function extracts the knowledge from the chunk text
    def extract_valuable_knowledge(self, message):
        """
        Sends the chunk text to the model and asks it to return JSON with
        subject/predicate/object. No timestamps are generated by the model.

        Returns a list of triplet dicts. Output that cannot be parsed yields
        an empty list, and entries lacking one of the three keys are dropped.
        """
        response = self.llm.create_chat_completion(
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a knowledge extractor. Try to extract any knowledge.\n"
                        "Return ONLY JSON with the following schema:\n"
                        "{\n"
                        "  \"valuable_knowledge\": [\n"
                        "    {\n"
                        "      \"subject\": \"...\",\n"
                        "      \"predicate\": \"...\",\n"
                        "      \"object\": \"...\"\n"
                        "    }\n"
                        "  ]\n"
                        "}\n"
                        "If no knowledge can be extracted, return:\n"
                        "{\"valuable_knowledge\": []}"
                    )
                },
                {"role": "user", "content": message},
            ],
            response_format={
                # llama-cpp-python only constrains the output to the schema
                # when the type is "json_object"; other values are ignored.
                "type": "json_object",
                "schema": {
                    "type": "object",
                    "properties": {
                        "valuable_knowledge": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "subject": {"type": "string"},
                                    "predicate": {"type": "string"},
                                    "object": {"type": "string"}
                                },
                                "required": ["subject", "predicate", "object"]
                            }
                        }
                    },
                    "required": ["valuable_knowledge"],
                },
            },
            temperature=0.5,
        )
        try:
            knowledge_data = json.loads(response['choices'][0]['message']['content'])
        except (JSONDecodeError, KeyError, IndexError, TypeError):
            # Unparsable or unexpectedly shaped response: treat as "nothing extracted".
            return []
        print("Extracted knowledge from a chunk:", knowledge_data)
        if not isinstance(knowledge_data, dict):
            return []
        triplets = knowledge_data.get("valuable_knowledge", [])
        if not isinstance(triplets, list):
            return []
        # Keep only well-formed triplets so save_knowledge can rely on the keys.
        return [
            triplet for triplet in triplets
            if isinstance(triplet, dict) and all(key in triplet for key in self._TRIPLET_KEYS)
        ]

    def save_knowledge(self, triplets):
        """
        Persists triplets to the knowledge file and updates the FAISS index if
        new triplets are found. We do not add any timestamps here.

        A triplet counts as new when its (subject, predicate, object) is not
        stored yet; nothing is written when there is no new triplet.
        """
        if not triplets:
            return
        knowledge = self.load_json_data(self.knowledge_file)
        existing_set = {(t['subject'], t['predicate'], t['object']) for t in knowledge}
        new_triplets = []
        for triplet in triplets:
            key = (triplet['subject'], triplet['predicate'], triplet['object'])
            if key not in existing_set:
                knowledge.append(triplet)
                new_triplets.append(triplet)
                existing_set.add(key)
        if not new_triplets:
            return
        self.save_json_data(self.knowledge_file, knowledge)
        self.update_faiss_index(new_triplets)

    def update_faiss_index(self, triplets):
        """Embed the triplets as "subject predicate object" text, add them to the index and save it."""
        texts = [f"{t['subject']} {t['predicate']} {t['object']}" for t in triplets]
        embeddings = self.model.encode(texts)
        if self.index is None:
            # First use: create an L2 index sized to the embedding dimension.
            self.index = faiss.IndexFlatL2(embeddings.shape[1])
        self.index.add(np.array(embeddings, dtype=np.float32))
        # knowledge_data is extended in the same order as the vectors are added.
        self.knowledge_data.extend(triplets)
        self.save_faiss_index()

    def save_faiss_index(self):
        """Pickle the FAISS index together with the list of indexed triplets."""
        with open(self.faiss_index_file, 'wb') as f:
            pickle.dump((self.index, self.knowledge_data), f)

    def load_faiss_index(self):
        """Load the pickled index and triplet list, or start empty when no file exists."""
        if os.path.exists(self.faiss_index_file):
            # NOTE(review): pickle.load executes arbitrary code from the file;
            # only point faiss_index_file at files this program wrote itself.
            with open(self.faiss_index_file, 'rb') as f:
                self.index, self.knowledge_data = pickle.load(f)
        else:
            self.index = None
            self.knowledge_data = []

# def main():
#     # Load the chunks from output_chunks.json
#     with open('filtered_captions_to_chunks_mapping.json', 'r', encoding='utf-8') as file: # Misschien moet filterd hier in 
#         # updated_chunks_with_segments_and_captions.json

#         data = json.load(file)
#     chunks = data.get("chunks", [])
#     print(f"Total chunks loaded: {len(chunks)}")

#     # Extract Knowledge from Each Chunk
#     chatbot = Chatbot()

#     for i, chunk in enumerate(chunks, start=1):
#         text = chunk.get("text", "")
#         start_time = chunk.get("start")
#         end_time = chunk.get("end")

#         print(f"\nProcessing chunk {i} (Start: {start_time}, End: {end_time})")

#         # Extract valuable knowledge from the chunk text
#         extracted_knowledge = chatbot.extract_valuable_knowledge(text)

#         if extracted_knowledge:

#             # Attach the chunk's start/end timestamps of video to each extracted item
#             for triplet in extracted_knowledge:
#                 triplet['start'] = start_time
#                 triplet['end'] = end_time

#             # Save the extracted knowledge
#             chatbot.save_knowledge(extracted_knowledge)

#     print("\nKnowledge extraction complete.")
#     print("Please check 'knowledge.json' for the extracted valuable knowledge.")

# def main():
#     # Laad het correcte JSON-bestand (lijst van items)
#     with open('filtered_captions_to_chunks_mapping.json', 'r', encoding='utf-8') as file:
#         links = json.load(file)

#     print(f"Totaal aantal gekoppelde captions: {len(links)}")

#     # Initialiseer de kennisextractor / chatbot
#     chatbot = Chatbot()

#     for i, item in enumerate(links, start=1):
#         linked_text = item.get("linked_chunk_text", "").strip()
#         chunk_id = item.get("linked_chunk_id", "")

#         if not linked_text or not chunk_id:
#             print(f"Skipping item {i}: ontbrekende tekst of ID.")
#             continue

#         # Haal start en end tijd uit linked_chunk_id
#         try:
#             start_str, end_str = chunk_id.split("_")
#             start_time = float(start_str)
#             end_time = float(end_str)
#         except ValueError:
#             print(f"Ongeldig chunk ID formaat in item {i}: '{chunk_id}'")
#             continue

#         print(f"\n[{i}] Extracting knowledge (chunk {start_time:.2f}–{end_time:.2f})")

#         # Extract knowledge uit de chunktekst
#         extracted_knowledge = chatbot.extract_valuable_knowledge(linked_text)

#         if extracted_knowledge:
#             for triplet in extracted_knowledge:
#                 triplet["start"] = start_time
#                 triplet["end"] = end_time
#             chatbot.save_knowledge(extracted_knowledge)

#     print("\nKennisextractie afgerond. Bekijk 'knowledge.json' voor de resultaten.")

# Good, but does not use all of the audio
# def main():
#     # Laad het correcte JSON-bestand (lijst van items)
#     with open('filtered_captions_to_chunks_mapping.json', 'r', encoding='utf-8') as file:
#         links = json.load(file)

#     print(f"Totaal aantal gekoppelde captions: {len(links)}")

#     # Initialiseer de kennisextractor / chatbot
#     chatbot = Chatbot()

#     for i, item in enumerate(links, start=1):
#         caption = item.get("caption", "").strip()
#         linked_text = item.get("linked_chunk_text", "").strip()
#         chunk_id = item.get("linked_chunk_id", "")

#         if not linked_text or not chunk_id:
#             print(f"Skipping item {i}: ontbrekende tekst of ID.")
#             continue

#         # Haal start en end tijd uit chunk_id
#         try:
#             start_str, end_str = chunk_id.split("_")
#             start_time = float(start_str)
#             end_time = float(end_str)
#         except ValueError:
#             print(f"Ongeldig chunk ID formaat in item {i}: '{chunk_id}'")
#             continue

#         print(f"\n[{i}] Extracting knowledge (chunk {start_time:.2f}–{end_time:.2f})")

#         # Combineer caption + transcripttekst als input voor de LLM
#         combined_input = f"Caption: {caption}\nTranscript: {linked_text}"

#         # Stuur gecombineerde input naar LLM
#         extracted_knowledge = chatbot.extract_valuable_knowledge(combined_input)

#         if extracted_knowledge:
#             for triplet in extracted_knowledge:
#                 triplet["start"] = start_time
#                 triplet["end"] = end_time
#             chatbot.save_knowledge(extracted_knowledge)

#     print("\nKennisextractie afgerond. Bekijk 'knowledge.json' voor de resultaten.")

# Uses all of the audio
def main():
    """Extract knowledge triplets from every transcript chunk, enriched with its linked captions.

    All chunks are processed, whether or not captions were linked to them.
    For each non-empty chunk the transcript text (plus any captions linked to
    that chunk) is sent to the ``Chatbot`` extractor; every returned triplet
    gets the chunk's 'start' and 'end' attached and is stored through
    ``Chatbot.save_knowledge``. Missing input files are reported with
    ``print`` and end the run.
    """
    from collections import defaultdict

    # Input files
    chunks_json_path = "processed_json_machine_cover/machine_cover_chunks.json"
    captions_mapping_path = "filtered_captions_to_chunks_mapping_machine_cover.json"

    # Stop at the first input file that does not exist.
    for required_path in (chunks_json_path, captions_mapping_path):
        if not os.path.exists(required_path):
            print(f"Error: Bestand '{required_path}' bestaat niet.")
            return

    # Load the data
    with open(chunks_json_path, 'r', encoding='utf-8') as f:
        chunk_data = json.load(f)
    all_chunks = chunk_data.get("chunks", [])
    print(f"{len(all_chunks)} transcriptie-chunks geladen.")

    with open(captions_mapping_path, 'r', encoding='utf-8') as f:
        caption_links = json.load(f)
    print(f"{len(caption_links)} gekoppelde captions geladen.")

    # Build a mapping: chunk_id -> list of captions linked to that chunk
    caption_map = defaultdict(list)
    for item in caption_links:
        chunk_id = item.get("linked_chunk_id")
        caption = item.get("caption")
        if chunk_id and caption:
            caption_map[chunk_id].append(caption)

    # Initialise the extractor
    chatbot = Chatbot()

    # Process all chunks (with or without captions)
    for i, chunk in enumerate(all_chunks, start=1):
        chunk_text = chunk.get("text", "").strip()
        start = chunk.get("start")
        end = chunk.get("end")
        # Same "<start>_<end>" id format as the 'linked_chunk_id' values in the mapping.
        chunk_id = f"{start}_{end}"

        if not chunk_text:
            print(f"Skipping lege chunk {chunk_id}")
            continue

        captions = caption_map.get(chunk_id, [])
        if captions:
            caption_text = "\n".join(f"- {c}" for c in captions)
            combined_prompt = f"Transcript: {chunk_text}\nCaptions:\n{caption_text}"
        else:
            combined_prompt = f"Transcript: {chunk_text}"

        print(f"\n[{i}] Extracting knowledge for chunk {chunk_id}")

        print(combined_prompt)
        extracted_knowledge = chatbot.extract_valuable_knowledge(combined_prompt)

        if extracted_knowledge:
            # Attach the chunk's time range to every triplet before storing it.
            for triplet in extracted_knowledge:
                triplet["start"] = start
                triplet["end"] = end
            chatbot.save_knowledge(extracted_knowledge)

    # Name the file the extractor actually wrote to instead of a fixed 'knowledge.json'.
    print(f"\nKennisextractie voltooid. Bekijk '{chatbot.knowledge_file}' voor de resultaten.")


if __name__ == "__main__":
    main()

In [None]:
# Report the wall-clock time that has passed since `start_time` was recorded
# with time.time() in the first cell of the notebook.
end_time = time.time()
elapsed_time = end_time - start_time  # difference of two time.time() values, i.e. seconds
print(f"Totaal uitgevoerde tijd: {elapsed_time:.2f} seconden")

## Test the generated knowledge base with an LLM that integrates it

In [None]:
import json
import os
import pickle
from datetime import datetime
from llama_cpp import Llama
import threading
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from json import JSONDecodeError

class LlamaSingleton:
    """Holder that creates one shared ``Llama`` model and hands it out on every call.

    ``LlamaSingleton()`` always returns the same object; the model is available
    as its ``llm`` attribute. ``model_path`` and ``chat_format`` are only used
    by the call that actually creates the model; later calls ignore them.
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, model_path="models/Qwen2.5-7B-Instruct-Q4_K_M.gguf", chat_format="chatml"):
        # The lock keeps two threads from both creating a model.
        with cls._lock:
            if cls._instance is None:
                # Build the holder completely before publishing it. If Llama()
                # raises (e.g. missing model file), _instance stays None so the
                # next call retries instead of returning an object without `llm`.
                instance = super().__new__(cls)
                instance.llm = Llama(model_path=model_path, chat_format=chat_format, n_ctx=2048)
                cls._instance = instance
            return cls._instance

class Chatbot:
    """Console chatbot that answers with help from a stored triplet knowledge base.

    Knowledge is a list of dicts with ``subject`` / ``predicate`` / ``object``
    keys kept in ``knowledge_file``. Each stored triplet is embedded with a
    sentence-transformer model and added to a FAISS index, so that
    ``generate_response`` can place the closest matches in the system prompt.
    """

    def __init__(self,
                 messages_file='messages.json',
                 knowledge_file='knowledge.json',
                 faiss_index_file='faiss_index.pkl',
                 model_name='all-MiniLM-L6-v2'):
        """Load the models, create missing JSON files and restore the index.

        Args:
            messages_file: JSON file holding the chat log.
            knowledge_file: JSON file holding the stored triplets.
            faiss_index_file: Pickle file holding ``(index, knowledge_data)``.
            model_name: Name passed to ``SentenceTransformer``.
        """
        self.messages_file = messages_file
        self.knowledge_file = knowledge_file
        self.faiss_index_file = faiss_index_file
        self.llm = LlamaSingleton().llm
        self.model = SentenceTransformer(model_name)
        self.index = None
        self.knowledge_data = []
        self.initialize_files()
        self.load_faiss_index()

    def initialize_files(self):
        """Create the messages and knowledge files as empty JSON lists if absent."""
        for file in [self.messages_file, self.knowledge_file]:
            if not os.path.exists(file):
                with open(file, 'w') as f:
                    json.dump([], f)

    def load_json_data(self, file_path):
        """Return the parsed JSON content of ``file_path``."""
        with open(file_path, 'r') as f:
            return json.load(f)

    def save_json_data(self, file_path, data):
        """Overwrite ``file_path`` with ``data`` serialised as indented JSON."""
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=4)

    def save_message(self, role, content):
        """Append one timestamped message to the messages file."""
        messages = self.load_json_data(self.messages_file)
        message = {"role": role, "content": content, "timestamp": datetime.utcnow().isoformat()}
        messages.append(message)
        self.save_json_data(self.messages_file, messages)

    @staticmethod
    def _triplet_key(triplet):
        """Return the ``(subject, predicate, object)`` dedup key, or None if unusable."""
        if not isinstance(triplet, dict):
            return None
        try:
            key = (triplet['subject'], triplet['predicate'], triplet['object'])
            hash(key)  # unhashable values cannot be used for set-based dedup
        except (KeyError, TypeError):
            return None
        return key

    def save_knowledge(self, triplets):
        """Store triplets that are not yet in the knowledge file and index them.

        Triplets missing ``subject``, ``predicate`` or ``object`` (or carrying
        unhashable values there) are skipped with a message, so one malformed
        entry does not abort the rest of the batch. Accepted triplets get a
        ``timestamp`` key set in place. The file and the FAISS index are only
        written when at least one new triplet was added.

        Args:
            triplets: Iterable of triplet dicts; may be empty or None.
        """
        if not triplets:
            return
        knowledge = self.load_json_data(self.knowledge_file)
        existing_set = set()
        for stored in knowledge:
            stored_key = self._triplet_key(stored)
            if stored_key is not None:
                existing_set.add(stored_key)
        new_triplets = []
        for triplet in triplets:
            key = self._triplet_key(triplet)
            if key is None:
                print(f"Skipping malformed triplet: {triplet!r}")
                continue
            triplet['timestamp'] = datetime.utcnow().isoformat()
            if key not in existing_set:
                knowledge.append(triplet)
                new_triplets.append(triplet)
                existing_set.add(key)
        if new_triplets:
            self.save_json_data(self.knowledge_file, knowledge)
            self.update_faiss_index(new_triplets)

    def update_faiss_index(self, triplets):
        """Embed ``triplets``, add them to the FAISS index and persist it.

        The index is created on first use as a flat L2 index whose dimension
        is taken from the embedding width.
        """
        texts = [f"{t['subject']} {t['predicate']} {t['object']}" for t in triplets]
        embeddings = self.model.encode(texts)
        if self.index is None:
            self.index = faiss.IndexFlatL2(embeddings.shape[1])
        self.index.add(np.array(embeddings, dtype=np.float32))
        # knowledge_data is kept in insertion order so that position i in this
        # list corresponds to vector i in the index.
        self.knowledge_data.extend(triplets)
        self.save_faiss_index()

    def save_faiss_index(self):
        """Pickle ``(index, knowledge_data)`` to the index file."""
        with open(self.faiss_index_file, 'wb') as f:
            pickle.dump((self.index, self.knowledge_data), f)

    def load_faiss_index(self):
        """Restore ``index`` and ``knowledge_data`` from the index file if it exists.

        When the file is absent both are reset to their empty state.
        """
        if os.path.exists(self.faiss_index_file):
            # SECURITY NOTE: pickle.load executes arbitrary code from the file;
            # only point faiss_index_file at files this program wrote itself.
            with open(self.faiss_index_file, 'rb') as f:
                self.index, self.knowledge_data = pickle.load(f)
        else:
            self.index = None
            self.knowledge_data = []

    def search_knowledge(self, query, top_k=5):
        """Return up to ``top_k`` stored triplets closest to ``query``.

        Returns an empty list when there is no index or no stored knowledge.
        """
        if self.index is None or not self.knowledge_data:
            return []
        query_embedding = self.model.encode([query])
        _, indices = self.index.search(np.array(query_embedding, dtype=np.float32), top_k)
        total = len(self.knowledge_data)
        # The original code skipped -1 results (no match for that slot); the
        # upper bound additionally protects against an index that holds more
        # vectors than knowledge_data has entries.
        return [self.knowledge_data[idx] for idx in indices[0] if 0 <= idx < total]

    def generate_response(self, conversation_history, user_message):
        """Ask the LLM to answer ``user_message`` using the top-5 knowledge matches.

        Args:
            conversation_history: Recent messages. Currently NOT sent to the
                model (the concatenation is disabled below); kept so callers
                do not need to change.
            user_message: The user's question; also used as the search query.

        Returns:
            The assistant's reply text.
        """
        knowledge_matches = self.search_knowledge(user_message, top_k=5)
        current_time = datetime.utcnow().isoformat()
        system_message = f"Current date and time: {current_time}\n"
        if knowledge_matches:
            system_message += "Answer based on retrieved knowledge:\n"
            for t in knowledge_matches:
                # .get() so a stored triplet without video timestamps does not
                # abort the whole answer with a KeyError.
                system_message += f"- {t['subject']} {t['predicate']} {t['object']} (Videotimestamps: start: {t.get('start')}, end: {t.get('end')})\n"
        else:
            system_message += "No direct related knowledge found. Proceeding with general reasoning.\n"
        enriched_history = [{"role": "system", "content": f"You are a helpful assistent; {system_message}"}] #+ conversation_history
        enriched_history.append({"role": "user", "content": user_message})
        print(enriched_history)
        response = self.llm.create_chat_completion(
            messages=enriched_history,
            temperature=0.7,
        )['choices'][0]['message']['content']
        return response

    def chat(self):
        """Run the interactive console loop until the user types 'exit' or 'quit'.

        Every user and assistant message is appended to the messages file. An
        end-of-input or interrupt at the prompt also ends the loop cleanly.
        """
        print("Chatbot is ready! Type 'exit' to end the conversation.")
        while True:
            try:
                user_message = input("You: ")
            except (EOFError, KeyboardInterrupt):
                # Closed stdin or an interrupted prompt: leave without a traceback.
                print("\nChatbot: Goodbye!")
                break
            if user_message.lower().strip() in ['exit', 'quit']:
                print("Chatbot: Goodbye!")
                break
            self.save_message(role='user', content=user_message)
            conversation = self.load_json_data(self.messages_file)[-3:]
            assistant_response = self.generate_response(conversation, user_message)
            print(f"Assistant: {assistant_response}")
            self.save_message(role='assistant', content=assistant_response)


# Entry point: build the chatbot (loading the LLM, the embedding model and any
# saved FAISS index) and start the interactive console loop.
if __name__ == "__main__":
    chatbot = Chatbot()
    chatbot.chat()