In [None]:
import kafka
import logging
import asyncio
#from zeppelin.notebook.utils.crypto_producer import CryptoProducer
# Connectivity check: join the local broker as consumer group 'check' and
# display the set of topics it reports.
consumer = kafka.KafkaConsumer(
    bootstrap_servers=['localhost:9094'],
    group_id='check',
)
consumer.topics()

In [None]:
from kafka import KafkaProducer

# Smoke test: publish a single raw-bytes message to the 'btc' topic on the
# local broker. The cell displays the future returned by send().
producer = KafkaProducer(
    bootstrap_servers='localhost:9094',
)
producer.send(
    'btc',
    b'Hello, Kafka!',
)

In [None]:
# Stream BTC-USD candles from the given start time into the 'btc' topic.
# NOTE: the cell defining CryptoProducer must have been run first, and start()
# blocks this cell until the producer is stopped or fails.
cl = CryptoProducer(
  kafka_topic='btc',
  kafka_server='localhost:9094',
  start_time='2024-12-03 20:03:00',
  # Without a symbol the request URL would target '/products/None/candles'.
  symbol='BTC-USD',
)

# The class exposes start() as its public entry point; it has no feed_stream().
cl.start()

In [None]:
async def _produce_message_async(self, producer, topic, messages, key):

    def on_send_success(record_metadata):
      print(record_metadata.topic)
      print(record_metadata.partition)
      print(record_metadata.offset)

    def on_send_error(excp):
      logging.error('Error sending message', exc_info=excp)

    futures = []
    for message in messages:
      future = producer.send(topic, key=message[key], value=message)
      future.add_callback(on_send_success)
      future.add_errback(on_send_error)
      futures.append(future)

    await asyncio.gather(*[asyncio.get_event_loop().run_in_executor(None, future.get) for future in futures])

    producer.flush()
    producer.close()

In [34]:
from datetime import datetime, timedelta
import requests as r
import asyncio
import aiokafka
import kafka
import logging
import json
import time

class CryptoProducer:
  """Poll the Coinbase Exchange candles endpoint and publish candles to Kafka.

  Each candle row is converted to a dict keyed by `schema` and sent to
  `kafka_topic`, using the candle timestamp (formatted with `format_str`) as
  the message key. After every pass the producer waits and then polls again
  from the newest timestamp it has published, until `stop()` is called.

  Args:
    kafka_topic: Topic the candles are published to.
    kafka_server: Bootstrap server address handed to the Kafka producers.
    start_time: First timestamp to fetch, formatted as `format_str`.
    end_time: Last timestamp of the first pass; defaults to the current UTC
      time. Later passes always end at the then-current UTC time.
    granularity: Value of the `granularity` query parameter sent to the API.
    symbol: Product id placed in the request path (e.g. 'BTC-USD').
    window: Maximum number of hours covered by a single API request.
    buffer: Target age in seconds of the newest published candle before the
      next poll is made.
    mode: 'sync' uses the blocking kafka-python producer; any other value uses
      the aiokafka producer on a dedicated event loop.
  """

  format_str = '%Y-%m-%d %H:%M:%S'
  schema = ['Timestamp', 'Low', 'High', 'Open', 'Close', 'Volume']
  # Seconds to wait for an HTTP response before the request is abandoned.
  request_timeout = 10
  # Lower bound in seconds on the pause between two polling passes.
  min_wait = 1
  
  def __init__(
    self,
    kafka_topic: str = None,
    kafka_server: str = None,
    start_time: str = None,
    end_time: str = None,
    granularity: int = 60,
    symbol: str = None,
    window: int = 5,
    buffer: int = 60,
    mode: str = 'sync',
  ):
    self.kafka_topic = kafka_topic
    self.kafka_server = kafka_server
    self.granularity = granularity
    self.symbol = symbol
    self.window = window
    self.start_time = start_time
    self.end_time = end_time
    self.buffer = buffer
    self.mode = mode

    self.is_running = False

    if not end_time:
      self.end_time = datetime.utcnow().strftime(self.format_str)

    self.producer = kafka.KafkaProducer(
      bootstrap_servers=self.kafka_server, 
      value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
      key_serializer=lambda k: k.encode('utf-8')
    )

    # Created lazily inside the event loop that will drive it.
    self.aio_producer = None

  def _get_data(self, start_time, end_time):
    """Fetch the candle rows between `start_time` and `end_time`.

    Raises:
      requests.HTTPError: If the API answers with an error status.
      requests.Timeout: If no response arrives within `request_timeout`.
    """
    url = f'https://api.exchange.coinbase.com/products/{self.symbol}/candles?granularity={self.granularity}&start={start_time}&end={end_time}'
    response = r.get(url, timeout=self.request_timeout)
    # Fail here rather than trying to parse an error payload as candle rows.
    response.raise_for_status()
    return response.json()
  
  def _break_time_range(self, start_time, end_time):
    """Split [start_time, end_time] into consecutive chunks of at most `window` hours.

    Returns:
      A list of (start, end) string pairs formatted with `format_str`; empty
      when `start_time` is not before `end_time`.
    """
    start = datetime.strptime(start_time, self.format_str)
    end = datetime.strptime(end_time, self.format_str)
    step = timedelta(hours=self.window)
    intervals = []

    while start < end:
      current_end = min(start + step, end)
      intervals.append((start.strftime(self.format_str), current_end.strftime(self.format_str)))
      start = current_end
    
    return intervals
  
  def _transform_data(self, data):
    """Map one candle row onto `schema`.

    The 'Timestamp' value is converted from epoch seconds to a naive UTC
    datetime; every other value is converted to float.
    """
    return {
      col: datetime.utcfromtimestamp(val) if col == 'Timestamp' else float(val)
      for col, val in zip(self.schema, data)
    }

  def _select_new_events(self, rows, last):
    """Return the (timestamp, event) pairs of `rows` newer than `last`, oldest first.

    Rows are sorted by their first element so the result does not depend on
    the order in which they were received, and a timestamp is emitted at most
    once. `last` may be None, in which case nothing is filtered out by age.
    """
    fresh = []
    for row in sorted(rows, key=lambda candle: candle[0]):
      event = self._transform_data(row)
      stamp = event['Timestamp'].strftime(self.format_str)
      # format_str sorts lexicographically in chronological order.
      if last is None or stamp > last:
        fresh.append((stamp, event))
        last = stamp
    return fresh

  def _compute_wait(self, last, now=None):
    """Return how many seconds to pause before the next polling pass.

    Args:
      last: Newest published timestamp (formatted with `format_str`), or None
        when nothing has been published yet.
      now: Reference time as a naive UTC datetime; defaults to the current
        UTC time.

    Returns:
      `buffer` when `last` is None; otherwise the time left until `last` is
      `buffer` seconds old, but never less than `min_wait` so the API is not
      polled in a tight loop.
    """
    if last is None:
      return self.buffer
    if now is None:
      now = datetime.utcnow()
    elapsed = (now - datetime.strptime(last, self.format_str)).total_seconds()
    return max(self.buffer - elapsed, self.min_wait)
  
  def _on_send_success(self, record_metadata):
    """Log where a successfully delivered record was written."""
    logging.info(record_metadata.topic)
    logging.info(record_metadata.partition)
    logging.info(record_metadata.offset)

  def _on_send_error(self, excp):
    """Log a failed delivery."""
    logging.error('Error sending message', exc_info=excp)

  def _feed_stream(self):
    """Blocking polling loop used in 'sync' mode.

    Raises:
      RuntimeError: Wrapping any exception raised inside the loop; the
        blocking producer is closed first.
    """
    self.is_running = True
    last = None
    
    try:
      while self.is_running:
        # Set data interval
        intervals = self._break_time_range(self.start_time, self.end_time)
        logging.debug(intervals)

        # Get data for the interval
        for interval in intervals:
          data = self._get_data(interval[0], interval[1])
          for event_timestamp, event in self._select_new_events(data, last):
            self.producer.send(self.kafka_topic, key=event_timestamp, value=event).add_callback(self._on_send_success).add_errback(self._on_send_error)
            last = event_timestamp

        # One flush per pass instead of one per message.
        self.producer.flush()

        # Keep the previous start when the pass produced nothing.
        if last is not None:
          self.start_time = last

        # Wait for more data to be available
        time.sleep(self._compute_wait(last))

        self.end_time = datetime.utcnow().strftime(self.format_str)
    except Exception as e:
      self.producer.close()
      raise RuntimeError(e) from e

  
  async def _async_feed_stream(self):
    """Polling loop used in every mode other than 'sync'.

    The aiokafka producer is started on entry and stopped when the loop ends.
    """
    if not self.aio_producer:
      # Built inside the coroutine so the producer binds to the running loop.
      self.aio_producer = aiokafka.AIOKafkaProducer(
        bootstrap_servers=self.kafka_server, 
        value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8'),
      )

    self.is_running = True
    last = None
    loop = asyncio.get_running_loop()
    
    try:
      async with self.aio_producer:

        while self.is_running:
          # Set data interval
          intervals = self._break_time_range(self.start_time, self.end_time)

          deliveries = []

          # Get data for the interval
          for interval in intervals:
            # The HTTP call blocks, so keep it off the event loop thread.
            data = await loop.run_in_executor(None, self._get_data, interval[0], interval[1])
            for event_timestamp, event in self._select_new_events(data, last):
              # Awaiting send() enqueues the record and yields its delivery future.
              delivery = await self.aio_producer.send(self.kafka_topic, key=event_timestamp, value=event)
              deliveries.append(delivery)
              last = event_timestamp

          # Surface delivery failures instead of leaving them unobserved.
          await asyncio.gather(*deliveries)
          await self.aio_producer.flush()

          # Keep the previous start when the pass produced nothing.
          if last is not None:
            self.start_time = last
          logging.info(self.start_time)

          # Wait for more data to be available
          await asyncio.sleep(self._compute_wait(last))

          self.end_time = datetime.utcnow().strftime(self.format_str)
    finally:
      # The producer was stopped on exit; build a fresh one on the next start().
      self.aio_producer = None

  def start(self):
    """Run the polling loop for the configured mode; blocks until it ends."""
    logging.info("Producer started.")
    if self.mode == 'sync':
      self._feed_stream()
      return

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
      loop.run_until_complete(self._async_feed_stream())
    finally:
      loop.close()

  def stop(self):
    """Ask the polling loop to end after its current pass.

    In 'sync' mode the blocking producer is closed here. In the other modes
    the aiokafka producer is stopped by the polling loop itself when it exits.
    """
    self.is_running = False
    if self.mode == 'sync':
      self.producer.close()
    
  def status(self):
    """Return True while the polling loop is flagged as running."""
    return self.is_running

In [None]:
import requests as r

# Manual probe of the candles endpoint for a fixed BTC-USD window; the cell
# displays the decoded JSON body.
url = f'https://api.exchange.coinbase.com/products/BTC-USD/candles?granularity=60&start=2024-12-16 00:00:00&end=2024-12-16 01:18:45'
response = r.get(url)
response.json()

In [None]:
# Stream BTC-USD one-minute candles into the 'btc' topic, starting at the
# given timestamp and ending the first pass at the current UTC time.
btc_producer = CryptoProducer(
  kafka_topic='btc',
  kafka_server='localhost:9094',
  start_time='2024-12-16 00:00:00',
  end_time=None,
  granularity=60,
  symbol='BTC-USD',
)
btc_producer.start()