Permalink
Browse files

Added Progressive Stream support

* New rule matching logic to support only byte range matching
* Updated documentation
* Refactor tream_ts to use a common stream_url method
* Update coverage script to provide better data
  • Loading branch information...
1 parent 366a1cd commit b158ff55aae01bc84e8347bbff283676b288d7d0 @dallasmahrt dallasmahrt committed Aug 16, 2012
Showing with 373 additions and 29 deletions.
  1. +2 −2 .gitignore
  2. +43 −3 README.md
  3. +73 −17 dripls/main.py
  4. +114 −0 dripls/progressive.py
  5. +12 −6 dripls/shaper.py
  6. +1 −1 dripls/test/nose_with_coverage.sh
  7. +128 −0 dripls/test/progressive_tests.py
View
@@ -24,6 +24,6 @@ pip-log.txt
.coverage
.tox
lint.xml
-
-
+cover
+bin/dripls.pid
test/playlists/*
View
@@ -1,10 +1,10 @@
Summary
==========
-DripLS - Make a CDN in a box service that is able to perform traffic shaping for testing purposes on a http live stream.
+DripLS - Make a CDN in a box service that is able to perform traffic shaping for testing purposes on a http live stream or a progressive file stream.
-Usage
-==========
+Usage - HLS
+===========
http://dripls-host/cache?authkey=[authkey]&cid=[cid]&tag=[tag]&r=[rules]
@@ -229,6 +229,46 @@ Re-shape
>Now the segment s2 would continue to be streamed at 1000kbs
+
+Usage - Progressive
+==========
+
+ http://dripls-host/progressive?url=[url]&r=[rules]
+
+Stream a progressive media stream and shape the stream according to the specified rules. Each rule determines how that section of the media file should be shaped. Rules are not allowed to overlap their byte ranges.
+
+Rule Format
+==========
+
+Each rule has the following format: byte-range-expression~action.
+
+Byte Range Expression
+===============
+
+Each rule defines the byte range during which its action is used to shape the stream. The range is defined as an initial byte (zero based) and final byte (inclusive) within the content. When a progressive request is received, we determine based on the requested bytes whether any of the defined rules intersect with this byte range, if they do then the entire byte range will be executed using the specified action.
+
+__b[num]-[num]__ - A byte offset range for the content. The end offset can be '*' which indicates that it matches until the end of the content.
+
+
+
+Rule Conflicts
+===============
+Since a rule is defined as a byte range and a single request is likely to be for a different range of bytes, there is the possibility that a single request may intersect with multiple rule definitions. In this case we will choose the action related to the rule with that largest intersection with the requested byte range.
+
+For example, if a rule is defined as b0-1999 and another is defined as 2000-3999 and the request is for bytes=1024-2048 then we will select the action related to the first rule since its intessection (76) is greater than the second rule (48). In the event that they are the same, the rule with the lowest start will be chosen.
+
+Rule Action
+==============
+This is the action to be applied to a byte range, when a matching rule is found for it:
+
+Action
+==================
+__e[http error code]__ - Replace the playlist's matching segment url with a url that returns the specified response code upon invocation ( ex. e416 )
+
+__net[bandwidth in kbit]loss[% of packets dropped]__ - The net rule action, when applied to a byte range, causes the stream to be served at
+ [bandwidth in kbit] with [% of packets dropped] ( ex. net200loss10 - serve the matched data at 200kbit
+ max with 10% packet loss during transmission)
+
License
==========
Copyright (C) 2010-2011 by Hulu, LLC
View
@@ -12,6 +12,7 @@
import conf.data
import httpls_client
+import progressive
import shaper
import conf
@@ -46,29 +47,14 @@ def cache_info(self, cid=None, r=None, tag=None, **kwargs):
def stream_ts(self, p=None, **kwargs):
""" Stream a ts from original location """
- import socket
- socket._fileobject.default_bufsize = 0
-
serving_request = cherrypy.serving.request
if serving_request.protocol >= (1, 1):
r = httputil.get_ranges(serving_request.headers.get('Range'), sys.maxint)
# TODO : do something with range request if need be
- req = urllib2.Request(kwargs['url'])
- for header in serving_request.headers:
- if header not in ['Range','Accept','User-Agent']:
- continue
- req.headers[header] = serving_request.headers.get(header)
- ts = urllib2.urlopen(req)
-
- for h in ts.headers.keys():
- cherrypy.response.headers[h] = ts.headers[h]
-
- buffer = '_'
- while len(buffer) > 0:
- buffer = ts.read(30*1024)
- yield buffer
+ for bytes in self._stream_url(serving_request, url):
+ yield bytes
@cherrypy.expose
def updatesegment(self, url, new_action):
@@ -161,6 +147,62 @@ def variant_m3u8(self, cid=None, r=None, tag=None, **kwargs):
return master_content
+ @cherrypy.expose
+ def progressive(self, url, r=None, **kwargs):
+ """ Endpoint for shaping a progressive stream"""
+
+ stream_url = url
+ if not kwargs.has_key("from_dripls"):
+ # if we have already come from dripls, we will set from_dripls so we can bypass the rule matching since we are
+ # running through a net traffic rule
+ if r:
+ stream_url = self._handle_progressive_rules(stream_url, r, cherrypy.serving.request)
+
+ for bytes in self._stream_url(cherrypy.serving.request, stream_url):
+ yield bytes
+
+ def _handle_progressive_rules(self, url, rules, request, mock_shape_segment=True):
+ stream_url = url
+ matcher = progressive.from_rules(rules)
+
+ if 'Range' in request.headers:
+ ranges = httputil.get_ranges(request.headers.get('Range'), sys.maxint)
+ if ranges:
+ # can return multiple ranges
+ # assume 1 for now
+ action = matcher.get_action(ranges[0][0], ranges[0][1] -1) # make range inclusive
+ else:
+ # this is fetching an entire file, so this should be the same as matching 0-*
+ action = matcher.get_action(0, sys.maxint)
+ if action:
+ if action.startswith("e"):
+ self.ostatus(action[1:])
+ else:
+ (traffic_limit, traffic_loss, cache) = shaper.parse_net_rule_action(action)
+ port = shaper.get_shape_port_for(traffic_limit, traffic_loss, {}, mock_shape_segment)
+ stream_url=conf.common.get_final_url("/s/{0}/progressive?url={1}&from_dripls=1".format(port, urllib.quote_plus(url)),"")
+ return stream_url
+
+
+ def _stream_url(self, request, url):
+ import socket
+ socket._fileobject.default_bufsize = 0
+
+ req = urllib2.Request(url)
+ for header in request.headers:
+ if header not in ['Range','Accept','User-Agent']:
+ continue
+ req.headers[header] = request.headers.get(header)
+ content = urllib2.urlopen(req)
+ for h in content.headers.keys():
+ cherrypy.response.headers[h] = content.headers[h]
+ cherrypy.response.status = content.code
+
+ buffer = '_'
+ while len(buffer) > 0:
+ buffer = content.read(30*1024)
+ yield buffer
+
def cache_stream(self, cid=None, r=None, tag=None, kwargs=None):
""" Perform the actual caching and shaping of the stream """
@@ -193,6 +235,20 @@ def store_tag(self, cid, r, tag, kwargs):
with open("{0}/playlists/tag_{1}".format(shaper.shaper_store_path, tag), "w") as pf:
pf.write("{0}".format(urllib.urlencode(tag_args)))
+
+ @cherrypy.expose
+ def s(self, port, action, **kwargs):
+ """This can simulate the streaming calls through the net rule ports for testing"""
+ if action == 'progressive':
+ for bytes in self.progressive(**kwargs):
+ yield bytes
+ elif action == 'stream_ts':
+ for bytes in self.stream_ts(**kwargs):
+ yield bytes
+ else:
+ raise cherrypy.HTTPError(400, message="Invalid action specified")
+
+
conf.dripls_main_site_url = conf.app['root_url']
root = DriplsController()
View
@@ -0,0 +1,114 @@
+import sys
+
+import shaper
+
+def from_rules(rule_string):
+ return ProgressiveRuleMatcher(rule_string)
+
+class RuleMatch(object):
+ def __init__(self, start_byte, end_byte, action):
+ self._start_byte = int(start_byte)
+ self._end_byte = int(end_byte)
+ self._action = action
+
+ def start_byte(self):
+ return self._start_byte
+
+ def end_byte(self):
+ return self._end_byte
+
+ def action(self):
+ return self._action
+
+ def __str__(self):
+ return "RuleMatch(start_byte=%s, end_byte=%s, action=%s)" % (self._start_byte, self._end_byte, self._action)
+
+ def __repr__(self):
+ return self.__str__()
+
+ def __cmp__(self, other):
+ if isinstance(other, self.__class__):
+ return cmp(self._start_byte, other.start_byte())
+ return -1
+
+class ProgressiveRuleMatcher(object):
+
+ def __init__(self, rule_string):
+ """Build a Matcher from the rulestring. PErforms the parsing and validation"""
+ # parse the rules
+ if not rule_string:
+ raise ValueError("Rule string cannot be empty or None")
+
+ self._rule_matches = list()
+
+ # split rule definitions by ','
+ rules = rule_string.split(",")
+
+ for rule in rules:
+ # split the match from the action by '~'
+ match, action = rule.split("~")
+ self._validate_match(match)
+ self._validate_action(action)
+
+ start_byte, end_byte = match[1:].split("-")
+
+ if end_byte == '*':
+ end_byte = sys.maxint
+
+ self._rule_matches.append(RuleMatch(start_byte, end_byte, action))
+
+ self._rule_matches.sort()
+
+ self._check_for_overlaps_in_matches()
+
+ def get_action(self, start_byte, end_byte):
+ """Given a byte range, determine if there is an action that should be performed"""
+
+ # This is an O(n) search which is not as efficient as it could be, but given that the value of N
+ # will be very low < 3 (typically) this should be sufficient
+ start_byte = int(start_byte)
+ end_byte = int(end_byte)
+
+ matches = []
+
+ for match in self._rule_matches:
+ if (start_byte >= match.start_byte() and start_byte <= match.end_byte()) or \
+ (end_byte >= match.start_byte() and end_byte <= match.end_byte()) or \
+ (start_byte <= match.start_byte() and end_byte >= match.end_byte()):
+
+ matches.append(match)
+
+ if len(matches) == 1:
+ return matches[0].action()
+ elif len(matches) == 0:
+ return None
+ else:
+ largest_intersection = 0
+ best_match = None
+ for match in matches:
+ start_index = max(start_byte, match.start_byte())
+ end_index = min(end_byte, match.end_byte())
+ intersection = end_index - start_index
+ if largest_intersection < intersection:
+ best_match = match
+ largest_intersection = intersection
+ return best_match.action()
+
+
+ # Private methods
+
+ def _validate_match(self, match):
+ if not match:
+ raise ValueError("No match specified")
+ if match[0] != 'b':
+ raise ValueError("Invalid match '%s'. Only supported match is byte range queries: 'bxxx-yyy'" % match)
+
+ def _validate_action(self, action):
+ shaper.validate_action_part(action)
+
+ def _check_for_overlaps_in_matches(self):
+ last_match = None
+ for rule_match in self._rule_matches:
+ if last_match is not None and rule_match.start_byte() <= last_match.end_byte():
+ raise ValueError("Rules '%s' and '%s' overlap in their byte ranges" % (last_match, rule_match))
+ last_match = rule_match
View
@@ -15,11 +15,13 @@
import conf.data
import conf
import httpls_client
+import progressive
#port shaping queue
port_queue = Queue.Queue()
shaper_store_path = "{0}/".format(os.path.dirname(os.path.realpath(__file__)))
+
def get_next_shape_port():
if port_queue.empty():
for port in range(conf.common.shape_start_port, conf.common.shape_end_port):
@@ -50,8 +52,8 @@ def generate_status(status):
def validate_hls_match_rule_part(part):
parts = part.split('.')
if len(parts) > 3:
- return False
-
+ return False
+
#check segment part
if (len(parts) > 1 and
not parts[-1][-1] == "k" and
@@ -65,7 +67,7 @@ def validate_hls_match_rule_part(part):
invalid_part = not (range[0].isdigit() and range[1].isdigit())
if invalid_part:
- return False
+ return False
#check playlist part
if len(parts) == 1 and parts[-1][-1] != "k" and parts[-1][-1] != "*":
@@ -79,6 +81,10 @@ def validate_hls_match_rule_part(part):
return True
+def validate_action_part(part):
+ if not (part.startswith("e") or part.startswith("net")):
+ raise ValueError("Unable to parse rule action {0}".format(part))
+
def expand_hls_bitrate_match(matches, master_playlist_obj):
if not master_playlist_obj:
return matches
@@ -130,6 +136,8 @@ def expand_hls_rule_match(match_part, master_playlist_obj):
matches = [match_part]
return expand_hls_segment_match( expand_hls_bitrate_match(matches, master_playlist_obj) )
+def parse_prog_rules(rule_string):
+ return progressive.from_rules(rule_string)
def parse_hls_rules(rule_string, master_playlist_obj = None):
rules = {}
@@ -144,14 +152,12 @@ def parse_hls_rules(rule_string, master_playlist_obj = None):
rule_parts = rule.strip().split("~")
action = rule_parts[1].strip()
- if not (action.startswith("e") or action.startswith("net")):
- raise ValueError("Unable to parse rule action {0}".format(action))
+ validate_action_part(action)
match = rule_parts[0].strip()
if not validate_hls_match_rule_part(match):
raise ValueError("Rule invalid: " + match)
-
for r_match in expand_hls_rule_match(match, master_playlist_obj):
rules[r_match] = rule_parts[1].strip()
@@ -1 +1 @@
- nosetests -s --with-coverage --cover-erase --cover-package=dripls
+ nosetests -s --with-coverage --cover-inclusive --cover-erase --cover-package=dripls --cover-html
Oops, something went wrong.

0 comments on commit b158ff5

Please sign in to comment.