# Nuclio - Generator

## Set up the environment

In [1]:
# nuclio: ignore
import nuclio

### Define environment variables

In [2]:
%%nuclio env

# Iguazio access
V3IO_FRAMESD=${V3IO_FRAMESD}
V3IO_USERNAME=${V3IO_USERNAME}
V3IO_ACCESS_KEY=${V3IO_ACCESS_KEY}

# Function variables
BATCH_SIZE=100
DATA_STREAM=customers_stream
london_locations=03311311313011311011000321002320,03311311311233323013031101320003,03311311313010023000032330133111,03311311311222300331010333220231
ACCURACY=20
BASE_ACCURACY=14

%nuclio: setting 'V3IO_FRAMESD' environment variable
%nuclio: setting 'V3IO_USERNAME' environment variable
%nuclio: setting 'V3IO_ACCESS_KEY' environment variable
%nuclio: setting 'BATCH_SIZE' environment variable
%nuclio: setting 'DATA_STREAM' environment variable
%nuclio: setting 'london_locations' environment variable
%nuclio: setting 'ACCURACY' environment variable
%nuclio: setting 'BASE_ACCURACY' environment variable


%nuclio: cannot find "=" in line
%nuclio: cannot find "=" in line


### Base image

In [4]:
%nuclio config spec.build.baseImage = "python:3.6-jessie"

%nuclio: setting spec.build.baseImage to 'python:3.6-jessie'


### Set cron trigger

In [33]:
%nuclio config spec.triggers.secs.kind = "cron"
%nuclio config spec.triggers.secs.attributes.interval = "1m"

%nuclio: setting spec.triggers.secs.kind to 'cron'
%nuclio: setting spec.triggers.secs.attributes.interval to '1m'


### Install packages

In [5]:
%%nuclio cmd

# General
pip install pandas
# pip install json

# DB
pip install v3io_frames

# Function
pip install faker



## Function code

### Imports

In [3]:
import os
import itertools
import random

# Data handling
import pandas as pd

# DB
import v3io_frames as v3f

# Function
import faker
from faker.providers import BaseProvider

### Helper classes definitions

In [4]:
class LocationProvider(BaseProvider):
    '''
    Faker provider that generates location strings sharing a prefix with a base location.

    Uses QuadTree for Geohashing
        @{http://tech.taskrabbit.com/blog/2015/06/09/elasticsearch-geohash-vs-geotree/}
        @{http://mapzen.github.io/leaflet-spatial-prefix-tree/}
        @{http://blog.notdot.net/2009/11/Damn-Cool-Algorithms-Spatial-indexing-with-Quadtrees-and-Hilbert-Curves}
    '''
    def location(self, location_base: str, base_acc: int=10, acc: int=20):
        '''
        Build a location string from the leading part of ``location_base``.

        :param location_base: string whose first ``base_acc`` characters are kept
        :param base_acc: number of leading characters of ``location_base`` to keep
        :param acc: target length of the result; the kept prefix is padded with
            random digits in 0..3 until it reaches this length. When the prefix
            is already at least ``acc`` long it is returned as is (no padding,
            no truncation).
        :return: the kept prefix followed by the random digits
        '''
        prefix = location_base[:base_acc]
        missing = acc - len(prefix)
        # range() of a non-positive number is empty, so no digits are drawn then
        suffix = ''.join(str(random.randint(0, 3)) for _ in range(missing))
        return prefix + suffix

### Init context

In [5]:
def init_context(context):
    '''
    Prepare the state the handler relies on and (re)create the output stream.

    Attributes set on ``context``:
        v3f                      - v3io_frames client built from env V3IO_FRAMESD
        customers_table          - customers KV table name (env CUSTOMERS,
                                   default 'customers')
        customers_stream         - output stream name (env CUSTOMERS_STREAM,
                                   falling back to env DATA_STREAM, then
                                   'customers_stream')
        faker                    - Faker instance with LocationProvider registered
        locations                - base locations parsed from the comma-separated
                                   env london_locations
        customers                - range over the number of rows read from the
                                   customers table
        location_accuracy_params - [BASE_ACCURACY, ACCURACY] as ints
                                   (defaults 14 and 20)

    Side effect: the output stream is deleted and created again, so anything
    previously written to it is dropped.

    :param context: nuclio context object to populate
    :raises KeyError: when V3IO_FRAMESD is not set
    '''
    ##########
    # Setups #
    ##########

    # DB Contexts
    frames_client = v3f.Client('http://' + os.environ['V3IO_FRAMESD'])
    context.v3f = frames_client

    # DB Tables
    customers_table = os.getenv('CUSTOMERS', 'customers')
    context.customers_table = customers_table

    # The function's env configuration names the stream DATA_STREAM; honour it
    # when CUSTOMERS_STREAM is not set instead of silently ignoring it.
    customers_stream = os.getenv(
        'CUSTOMERS_STREAM',
        os.getenv('DATA_STREAM', 'customers_stream'))
    context.customers_stream = customers_stream

    # Function
    fakers = faker.Faker()
    fakers.add_provider(LocationProvider)
    context.faker = fakers

    # Drop blank entries (e.g. from a trailing comma) and surrounding
    # whitespace so every base location is a usable prefix. When nothing is
    # configured keep a single empty base, as before, so generation still
    # works (locations are then made of random digits only).
    raw_locations = os.getenv('london_locations', '')
    locations = [loc.strip() for loc in raw_locations.split(',') if loc.strip()]
    context.locations = locations or ['']

    # Only the number of customers is needed: ids are drawn from range(n).
    customers_df = frames_client.read('kv', customers_table)
    context.customers = range(customers_df.shape[0])

    context.location_accuracy_params = [
        int(os.getenv('BASE_ACCURACY', 14)),
        int(os.getenv('ACCURACY', 20)),
    ]

    ###########
    # Actions #
    ###########
    # NOTE(review): depending on the v3io_frames version, delete may fail when
    # the stream does not exist yet (first deployment) - confirm.
    frames_client.delete('stream', customers_stream)
    frames_client.create('stream', customers_stream, attrs={'retention_hours':48,'shards':1})

### Helper functions

In [6]:
def create_customer(context):
    '''
    Generate one customer record with a random id and a random location.

    :param context: object exposing ``customers`` (sequence of ids),
        ``locations`` (sequence of base locations), ``faker`` (provides
        ``location``) and ``location_accuracy_params`` (extra positional
        arguments forwarded to ``faker.location``)
    :return: dict with the keys ``id`` and ``location``
    '''
    # Draw order matters for reproducibility under a seed: id first, then base.
    chosen_id = random.choice(context.customers)
    base_location = random.choice(context.locations)
    return {
        'id': chosen_id,
        'location': context.faker.location(base_location, *context.location_accuracy_params),
    }

In [7]:
def create_batch(context, batch_size: int):
    '''
    Generate ``batch_size`` customer records and return them as a DataFrame.

    :param context: passed through to ``create_customer`` for every record
    :param batch_size: number of records to generate
    :return: DataFrame built from the generated records, indexed by ``id``
    '''
    # Records are produced lazily and consumed once by from_records.
    records = (create_customer(context) for _ in range(batch_size))
    frame = pd.DataFrame.from_records(records)
    return frame.set_index(['id'])

### Handler

In [8]:
def handler(context, event):
    '''
    Generate one batch of customer locations and write it to the stream.

    The batch size is read from env BATCH_SIZE (default 100) on every call.

    :param context: context prepared by ``init_context``; uses its ``v3f``
        client, ``customers_stream`` name and ``logger``
    :param event: triggering event; its content is not used
    '''
    # Create customers
    batch_size = int(os.getenv('BATCH_SIZE', 100))
    customers = create_batch(context, batch_size)
    # len() is the number of rows; DataFrame.count() would return a per-column
    # Series and garble the log line.
    context.logger.debug(f'created {len(customers)} customers')
    context.logger.debug(f'example:\n{customers.head(1)}')

    # Send to stream
    context.v3f.write('stream', context.customers_stream, [customers])

In [34]:
%nuclio deploy -n generator -p recommendation_engine -c

%nuclio: ['deploy', '-n', 'generator', '-p', 'recommendation_engine', '-c', '/User/tutorials/demos/location_based_recommendation/generator.ipynb']
%nuclio: [nuclio.deploy] 2019-04-29 11:33:24,413 (info) Building processor image
%nuclio: [nuclio.deploy] 2019-04-29 11:33:29,469 (info) Pushing image
%nuclio: [nuclio.deploy] 2019-04-29 11:33:30,478 (info) Build complete
%nuclio: [nuclio.deploy] 2019-04-29 11:33:37,544 (info) Function deploy complete
%nuclio: [nuclio.deploy] 2019-04-29 11:33:37,550 done updating generator, function address: 35.158.112.89:31803
%nuclio: function deployed


In [9]:
# nuclio: ignore
# Local run of the initialisation step from inside the notebook.
# Be aware that init_context deletes and re-creates the output stream.
# NOTE(review): `context` is not defined in any visible cell - presumably it is
# provided by the nuclio notebook integration; confirm.
init_context(context)

In [10]:
# nuclio: ignore
# Local run of the handler with an event whose body is empty; the handler
# does not read the event, so this only exercises batch creation and the
# write to the stream.
event = nuclio.Event(body='')
handler(context, event)

In [14]:
%nuclio show




[NbConvertApp] Converting notebook /User/tutorials/demos/location_based_recommendation/generator.ipynb to nuclio.export.NuclioExporter
Traceback (most recent call last):
  File "/conda/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/conda/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/conda/lib/python3.6/site-packages/nbconvert/__main__.py", line 2, in <module>
    main()
  File "/conda/lib/python3.6/site-packages/jupyter_core/application.py", line 266, in launch_instance
    return super(JupyterApp, cls).launch_instance(argv=argv, **kwargs)
  File "/conda/lib/python3.6/site-packages/traitlets/config/application.py", line 658, in launch_instance
    app.start()
  File "/conda/lib/python3.6/site-packages/nbconvert/nbconvertapp.py", line 337, in start
    self.convert_notebooks()
  File "/conda/lib/python3.6/site-packages/nbconvert/nbconvertapp.py", line 507, in convert_notebooks
    self.convert_single_not

Exception: cannot convert notebook

In [32]:
# nuclio: ignore
# Read back from the stream to check what was written: seeks by time starting
# at 'now-1m' on shard '0' and, with iterator=False, returns a single result
# rather than an iterator.
# NOTE(review): the table name is hard-coded here; keep it in sync with the
# stream name that init_context resolves from the environment.
v3f.Client('http://' + os.environ['V3IO_FRAMESD']).read('stream', table='customers_stream', seek='time',start='now-1m', shard_id='0', iterator=False)

Unnamed: 0_level_0,id,location,stream_time
seq_number,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
140201,1957.0,03311311313010320303,2019-04-29 11:28:59.018522886
140202,216.0,03311311313011201121,2019-04-29 11:28:59.018522886
140203,905.0,03311311311222222212,2019-04-29 11:28:59.018522886
140204,1013.0,03311311313011001221,2019-04-29 11:28:59.018522886
140205,1575.0,03311311311233232323,2019-04-29 11:28:59.018522886
140206,1440.0,03311311313010223032,2019-04-29 11:28:59.018522886
140207,2276.0,03311311311233313102,2019-04-29 11:28:59.018522886
140208,551.0,03311311311233331210,2019-04-29 11:28:59.018522886
140209,523.0,03311311311222202313,2019-04-29 11:28:59.018522886
140210,2751.0,03311311313011313131,2019-04-29 11:28:59.018522886
