-
Notifications
You must be signed in to change notification settings - Fork 0
/
degrees.py
169 lines (139 loc) · 4.8 KB
/
degrees.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import csv
import sys
from util import Node, StackFrontier, QueueFrontier
# Maps lower-cased person names to the set of person_ids sharing that name
# (filled by load_data; several people may have the same name)
names = {}
# Maps person_ids to a dictionary of: name, birth, movies (a set of movie_ids)
# (ids and field values are kept as the strings read from the CSV files)
people = {}
# Maps movie_ids to a dictionary of: title, year, stars (a set of person_ids)
# (ids and field values are kept as the strings read from the CSV files)
movies = {}
def load_data(directory):
    """
    Load data from CSV files into memory.

    Reads `people.csv`, `movies.csv` and `stars.csv` from `directory` and
    fills the module-level `names`, `people` and `movies` dictionaries in
    place. Nothing is returned.

    directory: path of the folder that holds the three CSV files.

    A row of `stars.csv` that refers to a person or a movie that was not
    loaded is skipped entirely, so `people` and `movies` never point at
    ids that do not exist. A file that lacks an expected column raises
    KeyError.
    """
    # newline="" lets the csv module handle line endings itself, as its
    # documentation asks for (needed for quoted fields containing newlines).

    # Load people
    with open(f"{directory}/people.csv", encoding="utf-8", newline="") as f:
        for row in csv.DictReader(f):
            people[row["id"]] = {
                "name": row["name"],
                "birth": row["birth"],
                "movies": set(),
            }
            # Index by lower-cased name so lookups are case-insensitive;
            # a set is used because several people can share one name.
            names.setdefault(row["name"].lower(), set()).add(row["id"])

    # Load movies
    with open(f"{directory}/movies.csv", encoding="utf-8", newline="") as f:
        for row in csv.DictReader(f):
            movies[row["id"]] = {
                "title": row["title"],
                "year": row["year"],
                "stars": set(),
            }

    # Load stars
    with open(f"{directory}/stars.csv", encoding="utf-8", newline="") as f:
        for row in csv.DictReader(f):
            person_id = row["person_id"]
            movie_id = row["movie_id"]
            person = people.get(person_id)
            movie = movies.get(movie_id)
            # Check both sides BEFORE mutating anything: linking a known
            # person to an unknown movie would leave a dangling movie_id
            # that later breaks any lookup of movies[movie_id].
            if person is None or movie is None:
                continue
            person["movies"].add(movie_id)
            movie["stars"].add(person_id)
def main():
    """
    Command-line entry point.

    Accepts an optional data directory argument (defaults to "large"),
    loads the data, asks for two names on standard input and prints the
    chain of shared movies that links the two people, or "Not connected."
    when shortest_path finds no chain. Exits with a message on bad usage
    or when a name cannot be resolved.
    """
    argv = sys.argv
    if len(argv) > 2:
        sys.exit("Usage: python degrees.py [directory]")
    directory = "large" if len(argv) != 2 else argv[1]

    # Load data from files into memory
    print("Loading data...")
    load_data(directory)
    print("Data loaded.")

    # Each name is resolved (and rejected) before the next prompt is shown
    source = person_id_for_name(input("Name: "))
    if source is None:
        sys.exit("Person not found.")
    target = person_id_for_name(input("Name: "))
    if target is None:
        sys.exit("Person not found.")

    path = shortest_path(source, target)
    if path is None:
        print("Not connected.")
        return

    degrees = len(path)
    print(f"{degrees} degrees of separation.")

    # Prepend the source so consecutive entries form (from, to) hops
    hops = [(None, source)] + path
    for step, (previous, current) in enumerate(zip(hops, hops[1:]), start=1):
        person1 = people[previous[1]]["name"]
        person2 = people[current[1]]["name"]
        movie = movies[current[0]]["title"]
        print(f"{step}: {person1} and {person2} starred in {movie}")
def shortest_path(source, target):
    """
    Returns the shortest list of (movie_id, person_id) pairs
    that connect the source to the target.

    source, target: person_ids.

    If no possible path, returns None. If source and target are the same
    person, returns an empty list.

    The search expands people level by level through the frontier
    (assumes QueueFrontier hands nodes back in first-in-first-out order;
    it is defined in util and not visible here - TODO confirm). Each
    person is enqueued at most once, no matter how many movies lead to
    them, because the first time a person is reached is already via a
    shortest chain.
    """
    # A person is trivially connected to themself with zero steps
    if source == target:
        return []

    # Initialize frontier at the source
    frontier = QueueFrontier()
    frontier.add(Node(state=(None, source), parent=None, action=None))

    # People that have already been put on the frontier. Tracking
    # person_ids (not (movie, person) pairs) stops the same person being
    # queued again through every other movie that reaches them.
    reached = {source}

    while not frontier.empty():
        node = frontier.remove()

        for movie_id, person_id in neighbors_for_person(node.state[1]):
            if person_id in reached:
                continue
            reached.add(person_id)
            child = Node(
                state=(movie_id, person_id), parent=node, action=movie_id
            )

            # Goal test at generation time: no need to queue the target
            # and wait for the rest of the current level to be expanded.
            if person_id == target:
                path = []
                # Walk back to the start node (whose parent is None),
                # collecting every step except the start itself
                while child.parent is not None:
                    path.append(child.state)
                    child = child.parent
                path.reverse()
                return path

            frontier.add(child)

    # Frontier exhausted without reaching the target
    return None
def person_id_for_name(name):
    """
    Returns the IMDB id for a person's name (matched case-insensitively),
    or None when nobody has that name.

    When several people share the name, lists them and asks the user to
    type the intended id; returns None if the typed id is not one of the
    listed candidates.
    """
    matches = list(names.get(name.lower(), set()))

    if not matches:
        return None
    if len(matches) == 1:
        return matches[0]

    # Ambiguous name: show every candidate and let the user pick one
    print(f"Which '{name}'?")
    for candidate_id in matches:
        record = people[candidate_id]
        print(
            f"ID: {candidate_id}, Name: {record['name']}, "
            f"Birth: {record['birth']}"
        )

    try:
        chosen = input("Intended Person ID: ")
    except ValueError:
        return None
    return chosen if chosen in matches else None
def neighbors_for_person(person_id):
    """
    Returns the set of (movie_id, person_id) pairs, one for every star of
    every movie the given person appears in.

    Every id listed in a movie's "stars" set is paired with that movie,
    so the given person appears in the result too if they are listed
    there.
    """
    return {
        (movie_id, costar_id)
        for movie_id in people[person_id]["movies"]
        for costar_id in movies[movie_id]["stars"]
    }
# Run the interactive search only when executed as a script, not on import
if __name__ == "__main__":
    main()