In [None]:
# Uncomment this cell when running on Google Colab with the data stored on Google Drive
# from google.colab import drive
# drive.mount('/content/drive')
# root_folder = '/content/drive/MyDrive/<root folder with data from previous step>'
# !pip install openai-whisper==20240930 opencv-python==4.11.0.86 paddleocr==2.10.0
# !pip install paddlepaddle-gpu==3.0.0 -i https://www.paddlepaddle.org.cn/packages/stable/cu126/

In [None]:
# Install the packages listed in requirements2.txt into the running kernel's environment.
%pip install -r requirements2.txt

In [None]:
import whisper
import os
import glob
import cv2
import pytesseract
from paddleocr import PaddleOCR
import numpy as np
import matplotlib.pyplot as plt
import logging
import paddle

In [None]:
# Keep a root_folder that an earlier cell already defined (e.g. the optional
# Colab setup); otherwise fall back to the "data" directory next to the notebook's parent.
if "root_folder" not in globals():
    root_folder = os.path.join(os.pardir, "data")

In [5]:
def is_similar_box(points1, points2, threshold=100):
    """Tell whether two boxes sit at roughly the same place.

    The corner points of both boxes are paired positionally and the absolute
    x and y offsets of every pair are added up. The boxes are considered
    similar when that total is strictly smaller than ``threshold``.
    """
    total_offset = sum(
        abs(ax - bx) + abs(ay - by)
        for (ax, ay), (bx, by) in zip(points1, points2)
    )
    return total_offset < threshold

In [6]:
def levenshtein_distance(s1, s2):
    """Return the Levenshtein (edit) distance between two strings.

    Classic row-by-row dynamic programme: only the previous row of the edit
    matrix is kept, and the rows are sized after the shorter string.
    """
    # Iterate over the longer string so each row has len(shorter) + 1 cells.
    longer, shorter = (s1, s2) if len(s1) >= len(s2) else (s2, s1)

    if len(shorter) == 0:
        return len(longer)

    prev = list(range(len(shorter) + 1))
    for row, ch_long in enumerate(longer, start=1):
        curr = [row]
        for col, ch_short in enumerate(shorter, start=1):
            insert_cost = prev[col] + 1
            delete_cost = curr[col - 1] + 1
            replace_cost = prev[col - 1] + (ch_long != ch_short)
            curr.append(min(insert_cost, delete_cost, replace_cost))
        prev = curr
    return prev[-1]

In [7]:
#filter to texts that appear for a few frames and fix minor OCR mistakes by majority vote rule
def filter_texts(texts_by_frame, frame_dimensions, frames_threshold=4, error_threshold=2, levenshtein_distance_threshold=0.2):
    """Keep on-screen texts that stay at the same place for several analysed frames.

    The function works in four stages:

    1. Build a 5x5 heatmap of all text-box centres and keep only text whose
       centre falls into the busiest cell (shown with matplotlib as a side effect).
    2. Group OCR variants of the same text: a new text joins the group of a
       text seen in the previous frame when their Levenshtein distance divided
       by the new text's length is below ``levenshtein_distance_threshold``;
       groups whose keys are that close are merged afterwards.
    3. Fold every group into its most frequently seen spelling (majority vote).
    4. Keep a text only when it appears in a run of near-consecutive frames
       with similar boxes.

    Args:
        texts_by_frame: one entry per analysed frame; each entry is a list of
            OCR lines shaped ``[box, (text, ...)]`` where ``box`` holds four
            ``(x, y)`` corner points. Falsy entries (``None``/``[]``) are skipped.
            Must be re-iterable (it is traversed twice).
        frame_dimensions: accepted for interface compatibility; not used.
        frames_threshold: minimum number of qualifying consecutive *pairs* of
            sightings in a run (so a run of k sightings counts as k - 1).
        error_threshold: a gap of this many frame indices (or more) between two
            sightings ends the current run.
        levenshtein_distance_threshold: normalised edit-distance limit used
            when grouping OCR variants.

    Returns:
        A list of ``[start_frame_index, text, box]`` sorted by start frame
        index. Empty when no text was detected at all.
    """
    def box_center(box):
        # Boxes are treated as quadrilaterals: average of the four corners.
        return sum(p[0] for p in box) / 4, sum(p[1] for p in box) / 4

    filtered_texts = [] #[start frame, text, position]
    text_positions = {} #"text": [frames_numbers, positions]
    group_similar = {} #group_name: [text]
    prev_frame_texts = {} #text: key_name of its group

    # generate heatmap of places where text appears
    text_positions_heatmap = [
        box_center(line[0])
        for frame_texts in texts_by_frame if frame_texts
        for line in frame_texts
    ]

    # Nothing was detected in any frame: there is nothing to filter, and
    # unpacking zip(*[]) below would raise ValueError.
    if not text_positions_heatmap:
        return filtered_texts

    xs, ys = zip(*text_positions_heatmap)

    # histogram2d returns the edges of its first argument first, hence (yedges, xedges).
    heatmap, yedges, xedges = np.histogram2d(np.copy(ys), np.copy(xs), bins=(5, 5))

    plt.imshow(heatmap, cmap="hot", interpolation='nearest')
    plt.title("Text occurrence heatmap")
    plt.show()

    # filter to only text from most used area on the screen
    y_bin, x_bin = np.unravel_index(np.argmax(heatmap), heatmap.shape)

    for frame_index, frame_texts in enumerate(texts_by_frame):
        if not frame_texts:
            continue
        new_frame_texts = {}

        for line in frame_texts:
            box = line[0]
            text = line[1][0].strip()

            if not text:
                continue

            x, y = box_center(box)

            #if text appears not in the most common region on the heatmap, skip it
            if not (xedges[x_bin] <= x <= xedges[x_bin+1] and yedges[y_bin] <= y <= yedges[y_bin+1]):
                continue

            if text in text_positions:
                # Known spelling: record the sighting and carry its group over to this frame.
                text_positions[text][0].append(frame_index)
                text_positions[text][1].append(box)
                if text in prev_frame_texts:
                    new_frame_texts[text] = prev_frame_texts.pop(text)
            else:
                text_positions[text] = [[frame_index], [box]]
                # check levenshtein distance to the previous words to minimize OCR error;
                # instead of checking all possible words, check similarity with words from the last frame only
                for prev_text, key_name in prev_frame_texts.items():
                    if levenshtein_distance(text, prev_text)/len(text) < levenshtein_distance_threshold:
                        group_similar[key_name].append(text)
                        new_frame_texts[text] = key_name
                        break
                else:
                    # No close match in the previous frame: start a new group.
                    group_similar[text] = [text]
                    new_frame_texts[text] = text

        prev_frame_texts = new_frame_texts

    #find groups with similar levenshtein distance and merge them
    merged_groups = group_similar.copy()
    group_keys = list(group_similar)
    for idx, key1 in enumerate(group_keys):
        if key1 not in merged_groups:
            # Already absorbed by an earlier group.
            continue
        for key2 in group_keys[idx + 1:]:
            if key2 not in merged_groups:
                continue
            if levenshtein_distance(key1, key2)/len(key1) < levenshtein_distance_threshold:
                merged_groups[key1].extend(merged_groups.pop(key2))

    #choose the most popular word in a group and update text positions
    for similar_texts in merged_groups.values():
        most_used_text = max(similar_texts, key=lambda x: len(text_positions[x][0]))
        for text in similar_texts:
            if text != most_used_text:
                frames_numbers, positions = text_positions.pop(text)
                text_positions[most_used_text][0].extend(frames_numbers)
                text_positions[most_used_text][1].extend(positions)

    #remove text that wasn't staying long in the same position
    #captions on the video are usually not moving
    for text, (frames_numbers, positions) in text_positions.items():
        # Merged groups are not in frame order, so sort the sightings first.
        frames_and_positions = sorted(zip(frames_numbers, positions), key=lambda fp: fp[0])
        current_list = []
        for i in range(1, len(frames_and_positions)):
            prev_frame, prev_box = frames_and_positions[i-1]
            frame, box = frames_and_positions[i]
            if frame - prev_frame >= error_threshold or not is_similar_box(prev_box, box):
                # The run ended at the previous sighting.
                if len(current_list) >= frames_threshold:
                    filtered_texts.append([current_list[0], text, prev_box])
                current_list = []
            else:
                current_list.append(prev_frame)
        if len(current_list) >= frames_threshold:
            # The run reaches the last sighting; take its box from the *sorted*
            # list (the unsorted ``positions`` may be in a different order).
            filtered_texts.append([current_list[0], text, frames_and_positions[-1][1]])

    #sort text by frames
    filtered_texts.sort(key=lambda x: x[0])
    return filtered_texts

In [8]:
def group_lines_by_frame(results, tolerance = 3): #[frame, text, position]
    """Split text entries into groups of entries that start close together in time.

    Args:
        results: list of ``[frame, text, position]`` entries. It is sorted
            in place by frame.
        tolerance: maximum frame difference between two neighbouring entries
            (after sorting) for them to share a group.

    Returns:
        A list of groups (each a list of entries) in ascending frame order;
        an empty list when ``results`` is empty.
    """
    # Without this guard results[0] below would raise IndexError.
    if not results:
        return []

    results.sort(key=lambda x: x[0])

    groups = [[results[0]]]
    for entry in results[1:]:
        # Compare with the last entry of the current group, i.e. the previous entry.
        if entry[0] - groups[-1][-1][0] <= tolerance:
            groups[-1].append(entry)
        else:
            groups.append([entry])

    return groups

In [9]:
# combine text lines to create sentences in the correct order
def merge_lines(lines, y_threshold=30, frame_tolerance = 3):
    """Join filtered text lines into one string in reading order.

    Lines are grouped by start frame (see ``frame_tolerance``); inside a group
    they are ordered top to bottom by the y coordinate of the box's first
    corner. Groups are emitted in ascending frame order.

    Args:
        lines: list of ``[frame, text, position]`` entries, where ``position``
            is a list of ``(x, y)`` corner points.
        y_threshold: accepted for interface compatibility; not used.
        frame_tolerance: passed to ``group_lines_by_frame`` as its tolerance.

    Returns:
        The texts concatenated, each preceded by a single space (so a
        non-empty result starts with a space); ``""`` for empty input.
    """
    # Nothing to merge; avoids relying on the grouping helper for empty input.
    if not lines:
        return ""

    ordered_texts = []
    for group in group_lines_by_frame(lines, frame_tolerance):
        group.sort(key=lambda x: x[2][0][1]) # sort by y position from top to bottom
        ordered_texts.extend(text for _, text, _ in group)

    return "".join(" " + text for text in ordered_texts)


In [14]:
# function to process image only posts
def process_img_post(image_list,ocr):
    """Run OCR on every image of a post and return the confident text.

    Args:
        image_list: paths of the images to read with ``cv2.imread``.
        ocr: object exposing ``ocr(image, cls=True)`` whose result's first
            element is a list of ``[box, (text, confidence)]`` lines (or a
            falsy value when nothing was found).

    Returns:
        One line of output per image that produced OCR results: every text
        with confidence >= 0.7 followed by a space, terminated by a newline.
        Images that cannot be read or contain no text contribute nothing.
    """
    MIN_CONFIDENCE = 0.7
    per_image_text = []

    for img_path in image_list:
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals an unreadable/corrupt file by returning None.
            print(f"Could not read image {img_path}, skipping")
            continue

        results = ocr.ocr(image, cls=True)

        # Guard against both an empty result and a "no text" first element.
        if not results or not results[0]:
            continue

        confident_texts = [line[1][0] for line in results[0] if line[1][1] >= MIN_CONFIDENCE]
        per_image_text.append("".join(text + " " for text in confident_texts) + "\n")

    return "".join(per_image_text)

In [None]:
def _extract_video_screen_text(video_path, ocr, min_confidence=0.7, sample_interval_s=0.2):
    """OCR frames sampled from a video and merge the stable on-screen text into one string."""
    cap = cv2.VideoCapture(video_path)
    ocr_results = []
    frame_dimensions = (0,0)

    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Number of frames between two analysed frames. Clamped to 1 so a low or
        # unreported FPS cannot produce a modulo-by-zero.
        frame_step = max(1, int(fps * sample_interval_s))
        frame_idx = 0

        while cap.isOpened():
            ret, frame = cap.read()
            # Check ret before touching frame: it is None once the stream ends.
            if not ret:
                break
            if frame_dimensions[0] == 0 or frame_dimensions[1] == 0:
                frame_dimensions = frame.shape[:2]

            if frame_idx % frame_step == 0:
                img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Run OCR
                results = ocr.ocr(img_rgb, cls=True)
                if results and results[0]:
                    ocr_results.append([i for i in results[0] if i[1][1] >= min_confidence])
            # Always advance, even when the frame had no text; otherwise the
            # sampling condition stays true and every following frame is OCR'd.
            frame_idx += 1
    finally:
        # Release the capture even if OCR raises.
        cap.release()

    # No confident text in any analysed frame.
    if not any(ocr_results):
        return ""

    filtered = filter_texts(ocr_results, frame_dimensions, error_threshold=5, frames_threshold=3)
    return merge_lines(filtered) if filtered else ""


def process_post(post_id,root_folder,ocr):
    """Extract text from one post folder and write it to ``extracted_text.txt``.

    * No ``.mp4`` file or more than one: the post is treated as a photo post
      and the text is OCR'd from its ``.jpg`` files.
    * Exactly one ``.mp4`` file: the audio is transcribed with the module-level
      Whisper ``model`` (it must be defined before this function is called).
      When the transcript has at most 100 characters, the on-screen text is
      OCR'd from sampled video frames instead.

    Args:
        post_id: name of the post's sub-folder inside ``root_folder``.
        root_folder: directory containing one sub-folder per post.
        ocr: OCR engine exposing ``ocr(image, cls=True)``.
    """
    post_folder = os.path.join(root_folder, post_id)
    output_path = os.path.join(post_folder, "extracted_text.txt")

    mp4_files = glob.glob(os.path.join(post_folder, '*.mp4'))
    #if multiple images/videos extract data only from images
    if not mp4_files or len(mp4_files) > 1:
        print(f"{post_id} - photos post")
        # glob order is arbitrary; sort so the output order is reproducible.
        img_files = sorted(glob.glob(os.path.join(post_folder, '*.jpg')))
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(process_img_post(img_files,ocr))
        return

    # if video extract data from audio
    print(f"{post_id} - video post")
    result = model.transcribe(mp4_files[0], language="en")
    if len(result["text"]) > 100:
        print("Audio text is more than 100 characters, skip screen text extraction")
        extracted_text = result["text"]
    else:
        #if audio is too short look for text on the frames
        print("Audio text is less than 100 characters, start screen text extraction")
        extracted_text = _extract_video_screen_text(mp4_files[0], ocr)

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(extracted_text)


In [None]:
# OCR engine and speech-to-text model shared by all posts (process_post reads `model` as a global).
ocr = PaddleOCR(use_angle_cls=False, lang='en', use_gpu=True, show_log=False)
logging.getLogger('ppocr').setLevel(logging.ERROR)
model = whisper.load_model("medium")

# List the post folders once (instead of on every iteration), skip stray files
# and sort so the processing order is reproducible.
post_ids = sorted(
    entry for entry in os.listdir(root_folder)
    if os.path.isdir(os.path.join(root_folder, entry))
)

for counter, post_id in enumerate(post_ids, start=1):
    print(f"{counter}/{len(post_ids)}")
    try:
        process_post(post_id,root_folder, ocr)
    except Exception as e:
        # Best effort: report the failure and continue with the next post.
        print(f"Error processing post {post_id}: {e}")

In [16]:
#combine all information into one txt file
for post_id in os.listdir(root_folder):
    post_folder = os.path.join(root_folder, post_id)
    extracted_path = os.path.join(post_folder, "extracted_text.txt")
    final_path = os.path.join(post_folder, "extracted_text_final.txt")

    # The description is the first (sorted) .txt file that this notebook did not
    # generate itself. Selecting by name rather than by sort position keeps the
    # cell correct on re-runs, when extracted_text_final.txt already exists.
    description_files = sorted(
        path for path in glob.glob(os.path.join(post_folder, '*.txt'))
        if os.path.basename(path) not in ("extracted_text.txt", "extracted_text_final.txt")
    )

    if not description_files or not os.path.isfile(extracted_path):
        print(f"File not found for post {post_id}")
        continue

    try:
        # Read both inputs before creating the output so a failure cannot
        # leave a truncated extracted_text_final.txt behind.
        with open(description_files[0], 'r', encoding='utf-8') as post_descr:
            description_text = post_descr.read()
        with open(extracted_path, 'r', encoding='utf-8') as post_extracted:
            extracted_text = post_extracted.read()
    except FileNotFoundError:
        print(f"File not found for post {post_id}")
        continue

    #can be changed to JSON if needed
    with open(final_path, 'w', encoding='utf-8') as fw:
        fw.write(f"Instagram post {post_id}:")
        fw.write("\n\nPost description:\n")
        fw.write(description_text)
        fw.write("\n\nPost extracted text:\n")
        fw.write(extracted_text)