# developing external testting tricks?

## the OpenAI/BigCode variant only works on UNix

## I know that tempdir and subprocess works and does timeout... but can you actually recover from that?
should I allow `wgpu-shadertoy` to take a code arg in cli? instead of just
this will only work for singlepass shaders just now.

wgpu-22 gott shader compilation info (not in wgpu-native yet)

In [1]:
import os
import tempfile
import subprocess


file_template = """
from wgpu_shadertoy import Shadertoy

shader_code = '''{}'''

shader = Shadertoy(shader_code, shader_type="glsl", offscreen=True)

if __name__ == "__main__":
    shader.show()
    shader.snapshot(0.0)
"""

def run_shader_in_subprocess(shader_code, timeout=5):
    status = "ok" # default case
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as f:
        f.write(file_template.format(shader_code))
        f.flush()
        try:
            p = subprocess.run(["python", f.name], capture_output=True, timeout=timeout)
            
        except subprocess.SubprocessError as e:
            if isinstance(e, subprocess.TimeoutExpired):
                status = "timeout"
            else:
                status = "error"
            # return status # early exit here never removes the temp file -.-
    
    # cleanup temp file, delete_on_close was only added in Python 3.12?
    os.remove(f.name)
        
    if status == "ok":
        if p.returncode != 0:
            status = "error"
    
    return status


In [2]:
new_code = """
void mainImage( out vec4 fragColor, in vec2 fragCoord )
{
    // Normalized pixel coordinates (from 0 to 1)
    vec2 uv = fragCoord/iResolution.xy;

    // Time varying pixel color
    vec3 col = 0.5 + 0.5*cos(iTime+uv.xyx+vec3(0,2,4));

    // Output to screen
    fragColor = vec4(col,1.0);
}
"""


error_code = """
void mainImage( out vec4 fragColor, in vec2 fragCoord )
{
    // Normalized pixel coordinates (from 0 to 1)
    vec2 uv = fragCoord/iResolution.xy;

    // Time varying pixel color
    vec3 col = 0.5 + 0.5*cos(iTime+uv.xyx+vec3(0,2,4));

    // Output to screen
    fragColor = vec4(coll,1.0);
}
"""

# this panics because it loses device, works in 22.1!
minimal_code = """
void mainImage( out vec4 fragColor, in vec2 fragCoord ) {

    vec3 col = vec3(0.0);
    float incr = 0.1;
    for (float i = 0.5; i < 3.0; i += max(0.0, iTime)) {
        col += vec3(0.2);
        // continue;
    }
    fragColor = vec4(col, 1.0);
}
"""

In [3]:
seconds = 10
print(run_shader_in_subprocess(new_code, seconds))
print(run_shader_in_subprocess(error_code, seconds))
print(run_shader_in_subprocess(minimal_code, seconds))

ok
error
ok


In [4]:
from annotate import run_shader, validate_shader

print(run_shader(new_code)) # erroniously timesout because it's so slow...
print(run_shader(error_code))
print(run_shader(minimal_code))

print(validate_shader(new_code))
print(validate_shader(error_code))
print(validate_shader(minimal_code))

ok
error
ok
valid
error
valid


In [18]:
# initilize the default device once
from wgpu.utils.device import get_default_device
get_default_device()


from wgpu_shadertoy import Shadertoy
def minimal_run(shader_code):
    try:
        shader = Shadertoy(shader_code, shader_type="glsl", offscreen=True)
        # shader.show()
        shader.snapshot(0.0)
        return "ok"
    except Exception as e:
        return "error"
    # return "untested"
# trying to use concurrent.futures this time...

shader_codes = [new_code, error_code, minimal_code]
TIMEOUT = 10

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    for shader_code in shader_codes:
        try:
            future = executor.submit(minimal_run, shader_code)
            print(future.result(timeout=TIMEOUT))
        except concurrent.futures.TimeoutError:
            print("timeout")
        except Exception as e:
            print("ERROR", e)

ok
error


: 

In [4]:
import multiprocessing
from wgpu_shadertoy import Shadertoy

def minimal_run(shader_code, return_dict):
    try:
        shader = Shadertoy(shader_code, shader_type="glsl", offscreen=True)
        # shader.show()
        shader.snapshot(0.0)
        return_dict['result'] = "ok"
    except Exception as e:
        return_dict['result'] = f"error: {e}"

def run_shader_code(shader_code, timeout):
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    process = multiprocessing.Process(target=minimal_run, args=(shader_code, return_dict))
    process.start()
    process.join(timeout)

    if process.is_alive():
        process.terminate()
        process.join()
        return "timeout"
    else:
        return return_dict.get('result', 'error: unknown')

shader_codes = [new_code, error_code, minimal_code]
TIMEOUT = 10

results = []
for shader_code in shader_codes:
    result = run_shader_code(shader_code, TIMEOUT)
    results.append(result)
    print(result)

error: unknown
error: unknown
error: unknown
