In [None]:
import copy 
import logging
import psycopg2
import itertools

from collections import Counter
from heapq import merge
from itertools import groupby
from operator import itemgetter

from t1000.embedding import video

def fetch(dbname, user, host, password, query):
    '''
    Executes `query` on a PostgreSQL database and returns all result rows.

    Arguments:
    dbname, user, host, password - connection parameters passed to psycopg2
    query                        - SQL text, executed verbatim (must not be
                                   assembled from untrusted input)

    Returns:
    list of row tuples, as returned by cursor.fetchall()

    Raises:
    whatever psycopg2 raises while connecting or querying; the error is
    logged before being re-raised
    '''
    conn = None
    curr = None
    try:
        # Keyword arguments instead of a hand-formatted DSN string, so quotes
        # or spaces in a value (e.g. the password) cannot break the DSN.
        conn = psycopg2.connect(dbname=dbname, user=user, host=host,
                                password=password)
        curr = conn.cursor()
        curr.execute(query)
        return curr.fetchall()
    except Exception:
        logging.exception('Query failed on database %s', dbname)
        raise
    finally:
        # Close only what was actually opened: connect() or cursor() may
        # fail before both objects exist.
        if curr is not None:
            curr.close()
        if conn is not None:
            conn.close()

def inner_join(a, b):
    '''
    Joins two iterables of tuples on their first element.

    Arguments:
    a - iterable of tuples (id, x, ...)
    b - iterable of tuples (id, y, ...)

    Returns:
    generator of tuples (id, x, ..., y, ...) ordered by id (rows of `a` with
    equal ids keep their relative order). Every row of `a` is paired with
    every row of `b` sharing its id; ids present on only one side are
    dropped. Neither input is modified.

    Ids are matched by hash/equality, so both sides must use the same id
    type (e.g. both str).
    '''
    key = itemgetter(0)

    # Index b by id: this handles repeated ids on either side and avoids
    # sorting the caller's lists in place.
    b_by_id = {}
    for row_b in b:
        b_by_id.setdefault(key(row_b), []).append(row_b)

    for row_a in sorted(a, key=key):
        for row_b in b_by_id.get(key(row_a), ()):
            yield row_a + row_b[1:]
            

def filter_videos(videos, min_count = 10):
    '''
    Filters videos by tag frequency and re-encodes their tags as ranks.

    Arguments:
    videos    - iterable of (video_id, tags, url) tuples, where tags is an
                iterable of hashable tag ids
    min_count - tags occurring fewer than this many times in total (summed
                over all videos) are dropped

    Returns:
    filtered       - list of (video_id, encoded_tags, url) for the videos
                     that keep at least one tag; encoded_tags holds the rank
                     of each kept tag, in the video's original tag order
    tags_2_indices - a dictionary mapping tag ids to the 1-based rank of
                     their frequency (1 = most frequent; equal counts are
                     ranked in order of first appearance)
    indices_2_tags - inverse dictionary (rank -> tag id)
    '''
    # The input is iterated twice (once to count tags, once to encode
    # them), so materialise one-shot iterables such as generators first.
    if not isinstance(videos, list):
        videos = list(videos)

    tag_counts = Counter(
        itertools.chain.from_iterable(video[1] for video in videos))

    # most_common() is sorted by decreasing count, so the kept tags form a
    # prefix and their ranks are exactly 1..len(tags_2_indices).
    tags_2_indices = {
        tag_id: rank
        for rank, (tag_id, occurrences) in enumerate(tag_counts.most_common(), 1)
        if occurrences >= min_count
    }

    # reverse index for decoding
    indices_2_tags = {rank: tag_id for tag_id, rank in tags_2_indices.items()}

    filtered = []
    for video_id, tags, url in videos:
        encoded = [tags_2_indices[t] for t in tags if t in tags_2_indices]
        if encoded:
            filtered.append((video_id, encoded, url))

    return filtered, tags_2_indices, indices_2_tags

## Videos data

In [None]:
# Connection settings for the videos database (ds-wizards).
# The password comes from the DS_WIZARDS_PASSWORD environment variable or an
# interactive prompt instead of being stored in the notebook.
# NOTE(review): the password previously committed here should be rotated.
import os
from getpass import getpass

dbname = 'ds-wizards'
user = 'wizard'
host = os.environ.get('DS_WIZARDS_HOST', '192.95.32.117')
password = (os.environ.get('DS_WIZARDS_PASSWORD')
            or getpass('Password for ds-wizards: '))

VQUERY = "select post_id, url from videos where status='ok'"

In [None]:
# Keep only the second "_"-separated segment of each post_id (presumably the
# bare video id) so the rows can be joined with the tags database below.
vres = [
    (post_id.split("_")[1], url)
    for post_id, url in fetch(dbname, user, host, password, VQUERY)
]

## Tags

In [None]:
# Connection settings for the content-tags database (ds-content-tags).
# The password comes from the DS_CONTENT_TAGS_PASSWORD environment variable or
# an interactive prompt instead of being stored in the notebook.
# NOTE(review): the password previously committed here should be rotated.
# NOTE(review): this cell rebinds dbname/user/host/password, so the ds-wizards
# fetch cell only works again after re-running the ds-wizards config cell.
import os
from getpass import getpass

dbname = 'ds-content-tags'
user = 'ds-content-tags'
password = (os.environ.get('DS_CONTENT_TAGS_PASSWORD')
            or getpass('Password for ds-content-tags: '))
host = os.environ.get('DS_CONTENT_TAGS_HOST', '192.95.32.117')

TQUERY = "select id, tags from videos where tags is not NULL"
TAGS = "select tag_id, name, path from content_tags"

In [None]:
# (id, tags) rows for every video that has tags in ds-content-tags.
tres = fetch(dbname=dbname, user=user, host=host, password=password, query=TQUERY)

In [None]:
# Tag metadata lookup built from the content_tags table: tag_id -> (name, path)
tags = dict(
    (tag_id, (name, path))
    for tag_id, name, path in fetch(dbname, user, host, password, TAGS)
)

## Join videos with tags

In [None]:
# Tags with fewer total occurrences than this are dropped by filter_videos.
MIN_TAG_COUNT = 10

# Materialise the join: a bare generator would be silently empty the second
# time `videos` is used (e.g. when re-filtering with another threshold).
videos = list(inner_join(tres, vres))
filtered, t2i, i2t = filter_videos(videos, min_count=MIN_TAG_COUNT)
print("Found %d videos with %d unique tags" % (len(filtered), len(t2i)))

In [None]:
import os
import numpy as np
import tensorflow as tf

from tensorflow.python.platform import gfile

In [None]:
# Expose only GPU 0 to this process.
# NOTE(review): this must take effect before TensorFlow first initialises the
# GPU (i.e. before any tf.Session is created); moving it above
# `import tensorflow` would be the safest placement.
os.environ.update(CUDA_VISIBLE_DEVICES="0")

In [None]:
import pickle
from time import gmtime, strftime

class FramesIterator:
    '''
    One-shot iterator yielding (video_id, frames, video_tags) per video.

    Frames are obtained by calling video.extract_frames on the third element
    of each input tuple (the video path/url). Videos are taken from the END
    of the list, so the last video is processed first.

    Arguments:
    videos - list of (video_id, video_tags, video_path) tuples; it is
             deep-copied so iterating never mutates the caller's list
    '''

    def __init__(self, videos):
        self.videos = copy.deepcopy(videos)

    def __iter__(self):
        return self

    def __next__(self):
        if not self.videos:
            raise StopIteration

        now = strftime("%Y-%m-%d %H:%M:%S", gmtime())
        print("[%s]Downloading url" % now)
        video_id, video_tags, video_path = self.videos.pop()

        # this could be done in parallel
        frames = video.extract_frames(video_path)
        return video_id, frames, video_tags


def _load_inception_graph(model_path):
    '''Parse the serialized GraphDef at `model_path` into a new, private tf.Graph.'''
    graph = tf.Graph()
    with gfile.FastGFile(model_path, 'rb') as f:
        graph_def = tf.GraphDef()
        graph_def.ParseFromString(f.read())
    # Import into a dedicated graph rather than the global default graph, so
    # calling this again (e.g. re-running a notebook cell) starts clean.
    with graph.as_default():
        tf.import_graph_def(graph_def, name='')
    return graph


def extract_incepction_v3(frame_iterator, model_dir, logging_step = 100):
    '''
    Extract incepction_v3 features from a frame iterator.

    Inputs:
    frame_iterator - an iterator yielding (item_id, frames, tags) tuples
    model_dir      - path to the serialized inception GraphDef (.pb file)
    logging_step   - log progress every this many items (must be >= 1)

    Returns:
    list of (item_id, features, tags) tuples, one per item, where features
    is a float32 numpy array stacking np.squeeze(pool_3 output) for each
    frame (shape (0,) when an item has no frames)
    '''
    logger = logging.getLogger(__name__)
    logger.debug("Extracting inception features")

    graph = _load_inception_graph(model_dir)
    # Look the output tensor up once; it is the same for every frame.
    pool3_layer = graph.get_tensor_by_name('pool_3:0')

    results = []
    with tf.Session(graph=graph) as sess:
        # TODO: add queuing and batching for optimal performance
        for index, (item_id, frames, tags) in enumerate(frame_iterator):
            if index % logging_step == 0:
                print("Extracting features from video number %s" % index)
                logger.info("Extracting features from video number %s", index)

            img_features = []
            for frame in frames:
                # NOTE(review): 'DecodeJpeg:0' is fed directly, so frames are
                # presumably decoded HxWx3 images — confirm against
                # video.extract_frames.
                predictions = sess.run(pool3_layer, {'DecodeJpeg:0': frame})
                img_features.append(np.squeeze(predictions))

            results.append(
                (item_id, np.array(img_features, dtype=np.float32), tags))

    return results

In [None]:
# Smoke-test input: only the first five filtered videos. FramesIterator is
# one-shot, so re-run this cell before re-running the extraction cell.
frames = FramesIterator(videos=filtered[:5])

In [None]:
# Serialized inception GraphDef used by extract_incepction_v3; override the
# machine-specific default with the INCEPTION_GRAPH_PATH environment variable.
INCEPTION_GRAPH_PATH = os.environ.get(
    'INCEPTION_GRAPH_PATH', '/models/image/inception/classify_image_graph_def.pb')

# `frames` is consumed by this call; re-run the cell that builds it first
# when re-running this one.
inception_features = extract_incepction_v3(frames, INCEPTION_GRAPH_PATH, logging_step=1)

In [81]:
import multiprocessing
import time
import logging
# Root logger for the producer/consumer demo: INFO level, timestamped lines.
# basicConfig does nothing if the root logger already has handlers
# (e.g. when this cell is re-run).
logging.basicConfig(
    format='%(asctime)s  %(levelname)-8s %(message)s',
    datefmt='%m-%d %H:%M:%S',
    level=logging.INFO,
)

#Producer class
class Producer(multiprocessing.Process):
    '''
    Process that sends its work items to a queue, then a None sentinel.

    Each item is sent as the string "<idx>.<item>" (both formatted with %d),
    so the consumer can tell which producer it came from. Items are popped
    from the end of `items`, i.e. sent in reverse order.

    Arguments:
    items - list of numbers to send; emptied by run()
    idx   - numeric id of this producer, used in messages and log lines
    queue - queue to put messages on; a final None marks end of stream
    '''

    def __init__(self, items, idx, queue):
        # Zero-argument super(): no need to repeat (or misspell) the class name.
        super().__init__()
        self.items = items
        self.queue = queue
        self.idx = idx

    def run(self):
        logging.info("Starting %d producer ", self.idx)

        while self.items:
            item = self.items.pop()
            self.queue.put("%d.%d" % (self.idx, item))
            time.sleep(2)  # artificial delay to make the interleaving visible

        logging.info("This is it! [%d]", self.idx)
        # Sentinel: tells the consumer this queue is exhausted.
        self.queue.put(None)

        logging.info('Ending producer')

#Consumer class
class Consumer(multiprocessing.Process):
    '''
    Process that drains several queues round-robin until all are exhausted.

    Each round does one blocking get() per remaining queue, so a slow queue
    stalls the others. A None message marks a queue as finished and removes
    it; any other message is expected to be "<producer_id>.<value>".

    Arguments:
    idx    - numeric id of this consumer, used in log lines
    queues - list of queues to read; it is updated in place as queues finish
    '''

    def __init__(self, idx, queues):
        # Zero-argument super(): no need to repeat (or misspell) the class name.
        super().__init__()
        self.queues = queues
        self.idx = idx

    def run(self):
        logging.info("Starting %d consumer", self.idx)
        while self.queues:
            # Iterate over a snapshot: self.queues is rebuilt when a queue is
            # exhausted, and mutating it mid-iteration would skip the next one.
            for queue in list(self.queues):
                message = queue.get()
                if message is None:
                    self.queues[:] = [q for q in self.queues if q is not queue]
                    logging.info("Removed %s from queues. %d left",
                                 queue, len(self.queues))
                    continue

                producer_id, value = message.split(".", 1)
                logging.info('Consumer %d: Got "%s" from %s',
                             self.idx, value, producer_id)
                time.sleep(1)  # artificial delay to make the interleaving visible

        logging.info("Ending %d consumer", self.idx)

def chunks(seq, num):
    '''
    Splits a sliceable sequence into exactly `num` contiguous chunks whose
    sizes differ by at most one.

    Arguments:
    seq - a sequence supporting len() and slicing (list, tuple, str, ...)
    num - number of chunks, must be >= 1

    Returns:
    list of `num` slices of `seq`; some are empty when len(seq) < num

    Raises:
    ValueError - if num < 1
    '''
    if num < 1:
        raise ValueError("num must be >= 1, got %r" % (num,))
    length = len(seq)
    # Integer cut points floor(i * length / num). Accumulating a float step of
    # length / num drifts (ten steps of 0.1 sum to 0.9999999999999999), which
    # can yield a spurious extra chunk.
    bounds = [i * length // num for i in range(num + 1)]
    return [seq[start:stop] for start, stop in zip(bounds, bounds[1:])]

if __name__=='__main__':
    # NOTE(review): in Jupyter __name__ is '__main__', so this runs when the
    # cell executes. Process classes defined in a notebook only work with the
    # 'fork' start method; with 'spawn' the children cannot import them.
    N_PRODUCERS = 3

    # Split the work items into one chunk per producer.
    work = chunks(list(range(10)), N_PRODUCERS)

    # One producer (and one dedicated queue) per chunk.
    producers = []
    queues = []
    for idx in range(N_PRODUCERS):
        producer_queue = multiprocessing.Queue()
        queues.append(producer_queue)
        producers.append(Producer(work[idx], idx, producer_queue))

    # An extra producer with id 999 and a distinct range of items.
    extra_queue = multiprocessing.Queue()
    queues.append(extra_queue)
    producers.append(Producer(list(range(20, 30)), 999, extra_queue))

    print(len(producers), len(queues))

    # A single consumer reads from all producer queues.
    consumer_object = Consumer(1, queues)
    consumer_object.start()

    for producer_object in producers:
        producer_object.start()

    # The consumer finishes once every queue has delivered its None sentinel.
    consumer_object.join()

    for producer_object in producers:
        producer_object.join()