# Laboratorio Sistemas Distribuidos

Integrantes:

            Luis Aguilar

            Juan De Pablo

            Gabriel Venegas

Profesores: 

            Daniel Wladimiro

            Joaquin Villagra
            
### Tema
Servicio streaming que sea capaz de leer un dato de otro emisor streaming, realizar una evaluación en algún motor de reglas o un cruce con una base de datos en memoria, y posteriormente emitir el dato a la red bajo el patrón publish-subscribe.

Requisito no funcional: lenguaje de programación libre dentro del stack de Spark.

Proveedor cloud: Amazon Web Services




### Datos para la creación del data stream

In [271]:
# AWS region the stream is created in and connected to.
region = 'us-east-2'
# AWS profile name; when non-empty it is exported as AWS_PROFILE before connecting.
aws_profile = 'ejemplo'
# Name of the Kinesis data stream used throughout the notebook.
stream_name = 'test'

## Definición de funciones para crear el stream

In [272]:
import boto
from boto.kinesis.exceptions import ResourceInUseException
import os
import time

# Export the profile (when one is configured) before the connection is
# created, so the connection is made with AWS_PROFILE already set.
if aws_profile:
    os.environ['AWS_PROFILE'] = aws_profile

# Kinesis connection shared by every function and class in this notebook.
kinesis = boto.kinesis.connect_to_region(region)
if kinesis is None:
    # boto returns None instead of raising when it does not know the region;
    # fail here rather than with an AttributeError on the first API call.
    raise ValueError('unknown Kinesis region: {!r}'.format(region))

def get_status(name=None):
    """Return the ``StreamStatus`` string Kinesis reports for a stream.

    Args:
        name: Stream to describe. Defaults to the module-level
            ``stream_name`` so existing ``get_status()`` calls keep working.

    Raises:
        KeyError: If the response lacks ``StreamDescription`` or
            ``StreamStatus``.
    """
    if name is None:
        name = stream_name
    response = kinesis.describe_stream(name)
    return response['StreamDescription']['StreamStatus']


def create_stream(stream_name):
    """Create a one-shard stream and block until it reports ``ACTIVE``.

    A stream that already exists is reused: the ``ResourceInUseException``
    raised by the create call is reported and then ignored.

    Args:
        stream_name: Name of the stream to create and wait for.
    """
    try:
        # Second argument is the shard count.
        kinesis.create_stream(stream_name, 1)
    except ResourceInUseException:
        print('stream {} already exists in region {}'.format(stream_name, region))
    else:
        print('stream {} created in region {}'.format(stream_name, region))

    # Poll the stream that was requested here, not the module-level default.
    while get_status(stream_name) != 'ACTIVE':
        time.sleep(1)
    print('stream {} is active'.format(stream_name))

## Creación del stream

In [273]:
# Create the stream (or reuse it if it already exists) and wait until it is active.
create_stream(stream_name)

stream test created in region us-east-2
stream test is active


## Definición de funciones para la creación del data stream

In [279]:
import datetime
import time
import threading
import random
from boto.kinesis.exceptions import ResourceNotFoundException

class KinesisProducer(threading.Thread):
    """Thread that publishes randomly generated person records to a stream.

    Each record is a text line with a first name, two surnames, a birth date
    and the UTC time it was generated; its partition key is a random number
    rendered as a string.

    Args:
        stream_name: Stream the records are written to.
        sleep_interval: Seconds to wait between records. When falsy
            (``None`` or ``0``) a single record is sent and the thread ends.
        prod: Free-form label stored on the instance as ``prod``.
    """

    # Pools the fake names are drawn from.
    _NOMBRES = ("Maria", "Juan", "Pedro", "Luis", "Gabriel", "Alejandro",
                "Susana", "Amanda")
    _APELLIDOS = ("Gonzalez", "Saez", "Banda", "Fucceneco", "Aguilar",
                  "Paredes", "Morales")
    # Inclusive range of generated birth dates.
    _FIRST_BIRTHDAY = datetime.date(1950, 1, 1)
    _LAST_BIRTHDAY = datetime.date(2018, 12, 31)

    def __init__(self, stream_name, sleep_interval=None, prod ='producer1'):
        super().__init__()
        self.stream_name = stream_name
        self.sleep_interval = sleep_interval
        self.prod = prod

    @classmethod
    def _random_birthday(cls):
        """Return a random, valid calendar date formatted as ``D/M/YYYY``.

        Drawing a day ordinal (instead of independent day and month numbers)
        covers every month and never yields an impossible date such as 31/4.
        """
        ordinal = random.randint(cls._FIRST_BIRTHDAY.toordinal(),
                                 cls._LAST_BIRTHDAY.toordinal())
        born = datetime.date.fromordinal(ordinal)
        return '{}/{}/{}'.format(born.day, born.month, born.year)

    def put_record(self):
        """Build one random record and write it to the stream."""
        part_key = str(random.randint(1, 20000000))
        # Named sent_at (not time) so the time module is not shadowed.
        sent_at = datetime.datetime.utcnow().isoformat()
        data = '{} {} {} , nacimiento:{} , {}'.format(
            random.choice(self._NOMBRES),
            random.choice(self._APELLIDOS),
            random.choice(self._APELLIDOS),
            self._random_birthday(),
            sent_at,
        )
        kinesis.put_record(self.stream_name, data, part_key)

    def run_continously(self):
        """Send a record, sleep ``sleep_interval`` seconds, and repeat forever."""
        while True:
            self.put_record()
            time.sleep(self.sleep_interval)

    def run(self):
        """Thread entry point.

        Publishes continuously when ``sleep_interval`` is truthy, otherwise
        publishes a single record. A ``ResourceNotFoundException`` is reported
        and ends the thread.
        """
        try:
            if self.sleep_interval:
                self.run_continously()
            else:
                self.put_record()
        except ResourceNotFoundException:
            print('stream {} not found. Exiting'.format(self.stream_name))

## Productor de data stream

In [280]:
# Background producer: one random record every 45 seconds.
producer = KinesisProducer(stream_name=stream_name, sleep_interval=45, prod='producer')
# start() runs the producer's run() method on its own thread.
producer.start()


## Definición de funciones del consumidor para visualizar el data stream

In [283]:
from boto.kinesis.exceptions import ProvisionedThroughputExceededException
import datetime

class KinesisConsumer:
    """Base consumer that polls one shard of a stream for a limited time.

    Subclasses implement ``process_records`` to decide what happens with
    each batch of records.

    Args:
        stream_name: Stream to read from.
        shard_id: Shard to read; stored as ``str(shard_id)``.
        iterator_type: Shard iterator type passed to ``get_shard_iterator``.
        worker_time: Seconds ``run`` keeps polling before it returns.
        sleep_interval: Seconds to wait between successful polls.
    """

    def __init__(self, stream_name, shard_id, iterator_type,
                 worker_time=30, sleep_interval=0.5):
        self.stream_name = stream_name
        self.shard_id = str(shard_id)
        self.iterator_type = iterator_type
        self.worker_time = worker_time
        self.sleep_interval = sleep_interval

    def process_records(self, records):
        """Handle one non-empty batch of records; must be overridden."""
        raise NotImplementedError

    @staticmethod
    def iter_records(records):
        """Yield ``(partition_key, data)`` for every record in ``records``."""
        for record in records:
            yield record['PartitionKey'], record['Data']

    def run(self):
        """Poll the shard until ``worker_time`` elapses or the shard ends.

        Each poll requests at most 25 records; non-empty batches go to
        ``process_records``. When throughput is exceeded the same iterator
        is retried after one second.
        """
        response = kinesis.get_shard_iterator(self.stream_name,
            self.shard_id, self.iterator_type)
        next_iterator = response['ShardIterator']

        finish = datetime.datetime.now() + datetime.timedelta(seconds=self.worker_time)

        # Stop early once no further iterator is returned: polling with a
        # missing iterator can only fail.
        while next_iterator and finish > datetime.datetime.now():
            try:
                response = kinesis.get_records(next_iterator, limit=25)

                records = response['Records']
                if records:
                    self.process_records(records)

                # Kinesis returns no next iterator once a closed shard has
                # been fully read, so do not assume the key is present.
                next_iterator = response.get('NextShardIterator')
                time.sleep(self.sleep_interval)
            except ProvisionedThroughputExceededException:
                time.sleep(1)

In [284]:
class EchoConsumer(KinesisConsumer):
    """Consumer that prints every record it receives."""

    def process_records(self, records):
        """Print each record's partition key (labelled "rut") and its data."""
        for entry in self.iter_records(records):
            key, payload = entry
            print("rut: ", key, " , ", payload)

In [285]:
# Iterator type requested from Kinesis; per the Kinesis documentation
# TRIM_HORIZON starts at the oldest record still held by the shard.
iterator_type = 'TRIM_HORIZON'
# Shard to read from.
shard_id = 'shardId-000000000000'
# Echo consumer that polls for 30 seconds once run() is called.
worker = EchoConsumer(stream_name=stream_name, shard_id=shard_id,
                      iterator_type=iterator_type, worker_time=30)

In [286]:
# Poll the shard and print every record received until worker_time has elapsed.
worker.run()

rut:  7591718  ,  Amanda Aguilar Aguilar , nacimiento:16/5/1953 , 2019-05-21T22:34:49.158813
rut:  5052552  ,  Luis Paredes Aguilar , nacimiento:30/3/1981 , 2019-05-21T22:34:49.161638
rut:  15179585  ,  Pedro Morales Fucceneco , nacimiento:10/11/2008 , 2019-05-21T22:35:35.320319
rut:  24625  ,  Juan Morales Gonzalez , nacimiento:14/11/1971 , 2019-05-21T22:35:50.336729
rut:  18687227  ,  Pedro Morales Paredes , nacimiento:4/6/1968 , 2019-05-21T22:36:21.972356
rut:  8733185  ,  Gabriel Gonzalez Paredes , nacimiento:9/6/2015 , 2019-05-21T22:36:52.260326
rut:  7279753  ,  Juan Fucceneco Saez , nacimiento:12/12/1988 , 2019-05-21T22:37:08.620322
rut:  10159936  ,  Maria Saez Aguilar , nacimiento:25/8/1986 , 2019-05-21T22:37:53.396324
rut:  5454636  ,  Maria Saez Aguilar , nacimiento:29/9/1992 , 2019-05-21T22:37:55.264338
rut:  11638657  ,  Pedro Saez Banda , nacimiento:4/6/1958 , 2019-05-21T22:38:41.412328
rut:  7579572  ,  Amanda Paredes Paredes , nacimiento:27/12/1968 , 2019-05-21T22:38:55

rut:  1386080  ,  Susana Morales Morales , nacimiento:17/11/1978 , 2019-05-21T23:18:49.300285
rut:  16428698  ,  Gabriel Morales Gonzalez , nacimiento:18/5/1988 , 2019-05-21T23:19:01.552250
rut:  10809934  ,  Maria Fucceneco Paredes , nacimiento:18/4/1952 , 2019-05-21T23:19:35.775730
rut:  3763892  ,  Juan Aguilar Aguilar , nacimiento:29/11/2013 , 2019-05-21T23:20:03.263800
rut:  11024140  ,  Juan Saez Aguilar , nacimiento:2/8/2016 , 2019-05-21T23:20:21.764525
rut:  17236373  ,  Amanda Aguilar Saez , nacimiento:1/6/2009 , 2019-05-21T23:21:05.260317
rut:  12800617  ,  Pedro Fucceneco Paredes , nacimiento:20/12/2014 , 2019-05-21T23:21:08.820251
rut:  9236472  ,  Susana Saez Morales , nacimiento:26/4/1988 , 2019-05-21T23:21:54.388267
rut:  3016964  ,  Maria Paredes Morales , nacimiento:4/6/2016 , 2019-05-21T23:22:07.464258
rut:  13251709  ,  Pedro Aguilar Paredes , nacimiento:18/9/1952 , 2019-05-21T23:22:40.926861
rut:  7211043  ,  Alejandro Saez Morales , nacimiento:27/3/2003 , 2019-05-2

## Redefinición de la función de procesamiento de datos para búsqueda por rut

In [287]:
from collections import defaultdict, Counter
from dateutil import parser
from operator import itemgetter

class SearchConsumer(KinesisConsumer):
    """Consumer that looks for the record whose partition key equals a rut.

    Args:
        stream_name: Stream to read from.
        shard_id: Shard to read.
        iterator_type: Shard iterator type.
        worker_time: Seconds ``run`` keeps polling.
        search: Rut to look for. Compared as a string, so an int works too.
    """

    def __init__(self, stream_name, shard_id, iterator_type, worker_time, search = '111111'):
        # Not used by this class; kept so code reading the attribute still works.
        self.time_buckets = defaultdict(Counter)
        self.search = search
        # This consumer polls every 20 seconds.
        super().__init__(stream_name, shard_id, iterator_type, worker_time,
                         sleep_interval=20)

    def process_records(self, records):
        """Scan one batch and report the first record matching ``search``.

        The early return only ends the scan of this batch; the polling loop
        that called this method keeps running until its own time limit.
        """
        print("Buscando el rut: ", self.search, "...")

        # Compare as strings: without this an int `search` could never equal
        # a string partition key and the search would silently find nothing.
        target = str(self.search)
        for rut, data in self.iter_records(records):
            if rut == target:
                print("rut encontrado!!!!")
                print("rut:", rut, ",", data)
                print("hora termino: ", datetime.datetime.utcnow())
                return
                    

In [288]:
# Search consumer: polls for 120 seconds looking for rut 11957790.
worker2 = SearchConsumer(stream_name=stream_name, shard_id=shard_id,
                         iterator_type=iterator_type, worker_time=120,
                         search='11957790')
# Print the start time so it can be compared with the "hora termino" line.
now = datetime.datetime.utcnow()
print("hora inicio: ",now)
worker2.run()


hora inicio:  2019-05-21 23:31:36.838241
Buscando el rut:  11957790 ...
Buscando el rut:  11957790 ...
Buscando el rut:  11957790 ...
Buscando el rut:  11957790 ...
Buscando el rut:  11957790 ...
rut encontrado!!!!
rut: 11957790 , Amanda Saez Paredes , nacimiento:22/4/1971 , 2019-05-21T23:24:59.296255
hora termino:  2019-05-21 23:33:03.859030
Buscando el rut:  11957790 ...


In [None]:
# Delete the stream used by this notebook.
# NOTE(review): presumably this also ends the producer thread, via its
# ResourceNotFoundException handler on the next put_record — confirm.
kinesis.delete_stream(stream_name)