Skip to content

Commit

Permalink
More porting on the components for a audio/video test stream
Browse files Browse the repository at this point in the history
  • Loading branch information
bananadine committed Jul 11, 2013
1 parent e7dc610 commit 8100019
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 66 deletions.
26 changes: 13 additions & 13 deletions flumotion/common/gstreamer.py
Expand Up @@ -120,7 +120,7 @@ def element_factory_exists(name):
@rtype: boolean
"""
registry = Gst.Registry.get()
factory = registry.find_feature(name, Gst.TYPE_ELEMENT_FACTORY)
factory = registry.lookup_feature(name)

This comment has been minimized.

Copy link
@mithro

mithro Jul 18, 2013

Put a

# FIXME(bananadine): Check that lookup_feature is returning a Gst.TYPE_ELEMENT_FACTORY

if factory:
return True
Expand Down Expand Up @@ -150,17 +150,17 @@ def get_plugin_version(plugin_name):

def get_state_change(old, new):
table = {(Gst.State.NULL, Gst.State.READY):
Gst.State.CHANGE_NULL_TO_READY,
Gst.StateChange.NULL_TO_READY,
(Gst.State.READY, Gst.State.PAUSED):
Gst.State.CHANGE_READY_TO_PAUSED,
Gst.StateChange.READY_TO_PAUSED,
(Gst.State.PAUSED, Gst.State.PLAYING):
Gst.State.CHANGE_PAUSED_TO_PLAYING,
Gst.StateChange.PAUSED_TO_PLAYING,
(Gst.State.PLAYING, Gst.State.PAUSED):
Gst.State.CHANGE_PLAYING_TO_PAUSED,
Gst.StateChange.PLAYING_TO_PAUSED,
(Gst.State.PAUSED, Gst.State.READY):
Gst.State.CHANGE_PAUSED_TO_READY,
Gst.StateChange.PAUSED_TO_READY,
(Gst.State.READY, Gst.State.NULL):
Gst.State.CHANGE_READY_TO_NULL}
Gst.StateChange.READY_TO_NULL}
return table.get((old, new), 0)


Expand Down Expand Up @@ -205,13 +205,13 @@ def state_changed(self, old, new):
def have_error(self, curstate, message):
# if we have a state change defer that has not yet
# fired, we should errback it
changes = [Gst.State.CHANGE_NULL_TO_READY,
Gst.State.CHANGE_READY_TO_PAUSED,
Gst.State.CHANGE_PAUSED_TO_PLAYING]
changes = [Gst.StateChange.NULL_TO_READY,
Gst.StateChange.READY_TO_PAUSED,
Gst.StateChange.PAUSED_TO_PLAYING]

extras = ((Gst.State.PAUSED, Gst.State.CHANGE_PLAYING_TO_PAUSED),
(Gst.State.READY, Gst.State.CHANGE_PAUSED_TO_READY),
(Gst.State.NULL, Gst.State.CHANGE_READY_TO_NULL))
extras = ((Gst.State.PAUSED, Gst.StateChange.PLAYING_TO_PAUSED),
(Gst.State.READY, Gst.StateChange.PAUSED_TO_READY),
(Gst.State.NULL, Gst.StateChange.READY_TO_NULL))
for state, change in extras:
if curstate <= state:
changes.append(change)
Expand Down
2 changes: 1 addition & 1 deletion flumotion/component/encoders/theora/theora.py
Expand Up @@ -46,7 +46,7 @@ def check_properties(self, props, addMessage):
raise errors.ConfigError(msg)

def get_pipeline_string(self, properties):
return "ffmpegcolorspace ! theoraenc name=encoder"
return "videoconvert ! theoraenc name=encoder"

def configure_pipeline(self, pipeline, properties):
element = pipeline.get_by_name('encoder')
Expand Down
6 changes: 3 additions & 3 deletions flumotion/component/encoders/vorbis/vorbis010.py
Expand Up @@ -65,7 +65,7 @@ def configure_pipeline(self, pipeline, properties):
else:
enc.set_property('quality', self.quality)

pad = ar.get_pad('sink')
pad = ar.get_static_pad('sink')
handle = None

def buffer_probe(pad, buffer):
Expand All @@ -90,10 +90,10 @@ def buffer_probe(pad, buffer):
self.channels)
cf.set_property('caps',
Gst.caps_from_string(caps_str))
pad.remove_buffer_probe(handle)
pad.remove_probe(handle)
return True

handle = pad.add_buffer_probe(buffer_probe)
handle = pad.add_probe(Gst.PadProbeType.BUFFER, buffer_probe, "user data")

This comment has been minimized.

Copy link
@mithro

mithro Jul 12, 2013

You probably don't want to use user data here, if you can't leave the argument out just give it None.


def modify_property_Bitrate(self, value):
if not self.checkPropertyType('bitrate', value, int):
Expand Down
45 changes: 22 additions & 23 deletions flumotion/component/feedcomponent.py
Expand Up @@ -23,8 +23,7 @@

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
#import gst.interfaces
from gi.repository import GObject, Gst #, GstInterfaces

from twisted.internet import reactor, defer
from twisted.spread import pb
Expand Down Expand Up @@ -339,7 +338,7 @@ def create_pipeline(self):

try:
pipeline = Gst.parse_launch(self.pipeline_string)
except gobject.GError, e:
except GObject.GError, e:
self.warning('Could not parse pipeline: %s' % e.message)
m = messages.Error(T_(N_(
"GStreamer error: could not parse component pipeline.")),
Expand Down Expand Up @@ -491,9 +490,9 @@ def get_eater_srcpad(self, eaterAlias):
e = self.eaters[eaterAlias]
identity = self.get_element(e.elementName + '-identity')
depay = self.get_element(e.depayName)
srcpad = depay.get_pad("src")
srcpad = depay.get_static_pad("src")
if identity:
srcpad = identity.get_pad("src")
srcpad = identity.get_static_pad("src")
return srcpad

def get_feeder_sinkpad(self, feederAlias):
Expand Down Expand Up @@ -604,8 +603,8 @@ def plug(self):
self.pipeline.add(self.effectBin)

# link it with the element src pad and its peer's sink pad
peerSinkPad.link(self.effectBin.get_pad('sink'))
self.effectBin.get_pad('src').link(peerSrcPad)
peerSinkPad.link(self.effectBin.get_static_pad('sink'))
self.effectBin.get_static_pad('src').link(peerSrcPad)
self.plugged = True

class MultiInputParseLaunchComponent(ParseLaunchComponent):
Expand Down Expand Up @@ -673,7 +672,7 @@ def _underrun_cb(element):
# Called from a streaming thread. The queue element does not hold
# the queue lock when this is called, so we block our sinkpad,
# then re-check the current level.
pad = element.get_pad("sink")
pad = element.get_static_pad("sink")
pad.set_blocked_async(True, _block_cb)
level = element.get_property("current-level-buffers")
if level < self.QUEUE_SIZE_BUFFERS:
Expand Down Expand Up @@ -723,7 +722,7 @@ def get_base_pipeline_string(self):
def get_eater_srcpad(self, eaterAlias):
e = self.eaters[eaterAlias]
inputq = self.get_element('input-' + e.elementName)
return inputq.get_pad('src')
return inputq.get_static_pad('src')

# Private methods

Expand Down Expand Up @@ -797,26 +796,26 @@ def got_new_buffer(pad, buff, element):
# Listen for incoming flumotion-reset events on eaters
for elem in self.get_input_elements():
self.debug('RESET: Add caps monitor for %s', elem.get_name())
sink = elem.get_pad('sink')
sink = elem.get_static_pad('sink')
sink.get_peer().add_buffer_probe(got_new_buffer, elem)
sink.connect("notify::caps", got_new_caps)

for elem in self.get_output_elements():
self.debug('RESET: adding event probe for %s', elem.get_name())
elem.get_pad('sink').add_event_probe(output_reset_event)
elem.get_static_pad('sink').add_event_probe(output_reset_event)

def _block_eaters(self):
"""
Function that blocks all the identities of the eaters
"""
for elem in self.get_input_elements():
pad = elem.get_pad('src')
pad = elem.get_static_pad('src')
self.debug("RESET: Blocking pad %s", pad)
pad.set_blocked_async(True, self._on_eater_blocked)

def _unblock_eaters(self):
for elem in self.get_input_elements():
pad = elem.get_pad('src')
pad = elem.get_static_pad('src')
self.debug("RESET: Unblocking pad %s", pad)
pad.set_blocked_async(False, self._on_eater_blocked)

Expand Down Expand Up @@ -899,7 +898,7 @@ def move_element(element, orig, dest):
element.link(peer)

done = []
start = pipeline.get_by_name('start').get_pad('src').get_peer()
start = pipeline.get_by_name('start').get_static_pad('src').get_peer()
move_element(start.get_parent(), pipeline, self.pipeline)

# Link eaters to the first element in the pipeline
Expand Down Expand Up @@ -945,7 +944,7 @@ def _on_pipeline_drained(self):
end = self.get_output_elements()
done = []
for element in start:
element = element.get_pad('src').get_peer().get_parent()
element = element.get_static_pad('src').get_peer().get_parent()
self._remove_pipeline(self.pipeline, element, end, done)
self._rebuild_pipeline()

Expand All @@ -959,7 +958,7 @@ def setup_completed(self):
ParseLaunchComponent.setup_completed(self)

encoder = self.get_element('encoder')
encoder.get_pad('sink').add_event_probe(self.handle_reset_event)
encoder.get_static_pad('sink').add_event_probe(self.handle_reset_event)

def handle_reset_event(self, pad, event):
if gstreamer.event_is_flumotion_reset(event):
Expand All @@ -982,7 +981,7 @@ def get_link_pad(self, muxer, srcpad, caps):
return muxer.get_compatible_pad(srcpad, caps)

def buffer_probe_cb(self, pad, buffer, depay, eaterAlias):
pad = depay.get_pad("src")
pad = depay.get_static_pad("src")
caps = pad.get_negotiated_caps()
if not caps:
return False
Expand All @@ -1003,7 +1002,7 @@ def buffer_probe_cb(self, pad, buffer, depay, eaterAlias):
return True
self.debug("Got link pad %r", linkpad)
srcpad_to_link.link(linkpad)
depay.get_pad("src").remove_buffer_probe(self._probes[eaterAlias])
depay.get_static_pad("src").remove_probe(self._probes[eaterAlias])
if srcpad_to_link.is_blocked():
self.is_blocked_cb(srcpad_to_link, True)
else:
Expand All @@ -1016,7 +1015,7 @@ def event_probe_cb(self, pad, event, depay, eaterAlias):
return True
# if this pad doesn't push audio, remove the probe
if 'audio' not in caps[0].to_string():
depay.get_pad("src").remove_buffer_probe(self._eprobes[eaterAlias])
depay.get_static_pad("src").remove_probe(self._eprobes[eaterAlias])
if event.get_structure() is None:
return True
if event.get_structure().get_name() == 'GstForceKeyUnit':
Expand All @@ -1038,14 +1037,14 @@ def configure_pipeline(self, pipeline, properties):
for e in self.eaters:
depay = self.get_element(self.eaters[e].depayName)
self._probes[e] = \
depay.get_pad("src").add_buffer_probe(
self.buffer_probe_cb, depay, e)
depay.get_static_pad("src").add_probe(
Gst.PadProbeType.BUFFER, self.buffer_probe_cb, e)
# Add an event probe to drop GstForceKeyUnit events
# in audio pads
if self.dropAudioKuEvents:
self._eprobes[e] = \
depay.get_pad("src").add_event_probe(
self.event_probe_cb, depay, e)
depay.get_static_pad("src").add_probe(
Gst.PadProbeType.EVENT_BOTH, self.event_probe_cb, e)

def is_blocked_cb(self, pad, is_blocked):
if is_blocked:
Expand Down
36 changes: 18 additions & 18 deletions flumotion/component/feedcomponent010.py
Expand Up @@ -17,7 +17,7 @@

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from gi.repository import GObject, Gst, GstNet

import os
import time
Expand Down Expand Up @@ -129,7 +129,7 @@ def do_setup(self):
self.try_start_pipeline()

# no race, messages marshalled asynchronously via the bus
d = self._change_monitor.add(Gst.State.CHANGE_PAUSED_TO_PLAYING)
d = self._change_monitor.add(Gst.StateChange.PAUSED_TO_PLAYING)
d.addCallback(lambda x: self.do_pipeline_playing())

def setup_completed(self):
Expand Down Expand Up @@ -164,15 +164,15 @@ def attachPadMonitorToFeeder(self, feederName):
if not element:
raise errors.ComponentError("No such feeder %s" % feederName)

pad = element.get_pad('src')
pad = element.get_static_pad('src')
self._pad_monitors.attach(pad, "%s:%s" % (self.name, elementName))

def attachPadMonitorToElement(self, elementName,
setActive=None, setInactive=None):
element = self.pipeline.get_by_name(elementName)
if not element:
raise errors.ComponentError("No such element %s" % elementName)
pad = element.get_pad('src')
pad = element.get_static_pad('src')
name = "%s:%s" % (self.name, elementName)
self._pad_monitors.attach(pad, name)

Expand Down Expand Up @@ -284,9 +284,9 @@ def eos():
def default():
self.log('message received: %r', message)

handlers = {Gst.MESSAGE_STATE_CHANGED: state_changed,
Gst.MESSAGE_ERROR: error,
Gst.MESSAGE_EOS: eos}
handlers = {Gst.MessageType.STATE_CHANGED: state_changed,
Gst.MessageType.ERROR: error,
Gst.MessageType.EOS: eos}
t = message.type
src = message.src
handlers.get(t, default)()
Expand Down Expand Up @@ -392,9 +392,9 @@ def depay_event(pad, event):

self.debug('adding event probe for eater %s', eater.eaterAlias)
fdsrc = self.get_element(eater.elementName)
fdsrc.get_pad("src").add_event_probe(fdsrc_event)
fdsrc.get_static_pad("src").add_probe(Gst.PadProbeType.EVENT_BOTH,fdsrc_event,"user data")
depay = self.get_element(eater.depayName)
depay.get_pad("src").add_event_probe(depay_event)
depay.get_static_pad("src").add_probe(Gst.PadProbeType.EVENT_BOTH,depay_event,"user data")

def _setup_pipeline(self):
self.debug('setup_pipeline()')
Expand Down Expand Up @@ -432,7 +432,7 @@ def _setup_pipeline(self):

for eater in self.eaters.values():
self.install_eater_event_probes(eater)
pad = self.get_element(eater.elementName).get_pad('src')
pad = self.get_element(eater.elementName).get_static_pad('src')
name = "%s:%s" % (self.name, eater.elementName)
self._pad_monitors.attach(pad, name,
padmonitor.EaterPadMonitor,
Expand Down Expand Up @@ -527,7 +527,7 @@ def pipelinePaused(r):
# make sure the pipeline sticks with this clock
self.pipeline.use_clock(clock)

self.clock_provider = Gst.NetTimeProvider(clock, None, port)
self.clock_provider = GstNet.NetTimeProvider(clock, None, port)
realport = self.clock_provider.get_property('port')

base_time = self.pipeline.get_base_time()
Expand All @@ -551,7 +551,7 @@ def pipelinePaused(r):
(ret, state, pending) = self.pipeline.get_state(0)
if state != Gst.State.PAUSED and state != Gst.State.PLAYING:
self.debug("pipeline still spinning up: %r", state)
d = self._change_monitor.add(Gst.State.CHANGE_READY_TO_PAUSED)
d = self._change_monitor.add(Gst.StateChange.READY_TO_PAUSED)
d.addCallback(pipelinePaused)
return d
elif self.clock_provider:
Expand Down Expand Up @@ -696,7 +696,7 @@ def modify_element_property(self, element_name, property_name, value,
def drop_stream_headers(pad, buf):
if buf.flag_is_set(Gst.BUFFER_FLAG_IN_CAPS):
return False
pad.remove_buffer_probe(probes[pad])
pad.remove_probe(probes[pad])
return True

probes = {}
Expand Down Expand Up @@ -742,7 +742,7 @@ def drop_stream_headers(pad, buf):
# re-sending the headers again)
else:
for pad in src_pads:
probes[pad] = pad.add_buffer_probe(drop_stream_headers)
probes[pad] = pad.add_probe(Gst.PadProbeType.BUFFER,drop_stream_headers,"user data")

if state > mutable_state:
element.set_state(state)
Expand Down Expand Up @@ -864,7 +864,7 @@ def eatFromFD(self, eaterAlias, feedId, fd):
# To do this safely, we first block fdsrc:src, then let the
# component do any neccesary unlocking (needed for multi-input
# elements)
srcpad = element.get_pad('src')
srcpad = element.get_static_pad('src')

def _block_cb(pad, blocked):
pass
Expand All @@ -888,7 +888,7 @@ def remove_in_caps_buffers(pad, buffer, eater):
if eater.streamheaderBufferProbeHandler:
self.log("Removing buffer probe on depay src pad on "
"eater %r", eater)
pad.remove_buffer_probe(
pad.remove_probe(
eater.streamheaderBufferProbeHandler)
eater.streamheaderBufferProbeHandler = None
else:
Expand All @@ -905,8 +905,8 @@ def remove_in_caps_buffers(pad, buffer, eater):
self.log("Adding buffer probe on depay src pad on "
"eater %r", eater)
eater.streamheaderBufferProbeHandler = \
depay.get_pad("src").add_buffer_probe(
remove_in_caps_buffers, eater)
depay.get_static_pad("src").add_probe(
Gst.PadProbeType.BUFFER, remove_in_caps_buffers, eater)

self.unblock_eater(eaterAlias)

Expand Down

0 comments on commit 8100019

Please sign in to comment.