In [None]:
import os
import ffmpeg
import numpy as np
import cv2

# Start every run from a clean "output_frames" folder: remove the regular files
# left by a previous run (sub-directories are kept), then make sure it exists.
output_image_dir = "output_frames"
if os.path.exists(output_image_dir):
    stale_paths = [
        os.path.join(output_image_dir, entry_name)
        for entry_name in os.listdir(output_image_dir)
    ]
    for stale_path in stale_paths:
        if os.path.isfile(stale_path):
            os.unlink(stale_path)
os.makedirs(output_image_dir, exist_ok=True)

def find_two_strongest_horizontal_lines(frame):
    """Estimate the vertical band of a frame bounded by two near-horizontal lines.

    The frame is converted to grayscale, blurred, edge-detected with Canny and
    passed through the standard Hough transform. Of all detected lines, the two
    whose angle is closest to horizontal (theta nearest pi/2) are selected.

    Args:
        frame: BGR image array as accepted by ``cv2.cvtColor``.

    Returns:
        ``(top, bottom)`` row indices: the two line positions padded by 10 px
        and clamped to the frame. Falls back to ``(0, frame.shape[0])`` when
        fewer than two lines are detected or both lines map to the same row.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)  # smooth before edge detection
    edges = cv2.Canny(blurred, 30, 100, apertureSize=3)
    lines = cv2.HoughLines(edges, 1, np.pi / 180, 200)

    frame_height = frame.shape[0]
    if lines is None:
        return 0, frame_height

    # Rank by deviation from horizontal; sorted() is stable, so equally
    # horizontal lines keep the order in which the detector returned them.
    most_horizontal = sorted(
        ((abs(theta - np.pi / 2), rho, theta) for rho, theta in lines[:, 0]),
        key=lambda item: item[0],
    )[:2]
    if len(most_horizontal) < 2:
        return 0, frame_height

    # sin(theta) * rho is the y-coordinate of the point on the line that is
    # closest to the image origin.
    upper, lower = sorted(int(np.sin(theta) * rho) for _, rho, theta in most_horizontal)
    if upper >= lower:
        return 0, frame_height

    return max(0, upper - 10), min(frame_height, lower + 10)

def segment_piano_keys(frame):
    """Split a BGR frame into two complementary binary masks.

    Pixels whose grayscale value is above 100 are set to 255 in the white-key
    mask; the black-key mask is the bitwise inverse of that mask.

    Returns:
        ``(white_key_mask, black_key_mask)``
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    white_key_mask = cv2.threshold(grayscale, 100, 255, cv2.THRESH_BINARY)[1]
    return white_key_mask, cv2.bitwise_not(white_key_mask)

def seeded_segmentation(black_key_mask, frame):
    """Draw seed points and watershed boundaries for the black-key mask onto ``frame``.

    Args:
        black_key_mask: binary mask (expected single-channel 8-bit, 255 inside
            black keys) with the same height and width as ``frame``.
        frame: BGR image; it is modified in place.

    Returns:
        The same ``frame`` object, with seed points drawn as green dots and
        watershed boundaries painted red.
    """
    contours, _ = cv2.findContours(black_key_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Candidate seeds per contour: centre, left edge and right edge of its
    # bounding box at mid height; only points on or inside the contour are kept.
    seeds = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        mid_y = y + h // 2
        for candidate in ((x + w // 2, mid_y), (x, mid_y), (x + w, mid_y)):
            if cv2.pointPolygonTest(contour, candidate, False) >= 0:
                seeds.append(candidate)

    # NOTE: the seeds are only visualised; the watershed markers below are
    # derived from the distance transform, not from these points.
    for seed in seeds:
        cv2.circle(frame, seed, 5, (0, 255, 0), -1)

    # cv2.watershed needs a 3-channel image, so expand the mask to BGR.
    mask_bgr = cv2.cvtColor(black_key_mask, cv2.COLOR_GRAY2BGR)
    distance_transform = cv2.distanceTransform(black_key_mask, cv2.DIST_L2, 5)
    # Keep only pixels within 90% of the largest distance as sure foreground.
    _, sure_fg = cv2.threshold(distance_transform, 0.9 * distance_transform.max(), 255, 0)
    sure_fg = np.uint8(sure_fg)
    unknown = cv2.subtract(black_key_mask, sure_fg)

    # Shift labels so the background component becomes 1, then mark the
    # undecided pixels with 0 so the watershed assigns them.
    markers = cv2.connectedComponents(sure_fg)[1]
    markers = markers + 1
    markers[unknown == 255] = 0

    segmented = cv2.watershed(mask_bgr, markers)

    # Watershed labels boundary pixels with -1; paint them red on the frame.
    frame[segmented == -1] = [0, 0, 255]

    return frame

# Input video file
input_video_path = "dataset/MIDItest/miditest_videos/5.mp4"

# Probe video dimensions
probe = ffmpeg.probe(input_video_path)
video_stream = next(stream for stream in probe['streams'] if stream['codec_type'] == 'video')
width = int(video_stream['width'])
height = int(video_stream['height'])
frame_size = width * height * 3  # bytes in one raw bgr24 frame

# Decode only the first frame (vframes=1) to determine the cropping boundaries,
# so ffmpeg does not keep decoding the rest of the video into a closed pipe.
process = (
    ffmpeg.input(input_video_path)
    .output('pipe:', format='rawvideo', pix_fmt='bgr24', vframes=1)
    .run_async(pipe_stdout=True)
)
in_bytes = process.stdout.read(frame_size)
process.stdout.close()
process.wait()

if len(in_bytes) < frame_size:
    raise RuntimeError(f"Could not decode a complete first frame from {input_video_path}")

frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])
top, bottom = find_two_strongest_horizontal_lines(frame)

# Read video frames again and save each segmented frame as an image
process = (
    ffmpeg.input(input_video_path)
    .output('pipe:', format='rawvideo', pix_fmt='bgr24')
    .run_async(pipe_stdout=True)
)

frame_count = 0

try:
    while True:
        in_bytes = process.stdout.read(frame_size)
        if len(in_bytes) < frame_size:
            # End of stream, or a truncated trailing frame that cannot be reshaped
            break

        frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])

        # Crop to the detected band; copy because the frombuffer view is
        # read-only and the segmentation draws on the frame.
        cropped_frame = frame[top:bottom, :].copy()

        # Segment piano keys in the cropped frame
        white_key_mask, black_key_mask = segment_piano_keys(cropped_frame)

        # Perform seeded segmentation
        segmented_frame = seeded_segmentation(black_key_mask, cropped_frame)

        # Save the segmented frame
        segmented_path = os.path.join(output_image_dir, f"segmented_frame_{frame_count:04d}.jpg")
        cv2.imwrite(segmented_path, segmented_frame)

        frame_count += 1
        print(f"Saved {segmented_path}")
finally:
    # Always release the pipe and reap the ffmpeg child, even if processing fails
    process.stdout.close()
    process.wait()

cv2.destroyAllWindows()


In [29]:
import pretty_midi
import numpy as np
import json

def midi_to_piano_vector(file_path, time_shift=0.25):
    """Convert a MIDI file into a sequence of 88-key piano state snapshots.

    Every note of every instrument contributes a note-on and a note-off event.
    Events are processed in time order, and after each one the current keyboard
    state is recorded.

    Args:
        file_path: path handed to ``pretty_midi.PrettyMIDI``.
        time_shift: seconds subtracted from every event time before it is
            stored (default 0.25; early events can therefore get a negative
            timestamp).

    Returns:
        ``(piano_vectors, end_time)`` where ``piano_vectors`` is a list of
        dicts with keys ``"timestamp"``, ``"vector"`` (88 floats, velocity / 127
        for sounding keys and 0 otherwise) and ``"note_name"``, and ``end_time``
        is ``midi_data.get_end_time()``.
    """
    midi_data = pretty_midi.PrettyMIDI(file_path)

    # Piano range A0..C8 corresponds to MIDI notes 21..108
    piano_range = range(21, 109)

    # Collect one 'on' and one 'off' event per note
    note_events = []
    for instrument in midi_data.instruments:
        for note in instrument.notes:
            normalized_velocity = note.velocity / 127.0  # scale to [0, 1]
            note_name = pretty_midi.note_number_to_name(note.pitch)
            note_events.append((note.start, note.pitch, normalized_velocity, 'on', note_name))
            note_events.append((note.end, note.pitch, normalized_velocity, 'off', note_name))

    # Stable sort by time only: simultaneous events keep their insertion order
    note_events.sort(key=lambda event: event[0])

    piano_vectors = []
    piano_state = np.zeros(88, dtype=float)

    for timestamp, pitch, velocity, event_type, note_name in note_events:
        # Pitches outside the piano range leave the state untouched, but a
        # snapshot is still emitted for the event.
        if pitch in piano_range:
            note_index = pitch - 21  # key index 0..87
            if event_type == 'on':
                piano_state[note_index] = velocity
            elif event_type == 'off':
                piano_state[note_index] = 0

        piano_vectors.append({
            "timestamp": timestamp - time_shift,
            "vector": piano_state.tolist(),
            "note_name": note_name,
        })

    return piano_vectors, midi_data.get_end_time()

def save_output_to_file(midi_file, output_file):
    """Write the piano vectors of ``midi_file`` to ``output_file`` as JSON.

    The JSON object holds the total MIDI duration under ``"duration"`` and the
    list produced by ``midi_to_piano_vector`` under ``"piano_vectors"``.
    """
    piano_vectors, total_duration = midi_to_piano_vector(midi_file)
    payload = {"duration": total_duration, "piano_vectors": piano_vectors}

    with open(output_file, 'w') as handle:
        json.dump(payload, handle, indent=4)

# Destination JSON file and source MIDI recording; adjust both paths for your machine
output_file = 'output.json'
midi_file = '/Users/tunaonat/Desktop/proje-git/piano-transcription/dataset/MIDItest/miditest_MIDI/5.mid'
save_output_to_file(midi_file=midi_file, output_file=output_file)


In [None]:
import os
import ffmpeg
import numpy as np
import cv2

# Start every run from a clean "output_differences" folder: remove the regular
# files left by a previous run (sub-directories are kept), then make sure it exists.
output_diff_dir = "output_differences"
if os.path.exists(output_diff_dir):
    leftover_paths = [
        os.path.join(output_diff_dir, entry_name)
        for entry_name in os.listdir(output_diff_dir)
    ]
    for leftover_path in leftover_paths:
        if os.path.isfile(leftover_path):
            os.unlink(leftover_path)
os.makedirs(output_diff_dir, exist_ok=True)

def find_two_strongest_horizontal_lines(frame):
    """Estimate the vertical band of a frame bounded by two near-horizontal lines.

    The frame is converted to grayscale, blurred, edge-detected with Canny and
    passed through the standard Hough transform. Of all detected lines, the two
    whose angle is closest to horizontal (theta nearest pi/2) are selected.

    Args:
        frame: BGR image array as accepted by ``cv2.cvtColor``.

    Returns:
        ``(top, bottom)`` row indices: the two line positions padded by 10 px
        and clamped to the frame. Falls back to ``(0, frame.shape[0])`` when
        fewer than two lines are detected or both lines map to the same row.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)  # smooth before edge detection
    edges = cv2.Canny(blurred, 30, 100, apertureSize=3)
    lines = cv2.HoughLines(edges, 1, np.pi / 180, 200)

    frame_height = frame.shape[0]
    if lines is None:
        return 0, frame_height

    # Rank by deviation from horizontal; sorted() is stable, so equally
    # horizontal lines keep the order in which the detector returned them.
    most_horizontal = sorted(
        ((abs(theta - np.pi / 2), rho, theta) for rho, theta in lines[:, 0]),
        key=lambda item: item[0],
    )[:2]
    if len(most_horizontal) < 2:
        return 0, frame_height

    # sin(theta) * rho is the y-coordinate of the point on the line that is
    # closest to the image origin.
    upper, lower = sorted(int(np.sin(theta) * rho) for _, rho, theta in most_horizontal)
    if upper >= lower:
        return 0, frame_height

    return max(0, upper - 10), min(frame_height, lower + 10)

# Local import: the frame rate is parsed as an exact fraction instead of eval()
from fractions import Fraction

# Input video file
input_video_path = "/Users/tunaonat/Desktop/proje-git/piano-transcription/dataset/MIDItest/miditest_videos/5.mp4"

# Probe video dimensions
probe = ffmpeg.probe(input_video_path)
video_stream = next(stream for stream in probe['streams'] if stream['codec_type'] == 'video')
width = int(video_stream['width'])
height = int(video_stream['height'])
# r_frame_rate is a string such as "30000/1001"; parse it without executing it
fps = float(Fraction(video_stream['r_frame_rate']))
frame_size = width * height * 3  # bytes in one raw bgr24 frame

# Decode only the first frame (vframes=1) to determine the cropping boundaries,
# so ffmpeg does not keep decoding the rest of the video into a closed pipe.
process = (
    ffmpeg.input(input_video_path)
    .output('pipe:', format='rawvideo', pix_fmt='bgr24', vframes=1)
    .run_async(pipe_stdout=True)
)
in_bytes = process.stdout.read(frame_size)
process.stdout.close()
process.wait()

if len(in_bytes) < frame_size:
    raise RuntimeError(f"Could not decode a complete first frame from {input_video_path}")

frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])
top, bottom = find_two_strongest_horizontal_lines(frame)

# Read video frames again and calculate differences between consecutive frames
process = (
    ffmpeg.input(input_video_path)
    .output('pipe:', format='rawvideo', pix_fmt='bgr24')
    .run_async(pipe_stdout=True)
)

prev_frame = None
frame_count = 0

try:
    while True:
        in_bytes = process.stdout.read(frame_size)
        if len(in_bytes) < frame_size:
            # End of stream, or a truncated trailing frame that cannot be reshaped
            break

        frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])

        # Crop the frame
        cropped_frame = frame[top:bottom, :]

        # Convert to grayscale for difference calculation
        gray_frame = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2GRAY)

        if prev_frame is not None:
            # Compute absolute difference
            diff = cv2.absdiff(prev_frame, gray_frame)

            # Timestamp of the current frame, derived from its index
            timestamp = frame_count / fps
            timestamp_text = f"{timestamp:.2f}s"

            # Save the difference frame with timestamp in the filename
            diff_path = os.path.join(output_diff_dir, f"frame_diff_{timestamp_text}_{frame_count:04d}.jpg")
            cv2.imwrite(diff_path, diff)

            print(f"Saved {diff_path}")

        # Update previous frame
        prev_frame = gray_frame
        frame_count += 1
finally:
    # Always release the pipe and reap the ffmpeg child, even if processing fails
    process.stdout.close()
    process.wait()

cv2.destroyAllWindows()


In [None]:
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import os
import json
from sklearn.model_selection import train_test_split

# Step 1: Load Preprocessed Frames

def load_frames(output_folder):
    """Load every ``.jpg`` in ``output_folder`` as a grayscale frame, ordered by time.

    The timestamp is parsed from the third underscore-separated field of the
    file name with its last character dropped (e.g. ``frame_diff_0.24s_0007.jpg``
    gives ``0.24``).

    Args:
        output_folder: directory containing the frame images.

    Returns:
        ``(frames, frame_timestamps)`` as NumPy arrays in ascending timestamp order.

    Raises:
        ValueError: if an image cannot be read, or (from ``float``) if a file
            name does not carry a parsable timestamp.
    """
    # Sort numerically on the parsed timestamp: a plain lexicographic sort of
    # the names would place "10.00s" before "2.00s". The name breaks ties.
    timed_files = sorted(
        (float(file_name.split("_")[2][:-1]), file_name)
        for file_name in os.listdir(output_folder)
        if file_name.endswith(".jpg")
    )

    frames = []
    frame_timestamps = []
    for timestamp, file_name in timed_files:
        frame_path = os.path.join(output_folder, file_name)
        frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
        if frame is None:
            # imread signals failure by returning None instead of raising
            raise ValueError(f"Could not read image file: {frame_path}")

        # NOTE: the earlier version split the frame into top/bottom halves and
        # vstacked them again, which reproduces the frame unchanged.
        frames.append(frame)
        frame_timestamps.append(timestamp)

    return np.array(frames), np.array(frame_timestamps)

# Step 2: Resize Frames and Save to Folder

def save_resized_frames(frames, target_size, output_folder):
    """Resize each frame to ``target_size`` and write it to ``output_folder``.

    The folder is created when missing. Files are named ``frame_0000.jpg``,
    ``frame_0001.jpg``, ... following the order of ``frames``.
    """
    os.makedirs(output_folder, exist_ok=True)
    for position, image in enumerate(frames):
        file_name = f"frame_{position:04d}.jpg"
        cv2.imwrite(os.path.join(output_folder, file_name), cv2.resize(image, target_size))

# Step 3: Load JSON Data and Synchronize

def load_labels(json_path):
    """Read label timestamps and state vectors from the JSON export.

    Args:
        json_path: JSON file holding a ``"piano_vectors"`` list whose entries
            have ``"timestamp"`` and ``"vector"`` keys.

    Returns:
        ``(label_timestamps, vectors)`` as NumPy arrays, in file order.
    """
    with open(json_path, 'r') as handle:
        entries = json.load(handle)['piano_vectors']

    stamps = []
    rows = []
    for entry in entries:
        stamps.append(entry['timestamp'])
        rows.append(entry['vector'])
    return np.array(stamps), np.array(rows)


def synchronize_data(frames, frame_timestamps, label_timestamps, vectors):
    """Pair every label with the three frames closest to it in time.

    Args:
        frames: indexable collection of equally shaped frames.
        frame_timestamps: one timestamp per frame.
        label_timestamps: one timestamp per label vector.
        vectors: label vectors, aligned index-by-index with ``label_timestamps``.

    Returns:
        ``(synchronized_frames, synchronized_vectors)`` as NumPy arrays. Each
        synchronized frame stacks the three nearest frames on a new last axis,
        ordered from closest to farthest.

    Raises:
        ValueError: if ``label_timestamps`` and ``vectors`` differ in length.
    """
    if len(label_timestamps) != len(vectors):
        raise ValueError("label_timestamps and vectors must have the same length")

    frame_timestamps = np.asarray(frame_timestamps)
    synchronized_frames = []
    synchronized_vectors = []

    # Pair each timestamp with its own vector. Looking the vector up again by
    # timestamp would hand every label that shares a timestamp (e.g. the notes
    # of a chord) the vector of the first such label.
    for label_timestamp, vector in zip(label_timestamps, vectors):
        closest_indices = np.argsort(np.abs(frame_timestamps - label_timestamp))[:3]
        synchronized_frames.append(
            np.stack([frames[idx] for idx in closest_indices], axis=-1)
        )
        synchronized_vectors.append(vector)

    return np.array(synchronized_frames), np.array(synchronized_vectors)

# Step 4: Prepare Dataset

class PianoDataset(Dataset):
    """Dataset of resized multi-channel frames paired with label vectors.

    Args:
        frames: iterable of images with a trailing channel axis (H x W x C);
            each one is resized with ``cv2.resize``.
        vectors: label vectors, one per frame.
        target_size: size passed to ``cv2.resize`` (default ``(64, 64)``).

    ``self.frames`` holds float32 tensors in channel-first layout scaled by
    1/255; ``self.vectors`` holds the labels as float32.
    """

    def __init__(self, frames, vectors, target_size=(64, 64)):
        # Build one contiguous array first: torch.tensor() on a list of
        # ndarrays converts element by element and is very slow.
        resized_frames = np.asarray(
            [cv2.resize(frame, target_size) for frame in frames], dtype=np.float32
        )
        # (N, H, W, C) -> (N, C, H, W), then scale pixel values by 1/255
        self.frames = torch.from_numpy(resized_frames).permute(0, 3, 1, 2) / 255.0
        self.vectors = torch.tensor(vectors, dtype=torch.float32)

    def __len__(self):
        return len(self.frames)

    def __getitem__(self, idx):
        return self.frames[idx], self.vectors[idx]

# Step 5: Define CNN Model

class PianoCNN(nn.Module):
    """Two-stage convolutional network mapping a frame stack to key activations.

    Args:
        input_channels: number of channels in the input images.
        output_dim: size of the output vector.
        input_size: ``(height, width)`` of the input images, used to size the
            first fully connected layer (default ``(64, 64)``).
    """

    def __init__(self, input_channels, output_dim, input_size=(64, 64)):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, 32, kernel_size=3)
        self.pool1 = nn.MaxPool2d(kernel_size=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.pool2 = nn.MaxPool2d(kernel_size=2)

        # Size fc1 by running a dummy input through the very same feature path
        # that forward() uses, so the two can never disagree.
        with torch.no_grad():
            dummy_output = self._extract_features(
                torch.zeros(1, input_channels, *input_size)
            )
        fc1_input_dim = dummy_output.numel()

        self.fc1 = nn.Linear(fc1_input_dim, 128)
        self.fc2 = nn.Linear(128, output_dim)

    def _extract_features(self, x):
        """Apply conv -> ReLU -> pool twice and return the feature maps."""
        x = self.pool1(torch.relu(self.conv1(x)))
        x = self.pool2(torch.relu(self.conv2(x)))
        return x

    def forward(self, x):
        x = self._extract_features(x)
        x = x.view(x.size(0), -1)  # flatten everything except the batch axis
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Step 6: Train and Evaluate Model

def train_model(model, dataloader, criterion, optimizer, device, epochs=10):
    """Run a plain training loop and print the mean batch loss after each epoch.

    Args:
        model: network to train; it is moved to ``device``.
        dataloader: sized iterable yielding ``(inputs, targets)`` batches.
        criterion: loss called as ``criterion(outputs, targets)``.
        optimizer: optimizer stepped once per batch.
        device: device the model and every batch are moved to.
        epochs: number of passes over ``dataloader`` (default 10).
    """
    model = model.to(device)

    for epoch_number in range(1, epochs + 1):
        model.train()
        running_loss = 0.0

        for batch_inputs, batch_targets in dataloader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs), batch_targets)
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()

        print(f"Epoch {epoch_number}/{epochs}, Loss: {running_loss/len(dataloader)}")

# Example Usage
if __name__ == "__main__":
    # Input locations and the folder that receives the resized copies
    output_folder, json_path, resized_folder = "output_differences", "output.json", "cnn_input"

    # Difference frames written by the preprocessing step, with their timestamps
    frames, frame_timestamps = load_frames(output_folder)
    frame_height, frame_width = frames[0].shape[:2]  # dimensions of the first frame
    save_resized_frames(frames, target_size=(64, 64), output_folder=resized_folder)

    # Pair every label vector with the frames nearest to it in time
    label_timestamps, vectors = load_labels(json_path)
    synchronized_frames, synchronized_vectors = synchronize_data(
        frames, frame_timestamps, label_timestamps, vectors
    )

    # Batches for training
    dataset = PianoDataset(synchronized_frames, synchronized_vectors, target_size=(64, 64))
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Use the GPU when one is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Network, loss and optimizer; the three stacked frames act as input channels
    input_channels = 3
    output_dim = synchronized_vectors.shape[1]
    model = PianoCNN(input_channels, output_dim)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train, then persist the learned weights
    train_model(model, dataloader, criterion, optimizer, device, epochs=10)
    torch.save(model.state_dict(), "piano_key_model.pth")
