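This notebook generates a synthetic dataset of candlestick-chart PNG images (one sub-folder per candlestick pattern), saves them to the configured output directory, and then zips each pattern folder into its own archive.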

In [22]:
# Root folder that receives one sub-folder of PNG images per pattern.
output_dir = "data2/unzipped"
# Folder that receives one .zip archive per pattern sub-folder.
output_zip_dir = "data2/zips"
# Number of images generated for each pattern.
total_per_pattern = 2000

In [None]:
import os
import random
from multiprocessing import cpu_count
from multiprocessing.dummy import Pool 
from PIL import ImageDraw, Image
from tqdm import tqdm


def gen_coherent_candle(prev_close, max_dev):
    """Create one random candle that opens close to the previous close.

    Parameters:
        prev_close: Close price of the preceding candle.
        max_dev: Largest absolute gap allowed between ``prev_close`` and the
            new candle's open.

    Returns:
        A pair ``(candle, close)``. ``candle`` is the tuple
        ``(open, close, high, low)``; ``close`` repeats the candle's close so
        the caller can feed it straight into the next call.
    """
    # The draws happen in a fixed order: gap, body size, direction,
    # upper wick, lower wick.
    gap = random.uniform(-max_dev, max_dev)
    opening = prev_close + gap

    body_size = random.uniform(0.5, 2.0)
    direction = random.choice([-1, 1])
    closing = opening + direction * body_size

    # Wicks always extend 0.2-0.7 beyond the body on each side.
    upper_wick = random.uniform(0.2, 0.7)
    lower_wick = random.uniform(0.2, 0.7)
    highest = max(opening, closing) + upper_wick
    lowest = min(opening, closing) - lower_wick

    return (opening, closing, highest, lowest), closing

def gen_pattern_values(pattern_name, base):
    """Build the candles that make up one named candlestick pattern.

    Parameters:
        pattern_name: Pattern identifier. Handled values are
            ``bullish_spinning_top``, ``bearish_spinning_top``,
            ``three_white_soldiers``, ``three_black_crows``, ``rising_three``,
            ``falling_three``, ``piercing_line``, ``dark_cloud_cover`` and
            ``random_pattern``.
        base: Reference price. For the fixed patterns it is the open of the
            first candle; for ``random_pattern`` it is passed to
            ``gen_coherent_candle`` as the previous close.

    Returns:
        A list of ``(open, close, high, low)`` tuples. The list is empty when
        ``pattern_name`` is not one of the handled values.
    """
    # Drawn for every pattern, including those that never use it, so the
    # random stream is consumed the same way regardless of the pattern.
    step = random.uniform(1.0, 2.0)

    def bullish_spinning_top():
        # Small rising body with long wicks on both sides.
        open_ = base
        close = open_ + random.uniform(0.2, 0.6)
        high = close + random.uniform(0.6, 1.0)
        low = open_ - random.uniform(0.6, 1.0)
        return [(open_, close, high, low)]

    def bearish_spinning_top():
        # Small falling body with long wicks on both sides.
        open_ = base
        close = open_ - random.uniform(0.2, 0.6)
        high = open_ + random.uniform(0.6, 1.0)
        low = close - random.uniform(0.6, 1.0)
        return [(open_, close, high, low)]

    def three_white_soldiers():
        # Three rising candles, each opening one step above the previous open.
        soldiers = []
        for idx in range(3):
            open_ = base + idx * step
            close = open_ + random.uniform(0.8 * step, 1.2 * step)
            soldiers.append((open_, close, close + 0.5, open_ - 0.5))
        return soldiers

    def three_black_crows():
        # Three falling candles, each opening one step below the previous open.
        crows = []
        for idx in range(3):
            open_ = base - idx * step
            close = open_ - random.uniform(0.8 * step, 1.2 * step)
            crows.append((open_, close, open_ + 0.5, close - 0.5))
        return crows

    def rising_three():
        # Long rising candle, three small falling candles, long rising candle.
        first_close = base + step
        first = (base, first_close, first_close + 0.5, base - 0.5)
        pullbacks = []
        price = first_close
        for drop in (0.4, 0.3, 0.3):
            pullbacks.append((price, price - drop, price + 0.3, price - 0.7))
            price = price - drop
        final_close = first_close + step
        final = (price, final_close, final_close + 0.5, price - 0.5)
        return [first] + pullbacks + [final]

    def falling_three():
        # Long falling candle, three small rising candles, long falling candle.
        first_close = base - step
        first = (base, first_close, base + 0.5, first_close - 0.5)
        rebounds = []
        price = first_close
        for rise, wick in ((0.4, 0.7), (0.3, 0.6), (0.3, 0.6)):
            rebounds.append((price, price + rise, price + wick, price - 0.3))
            price = price + rise
        final_close = first_close - step
        final = (price, final_close, price + 0.5, final_close - 0.5)
        return [first] + rebounds + [final]

    def piercing_line():
        # Falling candle, then a rising candle that opens below the first
        # close and closes at the middle of the first body.
        black_close = base - step
        middle = base - step / 2
        black = (base, black_close, base + 0.5, black_close - 0.5)
        white = (black_close - 0.2, middle, middle + 0.5, black_close - 0.7)
        return [black, white]

    def dark_cloud_cover():
        # Rising candle, then a falling candle that opens above the first
        # close and closes at the middle of the first body.
        white_close = base + step
        middle = base + step / 2
        white = (base, white_close, white_close + 0.5, base - 0.5)
        black = (white_close + 0.2, middle, white_close + 0.8, middle - 0.6)
        return [white, black]

    def random_pattern():
        # One to five chained random candles with a wide opening gap.
        generated = []
        price = base
        for _ in range(random.randint(1, 5)):
            candle, price = gen_coherent_candle(price, max_dev=5)
            generated.append(candle)
        return generated

    builders = (
        ("bullish_spinning_top", bullish_spinning_top),
        ("bearish_spinning_top", bearish_spinning_top),
        ("three_white_soldiers", three_white_soldiers),
        ("three_black_crows", three_black_crows),
        ("rising_three", rising_three),
        ("falling_three", falling_three),
        ("piercing_line", piercing_line),
        ("dark_cloud_cover", dark_cloud_cover),
        ("random_pattern", random_pattern),
    )
    for known_name, build in builders:
        if pattern_name == known_name:
            return build()

    # Unrecognised names yield no candles.
    return []

def build_sequence_with_pattern(pattern_name, insert_at):
    """Build a candle sequence that embeds one pattern plus a follow-up candle.

    Layout of the returned sequence:
        1. ``insert_at`` random lead-in candles,
        2. the candles of ``pattern_name``,
        3. one follow-up candle,
        4. random candles until the sequence holds at least 10 entries.

    For bullish and bearish patterns the follow-up candle is forced in the
    pattern's direction 80% of the time and is random otherwise. Patterns
    that are neither bullish nor bearish (e.g. ``random_pattern``) always get
    a random follow-up candle, so they are not biased towards a direction.

    Parameters:
        pattern_name: Name of the pattern, as understood by
            ``gen_pattern_values``.
        insert_at: Number of random candles generated before the pattern.

    Returns:
        A list of ``(open, close, high, low)`` tuples whose length is
        ``max(10, insert_at + pattern_length + 1)``.
    """
    bullish_names = ("three_white_soldiers", "rising_three", "piercing_line")
    bearish_names = ("three_black_crows", "falling_three", "dark_cloud_cover")

    max_dev = random.uniform(0.5, 2.0)
    current_close = random.uniform(90, 110)
    seq = []

    # Random lead-in candles preceding the pattern.
    for _ in range(insert_at):
        candle, current_close = gen_coherent_candle(current_close, max_dev)
        seq.append(candle)

    # The pattern starts from the last close with a little noise.
    base_noise = current_close + random.uniform(-0.1, 0.1)
    for o, c, h, l in gen_pattern_values(pattern_name, base_noise):
        seq.append((o, c, h, l))
        current_close = c

    # +1 = bullish, -1 = bearish, 0 = no direction to enforce.
    if pattern_name.startswith("bullish") or pattern_name in bullish_names:
        direction = 1
    elif pattern_name.startswith("bearish") or pattern_name in bearish_names:
        direction = -1
    else:
        direction = 0

    # Impose the direction 80% of the time; otherwise (and always for
    # neutral patterns) let the follow-up candle be random.
    if direction != 0 and random.random() < 0.8:
        body = random.uniform(0.5, 2.0)
        open_p = current_close + random.uniform(-max_dev, max_dev)
        close_p = open_p + direction * body
        high = max(open_p, close_p) + random.uniform(0.2, 0.7)
        low = min(open_p, close_p) - random.uniform(0.2, 0.7)
        candle = (open_p, close_p, high, low)
    else:
        candle, _ = gen_coherent_candle(current_close, max_dev)

    # Record the follow-up candle and continue from its close.
    seq.append(candle)
    current_close = candle[1]

    # Pad with random candles up to the minimum length of 10.
    while len(seq) < 10:
        candle, current_close = gen_coherent_candle(current_close, max_dev)
        seq.append(candle)

    return seq

def save_sequence_image(candles, filepath, size=64):
    """Render a candle sequence as a square grayscale chart and save it.

    Parameters:
        candles: Sequence of ``(open, close, high, low)`` tuples.
        filepath: Destination path handed to ``Image.save``.
        size: Width and height of the image in pixels.
    """
    # Grayscale canvas with a white background.
    canvas = Image.new("L", (size, size), color=255)
    pen = ImageDraw.Draw(canvas)

    # Candles are spread evenly; each body takes half of its horizontal slot.
    slot = size / (len(candles) + 1)
    body_width = slot * 0.5

    # Vertical scale derived from the extreme low/high of the sequence.
    price_floor = min(low for (_, _, _, low) in candles)
    price_ceiling = max(high for (_, _, high, _) in candles)
    price_span = (price_ceiling - price_floor) or 1e-6  # avoid division by zero

    def to_y(price):
        # Higher prices map to smaller y; a small margin is kept at both edges.
        scaled = int((price - price_floor) / price_span * (size - 4))
        return size - scaled - 2

    for position, (open_, close, high, low) in enumerate(candles, start=1):
        center = int(position * slot)
        left = int(center - body_width / 2)
        right = int(center + body_width / 2)

        # Wick: black vertical line from high to low.
        pen.line((center, to_y(high), center, to_y(low)), fill=0, width=1)

        # Body: white when the candle rises, black otherwise, black outline.
        upper, lower = sorted((to_y(open_), to_y(close)))
        shade = 255 if close > open_ else 0
        pen.rectangle((left, upper, right, lower), fill=shade, outline=0)

    canvas.save(filepath)

def generate_single_image(args):
    """Generate and save one chart image for a single work item.

    Parameters:
        args: Tuple ``(pattern, index, output_root)``. The image is written to
            ``<output_root>/<pattern>/<pattern>_<index, 5 digits>.png``.

    Sequences are regenerated until they hold 10 or 11 candles. A mismatch
    can happen because the pattern length is measured on a separate probe
    call, and ``random_pattern`` has a random length.
    """
    pattern, index, output_root = args
    target_path = os.path.join(output_root, pattern, f"{pattern}_{index:05d}.png")

    while True:
        # Probe the pattern once to learn how many candles it occupies.
        pattern_len = len(gen_pattern_values(pattern, 100))
        insert_at = random.randint(0, 10 - pattern_len)
        seq = build_sequence_with_pattern(pattern, insert_at)
        if 10 <= len(seq) <= 11:
            break

    save_sequence_image(seq, target_path)

def generate_dataset(output_root="data", total_per_pattern=1000):
    """Generate ``total_per_pattern`` chart images for every known pattern.

    Parameters:
        output_root: Folder that receives one sub-folder per pattern; created
            if missing.
        total_per_pattern: Number of images generated for each pattern.

    Images of one pattern are produced concurrently by a thread pool
    (``multiprocessing.dummy.Pool``) with one worker per CPU, and a progress
    bar is shown per pattern.
    """
    pattern_names = (
        "bullish_spinning_top",
        "bearish_spinning_top",
        "three_white_soldiers",
        "three_black_crows",
        "rising_three",
        "falling_three",
        "piercing_line",
        "dark_cloud_cover",
        "random_pattern",
    )

    os.makedirs(output_root, exist_ok=True)
    worker_count = cpu_count()

    for name in pattern_names:
        os.makedirs(os.path.join(output_root, name), exist_ok=True)

        # One work item per image: (pattern, image index, output root).
        jobs = [(name, index, output_root) for index in range(total_per_pattern)]

        with Pool(worker_count) as workers:
            results = workers.imap_unordered(generate_single_image, jobs)
            # Drain the iterator so every job runs and the bar advances.
            for _ in tqdm(results, total=total_per_pattern, desc=f"Gen {name}"):
                pass


In [24]:
# Generate the image dataset into `output_dir`, `total_per_pattern` images per pattern.
generate_dataset(output_root=output_dir, total_per_pattern=total_per_pattern)

Gen bullish_spinning_top:   8%|▊         | 162/2000 [00:00<00:01, 1600.81it/s]

Gen bullish_spinning_top: 100%|██████████| 2000/2000 [00:01<00:00, 1329.47it/s]
Gen bearish_spinning_top: 100%|██████████| 2000/2000 [00:01<00:00, 1353.95it/s]
Gen three_white_soldiers: 100%|██████████| 2000/2000 [00:01<00:00, 1333.82it/s]
Gen three_black_crows: 100%|██████████| 2000/2000 [00:01<00:00, 1458.34it/s]
Gen rising_three: 100%|██████████| 2000/2000 [00:01<00:00, 1482.09it/s]
Gen falling_three: 100%|██████████| 2000/2000 [00:01<00:00, 1442.17it/s]
Gen piercing_line: 100%|██████████| 2000/2000 [00:01<00:00, 1500.22it/s]
Gen dark_cloud_cover: 100%|██████████| 2000/2000 [00:01<00:00, 1528.71it/s]
Gen random_pattern: 100%|██████████| 2000/2000 [00:01<00:00, 1349.93it/s]


In [27]:
import os
import shutil
import zipfile

def zip_each_pattern(source="data/unzipped", destination="data/zips"):
    """
    Zips each subfolder (pattern) inside the source directory.

    One archive named ``<pattern>.zip`` is written to ``destination`` for
    every subfolder of ``source``; member names inside an archive are relative
    to the pattern folder. Plain files directly inside ``source`` are ignored.
    If ``destination`` lies inside ``source`` it is never archived itself.

    Parameters:
        source (str): The parent folder containing one subfolder per pattern
        destination (str): Folder that receives the zip files; created if missing
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(destination, exist_ok=True)
    destination_real = os.path.realpath(destination)

    # Sorted so the processing order (and the log) is reproducible.
    for pattern_name in sorted(os.listdir(source)):
        pattern_path = os.path.join(source, pattern_name)
        if not os.path.isdir(pattern_path):
            continue  # Skip files
        if os.path.realpath(pattern_path) == destination_real:
            continue  # The output folder is not a pattern

        zip_filename = f"{pattern_name}.zip"
        zip_path = os.path.join(destination, zip_filename)

        # Zip the current folder
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, dirs, files in os.walk(pattern_path):
                # Prune the output folder (it holds the archive being written)
                # and fix the traversal order for reproducible archives.
                dirs[:] = sorted(
                    d for d in dirs
                    if os.path.realpath(os.path.join(root, d)) != destination_real
                )
                for file in sorted(files):
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, start=pattern_path)
                    zipf.write(file_path, arcname)

        print(f"üóúÔ∏è Zipped {pattern_name} ‚Üí {zip_path}")


In [28]:
# Archive every pattern folder under `output_dir` into its own zip file in `output_zip_dir`.
zip_each_pattern(source=output_dir, destination=output_zip_dir)

üóúÔ∏è Zipped falling_three ‚Üí data2/zips/falling_three.zip
üóúÔ∏è Zipped bullish_spinning_top ‚Üí data2/zips/bullish_spinning_top.zip
üóúÔ∏è Zipped rising_three ‚Üí data2/zips/rising_three.zip
üóúÔ∏è Zipped piercing_line ‚Üí data2/zips/piercing_line.zip
üóúÔ∏è Zipped three_white_soldiers ‚Üí data2/zips/three_white_soldiers.zip
üóúÔ∏è Zipped dark_cloud_cover ‚Üí data2/zips/dark_cloud_cover.zip
üóúÔ∏è Zipped random_pattern ‚Üí data2/zips/random_pattern.zip
üóúÔ∏è Zipped bearish_spinning_top ‚Üí data2/zips/bearish_spinning_top.zip
üóúÔ∏è Zipped three_black_crows ‚Üí data2/zips/three_black_crows.zip
