Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

retab Python scripts according to PEP8

Recent Vim sets sts=4 for Python files by default. Might as well follow
it already, I guess. Use `git show -w` for this one.
  • Loading branch information...
commit 992046f082a6235f1ec3fc9eeff6ff4cc3eee9e3 1 parent 781ecaa
@grawity authored
Showing with 5,536 additions and 5,559 deletions.
  1. +4 −4 lib/python/nullroute/__init__.py
  2. +150 −154 lib/python/nullroute/authorized_keys.py
  3. +25 −25 lib/python/nullroute/clipboard.py
  4. +254 −257 lib/python/nullroute/irc.py
  5. +147 −147 lib/python/nullroute/mp3tags.py
  6. +374 −376 lib/python/nullroute/sexp.py
  7. +50 −50 lib/python/nullroute/tests.py
  8. +200 −200 lib/python/nullroute/windows/firewall.py
  9. +7 −7 lib/python/nullroute/windows/io.py
  10. +63 −63 lib/python/nullroute/windows/registry.py
  11. +9 −9 lib/python/nullroute/windows/util.py
  12. +57 −57 misc/bthkeys.py
  13. +117 −118 misc/dupes
  14. +7 −8 misc/ldifunwrap
  15. +103 −103 misc/putty-hostalias.py
  16. +75 −75 misc/sony-ericsson-screenshot
  17. +40 −40 misc/zlib
  18. +103 −103 music/id3-cover
  19. +66 −66 music/id3-lyrics
  20. +1 −1  music/id3-purge-priv
  21. +3 −3 music/id3-sync-rg
  22. +8 −9 music/rg-copytags
  23. +2 −4 net/freenet-monitor.py
  24. +40 −43 net/obml-parser
  25. +211 −211 net/old-protocols/in.rfingerd
  26. +101 −101 net/scrape-reddit-comment
  27. +144 −144 net/unixify
  28. +63 −63 net/update-etchosts.py
  29. +419 −420 security/git-credential-lib
  30. +81 −81 security/gpg-sync-keyrings
  31. +100 −100 security/ssh-duphosts
  32. +474 −475 security/ssh-publickeyd
  33. +79 −79 security/update-k5login
  34. +194 −194 security/win32-cred.py
  35. +19 −20 system/envcp
  36. +206 −207 system/ldmissing
  37. +77 −77 system/systemd-coredump-extract
  38. +358 −361 term/xterm-color-chooser
  39. +204 −206 win32/df.py
  40. +53 −51 win32/fingerd.py
  41. +73 −72 win32/fw.py
  42. +363 −363 win32/identd/win32-identd.py
  43. +145 −146 win32/pathed.py
  44. +2 −2 win32/powermonitor/actions.py.example
  45. +71 −71 win32/powermonitor/powermonitor.py
  46. +194 −193 win32/ts.py
View
8 lib/python/nullroute/__init__.py
@@ -1,11 +1,11 @@
import sys
def warn(*args):
- print("\033[1;33mwarning:\033[m", *args, file=sys.stderr)
+ print("\033[1;33mwarning:\033[m", *args, file=sys.stderr)
def err(*args):
- print("\033[1;31merror:\033[m", *args, file=sys.stderr)
+ print("\033[1;31merror:\033[m", *args, file=sys.stderr)
def die(*args):
- err(*args)
- sys.exit(1)
+ err(*args)
+ sys.exit(1)
View
304 lib/python/nullroute/authorized_keys.py
@@ -1,9 +1,5 @@
-#!/usr/bin/env python
+# parser for OpenSSH authorized_keys files
# vim: ft=python
-
-# State-machine-based parser for OpenSSH authorized_keys files.
-# (c) 2011 Mantas M. <grawity@gmail.com>
-# Released under WTFPL v2 <http://sam.zoy.org/wtfpl/>
#
# for line in open("authorized_keys"):
# if line and not line.startswith("#"):
@@ -13,156 +9,156 @@
import hashlib
class PublicKeyOptions(list):
- def __str__(self):
- o = []
- for k, v in self:
- if v is True:
- o.append(k)
- else:
- o.append("%s=%s" % (k, v))
- return ",".join(o)
-
- @classmethod
- def parse(klass, text):
- keys = []
- values = []
- current = ""
- state = "key"
-
- for char in text:
- if state == "key":
- if char == ",":
- keys.append(current)
- values.append(True)
- current = ""
- elif char == "=":
- keys.append(current)
- current = ""
- state = "value"
- else:
- current += char
- elif state == "value":
- if char == ",":
- values.append(current)
- current = ""
- state = "key"
- elif char == "\"":
- current += char
- state = "value dquote"
- else:
- current += char
- elif state == "value dquote":
- if char == "\"":
- current += char
- state = "value"
- elif char == "\\":
- current += char
- state = "value dquote escape"
- else:
- current += char
- elif state == "value dquote escape":
- current += char
- state = "value dquote"
-
- if current:
- if state == "key":
- keys.append(current)
- values.append(True)
- else:
- values.append(current)
-
- return klass(zip(keys, values))
+ def __str__(self):
+ o = []
+ for k, v in self:
+ if v is True:
+ o.append(k)
+ else:
+ o.append("%s=%s" % (k, v))
+ return ",".join(o)
+
+ @classmethod
+ def parse(klass, text):
+ keys = []
+ values = []
+ current = ""
+ state = "key"
+
+ for char in text:
+ if state == "key":
+ if char == ",":
+ keys.append(current)
+ values.append(True)
+ current = ""
+ elif char == "=":
+ keys.append(current)
+ current = ""
+ state = "value"
+ else:
+ current += char
+ elif state == "value":
+ if char == ",":
+ values.append(current)
+ current = ""
+ state = "key"
+ elif char == "\"":
+ current += char
+ state = "value dquote"
+ else:
+ current += char
+ elif state == "value dquote":
+ if char == "\"":
+ current += char
+ state = "value"
+ elif char == "\\":
+ current += char
+ state = "value dquote escape"
+ else:
+ current += char
+ elif state == "value dquote escape":
+ current += char
+ state = "value dquote"
+
+ if current:
+ if state == "key":
+ keys.append(current)
+ values.append(True)
+ else:
+ values.append(current)
+
+ return klass(zip(keys, values))
class PublicKey(object):
- def __init__(self, line=None):
- if line:
- tokens = self.parse(line)
- else:
- tokens = ["", None, None, None]
-
- self.prefix, self.algo, self.blob, self.comment = tokens
-
- self.options = PublicKeyOptions.parse(self.prefix)
-
- def __repr__(self):
- return "<PublicKey prefix=%r algo=%r comment=%r>" % \
- (self.prefix, self.algo, self.comment)
-
- def __str__(self):
- options = self.options
- blob = base64.b64encode(self.blob).decode("utf-8")
- comment = self.comment
- k = [self.algo, blob]
- if len(options):
- k.insert(0, str(options))
- if len(comment):
- k.append(comment)
- return " ".join(k)
-
- def fingerprint(self, alg=None, hex=False):
- if alg is None:
- alg = hashlib.md5
- m = alg()
- m.update(self.blob)
- return m.hexdigest() if hex else m.digest()
-
- @classmethod
- def parse(self, line):
- tokens = []
- current = ""
- state = "normal"
-
- for char in line:
- old = state
- if state == "normal":
- if char in " \t":
- tokens.append(current)
- current = ""
- elif char == "\"":
- current += char
- state = "dquote"
- else:
- current += char
- elif state == "dquote":
- if char == "\"":
- current += char
- state = "normal"
- elif char == "\\":
- current += char
- state = "dquote escape"
- else:
- current += char
- elif state == "dquote escape":
- current += char
- state = "dquote"
-
- if current:
- tokens.append(current)
-
- if tokens[0] in {"ssh-rsa", "ssh-dss", "ecdsa-sha2-nistp256",
- "ecdsa-sha2-nistp384", "ecdsa-sha2-nistp521"}:
- prefix = ""
- else:
- prefix = tokens.pop(0)
- algo = tokens[0]
- blob = tokens[1]
- blob = base64.b64decode(blob.encode("utf-8"))
- comment = " ".join(tokens[2:])
-
- return prefix, algo, blob, comment
+ def __init__(self, line=None):
+ if line:
+ tokens = self.parse(line)
+ else:
+ tokens = ["", None, None, None]
+
+ self.prefix, self.algo, self.blob, self.comment = tokens
+
+ self.options = PublicKeyOptions.parse(self.prefix)
+
+ def __repr__(self):
+ return "<PublicKey prefix=%r algo=%r comment=%r>" % \
+ (self.prefix, self.algo, self.comment)
+
+ def __str__(self):
+ options = self.options
+ blob = base64.b64encode(self.blob).decode("utf-8")
+ comment = self.comment
+ k = [self.algo, blob]
+ if len(options):
+ k.insert(0, str(options))
+ if len(comment):
+ k.append(comment)
+ return " ".join(k)
+
+ def fingerprint(self, alg=None, hex=False):
+ if alg is None:
+ alg = hashlib.md5
+ m = alg()
+ m.update(self.blob)
+ return m.hexdigest() if hex else m.digest()
+
+ @classmethod
+ def parse(self, line):
+ tokens = []
+ current = ""
+ state = "normal"
+
+ for char in line:
+ old = state
+ if state == "normal":
+ if char in " \t":
+ tokens.append(current)
+ current = ""
+ elif char == "\"":
+ current += char
+ state = "dquote"
+ else:
+ current += char
+ elif state == "dquote":
+ if char == "\"":
+ current += char
+ state = "normal"
+ elif char == "\\":
+ current += char
+ state = "dquote escape"
+ else:
+ current += char
+ elif state == "dquote escape":
+ current += char
+ state = "dquote"
+
+ if current:
+ tokens.append(current)
+
+ if tokens[0] in {"ssh-rsa", "ssh-dss", "ecdsa-sha2-nistp256",
+ "ecdsa-sha2-nistp384", "ecdsa-sha2-nistp521"}:
+ prefix = ""
+ else:
+ prefix = tokens.pop(0)
+ algo = tokens[0]
+ blob = tokens[1]
+ blob = base64.b64decode(blob.encode("utf-8"))
+ comment = " ".join(tokens[2:])
+
+ return prefix, algo, blob, comment
if __name__ == "__main__":
- import os
-
- path = os.path.expanduser("~/.ssh/authorized_keys")
-
- for line in open(path, "r"):
- line = line.strip()
- if line and not line.startswith("#"):
- key = PublicKey(line)
- print("key = %r" % key)
- print("prefix = %r" % key.prefix)
- print("algo = %r" % key.algo)
- print("comment = %r" % key.comment)
- print("options = %r" % key.options)
- print()
+ import os
+
+ path = os.path.expanduser("~/.ssh/authorized_keys")
+
+ for line in open(path, "r"):
+ line = line.strip()
+ if line and not line.startswith("#"):
+ key = PublicKey(line)
+ print("key = %r" % key)
+ print("prefix = %r" % key.prefix)
+ print("algo = %r" % key.algo)
+ print("comment = %r" % key.comment)
+ print("options = %r" % key.options)
+ print()
View
50 lib/python/nullroute/clipboard.py
@@ -1,30 +1,30 @@
import sys
def get():
- if sys.platform == "win32":
- import win32clipboard as clip
- clip.OpenClipboard()
- # TODO: what type does this return?
- data = clip.GetClipboardData(clip.CF_UNICODETEXT)
- #print("clipboard.get =", repr(data))
- clip.CloseClipboard()
- return data
- else:
- raise RuntimeError("Unsupported platform")
+ if sys.platform == "win32":
+ import win32clipboard as clip
+ clip.OpenClipboard()
+ # TODO: what type does this return?
+ data = clip.GetClipboardData(clip.CF_UNICODETEXT)
+ #print("clipboard.get =", repr(data))
+ clip.CloseClipboard()
+ return data
+ else:
+ raise RuntimeError("Unsupported platform")
def put(data):
- if sys.platform == "win32":
- import win32clipboard as clip
- clip.OpenClipboard()
- clip.EmptyClipboard()
- clip.SetClipboardText(data, clip.CF_UNICODETEXT)
- clip.CloseClipboard()
- elif sys.platform.startswith("linux"):
- import subprocess
- proc = subprocess.Popen(("xsel", "-i", "-b", "-l", "/dev/null"),
- stdin=subprocess.PIPE)
- proc.stdin.write(data.encode("utf-8"))
- proc.stdin.close()
- proc.wait()
- else:
- raise RuntimeError("Unsupported platform")
+ if sys.platform == "win32":
+ import win32clipboard as clip
+ clip.OpenClipboard()
+ clip.EmptyClipboard()
+ clip.SetClipboardText(data, clip.CF_UNICODETEXT)
+ clip.CloseClipboard()
+ elif sys.platform.startswith("linux"):
+ import subprocess
+ proc = subprocess.Popen(("xsel", "-i", "-b", "-l", "/dev/null"),
+ stdin=subprocess.PIPE)
+ proc.stdin.write(data.encode("utf-8"))
+ proc.stdin.close()
+ proc.wait()
+ else:
+ raise RuntimeError("Unsupported platform")
View
511 lib/python/nullroute/irc.py
@@ -1,269 +1,266 @@
-#!/usr/bin/env python
from __future__ import (print_function, unicode_literals)
import base64
import socket
import re
class InvalidPrefixError(Exception):
- pass
+ pass
class Prefix(object):
- def __init__(self, nick=None, user=None, host=None, is_server=False):
- self.nick = nick
- self.user = user
- self.host = host
- self.is_server = is_server
-
- @classmethod
- def parse(cls, prefix):
- if len(prefix) == 0:
- return None
-
- dpos = prefix.find(".") + 1
- upos = prefix.find("!") + 1
- hpos = prefix.find("@", upos) + 1
-
- if upos == 1 or hpos == 1:
- return None
- if 0 < dpos < min(upos, hpos):
- return None
-
- self = cls()
- if upos > 0:
- self.nick = prefix[:upos-1]
- if hpos > 0:
- self.user = prefix[upos:hpos-1]
- self.host = prefix[hpos:]
- else:
- self.user = prefix[upos:]
- elif hpos > 0:
- self.nick = prefix[:hpos-1]
- self.host = prefix[hpos:]
- elif dpos > 0:
- self.host = prefix
- self.is_server = True
- else:
- self.nick = prefix
-
- return self
-
- def unparse(self):
- if not (self.nick is None or self.user is None or self.host is None):
- return self.nick + "!" + self.user + "@" + self.host
- elif self.nick:
- return self.nick
- elif self.host:
- return self.host
- else:
- return None
-
- def __str__(self):
- if not (self.nick is None or self.user is None or self.host is None):
- return "%s!%s@%s" % (self.nick, self.user, self.host)
- elif self.nick:
- return self.nick
- elif self.host:
- return self.host
- else:
- return "(empty)"
-
- def __repr__(self):
- return "<IRC.Prefix: %r ! %r @ %r>" % (self.nick, self.user, self.host)
-
- def to_a(self):
- return [self.nick, self.user, self.host, self.is_server]
+ def __init__(self, nick=None, user=None, host=None, is_server=False):
+ self.nick = nick
+ self.user = user
+ self.host = host
+ self.is_server = is_server
+
+ @classmethod
+ def parse(cls, prefix):
+ if len(prefix) == 0:
+ return None
+
+ dpos = prefix.find(".") + 1
+ upos = prefix.find("!") + 1
+ hpos = prefix.find("@", upos) + 1
+
+ if upos == 1 or hpos == 1:
+ return None
+ if 0 < dpos < min(upos, hpos):
+ return None
+
+ self = cls()
+ if upos > 0:
+ self.nick = prefix[:upos-1]
+ if hpos > 0:
+ self.user = prefix[upos:hpos-1]
+ self.host = prefix[hpos:]
+ else:
+ self.user = prefix[upos:]
+ elif hpos > 0:
+ self.nick = prefix[:hpos-1]
+ self.host = prefix[hpos:]
+ elif dpos > 0:
+ self.host = prefix
+ self.is_server = True
+ else:
+ self.nick = prefix
+
+ return self
+
+ def unparse(self):
+ if not (self.nick is None or self.user is None or self.host is None):
+ return self.nick + "!" + self.user + "@" + self.host
+ elif self.nick:
+ return self.nick
+ elif self.host:
+ return self.host
+ else:
+ return None
+
+ def __str__(self):
+ if not (self.nick is None or self.user is None or self.host is None):
+ return "%s!%s@%s" % (self.nick, self.user, self.host)
+ elif self.nick:
+ return self.nick
+ elif self.host:
+ return self.host
+ else:
+ return "(empty)"
+
+ def __repr__(self):
+ return "<IRC.Prefix: %r ! %r @ %r>" % (self.nick, self.user, self.host)
+
+ def to_a(self):
+ return [self.nick, self.user, self.host, self.is_server]
class Line(object):
- """
- An IRC protocol line.
- """
- def __init__(self, tags=None, prefix=None, cmd=None, args=None):
- self.tags = tags or {}
- self.prefix = prefix
- self.cmd = cmd
- self.args = args or []
-
- @classmethod
- def split(cls, line):
- """
- Split an IRC protocol line into tokens as defined in RFC 1459
- and the IRCv3 message-tags extension.
- """
-
- line = line.decode("utf-8", "replace")
- line = line.rstrip("\r\n").split(" ")
- i, n = 0, len(line)
- parv = []
-
- while i < n and line[i] == "":
- i += 1
-
- if i < n and line[i].startswith("@"):
- parv.append(line[i])
- i += 1
- while i < n and line[i] == "":
- i += 1
-
- if i < n and line[i].startswith(":"):
- parv.append(line[i])
- i += 1
- while i < n and line[i] == "":
- i += 1
-
- while i < n:
- if line[i].startswith(":"):
- break
- elif line[i] != "":
- parv.append(line[i])
- i += 1
-
- if i < n:
- trailing = " ".join(line[i:])
- parv.append(trailing[1:])
-
- return parv
-
- @classmethod
- def parse(cls, line, parse_prefix=True):
- """
- Parse an IRC protocol line into a Line object consisting of
- tags, prefix, command, and arguments.
- """
-
- line = line.decode("utf-8", "replace")
- parv = line.rstrip("\r\n").split(" ")
- i, n = 0, len(parv)
- self = cls()
-
- while i < n and parv[i] == "":
- i += 1
-
- if i < n and parv[i].startswith("@"):
- tags = parv[i][1:]
- i += 1
- while i < n and parv[i] == "":
- i += 1
-
- self.tags = dict()
- for item in tags.split(";"):
- if "=" in item:
- k, v = item.split("=", 1)
- else:
- k, v = item, True
- self.tags[k] = v
-
- if i < n and parv[i].startswith(":"):
- prefix = parv[i][1:]
- i += 1
- while i < n and parv[i] == "":
- i += 1
-
- if parse_prefix:
- self.prefix = Prefix.parse(prefix)
- else:
- self.prefix = prefix
-
- if i < n:
- self.cmd = parv[i].upper()
-
- while i < n:
- if parv[i].startswith(":"):
- trailing = " ".join(parv[i:])
- self.args.append(trailing[1:])
- break
- elif parv[i] != "":
- self.args.append(parv[i])
- i += 1
-
- return self
-
- @classmethod
- def join(cls, argv):
- i, n = 0, len(argv)
-
- if i < n and argv[i].startswith("@"):
- if " " in argv[i]:
- raise ValueError("Argument %d contains spaces: %r" % (i, argv[i]))
- i += 1
-
- if i < n and " " in argv[i]:
- raise ValueError("Argument %d contains spaces: %r" % (i, argv[i]))
-
- if i < n and argv[i].startswith(":"):
- if " " in argv[i]:
- raise ValueError("Argument %d contains spaces: %r" % (i, argv[i]))
- i += 1
-
- while i < n-1:
- if not argv[i]:
- raise ValueError("Argument %d is empty: %r" % (i, argv[i]))
- elif argv[i].startswith(":"):
- raise ValueError("Argument %d starts with ':': %r" % (i, argv[i]))
- elif " " in argv[i]:
- raise ValueError("Argument %d contains spaces: %r" % (i, argv[i]))
- i += 1
-
- parv = argv[:i]
-
- if i < n:
- if not argv[i] or argv[i].startswith(":") or " " in argv[i]:
- parv.append(":%s" % argv[i])
- else:
- parv.append(argv[i])
-
- return " ".join(parv)
-
- def unparse(self):
- parv = []
-
- if self.tags:
- tags = [k if v is True else k + b"=" + v
- for k, v in self.tags.items()]
- parv.append("@" + b",".join(tags))
-
- if self.prefix:
- parv.append(":" + self.prefix.unparse())
-
- parv.append(self.cmd)
-
- parv.extend(self.args)
-
- return self.join(parv)
-
- def __repr__(self):
- return "<IRC.Line: tags=%r prefix=%r cmd=%r args=%r>" % (
- self.tags, self.prefix,
- self.cmd, self.args)
+ """
+ An IRC protocol line.
+ """
+ def __init__(self, tags=None, prefix=None, cmd=None, args=None):
+ self.tags = tags or {}
+ self.prefix = prefix
+ self.cmd = cmd
+ self.args = args or []
+
+ @classmethod
+ def split(cls, line):
+ """
+ Split an IRC protocol line into tokens as defined in RFC 1459
+ and the IRCv3 message-tags extension.
+ """
+
+ line = line.decode("utf-8", "replace")
+ line = line.rstrip("\r\n").split(" ")
+ i, n = 0, len(line)
+ parv = []
+
+ while i < n and line[i] == "":
+ i += 1
+
+ if i < n and line[i].startswith("@"):
+ parv.append(line[i])
+ i += 1
+ while i < n and line[i] == "":
+ i += 1
+
+ if i < n and line[i].startswith(":"):
+ parv.append(line[i])
+ i += 1
+ while i < n and line[i] == "":
+ i += 1
+
+ while i < n:
+ if line[i].startswith(":"):
+ break
+ elif line[i] != "":
+ parv.append(line[i])
+ i += 1
+
+ if i < n:
+ trailing = " ".join(line[i:])
+ parv.append(trailing[1:])
+
+ return parv
+
+ @classmethod
+ def parse(cls, line, parse_prefix=True):
+ """
+ Parse an IRC protocol line into a Line object consisting of
+ tags, prefix, command, and arguments.
+ """
+
+ line = line.decode("utf-8", "replace")
+ parv = line.rstrip("\r\n").split(" ")
+ i, n = 0, len(parv)
+ self = cls()
+
+ while i < n and parv[i] == "":
+ i += 1
+
+ if i < n and parv[i].startswith("@"):
+ tags = parv[i][1:]
+ i += 1
+ while i < n and parv[i] == "":
+ i += 1
+
+ self.tags = dict()
+ for item in tags.split(";"):
+ if "=" in item:
+ k, v = item.split("=", 1)
+ else:
+ k, v = item, True
+ self.tags[k] = v
+
+ if i < n and parv[i].startswith(":"):
+ prefix = parv[i][1:]
+ i += 1
+ while i < n and parv[i] == "":
+ i += 1
+
+ if parse_prefix:
+ self.prefix = Prefix.parse(prefix)
+ else:
+ self.prefix = prefix
+
+ if i < n:
+ self.cmd = parv[i].upper()
+
+ while i < n:
+ if parv[i].startswith(":"):
+ trailing = " ".join(parv[i:])
+ self.args.append(trailing[1:])
+ break
+ elif parv[i] != "":
+ self.args.append(parv[i])
+ i += 1
+
+ return self
+
+ @classmethod
+ def join(cls, argv):
+ i, n = 0, len(argv)
+
+ if i < n and argv[i].startswith("@"):
+ if " " in argv[i]:
+ raise ValueError("Argument %d contains spaces: %r" % (i, argv[i]))
+ i += 1
+
+ if i < n and " " in argv[i]:
+ raise ValueError("Argument %d contains spaces: %r" % (i, argv[i]))
+
+ if i < n and argv[i].startswith(":"):
+ if " " in argv[i]:
+ raise ValueError("Argument %d contains spaces: %r" % (i, argv[i]))
+ i += 1
+
+ while i < n-1:
+ if not argv[i]:
+ raise ValueError("Argument %d is empty: %r" % (i, argv[i]))
+ elif argv[i].startswith(":"):
+ raise ValueError("Argument %d starts with ':': %r" % (i, argv[i]))
+ elif " " in argv[i]:
+ raise ValueError("Argument %d contains spaces: %r" % (i, argv[i]))
+ i += 1
+
+ parv = argv[:i]
+
+ if i < n:
+ if not argv[i] or argv[i].startswith(":") or " " in argv[i]:
+ parv.append(":%s" % argv[i])
+ else:
+ parv.append(argv[i])
+
+ return " ".join(parv)
+
+ def unparse(self):
+ parv = []
+
+ if self.tags:
+ tags = [k if v is True else k + b"=" + v
+ for k, v in self.tags.items()]
+ parv.append("@" + b",".join(tags))
+
+ if self.prefix:
+ parv.append(":" + self.prefix.unparse())
+
+ parv.append(self.cmd)
+
+ parv.extend(self.args)
+
+ return self.join(parv)
+
+ def __repr__(self):
+ return "<IRC.Line: tags=%r prefix=%r cmd=%r args=%r>" % (
+ self.tags, self.prefix,
+ self.cmd, self.args)
class Connection(object):
- def __init__(self):
- self.host = None
- self.port = None
- self.ai = None
- self._fd = None
- self._file = None
-
- def connect(self, host, port, ssl=False):
- self.ai = socket.getaddrinfo(host, str(port), 0, socket.SOCK_STREAM)
- print(repr(self.ai))
- for af, proto, _, cname, addr in self.ai:
- self._fd = socket.socket(af, proto)
- self._fd.connect(addr)
- break
- import io
- self._fi = self._fd.makefile("rwb")
-
- def writeraw(self, buf):
- self._fi.write(buf+b"\r\n")
- self._fi.flush()
-
- def readraw(self):
- return self._fi.readline()
-
- def write(self, *args):
- self.writeraw(Line.join(args))
-
- def read(self):
- return Line.parse(self.readraw())
-
-# vim: ts=4:sw=4
+ def __init__(self):
+ self.host = None
+ self.port = None
+ self.ai = None
+ self._fd = None
+ self._file = None
+
+ def connect(self, host, port, ssl=False):
+ self.ai = socket.getaddrinfo(host, str(port), 0, socket.SOCK_STREAM)
+ print(repr(self.ai))
+ for af, proto, _, cname, addr in self.ai:
+ self._fd = socket.socket(af, proto)
+ self._fd.connect(addr)
+ break
+ import io
+ self._fi = self._fd.makefile("rwb")
+
+ def writeraw(self, buf):
+ self._fi.write(buf+b"\r\n")
+ self._fi.flush()
+
+ def readraw(self):
+ return self._fi.readline()
+
+ def write(self, *args):
+ self.writeraw(Line.join(args))
+
+ def read(self):
+ return Line.parse(self.readraw())
View
294 lib/python/nullroute/mp3tags.py
@@ -1,157 +1,157 @@
import mutagen
def rva_from_string(gain, peak):
- rg_gain = float(gain[0].split(' ')[0])
- rg_peak = float(peak[0])
- return mutagen.id3.RVA2(desc=u'track', channel=1, gain=rg_gain, peak=rg_peak)
+ rg_gain = float(gain[0].split(' ')[0])
+ rg_peak = float(peak[0])
+ return mutagen.id3.RVA2(desc=u'track', channel=1, gain=rg_gain, peak=rg_peak)
def rva_to_string(rva):
- rg_gain = rva._raw_gain or "%.2f dB" % rva.gain
- rg_peak = rva._raw_peak or "%.2f dB" % rva.peak
- return (rg_gain, rg_peak)
+ rg_gain = rva._raw_gain or "%.2f dB" % rva.gain
+ rg_peak = rva._raw_peak or "%.2f dB" % rva.peak
+ return (rg_gain, rg_peak)
def rva_to_soundcheck(rva):
- # http://projects.robinbowes.com/flac2mp3/trac/ticket/30#comment:7
- # [-1]: prefixed with a space
- # [0, 1]: volume adjustment in 1/1000 W per dBm
- # [2, 3]: volume adjustment in 1/2500 W per dBm
- # [4, 5]: unsure (identical for same song when volumes differ)
- # [6, 7]: peak (max sample) as absolute value: 0x7FFF for 16-bit samples
- # [8, 9]: same as [4, 5]
- gain2sc = lambda gain, base: u"%08X" % min(round((10 ** (-gain/10)) * base), 65534)
-
- sc = [
- u"",
- # 1/1000 W per dBm
- gain2sc(rva.gain, 1000),
- gain2sc(rva.gain, 1000),
- # 1/2500 W per dBm
- gain2sc(rva.gain, 2500),
- gain2sc(rva.gain, 2500),
- u"00024CA8",
- u"00024CA8",
- u"00007FFF",
- u"00007FFF",
- u"00024CA8",
- u"00024CA8",
- ]
-
- return u" ".join(sc)
+ # http://projects.robinbowes.com/flac2mp3/trac/ticket/30#comment:7
+ # [-1]: prefixed with a space
+ # [0, 1]: volume adjustment in 1/1000 W per dBm
+ # [2, 3]: volume adjustment in 1/2500 W per dBm
+ # [4, 5]: unsure (identical for same song when volumes differ)
+ # [6, 7]: peak (max sample) as absolute value: 0x7FFF for 16-bit samples
+ # [8, 9]: same as [4, 5]
+ gain2sc = lambda gain, base: u"%08X" % min(round((10 ** (-gain/10)) * base), 65534)
+
+ sc = [
+ u"",
+ # 1/1000 W per dBm
+ gain2sc(rva.gain, 1000),
+ gain2sc(rva.gain, 1000),
+ # 1/2500 W per dBm
+ gain2sc(rva.gain, 2500),
+ gain2sc(rva.gain, 2500),
+ u"00024CA8",
+ u"00024CA8",
+ u"00007FFF",
+ u"00007FFF",
+ u"00024CA8",
+ u"00024CA8",
+ ]
+
+ return u" ".join(sc)
class GainValue(object):
- MODES = {'track', 'album'}
-
- def __init__(self, mode=u'track'):
- if mode not in self.MODES:
- raise ValueError("mode must be one of %r" % self.MODES)
-
- self._mode = unicode(mode)
-
- self.gain = None
- self.peak = 1.0
- self._raw_gain = None
- self._raw_peak = None
-
- def __repr__(self):
- return "<GainValue mode=%s gain=%f peak=%f>" % (self._mode, self.gain, self.peak)
-
- @property
- def mode(self):
- return self._mode
-
- @mode.setter
- def mode(self, value):
- if value not in self.MODES:
- raise ValueError("mode must be one of %r" % self.MODES)
-
- self._mode = unicode(value)
-
- @classmethod
- def from_rva2(self, mode, frame):
- gv = self(mode)
- gv.gain = frame.gain
- gv.peak = frame.peak
- return gv
-
- @classmethod
- def from_string(self, mode, gain, peak):
- rg_gain = float(gain[0].split(' ')[0])
- rg_peak = float(peak[0].split(' ')[0])
-
- gv = self(mode)
- gv._raw_gain = gain
- gv._raw_peak = peak
- gv.gain = rg_gain
- gv.peak = rg_peak
- return gv
-
- def to_string(self):
- rg_gain = self._raw_gain or "%.2f dB" % self.gain
- rg_peak = self._raw_peak or "%.2f dB" % self.peak
- return (rg_gain, rg_peak)
-
- def to_rva2(self):
- return mutagen.id3.RVA2(desc=self._mode, channel=1, gain=self.gain, peak=self.peak)
-
- def to_soundcheck(self):
- return rva_to_soundcheck(self)
-
- @classmethod
- def import_tag(self, tag, mode):
- if mode not in self.MODES:
- raise ValueError("mode must be one of %r" % self.MODES)
-
- if (u'RVA2:%s' % mode) in tag:
- # ID3v2.4 RVA2
- #print "Found ID3v2.4 RVA2 frame"
- return self.from_rva2(mode, tag[u'RVA2:%s' % mode])
- elif (u'TXXX:replaygain_%s_gain' % mode) in tag:
- # ID3v2 foobar2000
- #print "Found ID3v2 foobar2000 tag"
- return self.from_string(mode,
- tag[u'TXXX:replaygain_%s_gain' % mode],
- tag[u'TXXX:replaygain_%s_peak' % mode])
- elif ('----:com.apple.iTunes:replaygain_%s_gain' % mode) in tag:
- # MP4 foobar2000
- #print "Found MP4 foobar2000 tag"
- return self.from_string(mode,
- tag['----:com.apple.iTunes:replaygain_%s_gain' % mode],
- tag['----:com.apple.iTunes:replaygain_%s_peak' % mode])
- elif ('replaygain_%s_gain' % mode) in tag:
- # FLAC
- #print "Found FLAC tag"
- return self.from_string(mode,
- tag['replaygain_%s_gain' % mode],
- tag['replaygain_%s_peak' % mode])
- else:
- return None
-
- def export_id3(self, tag):
- tag[u'RVA2:%s' % self._mode] = self.to_rva2()
-
- rg_gain, rg_peak = self.to_string()
- tx_gain = mutagen.id3.TXXX(desc=u'replaygain_%s_gain' % self._mode,
- encoding=1, text=[rg_gain])
- tx_peak = mutagen.id3.TXXX(desc=u'replaygain_%s_peak' % self._mode,
- encoding=1, text=[rg_peak])
- tag[u'TXXX:'+tx_gain.desc] = tx_gain
- tag[u'TXXX:'+tx_peak.desc] = tx_peak
-
- if self._mode == 'track':
- sc_raw = self.to_soundcheck()
- sc_norm = mutagen.id3.COMM(desc=u'iTunNORM', lang='eng',
- encoding=0, text=[sc_raw])
- tag[u"COMM:%s:'%s'" % (sc_norm.desc, sc_norm.lang)] = sc_norm
-
- def export_mp4(self, tag):
- #print "Adding MP4 foobar2000 tag"
- rg_gain, rg_peak = self.to_string()
- tag['----:com.apple.iTunes:replaygain_%s_gain' % self._mode] = rg_gain
- tag['----:com.apple.iTunes:replaygain_%s_peak' % self._mode] = rg_peak
-
- def export_flac(self, tag):
- #print "Adding FLAC tag"
- rg_gain, rg_peak = self.to_string()
- tag['replaygain_%s_gain' % self._mode] = rg_gain
- tag['replaygain_%s_peak' % self._mode] = rg_peak
+ MODES = {'track', 'album'}
+
+ def __init__(self, mode=u'track'):
+ if mode not in self.MODES:
+ raise ValueError("mode must be one of %r" % self.MODES)
+
+ self._mode = unicode(mode)
+
+ self.gain = None
+ self.peak = 1.0
+ self._raw_gain = None
+ self._raw_peak = None
+
+ def __repr__(self):
+ return "<GainValue mode=%s gain=%f peak=%f>" % (self._mode, self.gain, self.peak)
+
+ @property
+ def mode(self):
+ return self._mode
+
+ @mode.setter
+ def mode(self, value):
+ if value not in self.MODES:
+ raise ValueError("mode must be one of %r" % self.MODES)
+
+ self._mode = unicode(value)
+
+ @classmethod
+ def from_rva2(self, mode, frame):
+ gv = self(mode)
+ gv.gain = frame.gain
+ gv.peak = frame.peak
+ return gv
+
+ @classmethod
+ def from_string(self, mode, gain, peak):
+ rg_gain = float(gain[0].split(' ')[0])
+ rg_peak = float(peak[0].split(' ')[0])
+
+ gv = self(mode)
+ gv._raw_gain = gain
+ gv._raw_peak = peak
+ gv.gain = rg_gain
+ gv.peak = rg_peak
+ return gv
+
+ def to_string(self):
+ rg_gain = self._raw_gain or "%.2f dB" % self.gain
+ rg_peak = self._raw_peak or "%.2f dB" % self.peak
+ return (rg_gain, rg_peak)
+
+ def to_rva2(self):
+ return mutagen.id3.RVA2(desc=self._mode, channel=1, gain=self.gain, peak=self.peak)
+
+ def to_soundcheck(self):
+ return rva_to_soundcheck(self)
+
+ @classmethod
+ def import_tag(self, tag, mode):
+ if mode not in self.MODES:
+ raise ValueError("mode must be one of %r" % self.MODES)
+
+ if (u'RVA2:%s' % mode) in tag:
+ # ID3v2.4 RVA2
+ #print "Found ID3v2.4 RVA2 frame"
+ return self.from_rva2(mode, tag[u'RVA2:%s' % mode])
+ elif (u'TXXX:replaygain_%s_gain' % mode) in tag:
+ # ID3v2 foobar2000
+ #print "Found ID3v2 foobar2000 tag"
+ return self.from_string(mode,
+ tag[u'TXXX:replaygain_%s_gain' % mode],
+ tag[u'TXXX:replaygain_%s_peak' % mode])
+ elif ('----:com.apple.iTunes:replaygain_%s_gain' % mode) in tag:
+ # MP4 foobar2000
+ #print "Found MP4 foobar2000 tag"
+ return self.from_string(mode,
+ tag['----:com.apple.iTunes:replaygain_%s_gain' % mode],
+ tag['----:com.apple.iTunes:replaygain_%s_peak' % mode])
+ elif ('replaygain_%s_gain' % mode) in tag:
+ # FLAC
+ #print "Found FLAC tag"
+ return self.from_string(mode,
+ tag['replaygain_%s_gain' % mode],
+ tag['replaygain_%s_peak' % mode])
+ else:
+ return None
+
+ def export_id3(self, tag):
+ tag[u'RVA2:%s' % self._mode] = self.to_rva2()
+
+ rg_gain, rg_peak = self.to_string()
+ tx_gain = mutagen.id3.TXXX(desc=u'replaygain_%s_gain' % self._mode,
+ encoding=1, text=[rg_gain])
+ tx_peak = mutagen.id3.TXXX(desc=u'replaygain_%s_peak' % self._mode,
+ encoding=1, text=[rg_peak])
+ tag[u'TXXX:'+tx_gain.desc] = tx_gain
+ tag[u'TXXX:'+tx_peak.desc] = tx_peak
+
+ if self._mode == 'track':
+ sc_raw = self.to_soundcheck()
+ sc_norm = mutagen.id3.COMM(desc=u'iTunNORM', lang='eng',
+ encoding=0, text=[sc_raw])
+ tag[u"COMM:%s:'%s'" % (sc_norm.desc, sc_norm.lang)] = sc_norm
+
+ def export_mp4(self, tag):
+ #print "Adding MP4 foobar2000 tag"
+ rg_gain, rg_peak = self.to_string()
+ tag['----:com.apple.iTunes:replaygain_%s_gain' % self._mode] = rg_gain
+ tag['----:com.apple.iTunes:replaygain_%s_peak' % self._mode] = rg_peak
+
+ def export_flac(self, tag):
+ #print "Adding FLAC tag"
+ rg_gain, rg_peak = self.to_string()
+ tag['replaygain_%s_gain' % self._mode] = rg_gain
+ tag['replaygain_%s_peak' % self._mode] = rg_peak
View
750 lib/python/nullroute/sexp.py
@@ -1,5 +1,3 @@
-#!python
-#
# S-exp parser and dumper
# (c) 2011 Mantas M. <grawity@gmail.com>
#
@@ -16,396 +14,396 @@
import base64
from io import BytesIO
-ALPHA = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
-DIGITS = b"0123456789"
-WHITESPACE = b" \t\v\f\r\n"
-PSEUDO_ALPHA = b"-./_:*+="
-PUNCTUATION = b'()[]{}|#"&\\'
-#VERBATIM = b"!%^~;',<>?" # Rivest's spec uses these
-VERBATIM = b"!%^~'<>" # nettle's sexp-conv is more permissive?
-
-TOKEN_CHARS = DIGITS + ALPHA + PSEUDO_ALPHA
-
-HEX_DIGITS = b"0123456789ABCDEFabcdef"
-B64_DIGITS = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
-
-PRINTABLE_CHARS = bytes(range(0x20, 0x80))
-ESCAPE_CHARS = b"\b\t\v\n\f\r\\"
-
-ESCAPE_CHARS_TB = {
- b"\b": b"b",
- b"\t": b"t",
- b"\v": b"v",
- b"\n": b"n",
- b"\f": b"f",
- b"\r": b"r",
- b"\\": b"\\",
+ALPHA = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+DIGITS = b"0123456789"
+WHITESPACE = b" \t\v\f\r\n"
+PSEUDO_ALPHA = b"-./_:*+="
+PUNCTUATION = b'()[]{}|#"&\\'
+#VERBATIM = b"!%^~;',<>?" # Rivest's spec uses these
+VERBATIM = b"!%^~'<>" # nettle's sexp-conv is more permissive?
+
+TOKEN_CHARS = DIGITS + ALPHA + PSEUDO_ALPHA
+
+HEX_DIGITS = b"0123456789ABCDEFabcdef"
+B64_DIGITS = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
+
+PRINTABLE_CHARS = bytes(range(0x20, 0x80))
+ESCAPE_CHARS = b"\b\t\v\n\f\r\\"
+
+ESCAPE_CHARS_TB = {
+ b"\b": b"b",
+ b"\t": b"t",
+ b"\v": b"v",
+ b"\n": b"n",
+ b"\f": b"f",
+ b"\r": b"r",
+ b"\\": b"\\",
}
class SexpParser(object):
- def __init__(self, buf):
- self.bytesize = 8
- self.bits = 0
- self.nBits = 0
- self.buf = buf if hasattr(buf, "read") else BytesIO(buf)
- self.char = ""
- self.advance()
-
- def __iter__(self):
- return self
-
- def __next__(self):
- return self.next()
-
- def next(self):
- obj = self.scan_object()
- if obj:
- return obj
- else:
- raise StopIteration
-
- @property
- def pos(self):
- return self.buf.tell()
-
- def advance(self):
- """Get the next byte in the input stream.
-
- Will read as many input bytes as needed from Base64 or hex areas.
- """
- while True:
- self.last = self.char
- self.char = self.buf.read(1)
- if not self.char:
- self.bytesize = 8
- self.char = None
- return self.char
-
- if self.char is None:
- return self.char
- elif (self.bytesize == 6 and self.char in b"|}") \
- or (self.bytesize == 4 and self.char == b"#"):
- if self.nBits and (1 << self.nBits)-1 & self.bits:
- raise IOError("%d-bit region ended with %d unused bits at %d" %
- (self.bytesize, self.nBits, self.pos))
- self.bytesize = 8
- return self.char
- elif self.bytesize != 8 and self.char in WHITESPACE:
- # ignore whitespace in hex/base64 regions
- pass
- elif self.bytesize == 6 and self.char == b"=":
- # Base64 padding
- self.nBits -= 2
- elif self.bytesize == 8:
- return self.char
- elif self.bytesize < 8:
- self.bits <<= self.bytesize
- self.nBits += self.bytesize
- if self.bytesize == 6 and self.char in B64_DIGITS:
- self.bits |= B64_DIGITS.index(self.char)
- elif self.bytesize == 4 and self.char in HEX_DIGITS:
- self.bits |= int(self.char, 16)
- else:
- raise IOError("char %r found in %d-bit region" %
- (self.char, self.bytesize))
-
- if self.nBits >= 8:
- self.nBits -= 8
- byte = (self.bits >> self.nBits) & 0xFF
- self.bits &= (1 << self.nBits)-1
- self.char = bytes([byte])
- return self.char
-
- def skip_whitespace(self):
- while self.char:
- if self.char in WHITESPACE:
- self.advance()
- elif self.char == b";" and self.last in b"\r\n":
- while self.char and self.char not in b"\r\n":
- self.advance()
- else:
- return
-
- def skip_char(self, char):
- """Skip the next character if it matches expectations."""
- if len(char) != 1:
- raise ValueError("only single characters allowed")
- elif not self.char:
- raise IOError("EOF found where %r expected" % char)
- elif self.char == char:
- self.advance()
- else:
- raise IOError("char %r found where %r expected" % (
- self.char, char))
-
- def scan_token(self):
- self.skip_whitespace()
- out = b""
- while self.char and self.char in TOKEN_CHARS:
- out += self.char
- self.advance()
- #print("scan_simple_string ->", repr(out))
- return out
-
- def scan_decimal(self):
- i, value = 0, 0
- while self.char and self.char in DIGITS:
- value = value*10 + int(self.char)
- i += 1
- if i > 8:
- raise IOError("decimal %d... too long" % value)
- self.advance()
- return value
-
- def scan_verbatim_string(self, length=None):
- """Return the value of verbatim string with given length."""
- self.skip_whitespace()
- self.skip_char(b":")
- if not length:
- raise ValueError("verbatim string had no length")
- out, i = b"", 0
- while i < length:
- out += self.char
- self.advance()
- i += 1
- return out
-
- def scan_quoted_string(self, length=None):
- self.skip_char(b"\"")
- out = b""
- while length is None or len(out) <= length:
- if not self.char:
- raise ValueError("quoted string is missing closing quote")
- elif self.char == b"\"":
- if length is None or len(out) == length:
- self.skip_char(b"\"")
- break
- else:
- raise ValueError("quoted string ended too early (expected %d)" % length)
- elif self.char == b"\\":
- c = self.advance()
- if c == b"\r":
- continue
- elif c == b"\n":
- continue
- elif c in b"0123":
- s = c + self.advance() + self.advance()
- val = int(s, 8)
- out += chr(val)
- elif c == b"b":
- out += b"\b"
- elif c == b"f":
- out += b"\f"
- elif c == b"n":
- out += b"\n"
- elif c == b"r":
- out += b"\r"
- elif c == b"t":
- out += b"\t"
- elif c == b"v":
- out += b"\v"
- elif c == b"x":
- s = self.advance() + self.advance()
- val = int(s, 16)
- out += chr(val)
- else:
- raise ValueError("unknown escape character \\%s at %d" % (c, self.pos))
- else:
- out += self.char
- self.advance()
- return out
-
- def scan_hex_string(self, length=None):
- self.bytesize = 4
- self.skip_char(b"#")
- out = b""
- while self.char and (self.char != b"#" or self.bytesize == 4):
- out += self.char
- self.advance()
- self.skip_char(b"#")
- if length and length != len(out):
- raise ValueError("hexstring length %d != declared length %d" %
- (len(out), length))
- return out
-
- def scan_base64_string(self, length=None):
- self.bytesize = 6
- self.skip_char(b"|")
- out = b""
- while self.char and (self.char != b"|" or self.bytesize == 6):
- out += self.char
- self.advance()
- self.skip_char(b"|")
- if length and length != len(out):
- raise ValueError("base64 length %d != declared length %d" %
- (len(out), length))
- return out
-
- def scan_simple_string(self):
- self.skip_whitespace()
- if not self.char:
- return None
- elif self.char in TOKEN_CHARS and self.char not in DIGITS:
- return self.scan_token()
- elif self.char in DIGITS or self.char in b"\"#|:":
- if self.char in DIGITS:
- length = self.scan_decimal()
- else:
- length = None
- if self.char == b"\"":
- return self.scan_quoted_string(length)
- elif self.char == b"#":
- return self.scan_hex_string(length)
- elif self.char == b"|":
- return self.scan_base64_string(length)
- elif self.char == b":":
- return self.scan_verbatim_string(length)
- else:
- raise ValueError("illegal char %r at %d" % (self.char, self.pos))
- else:
- raise ValueError("illegal char %r at %d" % (self.char, self.pos))
-
- def scan_string(self):
- # TODO: How should hints be handled in a Pythonic way?
- hint = None
- if self.char == b"[":
- self.skip_char(b"[")
- hint = self.scan_simple_string()
- self.skip_whitespace()
- self.skip_char(b"]")
- self.skip_whitespace()
- out = self.scan_simple_string()
- return (hint, out) if hint else out
-
- def scan_list(self):
- out = []
- self.skip_char(b"(")
- while True:
- self.skip_whitespace()
- if not self.char:
- raise ValueError("list is missing closing paren")
- elif self.char == b")":
- self.skip_char(b")")
- return out
- else:
- out.append(self.scan_object())
-
- def scan_object(self):
- """Return the next object of any type."""
- self.skip_whitespace()
- if not self.char:
- out = None
- elif self.char == b"{":
- self.bytesize = 6
- self.skip_char(b"{")
- out = self.scan_object()
- self.skip_char(b"}")
- elif self.char == b"(":
- out = self.scan_list()
- else:
- out = self.scan_string()
- return out
+ def __init__(self, buf):
+ self.bytesize = 8
+ self.bits = 0
+ self.nBits = 0
+ self.buf = buf if hasattr(buf, "read") else BytesIO(buf)
+ self.char = ""
+ self.advance()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return self.next()
+
+ def next(self):
+ obj = self.scan_object()
+ if obj:
+ return obj
+ else:
+ raise StopIteration
+
+ @property
+ def pos(self):
+ return self.buf.tell()
+
+ def advance(self):
+ """Get the next byte in the input stream.
+
+ Will read as many input bytes as needed from Base64 or hex areas.
+ """
+ while True:
+ self.last = self.char
+ self.char = self.buf.read(1)
+ if not self.char:
+ self.bytesize = 8
+ self.char = None
+ return self.char
+
+ if self.char is None:
+ return self.char
+ elif (self.bytesize == 6 and self.char in b"|}") \
+ or (self.bytesize == 4 and self.char == b"#"):
+ if self.nBits and (1 << self.nBits)-1 & self.bits:
+ raise IOError("%d-bit region ended with %d unused bits at %d" %
+ (self.bytesize, self.nBits, self.pos))
+ self.bytesize = 8
+ return self.char
+ elif self.bytesize != 8 and self.char in WHITESPACE:
+ # ignore whitespace in hex/base64 regions
+ pass
+ elif self.bytesize == 6 and self.char == b"=":
+ # Base64 padding
+ self.nBits -= 2
+ elif self.bytesize == 8:
+ return self.char
+ elif self.bytesize < 8:
+ self.bits <<= self.bytesize
+ self.nBits += self.bytesize
+ if self.bytesize == 6 and self.char in B64_DIGITS:
+ self.bits |= B64_DIGITS.index(self.char)
+ elif self.bytesize == 4 and self.char in HEX_DIGITS:
+ self.bits |= int(self.char, 16)
+ else:
+ raise IOError("char %r found in %d-bit region" %
+ (self.char, self.bytesize))
+
+ if self.nBits >= 8:
+ self.nBits -= 8
+ byte = (self.bits >> self.nBits) & 0xFF
+ self.bits &= (1 << self.nBits)-1
+ self.char = bytes([byte])
+ return self.char
+
+ def skip_whitespace(self):
+ while self.char:
+ if self.char in WHITESPACE:
+ self.advance()
+ elif self.char == b";" and self.last in b"\r\n":
+ while self.char and self.char not in b"\r\n":
+ self.advance()
+ else:
+ return
+
+ def skip_char(self, char):
+ """Skip the next character if it matches expectations."""
+ if len(char) != 1:
+ raise ValueError("only single characters allowed")
+ elif not self.char:
+ raise IOError("EOF found where %r expected" % char)
+ elif self.char == char:
+ self.advance()
+ else:
+ raise IOError("char %r found where %r expected" % (
+ self.char, char))
+
+ def scan_token(self):
+ self.skip_whitespace()
+ out = b""
+ while self.char and self.char in TOKEN_CHARS:
+ out += self.char
+ self.advance()
+ #print("scan_simple_string ->", repr(out))
+ return out
+
+ def scan_decimal(self):
+ i, value = 0, 0
+ while self.char and self.char in DIGITS:
+ value = value*10 + int(self.char)
+ i += 1
+ if i > 8:
+ raise IOError("decimal %d... too long" % value)
+ self.advance()
+ return value
+
+ def scan_verbatim_string(self, length=None):
+ """Return the value of verbatim string with given length."""
+ self.skip_whitespace()
+ self.skip_char(b":")
+ if not length:
+ raise ValueError("verbatim string had no length")
+ out, i = b"", 0
+ while i < length:
+ out += self.char
+ self.advance()
+ i += 1
+ return out
+
+ def scan_quoted_string(self, length=None):
+ self.skip_char(b"\"")
+ out = b""
+ while length is None or len(out) <= length:
+ if not self.char:
+ raise ValueError("quoted string is missing closing quote")
+ elif self.char == b"\"":
+ if length is None or len(out) == length:
+ self.skip_char(b"\"")
+ break
+ else:
+ raise ValueError("quoted string ended too early (expected %d)" % length)
+ elif self.char == b"\\":
+ c = self.advance()
+ if c == b"\r":
+ continue
+ elif c == b"\n":
+ continue
+ elif c in b"0123":
+ s = c + self.advance() + self.advance()
+ val = int(s, 8)
+ out += chr(val)
+ elif c == b"b":
+ out += b"\b"
+ elif c == b"f":
+ out += b"\f"
+ elif c == b"n":
+ out += b"\n"
+ elif c == b"r":
+ out += b"\r"
+ elif c == b"t":
+ out += b"\t"
+ elif c == b"v":
+ out += b"\v"
+ elif c == b"x":
+ s = self.advance() + self.advance()
+ val = int(s, 16)
+ out += chr(val)
+ else:
+ raise ValueError("unknown escape character \\%s at %d" % (c, self.pos))
+ else:
+ out += self.char
+ self.advance()
+ return out
+
+ def scan_hex_string(self, length=None):
+ self.bytesize = 4
+ self.skip_char(b"#")
+ out = b""
+ while self.char and (self.char != b"#" or self.bytesize == 4):
+ out += self.char
+ self.advance()
+ self.skip_char(b"#")
+ if length and length != len(out):
+ raise ValueError("hexstring length %d != declared length %d" %
+ (len(out), length))
+ return out
+
+ def scan_base64_string(self, length=None):
+ self.bytesize = 6
+ self.skip_char(b"|")
+ out = b""
+ while self.char and (self.char != b"|" or self.bytesize == 6):
+ out += self.char
+ self.advance()
+ self.skip_char(b"|")
+ if length and length != len(out):
+ raise ValueError("base64 length %d != declared length %d" %
+ (len(out), length))
+ return out
+
+ def scan_simple_string(self):
+ self.skip_whitespace()
+ if not self.char:
+ return None
+ elif self.char in TOKEN_CHARS and self.char not in DIGITS:
+ return self.scan_token()
+ elif self.char in DIGITS or self.char in b"\"#|:":
+ if self.char in DIGITS:
+ length = self.scan_decimal()
+ else:
+ length = None
+ if self.char == b"\"":
+ return self.scan_quoted_string(length)
+ elif self.char == b"#":
+ return self.scan_hex_string(length)
+ elif self.char == b"|":
+ return self.scan_base64_string(length)
+ elif self.char == b":":
+ return self.scan_verbatim_string(length)
+ else:
+ raise ValueError("illegal char %r at %d" % (self.char, self.pos))
+ else:
+ raise ValueError("illegal char %r at %d" % (self.char, self.pos))
+
+ def scan_string(self):
+ # TODO: How should hints be handled in a Pythonic way?
+ hint = None
+ if self.char == b"[":
+ self.skip_char(b"[")
+ hint = self.scan_simple_string()
+ self.skip_whitespace()
+ self.skip_char(b"]")
+ self.skip_whitespace()
+ out = self.scan_simple_string()
+ return (hint, out) if hint else out
+
+ def scan_list(self):
+ out = []
+ self.skip_char(b"(")
+ while True:
+ self.skip_whitespace()
+ if not self.char:
+ raise ValueError("list is missing closing paren")
+ elif self.char == b")":
+ self.skip_char(b")")
+ return out
+ else:
+ out.append(self.scan_object())
+
+ def scan_object(self):
+ """Return the next object of any type."""
+ self.skip_whitespace()
+ if not self.char:
+ out = None
+ elif self.char == b"{":
+ self.bytesize = 6
+ self.skip_char(b"{")
+ out = self.scan_object()
+ self.skip_char(b"}")
+ elif self.char == b"(":
+ out = self.scan_list()
+ else:
+ out = self.scan_string()
+ return out
def load(buf):
- out = list(SexpParser(buf))
- if not out:
- return None
- elif len(out) == 1:
- return out[0]
- else:
- return out
+ out = list(SexpParser(buf))
+ if not out:
+ return None
+ elif len(out) == 1:
+ return out[0]
+ else:
+ return out
def dump(obj, canonical=False, transport=False):
- if transport:
- canonical = True
-
- if isinstance(obj, (str, bytes)):
- exp = dump_string(obj, canonical)
- elif isinstance(obj, dict):
- exp = dump_list(obj.items(), canonical)
- elif isinstance(obj, (list, tuple)):
- exp = dump_list(obj, canonical)
- elif isinstance(obj, int):
- exp = dump_string(str(obj), canonical)
- else:
- raise TypeError("unsupported object type %r of %r" % (type(obj), obj))
-
- if transport:
- return b"{" + base64.b64encode(exp) + b"}"
- else:
- return exp
+ if transport:
+ canonical = True
+
+ if isinstance(obj, (str, bytes)):
+ exp = dump_string(obj, canonical)
+ elif isinstance(obj, dict):
+ exp = dump_list(obj.items(), canonical)
+ elif isinstance(obj, (list, tuple)):
+ exp = dump_list(obj, canonical)
+ elif isinstance(obj, int):
+ exp = dump_string(str(obj), canonical)
+ else:
+ raise TypeError("unsupported object type %r of %r" % (type(obj), obj))
+
+ if transport:
+ return b"{" + base64.b64encode(exp) + b"}"
+ else:
+ return exp
def dump_string(obj, canonical=False, hex=False, hint=None):
- if hasattr(obj, "encode"):
- obj = obj.encode("utf-8")
-
- if canonical:
- out = ("%d:" % len(obj)).encode("utf-8") + obj
- elif is_token(obj):
- out = bytes(obj)
- elif is_quoteable(obj):
- out = bytearray(b'"')
- # This sucks.
- # In python2, iterates over 1-char strings.
- # In python3, iterates over integers. NOT 1-char bytes()
- # No, screw it. I officially drop Python 2 compatibility here.
- for char in obj:
- if char in ESCAPE_CHARS_TB:
- out += b"\\"
- out += ESCAPE_CHARS_TB[char]
- elif char in b"'\"":
- out += b"\\"
- out.append(char)
- else:
- out.append(char)
- out += b'"'
- out = bytes(out)
- elif hex:
- out = b"#" + obj.encode("hex") + b"#"
- else:
- out = b"|" + base64.b64encode(obj) + b"|"
-
- # Add [mimetypehint]
- if hint:
- return b"[" + dump_string(hint, canonical, hex, None) + b"]" + out
- else:
- return out
+ if hasattr(obj, "encode"):
+ obj = obj.encode("utf-8")
+
+ if canonical:
+ out = ("%d:" % len(obj)).encode("utf-8") + obj
+ elif is_token(obj):
+ out = bytes(obj)
+ elif is_quoteable(obj):
+ out = bytearray(b'"')
+ # This sucks.
+ # In python2, iterates over 1-char strings.
+ # In python3, iterates over integers. NOT 1-char bytes()
+ # No, screw it. I officially drop Python 2 compatibility here.
+ for char in obj:
+ if char in ESCAPE_CHARS_TB:
+ out += b"\\"
+ out += ESCAPE_CHARS_TB[char]
+ elif char in b"'\"":
+ out += b"\\"
+ out.append(char)
+ else:
+ out.append(char)
+ out += b'"'
+ out = bytes(out)
+ elif hex:
+ out = b"#" + obj.encode("hex") + b"#"
+ else:
+ out = b"|" + base64.b64encode(obj) + b"|"
+
+ # Add [mimetypehint]
+ if hint:
+ return b"[" + dump_string(hint, canonical, hex, None) + b"]" + out
+ else:
+ return out
def dump_hint(obj, canonical=False):
- return b"[" + dump_string(obj, canonical) + b"]"
+ return b"[" + dump_string(obj, canonical) + b"]"
def dump_list(obj, canonical=False):
- out = b"("
- if canonical:
- out += b"".join(dump(x, canonical=True) for x in obj)
- else:
- out += b" ".join(dump(x) for x in obj)
- out += b")"
- return out
+ out = b"("
+ if canonical:
+ out += b"".join(dump(x, canonical=True) for x in obj)
+ else:
+ out += b" ".join(dump(x) for x in obj)
+ out += b")"
+ return out
def to_int(buf):
- num = 0
- for byte in buf:
- num <<= 8
- num |= ord(byte)
- return num
+ num = 0
+ for byte in buf:
+ num <<= 8
+ num |= ord(byte)
+ return num
def is_token(string):
- if string[0] in DIGITS:
- return False
- for char in string:
- if char not in TOKEN_CHARS:
- return False
- return True
+ if string[0] in DIGITS:
+ return False
+ for char in string:
+ if char not in TOKEN_CHARS:
+ return False
+ return True
def is_quoteable(string):
- for char in string:
- if char in VERBATIM:
- return False
- elif char in PRINTABLE_CHARS:
- pass
- elif char in ESCAPE_CHARS:
- pass
- else:
- return False
- return True
+ for char in string:
+ if char in VERBATIM:
+ return False
+ elif char in PRINTABLE_CHARS:
+ pass
+ elif char in ESCAPE_CHARS:
+ pass
+ else:
+ return False
+ return True
View
100 lib/python/nullroute/tests.py
@@ -5,66 +5,66 @@
import irc
def parse_test(file):
- for line in open(file):
- line = line.strip()
- if not line or line.startswith("//"):
- continue
- yield json.loads("[%s]" % line)
+ for line in open(file):
+ line = line.strip()
+ if not line or line.startswith("//"):
+ continue
+ yield json.loads("[%s]" % line)
def run_test(file, func):
- passed, failed = 0, 0
- for input, wanted_output in parse_test(file):
- actual_output = func(input)
- if wanted_output == actual_output:
- msg = " OK "
- passed += 1
- else:
- msg = "FAIL"
- failed += 1
- print("%s: %r -> %s" % (msg, input, json.dumps(actual_output)))
- if msg == "FAIL":
- print("\033[33m%s: %r -> %s\033[m" % ("WANT",
- input, json.dumps(wanted_output)))
- print("Tests: %s passed, %d failed" % (passed, failed))
- return failed
+ passed, failed = 0, 0
+ for input, wanted_output in parse_test(file):
+ actual_output = func(input)
+ if wanted_output == actual_output:
+ msg = " OK "
+ passed += 1
+ else:
+ msg = "FAIL"
+ failed += 1
+ print("%s: %r -> %s" % (msg, input, json.dumps(actual_output)))
+ if msg == "FAIL":
+ print("\033[33m%s: %r -> %s\033[m" % ("WANT",
+ input, json.dumps(wanted_output)))
+ print("Tests: %s passed, %d failed" % (passed, failed))
+ return failed
def test_split(input):
- input = input.encode("utf-8")
- try:
- return irc.Line.split(input)
- except ValueError:
- return None
+ input = input.encode("utf-8")
+ try:
+ return irc.Line.split(input)
+ except ValueError:
+ return None
def test_join(input):
- try:
- return irc.Line.join(input)
- except ValueError:
- return None
+ try:
+ return irc.Line.join(input)
+ except ValueError:
+ return None
def test_prefix_split(input):
- try:
- p = irc.Prefix.parse(input)
- if p:
- return p.to_a()
- else:
- return None
- except ValueError:
- return None
+ try:
+ p = irc.Prefix.parse(input)
+ if p:
+ return p.to_a()
+ else:
+ return None
+ except ValueError:
+ return None
def test_parse(input):
- input = input.encode("utf-8")
- p = irc.Line.parse(input)
- tags = [k if v is True or v == "" else "%s=%s" % (k, v)
- for k, v in p.tags.items()]
- if tags:
- tags.sort()
- else:
- tags = None
- if p.prefix:
- prefix = p.prefix.to_a()
- else:
- prefix = None
- return [tags, prefix, p.args]
+ input = input.encode("utf-8")
+ p = irc.Line.parse(input)
+ tags = [k if v is True or v == "" else "%s=%s" % (k, v)
+ for k, v in p.tags.items()]
+ if tags:
+ tags.sort()
+ else:
+ tags = None
+ if p.prefix:
+ prefix = p.prefix.to_a()
+ else:
+ prefix = None
+ return [tags, prefix, p.args]
dir = "../../tests"
View
400 lib/python/nullroute/windows/firewall.py
@@ -2,213 +2,213 @@
from .registry import RegistryKey
class Firewall(object):
- ROOT_KEY = "SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Parameters\\FirewallPolicy\\StandardProfile"
+ ROOT_KEY = "SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Parameters\\FirewallPolicy\\StandardProfile"
- SCOPE_ALL = "*"
- SCOPE_SUBNET = "LocalSubNet"
+ SCOPE_ALL = "*"
+ SCOPE_SUBNET = "LocalSubNet"
- status_ENABLED = "Enabled"
- status_DISABLED = "Disabled"
+ status_ENABLED = "Enabled"
+ status_DISABLED = "Disabled"
- def __init__(self, machine=None):
- self.machine = machine
- self.hKey = RegistryKey(Con.HKEY_LOCAL_MACHINE, path=self.ROOT_KEY,
- machine=self.machine)
+ def __init__(self, machine=None):
+ self.machine = machine
+ self.hKey = RegistryKey(Con.HKEY_LOCAL_MACHINE, path=self.ROOT_KEY,
+ machine=self.machine)
- self._appList = None
- self._portList = None
+ self._appList = None
+ self._portList = None
- @property
- def apps(self):
- if not self._appList:
- self._appList = _FirewallApplicationList(self)
- return self._appList
+ @property
+ def apps(self):
+ if not self._appList:
+ self._appList = _FirewallApplicationList(self)
+ return self._appList
- @property
- def ports(self):
- if not self._portList:
- self._portList = _FirewallPortList(self)
- return self._portList
+ @property
+ def ports(self):
+ if not self._portList:
+ self._portList = _FirewallPortList(self)
+ return self._portList
class _FirewallApplicationList(object):
- APPS_SUBKEY = "AuthorizedApplications\\List"
-
- POS_EXEPATH = 0
- POS_SCOPE = 1
- POS_status = 2
- POS_NAME = 3
-
- def __init__(self, fw):
- self.fwProfile = fw
- self.hKey = self.fwProfile.hKey.open_subkey(self.APPS_SUBKEY)
-
- @classmethod
- def _pack(self, exepath, scope=None, enabled=None, name=None):
- if scope is not None:
- status = "Enabled" if enabled else "Disabled"
- return ":".join([exepath, scope, status, name])
- else:
- return exepath
-
- @classmethod
- def _unpack_value(self, val):
- return val
-
- @classmethod
- def _unpack_data(self, val):
- # deal with drive:\path bogosity
- _drive, _rest = val[:2], val[2:]
- exepath, scope, status, name = _rest.split(":", 3)
- exepath = _drive + exepath
- enabled = (status.lower() == "enabled")
- return exepath, scope, enabled, name
-
- # Lookup
-
- def query(self, exepath):
- value = self._pack(exepath)
- data, _reg_type = self.hKey[value]
- return self._unpack_data(data)
-
- def __getitem__(self, key):
- return self.query(key)
-
- def set(self, exepath, scope, enabled, name):
- scope = scope or self.SCOPE_ALL
- value = self._pack(exepath)
- data = self._pack(exepath, scope, enabled, name)
- self.hKey[value] = (data, Con.REG_SZ)
-
- def __setitem__(self, key, val):
- if val[0] != key:
- raise ValueError("exepath must be identical to the key")
- self.set(*val)
-
- def delete(self, exepath):
- value = self._pack(exepath)
- del self.hKey[value]
-
- def __delitem__(self, key):
- self.delete(key)
-
- def __iter__(self):
- for k, (v, t) in self.hKey:
- yield self._unpack_value(k)
-
- def values(self):
- for k, (v, t) in self.hKey:
- yield self._unpack_data(v)
-
- def items(self):
- for k, (v, t) in self.hKey:
- yield self._unpack_value(k), self._unpack_data(v)
+ APPS_SUBKEY = "AuthorizedApplications\\List"
+
+ POS_EXEPATH = 0
+ POS_SCOPE = 1
+ POS_status = 2
+ POS_NAME = 3
+
+ def __init__(self, fw):
+ self.fwProfile = fw
+ self.hKey = self.fwProfile.hKey.open_subkey(self.APPS_SUBKEY)
+
+ @classmethod
+ def _pack(self, exepath, scope=None, enabled=None, name=None):
+ if scope is not None:
+ status = "Enabled" if enabled else "Disabled"
+ return ":".join([exepath, scope, status, name])
+ else:
+ return exepath
+
+ @classmethod
+ def _unpack_value(self, val):
+ return val
+
+ @classmethod
+ def _unpack_data(self, val):
+ # deal with drive:\path bogosity
+ _drive, _rest = val[:2], val[2:]
+ exepath, scope, status, name = _rest.split(":", 3)
+ exepath = _drive + exepath
+ enabled = (status.lower() == "enabled")
+ return exepath, scope, enabled, name
+
+ # Lookup
+
+ def query(self, exepath):
+ value = self._pack(exepath)
+ data, _reg_type = self.hKey[value]
+ return self._unpack_data(data)
+
+ def __getitem__(self, key):
+ return self.query(key)
+