## 1. 多线程 Threading 和多进程 Multiprocessing

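### 顺序执行单线程与同时执行两个并发线程

> 注：CPython 的全局解释器锁（GIL）使得同一时刻只有一个线程在执行 Python 字节码。`my_counter` 是纯 CPU 密集型任务，因此让两个线程"并发"执行并不会比依次执行更快——下面的结果中并发版本（In [2]）反而更慢，通常是线程争抢 GIL 带来的额外开销所致。要在多核上真正并行执行 CPU 密集型任务，应使用多进程。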

In [1]:
from threading import Thread
import time

def my_counter(n=100000000):
    """Busy-loop *n* times, incrementing a local counter, then return True.

    This is pure-Python, CPU-bound work used to time threads. *n* defaults
    to the original 100,000,000 iterations, so ``Thread(target=my_counter)``
    behaves as before. Pass a smaller value for quick runs.
    """
    # NOTE: on Python 2, range(n) materializes a full list of n ints.
    i = 0
    for _ in range(n):
        i = i + 1
    return True

def main():
    """Run ``my_counter`` in two threads, one after the other, and print the time.

    Each thread is joined right after it is started, so the two counters
    never overlap. This is the sequential baseline for the next cell.
    """
    start_time = time.time()
    for _ in range(2):
        t = Thread(target=my_counter)
        t.start()
        t.join()  # wait for this thread before starting the next one
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))


if __name__ == '__main__':
    main()

Total time: 34.5584599972


In [2]:
from threading import Thread
import time

def my_counter(n=100000000):
    """Busy-loop *n* times, incrementing a local counter, then return True.

    This is pure-Python, CPU-bound work used to time threads. *n* defaults
    to the original 100,000,000 iterations, so ``Thread(target=my_counter)``
    behaves as before. Pass a smaller value for quick runs.
    """
    # NOTE: on Python 2, range(n) materializes a full list of n ints.
    i = 0
    for _ in range(n):
        i = i + 1
    return True

def main():
    """Start ``my_counter`` in two threads at once, wait for both, and print the time.

    Both threads are started before either is joined, so they run
    concurrently. Compare the result with the sequential version above.
    """
    start_time = time.time()
    threads = []
    for _ in range(2):
        t = Thread(target=my_counter)
        t.start()
        threads.append(t)
    # Join only after every thread has been started.
    for t in threads:
        t.join()
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))


if __name__ == '__main__':
    main()

Total time: 45.1772201061


### 多进程multiprocessing

In [3]:
from multiprocessing import Process
import time
 
def f(n, delay=1):
    """Sleep *delay* seconds (default 1), then print the square of *n*.

    ``print(...)`` with a single argument behaves the same on Python 2 and 3,
    unlike the Python-2-only ``print`` statement.
    """
    time.sleep(delay)
    print(n * n)

if __name__=='__main__':
    # Start ten child processes, each running f(i) concurrently.
    processes = []
    for i in range(10):
        p = Process(target=f, args=[i,])
        p.start()
        processes.append(p)
    # Wait for every child so the parent does not move on before they finish.
    for p in processes:
        p.join()

0
1
4
9
16
25
36
49
64
81


### 进程间通信Queue

In [4]:
from multiprocessing import Process, Queue
import time
 
def write(q, items=('A', 'B', 'C', 'D', 'E'), delay=0.5):
    """Put each element of *items* onto queue *q*, in order.

    Prints a line before each ``put`` and sleeps *delay* seconds after it.
    The defaults reproduce the original demo: letters 'A'..'E' with 0.5 s
    pauses. The last default item, 'E', is what ``read`` stops on by default.
    """
    for i in items:
        print('Put %s to queue' % i)
        q.put(i)
        time.sleep(delay)
 
def read(q, sentinel='E'):
    """Take items from queue *q* and print them until *sentinel* is received.

    Each ``q.get(True)`` blocks until an item is available. The sentinel
    itself is consumed and printed; anything queued after it is left in *q*.
    """
    while True:
        v = q.get(True)
        print('get %s from queue' % v)
        if v == sentinel:
            break
 
if __name__ == '__main__':
    # One writer and one reader process communicate through a shared Queue.
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    pw.start()
    pr.start()
    # Wait for both children. The writer ends after its last put; the reader
    # ends on its own after it reads 'E'. Calling terminate() on an
    # already-joined process would do nothing, so it is not needed.
    pw.join()
    pr.join()

Put A to queue
get A from queue
Put B to queue
get B from queue
Put C to queue
get C from queue
Put D to queue
get D from queue
Put E to queue
get E from queue


### 进程池Pool

In [5]:
from multiprocessing import Pool
import time
 
def f(x, delay=2):
    """Print the square of *x*, sleep *delay* seconds (default 2), and return it.

    The sleep simulates a slow task, so the pool's scheduling is visible in
    the output.
    """
    print(x * x)
    time.sleep(delay)
    return x * x
if __name__ == '__main__':
    # Number of worker processes in the pool.
    pool = Pool(processes=5)
    res_list = []

    for i in range(10):
        # Submit tasks asynchronously so they run in parallel. To wait
        # synchronously instead, call res.get() right after each
        # submission, or use Pool.apply.
        res = pool.apply_async(f,[i,])
        print('-------:',i)
        res_list.append(res)
    # No more tasks will be submitted; wait for the workers to finish.
    pool.close()
    pool.join()
    for r in res_list:
        print("result {}".format(r.get(timeout=5)))

0
4
1
9
16
('-------:', 0)
('-------:', 1)
('-------:', 2)
('-------:', 3)
('-------:', 4)
('-------:', 5)
('-------:', 6)
('-------:', 7)
('-------:', 8)
('-------:', 9)
25
36
49
64
81
result 0
result 1
result 4
result 9
result 16
result 25
result 36
result 49
result 64
result 81


### 多进程与多线程对比

> 每个子进程拿到的是 `info` 的一份副本，子进程中的修改不会反映到父进程，所以多进程部分每次只打印一个元素。线程则共享父进程的内存，因此 `info` 会不断累积。

In [6]:
from multiprocessing import Process
import threading
import time
lock = threading.Lock()


def run(info_list, n):
    """Append *n* to *info_list* while holding ``lock``, then print the list.

    Only the append is done under the lock; the print happens after release.
    """
    # ``with`` releases the lock even if append raises, unlike a manual
    # acquire()/release() pair.
    with lock:
        info_list.append(n)
    print('%s\n' % info_list)
    
if __name__=='__main__':
    info = []
    # Processes: each child runs run(info, idx) on its own copy of ``info``.
    for idx in range(10):
        # target is the function the child runs; args are passed to it.
        proc = Process(target=run, args=[info, idx])
        proc.start()
        proc.join()
    # Make the main process wait a little for the children so the output is tidy.
    time.sleep(1)
    print('------------threading--------------')
    # Threads: every thread appends to the same ``info`` list.
    for idx in range(10):
        worker = threading.Thread(target=run, args=[info, idx])
        worker.start()
        worker.join()

[0]

[1]

[2]

[3]

[4]

[5]

[6]

[7]

[8]

[9]

------------threading--------------
[0]

[0, 1]

[0, 1, 2]

[0, 1, 2, 3]

[0, 1, 2, 3, 4]

[0, 1, 2, 3, 4, 5]

[0, 1, 2, 3, 4, 5, 6]

[0, 1, 2, 3, 4, 5, 6, 7]

[0, 1, 2, 3, 4, 5, 6, 7, 8]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]



## 2. 函数式编程（lambda、map、filter、reduce）

In [7]:
def inc(x):
    """Return a function that adds *x* to whatever it is called with."""
    offset = x

    def incx(y):
        # ``offset`` is captured from the enclosing call (a closure).
        return offset + y

    return incx
 
# Two independent closures, each remembering its own x.
inc2 = inc(2)
inc5 = inc(5)

print(inc2(5))  # prints 7
print(inc5(5))  # prints 10

7
10


In [8]:
# Binding a lambda to a name (normally discouraged by PEP 8) is done here on
# purpose, to show that a lambda is an ordinary function object.
g = lambda x: x * 2
print(g(3))
# A lambda can also be called immediately without being named.
print((lambda x: x * 2)(4))

6
8


In [9]:
# Explicit loop, for comparison with map(len, ...) in the next cell.
for n in ["qi", "yue", "July"]:
    print(len(n))

2
3
4


In [10]:
# list(): on Python 3, map() returns a lazy iterator, which would print as
# "<map object ...>" instead of the values.
name_len = list(map(len, ["qi", "yue", "July"]))
print(name_len)

[2, 3, 4]


In [11]:
def toUpper(item):
    """Return *item* converted to upper case via ``str.upper``."""
    upper = item.upper()
    return upper
 
# list(): on Python 3, map() is lazy and would print as a map object.
upper_name = list(map(toUpper, ["qi", "yue", "July"]))
print(upper_name)

['QI', 'YUE', 'JULY']


In [12]:
# Imperative version: build the list of squares with an explicit loop
# (kept deliberately, for comparison with map() in the next cell).
items = [1, 2, 3, 4, 5]
squared = []
for i in items:
    squared.append(i**2)
print(squared)

[1, 4, 9, 16, 25]


In [13]:
# Functional version: map a squaring lambda over the items.
items = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x**2, items))
print(squared)

[1, 4, 9, 16, 25]


In [14]:
number_list = range(-5, 5)  # -5, -4, ..., 4
# filter() keeps only the values for which the predicate is true.
less_than_zero = list(filter(lambda value: value < 0, number_list))
print(less_than_zero)

[-5, -4, -3, -2, -1]


In [15]:
def add(x, y):
    """Return ``x + y`` (the binary ``+`` operator as a function)."""
    total = x + y
    return total
# reduce is a builtin only on Python 2; functools.reduce works on both.
from functools import reduce

print(reduce(add, range(1, 5)))      # ((1 + 2) + 3) + 4 -> 10
print(reduce(add, range(1, 5), 10))  # same fold, starting from 10 -> 20

10
20


### 例：计算数组中正数的平均数

In [16]:
# Imperative version: count and sum the positive numbers, then divide.
num = [2, -5, 9, 7, -2, 5, 3, 1, 0, -3, 8]
positive_num_cnt = 0
positive_num_sum = 0
for value in num:
    if value > 0:
        positive_num_cnt += 1
        positive_num_sum += value

# Default for the no-positive-numbers case. Without it, ``average`` would be
# unbound and the print below would raise NameError.
average = None
if positive_num_cnt > 0:
    average = positive_num_sum / positive_num_cnt

print(average)
# prints 5 (5.0 on Python 3, where / is true division)

5


In [17]:
# Functional version: filter the positive numbers, then fold them with reduce.
from functools import reduce  # builtin only on Python 2

num = [2, -5, 9, 7, -2, 5, 3, 1, 0, -3, 8]
# list(): on Python 3, filter() is a lazy iterator with no len().
positive_num = list(filter(lambda x: x > 0, num))
# reduce() with no initial value raises TypeError on an empty sequence,
# so the empty case is handled explicitly.
if positive_num:
    average = reduce(lambda x, y: x + y, positive_num) / len(positive_num)
else:
    average = None
print(average)

5


## 3. PySpark

In [18]:
import sys
from operator import add
from pyspark import SparkContext
# Create a SparkContext with default configuration (no master URL or app
# name is passed here, so whatever the environment provides is used).
sc = SparkContext()

In [19]:
try:
    lines = sc.textFile("stormofswords.csv")
    # Split every line on commas and count how often each field occurs.
    counts = (lines.flatMap(lambda x: x.split(','))
                   .map(lambda x: (x, 1))
                   .reduceByKey(add))
    output = counts.collect()
    # Sort by count (descending) and drop fields made only of numeric
    # characters. A list comprehension is used because on Python 3
    # filter() returns a lazy iterator that cannot be sliced below.
    output = [pair for pair in sorted(output, key=lambda x: x[1], reverse=True)
              if not pair[0].isnumeric()]
    for (word, count) in output[:10]:
        print("%s: %i" % (word, count))
finally:
    # Release the Spark resources even if one of the steps above fails.
    sc.stop()

Tyrion: 36
Jon: 26
Sansa: 26
Robb: 25
Jaime: 24
Tywin: 22
Cersei: 20
Arya: 19
Robert: 18
Joffrey: 18
