In [None]:
import asyncio
%pip install ballsort
from ballsort.ballsort_display_utils import open_bs_window
open_bs_window()

In [None]:
from control_factory import get_control_sim
from ch12_scenario import Ch12Scenario

In [None]:
from state_update_model import StatePosition
from ball_control import BallControl

In [None]:
async def move_ball(bc: BallControl, src: StatePosition, dest: StatePosition, claw_index: int = 0):
    rel_x = src.x - bc.get_position(claw_index=claw_index).x
    rel_y = src.y - bc.get_position(claw_index=claw_index).y
    await asyncio.gather(
        bc.move_horizontally(rel_x, claw_index=claw_index),
        bc.move_vertically(rel_y, claw_index=claw_index),
        bc.open_claw(claw_index=claw_index))
    await bc.close_claw(claw_index=claw_index)
    
    rel_x = dest.x - bc.get_position(claw_index=claw_index).x
    rel_y = dest.y - bc.get_position(claw_index=claw_index).y
    await asyncio.gather(
        bc.move_horizontally(rel_x, claw_index=claw_index),
        bc.move_vertically(rel_y, claw_index=claw_index))
    await bc.open_claw(claw_index=claw_index)

async def move_ball_by_column(bc: BallControl, src_x: int, dest_x: int, claw_index: int = 0):
    src_column_top_occupied_y = min([ball.pos.y for ball in bc.get_state().balls if ball.pos.x == src_x],default=bc.get_state().max_y)
    dest_column_top_vacant_y = min([ball.pos.y for ball in bc.get_state().balls if ball.pos.x == dest_x],default=bc.get_state().max_y + 1) - 1
    await move_ball(bc=bc, src=StatePosition(x=src_x, y=src_column_top_occupied_y), dest=StatePosition(x=dest_x, y=dest_column_top_vacant_y), claw_index=claw_index)

In [None]:
async def reveal_color_values(
    bc: BallControl,
    color_to_x: dict[str, int],
    color_to_event: dict[str, asyncio.Event],
):
    """reveal all color values with claw 1"""

    nof_balls = 6  # to reveal
    right_src_x = bc.get_state().max_x - 1
    reveal_x = bc.get_state().max_x
    scrap_x = bc.get_state().max_x - 2

    for _ in range(nof_balls):
        # move to reveal spot (can be done conditionally if the color is already known)
        await move_ball_by_column(
            bc=bc, src_x=right_src_x, dest_x=reveal_x, claw_index=1
        )

        # add revealed color to dict
        revealed_ball = next(
            ball for ball in bc.get_state().balls if ball.pos.x == reveal_x
        )
        assert revealed_ball
        assert revealed_ball.value
        color_to_x[revealed_ball.color] = revealed_ball.value
        ev = color_to_event.get(revealed_ball.color)
        if ev:
            ev.set()

        # move to scrap heap column
        await move_ball_by_column(bc=bc, src_x=reveal_x, dest_x=scrap_x, claw_index=1)


async def sort_into_buckets(
    bc: BallControl,
    color_to_x: dict[str, int],
    color_to_event: dict[str, asyncio.Event],
):
    """sort into buckets with claw 0"""

    nof_balls = 6  # to sort into buckets
    right_src_x = 0
    max_y = bc.get_state().max_y
    min_y = max_y + 1 - nof_balls
    for y in range(min_y, max_y + 1):
        color = next(
            ball.color
            for ball in bc.get_state().balls
            if ball.pos == StatePosition(x=right_src_x, y=y)
        )
        ev = color_to_event.get(color)
        if ev:
            await ev.wait()
        dest_x = color_to_x[color]
        await move_ball_by_column(bc=bc, src_x=right_src_x, dest_x=dest_x, claw_index=0)



In [None]:
async def challenge12_solution():
    bc = get_control_sim(delay_multiplier=0.3)
    await bc.set_scenario(Ch12Scenario(seed=None))
    
    color_to_x: dict[str, int] = {}
    color_to_event: dict[str, asyncio.Event] = {}

    # create an event for each color
    for color in [ball.color for ball in bc.get_state().balls if ball.pos.x == 0]:
        if color_to_event.get(color) == None:
            color_to_event[color] = asyncio.Event()

    # sort and decode concurrently
    await asyncio.gather(
        reveal_color_values(
            bc=bc, color_to_x=color_to_x, color_to_event=color_to_event
        ),
        sort_into_buckets(bc=bc, color_to_x=color_to_x, color_to_event=color_to_event),
    )

    assert bc.get_state().goal_accomplished

In [None]:
await challenge12_solution()