In [1]:
import os
import cv2
import numpy as np
import torch
import torchaudio
import librosa
import pytesseract
from PIL import Image
import speech_recognition as sr
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
import warnings
warnings.filterwarnings('ignore')


# Locate the tesseract executable instead of assuming a single fixed location:
# an explicit TESSERACT_CMD environment variable wins, then whatever is on
# PATH, and only then the previously hard-coded default path.
import shutil

pytesseract.pytesseract.tesseract_cmd = (
    os.environ.get('TESSERACT_CMD')
    or shutil.which('tesseract')
    or r'/usr/bin/tesseract'
)

In [2]:
def load_video(video_path):
    """Open a video with OpenCV and return the capture object.

    Args:
        video_path: Path (or any source accepted by ``cv2.VideoCapture``).

    Returns:
        An opened ``cv2.VideoCapture``; the caller is responsible for
        releasing it.

    Raises:
        IOError: If OpenCV reports that the source could not be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Free the native handle before reporting the failure.
        cap.release()
        raise IOError(f"Cannot open video file {video_path}")
    return cap


In [3]:
def extract_frames(cap, size=(224, 224)):
    """Read every remaining frame from ``cap``, resized to ``size``.

    The capture is always released, even when reading or resizing fails.

    Args:
        cap: Object with ``read()`` returning ``(ok, frame)`` and ``release()``.
        size: ``(width, height)`` passed to ``cv2.resize``; defaults to 224x224.

    Returns:
        List of resized frames in reading order (empty if nothing was read).
    """
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Resize each frame to the requested size.
            frames.append(cv2.resize(frame, size))
    finally:
        cap.release()
    return frames


In [4]:
from pydub import AudioSegment

def extract_audio(video_path, audio_path):
    """Decode the media file at ``video_path`` and export it as WAV to ``audio_path``."""
    AudioSegment.from_file(video_path).export(audio_path, format="wav")


In [5]:
from vosk import Model, KaldiRecognizer
import wave
import json

def speech_to_text_vosk(audio_path, model_path='vosk-model-small-ru-0.22'):
    """Transcribe a WAV file with a Vosk model.

    If the file is not mono / 16-bit / uncompressed, it is first converted
    into a private temporary WAV file. The input file is never overwritten,
    and every file opened here is closed (and the temporary file removed)
    before returning.

    Args:
        audio_path: Path to the WAV file to transcribe.
        model_path: Directory of the downloaded Vosk model.

    Returns:
        The recognised text; intermediate results and the final result are
        separated by single spaces.
    """
    import tempfile

    model = Model(model_path)
    converted_path = None
    wf = wave.open(audio_path, "rb")
    try:
        if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getcomptype() != "NONE":
            # Convert to the required format in a separate file; writing to
            # a fixed name could clobber the very file that is being read.
            wf.close()
            sound = AudioSegment.from_file(audio_path)
            sound = sound.set_channels(1).set_sample_width(2)
            fd, converted_path = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            # export() returns the output file handle; close it right away.
            sound.export(converted_path, format="wav").close()
            wf = wave.open(converted_path, "rb")

        rec = KaldiRecognizer(model, wf.getframerate())
        pieces = []
        while True:
            data = wf.readframes(4000)
            if len(data) == 0:
                break
            if rec.AcceptWaveform(data):
                pieces.append(json.loads(rec.Result()).get('text', ''))
        pieces.append(json.loads(rec.FinalResult()).get('text', ''))
    finally:
        wf.close()
        if converted_path is not None and os.path.exists(converted_path):
            os.remove(converted_path)
    return " ".join(pieces)


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio.transforms as T

def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """Build a bias-free 3x3 convolution whose padding equals its dilation."""
    conv_options = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=False,
    )
    return nn.Conv2d(in_planes, out_planes, **conv_options)

def conv1x1(in_planes, out_planes, stride=1):
    """Build a bias-free 1x1 (pointwise) convolution."""
    pointwise = nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return pointwise

class BasicBlock(nn.Module):
    """Two-layer residual block (3x3 conv -> norm -> ReLU -> 3x3 conv -> norm).

    The block output is ``ReLU(main_path(x) + shortcut(x))`` where the
    shortcut is ``x`` itself or ``downsample(x)`` when a downsample module
    is supplied.
    """

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super().__init__()
        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer
        # Only the plain configuration is supported by this block type.
        if not (groups == 1 and base_width == 64):
            raise ValueError('BasicBlock поддерживает только groups=1 и base_width=64')
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 не поддерживается в BasicBlock")

        # Only the first convolution carries the stride.
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply the residual block to ``x``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Project the input when a downsample module was provided.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

class Bottleneck(nn.Module):
    """Three-layer residual block: 1x1 reduce -> 3x3 -> 1x1 expand.

    The output has ``planes * expansion`` channels and equals
    ``ReLU(main_path(x) + shortcut(x))``.
    """

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super().__init__()
        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer
        out_planes = planes * self.expansion
        width = int(planes * (base_width / 64.)) * groups

        # Both self.conv2 and self.downsample reduce the input resolution
        # when stride != 1.
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, out_planes)
        self.bn3 = norm_layer(out_planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply the bottleneck block to ``x``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # Project the input when a downsample module was provided.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

class NetVLAD(nn.Module):
    """NetVLAD pooling layer.

    Softly assigns every spatial descriptor of a ``[N, dim, H, W]`` feature
    map to ``num_clusters`` centroids, sums the assignment-weighted residuals
    per cluster and returns an L2-normalised ``[N, num_clusters * dim]``
    vector.
    """

    def __init__(self, num_clusters=16, dim=512, alpha=100.0,
                 normalize_input=True):
        super().__init__()
        self.num_clusters = num_clusters
        self.dim = dim
        self.alpha = alpha
        self.normalize_input = normalize_input
        self.conv = nn.Conv2d(dim, num_clusters, kernel_size=(1, 1), bias=True)
        self.centroids = nn.Parameter(torch.rand(num_clusters, dim))
        self._init_params()

    def _init_params(self):
        """Derive the assignment conv's weight and bias from the centroids."""
        scaled_centroids = 2.0 * self.alpha * self.centroids
        self.conv.weight = nn.Parameter(scaled_centroids[:, :, None, None])
        self.conv.bias = nn.Parameter(-self.alpha * self.centroids.norm(dim=1))

    def forward(self, x):
        """Pool ``x`` of shape ``[N, C, H, W]`` into ``[N, num_clusters * C]``."""
        batch_size, channels = x.shape[:2]

        if self.normalize_input:
            # Normalise along the descriptor (channel) dimension.
            x = F.normalize(x, p=2, dim=1)

        # Soft assignment of each location to each cluster: [N, K, S].
        assignment_logits = self.conv(x).view(batch_size, self.num_clusters, -1)
        soft_assign = F.softmax(assignment_logits, dim=1)

        descriptors = x.view(batch_size, channels, -1)  # [N, C, S]

        # Residual of every descriptor to every centroid via broadcasting:
        # [N, 1, C, S] - [1, K, C, 1] -> [N, K, C, S].
        residual = descriptors.unsqueeze(1) - self.centroids.unsqueeze(0).unsqueeze(-1)
        residual = residual * soft_assign.unsqueeze(2)
        vlad = residual.sum(dim=-1)

        vlad = F.normalize(vlad, p=2, dim=2)  # intra-cluster normalisation
        vlad = vlad.view(x.size(0), -1)  # flatten to [N, K * C]
        vlad = F.normalize(vlad, p=2, dim=1)  # final L2 normalisation

        return vlad

class ResNet(nn.Module):
    """ResNet backbone for single-channel inputs with a selectable pooling head.

    Args:
        block: Residual block class (``BasicBlock`` or ``Bottleneck``).
        layers: Number of blocks in each of the four stages.
        num_classes: Output size of the final linear layer.
        pool: ``'avgpool'`` for global average pooling or ``'vlad'`` for a
            NetVLAD head. Any other value builds no head and ``forward``
            returns the last feature map.
        zero_init_residual: Zero the last norm layer of every residual block.
        groups, width_per_group: Forwarded to the blocks.
        replace_stride_with_dilation: ``None`` or three booleans; each one
            says whether the stride-2 of stages 2-4 is replaced by dilation.
        norm_layer: Normalisation layer factory; defaults to ``nn.BatchNorm2d``.

    Raises:
        ValueError: If ``replace_stride_with_dilation`` does not have 3 items.
    """

    def __init__(self, block, layers, num_classes=1000, pool='avgpool', zero_init_residual=False,
                 groups=1, width_per_group=64, replace_stride_with_dilation=None,
                 norm_layer=None):
        super(ResNet, self).__init__()
        self.pool = pool
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # Each element says whether the 2x2 stride of that stage should
            # be replaced by a dilated convolution.
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation должно быть None или кортежем из 3 элементов")
        self.groups = groups
        self.base_width = width_per_group
        # The stem takes 1 input channel (e.g. a spectrogram) instead of RGB.
        self.conv1 = nn.Conv2d(1, self.inplanes, kernel_size=7, stride=2, padding=3,
                               bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2,
                                       dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2,
                                       dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2,
                                       dilate=replace_stride_with_dilation[2])
        feature_dim = 512 * block.expansion  # channels produced by layer4
        if self.pool == 'avgpool':
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
            self.fc = nn.Linear(feature_dim, num_classes)
        elif self.pool == 'vlad':
            # NetVLAD must match the real channel count of layer4; with a
            # fixed dim=512 it only worked for blocks whose expansion is 1.
            self.avgpool = NetVLAD(dim=feature_dim)
            self.fc = nn.Linear(self.avgpool.num_clusters * feature_dim, num_classes)

        # NOTE(review): this loop also re-initialises the NetVLAD assignment
        # conv, replacing the centroid-derived weights set in NetVLAD itself;
        # kept as in the original, confirm whether that is intended.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.normal_(m.weight, mean=1, std=0.02)
                nn.init.constant_(m.bias, 0)

        # Zero-initialise the last norm layer of every residual block.
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
        """Build one stage of ``blocks`` residual blocks and update ``self.inplanes``."""
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            # Trade the stride for dilation.
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion:
            # Projection shortcut so the identity matches the block output.
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the backbone and, if configured, the pooling head and classifier."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        if self.pool == 'avgpool':
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.fc(x)
        elif self.pool == 'vlad':
            # NetVLAD already returns a flat [N, K * C] descriptor.
            x = self.avgpool(x)
            x = self.fc(x)

        return x

def _resnet(arch, block, layers, pretrained, progress, **kwargs):
    """Build a ``ResNet`` and optionally initialise it from published weights.

    The published checkpoints were trained with a 3-channel stem and a
    1000-class head, whereas this ``ResNet`` uses a 1-channel stem and a
    configurable head. When ``pretrained`` is true the stem weights are
    averaged over their input channels to fit the model's stem, and any
    checkpoint entry that is missing from the model or still has a different
    shape is skipped, so loading no longer fails on a size mismatch.

    Args:
        arch: Key into ``model_urls``.
        block: Residual block class.
        layers: Blocks per stage.
        pretrained: Whether to download and load the weights.
        progress: Show a download progress bar.
        **kwargs: Forwarded to ``ResNet``.

    Returns:
        The constructed model.
    """
    model = ResNet(block, layers, **kwargs)
    if pretrained:
        state_dict = torch.hub.load_state_dict_from_url(
            model_urls[arch], progress=progress)
        own_state = model.state_dict()

        stem_key = 'conv1.weight'
        if stem_key in state_dict and stem_key in own_state:
            pretrained_stem = state_dict[stem_key]
            in_channels = own_state[stem_key].shape[1]
            if pretrained_stem.shape[1] != in_channels:
                # Collapse the RGB filters into one and replicate it for
                # however many input channels the model's stem expects.
                state_dict[stem_key] = (
                    pretrained_stem.mean(dim=1, keepdim=True)
                    .repeat(1, in_channels, 1, 1)
                )

        compatible = {
            key: value for key, value in state_dict.items()
            if key in own_state and value.shape == own_state[key].shape
        }
        # strict=False: layers absent from the checkpoint keep their
        # freshly initialised weights.
        model.load_state_dict(compatible, strict=False)
    return model

# Download URLs of pretrained checkpoints, keyed by architecture name
# (looked up by _resnet when pretrained weights are requested).
model_urls = {
    'resnet18': 'https://download.pytorch.org/models/resnet18-f37072fd.pth',
    # Add links for other models here if needed
}

def resnet18(pretrained=False, progress=True, **kwargs):
    """Create the ResNet-18 variant: ``BasicBlock`` with two blocks per stage.

    ``pretrained`` and ``progress`` are passed to ``_resnet``; extra keyword
    arguments are forwarded unchanged.
    """
    blocks_per_stage = [2, 2, 2, 2]
    return _resnet('resnet18', BasicBlock, blocks_per_stage,
                   pretrained, progress, **kwargs)

# Добавьте другие функции resnet, если необходимо


In [7]:
def load_sound_model(model_path=None, num_classes=10, device='cpu'):
    """Build the 1-channel ResNet-18 sound classifier.

    The backbone is initialised from the published ResNet-18 weights. Their
    3-channel stem filters are averaged over the channel axis *before*
    loading, so the single-channel stem really starts from pretrained
    filters. The classifier head is then replaced to output ``num_classes``.

    If ``model_path`` is given, that checkpoint is loaded on top. Only
    entries whose shape differs from the model are discarded, so a checkpoint
    saved from this same architecture keeps its trained stem and head.

    Args:
        model_path: Optional path to a saved ``state_dict``.
        num_classes: Number of output classes.
        device: Device the model is moved to (also used as ``map_location``).

    Returns:
        The model on ``device`` in evaluation mode.
    """
    # Build the architecture without weights; the checkpoint is adapted below.
    model = resnet18(pretrained=False)

    pretrained_state = torch.hub.load_state_dict_from_url(
        model_urls['resnet18'], progress=True)
    stem_weight = pretrained_state['conv1.weight']
    if stem_weight.shape[1] != model.conv1.in_channels:
        # Average the pretrained filters over their input channels.
        pretrained_state['conv1.weight'] = (
            stem_weight.mean(dim=1, keepdim=True)
            .repeat(1, model.conv1.in_channels, 1, 1)
        )
    model.load_state_dict(pretrained_state)

    # Replace the fully connected layer to match the number of classes.
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    # Load the saved model, if one was specified.
    if model_path:
        state_dict = torch.load(model_path, map_location=device)
        own_state = model.state_dict()

        # Drop only the entries that really cannot be loaded.
        for key in list(state_dict):
            if key in own_state and state_dict[key].shape != own_state[key].shape:
                del state_dict[key]
                print(f"Removed {key} from state_dict due to size mismatch.")

        model.load_state_dict(state_dict, strict=False)

    model.to(device)
    model.eval()
    return model


In [8]:
def classify_sound(audio_path, model, device='cpu'):
    """Classify an audio file from its normalised log-mel spectrogram.

    Multi-channel audio is mixed down to mono so the network always receives
    a single input channel, and the normalisation is guarded against a zero
    standard deviation (e.g. digital silence).

    Args:
        audio_path: Path readable by ``torchaudio.load``.
        model: Classifier taking a ``[1, 1, n_mels, time_steps]`` tensor.
        device: Device on which inference runs.

    Returns:
        ``(predicted_class, probabilities)``: the arg-max class index and the
        softmax probabilities as a NumPy array.
    """
    waveform, sample_rate = torchaudio.load(audio_path)

    # Mix every channel down to one so the input has exactly one channel.
    if waveform.size(0) > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # Resample to the rate used for the spectrogram.
    resample_rate = 16000
    if sample_rate != resample_rate:
        resampler = T.Resample(orig_freq=sample_rate, new_freq=resample_rate)
        waveform = resampler(waveform)

    # Convert to a mel spectrogram in decibels.
    n_mels = 128
    mel_spectrogram = T.MelSpectrogram(
        sample_rate=resample_rate,
        n_fft=2048,
        hop_length=512,
        n_mels=n_mels
    )
    mel_spec = mel_spectrogram(waveform)
    mel_spec_db = T.AmplitudeToDB()(mel_spec)

    # Standardise; the clamp only matters when the std is (almost) zero.
    std = mel_spec_db.std().clamp_min(1e-6)
    mel_spec_db = (mel_spec_db - mel_spec_db.mean()) / std

    # [1, n_mels, time_steps] -> [1, 1, n_mels, time_steps]
    input_tensor = mel_spec_db.unsqueeze(0).to(device)

    # Classification
    with torch.no_grad():
        output = model(input_tensor)
        probabilities = torch.nn.functional.softmax(output[0], dim=0)
        predicted_class = torch.argmax(probabilities).item()

    return predicted_class, probabilities.cpu().numpy()


In [9]:
def detect_objects(frames):
    """Run YOLOv5s object detection on every frame.

    Frames are expected in OpenCV's BGR channel order (as produced by
    ``cv2.VideoCapture``) and are converted to RGB before being passed to the
    model. With no frames, the model is not loaded at all.

    Args:
        frames: Iterable of ``H x W x 3`` BGR images.

    Returns:
        One list per frame, holding the detection records (dicts) of that frame.
    """
    frames = list(frames)
    if not frames:
        return []

    model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    results = []
    for frame in frames:
        # The hub model treats NumPy input as RGB; OpenCV frames are BGR.
        result = model(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        # Round-trip through JSON to get plain Python types.
        result_json = result.pandas().xyxy[0].to_json(orient="records")
        results.append(json.loads(result_json))
    return results


In [10]:
def ocr_on_frame(frame, lang='rus'):
    """Extract text from one video frame with Tesseract.

    Three-channel frames are assumed to be in OpenCV's BGR order and are
    converted to RGB, which is what ``PIL.Image.fromarray`` expects; other
    layouts (e.g. grayscale) are passed through unchanged.

    Args:
        frame: Image as a NumPy array.
        lang: Tesseract language code(s); defaults to Russian.

    Returns:
        The recognised text.
    """
    image = np.asarray(frame)
    if image.ndim == 3 and image.shape[2] == 3:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return pytesseract.image_to_string(Image.fromarray(image), lang=lang)


In [11]:
import torchvision.transforms as transforms
from torchvision import models

def classify_scene(frames):
    """Classify every frame with an ImageNet-pretrained ResNet-18.

    Frames are expected in OpenCV's BGR order; they are converted to RGB and
    normalised with the ImageNet mean/std that the pretrained torchvision
    weights were trained with. With no frames, the model is not loaded.

    Args:
        frames: Iterable of ``H x W x 3`` BGR ``uint8`` images.

    Returns:
        List with the predicted ImageNet class index of each frame.
    """
    frames = list(frames)
    if not frames:
        return []

    model = models.resnet18(pretrained=True)
    model.eval()
    preprocess = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        # Statistics expected by torchvision's ImageNet-pretrained models.
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])
    results = []
    for frame in frames:
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        input_batch = preprocess(rgb_frame).unsqueeze(0)
        with torch.no_grad():
            output = model(input_batch)
        probabilities = torch.nn.functional.softmax(output[0], dim=0)
        results.append(torch.argmax(probabilities).item())
    return results


In [12]:
# SQLAlchemy engine for the SQLite file config.db (relative URL) and the
# declarative base class that the ORM models below inherit from.
engine = create_engine('sqlite:///config.db')
Base = declarative_base()

class Keyword(Base):
    """ORM model for one row of the ``keywords`` table."""
    __tablename__ = 'keywords'
    id = Column(Integer, primary_key=True)  # integer primary key
    word = Column(String)  # the keyword text

class AnalysisResult(Base):
    """ORM model for one row of the ``analysis_results`` table."""
    __tablename__ = 'analysis_results'
    id = Column(Integer, primary_key=True)  # integer primary key
    video_path = Column(String)  # path of the analysed video
    data = Column(String)  # analysis output stored as a string

# Create the tables declared on Base (if missing) and open one module-level
# session that the helper functions and endpoints in this notebook share.
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

def add_keyword(word):
    """Insert ``word`` into the keywords table and commit.

    If the commit fails, the shared session is rolled back so it stays
    usable, and the original exception is re-raised.
    """
    keyword = Keyword(word=word)
    session.add(keyword)
    try:
        session.commit()
    except Exception:
        session.rollback()
        raise

def save_result(video_path, data):
    """Store one analysis result for ``video_path`` and commit.

    If the commit fails, the shared session is rolled back so it stays
    usable, and the original exception is re-raised.
    """
    result = AnalysisResult(video_path=video_path, data=data)
    session.add(result)
    try:
        session.commit()
    except Exception:
        session.rollback()
        raise


In [13]:
from fastapi import FastAPI
import uvicorn

# FastAPI application on which the HTTP endpoints below are registered.
app = FastAPI()

@app.post("/analyze_video/")
async def analyze_video_endpoint(video_path: str):
    """Run the full analysis pipeline for ``video_path`` and report a status.

    NOTE(review): ``main`` is called synchronously, so the response is only
    sent after the analysis has finished (despite the "started" wording) and
    the event loop is blocked meanwhile.
    NOTE(review): ``video_path`` is a client-supplied server-side path and is
    used as given; confirm this is acceptable for the deployment.
    """
    # Delegate the actual processing to main().
    main(video_path)
    response = {"status": "Video analysis started"}
    return response

@app.get("/search/")
async def search_endpoint(query: str):
    """Return the stored analysis data of every result containing ``query``.

    The search runs on the ``data`` column. ``metadata`` cannot be used:
    on a declarative model that attribute is SQLAlchemy's ``MetaData``
    registry, not a column, so it can be neither filtered on nor serialised.
    """
    # Substring search in the database.
    matches = (
        session.query(AnalysisResult)
        .filter(AnalysisResult.data.contains(query))
        .all()
    )
    return {"results": [match.data for match in matches]}


In [14]:
from flask import Flask, render_template

# Flask application that serves the HTML results page defined below.
app_flask = Flask(__name__)

@app_flask.route('/')
def index():
    """Render ``index.html`` with every stored analysis result."""
    return render_template(
        'index.html',
        results=session.query(AnalysisResult).all(),
    )


In [15]:
import logging

# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_video(video_path):
    """Log the start and the end of processing for ``video_path``.

    The processing step itself is still a placeholder.
    """
    start_message = f"Starting processing for {video_path}"
    logger.info(start_message)
    # Processing logic goes here.
    finish_message = f"Finished processing for {video_path}"
    logger.info(finish_message)


In [16]:
def main(video_path):
    """Analyse one video end to end and store the result in the database.

    Steps: extract frames and audio, transcribe the speech, detect objects,
    OCR every frame, classify scenes, then save everything via
    ``save_result``.

    The audio is written to a unique temporary file that is always removed,
    so repeated or overlapping runs do not clash on a fixed file name.
    Sound classification is currently disabled; ``sound_class`` is stored as
    ``None`` instead of referencing an undefined variable.

    Args:
        video_path: Path of the video to analyse.
    """
    import tempfile

    cap = load_video(video_path)
    frames = extract_frames(cap)

    fd, audio_path = tempfile.mkstemp(suffix='.wav')
    os.close(fd)
    try:
        extract_audio(video_path, audio_path)

        # Analyze audio
        speech_text = speech_to_text_vosk(audio_path)

        # Sound classification is disabled; enable it here, while the audio
        # file still exists, once a trained model is available:
        # num_classes = 10  # Update with your actual number of classes
        # device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # sound_model = load_sound_model(num_classes=num_classes, device=device)
        # sound_class, sound_probabilities = classify_sound(audio_path, sound_model, device)
        sound_class = None
    finally:
        if os.path.exists(audio_path):
            os.remove(audio_path)

    # Analyze video
    objects = detect_objects(frames)
    ocr_texts = [ocr_on_frame(frame) for frame in frames]
    scenes = classify_scene(frames)

    # Save results
    metadata = {
        "speech_text": speech_text,
        "sound_class": int(sound_class) if sound_class is not None else None,
        "objects": objects,
        "ocr_texts": ocr_texts,
        "scenes": scenes,
    }
    save_result(video_path, str(metadata))


In [17]:
# Set the path to your test video, then run the whole pipeline on it.
test_video_path = '/home/jupyter/datasphere/project/example.mp4'

main(test_video_path)

TesseractNotFoundError: /usr/bin/tesseract is not installed or it's not in your PATH. See README file for more information.

In [None]:
def train_model(model, train_loader, val_loader, num_epochs=10, device='cpu',
                save_path='sound_classification_resnet.pth'):
    """Train ``model`` with Adam and cross-entropy, then save its weights.

    Args:
        model: Network returning class logits.
        train_loader: Iterable of ``(inputs, labels)`` batches; inputs are
            expected as ``[batch_size, 1, height, width]``.
        val_loader: Optional iterable of validation batches; when given,
            validation loss and accuracy are printed after every epoch.
        num_epochs: Number of passes over ``train_loader``.
        device: Device used for training.
        save_path: File the final ``state_dict`` is written to.

    Returns:
        None. Progress is printed and the weights are saved to ``save_path``.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        seen = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            seen += inputs.size(0)

        # Average over the samples actually processed: correct with
        # drop_last and safe when the loader yields nothing.
        epoch_loss = running_loss / max(seen, 1)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

        # Evaluation on the validation set.
        if val_loader is not None:
            model.eval()
            val_loss = 0.0
            val_correct = 0
            val_seen = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs = inputs.to(device)
                    labels = labels.to(device)
                    outputs = model(inputs)
                    val_loss += criterion(outputs, labels).item() * inputs.size(0)
                    val_correct += (outputs.argmax(dim=1) == labels).sum().item()
                    val_seen += inputs.size(0)
            val_total = max(val_seen, 1)
            print(f'Epoch {epoch+1}/{num_epochs}, '
                  f'Val Loss: {val_loss / val_total:.4f}, '
                  f'Val Acc: {val_correct / val_total:.4f}')

    # Save the trained model.
    torch.save(model.state_dict(), save_path)
