# 并发编程

### 创建线程

In [None]:
import threading
import time
 
def counter(n):  
    cnt = 0;  
    for i in range(n):  
        cnt += 1
        time.sleep(0.1)
        print(cnt)
               
th = threading.Thread(target=counter, args=(10,));   
th.start();  
th.join(); 
print('main thread task done')

## 创建线程-继承类

In [None]:
import threading, time, random  
  
def counter():  
    cnt = 0;  
    for i in range(10000):  
        for j in range(i):  
            cnt += j;  

class SubThread(threading.Thread):  
    def __init__(self, name):  
        threading.Thread.__init__(self, name=name) 
        pass
  
    def run(self):  
        i = 0;  
        while i < 3:  
            print(self.name,'counting...\n') 
            counter();  
            print(self.name,'finish\n')  
            i += 1;  

th = SubThread('thread-1')
th.start()
th.join()
print('all done') 

In [None]:
import threading, time
  
class SubThread(threading.Thread):  
    def __init__(self, name):  
        threading.Thread.__init__(self, name=name)  
  
    def run(self):  
        i = 0;  
        while i < 3:  
            print(self.name,'counting...\n')  
            time.sleep(1) 
            print(self.name,'finish\n') 
            i += 1

th = SubThread('thread-1')
print('main start')
th.setDaemon(False)
th.start()  
th.join() 
print('main end\n')

## Daemon守护线程
### 外部运行

# 线程同步

In [None]:
from threading import Thread
some_var = 0
class IncrementThread(Thread):
    def run(self):
        global some_var
        read_value = some_var
        print("some_var in %s is %d" % (self.name, read_value))
        some_var = read_value + 1
        #print "some_var in %s after increment is %d" % (self.name, some_var)
def use_increment_thread():
    threads = []
    for i in range(50):
        t = IncrementThread()
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print("After 50 modifications, some_var should have become 50")
    print("After 50 modifications, some_var is %d" % (some_var,))
use_increment_thread()

In [None]:
# lock
# 可以通过下面两种方式创建一个Lock对象，新创建的 Lock 对象处于未上锁的状态：
import threading
l = threading.Lock()
l

In [None]:
from threading import Lock, Thread
lock = Lock()
some_var = 0
class IncrementThread(Thread):
    def run(self):
        #we want to read a global variable
        #and then increment it
        global some_var
        lock.acquire(True)
        read_value = some_var
        print("some_var in %s is %d" % (self.name, read_value))
        some_var = read_value + 1
        print("some_var in %s after increment is %d" % (self.name, some_var))
        lock.release()
def use_increment_thread():
    threads = []
    for i in range(50):
        t = IncrementThread()
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print("After 50 modifications, some_var should have become 50")
    print("After 50 modifications, some_var is %d" % (some_var,))
use_increment_thread()

In [None]:
# 不加锁计数器
import time
from threading import Thread
value = 0
def getlock():
    global value
    new = value + 1
    time.sleep(0.001)  # 使用sleep让线程有机会切换
    value = new
threads = []
for i in range(100):
    t = Thread(target=getlock)
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print(value)

In [None]:
# 加锁保证结果
import time
from threading import Thread, Lock
value = 0
lock = Lock()
def getlock():
    global value
    with lock:
        new = value + 1
        time.sleep(0.001)
        value = new
threads = []
for i in range(100):
    t = Thread(target=getlock)
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print(value)

In [None]:
import threading  
import time  
  
def test_xc(): 
    mutex.acquire()#取得锁  
    f = open("test.txt","a")  
    f.write("test_dxc"+'\n')  
    f.close()
    mutex.release()#释放锁  

mutex = threading.Lock()#创建锁 

threads = []
for i in range(5):  
    t = threading.Thread(target=test_xc)  
    t.start()  
    threads.append(t)
for t in threads:
    t.join()

## 可重入锁
### 使用command line运行

In [None]:
import threading

print('lock acquire')
lock = threading.Lock()
lock.acquire()
lock.acquire()
lock.release()
lock.release()
print('done')

In [None]:
import threading

print('lock acquire')
lock = threading.RLock()
lock.acquire()
lock.acquire()
lock.release()
lock.release()
print('done')

## Condition

In [None]:
import threading, time
class Seeker(threading.Thread):
    def __init__(self, cond, name):
        super(Seeker, self).__init__()
        self.cond = cond
        self.name = name
    def run(self):
        time.sleep(1) #确保先运行Seeker中的方法
        self.cond.acquire() #b
        print(self.name + ': 我已经把眼睛蒙上了')
        self.cond.notify()
        self.cond.wait() #c
                         #f
        print(self.name + ': 我找到你了 ~_~')
        self.cond.notify()
        self.cond.release()
                            #g
        print(self.name + ': 我赢了')   #h
class Hider(threading.Thread):
    def __init__(self, cond, name):
        super(Hider, self).__init__()
        self.cond = cond
        self.name = name
    def run(self):
        self.cond.acquire()
        self.cond.wait()    #a    #释放对琐的占用，同时线程挂起在这里，直到被notify并重新占有琐。
                            #d
        print(self.name + ': 我已经藏好了，你快来找我吧')
        self.cond.notify()
        self.cond.wait()    #e
                            #h
        self.cond.release()
        print(self.name + ': 被你找到了，哎~~~')
cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

## Event 交通灯

In [None]:
import threading
import random
import time


class VehicleThread(threading.Thread):
    """Class representing a motor vehicle at an intersection"""

    def __init__(self, threadName, event):
        """Initializes thread"""

        threading.Thread.__init__(self, name=threadName)

        # ensures that each vehicle waits for a green light
        self.threadEvent = event

    def run(self):
        """Vehicle waits unless/until light is green"""

        # stagger arrival times
        time.sleep(random.randrange(1, 10))

        # prints arrival time of car at intersection
        print("%s arrived at %s\n" % \
              (self.getName(), time.ctime(time.time())))

        # wait for green light
        self.threadEvent.wait()

        # displays time that car departs intersection
        print("%s passes through intersection at %s\n" % \
              (self.getName(), time.ctime(time.time())))


greenLight = threading.Event()
vehicleThreads = []

# creates and starts five Vehicle threads
for i in range(1, 5):
    vehicleThreads.append(VehicleThread("Vehicle" + str(i),
                                        greenLight))

for vehicle in vehicleThreads:
    vehicle.start()

while threading.activeCount() > 1:
    # sets the Event's flag to false -- block all incoming vehicles
    greenLight.clear()
    print("RED LIGHT! at", time.ctime(time.time()))
    time.sleep(3)

    # sets the Event's flag to true -- awaken all waiting vehicles
    print("GREEN LIGHT! at", time.ctime(time.time()))
    greenLight.set()
    time.sleep(1)

## 信号量

In [None]:
import time
from random import random
from threading import Thread, Semaphore
sema = Semaphore(1)
def foo(tid):
    with sema:
        print('{} acquire sema'.format(tid))
        wt = random() * 2
        time.sleep(wt)
    print('{} release sema'.format(tid))
threads = []
for i in range(5):
    t = Thread(target=foo, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

# 进程

In [None]:
import multiprocessing
import time
def foo(i):
    print('called function in process: %s' % i)
    time.sleep(5)
    return
    
Process_jobs = []
for i in range(5):
    p = multiprocessing.Process(target=foo, args=(i,))
    Process_jobs.append(p)
    p.start()
    
for p in Process_jobs:
    p.join()

## 后台进程

In [None]:
import multiprocessing
import time

def foo():
    name = multiprocessing.current_process().name
    print ("Starting %s \n" %name)
    time.sleep(3)
    print ("Exiting %s \n" %name)

background_process = multiprocessing.Process\
                    (name='background_process',\
                     target=foo)
background_process.daemon = True
NO_background_process = multiprocessing.Process\
                          (name='NO_background_process',\
                           target=foo)
NO_background_process.daemon = False
background_process.start()
NO_background_process.start()


### 杀死进程

In [None]:
import multiprocessing
import time
def foo():
    print('Starting function')
    time.sleep(0.1)
    print('Finished function')

p = multiprocessing.Process(target=foo, name='Process-#Test#')
print('Process before execution:', p, p.is_alive())
p.start()
print('Process running:', p, p.is_alive())
p.terminate()
print('Process terminated:', p, p.is_alive())
p.join()
print('Process joined:', p, p.is_alive())
print('Process exit code:', p.exitcode)

## 继承创建进程

In [None]:
import multiprocessing
class MyProcess(multiprocessing.Process):
    def run(self):
        print('called run method in process: %s' %self.name)
        return


jobs = []
for i in range(5):
    p = MyProcess ()
    jobs.append(p)
    p.start()
    p.join()

# 进程间通信

## 进程共享内存

In [None]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

## 进程共享-队列

In [None]:
# https://docs.python.org/3/library/multiprocessing.html?highlight=queue#multiprocessing.Queue
import multiprocessing
import random
import time
class producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
    def run(self) :
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("<---Process Producer : item %d appended to queue %s" % (item,self.name))
            time.sleep(1)
            # print("The size of queue is %s" % self.queue.qsize())
class consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
    def run(self):
        time.sleep(3)
        while True:
            # empty or not
            time.sleep(2)
            try:
                item = self.queue.get()
                print('--->Process Consumer : item %d popped from by %s \n' % (item, self.name))
            except Exception as e:
                print("the queue is empty, Process Consumer exit")
                break
        time.sleep(1)

In [None]:
queue = multiprocessing.Queue()
queue.put(-1) # to avoid queue size (empty, qsize on macos)
process_producer = producer(queue)
process_consumer = consumer(queue)
process_producer.start()
process_consumer.start()
process_producer.join()
process_consumer.join()

# 如果多个consumer，进程同步

### 管道

In [None]:
from multiprocessing import Process, Pipe
 
class Consumer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe
 
    def run(self):
        self.pipe.send("Consumer Words")
        print("Consumer Received:", self.pipe.recv())
 
 
class Producer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe
 
    def run(self):
        print("Producer Received:", self.pipe.recv())
        self.pipe.send("Producer Words")
 
 
pipe = Pipe()
p = Producer(pipe[0])
c = Consumer(pipe[1])
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print("Ended!")

### 进程池

In [None]:
#阻塞方式
from multiprocessing import Lock, Pool
import time
 
def function(index):
    print("Start process: ", index)
    time.sleep(3)
    print("End process", index)
 
 
pool = Pool(processes=3)
for i in range(4):
    pool.apply(function, (i,))

print("Started processes")
pool.close()
pool.join()
print("Subprocess done.")

In [None]:
# 非阻塞方式
from multiprocessing import Lock, Pool
import time
 
def function(index):
    print("Start process: ", index)
    time.sleep(3)
    print("End process", index)
 
 
pool = Pool(processes=2)
for i in range(4):
    pool.apply_async(function, (i,))

print("Started processes")
pool.close()
pool.join()
print("Subprocess done.")

# 正则表达式

In [None]:
s = r"<html><body><h1>hello world</h1></body></html>"

In [None]:
start = s.find("<h1>")
end = s.find("</h1>")
print(s[start+4:end])

In [None]:
import re
help(re)

In [None]:
# 导入库
import re

In [None]:
p1 = r".*<h1>(.*?)</h1>.*"
pattern = re.compile(p1)
groups = re.match(pattern, s)
print(groups.group(1))

## 查找字符串 Match & Search



In [None]:
name="Hello,My name is tiger,nice to meet you..."
k=re.search(r't(ige)r',name)
if k:
    print(k.group(0),k.group(1))
else:
    print("not search!")


In [None]:
name="Hello,My name is tiger,nice to meet you..."
k=re.match(r"H(....)", name)
if k:
    print(k.group(0),'\n',k.group(1))
else:
    print("not match!")

## 查找所有 FindAll & FindIter

In [None]:
mail='<user01@mail.com> <user02@mail.com> user04@mail.com'
re.findall(r'(\w+@m....[a-z]{3})', mail)

In [None]:
mail_list_iter = re.finditer(r'(\w+@m....[a-z]{3})', mail)

In [None]:
for i in mail_list_iter:
    print(type(i))
    print(i.group())

## 替换

In [None]:
help(re.sub)

In [None]:
test="Hi, nice to meet you where are you from?"
re.sub(r'\s','-',test)

In [None]:
re.sub(r'\s','-',test, 3)

In [None]:
help(re.subn)

In [None]:
re.subn(r'\s','-',test, 3)

### 分隔字符串

In [None]:
test="Hi, nice to meet you where are you from?"
re.split(r"\s+",test)

In [None]:
re.split(r"\s+",test,3)