In [1]:
import math
import filecmp

In [2]:
#Invert a byte
def invert_byte(value):
    new_value = 0
    # Iterate over each bit in the byte
    for i in range(0, 8):
        # Create a mask with a 1 in the current bit position
        mask = (1 << i)
        # If the current bit is 0, set the corresponding bit in the new value to 1
        if (mask & value) == 0:
            new_value |= (1 << i);
    # Return the new val with inverted bits   
    return new_value



# Get the index of first zero in the byte by counting  from left to right
def firstIndexZero(byte, start_indx):
    # Invert the byte, so that 0 bits become 1 bits and vice versa
    invByte = invert_byte(byte)
    for i in range(0, 8-start_indx):
        mask = 1 << (7 - i - start_indx)
        # If the current bit is 0, return its index
        if (invByte) & mask != 0:
            return i + start_indx
    # If no zero bits are found, return -1    
    return -1



# Gives the bit mask containing totalBits with zeros on the left.
def zeroBitsLeft(totalBits):
    mask = 0
    for i in range(0, 8-totalBits):
        mask |= (1 << i)
    return mask



# Gives bit mask that has totalBits of 1s on the right side of the byte
def mask_right_1sBits(totalBits):
    mask = 0
    for i in range(0, totalBits):
        mask |= (1 << i)
    return mask



# Gives the no of bits in a byte
def byte_length(value):
    length = 0
    while(value > 0):
        length += 1
        value >>= 1
    return length

The below code implements the Rice encoding and decoding algorithm. The Rice encoding is a lossless data compression algorithm used to compress integer values. The algorithm divides the input integer value into two parts, quotient and remainder, and then encodes these two parts separately using a prefix code.

In [3]:
def encode_rice_algo(val, k):
    #This line initializes the value of 'm' as 2 raised to the power of 'k'.
    m = math.pow(2, k)
    
    #These lines compute the quotient and remainder parts of the input 'value'
    qnt =  int(math.floor(val/m))
    rmndr = int(val % m)
    
    # concatenate a set of ones followed by a zero
    qntCode = ('1' * qnt) + '0'
    rmndrCode = format(rmndr, f'0{k}b')
    
    #These lines encode the quotient and remainder parts into binary strings. 
    #For the quotient, a string of '1's is concatenated 'quotient' number of times, followed by a '0'. 
    #For the remainder, the 'format' function is used to format it into a binary string with 'k' digits, 
    #padded with leading zeros.
    Encoded_Value = qntCode + rmndrCode
    return Encoded_Value
    

# Decode a value using Rice decoding algorithm
def decode_rice_algo(val, k):
    #This line initializes the value of 'm' as 2 raised to the power of 'k'.
    m = math.pow(2, k)
    
    #These lines compute the quotient and remainder parts of the input 'value'
    qnt2 = val >> k + 1
    qnt = byte_length(qnt2)
      
    #It calculates the remainder by performing a bitwise AND operation between value and mask    
    mask = mask_right_1sBits(k)
    rmndr = val & mask
    
    Decoded_Value = int(qnt * m + rmndr)
    return Decoded_Value



In [4]:
# Encode a file using the Rice encoding algorithm
def file_encode_rice_algo(source, destination, k):
    #Read from source byte by byte.
    #Encode each byte. The encoded value can be longer or shorter than a byte.
    #If it is longer, split it in chunks one-byte long and add it to the buffer
    #If it is shorter, packs it into one byte and fill the remaining bits with the next chunk
    
    # set buffer size to 256KB
    buffer_size = 262144
    
    # initialize output buffer to empty byte array
    Out_Buffer = bytearray()
    
    # initialize data byte and bits left counters
    Data_Byte = 0
    Bits_Left = 8

    
    # open source file in binary mode
    with open(source, "rb") as srcStream:
        # open destination file in write binary mode
        with open(destination, "wb") as destStream:

            # read data from source file into input buffer
            bufferInput = srcStream.read(buffer_size)
            # loop until all data has been read
            while len(bufferInput):
                # process input buffer byte by byte
                for srcByte in bufferInput:

                    # encode input byte using Rice encoding with parameter k
                    EncSbyte = encode_rice_algo(srcByte, k)
                    # compute length of encoded byte sequence
                    Length_EncByte = len(EncSbyte)

                     # initialize index to 0
                    index = 0
                    
                   # process bits of encoded byte sequence
                    while(index < Length_EncByte):
                        # compute next index to process
                        Nxt_Index = min(index+Bits_Left, Length_EncByte)
                        # decrement bits left counter
                        Bits_Left -= (Nxt_Index - index)
                        
                        # shift encoded bits to the left and OR with data byte
                        Data_Byte |= (int(EncSbyte[index:Nxt_Index], 2) << Bits_Left)

                         # if data byte is full
                        if(Bits_Left == 0):
                            # append data byte to output buffer
                            Out_Buffer.append(Data_Byte)

                           # reset data byte and bits left counters
                            Data_Byte = 0
                            Bits_Left = 8
                            
                            # if output buffer is full, write to destination file
                            if(len(Out_Buffer) >= buffer_size):
                                destStream.write(Out_Buffer)
                                Out_Buffer = bytearray()
                    
                        # update index to next index
                        index = Nxt_Index
                # read more data into input buffer        
                bufferInput = srcStream.read(buffer_size)
            # if there are any remaining bits in the data byte, append to output buffer
            if(Bits_Left != 8):
                Out_Buffer.append(Data_Byte)
                
            # write remaining output buffer to destination file
            destStream.write(Out_Buffer)

In [5]:
# Decode a file using Rice decoding algorithm            
def file_decode_rice_algo(source, destination, k):

    # Set the buffer size
    buffer_size = 262144
    
    # Initialize the output and byte buffers
    Out_Buffer = bytearray()
    bufferByte = bytearray()
    
    
    # Initialize the start index and shift value for decoding
    Str_Index = 0 
    shift = 0
    
    # Initialize the current and next bytes to read from the source stream
    srcByte = None
    with open(source, "rb") as srcStream:
        with open(destination, "wb") as destStream:
            
            # Read the first byte from the source stream
            srcByte = srcStream.read(1)
            # Read the second byte from the source stream
            nextByte = srcStream.read(1)

            # Loop over the bytes in the source stream until there are no more bytes
            while len(srcByte) > 0: 
                
                # Get the index of the first zero bit in the current byte
                indexWithZero = firstIndexZero(srcByte[0],  Str_Index)
                
                # If there is no zero bit in the current byte, add the byte to the byte buffer and read the next byte
                if indexWithZero == -1:
                    bufferByte.append(srcByte[0])
                    srcByte = nextByte
                    nextByte = srcStream.read(1)
                    # Reset the start index to zero and continue with the next iteration of the loop
                    Str_Index = 0;
                    continue
                
                # If the index of the first zero bit plus k is greater than or equal to 8,
                # add the byte to the byte buffer and read the next byte
                if indexWithZero + k >= 8:
                    bufferByte.append(srcByte[0])
                    srcByte = nextByte
                    nextByte = srcStream.read(1)

                    # Set the shift value based on the index of the first zero bit plus k
                    shift = (8 + 7 - (indexWithZero + k))
                else:
                    # Set the shift value based on the index of the first zero bit plus k 
                    shift = 7 - (indexWithZero + k)

                # Convert the current byte to an integer
                srcByte_Int = int.from_bytes(srcByte, "big")
                
                # Create a mask to zero out the leftmost shift bits of the current byte
                zero_bits_mask = zeroBitsLeft(8-shift)

                # Zero out the leftmost shift bits of the current byte and append the result to the byte buffer
                remain_bytes = srcByte_Int & zero_bits_mask
                bufferByte.append(srcByte_Int)
                     
               # Loop over the bytes in the byte buffer in reverse order
                for i in range(len(bufferByte)-1, -1, -1):
                    if(i < len(bufferByte)-1):
                        # Extract the leftmost shift bits of the current byte and shift them to the right position in the next byte
                        
                        extract = bufferByte[i] & zero_bits_mask
                        extract = extract << (8 - shift)
                        bufferByte[i+1] |= extract

                    # Shift the current byte to the right by the shift value
                    bufferByte[i] >>= shift

                # Convert the byte buffer to an integer and decode it using the RiceDecode function
                Val_To_Decipher = int.from_bytes(bufferByte, "big")
                Val_after_decypher = decode_rice_algo(Val_To_Decipher, k)
                
                # Append the decoded value to the output buffer
                Out_Buffer.append(Val_after_decypher)
                
                #clean temporary bytearray
                bufferByte = bytearray()
                
                #If the remaining byte is 0 and there are no more bytes to read,
                #break out of the loop since there are no more bits to decipher
                if(remain_bytes == 0 and len(nextByte) == 0):
                     break
                        
                #Append the remaining byte to the byte buffer
                bufferByte.append(remain_bytes)
                #Set the start index for the next byte to the remaining bits
                Str_Index = 8-shift
                    
                #If the byte buffer has bytes in it, set the current byte to be the byte buffer 
                #and reset the byte buffer to an empty byte array
                if(len(bufferByte) != 0):
                    srcByte = bufferByte
                    bufferByte = bytearray()
                #Otherwise, set the current byte to the next byte in the input stream 
                #and read the next byte from the input stream    
                else:
                    srcByte = nextByte
                    nextByte = srcStream.read(1)

                #If the output buffer has reached its maximum size, write it to the output stream
                if(len(Out_Buffer) >= buffer_size):
                    destStream.write(Out_Buffer)
                    Out_Buffer = bytearray()


            #Write the output buffer to the output stream after all the bytes have been processed
            destStream.write(Out_Buffer)

            #Close the output stream
            destStream.close()            

In [6]:
originalFile = "Sound1.wav"
encodedFile = "Sound1_Enc.ex2"
decodedFile = "Sound1_Enc_Dec.wav"


import filecmp

k = 2


print(f'Testing with K = {k}')

print('Encoding...')
file_encode_rice_algo(originalFile, encodedFile, k)
print("Done")

print('Decoding...')
file_decode_rice_algo(encodedFile, decodedFile, k)
print("Done")

comp = filecmp.cmp(originalFile, decodedFile)

print(f'Comparing original file and decoded file. Are they equal? {comp}\n\n')

Testing with K = 2
Encoding...
Done
Decoding...
Done
Comparing original file and decoded file. Are they equal? False




In [7]:
originalFile = "Sound1.wav"
encodedFile = "Sound1_Enc.ex2"
decodedFile = "Sound1_Enc_Dec.wav"


import filecmp

k = 4


print(f'Testing with K = {k}')

print('Encoding...')
file_encode_rice_algo(originalFile, encodedFile, k)
print("Done")

print('Decoding...')
file_decode_rice_algo(encodedFile, decodedFile, k)
print("Done")

comp = filecmp.cmp(originalFile, decodedFile)

print(f'Comparing original file and decoded file. Are they equal? {comp}\n\n')

Testing with K = 4
Encoding...
Done
Decoding...
Done
Comparing original file and decoded file. Are they equal? True




In [8]:
originalFile = "Sound2.wav"
encodedFile = "Sound2_Enc.ex2"
decodedFile = "Sound2_Enc_Dec.wav"


import filecmp

k = 2


print(f'Testing with K = {k}')

print('Encoding...')
file_encode_rice_algo(originalFile, encodedFile, k)
print("Done")

print('Decoding...')
file_decode_rice_algo(encodedFile, decodedFile, k)
print("Done")

comp = filecmp.cmp(originalFile, decodedFile)

print(f'Comparing original file and decoded file. Are they equal? {comp}\n\n')

Testing with K = 2
Encoding...
Done
Decoding...
Done
Comparing original file and decoded file. Are they equal? True




In [9]:
originalFile = "Sound2.wav"
encodedFile = "Sound2_Enc.ex2"
decodedFile = "Sound2_Enc_Dec.wav"


import filecmp

k = 4


print(f'Testing with K = {k}')

print('Encoding...')
file_encode_rice_algo(originalFile, encodedFile, k)
print("Done")

print('Decoding...')
file_decode_rice_algo(encodedFile, decodedFile, k)
print("Done")

comp = filecmp.cmp(originalFile, decodedFile)

print(f'Comparing original file and decoded file. Are they equal? {comp}\n\n')

Testing with K = 4
Encoding...
Done
Decoding...
Done
Comparing original file and decoded file. Are they equal? True


