In [1]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="Spark1")


import numpy as np
import itertools

# Reduce the amount that Spark logs to the console.
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

In [2]:
import sys

In [3]:
def copartitioned(RDD1, RDD2):
    "check if two RDDs are copartitioned"
    return RDD1.partitioner == RDD2.partitioner

In [4]:
doctor_matchings = np.loadtxt("doctor_preferences.txt").astype(int)
hospital_matchings = np.loadtxt("hospital_preferences.txt").astype(int)

In [16]:
spots_per_hospital = 8
numPartitions = 64
numPositionsPerHospital = 8
numPreferencesPerDoctor = 15

In [21]:
print doctor_matchings[0]
print hospital_matchings[0]
x = [1,4,2,1]
y = sorted(x, reverse=True)
print y

[ 2 11  9 16 13 19  0 10  8  7 18  6  3  4  2]
[187  75  20 110  50  69  48 106   5 167 128 127  76  65  39  33 191  37
  29 124 105  61  96 108  44 192  68 177  77 107 184  74 114  72 137  92
  25  80   2 197  46  42  95 125 176  21 183   8  82   3  22 102  17 165
  41 144 168  90  71 193  99  79  11 139 136 122  63  62 171 100 195 178
 154  12 180  34  73  43 145 182 115  86 189  38 198 158  88 150 147 149
 152  91 156  36  19 196 140 151   7  55 188 153  89 172 134  93  94 162
 117 101  84 130 163 112   4 141 185 135  81 164 170 129 199  70  56  10
 181  83  97  54  78 173 143  14  40 160 161   0 111  32 157  27 155 148
  51 113  64 142  58  45 174  31  66 169 166  60 133 121  28 194 103  24
  57  49  59  30 119 179  53  26  85  52 131  23 146  87  67   9 190 126
  47 118  13 138  35 186 104 123  15 175   1  98 132  16 159 120 116  18
 109   6]
[4, 2, 1, 1]


In [34]:
# Simply perform the well-known Gale Shapley algorithm for the
# national residency match.  This is probably along the lines of
# what the national residency match actually implements.
def serial_implementation():
    hospital_accepted = [[] for i in range(len(hospital_matchings))]
    doc_indices = [0] * len(doctor_matchings)
    hospital_dicts = [{} for i in range(len(hospital_matchings))]
    for i, l in enumerate(hospital_matchings):
        for val, ind in enumerate(l):
            hospital_dicts[i][ind] = val
    iter_n = 0
    doc_pool = set(range(len(doctor_matchings)))
    # s.discard s.add
    brk = True
    while brk:
        brk = False
        hospital_queues = [[] for i in range(len(hospital_matchings))]
        to_pop = []
        # each remaining doctor proposes to their first choice school
        for i, doc in enumerate(doc_pool):
            if doc_indices[doc] == numPositionsPerHospital:
                doc_pool.discard(doc)
                continue
            hospital_queues[doctor_matchings[doc][doc_indices[doc]]].append(doc)
            doc_indices[doc] += 1
        # each hospital accepts its top spots_per_hospital applicants
        for j in range(len(hospital_queues)):
            old = hospital_accepted[j]
            cor = old + [a for a in hospital_queues[j] if a in hospital_dicts[j].keys()]
            cor = sorted(cor, key=lambda x: hospital_dicts[j][x])
            hospital_accepted[j] = cor[:8]
            if hospital_accepted[j] != old:
                brk = True
            for k in range(min(8, len(cor))):
                doc_pool.discard(cor[k])
            for k in range(9, len(cor)):
                doc_pool.add(cor[k])
            
        # TODO: check serial implementation stability
        iter_n += 1
        for p in sorted(to_pop, reverse=True):
            doc_pool.pop(p)
    return hospital_accepted

print serial_implementation()

[[106, 76, 191, 96, 42, 99, 182, 189], [57, 127, 84, 93, 186], [11, 192, 112, 37, 55, 58, 145, 9], [149, 193, 122, 7, 172, 41, 171, 156], [1, 100, 185, 151, 28, 86, 46, 19], [69, 165, 14, 184, 163, 23, 79, 31], [80, 187, 48, 120, 113, 13, 153, 25], [50, 152, 82, 20, 136, 45, 18, 148], [66, 132, 129, 198, 131, 176, 2, 73], [68, 115, 10, 85, 8, 94, 59, 101], [95, 174, 17, 119, 47, 121, 90, 36], [123, 118, 155, 103, 91, 110, 70, 125], [63, 29, 15, 162, 183, 40, 44, 35], [116, 197, 139, 21, 190, 89, 3, 88], [39, 196, 111, 61, 140, 75, 195, 105], [135, 62, 78, 173, 67, 74, 117, 166], [92, 175, 97, 16, 142, 168, 146, 72], [138, 177, 6, 181, 104, 133, 102, 188], [199, 81, 161, 33, 141, 4, 30, 154], [137, 144, 170, 194, 12, 32, 130, 77]]


In [163]:
# These RDD are KV pairs, where the key is the ID of the doctor/hospital, and the values are the IDs of the respective 
# doctors or hospitals in order of preference.
# By this, I mean that hospitals have a set of preferences over doctors, and doctors have preferences over hospitals
doctor_RDD = sc.textFile('doctor_preferences.txt', numPartitions).map(lambda x: map(int, x.split())).zipWithIndex()\
                                                        .map(lambda (x, y): (y, x)).partitionBy(numPartitions)
hospital_RDD = sc.textFile('hospital_preferences.txt', numPartitions).map(lambda x: map(int, x.split())).zipWithIndex()\
                                                        .map(lambda (x, y): (y, x)).partitionBy(numPartitions)

In [164]:
assert(copartitioned(doctor_RDD, hospital_RDD))

In [165]:
# Preferences is a list of ints in order that you want them
# pickingfrom is the ones you are picking from
# N is how many you are picking
# Ex: pick_top_N([3, 4, 5, 1, 2, 7, 8], [7, 1, 2, 4, 5], 3) -> [4, 5, 1]
def pick_top_N(preferences, pickingfrom, N):
    pickingfrom.sort(key=lambda x: preferences.index(x))
    return pickingfrom[:N] 

In [166]:
doctor_matchings = doctor_RDD.mapValues(lambda x : -1)
hospital_matchings = hospital_RDD.mapValues(lambda x: [])

In [167]:
def combine_old_new(newoptions, oldoptions, preferences):
    if newoptions == None:
        newoptions = []
    if oldoptions == None:
        oldoptions = []
    alloptions = list(set(newoptions).union(set(oldoptions)))
    return pick_top_N(preferences, alloptions, 8)


In [168]:
def accept_new_hospital_assignment(old, new):
    if new == None:
        return -1
    return new

In [169]:
# If unmatched (match == -1), then we are gonna use try to use the next pref, so one remove from the list
def remove_pref_if_needed(prefs, match):
    # If unmatched and pref remaining, remove it.
    if match == -1 and len(prefs) > 0:
        return prefs[1:]
    # Else return original prefs
    return prefs

In [170]:
def is_changed(old, new):
    if old == -1 and new == None:
        return False
    elif old == -1 and new != None:
        return True
    return old != new
    

In [171]:
doctor_prefs = doctor_RDD.map(lambda x: x).partitionBy(numPartitions).cache()

In [172]:
accum = sc.accumulator(1)
accum2 = sc.accumulator(1)
while accum.value > 0 or accum2.value > 0:
    accum.value = 0
    accum2.value = 0
    # These are the top remaining choices for the unmatched doctors
    assert(copartitioned(doctor_prefs, doctor_matchings))
    unmatched_doctor_preferences = doctor_prefs.join(doctor_matchings).filter(lambda (doc, (prefs, match)): match == -1 and len(prefs) > 0).mapValues(lambda (prefs, match): prefs)
    unmatched_doctor_preferences.foreach(lambda (x, prefs): accum2.add(len(prefs)))

    # Update all of the doctor prefs by removing the first pref from each unmatched doctor
    assert(copartitioned(doctor_prefs, doctor_matchings))
    doctor_prefs = doctor_prefs.join(doctor_matchings).mapValues(lambda (prefs, match): remove_pref_if_needed(prefs, match)).cache()
    # Take the first pref from each unmatched doctor 
    doctor_proposals = unmatched_doctor_preferences.mapValues(lambda x: x[0])
    # Group the requests by hospital
    hospital_groupings = doctor_proposals.map(lambda (x, y) : (y, x)).partitionBy(numPartitions).groupByKey()
    # Join the new requests for each hospital with what they previously had and their preferences
    assert(copartitioned(hospital_groupings, hospital_matchings))
    assert(copartitioned(hospital_groupings, hospital_RDD))
    joined_hospital = hospital_groupings.rightOuterJoin(hospital_matchings).join(hospital_RDD)
    # Determine the best ones
    hospital_matchings = joined_hospital.mapValues(lambda ((new, old), preferences): combine_old_new(new, old, preferences)).cache()
    # Inform the doctors of the match
    matched_doctors = hospital_matchings.flatMapValues(lambda x: x).map(lambda (x, y) : (y, x)).partitionBy(numPartitions)
    # Update the doctor matchings
    assert(copartitioned(doctor_matchings, matched_doctors))
    doctor_join = doctor_matchings.leftOuterJoin(matched_doctors).cache()
    doctor_join.filter(lambda (key, (old, new)): is_changed(old, new)).foreach(lambda x: accum.add(1))
    doctor_matchings = doctor_join.mapValues(lambda (old,new) : accept_new_hospital_assignment(old, new)).cache()
    print "Number of doctors whose status changed", accum.value
    print "Number of unmatched preferences to still check", accum2.value
    sys.stdout.flush()


Number of doctors whose status changed 151
Number of unmatched preferences to still check 3000
Number of doctors whose status changed 57
Number of unmatched preferences to still check 686
Number of doctors whose status changed 42
Number of unmatched preferences to still check 598
Number of doctors whose status changed 38
Number of unmatched preferences to still check 579
Number of doctors whose status changed 44
Number of unmatched preferences to still check 534
Number of doctors whose status changed 30
Number of unmatched preferences to still check 512
Number of doctors whose status changed 26
Number of unmatched preferences to still check 496
Number of doctors whose status changed 16
Number of unmatched preferences to still check 478
Number of doctors whose status changed 26
Number of unmatched preferences to still check 443
Number of doctors whose status changed 20
Number of unmatched preferences to still check 440
Number of doctors whose status changed 22
Number of unmatched prefer

In [173]:
print hospital_matchings.take(30)

[(0, [187, 75, 20, 106, 5, 128, 127, 76]), (1, [41, 4, 88, 197, 162, 182, 56, 154]), (2, [11, 126, 32, 30, 192, 72, 167, 60]), (3, [29, 71, 98, 44, 23, 115, 139, 176]), (4, [1, 100, 185, 31, 18, 186, 193, 148]), (5, [130, 57, 87, 69, 7, 67, 165, 53]), (6, [173, 158, 195, 129, 26, 80, 64, 73]), (7, [180, 50, 55, 159, 152, 171, 133, 22]), (8, [21, 84, 46, 89, 110, 146, 15, 66]), (9, [155, 103, 68, 111, 174, 108, 52, 47]), (10, [13, 93, 179, 105, 95, 191, 99, 112]), (11, [86, 9, 123, 184, 118, 8, 83, 28]), (12, [63, 27, 147, 36, 82, 104, 141, 48]), (13, [37, 33, 90, 24, 121, 116, 114, 65]), (14, [137, 119, 102, 39, 194, 196, 19, 151]), (15, [45, 135, 125, 156, 113, 178, 164, 34]), (16, [120, 25, 54, 143, 149, 77, 160, 51]), (17, [85, 35, 94, 79, 136, 150, 138, 42]), (18, [166, 97, 61, 58, 199, 81, 10, 38]), (19, [101, 198, 190, 172, 189, 43, 134, 175])]


In [174]:
print doctor_matchings.take(500)

[(0, -1), (64, 6), (128, 0), (192, 2), (129, 6), (193, 4), (65, 13), (1, 4), (194, 14), (66, 8), (2, -1), (130, 5), (3, -1), (67, 5), (131, -1), (195, 6), (132, -1), (68, 9), (196, 14), (4, 1), (197, 1), (5, 0), (69, 5), (133, 7), (6, -1), (70, -1), (198, 19), (134, 19), (7, 5), (135, 15), (199, 18), (71, 3), (8, 11), (72, 2), (136, 17), (137, 14), (73, 6), (9, 11), (10, 18), (74, -1), (138, 17), (11, 2), (75, 0), (139, 3), (140, -1), (76, 0), (12, -1), (13, 10), (77, 16), (141, 12), (14, -1), (78, -1), (142, -1), (15, 8), (143, 16), (79, 17), (16, -1), (80, 6), (144, -1), (145, -1), (81, 18), (17, -1), (18, 4), (82, 12), (146, 8), (19, 14), (83, 11), (147, 12), (148, 4), (84, 8), (20, 0), (21, 8), (85, 17), (149, 16), (22, 7), (86, 11), (150, 17), (23, 3), (151, 14), (87, 5), (24, 13), (88, 1), (152, 7), (153, -1), (89, 8), (25, 16), (26, 6), (90, 13), (154, 1), (27, 12), (91, -1), (155, 9), (156, 15), (92, -1), (28, 11), (29, 3), (93, 10), (157, -1), (30, 2), (94, 17), (158, 6), (31,

In [175]:
doctor_RDD.lookup(0)

[[2, 11, 9, 16, 13, 19, 0, 10, 8, 7, 18, 6, 3, 4, 2]]

In [176]:
# Given a match that a doctor had and the original preferences, determine all of the hospitals the doctor would have preferred.
def getpreferredhospitals(match, preferences):
    # If you weren't matched, you would have preferred any of the original ranked ones
    if match == -1:
        return preferences
    # If you were matched, you would have preferred everything up until that one.
    return preferences[:preferences.index(match)]

In [177]:
# Given the matches that a hospital got, determine all of the doctors the hospital would have preferred
def getpreferreddoctors(matches, preferences):
    # Figure out out which of the preferences were actually picked
    indicies = [preferences.index(match) for match in matches]
    max_index = max(indicies)
    # Get all of the people up until the worst person picked
    best_people = set(preferences[:max_index])
    # Remove the people that were successfully picked
    better_people = best_people - set(matches)
    return list(best_people)

In [178]:
# Checks if all of the matches are valid, by which we mean checks
# stability in the formal sense. As defined in the stable marriage problem,
# a matching is unstable if there are two pairs, (m, w) and (m', w') s.t.
# both would rather switch.
def verify_matches(doc_matches, hos_matches, original_doc_prefs, original_hos_prefs):
    doctor_to_hospital_preferences = doc_matches.join(original_doc_prefs).mapValues(lambda (match, preferences): getpreferredhospitals(match, preferences)).flatMapValues(lambda x: x)
    hospital_to_doctor_preferences = hos_matches.join(original_hos_prefs).mapValues(lambda (matches, preferences): getpreferreddoctors(matches, preferences)).flatMapValues(lambda x: x)
    return doctor_to_hospital_preferences.map(lambda (x,y): (y,x)).intersection(hospital_to_doctor_preferences)

In [179]:
bad_results = verify_matches(doctor_matchings, hospital_matchings, doctor_RDD, hospital_RDD)
# If the assertion passes, then this is a valid matching!
assert(bad_results.count() == 0)

In [150]:
doctor_RDD.lookup(137)

[[19, 16, 13, 2, 3, 6, 9, 1, 0, 11, 7, 12, 18, 10, 14]]