/
message.py
707 lines (625 loc) · 23.9 KB
/
message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
"""
The message module provides a low-level API for rendering chat messages.
"""
from __future__ import annotations
import datetime
import re
from contextlib import ExitStack
from dataclasses import dataclass
from functools import partial
from io import BytesIO
from tempfile import NamedTemporaryFile
from textwrap import indent
from typing import (
TYPE_CHECKING, Any, ClassVar, Dict, Iterable, List, Union,
)
from zoneinfo import ZoneInfo
import param
from ..io.resources import CDN_DIST
from ..io.state import state
from ..layout import Column, Row
from ..pane.base import PaneBase, ReplacementPane, panel as _panel
from ..pane.image import (
PDF, FileBase, Image, ImageBase,
)
from ..pane.markup import (
HTML, DataFrame, HTMLBasePane, Markdown,
)
from ..pane.media import Audio, Video
from ..param import ParamFunction
from ..viewable import Viewable
from ..widgets.base import Widget
from .icon import ChatCopyIcon, ChatReactionIcons
if TYPE_CHECKING:
from bokeh.document import Document
from bokeh.model import Model
from pyviz_comms import Comm
Avatar = Union[str, BytesIO, bytes, ImageBase]
AvatarDict = Dict[str, Avatar]
USER_LOGO = "🧑"
ASSISTANT_LOGO = "🤖"
SYSTEM_LOGO = "⚙️"
ERROR_LOGO = "❌"
GPT_3_LOGO = "https://upload.wikimedia.org/wikipedia/commons/thumb/0/04/ChatGPT_logo.svg/1024px-ChatGPT_logo.svg.png?20230318122128"
GPT_4_LOGO = "https://upload.wikimedia.org/wikipedia/commons/a/a4/GPT-4.png"
WOLFRAM_LOGO = "https://upload.wikimedia.org/wikipedia/commons/thumb/e/eb/WolframCorporateLogo.svg/1920px-WolframCorporateLogo.svg.png"
DEFAULT_AVATARS = {
# User
"client": USER_LOGO,
"customer": USER_LOGO,
"employee": USER_LOGO,
"human": USER_LOGO,
"person": USER_LOGO,
"user": USER_LOGO,
# Assistant
"agent": ASSISTANT_LOGO,
"ai": ASSISTANT_LOGO,
"assistant": ASSISTANT_LOGO,
"bot": ASSISTANT_LOGO,
"chatbot": ASSISTANT_LOGO,
"machine": ASSISTANT_LOGO,
"robot": ASSISTANT_LOGO,
# System
"system": SYSTEM_LOGO,
"exception": ERROR_LOGO,
"error": ERROR_LOGO,
# Human
"adult": "🧑",
"baby": "👶",
"boy": "👦",
"child": "🧒",
"girl": "👧",
"man": "👨",
"woman": "👩",
# Machine
"chatgpt": GPT_3_LOGO,
"gpt3": GPT_3_LOGO,
"gpt4": GPT_4_LOGO,
"dalle": GPT_4_LOGO,
"openai": GPT_4_LOGO,
"huggingface": "🤗",
"calculator": "🧮",
"langchain": "🦜",
"retriever": "📄",
"tool": "🛠️",
"translator": "🌐",
"wolfram": WOLFRAM_LOGO,
"wolfram alpha": WOLFRAM_LOGO,
# Llama
"llama": "🦙",
"llama2": "🐪",
}
@dataclass
class _FileInputMessage:
"""
A dataclass to hold the contents of a file input message.
Parameters
----------
contents : bytes
The contents of the file.
file_name : str
The name of the file.
mime_type : str
The mime type of the file.
"""
contents: bytes
file_name: str
mime_type: str
class ChatMessage(PaneBase):
"""
A widget for displaying chat messages with support for various content types.
This widget provides a structured view of chat messages, including features like:
- Displaying user avatars, which can be text, emoji, or images.
- Showing the user's name.
- Displaying the message timestamp in a customizable format.
- Associating reactions with messages and mapping them to icons.
- Rendering various content types including text, images, audio, video, and more.
Reference: https://panel.holoviz.org/reference/chat/ChatMessage.html
:Example:
>>> ChatMessage(object="Hello world!", user="New User", avatar="😊")
"""
avatar = param.ClassSelector(default="", class_=(str, BytesIO, bytes, ImageBase), doc="""
The avatar to use for the user. Can be a single character
text, an emoji, or anything supported by `pn.pane.Image`. If
not set, checks if the user is available in the
default_avatars mapping; else uses the first character of the
name.""")
avatar_lookup = param.Callable(default=None, doc="""
A function that can lookup an `avatar` from a user name. The
function signature should be `(user: str) -> Avatar`. If this
is set, `default_avatars` is disregarded.""")
css_classes = param.List(default=["chat-message"],doc="""
The CSS classes to apply to the widget.""")
default_avatars = param.Dict(default=DEFAULT_AVATARS, doc="""
A default mapping of user names to their corresponding avatars
to use when the user is specified but the avatar is. You can
modify, but not replace the dictionary.""")
max_width = param.Integer(default=1200, bounds=(0, None))
object = param.Parameter(allow_refs=False, doc="""
The message contents. Can be any Python object that panel can display.""")
reactions = param.List(doc="""
Reactions to associate with the message.""")
reaction_icons = param.ClassSelector(class_=ChatReactionIcons, doc="""
A mapping of reactions to their reaction icons; if not provided
defaults to `{"favorite": "heart"}`.""", allow_refs=False)
timestamp = param.Date(doc="""
Timestamp of the message. Defaults to the creation time.""")
timestamp_format = param.String(default="%H:%M", doc="The timestamp format.")
timestamp_tz = param.String(default=None, doc="""
The timezone to used for the creation timestamp; only applicable
if timestamp is not set. If None, tries to use pn.state.browser_info.timezone,
else, the default tz of datetime.datetime.now(); see `zoneinfo.available_timezones()`
for a list of valid timezones.
""")
show_avatar = param.Boolean(default=True, doc="""
Whether to display the avatar of the user.""")
show_user = param.Boolean(default=True, doc="""
Whether to display the name of the user.""")
show_timestamp = param.Boolean(default=True, doc="""
Whether to display the timestamp of the message.""")
show_reaction_icons = param.Boolean(default=True, doc="""
Whether to display the reaction icons.""")
show_copy_icon = param.Boolean(default=True, doc="""
Whether to display the copy icon.""")
renderers = param.HookList(doc="""
A callable or list of callables that accept the object and return a
Panel object to render the object. If a list is provided, will
attempt to use the first renderer that does not raise an
exception. If None, will attempt to infer the renderer
from the object."""
)
user = param.Parameter(default="User", doc="""
Name of the user who sent the message.""")
_stylesheets: ClassVar[List[str]] = [f"{CDN_DIST}css/chat_message.css"]
# Declares whether Pane supports updates to the Bokeh model
_updates: ClassVar[bool] = True
def __init__(self, object=None, **params):
self._exit_stack = ExitStack()
self.chat_copy_icon = ChatCopyIcon(
visible=False, width=15, height=15, css_classes=["copy-icon"]
)
if params.get("timestamp") is None:
tz = params.get("timestamp_tz")
if tz is not None:
tz = ZoneInfo(tz)
elif state.browser_info.timezone:
tz = ZoneInfo(state.browser_info.timezone)
params["timestamp"] = datetime.datetime.now(tz=tz)
reaction_icons = params.get("reaction_icons", {"favorite": "heart"})
if isinstance(reaction_icons, dict):
params["reaction_icons"] = ChatReactionIcons(
options=reaction_icons, width=15, height=15,
value=params.get('reactions', []),
)
self._internal = True
super().__init__(object=object, **params)
self.reaction_icons.link(self, value="reactions", bidirectional=True)
self.reaction_icons.visible = self.param.show_reaction_icons
if not self.avatar:
self._update_avatar()
self._build_layout()
def _build_layout(self):
self._left_col = left_col = Column(
self._render_avatar(),
max_width=60,
height=100,
css_classes=["left"],
stylesheets=self._stylesheets,
visible=self.param.show_avatar,
sizing_mode=None,
)
self.param.watch(self._update_avatar_pane, "avatar")
self._object_panel = self._create_panel(self.object)
self._update_chat_copy_icon()
self._center_row = Row(
self._object_panel,
self.reaction_icons,
css_classes=["center"],
stylesheets=self._stylesheets,
sizing_mode=None
)
self.param.watch(self._update_object_pane, "object")
self._user_html = HTML(
self.param.user, height=20, css_classes=["name"],
visible=self.param.show_user, stylesheets=self._stylesheets,
)
self._timestamp_html = HTML(
self.param.timestamp.rx().strftime(self.param.timestamp_format),
css_classes=["timestamp"],
visible=self.param.show_timestamp
)
self._right_col = right_col = Column(
Row(
self._user_html,
self.chat_copy_icon,
stylesheets=self._stylesheets,
sizing_mode="stretch_width",
),
self._center_row,
self._timestamp_html,
css_classes=["right"],
stylesheets=self._stylesheets,
sizing_mode=None
)
viewable_params = {
p: self.param[p] for p in self.param if p in Viewable.param
if p in Viewable.param and p != 'name'
}
viewable_params['stylesheets'] = self._stylesheets + self.param.stylesheets.rx()
self._composite = Row(left_col, right_col, **viewable_params)
def _get_obj_label(self, obj):
"""
Get the label for the object; defaults to specified object name;
if unspecified, defaults to the type name.
"""
label = obj.name
type_name = type(obj).__name__
# If the name is just type + ID, simply use type
# e.g. Column10241 -> Column
if label.startswith(type_name) or not label:
label = type_name
return label
def _serialize_recursively(
self,
obj: Any,
prefix_with_viewable_label: bool = True,
prefix_with_container_label: bool = True
) -> str:
"""
Recursively serialize the object to a string.
"""
if isinstance(obj, Iterable) and not isinstance(obj, str):
content = tuple(
self._serialize_recursively(
o,
prefix_with_viewable_label=prefix_with_viewable_label,
prefix_with_container_label=prefix_with_container_label
) for o in obj
)
if prefix_with_container_label:
if len(content) == 1:
return f"{self._get_obj_label(obj)}({content[0]})"
else:
indented_content = indent(",\n".join(content), prefix=" " * 4)
# outputs like:
# Row(
# 1,
# "str",
# )
return f"{self._get_obj_label(obj)}(\n{indented_content}\n)"
else:
# outputs like:
# (1, "str")
return f"({', '.join(content)})"
string = obj
if hasattr(obj, "value"):
string = obj.value
elif hasattr(obj, "object"):
string = obj.object
if hasattr(string, "decode") or isinstance(string, BytesIO):
self.param.warning(
f"Serializing byte-like objects are not supported yet; "
f"using the label of the object as a placeholder for {obj}"
)
return self._get_obj_label(obj)
if prefix_with_viewable_label and isinstance(obj, Viewable):
label = self._get_obj_label(obj)
string = f"{label}={string!r}"
return string
def __str__(self) -> str:
"""
Serialize the message object to a string.
"""
return self.serialize()
@property
def _synced_params(self) -> List[str]:
return []
def _get_model(
self, doc: Document, root: Model | None = None,
parent: Model | None = None, comm: Comm | None = None
) -> Model:
model = self._composite._get_model(doc, root, parent, comm)
ref = (root or model).ref['id']
self._models[ref] = (model, parent)
return model
@staticmethod
def _to_alpha_numeric(user: str) -> str:
"""
Convert the user name to an alpha numeric string,
removing all non-alphanumeric characters.
"""
return re.sub(r"\W+", "", user).lower()
def _avatar_lookup(self, user: str) -> Avatar:
"""
Lookup the avatar for the user.
"""
alpha_numeric_key = self._to_alpha_numeric(user)
# always use the default first
updated_avatars = DEFAULT_AVATARS.copy()
# update with the user input
updated_avatars.update(self.default_avatars)
# correct the keys to be alpha numeric
updated_avatars = {
self._to_alpha_numeric(key): value for key, value in updated_avatars.items()
}
# now lookup the avatar
return updated_avatars.get(alpha_numeric_key, self.avatar)
def _select_renderer(
self,
contents: Any,
mime_type: str,
):
"""
Determine the renderer to use based on the mime type.
"""
renderer = _panel
if mime_type == "application/pdf":
contents = self._exit_stack.enter_context(BytesIO(contents))
renderer = partial(PDF, embed=True)
elif mime_type.startswith("audio/"):
file = self._exit_stack.enter_context(
NamedTemporaryFile(suffix=".mp3", delete=False)
)
file.write(contents)
file.seek(0)
contents = file.name
renderer = Audio
elif mime_type.startswith("video/"):
contents = self._exit_stack.enter_context(BytesIO(contents))
renderer = Video
elif mime_type.startswith("image/"):
contents = self._exit_stack.enter_context(BytesIO(contents))
renderer = Image
elif mime_type.endswith("/csv"):
import pandas as pd
with BytesIO(contents) as buf:
contents = pd.read_csv(buf)
renderer = DataFrame
elif mime_type.startswith("text"):
if isinstance(contents, bytes):
contents = contents.decode("utf-8")
return contents, renderer
def _set_params(self, obj, **params):
"""
Set the sizing mode and height of the object.
"""
if hasattr(obj, "objects"):
params['css_classes'] = (
[css for css in obj.stylesheets if css not in self._stylesheets] +
self._stylesheets
)
for subobj in obj.objects:
self._set_params(subobj)
obj.param.update(params)
return
is_markup = isinstance(obj, HTMLBasePane) and not isinstance(obj, FileBase)
if is_markup:
if len(str(obj.object)) > 0: # only show a background if there is content
params['css_classes'] = [*(css for css in obj.css_classes if css != "message"), "message"]
params['sizing_mode'] = None
else:
if obj.sizing_mode is None and not obj.width:
params['sizing_mode'] = "stretch_width"
if obj.height is None and not isinstance(obj, ParamFunction):
params['height'] = 500
obj.param.update(params)
@staticmethod
def _is_widget_renderer(renderer):
return isinstance(renderer, type) and issubclass(renderer, Widget)
def _create_panel(self, value, old=None):
"""
Create a panel object from the value.
"""
if isinstance(value, Viewable):
self._internal = False
return value
renderer = None
if isinstance(value, _FileInputMessage):
contents = value.contents
mime_type = value.mime_type
value, renderer = self._select_renderer(contents, mime_type)
else:
try:
import magic
mime_type = magic.from_buffer(value, mime=True)
value, renderer = self._select_renderer(value, mime_type)
except Exception:
pass
renderers = self.renderers.copy() or []
if renderer is not None:
renderers.append(renderer)
for renderer in renderers:
try:
if self._is_widget_renderer(renderer):
object_panel = renderer(value=value)
else:
object_panel = renderer(value)
if isinstance(object_panel, Viewable):
break
except Exception:
pass
else:
if isinstance(old, Markdown) and isinstance(value, str):
self._set_params(old, object=value)
return old
object_panel = _panel(value)
self._set_params(object_panel)
if type(old) is type(object_panel) and self._internal:
ReplacementPane._recursive_update(old, object_panel)
return object_panel
self._internal = True
return object_panel
def _render_avatar(self) -> HTML | Image:
"""
Render the avatar pane as some HTML text or Image pane.
"""
avatar = self.avatar
if not avatar and self.user:
avatar = self.user[0]
avatar_params = {'css_classes': ["avatar"]}
if isinstance(avatar, ImageBase):
avatar_pane = avatar
avatar_params['css_classes'] = (
avatar_params.get('css_classes', []) +
avatar_pane.css_classes
)
avatar_params.update(width=35, height=35)
avatar_pane.param.update(avatar_params)
elif not isinstance(avatar, (BytesIO, bytes)) and len(avatar) == 1:
# single character
avatar_pane = HTML(avatar, **avatar_params)
else:
try:
avatar_pane = Image(
avatar, width=35, height=35, **avatar_params
)
except ValueError:
# likely an emoji
avatar_pane = HTML(avatar, **avatar_params)
return avatar_pane
def _update_avatar_pane(self, event=None):
new_avatar = self._render_avatar()
old_type = type(self._left_col[0])
new_type = type(new_avatar)
if isinstance(event.old, (HTML, ImageBase)) or new_type is not old_type:
self._left_col[:] = [new_avatar]
else:
params = new_avatar.param.values()
del params['name']
self._left_col[0].param.update(**params)
def _update(self, ref, old_models):
"""
Internals will be updated inplace.
"""
def _update_object_pane(self, event=None):
old = self._object_panel
self._object_panel = new = self._create_panel(self.object, old=old)
if old is not new:
self._center_row[0] = new
self._update_chat_copy_icon()
@param.depends("avatar_lookup", "user", watch=True)
def _update_avatar(self):
"""
Update the avatar based on the user name.
We do not use on_init here because if avatar is set,
we don't want to override the provided avatar.
However, if the user is updated, we want to update the avatar.
"""
if self.avatar_lookup:
self.avatar = self.avatar_lookup(self.user)
else:
self.avatar = self._avatar_lookup(self.user)
def _update_chat_copy_icon(self):
object_panel = self._object_panel
if isinstance(object_panel, HTMLBasePane):
object_panel = object_panel.object
elif isinstance(object_panel, Widget):
object_panel = object_panel.value
if isinstance(object_panel, str) and self.show_copy_icon:
self.chat_copy_icon.value = object_panel
self.chat_copy_icon.visible = True
else:
self.chat_copy_icon.value = ""
self.chat_copy_icon.visible = False
def _cleanup(self, root=None) -> None:
"""
Cleanup the exit stack.
"""
self._composite._cleanup(root)
return super()._cleanup(root)
def stream(self, token: str):
"""
Updates the message with the new token traversing the object to
allow updating nested objects. When traversing a nested Panel
the last object that supports rendering strings is updated, e.g.
in a layout of `Column(Markdown(...), Image(...))` the Markdown
pane is updated.
Arguments
---------
token: str
The token to stream to the text pane.
"""
i = -1
parent_panel = None
object_panel = self
attr = "object"
obj = self.object
while not isinstance(obj, str) or isinstance(object_panel, ImageBase):
object_panel = obj
if hasattr(obj, "objects"):
parent_panel = obj
attr = "objects"
obj = obj.objects[i]
i = -1
elif hasattr(obj, "object"):
attr = "object"
obj = obj.object
elif hasattr(obj, "value"):
attr = "value"
obj = obj.value
elif parent_panel is not None:
obj = parent_panel
parent_panel = None
i -= 1
setattr(object_panel, attr, obj + token)
def update(
self,
value: dict | ChatMessage | Any,
user: str | None = None,
avatar: str | bytes | BytesIO | None = None,
):
"""
Updates the message with a new value, user and avatar.
Arguments
---------
value : ChatMessage | dict | Any
The message contents to send.
user : str | None
The user to send as; overrides the message message's user if provided.
avatar : str | bytes | BytesIO | None
The avatar to use; overrides the message message's avatar if provided.
"""
updates = {}
if isinstance(value, dict):
updates.update(value)
if user:
updates["user"] = user
if avatar:
updates["avatar"] = avatar
elif isinstance(value, ChatMessage):
if user is not None or avatar is not None:
raise ValueError(
"Cannot set user or avatar when explicitly sending "
"a ChatMessage. Set them directly on the ChatMessage."
)
updates = value.param.values()
else:
updates["object"] = value
self.param.update(**updates)
def serialize(
self,
prefix_with_viewable_label: bool = True,
prefix_with_container_label: bool = True
) -> str:
"""
Format the object to a string.
Arguments
---------
obj : Any
The object to format.
prefix_with_viewable_label : bool
Whether to include the name of the Viewable, or type
of the viewable if no name is specified.
prefix_with_container_label : bool
Whether to include the name of the container, or type
of the container if no name is specified.
Returns
-------
str
The serialized string.
"""
return self._serialize_recursively(
self.object,
prefix_with_viewable_label=prefix_with_viewable_label,
prefix_with_container_label=prefix_with_container_label,
)