-
Notifications
You must be signed in to change notification settings - Fork 1
/
cloudevents_handler.py
179 lines (159 loc) · 6.58 KB
/
cloudevents_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# cloudevents_handler.py
import re
from datetime import datetime
from typing import Optional
from modules.messages.models import NATSMessage
# from modules.nats.publish_message import publish_message_to_jetstream
from modules.nats.nats_producer import NATSProducer
from modules.neo4j.neo4j_adapter import Neo4jAdapter
from modules.environment.settings import (
NATS_EMBEDDING_SUBJECT, NATS_EMBEDDING_STREAM
)
BOT_NAME = "threadr"  # Ensure this matches the actual bot name used in messages

# Nicknames whose messages process_cloudevent ignores entirely.
bot_nicknames = []

# Any http(s) URL; process_cloudevent skips messages containing one.
url_pattern = re.compile(r'https?://\S+')

# Bracketed link expansions mentioning twitter.com. The dot is escaped so only
# the literal domain matches (an unescaped "." would match any character).
twitter_expansion_pattern = re.compile(r'\[.*twitter\.com.*\]')

# A mention is either "name:" followed by whitespace (group 1) or a "<@12345>"
# user-ID mention (group 2). The negative lookbehinds are intended to stop a
# match from starting right after URL-like prefixes ("//", ":", "http://", ...).
username_pattern = re.compile(
    r'(?<!//)(?<!:)(?<!@\w)(?<!http://)(?<!https://)(?<!ftp://)(?:(\w+):\s+|<@(\d+)>)',
    re.M,
)
def is_command(message: str) -> bool:
    """
    Report whether *message* opens with a bot command.

    A command is the bot name, a colon, optional whitespace and at least one
    word character, all at the very start of the message
    (e.g. ``"threadr: help"``).

    :param message: Raw chat message text.
    :return: True when the message starts with a bot command, else False.
    """
    bot_prefix = re.escape(BOT_NAME)
    # "^" without re.M anchors to the start of the whole string.
    found = re.compile(rf'^{bot_prefix}:\s*(\w+)').search(message)
    return found is not None
def extract_command_from_message(message: str) -> Optional[str]:
    """
    Return the text that follows the bot-name prefix of *message*.

    :param message: Raw chat message text, e.g. ``"threadr: help me"``.
    :return: The rest of the first line after ``"<BOT_NAME>:"`` with
        surrounding whitespace stripped (may be an empty string), or None
        when the message does not start with the bot name and a colon.
    """
    prefix_regex = re.compile(rf'^{re.escape(BOT_NAME)}:\s*(.*)')
    found = prefix_regex.search(message)
    if found is None:
        return None
    # "." does not match newlines, so only the first line is captured.
    return found.group(1).strip()
async def process_cloudevent(message_data: NATSMessage, neo4j_adapter: Neo4jAdapter, js: NATSProducer):
    """
    Process one chat message received as a CloudEvent.

    Steps:
      1. Drop messages from nicknames in ``bot_nicknames`` and messages that
         contain a URL or a twitter.com link expansion.
      2. Decode escape sequences in the text (``message_data.message`` is
         overwritten with the decoded string).
      3. If a user is mentioned, record the interaction and relationship in
         Neo4j and publish the message to JetStream for embedding. Any
         exception in this step is printed and the message is dropped.
      4. Otherwise, report whether it is a bot command, then store and
         publish it via ``handle_generic_message``.

    :param message_data: The incoming message; its ``message`` attribute is
        replaced by the decoded text.
    :param neo4j_adapter: Adapter used to persist interactions and messages.
    :param js: Producer whose ``publish_message`` sends embedding requests;
        also passed to ``handle_generic_message`` as its ``producer``.
    :return: None.
    """
    if (message_data.nick in bot_nicknames or
            url_pattern.search(message_data.message) or
            twitter_expansion_pattern.search(message_data.message)):
        print(f"Ignoring bot message or unwanted pattern from {message_data.nick}.")
        return

    # Decode Unicode escape sequences in the message
    message_data.message = decode_message(message_data.message)

    # Attempt to extract a mentioned user and relationship type from the message
    mentioned_nick, relationship_type = extract_mentioned_nick(message_data.message)
    print(f"Mentioned nick: {mentioned_nick}, Relationship type: {relationship_type}")

    # If a specific user is mentioned, update the relationship and add the interaction
    if mentioned_nick and relationship_type:
        try:
            # Add or update the interaction and relationship in Neo4j
            message_id = await neo4j_adapter.add_interaction(
                message_data.nick,
                mentioned_nick,
                message_data.message,
                message_data.timestamp,
                channel=message_data.channel,
                platform=message_data.platform
            )
            await neo4j_adapter.add_or_update_relationship(
                message_data.nick,
                mentioned_nick,
                relationship_type
            )
            print(
                f"MSGID: {message_id} - Updated relationship and added interaction between {message_data.nick} and {mentioned_nick}.")

            # Publish the message to Jetstream for embedding. The payload gets
            # its own name so ``message_data`` keeps its NATSMessage type.
            payload = {
                "message_id": message_id,
                "content": {
                    "response": message_data.message,
                    "channel": message_data.channel,
                    # NOTE(review): this is the (naive, local) processing
                    # time, not message_data.timestamp — confirm intended.
                    "timestamp": datetime.now().isoformat()
                }
            }
            await js.publish_message(
                subject=NATS_EMBEDDING_SUBJECT,
                message_data=payload
            )
            print(f"Published message ID {message_id} to Jetstream subject '{NATS_EMBEDDING_SUBJECT}'.")
            return
        except Exception as e:
            # Best effort: log and drop; the message is not retried or stored
            # as a generic message.
            print(f"Failed to update: {e}")
    else:
        if is_command(message_data.message):
            command = extract_command_from_message(message_data.message)
            if command:
                print("Command found: ", command)
                # TODO: respond to recognized commands (previously sketched as
                # send_response_message(response_message, message_id, "outgoing", "results", channel)).
            else:
                print("Command found but not recognized.")
        print("Message: ", message_data.message)
        await handle_generic_message(message_data, neo4j_adapter, js)
async def handle_generic_message(message_data: NATSMessage, neo4j_adapter: Neo4jAdapter, producer: NATSProducer):
    """
    Store a message in Neo4j and publish it to JetStream for embedding.

    Used for messages that did not yield a mention relationship.
    ``message_data`` is only read, never modified. Exceptions raised by the
    adapter or the producer propagate to the caller.

    :param message_data: The incoming message.
    :param neo4j_adapter: Adapter whose ``add_message`` result is used as the
        message ID in the published payload.
    :param producer: Producer whose ``publish_message`` sends the embedding request.
    :return: None.
    """
    message_id = await neo4j_adapter.add_message(
        nick=message_data.nick,
        message=message_data.message,
        timestamp=message_data.timestamp,
        channel=message_data.channel,
        platform=message_data.platform
    )
    print(f"Added message with ID: {message_id}")
    print(f"Publishing message to Jetstream for embedding: {message_data.message}")

    # Build the payload under its own name rather than rebinding
    # ``message_data``, so the parameter keeps its NATSMessage type.
    payload = {
        "message_id": message_id,
        "content": {
            "response": message_data.message,
            "channel": message_data.channel,
            # NOTE(review): processing time (naive, local), not the message's
            # own timestamp — confirm intended.
            "timestamp": datetime.now().isoformat()
        }
    }
    await producer.publish_message(
        subject=NATS_EMBEDDING_SUBJECT,
        message_data=payload
    )
    print(f"Published message ID {message_id} to Jetstream subject '{NATS_EMBEDDING_SUBJECT}'.")
def extract_mentioned_nick(message):
    """
    Find the first user mention in a message.

    Mentions are located with ``username_pattern``: either ``name:`` followed
    by whitespace, or a ``<@12345>`` user-ID mention. That pattern's
    lookbehinds are meant to keep URL fragments from counting as mentions.

    Parameters:
        message (str): The message text to scan.

    Returns:
        tuple: ``(nick, "MENTIONED")`` for the first mention found,
        otherwise ``(None, None)``.
    """
    found = username_pattern.search(message)
    if not found:
        return None, None
    # Only one alternative of the pattern takes part in a match:
    # group 1 holds a plain name, group 2 a numeric user ID.
    nick = found.group(1) or found.group(2)
    if not nick:
        return None, None
    return nick.strip(), "MENTIONED"
def decode_message(message):
    r"""
    Decode backslash escape sequences (\n, \x41, \u00e9, ...) in a string.

    Characters that are already non-ASCII are preserved unchanged. Encoding
    the text as UTF-8 before decoding with ``unicode_escape`` would instead
    reinterpret each UTF-8 byte as Latin-1, producing mojibake.

    :param message: Text that may contain literal escape sequences.
    :return: The decoded text, or *message* unchanged when it contains a
        malformed escape (e.g. a lone \x) that cannot be decoded.
    """
    # Latin-1 maps U+0000..U+00FF one-to-one onto bytes, which is how the
    # unicode_escape codec reads raw bytes. Characters above U+00FF become
    # \uXXXX / \UXXXXXXXX escapes, which the decode step turns back into
    # the same characters.
    raw = message.encode("latin-1", "backslashreplace")
    try:
        return raw.decode("unicode_escape")
    except UnicodeDecodeError:
        # Best effort: a stray backslash should not abort processing of the
        # whole message, so keep the original text.
        return message