### Create streams and tables

In [1]:
import nuclio

In [2]:
%run ../util/set_env.ipynb

In [3]:
# nuclio: start-code

In [4]:
import os
import v3io_frames as v3f
from v3io import dataplane

def handler(context, event):
    # Create streams
    frames_client = v3f.Client("framesd:8081", container="bigdata")
    frames_client.create("stream",
                         table=os.getenv("RAW_VIDEO_STREAM"),
                         shards=100,
                         retention_hours=24)
    context.log_result('raw_video_stream_url', "http://v3io-webapi:8081/%s/%s/"% (os.getenv('IGZ_CONTAINER'),os.getenv('RAW_VIDEO_STREAM')))
     
    frames_client.create("stream",
                         table=os.getenv("TAGGED_VIDEO_STREAM"),
                         shards=100,
                         retention_hours=24)
    context.log_result('tagged_video_stream_url', "http://v3io-webapi:8081/%s/%s/"% (os.getenv('IGZ_CONTAINER'),os.getenv('TAGGED_VIDEO_STREAM')))

    # Create camera list table
    v3io_client = dataplane.Client(max_connections=1)
    v3io_client.create_schema(container=os.getenv("IGZ_CONTAINER"),
                              path=os.getenv("CAMERA_LIST_TBL"),
                              key='cameraID',
                              fields=[
                                  {'name': 'cameraID', 'type': 'string', 'nullable': False},
                                  {'name': 'shard', 'type': 'long', 'nullable': False},
                                  {'name': 'url', 'type': 'string', 'nullable': False},
                                  {'name': 'active', 'type': 'boolean', 'nullable': False},
                                  {'name': 'feed_method', 'type': 'string', 'nullable': False},
                                  {'name': 'users', 'type': 'string', 'nullable': True},
                                  {'name': 'password', 'type': 'string', 'nullable': True},
                              ])
    
    # Add camera to table
    v3io_client.put_item(container=os.getenv("IGZ_CONTAINER"),
                         path=os.path.join(os.getenv("CAMERA_LIST_TBL"), os.getenv("CAMERA_ID")),
                         attributes={'cameraID': os.getenv("CAMERA_ID"),
                                     'shard': int(os.getenv("SHARD_ID")),
                                     'url': os.getenv("CAMERA_URL"),
                                     'feed_method': "push",
                                     'active': True})

In [5]:
# nuclio: end-code

In [6]:
from mlrun import code_to_function, mount_v3io, mlconf
import os

fn = code_to_function('create-streams-tables', kind='job', handler="handler", project=os.getenv("PROJECT"))

fn.spec.build.image = f"docker-registry.{os.getenv('IGZ_NAMESPACE_DOMAIN')}:80/{os.getenv('DOCKER_IMAGE')}"
fn.spec.min_replicas = 1
fn.spec.max_replicas = 1

fn.set_env('RAW_VIDEO_STREAM', RAW_VIDEO_STREAM)
fn.set_env('TAGGED_VIDEO_STREAM', TAGGED_VIDEO_STREAM)
fn.set_env('IGZ_CONTAINER', IGZ_CONTAINER)
fn.set_env('CAMERA_LIST_TBL', CAMERA_LIST_TBL)
fn.set_env('CAMERA_ID', CAMERA_ID)
fn.set_env('SHARD_ID', SHARD_ID)
fn.set_env('CAMERA_URL', CAMERA_URL)

fn.export("../yaml/create-streams-tables.yaml")

> 2020-10-28 19:55:33,246 [info] function spec saved to path: ../yaml/create-streams-tables.yaml


<mlrun.runtimes.kubejob.KubejobRuntime at 0x7efd33cee390>