## Level 1

In [10]:
import typing as tp
from collections import defaultdict


def solution(queries) -> tp.List[str]:
    workers_ = {}
    work_time_ = defaultdict(list)

    def add_worker(worker_id, position, compensation):
        if worker_id in workers_:
            return "false"

        workers_[worker_id] = (position, int(compensation))
        return "true"

    def register(worker_id, timestamp):
        if worker_id not in workers_:
            return "invalid_request"

        if worker_id not in work_time_:
            work_time_[worker_id].append(("ENTER", int(timestamp)))
        elif work_time_[worker_id][-1][0] == "ENTER":
            work_time_[worker_id].append(("EXIT", int(timestamp)))
        else:
            work_time_[worker_id].append(("ENTER", int(timestamp)))

        return "registered"

    def get(worker_id):
        if worker_id not in workers_:
            return ""
        if worker_id not in work_time_:
            return ""

        total_time = 0
        history = work_time_[worker_id]
        for i in range(0, len(history), 2):
            if i == len(history) - 1:
                break

            total_time += history[i + 1][1] - history[i][1]
        return str(total_time)

    result = []
    for q in queries:
        operation = q[0]

        if operation == "ADD_WORKER":
            result.append(add_worker(q[1], q[2], q[3]))
        elif operation == "REGISTER":
            result.append(register(q[1], q[2]))
        elif operation == "GET":
            result.append(get(q[1]))
    return result


queries = [
    ["ADD_WORKER", "Ashley", "Middle Developer", "150"],
    ["ADD_WORKER", "Ashley", "Junior Developer", "100"],
    ["REGISTER", "Ashley", "10"],
    ["REGISTER", "Ashley", "25"],
    ["GET", "Ashley"],
    ["REGISTER", "Ashley", "40"],
    ["REGISTER", "Ashley", "67"],
    ["REGISTER", "Ashley", "100"],
    ["GET", "Ashley"],
    ["GET", "Marshal"],
    ["REGISTER", "Marshal", "130"],
]
result = solution(queries)
print(result)

['true', 'false', 'registered', 'registered', '15', 'registered', 'registered', 'registered', '42', '', 'invalid_request']


## Level 2

In [16]:
import typing as tp
from collections import defaultdict


def solution(queries) -> tp.List[str]:
    workers_ = {}
    work_time_ = defaultdict(list)

    def add_worker(worker_id, position, compensation):
        if worker_id in workers_:
            return "false"

        workers_[worker_id] = (position, int(compensation))
        return "true"

    def register(worker_id, timestamp):
        if worker_id not in workers_:
            return "invalid_request"

        if worker_id not in work_time_:
            work_time_[worker_id].append(("ENTER", int(timestamp)))
        elif work_time_[worker_id][-1][0] == "ENTER":
            work_time_[worker_id].append(("EXIT", int(timestamp)))
        else:
            work_time_[worker_id].append(("ENTER", int(timestamp)))

        return "registered"

    def get(worker_id):
        if worker_id not in workers_:
            return ""
        if worker_id not in work_time_:
            return ""

        total_time = 0
        history = work_time_[worker_id]
        for i in range(0, len(history), 2):
            if i == len(history) - 1:
                break

            total_time += history[i + 1][1] - history[i][1]
        return str(total_time)

    def top_n_workers(topk, position):
        candidates = [worker_id for worker_id, worker_info in workers_.items() if worker_info[0] == position]
        ranking = {worker_id: 0 if get(worker_id) == "" else int(get(worker_id)) for worker_id in candidates}
        ranking = sorted(ranking.items(), key=lambda item: (-item[1], item[0]))[: int(topk)]
        return ", ".join([f"{t[0]}({t[1]})" for t in ranking])

    result = []
    for q in queries:
        operation = q[0]

        if operation == "ADD_WORKER":
            result.append(add_worker(q[1], q[2], q[3]))
        elif operation == "REGISTER":
            result.append(register(q[1], q[2]))
        elif operation == "GET":
            result.append(get(q[1]))
        elif operation == "TOP_N_WORKERS":
            result.append(top_n_workers(q[1], q[2]))
    return result


queries = [
    ["ADD_WORKER", "Ashley", "Junior Developer", "120"],
    ["ADD_WORKER", "John", "Junior Developer", "120"],
    ["ADD_WORKER", "Jason", "Junior Developer", "120"],
    ["REGISTER", "John", "100"],
    ["REGISTER", "John", "150"],
    ["REGISTER", "Jason", "200"],
    ["REGISTER", "Jason", "250"],
    ["REGISTER", "Jason", "275"],
    ["TOP_N_WORKERS", "5", "Junior Developer"],
    ["TOP_N_WORKERS", "1", "Junior Developer"],
    ["REGISTER", "Ashley", "400"],
    ["REGISTER", "Ashley", "500"],
    ["REGISTER", "Jason", "575"],
    ["TOP_N_WORKERS", "3", "Junior Developer"],
    ["TOP_N_WORKERS", "3", "Middle Developer"],
]
result = solution(queries)
print(result)

['true', 'true', 'true', 'registered', 'registered', 'registered', 'registered', 'registered', 'Jason(50), John(50), Ashley(0)', 'Jason(50)', 'registered', 'registered', 'registered', 'Jason(350), Ashley(100), John(50)', '']


## Level 3

In [25]:
import typing as tp
from collections import defaultdict


def solution(queries) -> tp.List[str]:
    workers_ = {}
    work_time_ = defaultdict(list)
    pending_promotion_ = {}
    history_ = defaultdict(list)

    def add_worker(worker_id, position, compensation):
        if worker_id in workers_:
            return "false"

        workers_[worker_id] = (position, int(compensation))
        history_[worker_id].append((0, position, int(compensation)))
        return "true"

    def register(worker_id, timestamp):
        if worker_id not in workers_:
            return "invalid_request"

        if worker_id not in work_time_:
            work_time_[worker_id].append(("ENTER", int(timestamp)))
        elif work_time_[worker_id][-1][0] == "ENTER":
            work_time_[worker_id].append(("EXIT", int(timestamp)))
        else:
            work_time_[worker_id].append(("ENTER", int(timestamp)))

        # activate promotion
        if worker_id in pending_promotion_ and int(timestamp) >= pending_promotion_[worker_id][2]:
            new_position, new_compensation, _ = pending_promotion_[worker_id]
            workers_[worker_id] = (new_position, new_compensation)
            history_[worker_id].append((int(timestamp), new_position, new_compensation))
            del pending_promotion_[worker_id]

        return "registered"

    def get(worker_id):
        if worker_id not in workers_:
            return ""
        if worker_id not in work_time_:
            return ""

        intervals = _get_intervals(worker_id)
        return str(sum(interval[1] - interval[0] for interval in intervals))

    def top_n_workers(topk, position):
        candidates = [worker_id for worker_id, worker_info in workers_.items() if worker_info[0] == position]

        def get_new(worker_id):
            if worker_id not in workers_:
                return 0
            if worker_id not in work_time_:
                return 0

            intervals = _get_intervals(worker_id)
            start_timestamp = history_[worker_id][-1][0]

            total_time = 0
            for interval in intervals:
                if interval[1] <= start_timestamp:
                    continue
                total_time += interval[1] - max(start_timestamp, interval[0])
            return total_time

        ranking = {worker_id: get_new(worker_id) for worker_id in candidates}
        ranking = sorted(ranking.items(), key=lambda item: (-item[1], item[0]))[: int(topk)]
        return ", ".join([f"{t[0]}({t[1]})" for t in ranking])

    def promote(worker_id, new_position, new_compensation, start_timestamp):
        if worker_id not in workers_:
            return "invalid_request"

        if worker_id in pending_promotion_:
            return "invalid_request"

        pending_promotion_[worker_id] = (new_position, int(new_compensation), int(start_timestamp))
        return "success"

    def calc_salary(worker_id, start_timestamp, end_timestamp):
        if worker_id not in workers_:
            return ""

        if worker_id not in work_time_:
            return "0"

        intervals = _get_intervals(worker_id)

        total_compensation = 0
        start_timestamp = int(start_timestamp)
        end_timestamp = int(end_timestamp)
        for interval in intervals:
            if interval[0] >= int(end_timestamp):
                break
            if interval[1] <= int(start_timestamp):
                continue

            start = max(interval[0], start_timestamp)
            end = min(interval[1], end_timestamp)

            for i in range(len(history_[worker_id])):
                curr = history_[worker_id][i]
                if curr[0] >= end:
                    break
                if i < len(history_[worker_id]) - 1 and history_[worker_id][i + 1][0] <= start:
                    continue
                if i < len(history_[worker_id]) - 1:
                    next = history_[worker_id][i + 1]
                    total_compensation += curr[2] * (min(end, next[0]) - max(start, curr[0]))
                else:
                    total_compensation += curr[2] * (end - max(start, curr[0]))

        return str(total_compensation)

    def _get_intervals(worker_id):
        """Working time intervals."""
        intervals = []
        for i in range(0, len(work_time_[worker_id]), 2):
            if i == len(work_time_[worker_id]) - 1:
                break
            intervals.append((work_time_[worker_id][i][1], work_time_[worker_id][i + 1][1]))
        return intervals

    result = []
    for q in queries:
        operation = q[0]

        if operation == "ADD_WORKER":
            result.append(add_worker(q[1], q[2], q[3]))
        elif operation == "REGISTER":
            result.append(register(q[1], q[2]))
        elif operation == "GET":
            result.append(get(q[1]))
        elif operation == "TOP_N_WORKERS":
            result.append(top_n_workers(q[1], q[2]))
        elif operation == "PROMOTE":
            result.append(promote(q[1], q[2], q[3], q[4]))
        elif operation == "CALC_SALARY":
            result.append(calc_salary(q[1], q[2], q[3]))
    return result


queries = [
    ["ADD_WORKER", "John", "Middle Developer", "200"],
    ["REGISTER", "John", "100"],
    ["REGISTER", "John", "125"],
    ["PROMOTE", "John", "Senior Developer", "500", "200"],
    ["REGISTER", "John", "150"],
    ["PROMOTE", "John", "Senior Developer", "350", "250"],
    ["REGISTER", "John", "300"],
    ["REGISTER", "John", "325"],
    ["CALC_SALARY", "John", "0", "500"],
    ["TOP_N_WORKERS", "3", "Senior Developer"],
    ["REGISTER", "John", "400"],
    ["GET", "John"],
    ["TOP_N_WORKERS", "10", "Senior Developer"],
    ["TOP_N_WORKERS", "10", "Middle Developer"],
    ["CALC_SALARY", "John", "110", "350"],
    ["CALC_SALARY", "John", "900", "1400"],
]
result = solution(queries)
print(result)

['true', 'registered', 'registered', 'success', 'registered', 'invalid_request', 'registered', 'registered', '35000', 'John(0)', 'registered', '250', 'John(75)', '', '45500', '0']


## Level 4

In [28]:
import typing as tp
from collections import defaultdict


def solution(queries) -> tp.List[str]:
    workers_ = {}
    work_time_ = defaultdict(list)
    pending_promotion_ = {}
    history_ = defaultdict(list)
    double_pay_intervals = []

    def add_worker(worker_id, position, compensation):
        if worker_id in workers_:
            return "false"

        workers_[worker_id] = (position, int(compensation))
        history_[worker_id].append((0, position, int(compensation)))
        return "true"

    def register(worker_id, timestamp):
        if worker_id not in workers_:
            return "invalid_request"

        if worker_id not in work_time_:
            work_time_[worker_id].append(("ENTER", int(timestamp)))
        elif work_time_[worker_id][-1][0] == "ENTER":
            work_time_[worker_id].append(("EXIT", int(timestamp)))
        else:
            work_time_[worker_id].append(("ENTER", int(timestamp)))

        # activate promotion
        if worker_id in pending_promotion_ and int(timestamp) >= pending_promotion_[worker_id][2]:
            new_position, new_compensation, _ = pending_promotion_[worker_id]
            workers_[worker_id] = (new_position, new_compensation)
            history_[worker_id].append((int(timestamp), new_position, new_compensation))
            del pending_promotion_[worker_id]

        return "registered"

    def get(worker_id):
        if worker_id not in workers_:
            return ""
        if worker_id not in work_time_:
            return ""

        intervals = _get_intervals(worker_id)
        return str(sum(interval[1] - interval[0] for interval in intervals))

    def top_n_workers(topk, position):
        candidates = [worker_id for worker_id, worker_info in workers_.items() if worker_info[0] == position]

        def get_new(worker_id):
            if worker_id not in workers_:
                return 0
            if worker_id not in work_time_:
                return 0

            intervals = _get_intervals(worker_id)
            start_timestamp = history_[worker_id][-1][0]

            total_time = 0
            for interval in intervals:
                if interval[1] <= start_timestamp:
                    continue
                total_time += interval[1] - max(start_timestamp, interval[0])
            return total_time

        ranking = {worker_id: get_new(worker_id) for worker_id in candidates}
        ranking = sorted(ranking.items(), key=lambda item: (-item[1], item[0]))[: int(topk)]
        return ", ".join([f"{t[0]}({t[1]})" for t in ranking])

    def promote(worker_id, new_position, new_compensation, start_timestamp):
        if worker_id not in workers_:
            return "invalid_request"

        if worker_id in pending_promotion_:
            return "invalid_request"

        pending_promotion_[worker_id] = (new_position, int(new_compensation), int(start_timestamp))
        return "success"

    def calc_salary(worker_id, start_timestamp, end_timestamp):
        base_salary = _calc_salary(worker_id, start_timestamp, end_timestamp)
        additional_salaries = [_calc_salary(worker_id, interval[0], interval[1]) for interval in double_pay_intervals]
        if base_salary == "":
            is_all_empty = True
            for s in additional_salaries:
                if s != "":
                    is_all_empty = False
            if is_all_empty:
                return ""

        else:
            additional_salary = sum(0 if s == "" else int(s) for s in additional_salaries)
            base_salary = 0 if base_salary == "" else int(base_salary)
            return str(base_salary + additional_salary)

    def _calc_salary(worker_id, start_timestamp, end_timestamp):
        if worker_id not in workers_:
            return ""

        if worker_id not in work_time_:
            return "0"

        intervals = _get_intervals(worker_id)

        total_compensation = 0
        start_timestamp = int(start_timestamp)
        end_timestamp = int(end_timestamp)
        for interval in intervals:
            if interval[0] >= int(end_timestamp):
                break
            if interval[1] <= int(start_timestamp):
                continue

            start = max(interval[0], start_timestamp)
            end = min(interval[1], end_timestamp)

            for i in range(len(history_[worker_id])):
                curr = history_[worker_id][i]
                if curr[0] >= end:
                    break
                if i < len(history_[worker_id]) - 1 and history_[worker_id][i + 1][0] <= start:
                    continue
                if i < len(history_[worker_id]) - 1:
                    next = history_[worker_id][i + 1]
                    total_compensation += curr[2] * (min(end, next[0]) - max(start, curr[0]))
                else:
                    total_compensation += curr[2] * (end - max(start, curr[0]))

        return str(total_compensation)

    def _get_intervals(worker_id):
        """Working time intervals."""
        intervals = []
        for i in range(0, len(work_time_[worker_id]), 2):
            if i == len(work_time_[worker_id]) - 1:
                break
            intervals.append((work_time_[worker_id][i][1], work_time_[worker_id][i + 1][1]))
        return intervals

    def set_double_paid(start_timestamp, end_timestamp):
        nonlocal double_pay_intervals

        start_timestamp = int(start_timestamp)
        end_timestamp = int(end_timestamp)
        if not double_pay_intervals:
            double_pay_intervals.append((start_timestamp, end_timestamp))
        else:
            # insert intervals
            result = []
            curr = (start_timestamp, end_timestamp)
            idx = 0
            while idx < len(double_pay_intervals) and double_pay_intervals[idx][1] < curr[0]:
                result.append(double_pay_intervals[idx])
                idx = idx + 1
            while idx < len(double_pay_intervals) and curr[1] >= double_pay_intervals[idx][0]:
                curr = (min(curr[0], double_pay_intervals[idx][0]), max(curr[1], double_pay_intervals[idx][1]))
                idx = idx + 1
            result.append(curr)
            while idx < len(double_pay_intervals):
                result.append(double_pay_intervals[idx])
                idx = idx + 1
            double_pay_intervals = result

        return ""

    result = []
    for q in queries:
        operation = q[0]

        if operation == "ADD_WORKER":
            result.append(add_worker(q[1], q[2], q[3]))
        elif operation == "REGISTER":
            result.append(register(q[1], q[2]))
        elif operation == "GET":
            result.append(get(q[1]))
        elif operation == "TOP_N_WORKERS":
            result.append(top_n_workers(q[1], q[2]))
        elif operation == "PROMOTE":
            result.append(promote(q[1], q[2], q[3], q[4]))
        elif operation == "CALC_SALARY":
            result.append(calc_salary(q[1], q[2], q[3]))
        elif operation == "SET_DOUBLE_PAID":
            result.append(set_double_paid(q[1], q[2]))
    return result


queries = [
    ["ADD_WORKER", "John", "Middle Developer", "200"],
    ["REGISTER", "John", "100"],
    ["REGISTER", "John", "125"],
    ["PROMOTE", "John", "Senior Developer", "500", "200"],
    ["REGISTER", "John", "150"],
    ["PROMOTE", "John", "Senior Developer", "350", "250"],
    ["REGISTER", "John", "300"],
    ["REGISTER", "John", "325"],
    ["CALC_SALARY", "John", "0", "500"],
    ["TOP_N_WORKERS", "3", "Senior Developer"],
    ["REGISTER", "John", "400"],
    ["GET", "John"],
    ["TOP_N_WORKERS", "10", "Senior Developer"],
    ["TOP_N_WORKERS", "10", "Middle Developer"],
    ["CALC_SALARY", "John", "110", "350"],
    ["CALC_SALARY", "John", "900", "1400"],
    ["SET_DOUBLE_PAID", "110", "160"],
    ["CALC_SALARY", "John", "110", "350"],
]
result = solution(queries)
print(result)

['true', 'registered', 'registered', 'success', 'registered', 'invalid_request', 'registered', 'registered', '35000', 'John(0)', 'registered', '250', 'John(75)', '', '45500', '0', '', '50500']
