## Programming Assignment: Flash

In [None]:
#Write class Flash for modeling work with USB flash drive. Each Flash has limited capacity. 
#The class should only allow to write files. The class constructor has a parameter capacity.

# #Implement method write.
# First, it must check that the flash drive has enough memory to write this file, 
# otherwise it need to throw the ValueError exception. 
# If there is enough free memory, you need to write the file.

In [55]:
class Flash:
    def __init__(self, capacity):
        self.capacity = capacity
        self.size = 0

    def write(self, v):
        if v + self.size > self.capacity:
            raise ValueError
        else:
            self.size += v            

In [56]:
f = Flash(15)
f.write(12)

## Buffer

In [11]:
# Implement class Buffer, which accumulates the elements of a sequence and 
# prints the sum of the elements when it is full. 

# The class parameter is the maximum size of the buffer.
# Once the buffer is filled, you should print the sum of elements in the buffer and empty it.

# Elements are added to the buffer using the method add, which takes a sequence of elements, 
# the number of received elements is unknown and can be larger than the buffer size. 
# Also implement the method get_current_part, which returns the current buffer state.

In [64]:
class Buffer:
    def __init__(self, max_size):
        self.buf = []
        self.max_size = max_size

    def add(self, *a):
        self.buf.extend(a)
        while len(self.buf) >= self.max_size:
            print(sum(self.buf[:self.max_size]))
            self.buf = self.buf[self.max_size:]

    def get_current_part(self):
        return self.buf

In [65]:
buf = Buffer(max_size=5)
buf.add(1, 2, 3)
buf.get_current_part()  # return [1, 2, 3]
buf.add(4, 5, 6)  # print(15) – print sum of elements [1, 2, 3, 4, 5]
buf.get_current_part() # return [6]
buf.add(7, 8, 9, 10)  # print(40)
buf.get_current_part()  # return []
buf.add(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)  # print(5), print(5)
buf.get_current_part()  # return [1]

15
40
5
5


[1]

## Queue on 2 stacks

In [None]:
# There is an abstract data type Stack that implements LIFO contract. 
# The stack has methods push and pop - 
# add an element to the end of the stack and pick up an element at the end of the stack. 
# In Python type list implements stack: 
# push method is replaced by append method, pop method remains the same.

# There is an another abstract data type Queue that implements FIFO contract. 
# The queue also has methods push and pop. 
# However, method push adds an element to the end of the queue,
# and method pop extracts the element from the beginning of the queue. 
# Using list in this case is not very good because method pop(index=0) method
# works over a linear time, but we want the push and pop methods to work over O(1)O(1).

# In this task, you need to implement a queue on two stacks.
# One stack is used only for writing elements, 
# and the second stack is used only for extracting elements. 
# If the method pop is called for an empty queue throw an exception IndexError 
# with the message 'pop from an empty queue'.

In [126]:
class Queue:
    def __init__(self):
        self.stack = []
    
    def push(self, x):
        self.stack.append(x)
    def show_stack(self):
        return self.stack
    def pop(self):
        if len(self.stack) == 0: 
            raise IndexError("pop from an empty queue")
        pop_item = self.stack[0]
        self.stack = self.stack[1:]
        return pop_item

In [98]:
queue = Queue()
#queue.pop()  # IndexError('pop from an empty queue')

queue.push(3)
queue.push(5)
queue.push(7)
queue.push(9)

queue.show_stack()
queue.pop()  # return 3
queue.pop()  # return 5
queue.pop()  # return 7
queue.pop()  # return 9
queue.pop()  # IndexError('pop from an empty queue')


3

## CountVectorizer

In [None]:
# Most traditional machine learning algorithms work with structured data
# (numbers, categories, boolean data).
# But how to work with sequences? We can vectorize this sequence: 
# cut the sequence into pieces and increment the value on the corresponding index in the vector.
# In this task, you need to write a class for vectorizing strings. 
# The work of the class will be tested on genomic chains.

# An important parameter is ngram_size.
# It is responsible for what size we need to take the pieces of the string, will call them tokens.
# For ngram_size = 44: 'TTTCTGCCA' \rightarrow→ ['TTTC', 'TTCT', 'TCTG', 'CTGC', 'TGCC', 'GCCA']. 
# To encode tokens into indexes, first sort them in lexicographic order, 
# and then give integer indexes starting from zero in the same order.
# Tokens can appear more than once in one row, so we will count the total number of each token 
# in this row. The list of strings is usually called a corpus. 
# The dictionary for converting tokens to indexes is built for the entire corpus. 
# Example of transformation:

In [16]:
from collections import OrderedDict 
class CountVectorizer:
    """
    __init__ − take the parameter ngram_size and save it as an attribute of the class;
    fit − build a dictionary "token to index" from the input corpus 
          and save it as an attribute of the class;
    transform − transform a new corpus based on a saved dictionary, should return a list of lists. 
                If some token from the new corpus is not represented in the dictionary,
                then you need to ignore it;
    fit_transform − fit and transform on the same corpus, should return a list of lists.

    """
    def __init__(self, ngram_size):
        self.ngram_size = ngram_size
        
        self.vocab = OrderedDict()
        
    def fit(self, corpus):
        corpus = sorted(corpus)
        token_set = set()
        for s in corpus:
            for i in range(self.ngram_size, len(s) + 1):
                token = s[i - self.ngram_size : i]
                token_set.add(token)
        
        for idx, val in enumerate(sorted(token_set)):
            self.vocab[val] = idx
        
    def transform(self, corpus):
        transformation = []
        for s in corpus:
            temp_list = [0 for x in range(len(self.vocab))]
            for i in range(self.ngram_size, len(s) + 1):
                token = s[i - self.ngram_size : i]
                if token in self.vocab: 
                    idx = self.vocab[token]
                    temp_list[idx] += 1
            transformation.append(temp_list)
        return transformation
    
    def fit_transform(self, corpus):
        self.vocab = OrderedDict()
        self.fit(corpus)
        return self.transform(corpus)

In [17]:
corpus = [
    'AATACAT',  # 'AA', 'AT', 'TA', 'AC', 'CA', 'AT'
    'CTACCCT',  # 'CT', 'TA', 'AC', 'CC', 'CC', 'CT'
    'TACCTAC',  # 'TA', 'AC', 'CC', 'CT', 'TA', 'AC'
]

correct_transformation = [
    [1, 1, 2, 1, 0, 0, 1],
    [0, 1, 0, 0, 2, 2, 1],
    [0, 2, 0, 0, 1, 1, 2],
]

# case 1
vectorizer = CountVectorizer(ngram_size=2)
vectorizer.fit(corpus)
vectorizer.transform(corpus) == correct_transformation  # True

True

In [18]:
vectorizer = CountVectorizer(ngram_size=2)
vectorizer.fit_transform(corpus) == correct_transformation  # True

True

In [19]:
# case 3
corpus_2 = ['TCAATCAC', 'GGGGGGGGGGG', 'AAAA']
vectorizer = CountVectorizer(ngram_size=2)
vectorizer.fit(corpus)
vectorizer.transform(corpus_2) == [
    [1, 1, 1, 2, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0],
    [3, 0, 0, 0, 0, 0, 0]
]  # True

True

## Optional practical tasks

In [None]:
# Rules:

# These tasks are optional. They do not affect your grade. You can solve any number of tasks.
# Don't share your solutions with other students.
# No grader or testing system. You can share your test cases for tasks. 
# Do it publicly for a greater number of students. Corresponding thread on the forum.
# Send your solutions as a single zip archive to mds-support@hse.ru until April 25, 10 AM MSK 
# with topic Python_Advanced_optional_tasks. 
# Name your files by the template {task_number}_{your_name}.py and pack them to one zip archive.
# We will not do a personal code review. We will collect common mistakes and analyze them. 
# We will also show you our solution and the solution process on the webinar on April 25, 5 PM MSK.
# Try to write clear and efficient code.
# Have fun!

### 1. Mean

In [9]:
def mean(array):
    # for an empty array return nan (use the module math)
    return sum(array)/len(array)

assert mean([1.0, 1.0, 1.0, 1.0]) == 1.0
assert mean([0.0, 1.0, 2.0, 3.0, 4.0, 5.0]) == 2.5

### 2. STD

In [10]:
def std(array):
    mean_val = mean(array)
    n = len(array)
    return np.sqrt(sum([1/n * (x - mean_val)**2 for x in array]))

# Examples:
assert std([1.0, 1.0, 1.0, 1.0]) == 0.0
assert round(std([0.0, 1.0, 2.0, 3.0, 4.0, 5.0]), 6) == 1.707825

### 3. Remove consecutive duplicates

In [17]:
def remove_consecutive_duplicates(array):
    res = []
    i = 0
    while i < len(array):
        res.append(array[i])
        while i + 1 < len(array) and array[i+1] == array[i]:
            i += 1
        i += 1
    return res
            

# Examples:
assert remove_consecutive_duplicates([1, 1, 2, 2]) == [1, 2]
assert remove_consecutive_duplicates([1, 1, 2, 2, 1, 1]) == [1, 2, 1]

 ### 4. RLE. Implement run-length encoding for an array of integers.

In [35]:
from collections import OrderedDict 
def run_length_encode(array):
    word_count_dict = {}
    for x in array: 
        word_count_dict[x] = word_count_dict.setdefault(x, 0) + 1
    
    return sorted(list(word_count_dict.items()), key=lambda x: x[0])
    
run_length_encode([1,2,3])
# Examples:
assert run_length_encode([1, 1, 1, 1]) == [(1, 4)]
assert run_length_encode([1, 3, 3, 3, 4, 4, 5]) == [(1, 1), (3, 3), (4, 2), (5, 1)]

### 5. JOIN. 

In [112]:
lst1 = [[1, 2],
        [3, 4],
        [5, 6]]
lst2 = [[1, 3],
        [3, 5],
        [7, 9]]
method = "INNER" #left # Right 

### 6. Max session count.

In [114]:
# You are given a list of user session.
# Every session is described with two integers: the time, when the session started and 
# the time, when the session ended. 
# Your task is to find the maximum number of sessions that were active simultaneously.

In [121]:
def count_simultaneous(sessions):
    sessions.sort()

    start = sessions[0][0]
    end = sessions[0][1]
    cnt = 0
    for session in sessions:
        if session[0] < end :
            end = min(session[1], end)
            cnt += 1
    return cnt

In [122]:
sessions = [[3, 4],
            [1, 6],
            [0, 7]]
# cnt = 3|
count_simultaneous(sessions)

3

In [123]:
sessions = [[3, 4],
            [1, 2],
            [0, 7]]
# cnt = 2
count_simultaneous(sessions)

2

In [124]:
sessions = [[4, 5],
            [0, 3],
            [1, 9],
            [7, 8],
            [2 ,6]]
# cnt = 3
count_simultaneous(sessions)

3

In [125]:
sessions = [[3, 4],
            [1, 2],
            [0, 7]]
# cnt = 2
count_simultaneous(sessions)

2