Skip to content

Commit

Permalink
everything uses PollReactor, "completed" port for audiotest.py
Browse files Browse the repository at this point in the history
  • Loading branch information
bananadine committed Jun 24, 2013
1 parent 1cd8163 commit 7e20c17
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 140 deletions.
40 changes: 20 additions & 20 deletions flumotion/common/gstreamer.py
Expand Up @@ -119,7 +119,7 @@ def element_factory_exists(name):
@rtype: boolean
"""
registry = Gst.registry_get()
registry = Gst.Registry.get()
factory = registry.find_feature(name, Gst.TYPE_ELEMENT_FACTORY)

if factory:
Expand All @@ -135,7 +135,7 @@ def get_plugin_version(plugin_name):
@rtype: tuple of (major, minor, micro, nano), or None if it could not be
found or determined
"""
plugin = Gst.registry_get().find_plugin(plugin_name)
plugin = Gst.Registry.get().find_plugin(plugin_name)

if not plugin:
return None
Expand All @@ -149,18 +149,18 @@ def get_plugin_version(plugin_name):


def get_state_change(old, new):
table = {(Gst.STATE.NULL, Gst.STATE.READY):
Gst.STATE.CHANGE_NULL_TO_READY,
(Gst.STATE.READY, Gst.STATE.PAUSED):
Gst.STATE.CHANGE_READY_TO_PAUSED,
(Gst.STATE.PAUSED, Gst.STATE.PLAYING):
Gst.STATE.CHANGE_PAUSED_TO_PLAYING,
(Gst.STATE.PLAYING, Gst.STATE.PAUSED):
Gst.STATE.CHANGE_PLAYING_TO_PAUSED,
(Gst.STATE.PAUSED, Gst.STATE.READY):
Gst.STATE.CHANGE_PAUSED_TO_READY,
(Gst.STATE.READY, Gst.STATE.NULL):
Gst.STATE.CHANGE_READY_TO_NULL}
table = {(Gst.State.NULL, Gst.State.READY):
Gst.State.CHANGE_NULL_TO_READY,
(Gst.State.READY, Gst.State.PAUSED):
Gst.State.CHANGE_READY_TO_PAUSED,
(Gst.State.PAUSED, Gst.State.PLAYING):
Gst.State.CHANGE_PAUSED_TO_PLAYING,
(Gst.State.PLAYING, Gst.State.PAUSED):
Gst.State.CHANGE_PLAYING_TO_PAUSED,
(Gst.State.PAUSED, Gst.State.READY):
Gst.State.CHANGE_PAUSED_TO_READY,
(Gst.State.READY, Gst.State.NULL):
Gst.State.CHANGE_READY_TO_NULL}
return table.get((old, new), 0)


Expand Down Expand Up @@ -205,13 +205,13 @@ def state_changed(self, old, new):
def have_error(self, curstate, message):
# if we have a state change defer that has not yet
# fired, we should errback it
changes = [Gst.STATE.CHANGE_NULL_TO_READY,
Gst.STATE.CHANGE_READY_TO_PAUSED,
Gst.STATE.CHANGE_PAUSED_TO_PLAYING]
changes = [Gst.State.CHANGE_NULL_TO_READY,
Gst.State.CHANGE_READY_TO_PAUSED,
Gst.State.CHANGE_PAUSED_TO_PLAYING]

extras = ((Gst.STATE.PAUSED, Gst.STATE.CHANGE_PLAYING_TO_PAUSED),
(Gst.STATE.READY, Gst.STATE.CHANGE_PAUSED_TO_READY),
(Gst.STATE.NULL, Gst.STATE.CHANGE_READY_TO_NULL))
extras = ((Gst.State.PAUSED, Gst.State.CHANGE_PLAYING_TO_PAUSED),
(Gst.State.READY, Gst.State.CHANGE_PAUSED_TO_READY),
(Gst.State.NULL, Gst.State.CHANGE_READY_TO_NULL))
for state, change in extras:
if curstate <= state:
changes.append(change)
Expand Down
26 changes: 13 additions & 13 deletions flumotion/common/pygobject.py
Expand Up @@ -23,7 +23,7 @@

import sys

import gobject
from gi.repository import GObject

__version__ = "$Rev$"

Expand All @@ -36,33 +36,33 @@ def gobject_set_property(object, property, value):
@type property: string
@param value: value to set property to
"""
for pspec in gobject.list_properties(object):
for pspec in GObject.list_properties(object):
if pspec.name == property:
break
else:
raise errors.PropertyError(
"Property '%s' in element '%s' does not exist" % (
property, object.get_property('name')))

if pspec.value_type in (gobject.TYPE_INT, gobject.TYPE_UINT,
gobject.TYPE_INT64, gobject.TYPE_UINT64):
if pspec.value_type in (GObject.TYPE_INT, GObject.TYPE_UINT,
GObject.TYPE_INT64, GObject.TYPE_UINT64):
try:
value = int(value)
except ValueError:
msg = "Invalid value given for property '%s' in element '%s'" % (
property, object.get_property('name'))
raise errors.PropertyError(msg)

elif pspec.value_type == gobject.TYPE_BOOLEAN:
elif pspec.value_type == GObject.TYPE_BOOLEAN:
if value == 'False':
value = False
elif value == 'True':
value = True
else:
value = bool(value)
elif pspec.value_type in (gobject.TYPE_DOUBLE, gobject.TYPE_FLOAT):
elif pspec.value_type in (GObject.TYPE_DOUBLE, GObject.TYPE_FLOAT):
value = float(value)
elif pspec.value_type == gobject.TYPE_STRING:
elif pspec.value_type == GObject.TYPE_STRING:
value = str(value)
# FIXME: this is superevil ! we really need to find a better way
# of checking if this property is a param enum
Expand Down Expand Up @@ -92,7 +92,7 @@ def gsignal(name, *args):
else:
_dict = _locals['__gsignals__']

_dict[name] = (gobject.SIGNAL_RUN_FIRST, None, args)
_dict[name] = (GObject.SIGNAL_RUN_FIRST, None, args)

PARAM_CONSTRUCT = 1<<9

Expand Down Expand Up @@ -138,13 +138,13 @@ def _do_set_property(self, prop, value):
if k == 'construct':
flags |= PARAM_CONSTRUCT
elif k == 'construct_only':
flags |= gobject.PARAM_CONSTRUCT_ONLY
flags |= GObject.PARAM_CONSTRUCT_ONLY
elif k == 'readable':
flags |= gobject.PARAM_READABLE
flags |= GObject.PARAM_READABLE
elif k == 'writable':
flags |= gobject.PARAM_WRITABLE
flags |= GObject.PARAM_WRITABLE
elif k == 'lax_validation':
flags |= gobject.PARAM_LAX_VALIDATION
flags |= GObject.PARAM_LAX_VALIDATION
else:
raise Exception('Invalid GObject property flag: %r=%r' % (k, v))

Expand All @@ -155,4 +155,4 @@ def type_register(klass):
if klass.__gtype__.pytype is not klass:
# all subclasses will at least have a __gtype__ from their
# parent, make sure it corresponds to the exact class
gobject.type_register(klass)
GObject.type_register(klass)
46 changes: 20 additions & 26 deletions flumotion/component/feedcomponent.py
Expand Up @@ -21,9 +21,10 @@

import os

import gst
import gst.interfaces
import gobject
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
#import gst.interfaces

from twisted.internet import reactor, defer
from twisted.spread import pb
Expand Down Expand Up @@ -98,12 +99,12 @@ def remote_setGstDebug(self, debug):
if glob:
try:
# value has to be an integer
gst.debug_set_threshold_for_name(glob, value)
Gst.debug_set_threshold_for_name(glob, value)
except TypeError:
self.warning("Cannot set glob %s to value %s" % (
glob, value))
else:
gst.debug_set_default_threshold(value)
Gst.debug_set_default_threshold(value)

self.comp.uiState.set('gst-debug', debug)

Expand Down Expand Up @@ -278,7 +279,6 @@ def remote_dumpGstreamerDotFile(self, filename):

FeedComponent.componentMediumClass = FeedComponentMedium


class ParseLaunchComponent(FeedComponent):
"""A component using gst-launch syntax
Expand Down Expand Up @@ -338,7 +338,7 @@ def create_pipeline(self):
self.pipeline_string = self.parse_pipeline(unparsed)

try:
pipeline = gst.parse_launch(self.pipeline_string)
pipeline = Gst.parse_launch(self.pipeline_string)
except gobject.GError, e:
self.warning('Could not parse pipeline: %s' % e.message)
m = messages.Error(T_(N_(
Expand Down Expand Up @@ -507,7 +507,6 @@ def get_feeder_sinkpad(self, feederAlias):
gdppay = self.get_element(e.elementName + '-pay')
return gdppay.get_static_pad("sink")


class Effect(log.Loggable):
"""
I am a part of a feed component for a specific group
Expand Down Expand Up @@ -555,7 +554,6 @@ def getComponent(self):
"""
return self.component


class PostProcEffect (Effect):
"""
I am an effect that is plugged in the pipeline to do a post processing
Expand Down Expand Up @@ -602,15 +600,14 @@ def plug(self):
peerSinkPad.unlink(peerSrcPad)

# Add the deinterlacer bin to the pipeline
self.effectBin.set_state(gst.STATE_PLAYING)
self.effectBin.set_state(Gst.State.PLAYING)
self.pipeline.add(self.effectBin)

# link it with the element src pad and its peer's sink pad
peerSinkPad.link(self.effectBin.get_pad('sink'))
self.effectBin.get_pad('src').link(peerSrcPad)
self.plugged = True


class MultiInputParseLaunchComponent(ParseLaunchComponent):
"""
This class provides for multi-input ParseLaunchComponents, such as muxers,
Expand Down Expand Up @@ -687,7 +684,6 @@ def _underrun_cb(element):

signalid = queue.connect("underrun", _underrun_cb)


class ReconfigurableComponent(ParseLaunchComponent):

disconnectedPads = False
Expand Down Expand Up @@ -740,7 +736,7 @@ def _install_changes_probes(self):
# FIXME: Add documentation

def output_reset_event(pad, event):
if event.type != gst.EVENT_FLUSH_START:
if event.type != Gst.EVENT_FLUSH_START:
return True

self.debug('RESET: out reset event received on output pad %r', pad)
Expand Down Expand Up @@ -786,15 +782,15 @@ def got_new_buffer(pad, buff, element):
self.info("INCAPS: Got buffer but we're still disconnected.")
return True

if not buff.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
if not buff.flag_is_set(Gst.BUFFER_FLAG_IN_CAPS):
return True

self.info("INCAPS: Got buffer with caps of len %d", buff.size)
if buff.caps:
newcaps = buff.caps[0].copy()
resets = self.uiState.get('reset-count')
newcaps['count'] = resets
buff.set_caps(gst.Caps(newcaps))
buff.set_caps(Gst.Caps(newcaps))
return True

self.log('RESET: installing event probes for detecting changes')
Expand Down Expand Up @@ -830,11 +826,11 @@ def _unlink_pads(self, element, directions):
if not ppad:
continue
if (pad.get_direction() in directions and
pad.get_direction() == gst.PAD_SINK):
pad.get_direction() == Gst.PadDirection.SINK):
self.debug('RESET: unlink %s with %s', pad, ppad)
ppad.unlink(pad)
elif (pad.get_direction() in directions and
pad.get_direction() == gst.PAD_SRC):
pad.get_direction() == Gst.PadDirection.SRC):
self.debug('RESET: unlink %s with %s', pad, ppad)
pad.unlink(ppad)

Expand All @@ -858,7 +854,7 @@ def _remove_pipeline(self, pipeline, element, end, done=None):
element.unlink(peer)

self.log("RESET: removing old element %s from pipeline", element)
element.set_state(gst.STATE_NULL)
element.set_state(Gst.State.NULL)
pipeline.remove(element)

def _rebuild_pipeline(self):
Expand All @@ -873,7 +869,7 @@ def _rebuild_pipeline(self):
# Place a fakesrc element so we can know from where to start
# rebuilding the pipeline.
fake_pipeline = 'fakesrc name=start ! %s' % base_pipe
pipeline = gst.parse_launch(fake_pipeline)
pipeline = Gst.parse_launch(fake_pipeline)

def move_element(element, orig, dest):
if not element:
Expand All @@ -893,7 +889,7 @@ def move_element(element, orig, dest):

move_element(to_link[-1], orig, dest)

self._unlink_pads(element, [gst.PAD_SRC, gst.PAD_SINK])
self._unlink_pads(element, [Gst.PadDirection.SRC, Gst.PadDirection.SINK])
orig.remove(element)
dest.add(element)

Expand Down Expand Up @@ -923,7 +919,7 @@ def move_element(element, orig, dest):
done[-1].link(elem)

self.configure_pipeline(self.pipeline, self.config['properties'])
self.pipeline.set_state(gst.STATE_PLAYING)
self.pipeline.set_state(Gst.State.PLAYING)
self._unblock_eaters()

resets = self.uiState.get('reset-count')
Expand All @@ -939,7 +935,7 @@ def _on_eater_blocked(self, pad, blocked):
self._on_pad_blocked(pad, blocked)
if blocked:
peer = pad.get_peer()
peer.send_event(gst.event_new_flush_start())
peer.send_event(Gst.event_new_flush_start())
#peer.send_event(gst.event_new_eos())
#self._unlink_pads(pad.get_parent(), [gst.PAD_SRC])

Expand All @@ -953,7 +949,6 @@ def _on_pipeline_drained(self):
self._remove_pipeline(self.pipeline, element, end, done)
self._rebuild_pipeline()


class EncoderComponent(ParseLaunchComponent):
"""
Component that is reconfigured when new changes arrive through the
Expand All @@ -970,11 +965,10 @@ def handle_reset_event(self, pad, event):
if gstreamer.event_is_flumotion_reset(event):
self.debug("Got reset event in the encoder... reseting it!")
encoder = self.get_element('encoder')
encoder.set_state(gst.STATE_READY)
encoder.set_state(Gst.State.READY)
self.try_start_pipeline(force=True)
return True


class MuxerComponent(MultiInputParseLaunchComponent):
"""
This class provides for multi-input ParseLaunchComponents, such as muxers,
Expand Down Expand Up @@ -1005,7 +999,7 @@ def buffer_probe_cb(self, pad, buffer, depay, eaterAlias):
self.addMessage(m)
# this is the streaming thread, cannot set state here
# so we do it in the mainloop
reactor.callLater(0, self.pipeline.set_state, gst.STATE_NULL)
reactor.callLater(0, self.pipeline.set_state, Gst.State.NULL)
return True
self.debug("Got link pad %r", linkpad)
srcpad_to_link.link(linkpad)
Expand Down

0 comments on commit 7e20c17

Please sign in to comment.