# ELEC0138 Security and Privacy Assignment
## Group 11 Demo

The following notebook contains the demo code for the assignmet project. 

## Step 0a) Install dependencies
Run the following code block in order to install the required dependencies for the project.

In [1]:
!pip install praat-parselmouth
!pip install pycryptodome



Collecting praat-parselmouth
  Downloading praat_parselmouth-0.4.3-cp39-cp39-win_amd64.whl (8.9 MB)
     ---------------------------------------- 0.0/8.9 MB ? eta -:--:--
     ----- ---------------------------------- 1.3/8.9 MB 27.7 MB/s eta 0:00:01
     ------------- -------------------------- 3.0/8.9 MB 31.8 MB/s eta 0:00:01
     ------------------ --------------------- 4.2/8.9 MB 30.2 MB/s eta 0:00:01
     ------------------------- -------------- 5.7/8.9 MB 30.3 MB/s eta 0:00:01
     -------------------------------- ------- 7.2/8.9 MB 30.5 MB/s eta 0:00:01
     -------------------------------------- - 8.6/8.9 MB 30.5 MB/s eta 0:00:01
     ---------------------------------------  8.9/8.9 MB 31.7 MB/s eta 0:00:01
     ---------------------------------------- 8.9/8.9 MB 26.0 MB/s eta 0:00:00
Collecting numpy>=1.7.0
  Downloading numpy-1.24.2-cp39-cp39-win_amd64.whl (14.9 MB)
     ---------------------------------------- 0.0/14.9 MB ? eta -:--:--
     --- -------------------------------

In [6]:
import os
import time
import hashlib

import parselmouth
from parselmouth.praat import call

from pathlib import Path
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Random import get_random_bytes

## Step 0b) Define constants

In [3]:
RAW_AUDIO_DIR = './demo/raw'
TRANSFORMED_AUDIO_DIR = './demo/transformed'
ENCRYPTED_AUDIO_DIR = ''
DECRYPTED_AUDO_DIR = ''

## Step 1) Transform the audio files
The transformation code uses the ParselMouth library which uses Praat functions in order to modify and transform the audio files. The raw audio files are read from the source directory and stores them in the output directory. 

In [19]:
def __transform_audio(snd):
    """Transforms the provided audio file

    Args:
        snd (parselmouth.Sound): sound to be transformed

    Returns:
        parselmouth.Sound: the transformed sound
    """
    pitch_shift = 60
    formant_shift_factor = 1.5 
    
    pitch = snd.to_pitch()
    medain_pitch = call(pitch, "Get quantile", 0, 0, 0.5, "Hertz")
    new_pitch = medain_pitch + pitch_shift
    transformed_snd = call(snd, "Change gender", 100, 500, formant_shift_factor, new_pitch, 1, 1)

    return transformed_snd


def transform(data_dir, output_dir):
    """Transforms all the audio files from the raw dataset
    """
    print('=> Transforming raw audio files...')
    
    raw_files = set()
    for dir_, _, files in os.walk(data_dir):
        for file_name in files:
            if file_name.endswith('.flac'):
                rel_dir = os.path.relpath(dir_, data_dir)
                rel_file = os.path.join(rel_dir, file_name)
                raw_files.add(rel_file)

    # Check if any raw files need to be processed
    if len(raw_files) == 0:
        print('\tUnable to find any raw files. Skipping this step.')
    else:
        print(f'\tTransforming {len(raw_files)} audio files...')

        # Transform the all the audio files
        for idx, file_sub_path in enumerate(raw_files):

            raw_file_path = os.path.join(data_dir, file_sub_path).replace('\\', '/')
            file_name = os.path.basename(file_sub_path)
            subdirs = os.path.dirname(file_sub_path)
            
            output_file_name = output_dir + '/transformed_' + file_name

            snd = parselmouth.Sound(raw_file_path)
            transformed_snd = __transform_audio(snd)
            print(output_file_name)
            transformed_snd.save(output_file_name, parselmouth.SoundFileFormat.FLAC)

        print('\tSuccessfully transformed audio files.')

In [20]:
transform(RAW_AUDIO_DIR, TRANSFORMED_AUDIO_DIR)

=> Transforming raw audio files...
	Transforming 10 audio files...
./demo/transformed/transformed_1272-128104-0000.flac


PraatError: Cannot create file “C:\Users\yslon\Desktop\ELEC0138_22-23_Assignment\.\demo\transformed\transformed_1272-128104-0000.flac”.
Sound "untitled_changeGender": not written to 16-bit sound file “C:\Users\yslon\Desktop\ELEC0138_22-23_Assignment\.\demo\transformed\transformed_1272-128104-0000.flac”.

## Step 2) Encrypt and decrypt the audio files

In [None]:
key_dir = './keys'
audio_dir = 'demo-test'

In [None]:
import os
import time
import hashlib

from pathlib import Path
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Random import get_random_bytes

### Step 2.1) Generate AES and RSA keys and save to local files

In [None]:
AES_key = get_random_bytes(32) #32 bytes (256) or 16 bytes (128)
RSA_key = RSA.generate(2048)

In [None]:
# Save key
if not os.path.exists(key_dir):
        os.makedirs(key_dir)

with open(os.path.join(key_dir, 'AES_key.txt'), 'wb') as f:
    f.write(AES_key)

with open(os.path.join(key_dir, 'public_key.txt'), 'wb') as f:
    f.write(RSA_key.publickey().export_key())

with open(os.path.join(key_dir, 'private_key.txt'), 'wb') as f:
    f.write(RSA_key.export_key())

### Step 2.2) Encrypt the audio files and the AES key
The audio files are encrypted with AES, and the AES key is encrypted with the public key of RSA

In [None]:
def copy_folders(src_folder, dest_folder):
    for item in os.listdir(src_folder):
        if os.path.isdir(os.path.join(src_folder, item)):
            # Copy folders
            os.makedirs(os.path.join(dest_folder, item), exist_ok =  True)
            # Copy subfolders
            copy_folders(os.path.join(src_folder, item), os.path.join(dest_folder, item))

In [None]:
def encrypt_file(key_dir, data_dir):
    if not os.path.exists(key_dir):
        print('Cannot find key')
        
    else:
        with open(os.path.join(key_dir, 'AES_key.txt'), 'rb') as f:
            key = f.read()
        with open(os.path.join(key_dir, 'public_key.txt'), 'rb') as f:
            public_key = RSA.import_key(f.read())
    
        i = 1
        BLOCK_SIZE = 128
        prev_dir_path = ''
        chunksize = 1000 * 1024
        cipher = PKCS1_OAEP.new(public_key)

        filepaths = list(Path(data_dir).glob(r'**/*.flac'))
        print('Detected ' + str(len(filepaths)) + ' audio files in ' + os.path.abspath(data_dir))

        outfile_dir = data_dir + '_encrypted'
        if not os.path.exists(outfile_dir):
            os.makedirs(outfile_dir)
        print('Encrypted audio file is saved in', os.path.abspath(outfile_dir))

        copy_folders(data_dir, outfile_dir)
        
        start_time = time.time()

        for filename in filepaths:

            output_file = os.path.join(outfile_dir, os.path.splitext(str(filename))[0] + '_AES.enc')
            output_file = output_file.replace(data_dir + '\\', '')

            dir_path = os.path.dirname(output_file)
            if dir_path != prev_dir_path:
                print('\nFor files in', os.path.abspath(dir_path))
                prev_dir_path = dir_path

            iv = get_random_bytes(AES.block_size)
            encryptor = AES.new(key, AES.MODE_CBC, iv)
            filesize = os.path.getsize(str(filename))

            with open(str(filename), 'rb') as f:
                chunk = f.read(chunksize)

                with open(output_file, 'wb') as f:
                    f.write(filesize.to_bytes(8, 'big'))
                    f.write(iv)
                    chunk = pad(chunk, AES.block_size)
                    f.write(encryptor.encrypt(chunk))
                    end_time_in = time.time()
                    print(str(i) + ' ' + os.path.basename(str(filename)) + ' encrypted as ' 
                          + os.path.basename(output_file) + ' %.2f' % (end_time_in - start_time) + 's')
                    i += 1
        
        # Encrypt AES key
        with open(os.path.join(key_dir, 'AES_key.txt'), 'rb') as f:
            plaintext = f.read()
        ciphertext = b''
        for i in range(0, len(plaintext), BLOCK_SIZE):
            block = plaintext[i : i + BLOCK_SIZE]
            ciphertext += cipher.encrypt(block)
        with open(os.path.join(key_dir, 'AES_key.enc'), 'wb') as f:
            f.write(ciphertext)
        print('\nAES key is encrypted as AES_key.enc and saved in', os.path.abspath(os.path.join(key_dir, 'AES_key.enc')))
        
        end_time = time.time()
        
        print('\nTime for encrypting ' + str(len(filepaths)) + ' files is %.2f' % (end_time - start_time) + 's')

In [None]:
encrypt_file(key_dir, audio_dir)

### Step 2.3) Decrypt the AES key and the audio files
The AES key is decrypted with the private key of RSA and the audio files are decrypted bwith the AES key

In [None]:
def decrypt_file(key_dir, data_dir):
    if not os.path.exists(key_dir):
        print('Cannot find key')
        
    else:
        with open(os.path.join(key_dir, 'private_key.txt'), 'rb') as f:
            private_key = RSA.import_key(f.read())
    
        BLOCK_SIZE = 128
        prev_dir_path = ''
        chunksize = 1000 * 1024
        cipher = PKCS1_OAEP.new(private_key)
        
        with open(os.path.join(key_dir, 'AES_key.enc'), 'rb') as f:
            ciphertext = f.read()
        plaintext = b''
        for i in range(0, len(ciphertext), private_key.size_in_bytes()):
            block = ciphertext[i : i + private_key.size_in_bytes()]
            plaintext += cipher.decrypt(block)
        with open(os.path.join(key_dir, 'AES_key_dec.txt'), 'wb') as f:
            f.write(plaintext)
        print('Decrypted AES key AES_key_dec.txt is saved in', os.path.abspath(os.path.join(key_dir, 'AES_key_dec.txt')))
        with open(os.path.join(key_dir, 'AES_key_dec.txt'), 'rb') as f:
            key = f.read()

        filepaths = list(Path(data_dir).glob(r'**/*.enc'))
        print('\nDetected ' + str(len(filepaths)) + ' encrypted audio files in ' + os.path.abspath(data_dir))

        outfile_dir = data_dir.replace('encrypted', 'decrypted')
        if not os.path.exists(outfile_dir):
            os.makedirs(outfile_dir)
        print('Decrypted audio file is saved in', os.path.abspath(outfile_dir))

        copy_folders(data_dir, outfile_dir)
        
        start_time = time.time()

        i = 1
        
        for filename in filepaths:

            output_file = os.path.join(outfile_dir, os.path.splitext(str(filename))[0] + '.flac')
            output_file = output_file.replace(data_dir + '\\', '')

            dir_path = os.path.dirname(output_file)
            if dir_path != prev_dir_path:
                print('\nFor files in', os.path.abspath(dir_path))
                prev_dir_path = dir_path

            with open(str(filename), 'rb') as f:
                filesize = int.from_bytes(f.read(8), 'big')
                iv = f.read(AES.block_size)
                decryptor = AES.new(key, AES.MODE_CBC, iv)
                chunk = f.read(chunksize)

                with open(output_file, 'wb') as f:
                    f.write(decryptor.decrypt(chunk))
                    f.truncate(filesize)
                    end_time_in = time.time()
                    print(str(i) + ' ' + os.path.basename(str(filename)) + ' decrypted as ' 
                          + os.path.basename(output_file) + ' %.2f' % (end_time_in - start_time) + 's')
                    i += 1
                
        end_time = time.time()
        
        print('\nTime for decrypting ' + str(len(filepaths)) + ' files is %.2f' % (end_time - start_time) + 's')

In [None]:
decrypt_file(key_dir, audio_dir + '_encrypted')

### Step 2.4) Compare the decrypted audio files with the original audio files

In [None]:
def hash_file(data_dir):
    hash_dict = {}
    
    filepaths = list(Path(data_dir).glob(r'**/*.flac'))
    print('Detected ' + str(len(filepaths)) + ' audio files in ' + os.path.abspath(data_dir))
    
    for filename in filepaths:
            
        with open(filename, 'rb') as f:
            md5obj = hashlib.md5()
            md5obj.update(f.read())
            hash_value = md5obj.hexdigest()
            #print(os.path.basename(str(filename)) + ' hash value is ' + hash_value)
            
            if hash_value in hash_dict:
                hash_dict[hash_value].append(filename)
            else:
                hash_dict[hash_value] = [filename]
    
    return hash_dict

def compare_hash(data_dir1, data_dir2):
    i = 0
    filepaths1 = list(Path(data_dir1).glob(r'**/*.flac'))
    filepaths2 = list(Path(data_dir2).glob(r'**/*.flac'))
    
    hash_dict1 = hash_file(data_dir1)
    hash_dict2 = hash_file(data_dir2)
    
    for hash_value in hash_dict1:
        if hash_value in hash_dict2:
            i += 1
            print('\nFiles with hash value ' + hash_value + ' found in both directories:')
            print([os.path.abspath(str(f)) for f in hash_dict1[hash_value]][0])
            print([os.path.abspath(str(f)) for f in hash_dict2[hash_value]][0])
            
    print('\n{} ({:.2%}) files are the same after decrypted compared to original files'.format(i, i/len(filepaths1)))

In [None]:
compare_hash(audio_dir, audio_dir + '_decrypted')