In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPRegressor
import cv2
import os

In [2]:
# 测试数据集
# X, y = datasets.make_moons(n_samples=100, noise=0.2, random_state=100)
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# X_train

In [3]:
class Layer:
    def __init__(self,input_nodes, output_nodes, 
                 activation=None, 
                 weights=None, bias=None):
        """
        input_nodes: 输入节点数 
        output_nodes: 输出节点数         
        activation: 激活函数      
        weights: 权值      
        bias: 偏置
    """
        
        self.weights = np.random.randn(input_nodes,output_nodes) * np.sqrt(1 / output_nodes) # Xavier初始值 
        self.bias = np.random.normal(0,0.1,output_nodes) # 均值为0，标准差为0.1的高斯分布
        self.activation = activation
        self.activation_output = None     
        self.error = None  # 用于计算当前层的 delta 变量的中间变量 
        self.delta = None  # 记录当前层的 delta 变量，用于计算梯度 
    
    def activate(self, X):
        # forward
        r = np.dot(X, self.weights) + self.bias
        self.activation_output = self.apply_activation(r) 
        return self.activation_output
    
    def apply_activation(self, r): 
        # 计算激活函数的输出
        if self.activation is None:
            return r
        if self.activation == 'sigmoid':
            return 1 / (1 + np.exp(-r))
        elif self.activation == 'relu':
            return np.maximum(r, 0)        
        elif self.activation == 'tanh':
            return np.tanh(r)
        return r
    
    def apply_activation_diff(self, r):
        # 计算激活函数的导数
        if self.activation is None:
            return np.ones_like(r)
        # Sigmoid       
        elif self.activation == 'sigmoid': 
            return r * (1 - r)
        # ReLU
        elif self.activation == 'relu':             
            grad = np.array(r, copy=True)             
            grad[r > 0] = 1             
            grad[r <= 0] = 0             
            return grad
        # tanh      
        elif self.activation == 'tanh':             
            return 1 - r ** 2 
        return r

In [4]:
class NeutralNetwork:
    def __init__(self):
        self.layers = [] # 网络层
    
    def add_layer(self, layer):
        self.layers.append(layer)
    
    def forward(self, X):
        # 前向传播
        for layer in self.layers:
            X = layer.activate(X)
        return X
    
    def mean_squared_loss(self, X_train, y_train):
        square =  np.square(y_train - self.forward(X_train))
        length = len(square.flatten())
        total_loss = np.sum(square)
        return total_loss / length
    
    def BP(self, X, y, lr):
        # 反向传播算法
        # 向前计算，得到最终输出值
        output = self.forward(X)
        for i in reversed(range(len(self.layers))): 
            layer = self.layers[i]
            if layer == self.layers[-1]: # 如果是输出层
                layer.error = y - output
                # delta = (o - t) * diff
                layer.delta = layer.error * layer.apply_activation_diff(output)
            else: 
                next_layer = self.layers[i + 1] 
                # 使用上一层的delta
                layer.error = np.dot(next_layer.weights, next_layer.delta)
                layer.delta = layer.error * layer.apply_activation_diff(layer.activation_output)
        
        # 更新权重
        for i in range(len(self.layers)):
            layer = self.layers[i]
            # 输入数据或者上一层数据的输出
            x = X if i == 0 else self.layers[i-1].activation_output
            # 限制其至少为2维进行运算
            ahead_layer_output = np.atleast_2d(x)
            # 梯度下降
            # w_descend = delta * output * learning_rate
            layer.weights += layer.delta * ahead_layer_output.T * lr
            # print(layer.delta.shape, ahead_layer_output.shape)
            
            # b_descend = delta * learning_rate
            layer.bias += layer.delta * lr
    
    def train(self, X_train, y_train, lr, max_epochs):
        # 网络训练函数
        # one-hot 编码：二分类
        # y_onehot = np.zeros((y_train.shape[0], 2)) 
       #  y_onehot[np.arange(y_train.shape[0]), y_train] = 1
        mses = [] 
        for i in range(max_epochs):         
            for j in range(len(X_train)):  # 一次训练一个样本                 
                self.BP(X_train[j],  y_train[j], lr)             
                # 打印出 MSE Loss                 
                mse = self.mean_squared_loss(X_train,y_train)                
                mses.append(mse)                 
            print('Epoch: %d, MSE: %f' % (i+1, float(mse)))
                  
    # 如果是分类问题，则计算准确度
    def accuracy(self, y_predict, y_test): 
        return np.sum(y_predict == y_test) / len(y_test)
    
    def predict(self, X_predict):
        return self.forward(X_predict)     

In [5]:
# y_onehot = np.zeros((y_train.shape[0], 2)) 
# y_onehot[np.arange(y_train.shape[0]), y_train] = 1
# y_onehot

In [6]:
# nn.train(X_train, X_test, y_train, y_test, 0.1, 25)

In [7]:
# nn.predict(X_test) == y_test

In [8]:
# 读取图片数据集
def unpickle(file):
    import pickle
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

In [9]:
# test_batch= unpickle('./cifar-10-batches-py/test_batch')

In [10]:
# test_batch.keys()

In [11]:
# 包含rgb三个通道各1024位，总共3072位，10000个数据
# print(test_batch[b'data'].shape)
# 10000个数据的标签,类别为0-9
# print(len(test_batch[b'labels']) )
# batch的名称:test batch, batch1, batch2
# print(test_batch[b'batch_label'])
# 图片名称
# print(test_batch[b'filenames'])

In [12]:
# batch = unpickle('./cifar-10-batches-py/data_batch_1')

In [13]:
# data = batch[b'data']

In [14]:
# 将数据保存为jpg图片
# for i in range(50):
#     image = data[i]
#     r_channel = image[:1024].reshape(32,32)
#     g_channel = image[1024:2048].reshape(32,32)
#     b_channel = image[2048:].reshape(32,32)
#     result_img = np.ones((32, 32, 3), dtype=np.uint8)
#     result_img[:,:,0] = r_channel
#     result_img[:,:,1] = g_channel
#     result_img[:,:,2] = b_channel
#     cv2.imwrite('./images/'+str(i)+'.jpg',result_img)

In [15]:
def Expand(channel,block_size):
    Height = channel.shape[0]
    Width = channel.shape[1]
    Max = Height if Height > Width else Width
    if(Height % block_size == 0 and Width % block_size == 0):
        return channel
    else:
        # 计算扩充块的大小
        if(Max % block_size == 0): N = Max
        else: N = block_size * ((int)(Max / block_size) + 1)
        expand_channel = np.zeros((N,N))
        for i in range(Height):
            for j in range(Width):
                expand_channel[i][j] = channel[i][j]
        return expand_channel

In [16]:
def Divide(channel,block_size):
    Height = channel.shape[0]
    Width = channel.shape[1]
    if Height != Width: return 
    else: 
        num = (int) (Height / block_size)
        train_data = np.zeros((num*num,block_size*block_size))
        # 按块处理
        for i in range(num):
            for j in range(num):
                    block = channel[j * block_size : (j + 1) * block_size, 
                                    i * block_size : (i + 1) * block_size].flatten()
                    col = (int)(i * num + j)
                    train_data[col,:] = block
        return train_data / 255.0

In [17]:
def SaveNN(train_data, n_hidden,num_epochs):
    # 获取训练后隐藏层的输出,输出层的权重和偏置
    # print(train_data.shape) (121,256) (nums,features)
    n_nums = train_data.shape[0]
    n_features = train_data.shape[1]
    nn = NeuralNetwork() # 初始化神经网络模型
    # 构建神经网络
    nn.add_layer(Layer(n_features,n_hidden,'relu')) # 非线性层
    nn.add_layer(Layer(n_hidden,n_features)) # 线性层
    # 训练
    nn.train(train_data,train_data,0.1,num_epochs)
    return nn

In [18]:
def Reback(train_data,Height,Width):
    num = (int) (np.sqrt(train_data.shape[0])) # 块个数
    block_size = (int) (np.sqrt(train_data.shape[1])) # 块大小
    width = height = block_size * num # 扩充后的宽高
    # 还原
    expand_channel = np.zeros((width,height))
    for i in range(num):
        for j in range(num):
            block = train_data[i * num + j, :].reshape(block_size,block_size)
            expand_channel[j * block_size : (j + 1) * block_size, 
                    i * block_size : (i + 1) * block_size] = block
    # 去除扩充后的数据
    channel = np.zeros((Height,Width))
    for i in range(Height):
        for j in range(Width):
           channel[i][j] = expand_channel[i][j]
    return channel * 255.0

In [19]:
def Compress(path,block_size,n_hidden,num_epochs):
    # 1. 读取图片
    img = cv2.imread(path)
    Height = img[:,:,0].shape[0]
    Width = img[:,:,0].shape[1]
    # 2. 分别处理rgb三个通道
    NN = [] # 三个通道对应的三个BP神经网络信息
    for i in range(3):
        channel = img[:,:,i] # 一个通道
        # 3. 通道扩充
        channel = Expand(channel,block_size)
        # 4. 通道分块
        train_data = Divide(channel,block_size)
        # 5. 训练神经网络并保存
        NN.append(SaveNN(train_data, n_hidden,num_epochs))
    return NN,Height,Width

In [20]:
def Depress(path,NN,Height,Width):
    # 原图片
    img = np.zeros((Height,Width,3))
    # 获取参数
    # 1.分别处理3个神经网络
    for i in range(3):
        nn = NN[i]
        # 隐藏层
        hidden_layer = nn.layers[0]
        # 输出层
        output_layer = nn.layers[1]
        # 隐藏层输出
        compress_channel = hidden_layer.activation_output
        # 输出层的w和b
        hidden_w, hidden_b = [output_layer.weights,output_layer.bias]
        # 计算对应通道分块之后的训练数据
        train_data = (np.dot(compress_channel,hidden_w) + hidden_b)
        # 对分块的通道进行还原并储存
        img[:,:,i] = Reback(train_data,Height,Width)
    cv2.imwrite(path,img)
    print('Success!')

In [157]:
def Evaluate(origin_path, compress_path):
    stats1 = os.stat(origin_path)
    stats2 = os.stat(compress_path)
    img1 = cv2.imread(origin_path).astype(np.float32)
    img2 = cv2.imread(compress_path).astype(np.float32)
    row = img1.shape[0]
    col = img1.shape[1]
    rate = stats2.st_size / stats1.st_size
    mse = np.sum(((img1 - img2).flatten()) ** 2)
    psnr = 10 * np.log10(3 * row * col * 255**2 / mse)
    print('rate: %f' % rate)
    print('psnr: %f' % psnr)
    print('mse: %f' % mse)

In [22]:
# 使用sklearn内置的BP算法实现压缩
def bpid(read_path,write_path,block_size,hidden_layer_sizes,max_iter):
    # 1. 读取图片
    img = cv2.imread(read_path)
    Height = img[:,:,0].shape[0]
    Width = img[:,:,0].shape[1]
    MAX = Height if Height > Width else Width
    if MAX % block_size == 0: 
        nums = MAX / block_size
    else:
        nums = MAX / block_size + 1
    # 2. 分别处理rgb三个通道
    predicts = []
    for i in range(3):
        channel = img[:,:,i] # 一个通道
        # 3. 通道扩充
        channel = Expand(channel,block_size)
        # 4. 通道分块
        train_data = Divide(channel,block_size)
        # 5. 训练神经网络并保存结果
        mlp = MLPRegressor(hidden_layer_sizes=hidden_layer_sizes,
                           max_iter=max_iter)
        mlp.fit(train_data,train_data)
        predicts.append(mlp.predict(train_data))
    # 6. 还原
    I = np.zeros((Height,Width,3))
    for i in range(3):
        I[:,:,i] = Reback(predicts[i],Height,Width)
    cv2.imwrite(write_path,I)
    print('success!')

In [23]:
# NN,Height,Width = Compress('./test-images/wenwen.jpg',
#                            block_size=4,
#                            n_hidden=10,
#                            num_epochs=50)
# Depress('./test-images/compress-wenwen.jpg',NN,Height,Width)
# Evaluate('./test-images/wenwen.jpg','./test-images/compress-wenwen.jpg')

In [24]:
# NN,Height,Width = Compress('./test-images/Curry/Curry1024.png',
#                            block_size=4,
#                            n_hidden=10,
#                            num_epochs=50)
# Depress('./test-images/Curry/Compressed/Curry1024',NN,Height,Width)
# Evaluate('./test-images/Curry/Curry1024.png','./test-images/Curry/Compressed/Curry1024')

In [46]:
bpid('./test-images/wenwen.jpg',
     './test-images/perfect-wenwen.jpg',
     2,hidden_layer_sizes=(14),max_iter=500)
Evaluate('./test-images/wenwen.jpg',
         './test-images/perfect-wenwen.jpg')

success!
rate: 1.337001
psnr: 26.431877
mse: 12003547.000000


In [26]:
import time

In [193]:
tic = time.perf_counter()
bpid('./test-images/Curry/Curry1024.png',
     './test-images/Curry/Compressed/Curry1024.png',
     2,hidden_layer_sizes=(64),max_iter=500)
toc = time.perf_counter()
Evaluate('./test-images/Curry/Curry1024.png',
         './test-images/Curry/Compressed/Curry1024.png')
print(f"该程序耗时: {toc - tic:0.4f} seconds")

success!
rate: 0.942138
psnr: 76.682149
mse: 4741.000000
该程序耗时: 19.6575 seconds


In [102]:
# max_w = np.max(hidden_w)
# max_b = np.max(hidden_b)
# max_red = np.max(compress_red_channel)
# min_w = np.min(hidden_w)
# min_b = np.min(hidden_b)
# min_red = np.min(compress_red_channel)

In [29]:
def NormalizationParam(*params):
    many = []
    for param in params:
        max_value = np.max(param)
        min_value = np.min(param)
        param = ((param - min_value) / (max_value - min_value)) * 63.0
        many.append(param)
    return many

In [30]:
def DeNormalization(params):
    many = []
    for [maxp,minp,param] in params:
        param = param / 63.0
        param = param * (maxp - minp) + minp
        many.append(param)
    return many

In [31]:
# 归一化
# hidden_w,hidden_b,compress_red_channel = NormalizationParam(hidden_w,hidden_b,compress_red_channel)
# hidden_w,hidden_b,compress_red_channel

In [32]:
# 反归一化
# params = [[max_w,min_w,hidden_w],[max_b,min_b,hidden_b],[max_red,min_red,compress_red_channel]]
# hidden_w,hidden_b,compress_red_channel = DeNormalization(params)
# hidden_w,hidden_b,compress_red_channel
# red_channel = np.dot(compress_red_channel,hidden_w) + hidden_b
# red_channel,pred