/
export.py
230 lines (169 loc) · 7.23 KB
/
export.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import argparse
import logging
import typing
from typing import List, Text, Optional
import rasa.cli.utils as cli_utils
import rasa.core.utils as rasa_core_utils
from rasa.cli.arguments import export as arguments
from rasa.constants import DOCS_URL_TRACKER_STORES, DOCS_URL_EVENT_BROKERS
from rasa.exceptions import (
PublishingError,
RasaException,
)
if typing.TYPE_CHECKING:
    # These names are only needed for the string type annotations in this
    # module; the functions below import what they need at call time instead.
    from rasa.core.brokers.broker import EventBroker
    from rasa.core.brokers.pika import PikaEventBroker, PikaProducer
    from rasa.core.tracker_store import TrackerStore
    from rasa.core.exporter import Exporter
    from rasa.core.utils import AvailableEndpoints

# Module-level logger (NOTE(review): not referenced in the code visible here).
logger = logging.getLogger(__name__)
# noinspection PyProtectedMember
def add_subparser(
    subparsers: argparse._SubParsersAction, parents: List[argparse.ArgumentParser]
) -> None:
    """Register the `rasa export` sub-command.

    The sub-command dispatches to `export_trackers` and gets its options from
    `rasa.cli.arguments.export`.

    Args:
        subparsers: Sub-parser collection of the parent `rasa` parser.
        parents: Parsers whose arguments the new sub-command inherits.
    """
    export_parser = subparsers.add_parser(
        "export",
        help="Export conversations using an event broker.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        conflict_handler="resolve",
        parents=parents,
    )
    export_parser.set_defaults(func=export_trackers)
    arguments.set_export_arguments(export_parser)
def _get_tracker_store(endpoints: "AvailableEndpoints") -> "TrackerStore":
    """Create the tracker store described by the endpoint configuration.

    If `endpoints` has no `tracker_store` section, an error pointing to the
    tracker store documentation is printed and the process exits.

    Args:
        endpoints: Endpoint configuration holding the tracker store settings.

    Returns:
        The tracker store built by `TrackerStore.create`.
    """
    store_config = endpoints.tracker_store
    if not store_config:
        cli_utils.print_error_and_exit(
            "Could not find a `tracker_store` section in the supplied "
            "endpoints file. Instructions on how to configure a tracker store "
            f"can be found here: {DOCS_URL_TRACKER_STORES}. "
            "Exiting. "
        )

    # Imported lazily, consistent with the rest of this module.
    from rasa.core.tracker_store import TrackerStore

    return TrackerStore.create(store_config)
def _get_event_broker(endpoints: "AvailableEndpoints") -> "EventBroker":
    """Get `EventBroker` from `endpoints`.

    Prints an error and exits if the endpoints have no `event_broker` section,
    or if no event broker could be created from that section.

    Args:
        endpoints: `AvailableEndpoints` to initialize the event broker from.

    Returns:
        Initialized event broker.
    """
    if not endpoints.event_broker:
        cli_utils.print_error_and_exit(
            f"Could not find an `event_broker` section in the supplied "
            f"endpoints file. Instructions on how to configure an event broker "
            f"can be found here: {DOCS_URL_EVENT_BROKERS}. Exiting."
        )

    from rasa.core.brokers.broker import EventBroker

    event_broker = EventBroker.create(endpoints.event_broker)
    # `EventBroker.create` may return `None` (e.g. for a configuration it
    # cannot turn into a broker). Returning `None` here would only fail later
    # with an AttributeError in `_prepare_event_broker`, so stop with a clear
    # message instead.
    if event_broker is None:
        cli_utils.print_error_and_exit(
            f"Could not create an event broker from the `event_broker` section "
            f"in the supplied endpoints file. Instructions on how to configure "
            f"an event broker can be found here: {DOCS_URL_EVENT_BROKERS}. "
            f"Exiting."
        )

    return event_broker
def _get_requested_conversation_ids(
conversation_ids_arg: Optional[Text] = None,
) -> Optional[List[Text]]:
"""Get list of conversation IDs requested as a command-line argument.
Args:
conversation_ids_arg: Value of `--conversation-ids` command-line argument.
If provided, this is a string of comma-separated conversation IDs.
Return:
List of conversation IDs requested as a command-line argument.
`None` if that argument was left unspecified.
"""
if not conversation_ids_arg:
return None
return conversation_ids_arg.split(",")
def _assert_max_timestamp_is_greater_than_min_timestamp(
args: argparse.Namespace,
) -> None:
"""Inspect CLI timestamp parameters.
Prints an error and exits if a maximum timestamp is provided that is smaller
than the provided minimum timestamp.
Args:
args: Command-line arguments to process.
"""
min_timestamp = args.minimum_timestamp
max_timestamp = args.maximum_timestamp
if (
min_timestamp is not None
and max_timestamp is not None
and max_timestamp < min_timestamp
):
cli_utils.print_error_and_exit(
f"Maximum timestamp '{max_timestamp}' is smaller than minimum "
f"timestamp '{min_timestamp}'. Exiting."
)
def _prepare_event_broker(event_broker: "EventBroker") -> None:
    """Configure `event_broker` for a one-off export and check it is ready.

    For Pika-based brokers (`PikaEventBroker` or `PikaProducer`), this turns
    off `should_keep_unpublished_messages` and turns on `raise_on_failure`.
    Keeping failed messages around for a later retry is pointless here: the
    broker lives only as long as this short-lived export script, so it would be
    destroyed before any retry could happen. (Presumably `raise_on_failure`
    makes publishing failures surface as errors the export can report —
    TODO confirm against the Pika broker implementation.)

    For every broker type, if `event_broker.is_ready()` is falsy, an error is
    printed and the process exits.

    Args:
        event_broker: Broker the exported events will be published to.
    """
    from rasa.core.brokers.pika import PikaEventBroker, PikaProducer

    pika_broker_types = (PikaEventBroker, PikaProducer)
    if isinstance(event_broker, pika_broker_types):
        event_broker.should_keep_unpublished_messages = False
        event_broker.raise_on_failure = True

    if event_broker.is_ready():
        return

    cli_utils.print_error_and_exit(
        f"Event broker of type '{type(event_broker)}' is not ready. Exiting."
    )
def export_trackers(args: argparse.Namespace) -> None:
    """Export events for a connected tracker store using an event broker.

    Steps:
    - Validate the timestamp range.
    - Load the endpoints from `args.endpoints` and build the tracker store and
      event broker from them.
    - Hand both to an `Exporter`, together with the optional conversation-ID and
      timestamp filters, and publish the events.

    On a `PublishingError`, prints a command that resumes the export from the
    failing event's timestamp and exits. Any other `RasaException` is printed
    and the process exits.

    Args:
        args: Command-line arguments to process.
    """
    _assert_max_timestamp_is_greater_than_min_timestamp(args)

    endpoints = rasa_core_utils.read_endpoints_from_path(args.endpoints)
    tracker_store = _get_tracker_store(endpoints)
    event_broker = _get_event_broker(endpoints)
    _prepare_event_broker(event_broker)
    requested_conversation_ids = _get_requested_conversation_ids(args.conversation_ids)

    from rasa.core.exporter import Exporter

    exporter = Exporter(
        tracker_store,
        event_broker,
        args.endpoints,
        requested_conversation_ids,
        args.minimum_timestamp,
        args.maximum_timestamp,
    )

    try:
        published_events = exporter.publish_events()
    except PublishingError as e:
        # The failing event's timestamp is both shown to the user and used as
        # the new minimum timestamp of the continuation command.
        command = _get_continuation_command(exporter, e.timestamp)
        cli_utils.print_error_and_exit(
            f"Encountered error while publishing event with timestamp "
            f"'{e.timestamp}'. To continue where I left off, run the following "
            f"command:\n\n\t{command}\n\nExiting."
        )
    except RasaException as e:
        # Caught after `PublishingError`, which is more specific.
        cli_utils.print_error_and_exit(str(e))
    else:
        cli_utils.print_success(
            f"Done! Successfully published {published_events} events 🎉"
        )
def _get_continuation_command(exporter: "Exporter", timestamp: float) -> Text:
"""Build CLI command to continue 'rasa export' where it was interrupted.
Called when event publishing stops due to an error.
Args:
exporter: Exporter object containing objects relevant for this export.
timestamp: Timestamp of the last event attempted to be published.
"""
# build CLI command command based on supplied timestamp and options
command = f"rasa export"
if exporter.endpoints_path is not None:
command += f" --endpoints {exporter.endpoints_path}"
command += f" --minimum-timestamp {timestamp}"
if exporter.maximum_timestamp is not None:
command += f" --maximum-timestamp {exporter.maximum_timestamp}"
if exporter.requested_conversation_ids:
command += (
f" --conversation-ids {','.join(exporter.requested_conversation_ids)}"
)
return command