Skip to content

Commit

Permalink
Merge branch 'v2'
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Gallant (Ocelot) authored and Andrew Gallant (Ocelot) committed Feb 20, 2012
2 parents 7c2393f + 3415e18 commit ae28d0d
Show file tree
Hide file tree
Showing 16 changed files with 1,539 additions and 1,621 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -2,3 +2,5 @@ tests
*.pyc
*/tags
.*swp
testing.py
tags
4 changes: 2 additions & 2 deletions examples/get-current-desktop.py
@@ -1,8 +1,8 @@
from xpybutil import conn, root
import xpybutil.ewmh as ewmh

names = ewmh.get_desktop_names(conn, root).reply()
desk = ewmh.get_current_desktop(conn, root).reply()
names = ewmh.get_desktop_names().reply()
desk = ewmh.get_current_desktop().reply()

print names[desk] if desk < len(names) else desk

23 changes: 13 additions & 10 deletions xpybutil/cursor.py
@@ -1,3 +1,5 @@
from xpybutil import conn

class FontCursor:
XCursor = 0
Arrow = 2
Expand Down Expand Up @@ -77,16 +79,17 @@ class FontCursor:
Watch = 150
XTerm = 152

def create_font_cursor(c, cursor_id, fore_red=0, fore_green=0, fore_blue=0,
def create_font_cursor(cursor_id, fore_red=0, fore_green=0, fore_blue=0,
back_red=0xffff, back_green=0xffff, back_blue=0xffff):
font = c.generate_id()
cursor = c.generate_id()
font = conn.generate_id()
cursor = conn.generate_id()

conn.core.OpenFontChecked(font, len('cursor'), 'cursor').check()
conn.core.CreateGlyphCursorChecked(cursor, font, font, cursor_id,
cursor_id + 1, fore_red, fore_green,
fore_blue, back_red, back_green,
back_blue).check()
conn.core.CloseFont(font)

c.core.OpenFontChecked(font, len('cursor'), 'cursor').check()
c.core.CreateGlyphCursorChecked(cursor, font, font, cursor_id,
cursor_id + 1, fore_red, fore_green,
fore_blue, back_red, back_green,
back_blue).check()
c.core.CloseFont(font)
return cursor

return cursor
93 changes: 63 additions & 30 deletions xpybutil/event.py
@@ -1,16 +1,19 @@
from collections import deque
from collections import defaultdict, deque
import struct
import sys
import traceback

import xcb.xproto
import xcb
import xcb.xproto as xproto

from xpybutil import conn, root
import util

__queue = deque()
__callbacks = []
__callbacks = defaultdict(list)
EM = xproto.EventMask

class Event:
class Event(object):
KeyPressEvent = 2
KeyReleaseEvent = 3
ButtonPressEvent = 4
Expand Down Expand Up @@ -45,63 +48,93 @@ class Event:
ClientMessageEvent = 33
MappingNotifyEvent = 34

def connect(event_name, window, callback):
member = '%sEvent' % event_name
assert hasattr(xcb.xproto, member)

__callbacks.append((getattr(xcb.xproto, member), window, callback))

def disconnect(event_name, window):
member = '%sEvent' % event_name
assert hasattr(xcb.xproto, member)
def replay_pointer():
conn.core.AllowEventsChecked(xproto.Allow.ReplayPointer,
xproto.Time.CurrentTime).check()

member = getattr(xcb.xproto, member)
cbs = filter(lambda (et, win, cb): et == member and win == window,
__callbacks)
for item in cbs:
__callbacks.remove(item)
def send_event(destination, event_mask, event, propagate=False):
return conn.core.SendEvent(propagate, destination, event_mask, event)

def send_event(c, destination, event_mask, event, propagate=False):
return c.core.SendEvent(propagate, destination, event_mask, event)
def send_event_checked(destination, event_mask, event, propagate=False):
return conn.core.SendEventChecked(propagate, destination, event_mask, event)

def pack_client_message(window, message_type, *data):
assert len(data) <= 5

if isinstance(message_type, basestring):
message_type = util.get_atom(message_type)

data = list(data)
data += ([0] * (5 - len(data)))
data += [0] * (5 - len(data))

# Taken from
# http://xcb.freedesktop.org/manual/structxcb__client__message__event__t.html
return struct.pack('BBH7I', Event.ClientMessageEvent, 32, 0, window,
message_type, *data)

def read(c, block=False):
def root_send_client_event(window, message_type, *data):
mask = EM.SubstructureNotify | EM.SubstructureRedirect
packed = pack_client_message(window, message_type, *data)
return send_event(root, mask, packed)

def root_send_client_event_checked(window, message_type, *data):
mask = EM.SubstructureNotify | EM.SubstructureRedirect
packed = pack_client_message(window, message_type, *data)
return send_event_checked(root, mask, packed)

def is_connected(event_name, window, callback):
member = '%sEvent' % event_name
assert hasattr(xproto, member)

key = (getattr(xproto, member), window)
return key in __callbacks and callback in __callbacks[key]

def connect(event_name, window, callback):
member = '%sEvent' % event_name
assert hasattr(xproto, member)

key = (getattr(xproto, member), window)
__callbacks[key].append(callback)

def disconnect(event_name, window):
member = '%sEvent' % event_name
assert hasattr(xproto, member)

key = (getattr(xproto, member), window)
if key in __callbacks:
del __callbacks[key]

def read(block=False):
if block:
e = c.wait_for_event()
e = conn.wait_for_event()
__queue.appendleft(e)

while True:
e = c.poll_for_event()
e = conn.poll_for_event()

if not e:
break

__queue.appendleft(e)

def event_loop(conn):
def main():
try:
while True:
read(conn, block=True)
read(block=True)
for e in queue():
w = None
if hasattr(e, 'window'):
if isinstance(e, xproto.MappingNotifyEvent):
w = None
elif hasattr(e, 'window'):
w = e.window
elif hasattr(e, 'event'):
w = e.event
elif hasattr(e, 'requestor'):
w = e.requestor

for event_type, win, cb in __callbacks:
if win == w and isinstance(e, event_type):
cb(e)
key = (e.__class__, w)
for cb in __callbacks.get(key, []):
cb(e)
except xcb.Exception:
traceback.print_exc()
sys.exit(1)
Expand Down

0 comments on commit ae28d0d

Please sign in to comment.