Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kinesisvideo #3271

Merged
merged 17 commits into from Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions moto/__init__.py
Expand Up @@ -113,6 +113,7 @@ def f(*args, **kwargs):
XRaySegment = lazy_load(".xray", "XRaySegment")
mock_xray = lazy_load(".xray", "mock_xray")
mock_xray_client = lazy_load(".xray", "mock_xray_client")
mock_kinesisvideo = lazy_load(".kinesisvideo", "mock_kinesisvideo")

# import logging
# logging.getLogger('boto').setLevel(logging.CRITICAL)
Expand Down
1 change: 1 addition & 0 deletions moto/backends.py
Expand Up @@ -69,6 +69,7 @@
"sts": ("sts", "sts_backends"),
"swf": ("swf", "swf_backends"),
"xray": ("xray", "xray_backends"),
"kinesisvideo": ("kinesisvideo", "kinesisvideo_backends"),
}


Expand Down
3 changes: 2 additions & 1 deletion moto/kinesis/urls.py
Expand Up @@ -2,7 +2,8 @@
from .responses import KinesisResponse

url_bases = [
"https?://kinesis.(.+).amazonaws.com",
# Need to avoid conflicting with kinesisvideo
r"https?://kinesis\.(.+).amazonaws.com",
"https?://firehose.(.+).amazonaws.com",
]

Expand Down
6 changes: 6 additions & 0 deletions moto/kinesisvideo/__init__.py
@@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import kinesisvideo_backends
from ..core.models import base_decorator

kinesisvideo_backend = kinesisvideo_backends["us-east-1"]
mock_kinesisvideo = base_decorator(kinesisvideo_backends)
24 changes: 24 additions & 0 deletions moto/kinesisvideo/exceptions.py
@@ -0,0 +1,24 @@
from __future__ import unicode_literals

from moto.core.exceptions import RESTError


class KinesisvideoClientError(RESTError):
code = 400


class ResourceNotFoundException(KinesisvideoClientError):
def __init__(self):
self.code = 404
super(ResourceNotFoundException, self).__init__(
"ResourceNotFoundException",
"The requested stream is not found or not active.",
)


class ResourceInUseException(KinesisvideoClientError):
def __init__(self, message):
self.code = 400
super(ResourceInUseException, self).__init__(
"ResourceInUseException", message,
)
147 changes: 147 additions & 0 deletions moto/kinesisvideo/models.py
@@ -0,0 +1,147 @@
from __future__ import unicode_literals
from boto3 import Session
from moto.core import BaseBackend, BaseModel
from datetime import datetime
from .exceptions import (
ResourceNotFoundException,
ResourceInUseException,
)
import random
import string
from moto.core.utils import get_random_hex
from moto.core import ACCOUNT_ID


class Stream(BaseModel):
def __init__(
self,
region_name,
device_name,
stream_name,
media_type,
kms_key_id,
data_retention_in_hours,
tags,
):
self.region_name = region_name
self.stream_name = stream_name
self.device_name = device_name
self.media_type = media_type
self.kms_key_id = kms_key_id
self.data_retention_in_hours = data_retention_in_hours
self.tags = tags
self.status = "ACTIVE"
self.version = self._get_random_string()
self.creation_time = datetime.utcnow()
stream_arn = "arn:aws:kinesisvideo:{}:{}:stream/{}/1598784211076".format(
self.region_name, ACCOUNT_ID, self.stream_name
)
self.data_endpoint_number = get_random_hex()
self.arn = stream_arn

def _get_random_string(self, length=20):
letters = string.ascii_lowercase
result_str = "".join([random.choice(letters) for _ in range(length)])
return result_str

def get_data_endpoint(self, api_name):
data_endpoint_prefix = "s-" if api_name in ("PUT_MEDIA", "GET_MEDIA") else "b-"
return "https://{}{}.kinesisvideo.{}.amazonaws.com".format(
data_endpoint_prefix, self.data_endpoint_number, self.region_name
)

def to_dict(self):
return {
"DeviceName": self.device_name,
"StreamName": self.stream_name,
"StreamARN": self.arn,
"MediaType": self.media_type,
"KmsKeyId": self.kms_key_id,
"Version": self.version,
"Status": self.status,
"CreationTime": self.creation_time.isoformat(),
"DataRetentionInHours": self.data_retention_in_hours,
}


class KinesisVideoBackend(BaseBackend):
def __init__(self, region_name=None):
super(KinesisVideoBackend, self).__init__()
self.region_name = region_name
self.streams = {}

def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)

def create_stream(
self,
device_name,
stream_name,
media_type,
kms_key_id,
data_retention_in_hours,
tags,
):
streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
if len(streams) > 0:
raise ResourceInUseException(
"The stream {} already exists.".format(stream_name)
)
stream = Stream(
self.region_name,
device_name,
stream_name,
media_type,
kms_key_id,
data_retention_in_hours,
tags,
)
self.streams[stream.arn] = stream
return stream.arn

def _get_stream(self, stream_name, stream_arn):
if stream_name:
streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
if len(streams) == 0:
raise ResourceNotFoundException()
stream = streams[0]
elif stream_arn:
stream = self.streams.get(stream_arn)
if stream is None:
raise ResourceNotFoundException()
return stream

def describe_stream(self, stream_name, stream_arn):
stream = self._get_stream(stream_name, stream_arn)
stream_info = stream.to_dict()
return stream_info

def list_streams(self, max_results, next_token, stream_name_condition):
stream_info_list = [_.to_dict() for _ in self.streams.values()]
next_token = None
return stream_info_list, next_token

def delete_stream(self, stream_arn, current_version):
stream = self.streams.get(stream_arn)
if stream is None:
raise ResourceNotFoundException()
del self.streams[stream_arn]

def get_data_endpoint(self, stream_name, stream_arn, api_name):
stream = self._get_stream(stream_name, stream_arn)
return stream.get_data_endpoint(api_name)

# add methods from here


kinesisvideo_backends = {}
for region in Session().get_available_regions("kinesisvideo"):
kinesisvideo_backends[region] = KinesisVideoBackend(region)
for region in Session().get_available_regions(
"kinesisvideo", partition_name="aws-us-gov"
):
kinesisvideo_backends[region] = KinesisVideoBackend(region)
for region in Session().get_available_regions("kinesisvideo", partition_name="aws-cn"):
kinesisvideo_backends[region] = KinesisVideoBackend(region)
70 changes: 70 additions & 0 deletions moto/kinesisvideo/responses.py
@@ -0,0 +1,70 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from .models import kinesisvideo_backends
import json


class KinesisVideoResponse(BaseResponse):
SERVICE_NAME = "kinesisvideo"

@property
def kinesisvideo_backend(self):
return kinesisvideo_backends[self.region]

def create_stream(self):
device_name = self._get_param("DeviceName")
stream_name = self._get_param("StreamName")
media_type = self._get_param("MediaType")
kms_key_id = self._get_param("KmsKeyId")
data_retention_in_hours = self._get_int_param("DataRetentionInHours")
tags = self._get_param("Tags")
stream_arn = self.kinesisvideo_backend.create_stream(
device_name=device_name,
stream_name=stream_name,
media_type=media_type,
kms_key_id=kms_key_id,
data_retention_in_hours=data_retention_in_hours,
tags=tags,
)
return json.dumps(dict(StreamARN=stream_arn))

def describe_stream(self):
stream_name = self._get_param("StreamName")
stream_arn = self._get_param("StreamARN")
stream_info = self.kinesisvideo_backend.describe_stream(
stream_name=stream_name, stream_arn=stream_arn,
)
return json.dumps(dict(StreamInfo=stream_info))

def list_streams(self):
max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken")
stream_name_condition = self._get_param("StreamNameCondition")
stream_info_list, next_token = self.kinesisvideo_backend.list_streams(
max_results=max_results,
next_token=next_token,
stream_name_condition=stream_name_condition,
)
return json.dumps(dict(StreamInfoList=stream_info_list, NextToken=next_token))

def delete_stream(self):
stream_arn = self._get_param("StreamARN")
current_version = self._get_param("CurrentVersion")
self.kinesisvideo_backend.delete_stream(
stream_arn=stream_arn, current_version=current_version,
)
return json.dumps(dict())

def get_data_endpoint(self):
stream_name = self._get_param("StreamName")
stream_arn = self._get_param("StreamARN")
api_name = self._get_param("APIName")
data_endpoint = self.kinesisvideo_backend.get_data_endpoint(
stream_name=stream_name, stream_arn=stream_arn, api_name=api_name,
)
return json.dumps(dict(DataEndpoint=data_endpoint))

# add methods from here


# add templates from here
18 changes: 18 additions & 0 deletions moto/kinesisvideo/urls.py
@@ -0,0 +1,18 @@
from __future__ import unicode_literals
from .responses import KinesisVideoResponse

url_bases = [
"https?://kinesisvideo.(.+).amazonaws.com",
]


response = KinesisVideoResponse()


url_paths = {
"{0}/createStream$": response.dispatch,
"{0}/describeStream$": response.dispatch,
"{0}/deleteStream$": response.dispatch,
"{0}/listStreams$": response.dispatch,
"{0}/getDataEndpoint$": response.dispatch,
}