In [65]:
import import_ipynb
from Functions import *

# Main Model

In [66]:
class CNN(nn.Module):
    """Conv feature extractor + Transformer + small MLP producing one probability.

    The network treats every spatial location of the final 64-channel feature
    map as one token of a sequence, runs the sequence through an
    encoder/decoder Transformer, and reduces the result to a single sigmoid
    output shared by the whole input batch.

    Args:
        N: Stored on the instance as ``self.N``; it is not used anywhere else
            inside this class.
    """

    def __init__(self, N):
        super().__init__()
        self.N = N

        # Three identical stages: 3x3 same-padding conv -> ReLU -> 2x2 max-pool.
        # Each pool halves H and W (floor division), so an input of
        # (B, 3, H, W) leaves this stack as (B, 64, H // 8, W // 8).
        stages = []
        for in_channels, out_channels in ((3, 16), (16, 32), (32, 64)):
            stages.append(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
            )
            stages.append(nn.ReLU())
            stages.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.cnn = nn.Sequential(*stages)

        # Collapses the two spatial axes into one: (B, 64, H', W') -> (B, 64, H' * W').
        self.flatten = nn.Flatten(2)

        # Sequence model over the spatial positions; d_model matches the
        # 64 channels produced by the conv stack.
        self.transformer = nn.Transformer(
            d_model=64,
            nhead=8,
            num_encoder_layers=2,
            num_decoder_layers=2,
        )

        # Head mapping one 64-dim token to one scalar logit.
        self.fc = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
        )

    def forward(self, x):
        """Map a batch of 3-channel images to a single value in (0, 1).

        Args:
            x: Tensor of shape (B, 3, H, W).

        Returns:
            Tensor of shape (1, 1, 1, 1): the sigmoid of the per-item logits
            averaged over the batch dimension.
        """
        feature_maps = self.cnn(x)                              # (B, 64, H', W')

        # One token per spatial position, laid out as (seq_len, batch, d_model).
        tokens = self.flatten(feature_maps).permute(2, 0, 1)    # (S, B, 64)

        # The decoder is fed an all-zero target of the same length as the source.
        seq_len, batch_size = tokens.size(0), tokens.size(1)
        decoder_input = torch.zeros(seq_len, batch_size, 64, device=tokens.device)

        decoded = self.transformer(tokens, decoder_input)       # (S, B, 64)

        # Only the first sequence position is used for the prediction.
        logits = self.fc(decoded[0])                            # (B, 1)

        # Average over the batch, then append two singleton axes.
        pooled = logits.mean(dim=0, keepdim=True)               # (1, 1)
        return Fun.sigmoid(pooled[..., None, None])             # (1, 1, 1, 1)

# Get Training Data

In [67]:
def get_video_frames(path, N, size):
    """Read up to N consecutive, resized RGB frames from a randomly chosen video.

    Args:
        path: Directory scanned (non-recursively) for files ending in
            .mp4, .avi, .mov or .mkv.
        N: Number of consecutive frames requested.
        size: Target size handed to ``transforms.Resize``.

    Returns:
        list: One tensor per frame, in read order, each produced by
        ``ToPILImage -> Resize(size) -> ToTensor``. The list can be shorter
        than N if the decoder stops delivering frames early.

    Raises:
        ValueError: If ``path`` holds no video files, the chosen video cannot
            be opened, or it reports fewer than N frames.
        RuntimeError: If not a single frame could be decoded.
    """
    # Find all video files in the specified path
    video_files = [f for f in os.listdir(path) if f.endswith(('.mp4', '.avi', '.mov', '.mkv'))]
    if not video_files:
        raise ValueError("No video files found in the specified path.")

    # Choose a random video file
    video_path = os.path.join(path, random.choice(video_files))

    # Resize transformation (built before opening the capture so that the
    # try/finally below guards only the code that holds the video handle)
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize(size),
        transforms.ToTensor()
    ])

    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if N > total_frames:
            raise ValueError(f"The video has only {total_frames} frames, but {N} frames were requested.")

        # Select a random starting frame index such that N consecutive frames fit
        start_frame = random.randint(0, total_frames - N)
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        for _ in range(N):
            ret, frame = cap.read()
            if not ret:
                # Decoder ran dry earlier than the reported frame count suggested
                break
            # OpenCV delivers BGR; convert to RGB before the PIL-based transform
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(transform(frame_rgb))
    finally:
        # Always free the capture handle, including on the error paths above
        cap.release()

    if not frames:
        raise RuntimeError(f"No frames could be read from video file: {video_path}")

    return frames

# Convert Tensor to Numpy List

In [68]:
def t2nl(frames_tensor):
    """Convert a stacked frame tensor into a list of NumPy images.

    Args:
        frames_tensor: Tensor whose leading axis is dropped with
            ``squeeze(0)`` when it has size 1; the remaining leading axis is
            iterated frame by frame. Presumably shaped (1, N, 3, H, W) --
            TODO confirm against callers.

    Returns:
        list: One NumPy array per frame. Each frame has its axes reordered
        with ``permute(1, 2, 0)`` and is then passed through
        ``cv2.cvtColor(..., cv2.COLOR_BGR2RGB)``.
    """
    frame_list = []
    # squeeze(0) only removes the leading axis if its size is exactly 1
    for frame in frames_tensor.squeeze(0):
        # Move the first axis to the end before handing the array to OpenCV
        channel_last = frame.permute(1, 2, 0).numpy()
        frame_list.append(cv2.cvtColor(channel_last, cv2.COLOR_BGR2RGB))
    return frame_list

# Training Function

In [73]:
def train(Path, State=None, N=3, Batch=10, Epochs=10, Steps=5, LR=1e-3):
    """Train a ``CNN`` to tell whether the middle frame of a short clip was modified.

    Every iteration reads N consecutive frames from a random video in
    ``Path``. On odd iterations the middle frame is passed through
    ``AddOneInc1`` between 1 and ``Steps`` times and the target is 1; on even
    iterations the clip is left untouched and the target is 0. The model
    input is the concatenation, along dim 0, of consecutive-frame pixel
    differences, consecutive-frame ``Edges_tensor`` differences and the second
    element returned by ``TSSIM`` for each consecutive pair.

    Args:
        Path: Directory containing the training videos.
        State: Optional ``state_dict`` to initialise the model from.
        N: Number of consecutive frames per sample.
        Batch: Number of iterations (one sample each) per epoch.
        Epochs: Number of epochs.
        Steps: Upper bound for how many times ``AddOneInc1`` is applied to
            the middle frame of a positive sample.
        LR: Initial Adam learning rate; multiplied by 0.75 after every epoch.

    Returns:
        tuple: ``(Loss, States)`` where ``Loss`` is a list holding the summed
        (not averaged) loss of each epoch and ``States`` is the model's
        final ``state_dict``.
    """
    Model = CNN(N*3)
    Size = (200,200)

    if State is not None:
        Model.load_state_dict(State)

    # Built once: re-creating Adam every epoch would throw away its running
    # moment estimates. The per-epoch decay is applied through param_groups.
    optimizer = optim.Adam(Model.parameters(), lr=LR)
    criterion = nn.BCELoss()

    Loss = []
    status_width = 0  # widest progress line so far, used to blank out leftovers after '\r'

    start_time = time.time()  # Start timer

    try:
        for epoch in range(Epochs):
            total_loss = 0.0
            Correct0 = 0
            Correct1 = 0
            Total0 = 0
            Total1 = 0
            for batch in range(Batch):
                optimizer.zero_grad()

                Frames = torch.stack(get_video_frames(Path, N, Size))

                # Alternate classes: even iterations are untouched clips (0),
                # odd iterations get a modified middle frame (1).
                is_altered = batch % 2 == 1
                Target = torch.tensor([1.0]) if is_altered else torch.tensor([0.0])

                if is_altered:
                    for _ in range(random.randint(1, Steps)):
                        Frames[N // 2] = AddOneInc1(Frames[N // 2])

                pairs = range(len(Frames) - 1)

                # Pixel differences between consecutive frames
                diff_pix = torch.stack([Frames[i] - Frames[i + 1] for i in pairs])

                # Differences between the edge maps of consecutive frames
                edge_frames = torch.stack(
                    [Edges_tensor(Frames[i]) - Edges_tensor(Frames[i + 1]) for i in pairs]
                )

                # TSSIM is indexed as (score, map); only the map feeds the model
                Scores = [TSSIM(Frames[i].unsqueeze(0), Frames[i + 1].unsqueeze(0)) for i in pairs]
                SimScores = torch.stack([t[1].squeeze(0) for t in Scores])

                All = torch.cat((diff_pix, edge_frames, SimScores), dim=0)

                # Forward pass: the model returns (1, 1, 1, 1); keep a shape-(1,) tensor
                Pred = Model(All)[0][0][0]

                if is_altered:
                    Total1 += 1
                    if Target.item() == round(Pred.item()):
                        Correct1 += 1
                else:
                    Total0 += 1
                    if Target.item() == round(Pred.item()):
                        Correct0 += 1

                # Calculate loss
                loss = criterion(Pred, Target)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

                # Estimate the remaining time from the average iteration time so far
                done_iters = (epoch * Batch) + batch + 1
                elapsed_time = time.time() - start_time
                remaining_time = (elapsed_time / done_iters) * ((Epochs * Batch) - done_iters)
                Mins = int(remaining_time//60)
                Secs = int(remaining_time%60)

                # Seconds are zero-padded and the line is padded to the widest
                # one printed so far; otherwise a shorter line leaves stale
                # trailing characters behind after the carriage return.
                status = (f'LR: {LR:0.4} Batch [{batch+1}/{Batch}], Loss: {loss.item():.4f} '
                          f'Time:{Mins}:{Secs:02d}, Correct0: {Correct0}/{Total0}, Correct1: {Correct1}/{Total1}')
                status_width = max(status_width, len(status))
                print(status.ljust(status_width), end='\r')

                # Display predictions on the frame
                Frame = cv2.cvtColor(Frames[N // 2].permute(1, 2, 0).numpy(), cv2.COLOR_BGR2RGB)
                Frame = cv2.putText(Frame, f'P: {round(Pred.item())}', (0, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1)
                Frame = cv2.putText(Frame, f'T: {int(Target.item())}', (0, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1)
                cv2.imshow("Frame", Frame)
                cv2.waitKey(1)

            Loss.append(total_loss)
            print(f'\nEpoch [{epoch + 1}/{Epochs}], Loss: {total_loss / Batch:.6f}, Correct0: {Correct0}/{Total0}, Correct1: {Correct1}/{Total1}')

            # Exponential learning-rate decay, applied to the existing optimizer
            LR *= 0.75
            for group in optimizer.param_groups:
                group['lr'] = LR
    finally:
        # Close the preview window even if training is interrupted by an error
        cv2.destroyAllWindows()

    States = Model.state_dict()
    return Loss,States

# Training

In [74]:
# Close every OpenCV window that is currently open.
cv2.destroyAllWindows()

In [75]:
# No checkpoint to resume from: train() only calls load_state_dict when State is not None.
State = None

In [77]:
# Run training on the videos in 'VDB'; L receives the per-epoch summed losses
# and States the final model state_dict.
L, States = train(
    'VDB',
    State=State,
    N=5,
    Batch=50,
    Epochs=10,
    Steps=5,
    LR=1e-2,
)

LR: 0.01 Batch [50/50], Loss: 0.6920 Time:59:38, Correct0: 10/25, Correct1: 10/25
Epoch [1/10], Loss: 0.721118, Correct0: 10/25, Correct1: 10/25
LR: 0.0075 Batch [50/50], Loss: 0.6877 Time:53:48, Correct0: 9/25, Correct1: 15/25
Epoch [2/10], Loss: 0.702204, Correct0: 9/25, Correct1: 15/25
LR: 0.005625 Batch [50/50], Loss: 0.6848 Time:48:2, Correct0: 16/25, Correct1: 7/254
Epoch [3/10], Loss: 0.696926, Correct0: 16/25, Correct1: 7/25
LR: 0.004219 Batch [50/50], Loss: 0.7032 Time:42:25, Correct0: 24/25, Correct1: 0/25
Epoch [4/10], Loss: 0.695755, Correct0: 24/25, Correct1: 0/25
LR: 0.003164 Batch [50/50], Loss: 0.7090 Time:35:35, Correct0: 25/25, Correct1: 0/25
Epoch [5/10], Loss: 0.694621, Correct0: 25/25, Correct1: 0/25
LR: 0.002373 Batch [50/50], Loss: 0.7116 Time:28:37, Correct0: 25/25, Correct1: 0/25
Epoch [6/10], Loss: 0.694292, Correct0: 25/25, Correct1: 0/25
LR: 0.00178 Batch [50/50], Loss: 0.7144 Time:21:30, Correct0: 25/25, Correct1: 0/25
Epoch [7/10], Loss: 0.694325, Correct0

In [None]:
# Plot the summed loss of every epoch except the first one.
plt.plot(L[1:])  # Scene changes

In [None]:
# Write the state_dict returned by train() to disk.
torch.save(States,"StatesV5.pth")

In [None]:
# Read the saved state_dict back from disk.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
# NOTE(review): the result is bound to `States`, while train() is called with `State`;
# assign it to `State` if the intent is to resume training from this checkpoint.
States = torch.load("StatesV5.pth")

In [None]:
# What does the attention mechanism do in image processing,
# as opposed to in NLP?
# Embedding / classes
# Scene changes relative to the previous scene (consecutive)