In [1]:
!mkdir -p pruebas

In [1]:
%%file pruebas/genera_json.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// Recibe como argumetno la cantidad de mensajes a enviar
// por defecto son 100. El máximo son 200 y el mínumo 1
int main(int argc, char **argv) {
    int i, x, y, cantidad;
    char buffer[500];
    
    srand(getpid());
    
    if (argc < 2) {
        cantidad = 100;
    } else {
        cantidad = atoi(argv[1]);
        if ((cantidad < 1) || (cantidad > 1000)) {
            cantidad = 100;
        }
    }

    for (i=0; i<cantidad; i++) {
        x = rand() % 500;
        y = rand() % 500;
        sprintf(buffer, "{\"numero\": %d, \"x\": %d, \"y\": %d}\n", 
                1 + (rand() % 3), x, y);
        //fprintf(stderr, "Mensaje %i\n", i);
        printf("%s", buffer);
        fflush(stdout);
    }
}


Overwriting pruebas/genera_json.c


In [2]:
!gcc pruebas/genera_json.c -o pruebas/genera_json

In [42]:
!./pruebas/genera_json 2 | nc localhost 4455

random: not enough state (4 bytes); ignored.


# Separar por `{` y `}`

In [25]:
!lsof -i TCP:4455 | grep LISTEN | awk '{print $2}' | xargs kill

In [26]:
# coding: utf-8

from Queue import Queue


class SeparaJSON(object):
    """Maquina de pila estados para separar los paquetes, basados en la
    cantidad de llaves"""

    # Estados
    ESPERO_APERTURA = 'ESPERO_APERTURA'
    ABIERTO = 'ABIERTO'

    def __init__(self):
        """Constructor"""
        # Almacenamiento temporal
        self.buff = ''
        self.estado = self.ESPERO_APERTURA
        self.cantidad_llaves = 0
        self.mensajes = Queue()

    def procesar(self, data):
        """Procesa una cadena de entrada (de caracter a caracter)"""
        for c in data:
            self.procesar_char(c)

    def procesar_char(self, c):
        """Procesa un caracter de entrad"""

        if self.estado == self.ESPERO_APERTURA:
            if c == '{':
                self.estado = self.ABIERTO
                self.cantidad_llaves += 1
                self.buff += c
            else:
                pass  # Descarte
        elif self.estado == self.ABIERTO:
            if c == '}':
                self.cantidad_llaves -= 1
                self.buff += c
                if self.cantidad_llaves == 0:
                    # Mensaje completo, se desapilaron todas las llaves
                    self.mensajes.put(self.buff)
                    print self.buff
                    self.buff = ''
                    self.estado = self.ESPERO_APERTURA
            elif c == '{':
                self.cantidad_llaves += 1
                self.buff += c
            else:
                self.buff += c

    def disponible(self):
        return not self.mensajes.empty()

    def siguiente(self):
        """Devuelve el próximo mensaje"""
        return self.mensajes.get()

    def __iter__(self):
        """Iterador para for"""
        return IterQueue(self.mensajes)


class IterQueue(object):
    def __init__(self, q):
        self.q = q

    def next(self):
        if self.q.empty():
            raise StopIteration()
        return self.q.get()

In [31]:
from socket import socket
import json
from time import sleep
from subprocess import call
from time import time
import signal
import os
from multiprocessing import Process
import sys

PUERTO = 4455

def generar_json_y_pasar_por_netcat():
    sleep(0.5)
    print "Lanzando C"
    call("./pruebas/genera_json 200 | nc localhost %d" % PUERTO, shell=True)

def escuchar_conexiones():
    conexion = None
    def terminar():
        # Evitar el Error ADDRESS ALREADY IN USE
        conexion.shutdown(socket.SHUT_RDWR)
        conexion.close()
        sys.exit()   # Terminar el proceso
        
    signal.signal(signal.SIGTERM, terminar)
    conexion = socket()
    conexion.bind(('', 4455))
    conexion.listen(10)
    sock, addr = conexion.accept()
    errores, oks = 0, 0
    

    separador = SeparaJSON()
    
    while True:
        data = sock.recv(100)
        if not data:
            print "Socket Cerrado, finalizando!"
            print "OK=%d ERRORES=%d" % (oks, errores)
            break

        separador.procesar(data)
        
        for m in separador:
            try:
                json.loads(m)
            except ValueError:
                print "Error en JSON: %s" % m
                errores += 1
            else:
                oks += 1
            
p1 = Process(target=generar_json_y_pasar_por_netcat)
p2 = Process(target=escuchar_conexiones)
p2.start()
p1.start()
sleep(1)
p1.terminate()
p2.terminate()


Lanzando C
{"numero": 1, "x": 383, "y": 386}
{"numero": 2, "x": 415, "y": 293}
{"numero": 1, "x": 386, "y": 492}
{"numero": 2, "x": 421, "y": 362}
{"numero": 3, "x": 190, "y": 59}
{"numero": 1, "x": 426, "y": 40}
{"numero": 3, "x": 172, "y": 236}
{"numero": 1, "x": 368, "y": 67}
{"numero": 3, "x": 282, "y": 30}
{"numero": 2, "x": 123, "y": 67}
{"numero": 1, "x": 429, "y": 302}
{"numero": 1, "x": 58, "y": 69}
{"numero": 2, "x": 393, "y": 456}
{"numero": 1, "x": 42, "y": 229}
{"numero": 3, "x": 421, "y": 419}
{"numero": 3, "x": 37, "y": 198}
{"numero": 3, "x": 315, "y": 370}
{"numero": 3, "x": 26, "y": 91}
{"numero": 3, "x": 456, "y": 373}
{"numero": 2, "x": 170, "y": 496}
{"numero": 3, "x": 305, "y": 425}
{"numero": 3, "x": 327, "y": 336}
{"numero": 2, "x": 346, "y": 229}
{"numero": 1, "x": 357, "y": 124}
{"numero": 3, "x": 82, "y": 45}
{"numero": 3, "x": 367, "y": 434}
{"numero": 2, "x": 43, "y": 250}
{"numero": 3, "x": 308, "y": 276}
{"numero": 1, "x": 288, "y": 84}
{"numero": 2, "x":