-
Notifications
You must be signed in to change notification settings - Fork 0
/
parse_captions.py
261 lines (222 loc) · 9.04 KB
/
parse_captions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
import argparse
import collections
import csv
import datetime
import json
from io import StringIO
import pprint
import re
import sys
import jellyfish
import webvtt
# A single contiguous stretch of speech attributed to one speaker.
# start/end are WebVTT timestamp strings ("HH:MM:SS.mmm"), duration is
# total caption display time in seconds, and speech is the concatenated
# caption text for the block.
Block = collections.namedtuple(
    "Block", ["start", "end", "duration", "speaker", "speech"]
)

# Arbitrary anchor date for doing arithmetic on time-of-day values
# (datetime.time objects don't support subtraction on their own).
PLACEHOLDER_DATE = datetime.date(1970, 1, 1)

# Speaker label used when the captions don't identify who is talking.
UNKNOWN_SPEAKER = "UNKNOWN"
def calc_duration(start_time, end_time):
    """Return the number of seconds between two ISO-format time-of-day strings.

    Args:
        start_time: start timestamp, e.g. "00:01:02.500".
        end_time: end timestamp in the same format.

    Returns:
        float: elapsed seconds (negative if end_time precedes start_time).

    Raises:
        ValueError: if either string is not a valid ISO-format time.
    """
    start = datetime.time.fromisoformat(start_time)
    end = datetime.time.fromisoformat(end_time)
    # Convert each time-of-day to a timedelta so we can subtract directly,
    # instead of grafting both times onto a placeholder date first.
    start_delta = datetime.timedelta(
        hours=start.hour,
        minutes=start.minute,
        seconds=start.second,
        microseconds=start.microsecond,
    )
    end_delta = datetime.timedelta(
        hours=end.hour,
        minutes=end.minute,
        seconds=end.second,
        microseconds=end.microsecond,
    )
    return (end_delta - start_delta).total_seconds()
def get_closest_match(query, knowns):
    """For a string 'query', find which string in the 'knowns' list has the lowest
    levenshtein edit distance."""
    # Score every candidate, then take the minimum. Ties on distance are
    # broken alphabetically because min() compares the (score, name) tuples
    # lexicographically.
    scored = (
        (jellyfish.levenshtein_distance(query, known), known) for known in knowns
    )
    _, winner = min(scored)
    return winner
def get_speech_blocks(captions, no_infer_speakers, known_speakers):
    """Group a sequence of WebVTT captions into per-speaker speech blocks.

    A new block starts whenever a caption line begins with ">>" (CTN's
    hand-entered new-speaker marker); all following lines are appended to
    that block until the next ">>".

    Args:
        captions: iterable of webvtt caption objects (each with .text,
            .start, .end attributes).
        no_infer_speakers: if True, speaker names are used verbatim instead
            of being typo-corrected against known_speakers.
        known_speakers: list of canonical (lowercase) speaker names.

    Returns:
        (blocks, speaker_map) where blocks is a list of Block namedtuples
        and speaker_map maps each raw speaker string seen to the name it
        was resolved to.
    """
    # Implemented as a closure rather than a standalone function so we
    # can cache results in "speaker_map". We'll only run the
    # levenshtein distance checks when we encounter a typo we haven't
    # seen before!
    speaker_map = {s: s for s in known_speakers}
    blocks = []

    def infer_speaker(speaker):
        # Resolve a raw speaker string to a known name, memoizing the
        # answer in speaker_map so each unique typo is scored only once.
        if no_infer_speakers:
            return speaker
        if speaker not in speaker_map:
            inferred_speaker = get_closest_match(speaker, known_speakers)
            speaker_map[speaker] = inferred_speaker
            return inferred_speaker
        else:
            return speaker_map[speaker]

    # State for the block currently being accumulated.
    last_line = ""
    start_time = None  # None until the first ">>" marker is seen
    end_time = None
    duration = 0
    current_speech = ""
    speaker = UNKNOWN_SPEAKER
    for idx, caption in enumerate(captions):
        # Apparently CTN's webvtt caption text are terminated in null bytes
        line = caption.text.strip("\r\n \u0000")
        # CTN also repeats captions, verbatim, quite frequently. You
        # can actually see this if you watch the video stream -
        # there's often two lines of captions overlayed on the video;
        # the top line is the previous text and the bottom line is the
        # newest (kindof like we're scrolling through a document on a
        # two-line screen). The WebVTT file contains raw instructions
        # on what text to display when, so this means those lines
        # actually are duplicated in the file. We want to ignore any
        # lines' second occurrences, both to construct transcripts and
        # also to better-approximate speech durations.
        if line == last_line:
            continue
        last_line = line
        # CTN's convention appears to be that ">>" indicates a new
        # speaker. These are entered by hand, so be warned that
        # accuracy is not perfect.
        if line.startswith(">>"):
            # Record the previous speech "block" and start a new one
            if start_time is not None:
                blocks.append(
                    Block(start_time, end_time, duration, speaker, current_speech)
                )
            current_speech = line
            start_time = caption.start
            end_time = caption.end
            duration = 0
            # For the "regular cast" (councilmembers, mayor, attorney,
            # administrator), CTN will also insert the name of the
            # speaker after a ">>". For everybody else, they don't
            # make any attempt to identify who's talking.
            if ":" in line:
                # Typos are fairly common in speaker names, so we'll
                # try to correct them
                speaker = infer_speaker(line[2:].split(":")[0].strip().lower())
            else:
                speaker = UNKNOWN_SPEAKER
        else:
            # Append the line to the existing speech block, and extend
            # the end time to the end time of the latest caption
            current_speech += " " + line
            end_time = caption.end
        # We're calculating the "duration" of a speech block as the
        # total amount of time captions associated with a speaker
        # appear on the screen (except, excluding duplicates as noted
        # above). This is distinct from just doing "end time - start
        # time" on the assumption that maybe, if there's a long pause
        # in the middle of a speech, the captions will go away for a
        # while and we won't attribute silence as somebody's speaking
        # time. I don't know if this actually happens / makes any
        # difference in practice.
        duration += calc_duration(caption.start, caption.end)
    # Record the final "block"
    if start_time is not None:
        blocks.append(Block(start_time, end_time, duration, speaker, current_speech))
    return blocks, speaker_map
def preprocess(webvtt_fp):
    """The "webvtt" module doesn't like something in the header content on CTN's
    VTT files, so here's a hack to just skip all the headers and return only the
    caption content itself, starting from the first timestamp'ed line.

    Args:
        webvtt_fp: seekable text file object positioned at the start of the
            raw VTT content.

    Returns:
        str: a minimal valid WebVTT document ("WEBVTT" header plus everything
        from the first timestamped line onward).

    Raises:
        ValueError: if the file contains no line starting with a
            HH:MM:SS.mmm timestamp.
    """
    timestamp_pattern = re.compile(r"^\d\d:\d\d:\d\d\.\d\d\d")
    # go line-by-line until we find a line starting with a timestamp,
    # remembering where each line began so we can rewind to it
    line_start = 0
    while True:
        line = webvtt_fp.readline()
        if not line:
            # End-of-file! Raise a specific exception type rather than a
            # bare Exception so callers can catch it precisely.
            raise ValueError("No timestamp-like lines found!")
        if timestamp_pattern.match(line):
            break
        line_start = webvtt_fp.tell()
    # go back to the beginning of that line
    webvtt_fp.seek(line_start)
    # read the rest of the file, and prepend the magic "WEBVTT" header!
    return "WEBVTT\r\n\r\n" + webvtt_fp.read()
class ParsedCaptions(object):
    """Holds the result of parsing a caption file: the list of speech
    blocks and the mapping from raw speaker strings to resolved names."""

    def __init__(self, blocks, speaker_map):
        self.blocks = blocks
        self.speaker_map = speaker_map

    def get_transcript(self):
        """Return a plain-text transcript, one timestamped line per block."""
        lines = [f"{block.start}: {block.speech}" for block in self.blocks]
        return "\n".join(lines)

    def get_speaker_times(self):
        """Return a dict mapping each speaker to their total speaking time
        (in seconds), summed across all of their blocks."""
        totals = {}
        for block in self.blocks:
            totals.setdefault(block.speaker, 0)
            totals[block.speaker] += block.duration
        return totals
def parse(captions_fp, no_infer_speakers=True, known_speakers=None):
    """Parse a CTN caption file into a ParsedCaptions object.

    Args:
        captions_fp: seekable text file object containing the raw VTT data.
        no_infer_speakers: if True, don't typo-correct speaker names.
        known_speakers: optional list of canonical speaker names; the
            UNKNOWN_SPEAKER sentinel is always appended.
    """
    # Build a fresh list so the caller's list (if any) is never mutated.
    speakers = list(known_speakers) if known_speakers is not None else []
    speakers = speakers + [UNKNOWN_SPEAKER]
    cleaned = preprocess(captions_fp)
    captions = webvtt.read_buffer(StringIO(cleaned))
    blocks, speaker_map = get_speech_blocks(captions, no_infer_speakers, speakers)
    return ParsedCaptions(blocks, speaker_map)
def main():
    """Command-line entry point: parse a captions file and emit one of
    several views of it (transcript, speaker times, raw blocks, or the
    inferred speaker-name corrections)."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("captions_file")

    # Exactly one output mode must be chosen.
    mode = arg_parser.add_mutually_exclusive_group(required=True)
    mode.add_argument(
        "--check-speakers",
        action="store_true",
        help="Show inferred spelling corrections for unknown speakers",
    )
    mode.add_argument(
        "--get-transcript",
        action="store_true",
        help="Output a reconstructed text transcript of all the captions",
    )
    mode.add_argument(
        "--get-speaker-times",
        action="store_true",
        help="Calculate approximate total speaking times for each speaker",
    )
    mode.add_argument(
        "--get-blocks",
        action="store_true",
        help="Output raw information about all reconstructed speech blocks",
    )

    arg_parser.add_argument(
        "--no-infer-speakers",
        action="store_true",
        help=(
            "Don't attempt to correct typos by matching speaker names "
            "against a fixed set of known speakers."
        ),
    )
    arg_parser.add_argument(
        "--speaker-list-file",
        help=(
            "Text file containing a list of known speaker names. Must be "
            'lowercase, one per line. (default: "known_speakers.txt")'
        ),
        default="known_speakers.txt",
    )
    arg_parser.add_argument(
        "--output-format",
        choices=["json", "csv"],
        help="Output format for --get-speaker-times",
        default="json",
    )
    args = arg_parser.parse_args()

    # Load the known-speaker list, skipping blank lines.
    with open(args.speaker_list_file, "r") as speakers_fp:
        known_speakers = [
            name for name in (raw.strip() for raw in speakers_fp) if name
        ]

    with open(args.captions_file, "r") as captions_fp:
        parsed = parse(captions_fp, args.no_infer_speakers, known_speakers)

    if args.get_transcript:
        print(parsed.get_transcript())
    elif args.get_speaker_times:
        times = parsed.get_speaker_times()
        if args.output_format == "json":
            print(json.dumps(times, indent=4, sort_keys=True))
        else:
            writer = csv.writer(sys.stdout)
            writer.writerow(["speaker", "time_in_seconds"])
            for name in sorted(times):
                writer.writerow([name, times[name]])
    elif args.get_blocks:
        pprint.pprint(parsed.blocks)
    elif args.check_speakers:
        for raw_name, corrected in parsed.speaker_map.items():
            if raw_name != corrected:
                print(f"Inferred {raw_name} -> {corrected}")
if __name__ == "__main__":
main()