In [1]:
#pip install azure-eventhub
#pip install mysql-connector-python

import time
import os
import asyncio
import uuid
import datetime
import random
import json
import csv
import mysql.connector as MySQL

from datetime import timedelta
from azure.eventhub import EventHubProducerClient, EventData

In [None]:
def data_simulation():
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))

    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="", eventhub_name="")

    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
            reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)}
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.

    # Close the producer.    
    producer.close()
    
    return()

In [4]:
def data_simulation_sql():
    # This script simulates the production of events for 10 devices.
    i = 0
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))

    sqlConn = MySQL.connect(host='localhost', user='root', passwd='', db='')
    cur = sqlConn.cursor()

    for y in range(0,20):    # For each device, produce 20 events. 
        for dev in devices:
            sql = ("INSERT INTO clima (id, fechalectura, uv, humedad, temperatura) VALUES (%s, %s, %s, %s, %s)")
            val = (dev, datetime.datetime.utcnow() + timedelta(seconds=i), random.random(), random.randint(70, 100), random.randint(70, 100))
            
            try:
                cur.execute(sql, val)
            except Exception as e:
                print(e)

            i=i+1
    
    return()

In [None]:
def data_csv(filename):
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="", eventhub_name="")
    
    event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 

    # read csvfile
    with open(filename, 'r', encoding = 'utf8', newline='') as txtfile: 
        # creating a txt reader object 
        reader = csv.reader(txtfile, delimiter=',') 

        for line in reader:
            reading = {'id': line[0], 'timestamp': str(datetime.datetime.utcnow()), 'temperature': line[1], 'humidity': line[2], 'light': line[3], 'co2': line[4], 'humidityRatio': line[5]}
            s = json.dumps(reading) # Convert the reading into a JSON string.
            
            try:
                event_data_batch.add(EventData(s)) # Add event data to the batch.
            except ValueError:
                # EventDataBatch object reaches max_size.
                # New EventDataBatch object can be created here to send more data.
                # break
                producer.send_batch(event_data_batch) # Send the batch of events to the event hub.    
                event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later.
                event_data_batch.add(EventData(s)) # Add event data to the batch.
            
                continue

    
    producer.close() # Close the producer.
    
    return()

In [None]:
def data_sql():
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="", eventhub_name="")
    
    event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 

    sqlConn = MySQL.connect(host='localhost', user='root', passwd='', db='iot')
    cur = sqlConn.cursor()
    cur.execute("SELECT id, fecha, hora, valor, lectura from clima")
    
    for id, fecha, hora, valor, lectura in cur.fetchall:
        reading = {'id': id, 'fecha': fecha, 'hora': hora, 'valor': valor, 'lectura': lectura}
        s = json.dumps(reading) # Convert the reading into a JSON string.
            
        try:
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        except ValueError:
            producer.send_batch(event_data_batch) # Send the batch of events to the event hub.    
            event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
            
            continue

    
    producer.close() # Close the producer.
    
    return()

In [None]:
async def send_event_data_batch(producer):
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = await producer.create_batch(max_size_in_bytes=1000)

    while True:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            # EventDataBatch object reaches max_size.
            # New EventDataBatch object can be created here to send more data.
            break

    await producer.send_batch(event_data_batch)

In [None]:
async def run():
    producer = EventHubProducerClient.from_connection_string(
        conn_str='',
        eventhub_name=''
    )
    async with producer:
        await send_event_data_batch(producer)

In [5]:
data_simulation_sql()

()

In [None]:
data_sql()

In [None]:
data_csv('DatosPruebaMQTT.csv')