<a href="https://colab.research.google.com/github/Sahilvaghasiyaa/-Market-Trend-classifier-and-analyzer/blob/main/cvpreprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import cv2
import numpy as np
import os
from tqdm import tqdm
from google.colab import drive
import zipfile
import shutil

def crop_and_detect_edges(image_path, crop_margin=3, output_size=(512, 512)):
    """
    Process a single image: crop it to the chart area, resize, and detect edges.

    The chart area is taken to be the bounding box of the largest external
    contour of the non-near-white pixels (grayscale value <= 240). When no
    usable chart area can be determined, the whole image is used instead.

    Args:
        image_path: Path to the input image
        crop_margin: Number of pixels trimmed from every side of the detected
            bounding box to drop the chart's own border
        output_size: Size passed to cv2.resize as (width, height)

    Returns:
        edges: Canny edge map of the resized region, or None if the image
            could not be loaded
    """
    # Load image
    img = cv2.imread(image_path)

    # cv2.imread signals failure by returning None instead of raising
    if img is None:
        print(f"Failed to load image: {image_path}")
        return None

    # Near-white pixels are treated as background; everything else becomes
    # foreground in the inverted binary mask
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 240, 255, cv2.THRESH_BINARY_INV)

    # Find contours (detect chart boundary)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Default to the full image; narrowed below when a chart box is found
    region = img

    if not contours:
        print(f"No contours found in image: {image_path}")
    else:
        # Bounding box of the largest contour (chart area)
        x, y, w, h = cv2.boundingRect(max(contours, key=cv2.contourArea))

        # Shrink the box by the margin on every side
        x = max(0, x + crop_margin)
        y = max(0, y + crop_margin)
        w = max(0, w - 2 * crop_margin)
        h = max(0, h - 2 * crop_margin)

        cropped_chart = img[y:y + h, x:x + w]

        # A box no larger than twice the margin yields an empty crop, which
        # cv2.resize rejects; keep the full image in that case
        if cropped_chart.size > 0:
            region = cropped_chart
        else:
            print(f"Chart area too small to crop in image: {image_path}")

    # Same pipeline for both the cropped chart and the full-image fallback:
    # fixed-size resize -> grayscale -> blur (noise reduction) -> Canny
    resized = cv2.resize(region, output_size, interpolation=cv2.INTER_AREA)
    resized_gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
    blurred_gray = cv2.GaussianBlur(resized_gray, (5, 5), 0)
    edges = cv2.Canny(blurred_gray, threshold1=20, threshold2=100)

    return edges

def process_folder(input_folder, output_folder):
    """
    Process all images in a folder and save to output folder.

    Every .png/.jpg/.jpeg file directly inside input_folder is run through
    crop_and_detect_edges and written to output_folder under the same file
    name. Images that fail to load are skipped; failed writes are reported.

    Args:
        input_folder: Folder containing input images
        output_folder: Folder to save processed images (created if missing)
    """
    # isdir (not exists) so that a regular file is reported instead of
    # crashing in os.listdir
    if not os.path.isdir(input_folder):
        print(f"Input folder not found: {input_folder}")
        return

    # Get all image files; sorted so runs are reproducible regardless of the
    # order in which the filesystem lists entries
    image_files = sorted(
        name for name in os.listdir(input_folder)
        if name.lower().endswith(('.png', '.jpg', '.jpeg'))
    )

    if not image_files:
        print(f"No image files found in {input_folder}")
        return

    print(f"Found {len(image_files)} images to process")

    # cv2.imwrite does not create missing directories
    os.makedirs(output_folder, exist_ok=True)

    # Process each image
    for image_file in tqdm(image_files):
        input_path = os.path.join(input_folder, image_file)

        # Keep the same file name for output (maintain indexes)
        output_path = os.path.join(output_folder, image_file)

        edges = crop_and_detect_edges(input_path)
        if edges is None:
            # Load failure was already reported by crop_and_detect_edges
            continue

        # cv2.imwrite reports failure through its return value
        if not cv2.imwrite(output_path, edges):
            print(f"Failed to write image: {output_path}")

def extract_zip_file(zip_path, extract_to):
    """
    Unpack every member of a zip archive into a target directory.

    Args:
        zip_path: Location of the archive to read
        extract_to: Destination directory for the archive's contents
    """
    print(f"Extracting {zip_path} to {extract_to}")

    # The context manager closes the archive even if extraction fails
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(path=extract_to)

    print("Extraction completed")

def _print_extracted_contents(extract_dir, found_folders):
    """Print the top-level entries of extract_dir (and a sample of each subfolder) for debugging."""
    print(f"Contents of {extract_dir}:")
    for item in found_folders:
        print(f"  - {item}")

        # If it's a directory, show up to five of its entries
        item_path = os.path.join(extract_dir, item)
        if os.path.isdir(item_path):
            try:
                subcontents = os.listdir(item_path)
                print(f"    Contents: {subcontents[:5]}{'...' if len(subcontents) > 5 else ''}")
            except OSError as e:
                print(f"    Error reading directory: {e}")


def _find_class_folder(extract_dir, found_folders, folder_name, label):
    """Return the path of folder_name directly in extract_dir or one level below it, else None.

    label ("input" or "output") only affects the progress message.
    """
    # Direct structure: <extract_dir>/<folder_name>
    if folder_name in found_folders:
        direct = os.path.join(extract_dir, folder_name)
        print(f"Found {label} directory: {direct}")
        return direct

    # Nested structure: <extract_dir>/<any folder>/<folder_name>
    for folder in found_folders:
        folder_path = os.path.join(extract_dir, folder)
        if os.path.isdir(folder_path):
            nested = os.path.join(folder_path, folder_name)
            if os.path.exists(nested):
                print(f"Found nested {label} directory: {nested}")
                return nested

    return None


def _process_class_archive(zip_file, base_dir, temp_dir, output_base_dir):
    """Extract one class archive from base_dir and preprocess its input and output image folders."""
    class_name = os.path.splitext(zip_file)[0]  # Remove .zip extension
    zip_path = os.path.join(base_dir, zip_file)
    extract_dir = os.path.join(temp_dir, class_name)

    # Skip if already extracted
    if not os.path.exists(extract_dir):
        extract_zip_file(zip_path, extract_dir)
    else:
        print(f"{extract_dir} already exists, skipping extraction")

    # Archives are named "<prefix>class.zip" (buy, sell, sideways); the
    # folders inside are named "<prefix>input" and "<prefix>output"
    if class_name.endswith("class"):
        prefix = class_name[:-5]  # Remove "class" suffix
    else:
        prefix = class_name

    found_folders = os.listdir(extract_dir)
    _print_extracted_contents(extract_dir, found_folders)

    input_folder_name = f"{prefix}input"
    output_folder_name = f"{prefix}output"

    input_dir = _find_class_folder(extract_dir, found_folders, input_folder_name, "input")
    output_dir = _find_class_folder(extract_dir, found_folders, output_folder_name, "output")

    # Create unique output directories for each class
    processed_input_dir = os.path.join(output_base_dir, f"{class_name}_{input_folder_name}")
    processed_output_dir = os.path.join(output_base_dir, f"{class_name}_{output_folder_name}")

    os.makedirs(processed_input_dir, exist_ok=True)
    os.makedirs(processed_output_dir, exist_ok=True)

    # Process the input and output folders if found
    if input_dir:
        print(f"Processing {input_folder_name}...")
        process_folder(input_dir, processed_input_dir)
    else:
        print(f"Could not find {input_folder_name} folder in the extracted zip")

    if output_dir:
        print(f"Processing {output_folder_name}...")
        process_folder(output_dir, processed_output_dir)
    else:
        print(f"Could not find {output_folder_name} folder in the extracted zip")


def main(base_dir="/content/drive/MyDrive/FInancialMArkets data",
         temp_dir="/content/temp_data"):
    """
    Preprocess every class archive found in base_dir.

    Mounts Google Drive, extracts each .zip in base_dir into temp_dir, runs
    process_folder on the archive's input and output image folders, and writes
    the results under <base_dir>/datapreprocessed. temp_dir is removed when
    processing ends, whether it succeeded or raised.

    Args:
        base_dir: Folder holding the class .zip archives
        temp_dir: Scratch folder used for extraction; deleted afterwards
    """
    # Mount Google Drive
    drive.mount('/content/drive')

    if not os.path.exists(base_dir):
        print(f"Base directory not found: {base_dir}")
        return

    # Check for zip files before creating any directories, so an empty
    # base_dir leaves nothing behind
    zip_files = [f for f in os.listdir(base_dir) if f.lower().endswith('.zip')]

    if not zip_files:
        print(f"No zip files found in {base_dir}")
        return

    print(f"Found zip files: {zip_files}")

    # Create output directory
    output_base_dir = os.path.join(base_dir, "datapreprocessed")
    os.makedirs(output_base_dir, exist_ok=True)

    # Create temp directory for extraction
    os.makedirs(temp_dir, exist_ok=True)

    try:
        for zip_file in zip_files:
            _process_class_archive(zip_file, base_dir, temp_dir, output_base_dir)
    finally:
        # Clean up even on failure so a partially extracted archive is never
        # mistaken for a complete one on the next run; ignore_errors keeps a
        # cleanup problem from masking the original exception
        print("Cleaning up temporary files...")
        shutil.rmtree(temp_dir, ignore_errors=True)

    print("Image preprocessing completed!")

if __name__ == "__main__":
    main()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Found zip files: ['buyclass.zip', 'sellclass.zip', 'sidewaysclass.zip']
Extracting /content/drive/MyDrive/FInancialMArkets data/buyclass.zip to /content/temp_data/buyclass
Extraction completed
Contents of /content/temp_data/buyclass:
  - buyoutput
    Contents: ['outputimg4930.png', 'outputimg334.png', 'outputimg2348.png', 'outputimg4500.png', 'outputimg4957.png']...
  - buyinput
    Contents: ['inputimg1713.png', 'inputimg2118.png', 'inputimg2623.png', 'inputimg603.png', 'inputimg3525.png']...
Found input directory: /content/temp_data/buyclass/buyinput
Found output directory: /content/temp_data/buyclass/buyoutput
Processing buyinput...
Found 5000 images to process


100%|██████████| 5000/5000 [01:23<00:00, 60.24it/s]


Processing buyoutput...
Found 5000 images to process


100%|██████████| 5000/5000 [01:22<00:00, 60.47it/s]


Extracting /content/drive/MyDrive/FInancialMArkets data/sellclass.zip to /content/temp_data/sellclass
Extraction completed
Contents of /content/temp_data/sellclass:
  - selloutput
    Contents: ['outputimg4930.png', 'outputimg334.png', 'outputimg2348.png', 'outputimg4500.png', 'outputimg4957.png']...
  - sellinput
    Contents: ['inputimg1713.png', 'inputimg2118.png', 'inputimg2623.png', 'inputimg603.png', 'inputimg3525.png']...
Found input directory: /content/temp_data/sellclass/sellinput
Found output directory: /content/temp_data/sellclass/selloutput
Processing sellinput...
Found 5000 images to process


100%|██████████| 5000/5000 [01:21<00:00, 61.00it/s]


Processing selloutput...
Found 5000 images to process


100%|██████████| 5000/5000 [01:25<00:00, 58.75it/s]


Extracting /content/drive/MyDrive/FInancialMArkets data/sidewaysclass.zip to /content/temp_data/sidewaysclass
Extraction completed
Contents of /content/temp_data/sidewaysclass:
  - sidewaysinput
    Contents: ['inputimg1713.png', 'inputimg2118.png', 'inputimg2623.png', 'inputimg603.png', 'inputimg3525.png']...
  - sidewaysoutput
    Contents: ['outputimg4930.png', 'outputimg334.png', 'outputimg2348.png', 'outputimg4500.png', 'outputimg4957.png']...
Found input directory: /content/temp_data/sidewaysclass/sidewaysinput
Found output directory: /content/temp_data/sidewaysclass/sidewaysoutput
Processing sidewaysinput...
Found 5000 images to process


100%|██████████| 5000/5000 [01:22<00:00, 60.40it/s]


Processing sidewaysoutput...
Found 5000 images to process


100%|██████████| 5000/5000 [01:22<00:00, 60.42it/s]


Cleaning up temporary files...
Image preprocessing completed!


In [None]:
import shutil
from google.colab import files

# Folder to archive, given relative to the current working directory.
# Change this to the folder you want to save.
folder_to_save = "sidewaysclass"
zip_filename = folder_to_save + ".zip"

# Pack the folder's contents into "<folder_to_save>.zip"
shutil.make_archive(base_name=folder_to_save, format="zip", root_dir=folder_to_save)

# Send the archive to the browser as a download
files.download(zip_filename)