# Assigment problem

In this notebook we have a practical example where we count pairs of elements that are identified by id_1 and id_2. The problem consists in assigning the elements 1 to 1 minimizing the distance between the pairs.

**Not all combinations have values, it is considered that if they have missing values, the records are very different.**

In [7]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
import numpy as np

In [8]:
df = spark.createDataFrame(
    [
        ('_', 'a', 'a', '_', 9.1),
        ('_', 'a', 'b', '_', 5.1),
        ('_', 'b', 'a', '_', 7.1),
        ('_', 'b', 'c', '_', 1.1),
        ('_', 'c', 'd', '_', 8.1),
        ('_', 'c', 'b', '_', 7.1),
        ('_', 'd', 'd', '_', 1.1),
        ('_', 'd', 'a', '_', 3.1),
    ],
    ["id1_x", "id2_x", "id1_y", "id2_y", "distance"]
)

df.show()

+---------+---------------+---------+---------------+-----+
|id1_x|id2_x|id1_y|id2_y|distance|
+---------+---------------+---------+---------------+-----+
|        _|              a|        a|              _|  9.1|
|        _|              a|        b|              _|  5.1|
|        _|              b|        a|              _|  7.1|
|        _|              b|        c|              _|  1.1|
|        _|              c|        d|              _|  8.1|
|        _|              c|        b|              _|  7.1|
|        _|              d|        d|              _|  1.1|
|        _|              d|        a|              _|  3.1|
+---------+---------------+---------+---------------+-----+



In [9]:
# create row id to identify each combination
w = Window.orderBy(['id1_x', 'id2_x'])

df = df.withColumn('row_id', F.dense_rank().over(w))

df.show()

+---------+---------------+---------+---------------+-----+-----+
|id1_x|id2_x|id1_y|id2_y|distance|row_id|
+---------+---------------+---------+---------------+-----+-----+
|        _|              a|        a|              _|  9.1|    1|
|        _|              a|        b|              _|  5.1|    1|
|        _|              b|        a|              _|  7.1|    2|
|        _|              b|        c|              _|  1.1|    2|
|        _|              c|        d|              _|  8.1|    3|
|        _|              c|        b|              _|  7.1|    3|
|        _|              d|        d|              _|  1.1|    4|
|        _|              d|        a|              _|  3.1|    4|
+---------+---------------+---------+---------------+-----+-----+



In [10]:
df3 = df.select('id1_x','id2_x','row_id').dropDuplicates()

df3.show()

+---------+---------------+-----+
|id1_x|id2_x|row_id|
+---------+---------------+-----+
|        _|              a|    1|
|        _|              b|    2|
|        _|              c|    3|
|        _|              d|    4|
+---------+---------------+-----+



In [12]:
# create cost matrix with all combinations
df1 = df.groupby('id1_y', 'id2_y').pivot('row_id').agg(F.first('distance'))

df1.show()

+---------+---------------+----+----+----+----+
|id1_y|id2_y|   1|   2|   3|   4|
+---------+---------------+----+----+----+----+
|        a|              _| 9.1| 7.1|null| 3.1|
|        b|              _| 5.1|null| 7.1|null|
|        c|              _|null| 1.1|null|null|
|        d|              _|null|null| 8.1| 1.1|
+---------+---------------+----+----+----+----+



In [40]:
# define assigment algorithm as pandas udf fucntion
schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')

def hungarian_algorithm(df_matrix, fill_values=1e3):
    matrix = df_matrix.iloc[:,2:]
    cols = matrix.columns
    n_row, n_col = matrix.shape
    cost_matrix = matrix.fillna(fill_values).values

    if n_row > n_col:
        cost_matrix = np.hstack((cost_matrix, np.full((n_row, n_row - n_col), fill_values)))
    elif n_col > n_row:
        cost_matrix = np.vstack((cost_matrix, np.full((n_col - n_row , n_col), fill_values)))

    _, idx = linear_sum_assignment(cost_matrix)

    return df_matrix.assign(assigned=['%s' % (i + 1) for i in idx][:n_row])

In [41]:
find_assigned = F.pandas_udf(lambda x: hungarian_algorithm(x), schema, F.PandasUDFType.GROUPED_MAP)

In [42]:
df2 = df1.groupby('id2_y').apply(find_assigned)

df2.show()

+---------+---------------+----+----+----+----+--------+
|id1_y|id2_y|   1|   2|   3|   4|assigned|
+---------+---------------+----+----+----+----+--------+
|        a|              _| 9.1| 7.1|null| 3.1|       4|
|        b|              _| 5.1|null| 7.1|null|       1|
|        c|              _|null| 1.1|null|null|       2|
|        d|              _|null|null| 8.1| 1.1|       3|
+---------+---------------+----+----+----+----+--------+



In [43]:
# get column range
columns_ = df2.columns
columns_range = columns_[2:-1]

[1, 2, 3, 4]

In [46]:
df_new = df2.selectExpr(
    "id1_y",
    "id2_y",
    'assigned',
    'stack({},{}) as (row_id, distance)'.format(len(columns_range),
                                                ','.join("int('{0}'), `{0}`".format(c) for c in columns_range))
) \
.withColumn('is_match', F.expr("assigned=row_id"))

df_new.show()

+---------+---------------+--------+-----+-----+--------+
|id1_y|id2_y|assigned|row_id|distance|is_match|
+---------+---------------+--------+-----+-----+--------+
|        a|              _|       4|    1|  9.1|   false|
|        a|              _|       4|    2|  7.1|   false|
|        a|              _|       4|    3| null|   false|
|        a|              _|       4|    4|  3.1|    true|
|        b|              _|       1|    1|  5.1|    true|
|        b|              _|       1|    2| null|   false|
|        b|              _|       1|    3|  7.1|   false|
|        b|              _|       1|    4| null|   false|
|        c|              _|       2|    1| null|   false|
|        c|              _|       2|    2|  1.1|    true|
|        c|              _|       2|    3| null|   false|
|        c|              _|       2|    4| null|   false|
|        d|              _|       3|    1| null|   false|
|        d|              _|       3|    2| null|   false|
|        d|             

## Use cases

In [2]:
df = spark.createDataFrame(
    [
        ('_', 'a', 'a', '_', 9.1),
        ('_', 'a', 'b', '_', 5.1),
        ('_', 'b', 'a', '_', 7.1),
        ('_', 'b', 'c', '_', 1.1),
        ('_', 'c', 'd', '_', 8.1),
        ('_', 'c', 'b', '_', 7.1),
        ('_', 'd', 'd', '_', 1.1),
        ('_', 'd', 'a', '_', 3.1),
    ],
    ["id1_x", "id2_x", "id1_y", "id2_y", "distance"]
)

df_.show()

+---------+---------------+---------+---------------+-----+
|id1_x|id2_x|id1_y|id2_y|distance|
+---------+---------------+---------+---------------+-----+
|        _|              a|        a|              _|  9.1|
|        _|              a|        b|              _|  5.1|
|        _|              b|        a|              _|  7.1|
|        _|              b|        c|              _|  1.1|
|        _|              c|        d|              _|  8.1|
|        _|              c|        b|              _|  7.1|
|        _|              d|        d|              _|  1.1|
|        _|              d|        a|              _|  3.1|
+---------+---------------+---------+---------------+-----+



In [3]:
# more rows
df_1 = spark.createDataFrame(
    [
        ('_', 'a', 'a', '_', 9.1),
        ('_', 'a', 'b', '_', 5.1),
        ('_', 'b', 'a', '_', 7.1),
        ('_', 'b', 'c', '_', 1.1),
        ('_', 'c', 'd', '_', 8.1),
        ('_', 'c', 'b', '_', 7.1),
    ],
    ["id1_x", "id2_x", "id1_y", "id2_y", "distance"]
)

df_1.show()

+---------+---------------+---------+---------------+-----+
|id1_x|id2_x|id1_y|id2_y|distance|
+---------+---------------+---------+---------------+-----+
|        _|              a|        a|              _|  9.1|
|        _|              a|        b|              _|  5.1|
|        _|              b|        a|              _|  7.1|
|        _|              b|        c|              _|  1.1|
|        _|              c|        d|              _|  8.1|
|        _|              c|        b|              _|  7.1|
+---------+---------------+---------+---------------+-----+



In [4]:
# more cols
df_2 = spark.createDataFrame(
    [
        ('_', 'a', 'a', '_', 9.1),
        ('_', 'a', 'b', '_', 5.1),
        ('_', 'a', 'c', '_', 7.1),
        ('_', 'b', 'a', '_', 1.1),
        ('_', 'b', 'b', '_', 8.1),
        ('_', 'b', 'c', '_', 7.1),
    ],
    ["id1_x", "id2_x", "id1_y", "id2_y", "distance"]
)

df_2.show()

+---------+---------------+---------+---------------+-----+
|id1_x|id2_x|id1_y|id2_y|distance|
+---------+---------------+---------+---------------+-----+
|        _|              a|        a|              _|  9.1|
|        _|              a|        b|              _|  5.1|
|        _|              a|        c|              _|  7.1|
|        _|              b|        a|              _|  1.1|
|        _|              b|        b|              _|  8.1|
|        _|              b|        c|              _|  7.1|
+---------+---------------+---------+---------------+-----+



In [5]:
def hungarian_algorithm(df_matrix, fill_values=1e3):
    matrix = df_matrix.iloc[:,2:]
    cols = matrix.columns
    n_row, n_col = matrix.shape
    cost_matrix = matrix.fillna(fill_values).values

    if n_row > n_col:
        cost_matrix = np.hstack((cost_matrix, np.full((n_row, n_row - n_col), fill_values)))
    elif n_col > n_row:
        cost_matrix = np.vstack((cost_matrix, np.full((n_col - n_row , n_col), fill_values)))

    _, idx = linear_sum_assignment(cost_matrix)

    return df_matrix.assign(assigned=['%s' % (i + 1) for i in idx][:n_row])

def solve_assigment(df):
    n_rows = df_.drop_duplicates(['id1_x', 'id2_x']).count()
    w = Window.orderBy(['id1_x', 'id2_x'])

    # get row_id
    df = df.withColumn('row_id', F.dense_rank().over(w))

    # drop duplicates
    df3 = df.select('id1_x','id2_x','row_id').dropDuplicates()

    # create pivot table
    df1 = df.groupby('id1_y', 'id2_y').pivot('row_id').agg(F.first('distance'))

    # apply hungarian algorithm
    schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')
    find_assigned = F.pandas_udf(lambda x: __find_assigned(x), schema, F.PandasUDFType.GROUPED_MAP)
    df2 = df1.groupby('id2_y').apply(hungarian_algorithm)

    # get column range
    columns_ = df2.columns
    columns_range = columns_[2:-1]

    df_new = df2.selectExpr(
        "id1_y",
        "id2_y",
        'assigned',
        'stack({},{}) as (row_id, distance)'.format(len(columns_range),
                                                    ','.join("int('{0}'), `{0}`".format(c) for c in columns_range))
    ) \
    .withColumn('is_match', F.expr("assigned=row_id"))


    return df_new

In [6]:
df_1_assigned = emparejar(df)
df_1_assigned

4
+---------+---------------+---------+---------------+-----+-----+
|id1_x|id2_x|id1_y|id2_y|distance|row_id|
+---------+---------------+---------+---------------+-----+-----+
|        _|              a|        a|              _|  9.1|    1|
|        _|              a|        b|              _|  5.1|    1|
|        _|              b|        a|              _|  7.1|    2|
|        _|              b|        c|              _|  1.1|    2|
|        _|              c|        d|              _|  8.1|    3|
|        _|              c|        b|              _|  7.1|    3|
|        _|              d|        d|              _|  1.1|    4|
|        _|              d|        a|              _|  3.1|    4|
+---------+---------------+---------+---------------+-----+-----+

+---------+---------------+-----+
|id1_x|id2_x|row_id|
+---------+---------------+-----+
|        _|              a|    1|
|        _|              b|    2|
|        _|              c|    3|
|        _|              d|    4|
+---

In [7]:
df_1_assigned = emparejar(df_1)
df_1_assigned.show()

4
+---------+---------------+---------+---------------+-----+-----+
|id1_x|id2_x|id1_y|id2_y|distance|row_id|
+---------+---------------+---------+---------------+-----+-----+
|        _|              a|        a|              _|  9.1|    1|
|        _|              a|        b|              _|  5.1|    1|
|        _|              b|        a|              _|  7.1|    2|
|        _|              b|        c|              _|  1.1|    2|
|        _|              c|        d|              _|  8.1|    3|
|        _|              c|        b|              _|  7.1|    3|
+---------+---------------+---------+---------------+-----+-----+

+---------+---------------+-----+
|id1_x|id2_x|row_id|
+---------+---------------+-----+
|        _|              a|    1|
|        _|              b|    2|
|        _|              c|    3|
+---------+---------------+-----+

+---------+---------------+----+----+----+
|id1_y|id2_y|   1|   2|   3|
+---------+---------------+----+----+----+
|        a|         

In [8]:
df_2_assigned = emparejar(df_2)
df_2_assigned.show()

4
+---------+---------------+---------+---------------+-----+-----+
|id1_x|id2_x|id1_y|id2_y|distance|row_id|
+---------+---------------+---------+---------------+-----+-----+
|        _|              a|        a|              _|  9.1|    1|
|        _|              a|        b|              _|  5.1|    1|
|        _|              a|        c|              _|  7.1|    1|
|        _|              b|        a|              _|  1.1|    2|
|        _|              b|        b|              _|  8.1|    2|
|        _|              b|        c|              _|  7.1|    2|
+---------+---------------+---------+---------------+-----+-----+

+---------+---------------+-----+
|id1_x|id2_x|row_id|
+---------+---------------+-----+
|        _|              a|    1|
|        _|              b|    2|
+---------+---------------+-----+

+---------+---------------+---+---+
|id1_y|id2_y|  1|  2|
+---------+---------------+---+---+
|        a|              _|9.1|1.1|
|        b|              _|5.1|8.1|
|  