# Python Advanced

![python](https://upload.wikimedia.org/wikipedia/commons/c/c3/Python-logo-notext.svg)

## Socket

### Example: echo server

In [None]:
# simplest version
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) # reuse port in multiple sockets
sock.bind(('127.0.0.1', 50070))
sock.listen(5)

conn, addr = sock.accept()
print 'Connected by', addr
while True:
    data = conn.recv(1024)
    if not data: break
    conn.send(data)
conn.close()

In [None]:
# multi threading
import socket
from thread import start_new_thread

# Listening TCP socket used by the accept loop at the end of this cell.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) # reuse port in multiple sockets
sock.bind(('127.0.0.1', 50070))  # loopback interface, port 50070
sock.listen(5)  # backlog argument of 5

def _handle_conn(conn):
    while True:
        data = conn.recv(1024)
        if not data: break
        conn.send(data)
    conn.close()
    
# Accept clients forever; each accepted connection is served by
# _handle_conn on its own thread so the loop can go straight back to accept().
while True:
    conn, addr = sock.accept()
    print 'Connected by', addr
    start_new_thread(_handle_conn, (conn,))

In [None]:
# asyncore
import asyncore
import socket

class EchoHandler(asyncore.dispatcher_with_send):
    """Per-connection dispatcher that sends back whatever it reads."""

    def handle_read(self):
        """Read up to 8192 bytes and echo them; do nothing on an empty read."""
        received = self.recv(8192)
        if not received:
            return
        self.send(received)

class EchoServer(asyncore.dispatcher):
    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            print 'Incoming connection from %s' % repr(addr)
            handler = EchoHandler(sock)

# Two independent echo servers on different ports, both driven by the
# single asyncore.loop() call below.
server = EchoServer('localhost', 50071)
server2 = EchoServer('localhost', 50072)
asyncore.loop()

In [None]:
# gevent
from gevent.server import StreamServer

def echo(socket, address):
    """Echo each line received on ``socket`` back to the sender.

    socket: the connected client socket.
    address: pair formatted as ``%s:%s`` in the log lines.
    """
    print('New connection from %s:%s' % address)
    # makefile() provides a file-like reader so input can be pulled with readline()
    with socket.makefile(mode='rb') as reader:
        # iter() with a sentinel stops at the first empty read
        for line in iter(reader.readline, b''):
            socket.sendall(line)
    print('%s:%s disconnected' % address)

# Listen on every interface, port 50070, with echo() as the connection handler.
server = StreamServer(('0.0.0.0', 50070), echo)
server.serve_forever()

## Multi Threading

In [None]:
# thread
import thread
from time import sleep

def sleep_echo(sleep_interval, msg):
    """Sleep for ``sleep_interval`` (passed to ``time.sleep``), then print ``msg``."""
    sleep(sleep_interval)
    print msg

# start_new_thread returns without waiting for sleep_echo to finish,
# so the main thread goes on to print 'world' right away.
thread.start_new_thread(sleep_echo, (2, 'hello'))
print 'world'

In [None]:
# threading
from threading import Thread
from time import sleep

class DelayEcho(Thread):
    def __init__(self, interval, msg):
        super(DelayEcho, self).__init__()
        self.interval = interval
        self.msg = msg
        self.daemon = True
        
    def run(self):
        sleep(self.interval)
        print self.msg

t = DelayEcho(2, 'hello')
t.start()
t.join()  # wait for the thread to finish, so 'hello' is printed before 'world'
print 'world'

In [None]:
# threading.Lock
import threading
from threading import Lock
from threading import Thread

resource_lock = Lock()

def update_resource():
    with resource_lock:
        print threading.currentThread().name
        
threads = map(lambda x: Thread(target=update_resource), xrange(10))
[t.start() for t in threads]
[t.join() for t in threads]
print 'main'

In [None]:
# threading.local
import threading
from time import sleep

_lock = threading.Lock()
resource = threading.local()
resource.name = 'default'

def update_resource():
    resource.name = threading.currentThread().name
    sleep(1)
    with _lock:
        print resource.name # each thread as its one value

threads = map(lambda x: Thread(target=update_resource), xrange(5))
[t.start() for t in threads]
[t.join() for t in threads]
print resource.name

In [None]:
# Queue
from Queue import Queue
from threading import Thread

q = Queue()

def setter(q, v):
    q.put(v)

def getter(q):
    print q.get()
    
_setter = Thread(target=setter, args=(q, 1))
_getter = Thread(target=getter, args=(q, ))
_getter.start()
_setter.start()
_getter.join()
_setter.join()

In [None]:
from Queue import LifoQueue

# LifoQueue: two items are put, then taken out again and printed.
q = LifoQueue()
q.put(1)
q.put(2)
print q.get()
print q.get()

In [None]:
from Queue import PriorityQueue

q = PriorityQueue()
q.put((1, 'a'))
q.put((3, 'b'))
q.put((2, 'c'))

print q.get()
print q.get()
print q.get()

### Limitation of Thread

* No stop/interrupt
* No multi-core support

## Multi Processing

In [None]:
# multiprocessing
from multiprocessing import Process
from time import sleep

def delay_echo(interval, msg):
    """Sleep for ``interval`` (passed to ``time.sleep``), then print ``msg``."""
    sleep(interval)
    print msg
    
# delay_echo runs in a separate process; the parent prints 'world'
# without waiting and only then joins the child.
p = Process(target=delay_echo, args=(2, 'hello'))
p.start()
print 'world'
p.join()

In [None]:
# Pool
import urllib
from multiprocessing import Pool
# from multiprocessing.dummy import Pool
from time import sleep

urls = ['http://www.google.com',
        'http://www.facebook.com',
        'http://www.baidu.com']

def fetch_content(url):
    sleep(1)
    print url
    print len(urllib.urlopen(url).read())
    
pool = Pool()
pool.map(fetch_content, urls)
pool.close()
pool.join()

In [None]:
# Queue
from multiprocessing import Queue
# Lock
from multiprocessing import Lock

## Coroutine

In [None]:
# yield
def fibonacci():
    """Yield the Fibonacci sequence 1, 1, 2, 3, 5, ... without end."""
    previous, current = 1, 1
    yield previous
    while True:
        yield current
        previous, current = current, previous + current

# Print the first 20 values of the generator.
fib = fibonacci()
for _ in xrange(20):
    print fib.next()

In [None]:
# yield send
def puzzle_game():
    """Coroutine-style guessing game.

    Yields the prompt ``'type a word: '``; the value passed in with
    ``send()`` is the guess, and the verdict is yielded in reply. The
    cycle then repeats.
    """
    while True:
        guess = yield 'type a word: '
        verdict = 'you got it' if guess == 'harry' else 'try again'
        yield verdict
            
game = puzzle_game()
game.next()         # advance to the first yield (the prompt)
game.send('jerry')  # deliver a guess; the generator yields its verdict
game.next()         # advance to the next prompt
game.send('harry')

In [None]:
# gevent, tornado
import gevent

def delay_echo(msg):
    """Sleep for 2 via ``gevent.sleep``, then print ``msg``."""
    gevent.sleep(2)
    print msg
    
# Spawn two greenlets running delay_echo, then block in gevent.wait().
gevent.spawn(delay_echo, 'hello world')
gevent.spawn(delay_echo, 'how are you')
gevent.wait()

**In Python 3, there is built-in coroutine support in the [asyncio](//docs.python.org/3.5/library/asyncio.html#module-asyncio) module**

## setup.py and pip

### Python project structure

![project_structure](/files/images/project_structure.svg)

In [None]:
# setup.py example
import os
from setuptools import setup
from distutils.command.install import install

class _MyInstall(install):
    """Install command that runs the proto generator before the normal install."""

    def run(self):
        """Generate the proto modules, then delegate to the base ``install.run``."""
        self._generate_proto_modules()
        install.run(self)

    def _generate_proto_modules(self):
        """Pass the content of ``src/nbs/im/meta.xml`` to ``generate_proto_from_xml``.

        The output location handed to the generator is ``src/nbs/im/proto``.
        Both paths are resolved relative to this file's directory.
        """
        # Imported inside the method, as in the original layout of this script.
        # NOTE(review): presumably requires the nbs package to be importable
        # at install time — confirm.
        from nbs.im.proto_generator import generate_proto_from_xml
        im_dir = os.path.join(os.path.dirname(__file__), 'src', 'nbs', 'im')
        with open(os.path.join(im_dir, 'meta.xml')) as meta_file:
            meta_xml = meta_file.read()
        generate_proto_from_xml([meta_xml], os.path.join(im_dir, 'proto'))

# Package metadata and build configuration.
setup(
    name='nbs',
    version='0.1.2',
    description='Netact simulator',
    author='',
    author_email='',
    url='http://gitlab.china.nsn-net.net/ta/nbs',
    # Packages live under src/ rather than next to setup.py.
    package_dir={'': 'src'},
    install_requires=['pysimplesoap', 'protobuf'], # NOTE: use git+https://github.com/lybicat/pysimplesoap.git
    packages=['nbs',
              'nbs.im',
              'nbs.im.proto',
              'NbsLibrary',
              ],
    # Non-Python files matching these patterns are included for every package.
    package_data={'':['*.ini', '*.wsdl','*.xsd','*.xml']},
    data_files=[('nbs', ['src/nbs/logging.ini', 'src/nbs/ne3s.wsdl', 'src/nbs/swaref.xsd', 'src/nbs/im/meta.xml'])],
    platforms='any',
    # Use the custom command so the proto generation step runs on install.
    cmdclass={'install': _MyInstall},
)

```yaml
# .gitlab-ci.yml
stages:
    - test
    - deployment

# Run the test target.
utest:
    stage: test
    script:
        - make test

# Run the package target and copy the resulting tarball to the remote host; tags only.
deployment:
    # Without an explicit stage a job falls into the default `test` stage,
    # so it would run alongside utest instead of after it.
    stage: deployment
    only:
        - tags
    script:
        - make package
        # NOTE(review): the password is passed in clear text on the command line;
        # consider a protected CI variable instead.
        - sshpass -p passwd scp -o StrictHostKeyChecking=no dist/nbs-*.tar.gz user@10.2.3.4:/opt/pypiserver/packages # python setup.py upload may be better
```

## Unit Testing and TDD

![learning curve](//ifconfiger.com/media/programming_language_learning_curves_python.png?fileid=a9e2ae2d1a3c8d837beee6ee478df9d96592fdcb22837d72ff18e5be1c23bc48)

### Why "Unit Testing" is so important in Python

### Simple Example

### Bowling Game

#### Description:

Write a program to score a game of Ten-Pin Bowling.

Input: string (described below) representing a bowling game
Output: integer score

The scoring rules:

> Each game, or "line" of bowling, includes ten turns, or "frames" for the bowler.
> 
> In each frame, the bowler gets up to two tries to knock down all ten pins.
> 
> If the first ball in a frame knocks down all ten pins, this is called a "strike". The frame is over. The score for the frame is ten plus the total of the pins knocked down in the next two balls.
> 
> If the second ball in a frame knocks down all ten pins, this is called a "spare". The frame is over. The score for the frame is ten plus the number of pins knocked down in the next ball.
> 
> If, after both balls, there is still at least one of the ten pins standing, the score for that frame is simply the total number of pins knocked down in those two balls.
> 
> If you get a spare in the last (10th) frame you get one more bonus ball. If you get a strike in the last (10th) frame you get two more bonus balls. These bonus throws are taken as part of the same turn. If a bonus ball knocks down all the pins, the process does not repeat. The bonus balls are only used to calculate the score of the final frame.

The game score is the total of all frame scores.

#### Examples:

* X indicates a strike
* / indicates a spare
* - indicates a miss
* | indicates a frame boundary
* The characters after the || indicate bonus balls

X|X|X|X|X|X|X|X|X|X||XX

* Ten strikes on the first ball of all ten frames.
* Two bonus balls, both strikes.

Score for each frame == 10 + score for next two balls == 10 + 10 + 10 == 30

Total score == 10 frames x 30 == 300

9-|9-|9-|9-|9-|9-|9-|9-|9-|9-||

* Nine pins hit on the first ball of all ten frames.
* Second ball of each frame misses last remaining pin.
* No bonus balls.

Score for each frame == 9

Total score == 10 frames x 9 == 90

5/|5/|5/|5/|5/|5/|5/|5/|5/|5/||5

* Five pins on the first ball of all ten frames.
* Second ball of each frame hits all five remaining pins, a spare.
* One bonus ball, hits five pins.

Score for each frame == 10 + score for next one ball == 10 + 5 == 15

Total score == 10 frames x 15 == 150

X|7/|9-|X|-8|8/|-6|X|X|X||81

Total score == 167

```python
# test_bowling.py
import unittest
# The implementation lives in bowling.py; only the function name keeps the
# double-l spelling.
from bowling import get_bowlling_score


class TestScore(unittest.TestCase):
    """Scoring tests for get_bowlling_score, from simplest to most involved.

    Score strings use '-' for a miss, '/' for a spare, 'X' for a strike,
    '|' between frames and '||' before the bonus balls.
    """

    def test_all_missing(self):
        self._assert_score('--|--|--|--|--|--|--|--|--|--||', 0)

    def test_first_hit(self):
        self._assert_score('1-|--|--|--|--|--|--|--|--|--||', 1)

    def test_one_spare(self):
        self._assert_score('1/|--|--|--|--|--|--|--|--|--||', 10)

    def test_one_strike(self):
        self._assert_score('X|--|--|--|--|--|--|--|--|--||', 10)

    def test_two_hits(self):
        self._assert_score('12|--|--|--|--|--|--|--|--|--||', 3)

    def test_second_hit(self):
        self._assert_score('-5|--|--|--|--|--|--|--|--|--||', 5)

    def test_two_frames_hit(self):
        self._assert_score('13|1-|--|--|--|--|--|--|--|--||', 5)

    def test_multi_frames_hit(self):
        self._assert_score('1-|1-|-2|33|--|11|-1|--|--|-1||', 14)

    def test_spare_with_bonus(self):
        # spare frame: 10 + next ball (6); second frame: 6
        self._assert_score('1/|6-|--|--|--|--|--|--|--|--||', 22)

    def test_spare_with_bonus_2(self):
        self._assert_score('1/|62|--|--|--|--|--|--|--|--||', 24)

    def test_strike_after_spare(self):
        self._assert_score('1/|X|--|--|--|--|--|--|--|--||', 30)

    def test_strike_with_bonus(self):
        # strike frame: 10 + next two balls (1 + 2); second frame: 3
        self._assert_score('X|12|--|--|--|--|--|--|--|--||', 16)

    def test_strike_after_strike(self):
        self._assert_score('X|X|12|--|--|--|--|--|--|--||', 37)

    def test_last_strike_with_bonus(self):
        self._assert_score('--|--|--|--|--|--|--|--|X|X||12', 34)

    def test_last_spare_with_bonus(self):
        self._assert_score('--|--|--|--|--|--|--|--|X|2/||2', 32)

    def test_last_all_strikes(self):
        self._assert_score('--|--|--|--|--|--|--|--|X|X||XX', 60)

    def test_last_strike_after_spare(self):
        self._assert_score('--|--|--|--|--|--|--|--|4/|X||XX', 50)

    def _assert_score(self, score_str, expect_score):
        """Assert that ``score_str`` scores exactly ``expect_score``."""
        self.assertEqual(get_bowlling_score(score_str), expect_score)


if __name__ == '__main__':
    unittest.main()
```

```python
# bowling.py
def get_bowlling_score(bowling_str):
    """Return the total score of the game described by ``bowling_str``.

    Frames are separated by '|'; the first ten entries are scored and
    anything after them only serves as bonus balls.
    """
    frames = bowling_str.split('|')
    total = 0
    for index in range(10):
        total += get_frame_total_score(frames, index)
    return total


def get_subsequent_balls(frames, index):
    """Return every ball thrown after frame ``index`` as one string."""
    following = frames[index + 1:]
    return ''.join(following)


def get_frame_total_score(frames, index):
    """Return the score of frame ``index`` including strike/spare bonuses."""
    frame = frames[index]
    if 'X' not in frame and '/' not in frame:
        # open frame: just the pins knocked down
        return get_ball_score(frame)
    # a strike earns the next two balls, a spare only the next one
    bonus_count = 2 if 'X' in frame else 1
    bonus_balls = get_subsequent_balls(frames, index)[:bonus_count]
    return 10 + get_ball_score(bonus_balls)


# Pins knocked down for each ball symbol ('/' is handled separately).
_PINS_BY_SYMBOL = dict(zip('-123456789X', range(11)))


def get_ball_score(balls):
    """Return the pins represented by the ball symbols in ``balls``.

    Any group containing a spare marker is worth exactly 10.
    """
    if '/' in balls:
        return 10
    pins = 0
    for ball in balls:
        pins += _PINS_BY_SYMBOL[ball]
    return pins
```

### Mock

In [None]:
# mock
import time

def delay_print(msg, delay):
    """Call ``time.sleep(delay)`` and then print ``msg``."""
    time.sleep(delay)
    print msg
    
import unittest

# Replace time.sleep with a stub. delay_print looks the attribute up on the
# time module at call time, so it picks up the stub. The original function
# is not restored in this cell.
time.sleep = lambda x: True

class TestDelayPrint(unittest.TestCase):
    def test_delay_print_empty_string(self):
        # No assertion here: the test only exercises the call.
        delay_print('', 5)
            
# Build and run the suite explicitly instead of calling unittest.main().
suite = unittest.TestLoader().loadTestsFromTestCase(TestDelayPrint)
unittest.TextTestRunner().run(suite)

In [None]:
# mock for thread
from threading import Thread

def echo_in_process(interval, msg):
    """Call ``sleep(interval)`` and then print ``msg``."""
    # The import runs when the function is called, i.e. after the patch below.
    from time import sleep
    sleep(interval)
    print msg

import time
time.sleep = lambda x: None # mock time.sleep

# The patch was applied in the main thread before the worker thread starts.
t = Thread(target=echo_in_process, args=(5, 'hello world'))
t.start()
t.join()

In [None]:
# mock in thread
from threading import Thread

def mock_in_thread():
    """Replace ``time.sleep`` with a no-op stub and print a marker."""
    import time
    time.sleep = lambda x: None # mock time.sleep
    print 'after mock'

t = Thread(target=mock_in_thread)
t.start()
t.join()

# This import happens after the thread has patched the time module and been joined.
from time import sleep
sleep(5)
print 'hello world'

In [None]:
# mock for child process
import multiprocessing

def echo_in_process(interval, msg):
    """Call ``sleep(interval)`` and then print ``msg``."""
    from time import sleep
    sleep(interval)
    print msg
    
import time
time.sleep = lambda x: None # mock time.sleep

# NOTE(review): whether the child sees the patched time.sleep presumably
# depends on how the child process is started (fork vs. spawn) — confirm
# on the target platform.
p = multiprocessing.Process(target=echo_in_process, args=(5, 'hello world'))
p.start()
p.join()

## Big data analysis

* Memory
* Index

In [None]:
# iterator
# Iterate over the file object line by line.
with open('access_10000.log') as fp:
    for line in fp:
        pass # process the line here

d = {'a': 1, 'b': 2, 'c': 3}
# Iterating a dict yields its keys.
for k in d:
    print k

# iteritems() yields (key, value) pairs.
for k, v in d.iteritems():
    print k, v

from itertools import imap

# imap returns an iterator; the result is not consumed in this cell.
imap(int, ('0', '1', '2'))

In [None]:
# numpy
import numpy as np

# 3x3 grid holding 1..9 row by row:
# 1 2 3
# 4 5 6
# 7 8 9
metrix = np.arange(1, 10).reshape(3, 3)
metrix[:2, 1:] # slice: first two rows, last two columns
metrix[:2, 1:] = 0
metrix # the assignment went through the slice into the array itself (a view, not a copy)

In [None]:
# The scalar operands are applied to every element of the sliced block.
(metrix[:2, 1:] + 3) * 2 # broadcast

In [None]:
# Index with a boolean array: one flag per row of metrix.
bool_index = np.array([True, False, True])
metrix[bool_index]

In [None]:
# Boolean mask built from the array itself: elements whose remainder mod 2 is 1.
metrix[metrix % 2 == 1]

In [None]:
# statistics on ndarray
print metrix.sum()  # sum over all elements
print metrix[metrix % 2 == 1].mean()  # mean of the elements selected by the odd mask

In [None]:
# pandas
import pandas as pd

# Load the space-separated log, naming its ten columns explicitly.
data = pd.read_table('access_10000.log', sep=' ', names=[
        'src', 'field2', 'field3', 'datetime', 'timezone', 'method', 'code', 'length', 'referer', 'agent'])
# Drop the two placeholder columns.
del data['field2']
del data['field3']
# The 'method' column is split on whitespace into three parts, which become
# the 'method', 'url' and 'protocol' columns.
methods = data['method'].str.split()
data['method'] = methods.apply(lambda x: x[0])
data['url'] = methods.apply(lambda x: x[1])
data['protocol'] = methods.apply(lambda x: x[2])
# TODO: merge datetime and timezone into a single datetime field

In [None]:
# NOTE(review): this filter uses a strict `> 300`, while the chunked count
# later in the notebook uses `>= 300` — confirm which threshold is intended.
data[data['code']>300]['url'].unique() # get all invalid request urls

In [None]:
%matplotlib inline
import seaborn as sns

# Horizontal bar chart of the first 15 entries of the per-'src' value counts.
data['src'].value_counts()[:15].plot(kind='barh', figsize=(12, 5))

In [None]:
# use chunksize to handle huge dataset
import pandas as pd
from pandas import Series

data = pd.read_table('access_10000.log', sep=' ', chunksize=1000, names=[
        'src', 'field2', 'field3', 'datetime', 'timezone', 'method', 'code', 'length', 'referer', 'agent'])
invalid_visits = 0
for chunk in data:
    invalid_visits += len(chunk[chunk['code'] >= 300])
print invalid_visits

## Common Patterns

![bible](//a1.att.hudong.com/34/62/19300001337301131296620943684.jpg)

### Singleton


> http://blog.zhangyu.so/python/2016/02/16/design-patterns-of-python-borg/

### Decorator

> http://blog.zhangyu.so/python/2016/02/17/design-patterns-of-python-decorator/

### Proxy

> http://blog.zhangyu.so/python/2016/02/24/design-patterns-of-python-proxy/

### MapReduce

> http://blog.zhangyu.so/python/2016/02/19/design-patterns-of-python-mapreduce/