# 001 CLI Music randomiser 

Threading and curses practice with this note randomiser script.

This will be run on the CLI because of curses.

In [17]:
from IPython.core.interactiveshell import InteractiveShell

# Display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

# 001.000 ASSETS

In [18]:
import random
import time

# Twelve note names; a compound name such as "c+d" appears to denote the
# semitone lying between the two named notes (TODO confirm the notation).
notes = ["c", "c+d", "d", "d+e", "e", "f", "f+g", "g", "g+a", "a", "a+b", "b"]

# Names of the instrument's strings.
strings = ["E", "A", "D", "G"]

# Interval labels in ascending order. In-between degrees are written as the
# upper degree with a "-" suffix ("II-", "III-", "V-", ...). The entry between
# "VI" and "VII" used to be "VII+", which broke that convention; it is "VII-".
intervals = [
    "I",
    "II-",
    "II",
    "III-",
    "III",
    "IV",
    "V-",
    "V",
    "VI-",
    "VI",
    "VII-",
    "VII",
    "VIII",
    "IX-",
    "IX",
    "X-",
    "X",
]

# Direction in which the interval is to be played.
directions = ["top down", "bottom up"]

### 001.001 Use curses to ask for user input

1. We want to create a function that returns a namedtuple `Params`, but we want to collect all user input into a dict and then turn it into a `Params`, and we want type annotations to work.
    1. `create_params` turns the dict into the namedtuple. Make sure it is annotated
1. `get_params` is passed a curses window and asks the user for input
    1. Clears the window first
    1. Hides the cursor
    1. Make sure the user can see what they type
    1. `input` doesn't work with curses; you need to print the prompt...
    1. ...then capture the part of the window with what they typed
    1. For this particular one wrap it in a try/except to ensure it is an int between 0 and 10
    1. In case of error, turn off curses and exit
    1. Do it again (without the error checking)
    1. Now you can clear the screen again..
    1. ...and stop showing what the user typed
    1. Use the function you created in 1. to return a NamedTuple
1. Call the function within `main`
    1. a function `main` calls `get_params`, then stops curses and prints what it got
    1. the main function is wrapped by curses, which automagically passes it a window object


In [19]:
# solution

# import curses
# from typing import NamedTuple


# class Params(NamedTuple):
#     duration: int
#     interval: float


# def create_params(params_dict: dict) -> Params:
#     return Params(**params_dict)


# def get_params(stdscr: curses.window) -> Params:
#     """Prompt for the timer duration and the BPM, and return them as Params."""
#     params_dict: dict[str, int | float] = {}
#     stdscr.clear()
#     curses.echo()

#     msg = "Enter timer duration in seconds: "
#     stdscr.addstr(msg)
#     duration_input = stdscr.getstr(0, len(msg) + 1)
#     try:
#         duration = int(duration_input)
#         if not 0 <= duration <= 10:
#             raise ValueError()
#         params_dict["duration"] = duration
#     except ValueError:
#         # Leave curses mode first so the error message is readable.
#         curses.endwin()
#         print(
#             "Invalid input for duration. Please enter a valid integer between 0 and 10."
#         )
#         exit()

#     msg = "BPM: "
#     stdscr.addstr(msg)
#     # Beats per minute -> seconds per beat.
#     params_dict["interval"] = 60 / int(stdscr.getstr(1, len(msg) + 1))

#     stdscr.clear()
#     curses.echo(False)

#     return create_params(params_dict)


# def main(stdscr):
#     params = get_params(stdscr)
#     curses.endwin()
#     print(params)


# if __name__ == "__main__":
#     curses.wrapper(main)
#     print("DONE")



In [30]:
%%bash
# Static type check of the solution module.
mypy ./_001_get_params.py

# The module uses curses, so only print the commands for running it by hand
# from the parent directory.
module="_001_get_params"
printf 'cd %s/..\n' "$(realpath .)"
printf '%s\n' "if [ -d solutions ]; then python -m solutions.${module}; else python -m python_misc.${module}; fi"


[1m[32mSuccess: no issues found in 1 source file[m
cd /Users/fritz/work/jupyter/katas/python_misc/solutions/..
if [ -d solutions ]; then python -m solutions._001_get_params; else python -m python_misc._001_get_params; fi


### 001.002 Use threads to time out function

You will reuse the previous `get_params`, but expand on the main, which will go into a new file

1. In `_001_002_main.py` you will import `get_params` from `._001_get_params` 
1. `main` happens in a try / except block
    1. The except catches the CTRL-C so the user can exit early
    1. Before printing output, exit `curses`
1. In the main body
    1. Use curses to hide the cursor
    1. stop_event is a threading event, which is shared across threads
    1. `main_thread` runs the function `timer`, passing it stop_event and the duration params
    1. After setting it up start the main thread...
    1. ...and wait for it to finish
    1. The timer sets the stop event at the end (although there is no need in this case)
    1. At the end of the script you want to set the stop event (in a `finally`) just to make sure no thread is left running and no resources are tied up

In [25]:
# solution

# import curses
# import threading
# import time
# from ._001_get_params import get_params


# def timer(stop_event, timer_duration):
#     # Event.wait() returns as soon as the event is set, so when main sets it
#     # after CTRL-C this thread ends at once instead of sleeping out the rest
#     # of timer_duration and keeping the process alive.
#     stop_event.wait(timer_duration)
#     stop_event.set()


# def main(stdscr):
#     stop_event = None
#     try:
#         curses.curs_set(0)  # Hide cursor

#         params = get_params(stdscr)

#         stdscr.clear()
#         stdscr.refresh()

#         stop_event = threading.Event()

#         # cap the whole program length
#         main_thread = threading.Thread(target=timer, args=(stop_event, params.duration))
#         main_thread.start()

#         # Wait for all events to stop
#         main_thread.join()
#         curses.endwin()
#         print(f"\nRun for {params.duration} secs")
#     except KeyboardInterrupt:
#         curses.endwin()
#         print("\nProgram terminated by user.")

#     finally:
#         if stop_event:
#             # Signal the printing thread to stop
#             stop_event.set()


# if __name__ == "__main__":
#     curses.wrapper(main)
#     print("DONE")



In [6]:
%%bash
# Static type check of the solution module.
mypy ./_001_002_main.py

# Because of curses the script doesn't run in Jupyter; print the commands
# for running it by hand from the parent directory instead.
module="_001_002_main"
printf 'cd %s/..\n' "$(realpath .)"
printf '%s\n' "if [ -d solutions ]; then python -m solutions.${module}; else python -m python_misc.${module}; fi"


[1m[32mSuccess: no issues found in 1 source file[m
cd /Users/fritz/work/jupyter/katas/python_misc/solutions/..
if [ -d solutions ]; then python -m solutions._001_002_main; else python -m python_misc._001_002_main; fi


### 001.003 Add a metronome thread

We will add a metronome thread which prints in its own region (=window) of the CLI

1. Same set up as earlier, this file is `_001_003_main.py`
1. Create the metronome thread
    1. Create a `curses` window for the metronome, 1 char high, 15 wide, starting from 0,0
    1. Create a thread for the metronome, which calls `print_beat` and passes `metronome_win` plus obvious arguments, then starts
    1. make sure script is not hanging waiting for thread to finish
1. Run it, and prove that it can either be stopped with CTRL-C or run until the end



In [None]:
# solution
#
# import curses
# import threading
# import time
# from ._001_get_params import get_params


# def print_beat(win: curses.window, stop_event, interval):
#     win.clear()
#     time_signature = 4
#     BEAT = ".  "
#     count = 0
#     while not stop_event.is_set():
#         if count == time_signature:
#             win.clear()
#             count = 0
#         win.addstr(0, count * len(BEAT), BEAT)
#         win.refresh()
#         count += 1
#         time.sleep(interval)


# def timer(stop_event, timer_duration):
#     # Event.wait() returns as soon as the event is set, so when main sets it
#     # after CTRL-C this thread ends at once instead of sleeping out the rest
#     # of timer_duration and keeping the process alive.
#     stop_event.wait(timer_duration)
#     stop_event.set()


# def main(stdscr):
#     stop_event = None
#     try:
#         curses.curs_set(0)  # Hide cursor

#         params = get_params(stdscr)

#         stdscr.clear()
#         stdscr.refresh()

#         stop_event = threading.Event()

#         metronome_win = curses.newwin(1, 15, 0, 0)

#         metronome_thread = threading.Thread(
#             target=print_beat,
#             args=(metronome_win, stop_event, params.interval),
#         )
#         metronome_thread.start()

#         # cap the whole program length
#         main_thread = threading.Thread(target=timer, args=(stop_event, params.duration))
#         main_thread.start()

#         # Wait for all events to stop
#         main_thread.join()
#         curses.endwin()
#         print(f"\nRun for {params.duration} secs")
#     except KeyboardInterrupt:
#         curses.endwin()
#         print("\nProgram terminated by user.")

#     finally:
#         if stop_event:
#             # Signal the printing thread to stop
#             stop_event.set()
#             metronome_thread.join()


# if __name__ == "__main__":
#     curses.wrapper(main)
#     print("DONE")



In [1]:
%%bash
# Static type check of the solution module.
mypy ./_001_003_main.py

# Because of curses the script doesn't run in Jupyter; print the commands
# for running it by hand from the parent directory instead.
module="_001_003_main"
printf 'cd %s/..\n' "$(realpath .)"
printf '%s\n' "if [ -d solutions ]; then python -m solutions.${module}; else python -m python_misc.${module}; fi"


[1m[32mSuccess: no issues found in 1 source file[m
cd /Users/fritz/work/jupyter/katas/python_misc/solutions/..
if [ -d solutions ]; then python -m solutions._001_003_main; else python -m python_misc._001_003_main; fi


### 001.004 Create a thread safe, shuffling iterator

You will create an iterator like itertools cycle, but this one will shuffle the items once it comes to the end of the list, and will be thread safe

1. You will work on `./_001_shuffling_iterator.py`
    1. Create a `ShufflingIterator` class which will be used in later exercises
    1. The `main` method in that file will run a little test to prove it works
1. Create the `ShufflingIterator` class
    1. It uses a lock to ensure thread safety
    1. In debug mode it adds '--' as the zeroth item in the list, so that we can more easily see what it's doing
    1. `__iter__` does what `__iter__` do
    1. `__next__` uses the lock
    1. `_reset` will shuffle the items, or only the part that needs it if in debug mode
1. The `__main__` block will print a few iterations
    1. There is a constant in Python which has all the alphabet etc; get 'A', 'B', ..., 'E' from it

In [None]:
# solution


# import random
# import string
# import threading


# class ShufflingIterator:
#     """Endless iterator over items that reshuffles them after each full pass.

#     Access to the shared index is guarded by a lock. With debug=True a "--"
#     marker is kept as the first item so the start of each pass is visible.
#     """

#     def __init__(self, items, debug=False):
#         self.debug = debug
#         self.index = 0
#         self.lock = threading.Lock()
#         # Keep a private copy: _reset() shuffles self.items in place, which
#         # would otherwise reorder the list object owned by the caller.
#         if self.debug:
#             self.items = ["--"] + list(items)
#         else:
#             self.items = list(items)
#         self._reset()

#     def __iter__(self):
#         return self

#     def __next__(self):
#         with self.lock:
#             # Reshuffle and rewind first if the previous pass was exhausted.
#             self._reset()
#             current_number = self.items[self.index]
#             self.index += 1
#             return current_number

#     def _reset(self):
#         if self.index >= len(self.items):
#             if self.debug:
#                 # Shuffle everything except the leading "--" marker.
#                 self.items[1:] = random.sample(self.items[1:], k=len(self.items) - 1)
#             else:
#                 random.shuffle(self.items)
#             self.index = 0


# if __name__ == "__main__":
#     five_letters = list(string.ascii_uppercase)[:5]
#     name = ShufflingIterator(five_letters, True)
#     for _ in range(24):
#         print(next(name))
#     print("DONE")


In [4]:
%%bash
# Static type check of the solution module.
mypy ./_001_shuffling_iterator.py

# Print the commands for running the module's demo from the parent directory.
module="_001_shuffling_iterator"
printf 'cd %s/..\n' "$(realpath .)"
printf '%s\n' "if [ -d solutions ]; then python -m solutions.${module}; else python -m python_misc.${module}; fi"


[1m[32mSuccess: no issues found in 1 source file[m
cd /Users/fritz/work/jupyter/katas/python_misc/solutions/..
if [ -d solutions ]; then python -m solutions._001_shuffling_iterator; else python -m python_misc._001_shuffling_iterator; fi


### 001.005 Print a message while waiting for a key press

 
1. Same set up as earlier, this file is `_001_005_main.py`
    1. `NotesGenerator` prints some instructions to a musician, which will stay there until a key is pressed (or the program stops)
1. `listen_for_key` will handle printing the message in its own curses window
    1. Make sure getting a character is non-blocking
    1. `stop_event` controls every function
    1. Get the character from the CLI, and if it's not CTRL-C print a new message
1. The message listening happens in its own thread and prints in its own `curses` "window"
    1. `message_win`  is a 50x50 square at 2, 0


In [None]:
# solution

# import curses
# from textwrap import dedent
# import threading
# import time
# from ._001_get_params import get_params
# from ._001_shuffling_iterator import ShufflingIterator
# from ._001_constants import notes, intervals, directions


# class NotesGenerator:
#     def __init__(self):
#         self.notes = ShufflingIterator(notes)
#         self.intervals = ShufflingIterator(intervals)
#         self.directions = ShufflingIterator(directions)

#     def generate(self):
#         return dedent(
#             f"""
#                   NOTE: {next(self.notes)}
#               INTERVAL: {next(self.intervals)}
#              DIRECTION: {next(self.directions)}
#             """
#         ).strip("\n")


# def listen_for_key(win: curses.window, notes_generator, stop_event, interval):
#     win.nodelay(True)  # Make getch non-blocking
#     win.clear()
#     win.addstr(0, 0, notes_generator.generate())
#     win.refresh()
#     while not stop_event.is_set():
#         time.sleep(interval)
#         key = win.getch()
#         if key != -1:
#             win.clear()
#             win.addstr(0, 0, notes_generator.generate())
#             win.refresh()


# def print_beat(win: curses.window, stop_event, interval):
#     win.clear()
#     time_signature = 4
#     BEAT = ".  "
#     count = 0
#     while not stop_event.is_set():
#         if count == time_signature:
#             win.clear()
#             count = 0
#         win.addstr(0, count * len(BEAT), BEAT)
#         win.refresh()
#         count += 1
#         time.sleep(interval)


# def timer(stop_event, timer_duration):
#     # Event.wait() returns as soon as the event is set, so when main sets it
#     # after CTRL-C this thread ends at once instead of sleeping out the rest
#     # of timer_duration and keeping the process alive.
#     stop_event.wait(timer_duration)
#     stop_event.set()


# def main(stdscr):
#     stop_event = None
#     try:
#         curses.curs_set(0)  # Hide cursor

#         params = get_params(stdscr)

#         stdscr.clear()
#         stdscr.refresh()

#         stop_event = threading.Event()

#         metronome_win = curses.newwin(1, 15, 0, 0)
#         metronome_thread = threading.Thread(
#             target=print_beat,
#             args=(metronome_win, stop_event, params.interval),
#         )
#         metronome_thread.start()

#         # Create a window for the printing thread
#         message_win = curses.newwin(50, 50, 2, 0)

#         # Start the key listening thread
#         key_thread = threading.Thread(
#             target=listen_for_key,
#             args=(message_win, NotesGenerator(), stop_event, params.interval),
#         )
#         key_thread.start()

#         # cap the whole program length
#         main_thread = threading.Thread(target=timer, args=(stop_event, params.duration))
#         main_thread.start()

#         # Wait for all events to stop
#         main_thread.join()
#         curses.endwin()
#         print(f"\nRun for {params.duration} secs")
#     except KeyboardInterrupt:
#         curses.endwin()
#         print("\nProgram terminated by user.")

#     finally:
#         if stop_event:
#             # Signal the printing thread to stop
#             stop_event.set()
#             metronome_thread.join()
#             key_thread.join()


# if __name__ == "__main__":
#     curses.wrapper(main)
#     print("DONE")


In [5]:
%%bash
# Static type check of the solution module.
mypy ./_001_005_main.py

# Because of curses the script doesn't run in Jupyter; print the commands
# for running it by hand from the parent directory instead.
module="_001_005_main"
printf 'cd %s/..\n' "$(realpath .)"
printf '%s\n' "if [ -d solutions ]; then python -m solutions.${module}; else python -m python_misc.${module}; fi"


[1m[32mSuccess: no issues found in 1 source file[m
cd /Users/fritz/work/jupyter/katas/python_misc/solutions/..
if [ -d solutions ]; then python -m solutions._001_005_main; else python -m python_misc._001_005_main; fi
