process_podcast_rss.py
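"""Parse a podcast RSS feed and emit a list of episodes for downstream processing.

Invoked as an AWS Lambda handler with an event containing:
  - rss: URL of the podcast RSS feed (required)
  - maxEpisodesToProcess: optional cap on how many episodes to return
  - dryrun: optional flag copied through to each episode record

For each episode, Amazon Comprehend entity detection is run over the episode
description to estimate the speaker list and to collect candidate custom
vocabulary terms. Because the episode list can exceed the Step Functions state
size limit, it is written to S3 and a pointer to it is returned on the event.
"""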
from __future__ import print_function
import json
import os
import boto3
from urllib.request import urlopen
from urllib.error import URLError, HTTPError
import xml.etree.ElementTree as ET
import logging
from dateutil import parser
from common_lib import find_duplicate_person, id_generator

client = boto3.client('comprehend')

# Log level
logging.basicConfig()
logger = logging.getLogger()
if os.getenv('LOG_LEVEL') == 'DEBUG':
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)


class InvalidInputError(ValueError):
    pass


# Entry point for the Lambda function
def lambda_handler(event, context):
    logger.info("Received event: " + json.dumps(event, indent=2))
    feed_url = event['rss']
    max_episodes_to_process = None
    if 'maxEpisodesToProcess' in event:
        max_episodes_to_process = int(event['maxEpisodesToProcess'])
    # How many episodes the downstream steps should work on concurrently;
    # this is written into the S3 manifest alongside the episode list below.
    maxConcurrentEpisodes = 10
    # Open the URL and process the RSS feed
    retval = []
    bucket = os.environ['BUCKET_NAME']
    episode_count = 0
    # This array holds the entity types that are included in the custom vocabulary
    vocabularyTypes = ['COMMERCIAL_ITEM', 'EVENT', 'LOCATION', 'ORGANIZATION', 'TITLE']
    vocabularyItems = []
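    # The collected vocabulary items are returned to the caller, presumably to
    # seed a custom vocabulary for the transcription step downstream.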
    try:
        filename = '/tmp/' + id_generator() + '.rss'
        # HTTP GET the RSS feed XML file and save a local copy
        with urlopen(feed_url) as f, open(filename, "wb") as local_file:
            local_file.write(f.read())
        # The RSS feed is an XML file, so parse it, traverse the tree, and pull all the /channel/item elements
        tree = ET.parse(filename)
        root = tree.getroot()
        # Extract the title of the podcast
        channelTitle = root.find('channel/title')
        for child in root.findall('channel/item'):
            title = child.find('title')
            envelope = child.find('enclosure')
            date_entry = child.find('pubDate').text
            dt = parser.parse(date_entry)
            date_string = dt.strftime("%Y:%m:%d %H:%M:%S")
            keywords = []
            description = child.find('description').text
            # Truncate the description to stay under Comprehend's real-time text size limit
            description = description[0:4900]
            comprehendResponse = client.detect_entities(Text=description, LanguageCode='en')
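            # Illustrative shape of the detect_entities response (fields abridged):
            #   {"Entities": [{"Text": "Jane Doe", "Type": "PERSON",
            #                  "Score": 0.99, "BeginOffset": 0, "EndOffset": 8}, ...]}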
            # We estimate the number of speakers in the podcast by parsing people's names from the episode summary
            speaker_list = []
            for entity in comprehendResponse["Entities"]:
                # For every person mentioned in the description, increment the number of
                # speakers. This assumes that the episode text mentions all the speakers
                # and doesn't include mentions of people that are not in the podcast.
                # It isn't critical that this number is correct; it is simply used to break
                # up the body of the podcast into smaller chunks, so inaccurate speaker
                # detection doesn't have a major impact on the functionality of the system.
                if entity['Type'] == 'PERSON':
                    if not entity['Text'].startswith('@'):
                        speaker_list.append(entity['Text'])
                    else:
                        logger.info(f'skipping person {entity["Text"]}')
                # Add to the vocabulary if not already in there
                if entity['Type'] in vocabularyTypes and entity['Text'] not in vocabularyItems:
                    cleanText = entity['Text'].replace('@', '')
                    cleanText = cleanText.replace('.', '')
                    if cleanText:
                        vocabularyItems.append(cleanText)
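            # find_duplicate_person (from common_lib) is assumed to return the
            # redundant variants of a name (e.g. a first name that also appears
            # as part of a full name), which are then dropped from the list.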
            duplicates = find_duplicate_person(speaker_list)
            for d in duplicates:
                speaker_list.remove(d)
            num_speakers = len(speaker_list)
            # If there is an enclosure, its link will point to the episode's audio file
            if envelope is not None:
                episode_url = envelope.attrib['url']
                file_type = envelope.attrib['type']
                episode_count += 1
                episode = {
                    'Episode': title.text,
                    'PodcastName': channelTitle.text,
                    'podcastUrl': episode_url,
                    'audioType': file_type,
                    'tags': keywords,
                    'speakers': num_speakers,
                    'speakerNames': speaker_list,
                    'status': 'PENDING',
                    'publishedTime': date_string,
                    'summary': description,
                    'sourceFeed': feed_url
                }
                logger.debug(json.dumps(episode, indent=2))
                if "dryrun" in event:
                    episode["dryrun"] = event["dryrun"]
                # Add this item to the collection
                retval.append(episode)
                if max_episodes_to_process is not None and episode_count >= max_episodes_to_process:
                    break
    # Handle download errors
    except HTTPError as e:
        logger.error("HTTP Error: %s %s", e.code, feed_url)
        raise InvalidInputError("Unable to download RSS feed: " + feed_url) from e
    except URLError as e:
        logger.error("URL Error: %s %s", e.reason, feed_url)
        raise InvalidInputError("Unable to download RSS feed: " + feed_url) from e
    logger.info(json.dumps(retval, indent=2))
    # This collection can be pretty big and exceed the Step Functions state payload
    # limit (256 KB), so we store it in S3 instead and return a link to the S3 object.
    s3_client = boto3.client('s3')
    key = 'podcasts/episodelist/' + id_generator() + '.json'
    response = s3_client.put_object(
        Body=json.dumps({"maxConcurrentEpisodes": maxConcurrentEpisodes, "episodes": retval}, indent=2),
        Bucket=bucket,
        Key=key)
    event['episodes'] = {"status": 'RUNNING', "remainingEpisodes": episode_count, "bucket": bucket, "key": key}
    event['customVocabulary'] = vocabularyItems
    # Return the link to the episode JSON document and the custom vocabulary items
    return event
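

if __name__ == '__main__':
    # Minimal local smoke test (a sketch, not part of the Lambda deployment):
    # it assumes AWS credentials, a BUCKET_NAME environment variable, and a
    # reachable feed URL. The feed URL below is a placeholder.
    sample_event = {
        'rss': 'https://example.com/podcast.rss',
        'maxEpisodesToProcess': 2,
        'dryrun': True
    }
    print(json.dumps(lambda_handler(sample_event, None), indent=2))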