Skip to content

Commit

Permalink
Add kinesisvideo (#3271)
Browse files Browse the repository at this point in the history
* kinesisvideo create_stream

* add kinesis video stream description

* add kinesisvideo describe_stream

* add kinesisvideo list_streams

* add kinesisvideo delete_stream

* remove unused comment

* remove duplicated definition

* add kinesis video exceptions

* pass region_name to kinesisvideo client in test

* fix kinesisvideo url path

* resolve conflict of kinesisvideo url and kinesis url

* specify region name to kinesisvideobackend

* Add get-dataendpoint to kinesisvideo

* include stream name in ResourceInUseException of kinesisvideo

* use ACCOUNT_ID from moto.core in kinesisvideo

* add server test for kinesisvideo

* split up kinesisvideo test
  • Loading branch information
toshitanian committed Sep 2, 2020
1 parent 00a5641 commit 25161c0
Show file tree
Hide file tree
Showing 10 changed files with 427 additions and 1 deletion.
1 change: 1 addition & 0 deletions moto/__init__.py
Expand Up @@ -113,6 +113,7 @@ def f(*args, **kwargs):
XRaySegment = lazy_load(".xray", "XRaySegment")
mock_xray = lazy_load(".xray", "mock_xray")
mock_xray_client = lazy_load(".xray", "mock_xray_client")
mock_kinesisvideo = lazy_load(".kinesisvideo", "mock_kinesisvideo")

# import logging
# logging.getLogger('boto').setLevel(logging.CRITICAL)
Expand Down
1 change: 1 addition & 0 deletions moto/backends.py
Expand Up @@ -69,6 +69,7 @@
"sts": ("sts", "sts_backends"),
"swf": ("swf", "swf_backends"),
"xray": ("xray", "xray_backends"),
"kinesisvideo": ("kinesisvideo", "kinesisvideo_backends"),
}


Expand Down
3 changes: 2 additions & 1 deletion moto/kinesis/urls.py
Expand Up @@ -2,7 +2,8 @@
from .responses import KinesisResponse

url_bases = [
"https?://kinesis.(.+).amazonaws.com",
# Need to avoid conflicting with kinesisvideo
r"https?://kinesis\.(.+).amazonaws.com",
"https?://firehose.(.+).amazonaws.com",
]

Expand Down
6 changes: 6 additions & 0 deletions moto/kinesisvideo/__init__.py
@@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import kinesisvideo_backends
from ..core.models import base_decorator

kinesisvideo_backend = kinesisvideo_backends["us-east-1"]
mock_kinesisvideo = base_decorator(kinesisvideo_backends)
24 changes: 24 additions & 0 deletions moto/kinesisvideo/exceptions.py
@@ -0,0 +1,24 @@
from __future__ import unicode_literals

from moto.core.exceptions import RESTError


class KinesisvideoClientError(RESTError):
code = 400


class ResourceNotFoundException(KinesisvideoClientError):
def __init__(self):
self.code = 404
super(ResourceNotFoundException, self).__init__(
"ResourceNotFoundException",
"The requested stream is not found or not active.",
)


class ResourceInUseException(KinesisvideoClientError):
def __init__(self, message):
self.code = 400
super(ResourceInUseException, self).__init__(
"ResourceInUseException", message,
)
147 changes: 147 additions & 0 deletions moto/kinesisvideo/models.py
@@ -0,0 +1,147 @@
from __future__ import unicode_literals
from boto3 import Session
from moto.core import BaseBackend, BaseModel
from datetime import datetime
from .exceptions import (
ResourceNotFoundException,
ResourceInUseException,
)
import random
import string
from moto.core.utils import get_random_hex
from moto.core import ACCOUNT_ID


class Stream(BaseModel):
def __init__(
self,
region_name,
device_name,
stream_name,
media_type,
kms_key_id,
data_retention_in_hours,
tags,
):
self.region_name = region_name
self.stream_name = stream_name
self.device_name = device_name
self.media_type = media_type
self.kms_key_id = kms_key_id
self.data_retention_in_hours = data_retention_in_hours
self.tags = tags
self.status = "ACTIVE"
self.version = self._get_random_string()
self.creation_time = datetime.utcnow()
stream_arn = "arn:aws:kinesisvideo:{}:{}:stream/{}/1598784211076".format(
self.region_name, ACCOUNT_ID, self.stream_name
)
self.data_endpoint_number = get_random_hex()
self.arn = stream_arn

def _get_random_string(self, length=20):
letters = string.ascii_lowercase
result_str = "".join([random.choice(letters) for _ in range(length)])
return result_str

def get_data_endpoint(self, api_name):
data_endpoint_prefix = "s-" if api_name in ("PUT_MEDIA", "GET_MEDIA") else "b-"
return "https://{}{}.kinesisvideo.{}.amazonaws.com".format(
data_endpoint_prefix, self.data_endpoint_number, self.region_name
)

def to_dict(self):
return {
"DeviceName": self.device_name,
"StreamName": self.stream_name,
"StreamARN": self.arn,
"MediaType": self.media_type,
"KmsKeyId": self.kms_key_id,
"Version": self.version,
"Status": self.status,
"CreationTime": self.creation_time.isoformat(),
"DataRetentionInHours": self.data_retention_in_hours,
}


class KinesisVideoBackend(BaseBackend):
def __init__(self, region_name=None):
super(KinesisVideoBackend, self).__init__()
self.region_name = region_name
self.streams = {}

def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)

def create_stream(
self,
device_name,
stream_name,
media_type,
kms_key_id,
data_retention_in_hours,
tags,
):
streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
if len(streams) > 0:
raise ResourceInUseException(
"The stream {} already exists.".format(stream_name)
)
stream = Stream(
self.region_name,
device_name,
stream_name,
media_type,
kms_key_id,
data_retention_in_hours,
tags,
)
self.streams[stream.arn] = stream
return stream.arn

def _get_stream(self, stream_name, stream_arn):
if stream_name:
streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
if len(streams) == 0:
raise ResourceNotFoundException()
stream = streams[0]
elif stream_arn:
stream = self.streams.get(stream_arn)
if stream is None:
raise ResourceNotFoundException()
return stream

def describe_stream(self, stream_name, stream_arn):
stream = self._get_stream(stream_name, stream_arn)
stream_info = stream.to_dict()
return stream_info

def list_streams(self, max_results, next_token, stream_name_condition):
stream_info_list = [_.to_dict() for _ in self.streams.values()]
next_token = None
return stream_info_list, next_token

def delete_stream(self, stream_arn, current_version):
stream = self.streams.get(stream_arn)
if stream is None:
raise ResourceNotFoundException()
del self.streams[stream_arn]

def get_data_endpoint(self, stream_name, stream_arn, api_name):
stream = self._get_stream(stream_name, stream_arn)
return stream.get_data_endpoint(api_name)

# add methods from here


kinesisvideo_backends = {}
for region in Session().get_available_regions("kinesisvideo"):
kinesisvideo_backends[region] = KinesisVideoBackend(region)
for region in Session().get_available_regions(
"kinesisvideo", partition_name="aws-us-gov"
):
kinesisvideo_backends[region] = KinesisVideoBackend(region)
for region in Session().get_available_regions("kinesisvideo", partition_name="aws-cn"):
kinesisvideo_backends[region] = KinesisVideoBackend(region)
70 changes: 70 additions & 0 deletions moto/kinesisvideo/responses.py
@@ -0,0 +1,70 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from .models import kinesisvideo_backends
import json


class KinesisVideoResponse(BaseResponse):
SERVICE_NAME = "kinesisvideo"

@property
def kinesisvideo_backend(self):
return kinesisvideo_backends[self.region]

def create_stream(self):
device_name = self._get_param("DeviceName")
stream_name = self._get_param("StreamName")
media_type = self._get_param("MediaType")
kms_key_id = self._get_param("KmsKeyId")
data_retention_in_hours = self._get_int_param("DataRetentionInHours")
tags = self._get_param("Tags")
stream_arn = self.kinesisvideo_backend.create_stream(
device_name=device_name,
stream_name=stream_name,
media_type=media_type,
kms_key_id=kms_key_id,
data_retention_in_hours=data_retention_in_hours,
tags=tags,
)
return json.dumps(dict(StreamARN=stream_arn))

def describe_stream(self):
stream_name = self._get_param("StreamName")
stream_arn = self._get_param("StreamARN")
stream_info = self.kinesisvideo_backend.describe_stream(
stream_name=stream_name, stream_arn=stream_arn,
)
return json.dumps(dict(StreamInfo=stream_info))

def list_streams(self):
max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken")
stream_name_condition = self._get_param("StreamNameCondition")
stream_info_list, next_token = self.kinesisvideo_backend.list_streams(
max_results=max_results,
next_token=next_token,
stream_name_condition=stream_name_condition,
)
return json.dumps(dict(StreamInfoList=stream_info_list, NextToken=next_token))

def delete_stream(self):
stream_arn = self._get_param("StreamARN")
current_version = self._get_param("CurrentVersion")
self.kinesisvideo_backend.delete_stream(
stream_arn=stream_arn, current_version=current_version,
)
return json.dumps(dict())

def get_data_endpoint(self):
stream_name = self._get_param("StreamName")
stream_arn = self._get_param("StreamARN")
api_name = self._get_param("APIName")
data_endpoint = self.kinesisvideo_backend.get_data_endpoint(
stream_name=stream_name, stream_arn=stream_arn, api_name=api_name,
)
return json.dumps(dict(DataEndpoint=data_endpoint))

# add methods from here


# add templates from here
18 changes: 18 additions & 0 deletions moto/kinesisvideo/urls.py
@@ -0,0 +1,18 @@
from __future__ import unicode_literals
from .responses import KinesisVideoResponse

url_bases = [
"https?://kinesisvideo.(.+).amazonaws.com",
]


response = KinesisVideoResponse()


url_paths = {
"{0}/createStream$": response.dispatch,
"{0}/describeStream$": response.dispatch,
"{0}/deleteStream$": response.dispatch,
"{0}/listStreams$": response.dispatch,
"{0}/getDataEndpoint$": response.dispatch,
}

0 comments on commit 25161c0

Please sign in to comment.