Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions examples/live_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-

import os
import shutil

import oss2


# 以下代码展示了视频直播相关接口的用法。


# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。
#
# 以杭州区域为例,Endpoint可以是:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')


# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param


# 创建Bucket对象,所有直播相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)


# 创建一个直播频道。
# 频道的名称是test_rtmp_live。直播生成的m3u8文件叫做test.m3u8,该索引文件包含3片ts文件,每片ts文件的时长为5秒(这只是一个建议值,具体的时长取决于关键帧)。
channel_id = 'test_rtmp_live'
create_result = bucket.create_live_channel(
channel_id,
oss2.models.LiveChannelInfo(
status = 'enabled',
description = '测试使用的直播频道',
target = oss2.models.LiveChannelInfoTarget(
playlist_name = 'test.m3u8',
frag_count = 3,
frag_duration = 5)))

# 创建直播频道之后拿到推流用的play_url(rtmp推流的url,如果Bucket不是公共读写权限那么还需要带上签名,见下文示例)和观流用的publish_url(推流产生的m3u8文件的url)。
publish_url = create_result.publish_url
play_url = create_result.play_url

# 创建好直播频道之后调用get_live_channel可以得到频道相关的信息。
get_result = bucket.get_live_channel(channel_id)
print(get_result.description)
print(get_result.status)
print(get_result.target.type)
print(get_result.target.frag_count)
print(get_result.target.frag_duration)
print(get_result.target.playlist_name)

# 拿到推流地址和观流地址之后就可以向OSS推流和观流。如果Bucket的权限不是公共读写,那么还需要对推流做签名,如果Bucket是公共读写的,那么可以直接用publish_url推流。
# 这里的expires是一个相对时间,指的是从现在开始这次推流过期的秒数。
# params是一个dict类型的参数,表示用户自定义的参数。所有的参数都会参与签名。
# 拿到这个签过名的signed_url就可以使用推流工具直接进行推流,一旦连接上OSS之后超过上面的expires流也不会断掉,OSS仅在每次推流连接的时候检查expires是否合法。
expires = 3600
params = {'param1': 'v1', 'param2': 'v2'}
signed_url = bucket.sign_rtmp_url(publish_url, channel_id, expires, params)

# 创建好直播频道,如果想把这个频道禁用掉(断掉正在推的流或者不再允许向一个地址推流),应该使用put_live_channel_status接口,将频道的status改成“disabled”,如果要将一个禁用状态的频道启用,那么也是调用这个接口,将status改成“enabled”。
bucket.put_live_channel_status(channel_id, 'enabled')
bucket.put_live_channel_status(channel_id, 'disabled')

# 对创建好的频道,可以使用LiveChannelIterator来进行列举已达到管理的目的。
# prefix可以按照前缀过滤list出来的频道。
# max_keys表示迭代器内部一次list出来的频道的最大数量,这个值最大不能超过1000,不填写的话默认为100。

prefix = ''
max_keys = 1000

for info in oss2.LiveChannelIterator(self.bucket, prefix, max_keys=max_keys):
print(info.id)

# 对于正在推流的频道调用get_live_channel_stat可以获得流的状态信息。
# 如果频道正在推流,那么stat_result中的所有字段都有意义。
# 如果频道闲置或者处于“disabled”状态,那么status为“Idle”或“Disabled”,其他字段无意义。
stat_result = bucket.get_live_channel_stat(channel_id)
print(stat_result.status)
print(stat_result.remote_addr)
print(stat_result.connected_time)
print(stat_result.video)
print(stat_result.audio)

# 如果想查看一个频道历史推流记录,可以调用get_live_channel_history。目前最多可以看到10次推流的记录
history_result = bucket.get_live_channel_history(channel_id)
print(len(history_result.records))

# 如果希望利用直播推流产生的ts文件生成一个点播列表,可以使用post_vod_playlist方法。
# 指定起始时间为当前时间减去60秒,结束时间为当前时间,这意味着将生成一个长度为60秒的点播视频。
# 播放列表指定为“vod_playlist.m3u8”,也就是说这个接口调用成功之后会在OSS上生成一个名叫“vod_playlist.m3u8”的播放列表文件。

end_time = int(time.time()) - 60
start_time = end_time - 3600
playlist_name = 'vod_playlist.m3u8'
bucket.post_vod_playlist(channel_id,
playlist_name,
start_time = start_time,
end_time = end_time)

# 如果一个直播频道已经不打算再使用了,那么可以调用delete_live_channel来删除频道。
bucket.delete_live_channel(channel_id)
3 changes: 2 additions & 1 deletion oss2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@


from .iterators import (BucketIterator, ObjectIterator,
MultipartUploadIterator, ObjectUploadIterator, PartIterator)
MultipartUploadIterator, ObjectUploadIterator,
PartIterator, LiveChannelIterator)


from .resumable import resumable_upload, resumable_download, ResumableStore, ResumableDownloadStore, determine_part_size
Expand Down
109 changes: 109 additions & 0 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ class Bucket(_Base):
LOGGING = 'logging'
REFERER = 'referer'
WEBSITE = 'website'
LIVE = 'live'
COMP = 'comp'
STATUS = 'status'
VOD = 'vod'

def __init__(self, auth, endpoint, bucket_name,
is_cname=False,
Expand Down Expand Up @@ -273,6 +277,23 @@ def sign_url(self, method, key, expires, headers=None, params=None):
params=params)
return self.auth._sign_url(req, self.bucket_name, key, expires)

def sign_rtmp_url(self, publish_url, channel_id, expires, params=None):
"""生成RTMP推流的签名URL。

常见的用法是生成加签的URL以供授信用户向OSS推RTMP流。

>>> bucket.sign_rtmp_url('test_channel', 3600, params = {'use_id': '00001', 'device_id': 'AE9789798BC01'})
'http://your-bucket.oss-cn-hangzhou.aliyuncs.com/test_channel?OSSAccessKeyId=9uYePR6lL468aEUp&Expires=1462787071&use_id=00001&Signature=jprQLI0kGdcvmIvkm5rTx5LFkJ4%3D&device_id=AE9789798BC01'

:param publish_url: 创建直播频道得到的推流地址
:param channel_id: 直播频道的名称
:param expires: 过期时间(单位:秒),链接在当前时间再过expires秒后过期
:param params: 需要签名的HTTP查询参数
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么需要params?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

推流的时候允许有params,这些params都必须参与签名


:return: 签名URL。
"""
return self.auth._sign_rtmp_url(publish_url, self.bucket_name, channel_id, expires, params)

def list_objects(self, prefix='', delimiter='', marker='', max_keys=100):
"""根据前缀罗列Bucket里的文件。

Expand Down Expand Up @@ -849,6 +870,94 @@ def delete_bucket_website(self):
resp = self.__do_bucket('DELETE', params={Bucket.WEBSITE: ''})
return RequestResult(resp)

def create_live_channel(self, channel_id, input):
"""创建推流直播频道

:param str channel_id: 要创建的live channel的名称
:param input: LiveChannelInfo类型,包含了live channel中的描述信息

:return: :class:`CreateLiveChannelResult <oss2.models.CreateLiveChannelResult>`
"""
data = self.__convert_data(LiveChannelInfo, xml_utils.to_create_live_channel, input)
resp = self.__do_object('PUT', channel_id, data=data, params={Bucket.LIVE: ''})
return self._parse_result(resp, xml_utils.parse_create_live_channel, CreateLiveChannelResult)

def delete_live_channel(self, channel_id):
"""删除推流直播频道

:param str channel_id: 要删除的live channel的名称
"""
resp = self.__do_object('DELETE', channel_id, params={Bucket.LIVE: ''})
return RequestResult(resp)

def get_live_channel(self, channel_id):
"""获取直播频道配置

:param str channel_id: 要获取的live channel的名称

:return: :class:`GetLiveChannelResult <oss2.models.GetLiveChannelResult>`
"""
resp = self.__do_object('GET', channel_id, params={Bucket.LIVE: ''})
return self._parse_result(resp, xml_utils.parse_get_live_channel, GetLiveChannelResult)

def list_live_channel(self, prefix='', marker='', max_keys=100):
"""列举出Bucket下所有符合条件的live channel

param: str prefix: list时channel_id的公共前缀
param: str marker: list时指定的起始标记
param: int max_keys: 本次list返回live channel的最大个数

return: :class:`ListLiveChannelResult <oss2.models.ListLiveChannelResult>`
"""
resp = self.__do_bucket('GET', params={Bucket.LIVE: '',
'prefix': prefix,
'marker': marker,
'max-keys': str(max_keys)})
return self._parse_result(resp, xml_utils.parse_list_live_channel, ListLiveChannelResult)

def get_live_channel_stat(self, channel_id):
"""获取live channel当前推流的状态

param str channel_id: 要获取推流状态的live channel的名称

return: :class:`GetLiveChannelStatResult <oss2.models.GetLiveChannelStatResult>`
"""
resp = self.__do_object('GET', channel_id, params={Bucket.LIVE: '', Bucket.COMP: 'stat'})
return self._parse_result(resp, xml_utils.parse_live_channel_stat, GetLiveChannelStatResult)

def put_live_channel_status(self, channel_id, status):
"""更改live channel的status,仅能在“enabled”和“disabled”两种状态中更改

param str channel_id: 要更改status的live channel的名称
param str status: live channel的目标status
"""
resp = self.__do_object('PUT', channel_id, params={Bucket.LIVE: '', Bucket.STATUS: status})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我们把'status'当成一个需要签名部分了?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,这是获取频道状态的时候指定的

return RequestResult(resp)

def get_live_channel_history(self, channel_id):
"""获取live channel中最近的最多十次的推流记录,记录中包含推流的起止时间和远端的地址

param str channel_id: 要获取最近推流记录的live channel的名称

return: :class:`GetLiveChannelHistoryResult <oss2.models.GetLiveChannelHistoryResult>`
"""
resp = self.__do_object('GET', channel_id, params={Bucket.LIVE: '', Bucket.COMP: 'history'})
return self._parse_result(resp, xml_utils.parse_live_channel_history, GetLiveChannelHistoryResult)

def post_vod_playlist(self, channel_id, playlist_name, start_time = 0, end_time = 0):
"""根据指定的playlist name以及startTime和endTime生成一个点播的播放列表

param str channel_id: 要生成点播列表的live channel的名称
param str playlist_name: 要生成点播列表m3u8文件的名称
param int start_time: 点播的起始时间,为UNIX时间戳
param int end_time: 点播的结束时间,为UNIX时间戳
"""
key = channel_id + "/" + playlist_name
resp = self.__do_object('POST', key, params={Bucket.VOD: '',
'startTime': str(start_time),
'endTime': str(end_time)})
return RequestResult(resp)

def _get_bucket_config(self, config):
"""获得Bucket某项配置,具体哪种配置由 `config` 指定。该接口直接返回 `RequestResult` 对象。
通过read()接口可以获得XML字符串。不建议使用。
Expand Down
25 changes: 24 additions & 1 deletion oss2/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class Auth(object):
'acl', 'uploadId', 'uploads', 'partNumber', 'group', 'link',
'delete', 'website', 'location', 'objectInfo',
'response-expires', 'response-content-disposition', 'cors', 'lifecycle',
'restore', 'qos', 'referer', 'append', 'position', 'security-token']
'restore', 'qos', 'referer', 'append', 'position', 'security-token',
'live', 'comp', 'status', 'vod', 'startTime', 'endTime']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么是startTime和endTime?而不是start-time和end-time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这是follow了之前的驼峰的命名方式

)

def __init__(self, access_key_id, access_key_secret):
Expand Down Expand Up @@ -107,6 +108,28 @@ def __param_to_query(self, k, v):
else:
return k

def _sign_rtmp_url(self, url, bucket_name, channel_id, expires, params):
Copy link
Contributor

@yami yami May 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个普通签名有什么不同?为什么又多了一种签名方法?

注意到,如果真的需要增加一个接口,那么所有的Auth类都要支持新的sign接口

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个签名并不是给频道管理接口用的,而是给推流的程序专门算签名的,这个函数只是用在这个地方。

expiration_time = int(time.time()) + expires

canonicalized_resource = "/%s/%s" % (bucket_name, channel_id)
canonicalized_params = ''
if params:
for k in params:
if k != "OSSAccessKeyId" and k != "Signature" and k!= "Expires" and k!= "SecurityToken":
canonicalized_params += '%s:%s\n' % (k, params[k])

p = params if params else {}
string_to_sign = str(expiration_time) + "\n" + canonicalized_params + canonicalized_resource
logging.debug('string_to_sign={0}'.format(string_to_sign))

h = hmac.new(to_bytes(self.secret), to_bytes(string_to_sign), hashlib.sha1)
signature = utils.b64encode_as_string(h.digest())

p['OSSAccessKeyId'] = self.id
p['Expires'] = str(expiration_time)
p['Signature'] = signature

return url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in p.items())

class AnonymousAuth(object):
"""用于匿名访问。
Expand Down
15 changes: 15 additions & 0 deletions oss2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ class NoSuchCors(NotFound):
code = 'NoSuchCORSConfiguration'


class NoSuchLiveChannel(NotFound):
status = 404
code = 'NoSuchLiveChannel'


class Conflict(ServerError):
status = 409
code = ''
Expand All @@ -153,6 +158,16 @@ class ObjectNotAppendable(Conflict):
code = 'ObjectNotAppendable'


class ChannelStillLive(Conflict):
status = 409
code = 'ChannelStillLive'


class LiveChannelDisabled(Conflict):
status = 409
code = 'LiveChannelDisabled'


class PreconditionFailed(ServerError):
status = 412
code = 'PreconditionFailed'
Expand Down
26 changes: 26 additions & 0 deletions oss2/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,29 @@ def _fetch(self):

return result.is_truncated, result.next_marker


class LiveChannelIterator(_BaseIterator):
"""遍历Bucket里文件的迭代器。

每次迭代返回的是 :class:`LiveChannelInfo <oss2.models.LiveChannelInfo>` 对象。

:param bucket: :class:`Bucket <oss2.Bucket>` 对象
:param prefix: 只列举匹配该前缀的文件
:param marker: 分页符
:param max_keys: 每次调用 `list_live_channel` 时的max_keys参数。注意迭代器返回的数目可能会大于该值。
"""
def __init__(self, bucket, prefix='', marker='', max_keys=100, max_retries=None):
super(LiveChannelIterator, self).__init__(marker, max_retries)

self.bucket = bucket
self.prefix = prefix
self.max_keys = max_keys

def _fetch(self):
result = self.bucket.list_live_channel(prefix=self.prefix,
marker=self.next_marker,
max_keys=self.max_keys)
self.entries = result.channels

return result.is_truncated, result.next_marker

Loading