/
Kafka Producer
84 lines (70 loc) · 3.46 KB
/
Kafka Producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python
!pip install faker
#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer
# Импорт модуля faker
from faker import Faker
####################################ячейка в Google Colab №2 - публикация сообщений в Kafka###########################################
# объявление продюсера Kafka
producer = KafkaProducer(
bootstrap_servers=['........здесь взять название своего сервера.upstash.io:9092'],
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username='.........здесь имя пользователя, взятое из upstash',
sasl_plain_password='..........здесь пароль для этого пользователя',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
topic='InputsTopic' # название вашего топика
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker()
#списки полей в заявке
products = ['bred', 'garlic', 'oil', 'apples', 'water', 'soup', 'dress', 'tea', 'cacao', 'coffee', 'bananas', 'butter', 'eggs', 'oatmeal']
questions = ['payment', 'delivery', 'discount', 'vip', 'staff']
#бесконечный цикл публикации данных
while True:
#подготовка данных для публикации в JSON-формате
now=datetime.now()
id=now.strftime("%m/%d/%Y %H:%M:%S")
content = ''
theme = ''
corp = 0
part = 0
#подготовка списка возможных ключей маршрутизации (routing keys)
corp = random.choice([1,0])
if corp==1 :
name=fake.company()
else:
name=fake.name()
#случайный выбор предмета обращения
subject=random.choice( ['app', 'question'])
#добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки
if subject=='question':
theme = random.choice(questions)
content = 'Hello, I have a question about ' + theme
part=0 #все вопросы записывать в раздел 0
else :
theme ='app'
content = random.choice(products) + ' ' + str(random.randint(0,100))
if corp==1 :
part=1 #все корпоративные заявки записывать в раздел 1
else:
part=2 #заявки от частных лиц записывать в раздел 2
#создаем полезную нагрузку в JSON
data = {'id': id, 'name': name, 'subject': subject, 'content': content}
#публикуем данные в Kafka
future = producer.send(topic, value=data, partition=part)
record_metadata = future.get(timeout=60)
print(f' [x] Sent {record_metadata}')
print(f' [x] Payload {data}')
#повтор через 3 секунды
time.sleep(3)
####################################ячейка в Google Colab №3 - закрытие соединения###########################################
#Закрываем соединения
producer.close()