In [45]:
import numpy as np
import random
import matplotlib.pyplot as plt
from ipywidgets import interactive
from IPython.display import display
import ipywidgets as widgets

symbols = ['B','.']
data_length = 10 # set it to >= 4

# generate random data
data = np.array([random.choice(symbols) for _ in range(data_length)])
B_count = np.sum(data == 'B')
print(data)

['.' '.' 'B' 'B' 'B' 'B' 'B' '.' '.' 'B']


In [46]:
def get_longest_streak(data):
    streaks = []
    i = 0
    max_score = 0
    while i < len(data):
        symbol = data[i]
        if symbol == 'B':
            score = 1
            start_pos = i
            while i < len(data):
                try:
                    if data[i+2] == 'B' and data[i+1] == '.':
                        score += 1
                        i += 2
                    else:
                        if score > max_score:
                            max_score = score
                        streaks.append({'score': score, 'start_pos': start_pos, 'end_pos': i})
                        break
                except IndexError:
                    if score > max_score:
                        max_score = score
                    streaks.append({'score': score, 'start_pos': start_pos, 'end_pos': i})
                    break
        i += 1

    # get all streaks with max_score
    max_score_streaks = [streak for streak in streaks if streak['score'] == max_score]
    
    for streak in max_score_streaks:
        # prevent zero length streaks
        if streak['start_pos'] == streak['end_pos']:
            streak['start_pos'] = 0
            streak['end_pos'] = -1


    return max_score_streaks
        
longest_streak = get_longest_streak(data)
print(longest_streak) 

[{'score': 1, 'start_pos': 0, 'end_pos': -1}, {'score': 1, 'start_pos': 0, 'end_pos': -1}, {'score': 1, 'start_pos': 0, 'end_pos': -1}, {'score': 1, 'start_pos': 0, 'end_pos': -1}, {'score': 1, 'start_pos': 0, 'end_pos': -1}, {'score': 1, 'start_pos': 0, 'end_pos': -1}]


In [47]:
def correct_B_indices(B_indices,start_pos=1):
    # correct B's are left, incorrect B's are right
    # correct B means it is in correct position mathing start_pos(even,odd)
    even = start_pos % 2 == 0
    correct_Bs = B_indices[B_indices % 2 != even] # use correct B's if incorrent ran out

    incorrect_Bs = B_indices[B_indices % 2 == even] # use incorrect B's first
    B_indices = np.concatenate((correct_Bs, incorrect_Bs))
    return B_indices

In [48]:
def process_default(data):

    B_indices = np.where(data == 'B')[0]

    b_count = len(B_indices)
    if not b_count:
        # no B's on this side
        return 0
    solutions = []
    swaps = [] # indices of swaps
    start_positions = [B_indices[0], B_indices[0]+1] # find which is better, even or odd start position

    for orientation in range(2):
        for start_pos in start_positions:
            
            B_correct = correct_B_indices(B_indices, start_pos=start_pos) # get correct B's

            for i in range(start_pos, len(data), 2):

                if not len(B_correct):
                    # no more B's to swap
                    break
            
                if data[i] == 'B':
                    if i not in B_correct:
                        # skip because this B was used in swap
                        continue
                    # B in correct position, prevent it from swapping in future
                    B_correct = B_correct[B_correct != i]

                elif data[i] == '.':
                    # dot is in incorrect position, so swap it
                    # print('Swapping', i, 'with', B_correct[-1], 'to make a streak')
                    if orientation == 0:
                        swaps.append([i, B_correct[-1]])
                    else:

                        swaps.append([len(data)-i-1, len(data) - B_correct[-1]-1])

                    B_correct = np.delete(B_correct, -1)

            if not len(B_correct):
                solutions.append(swaps)
            swaps = []
        
        data = np.flip(data)
        B_indices = np.where(data == 'B')[0]
        start_positions = [B_indices[0], B_indices[0]+1]
    best_solution = min(solutions, key=lambda x: len(x))

    return best_solution
process_default(data)

ValueError: min() arg is an empty sequence

In [None]:
def process_multiside(data, streak):
    def get_B_indices(data, start_pos=1):
        # correct B's are left, incorrect B's are right
        # correct B means it is in correct position mathing start_pos(even,odd)
        B_indices = np.where(data == 'B')[0]
        mask = np.logical_or(B_indices < streak['start_pos'], B_indices > streak['end_pos'])
        B_indices = B_indices[mask]

        even = start_pos % 2 == 0
        correct_Bs = B_indices[B_indices % 2 != even] # use correct B's if incorrent ran out
 
        incorrect_Bs = B_indices[B_indices % 2 == even] # use incorrect B's first

        return correct_Bs, incorrect_Bs


    correct_Bs, incorrect_bs = get_B_indices(data, streak['start_pos'])
    B_indices = np.concatenate((correct_Bs, incorrect_bs))

    sides = []
    swaps = [] # indices of swaps
    if len(B_indices):

        left_correct_Bs = correct_Bs[correct_Bs < streak['start_pos']] # correct B's on left side
        right_correct_Bs = correct_Bs[correct_Bs > streak['end_pos']] # correct B's on right side
        first_correct_B = left_correct_Bs[0] if len(left_correct_Bs) else float('inf') # first B on left side
        last_correct_B = right_correct_Bs[-1] if len(right_correct_Bs) else 0 # last B on right side


        required_space_right = max(last_correct_B, streak['end_pos'] + (len(incorrect_bs)+len(right_correct_Bs))*2) # find how much space is needed on right side
        required_space_left = min(first_correct_B, max(0, streak['start_pos'] - (len(incorrect_bs)+len(left_correct_Bs))*2)) # find how much space is needed on left side

        right_side = data[streak['end_pos']+1 : required_space_right+1] 
        left_side = data[required_space_left: streak['start_pos']][::-1]

        correct_bs_right, _ = get_B_indices(right_side, streak['start_pos'])
        required_bs_right = len(right_side) // 2 - correct_bs_right.size # how many B's are needed to make a streak on right side

        correct_bs_left, _ = get_B_indices(left_side, streak['start_pos'])
        required_bs_left = len(left_side) // 2 - correct_bs_left.size # how many B's are needed to make a streak on left side

        if len(incorrect_bs) == required_bs_right + required_bs_left:
            sides = [{'side': 'right', 'data': right_side}, 
                     {'side': 'left', 'data': left_side}]
        else:
            if required_bs_right == required_bs_left:
                    if len(left_side) == len(right_side):
                        sides = [{'side': 'right', 'data': right_side}]
                    elif len(left_side) > len(right_side):
                        sides = [{'side': 'left', 'data': left_side}]
                    else:
                        sides = [{'side': 'right', 'data': right_side}]
            else:
                if required_bs_right < required_bs_left:
                    sides = [{'side': 'left', 'data': left_side}]
                else:
                    sides = [{'side': 'right', 'data': right_side}]

    for side in sides:
        # print('Processing side:', side['data'])
        for i in range(1, len(side['data']), 2):
            if not len(B_indices):
                # no more B's to swap
                break
        
            if side['data'][i] == 'B':
                if i not in B_indices:
                    # skip because this B was used in swap
                    continue
                # B in correct position, prevent it from swapping in future
                if side['side'] == 'right':
                    B_indices = B_indices[B_indices != streak['end_pos']+1+i]
                else:
                    B_indices = B_indices[B_indices != streak['start_pos']-i-1]


            elif side['data'][i] == '.':
                # dot is in incorrect position, so swap it
                if side['side'] == 'right':
                    # swap with last B in B_indices
                    swaps.append([i + streak['end_pos']+1, B_indices[-1]])
                else:
                    swaps.append([streak['start_pos'] - i - 1, B_indices[-1]])
                B_indices = np.delete(B_indices, -1)



    # print('Swaps to make a streak on this side:', len(swaps))
    return swaps

In [None]:
def swaps_to_streak(data):
    '''
    Input: np.array of side data, example: ['B', '.', '.', '.', '.', '.', 'B', 'B', '.']
           B_even: Bool, should B be placed in even positions like [0,2,4,6...]
    Output: number of swaps needed to continue the streak, for the example above it will be a 3, to make a streak like ['B', '.', 'B', '.', 'B', '.'...]
    '''
    B_count = np.sum(data == 'B')
    D_count = np.sum(data == '.')
    if D_count - B_count < 0 or B_count < 2:
        # it is impossible to sort, not enough space
        return -1, []
    
    max_streaks = get_longest_streak(data)
    swaps = []
    if max_streaks[0]['end_pos'] == -1:
        swaps.append(process_default(data))
    else:
        for streak in max_streaks:
            swaps.append(process_multiside(data, streak))
    
    shortest_swaps = min(swaps, key=lambda x: len(x))
    
    return len(shortest_swaps), shortest_swaps



# data = np.array(['.','.','B','B','.','B','.','B','.','.','.','B','.','.','.','.','.','.','.','.'])
# print(data)
# longest_streak = get_longest_streak(data)




In [None]:
def visualize(data, swaps, iter):
    '''
    Input: swaps - list of tuples with indices of elements to swap like (1,2)
    Output: None
    '''
    plt.figure()
    # plt.axis('off')
    # hide y axis
    plt.yticks([])
    plt.xticks([x for x in range(len(data))])
    # make size normal
    plt.xlim(-1, len(data))
    plt.ylim(-1, 1)
    visual_data = np.array(data, dtype=object)

    for i, symbol in enumerate(visual_data):
        # replace B with green, .(D) with blue
        if symbol == 'B':
            visual_data[i] = {'color': 'green', 'symbol': 'B'}
        else:
            visual_data[i] = {'color': 'blue', 'symbol': 'D'}

    if iter != 0:
        print("Iter:"   , iter)
        for i in range(iter):
            # swap and color last swap red
            if i == iter - 1:
                print("Swapping:", swaps[i])
                visual_data[swaps[i][0]]['color'] = 'red'
                visual_data[swaps[i][1]]['color'] = 'red'
            visual_data[swaps[i][0]], visual_data[swaps[i][1]] = visual_data[swaps[i][1]], visual_data[swaps[i][0]]
    else:
        print("Initial data")

    for i, symbol in enumerate(visual_data):
        # plot symbols
        plt.text(i, 0, symbol['symbol'], horizontalalignment='center', verticalalignment='center', color=symbol['color'])

    plt.show()


In [None]:
print('Current data:\n', data)
print("="*10, " RESULT ", "="*10)
swap_count, swaps = swaps_to_streak(data)
if(swap_count == -1):
    print('It is impossible to sort this data')
else:
    print('Required swaps:', swap_count)
    print('Swaps:', swaps)
    iter_visual =  interactive(visualize, data=widgets.fixed(data), swaps=widgets.fixed(swaps), iter=widgets.IntSlider(min=0, max=swap_count, step=1, value=0))
    display(iter_visual)




Current data:
 ['.' '.' '.' 'B' 'B' '.' '.' '.' '.' '.']
Required swaps: 1
Swaps: [[5, 4]]


interactive(children=(IntSlider(value=0, description='iter', max=1), Output()), _dom_classes=('widget-interact…