# 目标:完成多线程的TCPServer

上一章我们完成一个简单的TCPServer,但是对于需要同时处理很多连接的情况来说,使用单线程的Server肯定是不够用的,他会大大的降低服务器处理的速度,特别是对于多核的处理器系统来说.所以我们本章开始学习并发处理.

# 多线程和多进程

在一个操作系统中,不同的程序需要同时运行.为了保证各个程序之间不受到干扰,各个程序之间内存是互相隔离的,让每个程序都认为它在独占自己的存储空间.在这种模型下每个程序的运行实例称之为进程.

## 进程
考虑一个场景：浏览器，网易云音乐以及notepad++ 三个软件只能顺序执行是怎样一种场景呢？另外，假如有两个程序A和B，程序A在执行到一半的过程中，需要读取大量的数据输入（I/O操作），而此时CPU只能静静地等待任务A读取完数据才能继续执行，这样就白白浪费了CPU资源。你是不是已经想到在程序A读取数据的过程中，让程序B去执行，当程序A读取完数据之后，让程序B暂停。聪明，这当然没问题，但这里有一个关键词：切换。

既然是切换，那么这就涉及到了状态的保存，状态的恢复，加上程序A与程序B所需要的系统资源（内存，硬盘，键盘等等）是不一样的。自然而然的就需要有一个东西去记录程序A和程序B分别需要什么资源，怎样去识别程序A和程序B等等(比如读书)。

进程定义：

进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成；数据集则是程序在执行过程中所需要使用的资源；进程控制块用来记录进程的外部特征，描述进程的执行变化过程，系统可以利用它来控制和管理进程，它是系统感知进程存在的唯一标志。

举一例说明进程：
想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有做生日蛋糕的食谱，厨房里有所需的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中，做蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu)，而做蛋糕的各种原料就是输入数据。进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。现在假设计算机科学家的儿子哭着跑了进来，说他的头被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态)，然后拿出一本急救手册，按照其中的指示处理蛰伤。这里，我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治)，每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完之后，这位计算机科学家又回来做蛋糕，从他离开时的那一步继续做下去。

## 线程
线程的出现是为了降低上下文切换的消耗，提高系统的并发性，并突破一个进程只能干一样事的缺陷，使到进程内并发成为可能。

假设，一个文本程序，需要接受键盘输入，将内容显示在屏幕上，还需要保存信息到硬盘中。若只有一个进程，势必造成同一时间只能干一样事的尴尬（当保存时，就不能通过键盘输入内容）。若有多个进程，每个进程负责一个任务，进程A负责接收键盘输入的任务，进程B负责将内容显示在屏幕上的任务，进程C负责保存内容到硬盘中的任务。这里进程A，B，C间的协作涉及到了进程通信问题，而且有共同都需要拥有的东西——-文本内容，不停的切换造成性能上的损失。若有一种机制，可以使任务A，B，C共享资源，这样上下文切换所需要保存和恢复的内容就少了，同时又可以减少通信所带来的性能损耗，那就好了。是的，这种机制就是线程。
线程也叫轻量级进程，它是一个基本的CPU执行单元，也是程序执行过程中的最小单元，由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销，提高了操作系统的并发性能。线程没有自己的系统资源。

## 并行和并发
并行处理（Parallel Processing）是计算机系统中能同时执行两个或更多个处理的一种计算方法。并行处理可同时工作于同一程序的不同方面。并行处理的主要目的是节省大型和复杂问题的解决时间。并发处理(concurrency Processing)：指一个时间段中有几个程序都处于已启动运行到运行完毕之间，且这几个程序都是在同一个处理机(CPU)上运行，但任一个时刻点上只有一个程序在处理机(CPU)上运行

并发的关键是你有处理多个任务的能力，不一定要同时。并行的关键是你有同时处理多个任务的能力。所以说，并行是并发的子集

![](./image/process.png)

## 进程和线程之间的对比

简而言之,一个程序至少有一个进程,一个进程至少有一个线程.
进程就是一个应用程序在处理机上的一次执行过程，它是一个动态的概念，而线程是进程中的一部分，进程包含多个线程在运行。
多线程可以共享全局变量，多进程不能。多线程中，所有子线程的进程号相同；多进程中，不同的子进程进程号不同。

## 同步与异步
同步：指一个进程在执行某个请求的时候，若该请求需要一段时间才能返回信息，那么这个进程将会一直等待下去，直到收到返回信息才继续执行下去。

异步：指进程不需要一直等待下去，而是继续执行下面的操作，不管其他进程的状态，当有消息返回时系统会通知进程进行处理，这样可以提高执行效率


# QT下如何建立多线程


QT下已经内置了QThread类来负责建立进程.这个进程类内的内容会分为两个部分,第一部分在建立进程的父进程中执行,第二部分在子进程中执行.
QThread内大部分的函数都是在父进程中执行的,比如构造函数,启动进程所需的start函数等,实际在子进程内执行的函数只有run()以及在run()内的程序.
run函数的定义如下,他是一个虚函数,需要继承者去实现它.
>virtual void run();

同时QThread提供了一些辅助的函数,比如睡眠,获取线程号等.
下面我们来看下建立一个线程的例子:

```
class MyThread : public QThread
{
public:
    MyThread(QObject *parent = nullptr):QThread(parent){}
    QString name;
protected:
    void run();//多线程执行的内容将通过重新该虚函数实现
};
```

首先建立一个继承字QThread的子类,并重载父类的run函数.
然后我们在run函数中实现我们需要做的工作.在这个例子里我们简单的做个另一个累加,然后sleep一秒钟模拟要完成一个比较慢的工作.

```
void MyThread::run()
{
    int count=0;
    qDebug()<<"Thread "<<this->name<<" run:"<<QThread::currentThread();

    while(1)
    {
        QThread::sleep(1);
        count++;
        qDebug()<<"Thread "<<this->name<<count<<" in"<<QThread::currentThread();;
    }
}
```



## 如何使用线程

然后我们在主函数中使用上面自定义的线程类:

```
int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    MyThread th1;
    th1.name = "thread one";
    th1.start();//启动线程
    MyThread th2;
    th2.name = "thread two";
    th2.start();//启动线程

    return a.exec();
}
```

在上面的程序中我们建立了两个线程,分别起了不同的名称.然后调用各自的start函数开始启动线程.当两个线程启动后你会看到不断交替的有thread one和thread two字样的文字打印出来.

### 程序执行在哪里?

在上面的程序中,要明确的是,只有两个类实例各自的run函数才是运行在新建的子线程中的,但是其他的程序包括构造函数,修改name,调用start都是在主线程中执行的.

# 线程安全

线程安全是多线程编程时的计算机程序代码中的一个概念。在拥有共享数据的多条线程并行执行的程序中，线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行，不会出现数据污染等意外情况。
由于在同一个进程中多个线程之间是共享内存空间的,这样既方便了线程之间的数据交互,也带来了风险,所以在使用变量的时候要个格外小心.

多个线程访问同一个对象时，如果不用考虑这些线程在运行时环境下的调度和交替执行，也不需要进行额外的同步，或者在调用方进行任何其他操作，调用这个对象的行为都可以获得正确的结果，那么这个对象就是线程安全的。
或者说：一个类或者程序所提供的接口对于线程来说是原子操作或者多个线程之间的切换不会导致该接口的执行结果存在二义性，也就是说我们不用考虑同步的问题。

## 线程安全主要因素
线程安全问题大多是由全局变量及静态变量引起的，局部变量逃逸也可能导致线程安全问题。
若每个线程中对全局变量、静态变量只有读操作，而无写操作，一般来说，这个全局变量是线程安全的；若有多个线程同时执行写操作，一般都需要考虑线程同步，否则的话就可能影响线程安全。
类要成为线程安全的，首先必须在单线程环境中有正确的行为。如果一个类实现正确(这是说它符合规格说明的另一种方式)，那么没有一种对这个类的对象的操作序列(读或者写公共字段以及调用公共方法)可以让对象处于无效状态，观察到对象处于无效状态、或者违反类的任何不可变量、前置条件或者后置条件的情况。
此外，一个类要成为线程安全的，在被多个线程访问时，不管运行时环境执行这些线程有什么样的时序安排或者交错，它必须仍然有如上所述的正确行为，并且在调用的代码中没有任何额外的同步。其效果就是，在所有线程看来，对于线程安全对象的操作是以固定的、全局一致的顺序发生的。
正确性与线程安全性之间的关系非常类似于在描述 ACID(原子性、一致性、独立性和持久性)事务时使用的一致性与独立性之间的关系：从特定线程的角度看，由不同线程所执行的对象操作是先后(虽然顺序不定)而不是并行执行的。

## 线程安全与可重入

如果一个函数能够安全地同时被多个线程调用而得到正确的结果，那么，我们说这个函数是线程安全的。所谓“安全”，一切可能导致结果不正确的因素都是不安全的调用。
线程安全，是针对多线程而言的。与可重入联系起来，我们可以断定：可重入函数必定是线程安全的，但线程安全的不一定是可重入的。不可重入函数，函数调用结果不具有可再现性，可以通过互斥锁等机制使之能安全地同时被多个线程调用，那么，这个不可重入函数就是转换成了线程安全。
可重入（Reentrant）函数可以由多于一个任务并发使用，而不必担心数据错误。相反地，不可重入( Non - Reentrant)函数不能由超过一个任务所共享，除非能确保函数的互斥（或者使用信号量，或者在代码的关键部分禁用中断）。可重入函数可以在任意时刻被中断，稍后再继续运行，不会丢失数据。可重入函数要么使用本地变量，要么在使用全局变量时保护自己的数据。


# 建立多线程的TcpServer

## 多线程对于TcpServer的要求

```
void TcpServer::NewConnectionSlot()
{
    qDebug()<<"New Connection Slot coming...";
    currentClient = tcpServer->nextPendingConnection();
    tcpClient.append(currentClient);
    connect(currentClient, &QIODevice::readyRead, this, &TcpServer::onReadData);
    connect(currentClient, &QAbstractSocket::disconnected, this, &TcpServer::onDisconnectedSlot);

    qDebug()<<"ip:"<<currentClient->peerAddress().toString()<<":"<<currentClient->peerPort();
}
```

我们之前的TcpServer并没有考虑线程的安全性,所以不能直接使用.主要表现在我们使用了一个类内全局的列表来保存当前的客户端,这意味着如果是多线程的情况下,不同的线程可能会同时操作同一个变量.


```
void TcpServer::onReadData()
{
    qDebug()<<"onReadData";

    //方式1:
    // 遍历所有客户端,批量处理
    for(int i=0; i<tcpClient.length(); i++)
    {
        QByteArray buffer = tcpClient[i]->readAll();
        if(buffer.isEmpty())    continue;

        static QString IP_Port, IP_Port_Pre;
        qDebug()<<"read data ip:"<<currentClient->peerAddress().toString()<<":"<<currentClient->peerPort();
        qDebug()<<buffer;

        //以下代码仅供调试
        //SendDataAll(buffer,buffer.size());//echo to all
        SendData(tcpClient[i],buffer,buffer.size());//echo to sender
    }

}
```
类似的设计也出现在收取数据的函数里,使用全局的变量来操作当前的socket,这个函数本身设计的不符合线程安全的要求.

因此我们必须改造目前的TcpServer的实现方式,同时为了使得各个线程能够独立的申请和释放操作socket的辅助内存空间,我们首先编写了一个继承自QTcpSocket的子类.

```
class TcpSocket : public QTcpSocket
{
public:
    TcpSocket(QTcpSocket *parent = NULL): QTcpSocket(parent)
    {
        //申请一些新的内存,比如new一些变量.
    }
    ~TcpSocket()
    {
        //释放申请的内存
    }

public slots:
    void ReadData();
    void onDisconnectedSlotThread();
    qint64 SendData(TcpSocket *socket,const char * data, qint64 len);
    QByteArray m_buffer;
};
```

通过构造函数和析构函数,我们可以给每个socket准备自己独立的变量.

```
void TcpSocket::ReadData()
{
    TcpSocket *socket = (TcpSocket*)sender();
    QByteArray buffer = socket->readAll();
    if(buffer.isEmpty())    return;
    qDebug()<<"ReadData:"<<buffer<<" in "<<QThread::currentThread();
    m_buffer.append(buffer);//保存数据
    //以下代码仅供调试
    SendData(socket,buffer,buffer.size());//echo to sender
}

void TcpSocket::onDisconnectedSlotThread()
{
    qDebug()<<"onDisconnectedSlotThread.pid:"<<QThread::currentThread();
}

qint64 TcpSocket::SendData(TcpSocket* socket,const char * data, qint64 len)
{
    qint64 slen;
    slen=socket->write(data,len); //qt5除去了.toAscii()
    qDebug()<<"SendData len:"<<slen<<"data:"<<data;
    return slen;
}
```
另外我们也改造了新的socket子类中关于接收和发送数据的部分,使之能够适应多线程的环境.
大家需要注意的是我们在使用变量的时候尽可能使用了局部变量,以避免线程间的污染.同时需要保存数据的时候,也使用继承后子类里面的成员变量m_buffer.这个变量对于每个socket来说都是独立的,虽然他们的名字是相同的,但是使用不同的内存地址.每个类的实例都可以独立的保存自己的数据.



## 改造TcpServer

建立了新的Socket类后,我们也需要对应的修改TcpServer的实现
```
class TcpServer:public QObject
{
public:
    TcpServer(QWidget *parent = 0):QObject(parent)
    {
        tcpServer = new QTcpServer(this);
        connect(tcpServer, &QTcpServer::newConnection, this, &TcpServer::NewConnectionSlot);
    }

    ~TcpServer()
    {
        delete tcpServer;
    }
public:
    QHostAddress ip;
    quint16 port;
    bool SvStart();
private:
    QTcpServer *tcpServer;

public slots:
    void NewConnectionSlot();
};
```

## 建立多线程

在上面的构造函数中,我们为TcpServer关联了信号QTcpServer::newConnection和槽TcpServer::NewConnectionSlot,所以当我们收到新的客户端连接的时候,会自动进入到下面的函数

```
void TcpServer::NewConnectionSlot()
{
    qDebug()<<"New Connection Slot coming...";
    QTcpSocket *socket = tcpServer->nextPendingConnection();
    qDebug()<<"ip:"<<socket->peerAddress().toString()<<":"<<socket->peerPort();

    //建立新线程来处理后续数据
    TcpSocket *threadSocket= new TcpSocket();
    threadSocket->setSocketDescriptor(socket->socketDescriptor());
    QThread *thread = new QThread(threadSocket);
    connect(threadSocket, &QAbstractSocket::disconnected, thread, &QThread::quit);
    connect(threadSocket, &QAbstractSocket::disconnected, threadSocket, &TcpSocket::onDisconnectedSlotThread);
    connect(threadSocket, &QIODevice::readyRead, threadSocket, &TcpSocket::ReadData);
    threadSocket->moveToThread(thread);
    thread->start();
}
```

多线程的服务器根据使用的场景不同可能使用不同的关联方法.
### 方法一:
当客户端连接到服务器后,会长时间保持连接,并互相通讯,随时可以增加设备和减少设备(物联网场景通常是这样的),此时应该采用收到请求后立刻为新的socket建立新的线程.此种场景下通常不会频繁的出现线程的建立和销毁.
### 方法二:
有大量的客户端连接到服务器,通讯结束后切断连接,等下次需要通讯的时候,再次建立连接.比如浏览器,它通常不会和服务器保持一个长时间的连接,这种场景下需要建立一个线程池,当新的socket到来后,把他分配给已有的线程进行处理,处理结束后空闲线程重新回到线程池.

我们上面的例子采用了第一种方法.


### 信号和槽
connect(threadSocket, &QAbstractSocket::disconnected, thread, &QThread::quit);
connect(threadSocket, &QAbstractSocket::disconnected, threadSocket, &TcpSocket::onDisconnectedSlotThread);
在上面的例子中我们关联的不同的信号和槽,特别要注意,同一个信号我们关联到了不同的槽.他们的含义是不一样的.上面一句是告诉线程,当收到断开连接后,就退出线程.下面一句是告诉套接字,收到断开连接后进行必要的收尾工作.这个信号的发送方都是一样的,但是接收方是不同的.



# 完整的多线程服务器演示代码

```
#include "mainwindow.h"
#include <QApplication>
#include <QTcpServer>
#include <QHostAddress>
#include <QNetworkInterface>
#include <QDebug>
#include <QTcpSocket>
#include <QAbstractSocket>
#include <QIODevice>
#include <QThread>
#include <process.h>

class TcpSocket : public QTcpSocket
{
public:
    TcpSocket(QTcpSocket *parent = NULL): QTcpSocket(parent){}
    ~TcpSocket(){}

public slots:
    void ReadData();
    void onDisconnectedSlotThread();
    qint64 SendData(TcpSocket *socket,const char * data, qint64 len);
    QByteArray m_buffer;
};

void TcpSocket::ReadData()
{
    TcpSocket *socket = (TcpSocket*)sender();
    QByteArray buffer = socket->readAll();
    if(buffer.isEmpty())    return;
    qDebug()<<"ReadData:"<<buffer<<" in "<<QThread::currentThread();
    m_buffer.append(buffer);
    //以下代码仅供调试
    SendData(socket,buffer,buffer.size());//echo to sender
}

void TcpSocket::onDisconnectedSlotThread()
{
    qDebug()<<"onDisconnectedSlotThread.pid:"<<QThread::currentThread();
}

qint64 TcpSocket::SendData(TcpSocket* socket,const char * data, qint64 len)
{
    qint64 slen;
    slen=socket->write(data,len); //qt5除去了.toAscii()
    qDebug()<<"SendData len:"<<slen<<"data:"<<data;
    return slen;
}

class TcpServer:public QObject
{
public:
    QList<QHostAddress> localaddrs;
    TcpServer(QWidget *parent = 0):QObject(parent)
    {
        tcpServer = new QTcpServer(this);
        //获得本机可用地址清单.
        localaddrs = QNetworkInterface::allAddresses();
        foreach(QHostAddress ha, localaddrs)
        {
            qDebug()<<ha.toString();
        }
        ip="127.0.0.1";//本地loop地址
        port=8888;

        connect(tcpServer, &QTcpServer::newConnection, this, &TcpServer::NewConnectionSlot);
    }

    ~TcpServer()
    {
        delete tcpServer;
    }
public:
    QHostAddress ip;
    quint16 port;
    bool SvStart();
private:
    QTcpServer *tcpServer;

public slots:
    void NewConnectionSlot();
};

void TcpServer::NewConnectionSlot()
{
    qDebug()<<"New Connection Slot coming...";
    QTcpSocket *socket = tcpServer->nextPendingConnection();
    qDebug()<<"ip:"<<socket->peerAddress().toString()<<":"<<socket->peerPort();

    //建立新线程来处理后续数据
    TcpSocket *threadSocket= new TcpSocket();
    threadSocket->setSocketDescriptor(socket->socketDescriptor());
    QThread *thread = new QThread(threadSocket);
    connect(threadSocket, &QAbstractSocket::disconnected, thread, &QThread::quit);
    connect(threadSocket, &QAbstractSocket::disconnected, threadSocket, &TcpSocket::onDisconnectedSlotThread);
    connect(threadSocket, &QIODevice::readyRead, threadSocket, &TcpSocket::ReadData);
    threadSocket->moveToThread(thread);
    thread->start();
}

bool TcpServer::SvStart()
{
    qDebug()<<"try to start tcpserver at"<<ip.toString()<<":"<<port;
    bool ok = tcpServer->listen(ip, port);
     if(ok)
         qDebug("tcp server started!");
     else
         qDebug("tcp server start failed!");
     return ok;
}


int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    TcpServer ts;
    ts.SvStart();//启动服务器

    return a.exec();
}

```

# 作业

实现多线程的客户端,其中一个线程发送数据,一个线程接收数据.