Permalink
Find file
Fetching contributors…
Cannot retrieve contributors at this time
441 lines (355 sloc) 11.8 KB
from string import digits as digit_chars, letters as letter_chars
from Queue import Queue
from threading import Thread
import inspect
from picoparse import p, compose, desc, eof
from picoparse import one_of, optional, choice, tri, commit, fail, follow
from picoparse import many, many1, string, not_followed_by, sep1, NoMatch
from picoparse import many_until
from picoparse.text import run_text_parser, make_caseless_literal as make_literal
from picoparse.text import build_string, lexeme, whitespace, quoted
from picoparse.text import whitespace_char, newline
from syntax_tree import export
from syntax_tree import ValueNode, BinaryNode, ParentheticalNode
from syntax_tree import FileNode, LetNode, LetInNode, BlockNode, LookupNode
from syntax_tree import PlayNode, WaitNode, LoadNode, CatalogNode
from syntax_tree import FunctionCallNode, FunctionDefNode
from scope import Scope, RootScope
# utility functions and classes
class Operator(object):
    """An operator: its source token, its precedence and its implementation.

    Instances are callable; calling one calls the wrapped function.
    """

    def __init__(self, token, precedence, function):
        """Store the token text, the precedence and the implementing callable."""
        self.function = function
        self.precedence = precedence
        self.token = token

    def __call__(self, *args, **kwargs):
        """Forward all arguments, unchanged, to the wrapped function."""
        implementation = self.function
        return implementation(*args, **kwargs)

    def __repr__(self):
        """Return the token wrapped in double quotes."""
        return '"%s"' % self.token
class OperatorSet(object):
    """A registry of operators that doubles as a parser for any of them.

    Calling the set runs the combined parser and returns the matching
    ``Operator``; indexing by token returns the ``Operator`` directly.

    NOTE(review): the alternatives handed to ``choice`` follow the
    dictionary's iteration order.  Tokens that share a prefix ('>' and
    '>=') may therefore depend on that order -- confirm against the
    backtracking behaviour of picoparse's ``choice``/``string``.
    """

    def __init__(self, *args):
        """Register every ``(token, function, precedence)`` tuple in ``args``."""
        self._operators = {}
        # Until something is registered, parsing an operator always fails.
        self._parser = p(fail, ['no operators supported'])
        for spec in args:
            self.add(*spec)

    def __call__(self):
        """Parse one operator token and return its ``Operator``."""
        matched = self._parser()
        return self[build_string(matched)]

    def __getitem__(self, o):
        """Return the ``Operator`` registered under token ``o``."""
        return self._operators[o]

    def add(self, token, function, precedence):
        """Register an operator and rebuild the combined token parser."""
        self._operators[token] = Operator(token, precedence, function)
        alternatives = [p(string, known) for known in self._operators]
        self._parser = p('operator', choice, *alternatives)

    def precedence(self, o):
        """Return the precedence of the operator registered under ``o``."""
        return self._operators[o].precedence
# default namespace functions
def if_(exp, true, false):
    """Call ``exp`` and return the result of calling the selected branch.

    All three arguments are zero-argument callables.  Only the branch that
    is selected gets called: ``true`` when ``exp()`` is truthy, ``false``
    otherwise.
    """
    branch = true if exp() else false
    return branch()
def print_(s):
    """Call ``s``, print the value it produces, and return that value."""
    value = s()
    # A single parenthesised expression prints the same under the Python 2
    # print statement as under the print function.
    print(value)
    return value
# Names available to every script.  Each callable is wrapped with
# ``export``; the second argument matches the callable's parameter count
# (presumably its arity -- confirm against syntax_tree.export).  Arguments
# arrive as zero-argument callables and must be called to get their values.
default_scope = {
    "add": export(lambda x, y: x() + y(), 2),
    "if": export(if_, 3),
    "true": export(lambda: True, 0),
    "false": export(lambda: False, 0),
    "nil": export(lambda: None, 0),
    # Like every other builtin here, the argument is a thunk: it has to be
    # called before testing, otherwise the thunk object itself is compared
    # with None and the result is always False.
    "is_nil": export(lambda n: n() is None, 1),
    "print": export(print_, 1),
}
# default operators
def do_times(times, node, scope):
    """Evaluate ``node`` in ``scope`` as many times as ``times(scope)`` says.

    Returns the result of the last evaluation, or ``None`` when the count
    is zero and ``node`` is never evaluated.
    """
    last_result = None
    for _ in range(times(scope)):
        last_result = node(scope)
    return last_result
def dollar(l, r, scope):
    """Implement ``$``: evaluate the left node and apply the result to ``r``.

    The right operand is handed over unevaluated, together with ``scope``.
    """
    function = l(scope)
    return function(r, scope)
def dot(l, r, scope_a):
    """Implement ``.``: return a callable that composes the two operands.

    The returned callable takes ``(arg, scope_b)``.  Each time it is called
    it evaluates both operands in ``scope_a`` (left first), then calls the
    left result with a one-argument callable that applies the right result
    to ``arg`` in ``scope_b``.
    """
    def composed(arg, scope_b):
        outer = l(scope_a)
        inner = r(scope_a)

        def apply_inner(_):
            return inner(arg, scope_b)

        return outer(apply_inner, scope_a)

    return composed
def _strict_binary(combine):
    """Lift a two-value function into an operator over unevaluated nodes.

    The returned callable evaluates the left node and then the right node
    in ``scope`` and passes both results to ``combine``.
    """
    def evaluate(l, r, scope):
        return combine(l(scope), r(scope))
    return evaluate


# Each entry is (token, function, precedence), the arguments of
# OperatorSet.add.
operator = OperatorSet(
    ('-', _strict_binary(lambda a, b: a - b), 20),
    ('+', _strict_binary(lambda a, b: a + b), 40),
    ('*', _strict_binary(lambda a, b: a * b), 60),
    ('/', _strict_binary(lambda a, b: a / b), 80),
    ('>', _strict_binary(lambda a, b: a > b), 10),
    ('>=', _strict_binary(lambda a, b: a >= b), 10),
    ('<', _strict_binary(lambda a, b: a < b), 10),
    ('<=', _strict_binary(lambda a, b: a <= b), 10),
    ('==', _strict_binary(lambda a, b: a == b), 10),
    # These three receive their operands unevaluated.
    ('$', dollar, 0),
    ('.', dot, 0),
    ('times', do_times, 0),
)
# literal string parsers
def literal(s):
    """Return a lexeme parser for the case-insensitive literal ``s``."""
    return p(lexeme, make_literal(s))


# Reserved words, in the same order as the parsers unpacked from them.
keywords = ["let", "in", "play", "wait", "then", "load", "cat", "fn", "def"]
(let, in_, play, wait, then,
 load, cat, fn, def_) = [literal(kw) for kw in keywords]

# 'when' is not implemented yet (and is not in the reserved-word list).
when = literal('when')

# punctuation
assignment = literal('=')
comma = literal(',')
def statement_end():
    """Match the ';' that ends a statement, then call ``commit``."""
    one_of(';')
    commit()
# Either the ASCII arrow '->' or the single character U+2192.
right_arrow = p("right arrow", choice,
                literal('->'),
                literal(u'\u2192'))

# identifier parsing
ident_first_char = p(one_of, letter_chars)
ident_char = p(one_of, ''.join((letter_chars, digit_chars, '_-?!')))
@desc('identifier')
@tri
def identifier():
    """Parse an identifier and return it as a string.

    Leading whitespace is skipped.  The first character must come from
    ``letter_chars``; later characters may also be digits or one of
    ``_-?!``.  The parse fails with a "reserved word" message when the
    identifier is a keyword in any letter case.
    """
    whitespace()
    ident = build_string([ident_first_char()] + many(ident_char))
    # The keyword parsers are built with make_caseless_literal, so "LET" is
    # consumed as the keyword just like "let".  Compare case-insensitively
    # so a name cannot be bound that can never be referenced afterwards.
    if ident.lower() in keywords:
        fail(["'%s' is a reserved word" % ident])
    return ident
# infix expressions
# One or more characters from digit_chars, passed through build_string.
_one_digit = p(one_of, digit_chars)
digits = compose(build_string, p(many1, _one_digit))

# Single-letter suffix accepted after a number (see value()); presumably
# hours, minutes and seconds.
time_multiplier = p(one_of, 'hms')
def int_value():
    """Parse a run of digits and return it as an ``int``."""
    text = digits()
    return int(text)
@tri
def float_value():
    """Parse ``<digits>.<digits>`` and return it as a ``float``.

    ``commit`` is called as soon as the '.' has been matched, so the digits
    after it are required from that point on.
    """
    integral = digits()
    one_of('.')
    commit()
    fractional = digits()
    text = '%s.%s' % (integral, fractional)
    return float(text)
def value():
    """Parse a numeric literal and return it wrapped in a ``ValueNode``.

    Accepts an optional leading '-', then a float or an int, then an
    optional time suffix: 'h' multiplies by 3600, 'm' by 60, and anything
    else (including 's' or no suffix) leaves the number unchanged.
    """
    sign = -1 if optional(p(one_of, '-'), False) else 1
    number = choice(float_value, int_value) * sign
    # handle time notation following a number
    suffix = optional(time_multiplier, False)
    scale_by_suffix = {'h': 60 * 60, 'm': 60}
    number *= scale_by_suffix.get(suffix, 1)
    return ValueNode(number)
def variable_reference():
    """Parse an identifier and return a ``LookupNode`` for that name."""
    return LookupNode(identifier())
# Old, slow implementation, kept commented out for reference:
#@tri
#def function_call():
# whitespace()
# fun = expression_part()
# whitespace()
# args = many1(expression_part)
# commit()
# return reduce(FunctionCallNode, args, fun)
#
#def variable_reference():
# name = identifier()
# return LookupNode(name)
#
#
#term = p('term', choice,
# parenthetical,
# p(lexeme, variable_reference),
# p('value', lexeme, value))
#
#expression_part = p(choice, bin_op, term)
#expression_block = p('expression', choice, function_call, expression_part)
class Parser(object):
    """Parser for the soundshell language.

    This class is the public API that produces most of the syntax nodes.
    It builds on the module-level parsers (identifiers, numbers, keyword
    literals and operators) and carries the sound server that play and
    wait nodes are constructed with.
    """

    def __init__(self, sound_server):
        """Remember ``sound_server`` and assemble the composite parsers.

        ``term``, ``expression_part`` and ``expression`` are built per
        instance because they are choices over bound methods.
        """
        self.sound_server = sound_server
        self.term = p('term', choice,
                      self.parenthetical,
                      p(lexeme, variable_reference),
                      p('value', lexeme, value))
        self.expression_part = p(choice, self.bin_op, self.term)
        # The alternatives are tried in this order.
        alternatives = [
            self.let_expression,
            self.play_expression,
            self.wait_expression,
            self.load_expression,
            self.cat_expression,
            self.function_definition,
            self.named_function_definition,
            self.filename,
            self.expression_block,
        ]
        self.expression = p('expression', choice, *alternatives)

    # infix expressions

    @tri
    def filename(self):
        """Parse a quoted string and return it as a ``FileNode``."""
        path = quoted()
        return FileNode(path)

    @tri
    def bin_op(self):
        """Parse ``<term> <operator> <expression block>``.

        Returns the ``BinaryNode`` for the operator and left term merged
        with the right-hand expression.
        """
        lhs = self.term()
        op = operator()
        commit()
        rhs = self.expression_block()
        whitespace()
        return BinaryNode(op, lhs).merge(rhs)

    @tri
    def parenthetical(self):
        """Parse ``( <expression> )`` into a ``ParentheticalNode``."""
        whitespace()
        one_of('(')
        commit()
        whitespace()
        inner = self.expression()
        whitespace()
        one_of(')')
        whitespace()
        return ParentheticalNode(inner)

    @desc('expression')
    def expression_block(self):
        """Parse one or more expression parts.

        A single part is returned as is; several parts are folded from the
        left into nested ``FunctionCallNode`` objects.
        """
        whitespace()
        parts = many1(self.expression_part)
        whitespace()
        if len(parts) <= 1:
            return parts[0]
        return reduce(FunctionCallNode, parts)

    # statements

    def _function_def_node(self):
        """Parse ``<arg names> -> <expression block>`` into a FunctionDefNode."""
        arg_names = many_until(p(lexeme, identifier), right_arrow)[0]
        body = self.expression_block()
        return FunctionDefNode(arg_names, body)

    @tri
    def function_definition(self):
        """Parse an anonymous function: ``fn <args> -> <expression block>``."""
        whitespace()
        fn()
        return self._function_def_node()

    @tri
    def named_function_definition(self):
        """Parse ``def <name> <args> -> <expression block>``.

        Returns a ``LetNode`` binding the name to the function definition.
        """
        whitespace()
        def_()
        name = lexeme(identifier)
        return LetNode([(name, self._function_def_node())])

    def binding(self):
        """Parse ``<identifier> = <expression>`` into a ``(name, value)`` pair."""
        whitespace()
        name = identifier()
        assignment()
        bound = self.expression()
        whitespace()
        return (name, bound)

    @tri
    def let_expression(self):
        """Parse ``let`` followed by comma-separated bindings.

        Returns a ``LetInNode`` when an ``in <expression>`` part follows,
        otherwise a ``LetNode``.
        """
        let()
        commit()
        pairs = sep1(self.binding, comma)
        with_body = p(self.let_in, pairs)
        without_body = LetNode(pairs)
        return optional(with_body, without_body)

    def let_in(self, pairs):
        """Parse ``in <expression>`` and return a ``LetInNode`` for ``pairs``."""
        in_()
        body = self.expression()
        return LetInNode(pairs, body)

    @tri
    def play_expression(self):
        """Parse ``play <expression>`` into a ``PlayNode``."""
        play()
        resource = self.expression()
        return PlayNode(self.sound_server, resource)

    @tri
    def wait_expression(self):
        """Parse ``wait <expression> [then] <expression>`` into a ``WaitNode``."""
        wait()
        delay = self.expression()
        optional(then)
        action = self.expression()
        return WaitNode(self.sound_server, delay, action)

    @tri
    def load_expression(self):
        """Parse ``load <expression>`` into a ``LoadNode``.

        The node is given a fresh ``Parser`` that shares this parser's
        sound server.
        """
        load()
        resource = self.expression()
        return LoadNode(resource, Parser(self.sound_server))

    def cat_expression(self):
        """Parse the ``cat`` keyword into a ``CatalogNode``."""
        cat()
        return CatalogNode()

    def script_root(self):
        """Parse a whole script into a ``BlockNode``.

        A script is any number of expressions, each followed by a statement
        end, with nothing but whitespace before the end of input.
        """
        whitespace()
        statement = p('expression', follow, self.expression, statement_end)
        statements = p('expressions', many, statement)()
        whitespace()
        eof()
        return BlockNode(statements)

    def repl_root(self, scope, replies):
        """Parse and evaluate statements one at a time.

        Each parsed node is printed and then evaluated in ``scope``.  The
        outcome is put on ``replies``: strings as they are, anything else
        (including an exception raised by the evaluation) as its ``repr``.
        Returns True once a ``NoMatch`` ends the loop.
        """
        whitespace()
        try:
            while True:
                node = follow(self.expression, statement_end)
                print(node)
                try:
                    outcome = node(scope)
                except Exception as error:
                    # Evaluation errors are reported back, not raised.
                    outcome = error
                if not isinstance(outcome, basestring):
                    outcome = repr(outcome)
                replies.put(outcome)
                whitespace()
        except NoMatch:
            return True

    def parse_text(self, text, scope=None):
        """Parse a script and evaluate it.

        ``scope`` defaults to a new ``RootScope`` over ``default_scope``.
        Returns the result of calling the parsed tree with the scope.
        """
        if scope is None:
            scope = RootScope(default_scope)
        tree, _unparsed = run_text_parser(self.script_root, text)
        return tree(scope)

    def parse_chunks(self, reply_queue):
        """Run a REPL parser on a daemon thread.

        Returns the function to feed text chunks to.  The thread runs
        ``repl_root`` over the fed characters with a new root scope, so
        results are put on ``reply_queue``.
        """
        chunks = AccumulatingIterator()

        def chunked_parser():
            def root():
                return self.repl_root(RootScope(default_scope), reply_queue)
            tree, _unparsed = run_text_parser(root, chunks)
            print(tree)

        worker = Thread(target=chunked_parser)
        worker.daemon = True
        worker.start()
        return chunks.consume
class AccumulatingIterator(object):
    """An iterable of items that are fed in chunk by chunk through a queue."""

    def __init__(self):
        """Create the queue that holds chunks not yet iterated."""
        self.chunks = Queue()

    def __iter__(self):
        """Yield every item of every queued chunk, in order.

        Waits on the queue for the next chunk and stops when the ``False``
        marker put by ``done`` is taken off the queue.
        """
        next_chunk = self.chunks.get
        chunk = next_chunk()
        while chunk is not False:
            for char in chunk:
                yield char
            chunk = next_chunk()

    def consume(self, chunk):
        """Queue ``chunk`` for iteration."""
        self.chunks.put(chunk)

    def done(self):
        """Queue the marker that ends iteration."""
        self.chunks.put(False)
if __name__ == "__main__":
    class DummyServer(object):
        """Stand-in sound server that only reports the calls it receives."""

        def play(self, *args, **kwargs):
            """Report a play request."""
            # The label used to read "print", which hid which method ran.
            print("play %s %s" % (args, kwargs))

        def wait(self, *args, **kwargs):
            """Report a wait request."""
            print("wait %s %s" % (args, kwargs))

    # Parse and evaluate a small demo script, then print its result.
    print(Parser(DummyServer()).parse_text(
        u"""
load "stdlib.soundshell";
load "examples.soundshell";
let first = nth 0;
def inc x -> x + 1;
nth 0 (list 1 2 3 nil);
"""
    ))
# ("""
#let rats = "rats.mp3";
#
#let foo = let bar = (1 + 2) in (bar + 5);
#
#let r = play rats;
#let w = wait 0.5m then play rats;
#let w2 = wait 0.5m r;
#
#r;
#w;
#
#0 times r;
#
#let l = let x = 1, y = 2 in x + y;
#l;
# """)