Data is from this dataset:
https://www.kaggle.com/datasets/igorbalteiro/gps-data-from-rio-de-janeiro-buses

In [1]:
import requests
from datetime import datetime
import json
from itertools import islice

In [2]:
base_uri = "http://localhost:8083"
topic = "jetstream_example"

In [4]:
with open("/home/damion/Downloads/bus_gps_data/treatedGPSDataWithoutEmptyLinesSeparateColumns.csv") as f:
    header = f.readline()
    lines = [f.readline() for _ in range(100_000)]

In [5]:
def process(csvline: str):
    date, time, order, line, latitude, longitude, speed = csvline.strip("\n").replace('"', "").split(",")

    month, day, year = date.split("-")
    latitude = float(latitude)
    longitude = float(longitude)
    speed = float(speed)
    line = line.replace(".0", "")
    msg = {
        "postime": datetime.fromisoformat(f"{year}-{month}-{day}T{time}").isoformat(),
        "order": order,
        "line": line,
        "lon": longitude,
        "lat": latitude,
        "speed": speed
    }
    return msg

In [6]:
processed = [process(l) for l in lines]

In [7]:
processed

[{'postime': '2019-01-25T00:03:05',
  'order': 'B27178',
  'line': '639',
  'lon': -43.301788,
  'lat': -22.816891,
  'speed': 0.0},
 {'postime': '2019-01-25T00:07:29',
  'order': 'D13324',
  'line': '731',
  'lon': -43.357971,
  'lat': -22.87425,
  'speed': 37.0},
 {'postime': '2019-01-25T00:07:59',
  'order': 'D13324',
  'line': '731',
  'lon': -43.35767,
  'lat': -22.880329,
  'speed': 37.0},
 {'postime': '2019-01-25T00:17:31',
  'order': 'D13150',
  'line': 'SV790',
  'lon': -43.348221,
  'lat': -22.858509,
  'speed': 59.0},
 {'postime': '2019-01-25T00:19:26',
  'order': 'D13195',
  'line': '756',
  'lon': -43.552929,
  'lat': -22.8836,
  'speed': 0.0},
 {'postime': '2019-01-25T00:30:08',
  'order': 'D53551',
  'line': '759',
  'lon': -43.496078,
  'lat': -22.883551,
  'speed': 0.0},
 {'postime': '2019-01-25T00:45:17',
  'order': 'B44535',
  'line': '685',
  'lon': -43.355808,
  'lat': -22.861271,
  'speed': 0.0},
 {'postime': '2019-01-25T01:54:12',
  'order': 'C47970',
  'line': '

In [110]:
for i in range(0, 100_000, 5_000):
    print(f"{i} -> {i+5000}")
    res = requests.post(
        url=f"{base_uri}/topics/jetstream_example",
        data=json.dumps(
            {"records": [{"value": x, "partition": 0}  for x in processed[i:i+5000]]}
        ),
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"}).json()
    print(res)

0 -> 5000
{'offsets': [{'partition': 0, 'offset': 0}]}
5000 -> 10000
{'offsets': [{'partition': 0, 'offset': 5000}]}
10000 -> 15000
{'offsets': [{'partition': 0, 'offset': 10000}]}
15000 -> 20000
{'offsets': [{'partition': 0, 'offset': 15000}]}
20000 -> 25000
{'offsets': [{'partition': 0, 'offset': 20000}]}
25000 -> 30000
{'offsets': [{'partition': 0, 'offset': 25000}]}
30000 -> 35000
{'offsets': [{'partition': 0, 'offset': 30000}]}
35000 -> 40000
{'offsets': [{'partition': 0, 'offset': 35000}]}
40000 -> 45000
{'offsets': [{'partition': 0, 'offset': 40000}]}
45000 -> 50000
{'offsets': [{'partition': 0, 'offset': 45000}]}
50000 -> 55000
{'offsets': [{'partition': 0, 'offset': 50000}]}
55000 -> 60000
{'offsets': [{'partition': 0, 'offset': 55000}]}
60000 -> 65000
{'offsets': [{'partition': 0, 'offset': 60000}]}
65000 -> 70000
{'offsets': [{'partition': 0, 'offset': 65000}]}
70000 -> 75000
{'offsets': [{'partition': 0, 'offset': 70000}]}
75000 -> 80000
{'offsets': [{'partition': 0, 'offse

In [62]:
%pip install kafka-python

[0mCollecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[0mInstalling collected packages: kafka-python
[0mSuccessfully installed kafka-python-2.0.2
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.2.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [121]:
from kafka import KafkaConsumer, TopicPartition
import json
import uuid
from kafka import KafkaAdminClient

class ChatConsumer:
  def __init__(self, brokers, topic, group_id=None):
    if group_id is None:
      group_id = str(uuid.uuid4())
    self.consumer = KafkaConsumer(
      topic,
      auto_offset_reset='earliest',
      group_id="foo",
      bootstrap_servers=brokers,
      value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
  def print_messages(self):
    for msg in self.consumer:
      print(f"{msg.value['user']}: {msg.value['message']}")
  def close(self):
    self.consumer.close()

In [119]:
REDPANDA_BROKERS="redpanda-0.customredpandadomain.local:31092,redpanda-1.customredpandadomain.local:31092,redpanda-2.customredpandadomain.local:31092"

In [120]:
con = ChatConsumer(REDPANDA_BROKERS, "jetstream_example")

In [134]:
con.consumer.poll()

{TopicPartition(topic='jetstream_example', partition=0): [ConsumerRecord(topic='jetstream_example', partition=0, offset=10000, timestamp=1696336545970, timestamp_type=0, key=b'', value={'postime': '2019-01-25T08:59:19', 'order': 'C30286', 'line': '954', 'lon': -43.478828, 'lat': -23.030331, 'speed': 54.0}, headers=[], checksum=None, serialized_key_size=0, serialized_value_size=110, serialized_header_size=-1),
  ConsumerRecord(topic='jetstream_example', partition=0, offset=10001, timestamp=1696336545970, timestamp_type=0, key=b'', value={'postime': '2019-01-25T08:59:19', 'order': 'C30305', 'line': '557', 'lon': -43.33334, 'lat': -22.979469, 'speed': 0.0}, headers=[], checksum=None, serialized_key_size=0, serialized_value_size=108, serialized_header_size=-1),
  ConsumerRecord(topic='jetstream_example', partition=0, offset=10002, timestamp=1696336545970, timestamp_type=0, key=b'', value={'postime': '2019-01-25T08:59:19', 'order': 'C30319', 'line': '557', 'lon': -43.185478, 'lat': -22.9707

In [130]:
con.print_messages()

KeyError: 'user'