## Image to xlsx

In [13]:
import cv2 as cv
import numpy as np


def BGR2HEX(color):
    return "{:02x}{:02x}{:02x}".format(int(color[2]), int(color[1]), int(color[0]))

orig_img = cv.imread("image.jpg", cv.IMREAD_COLOR)
ratio = orig_img.shape[0] / 180
img = cv.resize(orig_img, (180, int(orig_img.shape[1] / ratio)),interpolation=cv.INTER_LINEAR)
rows, cols, _ = img.shape

from openpyxl import Workbook
from openpyxl.styles import PatternFill

wb = Workbook()
ws = wb.active


for row in range(rows):
    for col in range(cols):
        fill = PatternFill(fill_type="solid",
                 start_color=BGR2HEX(img[row,col]))
        _ = ws.cell(column=col+1, row=row+1).fill = fill
        
wb.save("sample.xlsx")

(20000, 28000, 3)


![image.png](attachment:image.png)

**Image must be have resolution about 180x100!**

## PNG signature reader

Require input path of some png image (variable **FILENAME**)

In [None]:
import sys

FILENAME = ""

PNG_MAGIC = bytes([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a])
PNG_CHUNK_TYPE_IHDR = 0x49484452 
PNG_CHUNK_TYPE_PLTE = None 
PNG_CHUNK_TYPE_IDAT = 0x49444154
PNG_CHUNK_TYPE_IEND = 0x49454e44


def parse_ihdr_chunk(data):
    if len(data) != 13:
        raise Exception

    hdr_data = {}
    hdr_data["width"]       = int.from_bytes(bytes(data[0:4]), 'big')
    hdr_data["height"]      = int.from_bytes(bytes(data[4:8]), 'big')
    hdr_data["depth"]       = data[8]
    hdr_data["color_type"]  = data[9]
    hdr_data["compression"] = data[10]
    hdr_data["filter"]      = data[11]
    hdr_data["interlace"]   = data[12]

    return hdr_data


class Image:
    def __init__(self, filename):
        self.filename = filename
        self.width  = 0
        self.height = 0
        self.chunks = []

        self.populate()

    def populate(self):
        file = open(self.filename, 'rb')
        buff = file.read(8)

        if buff != PNG_MAGIC:
            raise Exception

        is_again = True

        while is_again:
            buff = file.read(4)
            chunk_size = int.from_bytes(buff, 'big')
            buff = file.read(4)
            chunk_type = int.from_bytes(buff, 'big')
            chunk_data = file.read(chunk_size)
            buff = file.read(4)
            chunk_crc32 = int.from_bytes(buff, 'big')

            chunk = Chunk(chunk_size, chunk_type, chunk_data, chunk_crc32)
            self.chunks.append(chunk)
            
            if chunk_type == PNG_CHUNK_TYPE_IHDR:
                hdr_data = parse_ihdr_chunk(chunk_data)
                self.set_hdr(hdr_data)
                print('asdasdad')
            if chunk_type == PNG_CHUNK_TYPE_IEND:
                is_again = False

    def set_hdr(self, hdr_data = {}):
        self.hdr_data = hdr_data

    def __repr__(self):
        return str(self.hdr_data)


class Chunk:
    def __init__(self, chunk_size, chunk_type, chunk_data, chunk_crc32):
        self.size  = chunk_size
        self.type  = chunk_type
        self.data  = chunk_data
        self.crc32 = chunk_crc32



image = Image(FILENAME)

# Web snippets

## Flask 

### Simple Server

Requirements:
* ___flask___

In [None]:
class Config:
    DEBUG = True

In [30]:
from flask import Flask, request, jsonify
import json

app = Flask(__name__)
app.config.from_object(Config)

@app.route('/')
def index():
    return 'index'

@app.route('/json')
def json():
    return jsonify({'msg':'example json'})

with app.test_client() as client:
    print(client.get('/'))
    print(client.get('json'))
    print(client.get('json').get_json())

<Response streamed [200 OK]>
<Response streamed [200 OK]>
{'msg': 'example json'}


### Simple Server presents some API on vanilla Flask

Requirements:
* ___flask___

In [27]:
from flask import Flask, request, jsonify, abort
import json
from pprint import pprint 

app = Flask(__name__)
app.config.from_object(Config)

bookshelf = [
    {'id': 1, 'Name': 'AAA', 'Author': 'aaa'},
    {'id': 2, 'Name': 'BBB', 'Author': 'bbb'},
    {'id': 3, 'Name': 'CCC', 'Author': 'ccc'}
]


@app.route('/books', methods=['GET'])
def get_books():
    return jsonify(
        count=len(bookshelf),
        items=bookshelf
    )


@app.route('/books', methods=['POST'])
def add_book():
    payload = request.get_json()
    if payload is None or 'id' not in payload.keys():
        return abort(400)
    gen = (book for book in bookshelf if book['id'] == payload['id'])
    for book in gen:
        return abort(409)
    bookshelf.append(payload)
    return jsonify(payload), 201


@app.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
    gen = (book for book in bookshelf if book['id'] == book_id)
    for book in gen:
        return jsonify(book)
    return abort(404)


@app.route('/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
    payload = request.get_json()
    if payload is None:
        abort(400)
    gen = (book for book in bookshelf if book['id'] == book_id)
    for book in gen:
        book.update(payload)
        return jsonify(book), 200
    return abort(404)


@app.route('/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
    gen = (book for book in bookshelf if book['id'] == book_id)
    for book in gen:
        bookshelf.remove(book)
        return jsonify(book), 200
    return abort(404)


@app.errorhandler(400)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 400,
                "message":"Bad request"
            }
        ]}), 400


@app.errorhandler(404)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 404,
                "message":"Element not found"
            }
        ]}), 404


@app.errorhandler(409)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 409,
                "message":"Added element is now exist"
            }
        ]}), 409

---
Get all books from shelf

In [12]:
with app.test_client() as client:
    response = client.get('/books')
    print(response)
    pprint(response.get_json())

<Response streamed [200 OK]>
{'count': 3,
 'items': [{'Author': 'aaa', 'Name': 'AAA', 'id': 1},
           {'Author': 'TTT', 'Name': 'TTT', 'id': 30},
           {'Author': 'ccc', 'Name': 'CCC', 'id': 3}]}


---
Add some book to shelf

In [6]:
with app.test_client() as client:
    payload = {'id': 4, 'Name': 'DDD', 'Author': 'ddd'}
    response = client.post('/books', json=payload)
    print(response)
    pprint(response.get_json())

<Response streamed [201 CREATED]>
{'Author': 'ddd', 'Name': 'DDD', 'id': 4}


---
Get some book from shelf

In [83]:
with app.test_client() as client:
    response = client.get('/books/2')
    print(response)
    pprint(response.get_json())

<Response streamed [200 OK]>
{'Author': 'bbb', 'Name': 'BBB', 'id': 2}


---
Update some book from bookshels

In [14]:
with app.test_client() as client:
    payload = {'id': 30, 'Name': 'TTT', 'Author': 'TTT'}
    response = client.put('/books/30', json=payload)
    print(response)
    pprint(response.get_json())

<Response streamed [200 OK]>
{'Author': 'TTT', 'Name': 'TTT', 'id': 30}


Delete some book from boookshelf

In [17]:
with app.test_client() as client:
    response = client.delete('/books/1')
    print(response)
    pprint(response.get_json())

<Response streamed [404 NOT FOUND]>
None


---
Testing error handling

In [28]:
with app.test_client() as client:
    payload = {}
    response = client.post('/books', json=payload)
    print(response)
    pprint(response.get_json())
    
    payload = {'id': 1, 'Name': 'DDD', 'Author': 'ddd'}
    response = client.post('/books', json=payload)
    print(response)
    pprint(response.get_json())
    
    response = client.get('/books/66')
    print(response)
    pprint(response.get_json())

<Response streamed [400 BAD REQUEST]>
{'errors': [{'code': 404, 'message': 'Bad request'}]}
<Response streamed [409 CONFLICT]>
{'errors': [{'code': 409, 'message': 'Added element is now exist'}]}
<Response streamed [404 NOT FOUND]>
{'errors': [{'code': 404, 'message': 'Element not found'}]}


---

## Simple authentification with the use JWT

### The example of code with using JWT as extension on Flask

_Next code is example from [Flask-JWT](https://pythonhosted.org/Flask-JWT/)_

Requirements:
* ___flask___
* ___flask-jwt___

In [5]:
from flask import Flask
from flask_jwt import JWT, jwt_required, current_identity
from werkzeug.security import safe_str_cmp
from pprint import pprint

class User(object):
    def __init__(self, id, username, password):
        self.id = id
        self.username = username
        self.password = password

    def __str__(self):
        return "User(id='%s')" % self.id

users = [
    User(1, 'user1', 'abcxyz'),
    User(2, 'user2', 'abcxyz'),
]

username_table = {u.username: u for u in users}
userid_table = {u.id: u for u in users}

def authenticate(username, password):### The example code with using JWT on vanilla Flask
    user = username_table.get(username, None)
    if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')):
        return user

def identity(payload):
    user_id = payload['identity']
    return userid_table.get(user_id, None)

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'super-secret'

jwt = JWT(app, authenticate, identity)

@app.route('/protected')
@jwt_required()
def protected():
    return '%s' % current_identity

---
Trying without auth

In [8]:
with app.test_client() as client:
    response = client.get('/protected')
    print(response)
    pprint(response.get_json())

Authorization Required. Request does not contain an access token


<Response streamed [401 UNAUTHORIZED]>
{'description': 'Request does not contain an access token',
 'error': 'Authorization Required',
 'status_code': 401}


---
Trying with auth

In [18]:
with app.test_client() as client:
    payload = {'username':'user1', 'password':'abcxyz'}
    response = client.post('/auth', json=payload)
    print(response)
    pprint(response.get_json())

    
    headers = {'Authorization': 'JWT ' + token['access_token']}
    response = client.get('/protected', headers=headers)
    print(response)
    pprint(response.get_json())

<Response streamed [200 OK]>
{'access_token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE1OTkyMjExNjksImlhdCI6MTU5OTIyMDg2OSwibmJmIjoxNTk5MjIwODY5LCJpZGVudGl0eSI6MX0.tSYFUjCnCBmzhQCQfyJBWUlOSLz-X1c-xqNpXSrtPmU'}
<Response streamed [200 OK]>
None


### The example code with using JWT on vanilla Flask

Requirements:
+ ___flask___
+ ___pyjwt___

In [2]:
class Config:
    DEBUG = True

In [5]:
from flask import Flask, request, jsonify, abort
from datetime import datetime, timedelta
import jwt

app = Flask(__name__)
app.config.from_object(Config)

users = [
    {'user_id':1, 'username':'example_A', 'password':'example'},
    {'user_id':2, 'username':'example_B', 'password':'example'},
    {'user_id':3, 'username':'example_C', 'password':'example'}
]

key = 'super-secret'

def jwt_req(func):
    def wrapper():
        token = request.args.get('token', default=None)
        if token:
            try:
                jwt.decode(str(token), key, algorithms=['HS256'])
                return func()
            except jwt.ExpiredSignatureError:
                return abort(401)
        return abort(401)
    return wrapper

@app.route('/')
@jwt_req
def index():
    return jsonify({'msg':'Hello world!'})

@app.route('/auth', methods=['POST'])
def auth():
    json_data = request.get_json()
    if json_data is None:
        return abort(400)
    gen = (user for user in users if user['username'] == json_data['username'])
    for user in gen: 
        if json_data['password'] == user['password']:
            payload_jwt = {
                'exp': datetime.now() + timedelta(hours=1),
                'user_id': user['user_id']
            }
            encoded_jwt = jwt.encode(payload_jwt, key, algorithm='HS256').decode('utf-8')
            payload = {
                'access_key': encoded_jwt
            }
            return jsonify(payload)
    return abort(401)

Cheching results

In [8]:
from pprint import pprint

with app.test_client() as client:
    payload = {'username':'example_A', 'password':'example'}
    response = client.post('/auth', json=payload)
    print(response)
    pprint(response.get_json())
    token = response.get_json()

    response = client.get('/')
    print(response)
    pprint(response.get_json())
    
    
    response = client.get('/', query_string={'token': token['access_key']})
    print(response)
    pprint(response.get_json())

<Response streamed [200 OK]>
{'access_key': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE1OTkzMjkwODcsInVzZXJfaWQiOjF9.E2SWHX29ve85ffJDJiRxKD0WNf6gATZLaOhb0XxeqFs'}
<Response streamed [401 UNAUTHORIZED]>
None
<Response streamed [200 OK]>
{'msg': 'Hello world!'}


***
* пример для bookshelf и авторизации с использованием pyjwt + sqlalchemy + flask-restx
***

### The example code with using JWT and sqlalchemy as DB on  Flask

Requirements:
+ ___flask___
+ ___flask-sqlalchemy___
+ ___pyjwt___
+ ___bcrypt___
+ ___marshmallow___
+ ___marshmallow-sqlalchemy___

In [1]:
class Config:
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SECRET_KEY = 'you-will-never-guess'

In [2]:
from flask import Flask, request, jsonify, abort
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
import json
import bcrypt

app = Flask(__name__)
app.config.from_object(Config)
db = SQLAlchemy(app)
ma = Marshmallow(app)

###
#
# Model Part
#
###

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True)
    password_hash = db.Column(db.String(128))

    def check_password(self, password):
        return bcrypt.checkpw(password.encode('utf-8'), self.password_hash)
    
    def __repr__(self):
        return "<User(name={self.username!r})>".format(self=self)
    

class Book(db.Model):
    __tablename__ = 'books'
    id = db.Column(db.Integer, primary_key=True)
    bookname = db.Column(db.String(64))
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    author = db.relationship('User', backref='books')
    
    def __repr__(self):
        return "<Book(name={self.bookname!r})>".format(self=self)

    
gen_hashpw = lambda x: bcrypt.hashpw(x.encode('utf-8'), bcrypt.gensalt())
    
users = [
    User(username='user_1', password_hash=gen_hashpw('aaa')),
    User(username='user_2', password_hash=gen_hashpw('bbb')),
    User(username='user_3', password_hash=gen_hashpw('ccc'))
]

books = [
    Book(bookname='book_1', author_id=1),
    Book(bookname='book_2', author_id=1),
    Book(bookname='book_3', author_id=2),
    Book(bookname='book_4', author_id=2),
    Book(bookname='book_5', author_id=2),
    Book(bookname='book_6', author_id=3)
]

db.create_all()
db.session.add_all(users + books)
db.session.commit()

In [3]:
###
#
# Schemas part
#
###

class BookSchema(ma.SQLAlchemySchema):
    class Meta:
        model = Book
        load_instance = True

    id = ma.auto_field(dump_only=True)
    bookname = ma.auto_field()
    author_id = ma.auto_field()

In [4]:
###
#
# Auth part
#
###
from datetime import datetime, timedelta
import jwt


def jwt_req(func):
    def wrapper(*args, **kwargs):
        token = request.args.get('token', default=None)
        if token:
            try:
                key = app.config['SECRET_KEY']
                jwt.decode(str(token), key, algorithms=['HS256'])
                return func(*args, **kwargs)
            except jwt.ExpiredSignatureError:
                return abort(401)
        return abort(401)
    wrapper.__name__ = func.__name__
    return wrapper


@app.route('/auth', methods=['POST'])
def auth():
    json_data = request.get_json()
    username = json_data.get('username')
    password = json_data.get('password')
    if not json_data or not username or not password:
        return abort(400)
    user = User.query.filter_by(username=username).first_or_404()    
    if user.check_password(password):
        payload_jwt = {
            'exp': datetime.now() + timedelta(hours=1),
            'user_id': user.id
        }
        key = app.config['SECRET_KEY']
        encoded_jwt = jwt.encode(payload_jwt, key, algorithm='HS256').decode('utf-8')
        payload = {
            'access_key': encoded_jwt
        }
        return jsonify(payload)
    return abort(401)


In [5]:
###
#
# View part
#
###

from marshmallow import ValidationError


@app.route('/books', methods=['GET'])
@jwt_req
def get_books():
    return jsonify(
        count=Book.query.count(),
        items=[BookSchema().dump(b) for b in Book.query.all()]
    )


@app.route('/books', methods=['POST'])
@jwt_req
def add_book():
    payload = request.get_json()
    try:
        book = BookSchema().load(payload)
    except ValidationError as err:
        return abort(400)
    db.session.add(book)
    db.session.commit()
    return jsonify(BookSchema().dump(book)), 201


@app.route('/books/<int:book_id>', methods=['GET'])
@jwt_req
def get_book(book_id):
    book = Book.query.filter_by(id=book_id).first_or_404()
    return jsonify(BookSchema().dump(book)), 200


@app.route('/books/<int:book_id>', methods=['PUT'])
@jwt_req
def update_book(book_id):
    payload = request.get_json()
    book = Book.query.filter_by(id=book_id).first_or_404()
    try:
        book = BookSchema().load(payload, instance=book)
    except ValidationError as err:
        return abort(400)
    db.session.commit()
    return jsonify(BookSchema().dump(book)), 200


@app.route('/books/<int:book_id>', methods=['DELETE'])
@jwt_req
def delete_book(book_id):
    book = Book.query.filter_by(id=book_id).first_or_404()
    db.session.delete(book)
    db.session.commit()
    return jsonify(BookSchema().dump(book)), 200


@app.errorhandler(400)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 400,
                "message":"Bad request"
            }
        ]}), 400


@app.errorhandler(404)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 404,
                "message":"Element not found"
            }
        ]}), 404


@app.errorhandler(409)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 409,
                "message":"Added element is now exist"
            }
        ]}), 409


@app.errorhandler(401)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 401,
                "message": "Unauthorized"
            }
        ]}), 401

---
Check all actions **without** auth

In [6]:
from pprint import pprint

with app.test_client() as client:
    response = client.get('/books')
    print('Get all books')
    pprint(response)
    pprint(response.get_json())

    payload = {'bookname': 'DDD', 'author_id': '3'}
    response = client.post('/books', json=payload)
    print('Add some book to shelf')
    print(response)
    pprint(response.get_json())

    response = client.get('/books/2')
    print('Get some book from shelf')
    print(response)
    pprint(response.get_json())

    payload = {'bookname': 'TTT', 'author_id': '2'}
    response = client.put('/books/3', json=payload)
    print('Update some book from bookshels')
    print(response)
    pprint(response.get_json())

    response = client.delete('/books/1')
    print('Delete some book from boookshelf')
    print(response)
    pprint(response.get_json())


Get all books
<Response streamed [401 UNAUTHORIZED]>
{'errors': [{'code': 401, 'message': 'Unauthorized'}]}
Add some book to shelf
<Response streamed [401 UNAUTHORIZED]>
{'errors': [{'code': 401, 'message': 'Unauthorized'}]}
Get some book from shelf
<Response streamed [401 UNAUTHORIZED]>
{'errors': [{'code': 401, 'message': 'Unauthorized'}]}
Update some book from bookshels
<Response streamed [401 UNAUTHORIZED]>
{'errors': [{'code': 401, 'message': 'Unauthorized'}]}
Delete some book from boookshelf
<Response streamed [401 UNAUTHORIZED]>
{'errors': [{'code': 401, 'message': 'Unauthorized'}]}


---
Check all actions **with** auth

In [7]:
from pprint import pprint

with app.test_client() as client:
    payload = {'username':'user_2', 'password':'bbb'}
    response = client.post('/auth', json=payload)
    print(response)
    pprint(response.get_json())
    token = response.get_json()

    
    response = client.get('/books', query_string={'token': token['access_key']})
    print('Get all books')
    pprint(response)
    pprint(response.get_json())

    payload = {'bookname': 'DDD', 'author_id': '3'}
    response = client.post('/books', json=payload, query_string={'token': token['access_key']})
    print('Add some book to shelf')
    print(response)
    pprint(response.get_json())

    response = client.get('/books/2', query_string={'token': token['access_key']})
    print('Get some book from shelf')
    print(response)
    pprint(response.get_json())

    payload = {'bookname': 'TTT', 'author_id': '2'}
    response = client.put('/books/3', query_string={'token': token['access_key']})
    print('Update some book from bookshels')
    print(response)
    pprint(response.get_json())

    response = client.delete('/books/1', query_string={'token': token['access_key']})
    print('Delete some book from boookshelf')
    print(response)
    pprint(response.get_json())


<Response streamed [200 OK]>
{'access_key': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE1OTkzMjkyNDMsInVzZXJfaWQiOjJ9.zw8hrj6gx95r8yke1sfqSLJF7R1Can6HYasbPzvl1iE'}
Get all books
<Response streamed [200 OK]>
{'count': 6,
 'items': [{'author_id': 1, 'bookname': 'book_1', 'id': 1},
           {'author_id': 1, 'bookname': 'book_2', 'id': 2},
           {'author_id': 2, 'bookname': 'book_3', 'id': 3},
           {'author_id': 2, 'bookname': 'book_4', 'id': 4},
           {'author_id': 2, 'bookname': 'book_5', 'id': 5},
           {'author_id': 3, 'bookname': 'book_6', 'id': 6}]}
Add some book to shelf
<Response streamed [201 CREATED]>
{'author_id': 3, 'bookname': 'DDD', 'id': 7}
Get some book from shelf
<Response streamed [200 OK]>
{'author_id': 1, 'bookname': 'book_2', 'id': 2}
Update some book from bookshels
<Response streamed [400 BAD REQUEST]>
{'errors': [{'code': 400, 'message': 'Bad request'}]}
Delete some book from boookshelf
<Response streamed [200 OK]>
{'author_id': 1, 'bookna

**As we can see all works! Great!**

### The example code with using jwt- sqlalchemy- and restx- extensions for Flask

Requirements:
+ ___flask___
+ ___flask-sqlalchemy___
+ ___flask-restx___
+ ___pyjwt___
+ ___bcrypt___
+ ___marshmallow___
+ ___marshmallow-sqlalchemy___

In [1]:
class Config:
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SECRET_KEY = 'you-will-never-guess'
    JWT_SECRET_KEY = SECRET_KEY

In [2]:
from flask import Flask, request, jsonify, abort
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from flask_jwt import JWT, jwt_required
from flask_restx import Api, Resource, fields
import json
import bcrypt

app = Flask(__name__)
app.config.from_object(Config)
db = SQLAlchemy(app)
ma = Marshmallow(app)
api = Api(app, version='1.0', title='Bookshelf API',
    description='A simple Bookshelf api',
)

###
#
# Model Part
#
###

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True)
    password_hash = db.Column(db.String(128))

    def check_password(self, password):
        return bcrypt.checkpw(password.encode('utf-8'), self.password_hash)
    
    def __repr__(self):
        return "<User(name={self.username!r})>".format(self=self)

    
def authenticate(username, password):
    user = User.query.filter_by(username=username).first()
    if user and user.check_password(password):
        return user

    
def identity(payload):
    user_id = payload['identity']
    return User.query.get(user_id)
    

class Book(db.Model):
    __tablename__ = 'books'
    id = db.Column(db.Integer, primary_key=True)
    bookname = db.Column(db.String(64))
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    author = db.relationship('User', backref='books')
    
    def __repr__(self):
        return "<Book(name={self.bookname!r})>".format(self=self)

    
gen_hashpw = lambda x: bcrypt.hashpw(x.encode('utf-8'), bcrypt.gensalt())
    
users = [
    User(username='user_1', password_hash=gen_hashpw('aaa')),
    User(username='user_2', password_hash=gen_hashpw('bbb')),
    User(username='user_3', password_hash=gen_hashpw('ccc'))
]

books = [
    Book(bookname='book_1', author_id=1),
    Book(bookname='book_2', author_id=1),
    Book(bookname='book_3', author_id=2),
    Book(bookname='book_4', author_id=2),
    Book(bookname='book_5', author_id=2),
    Book(bookname='book_6', author_id=3)
]

db.create_all()
db.session.add_all(users + books)
db.session.commit()

jwt = JWT(app, authenticate, identity)

In [3]:
###
#
# Schemas part
#
###

class BookSchema(ma.SQLAlchemySchema):
    class Meta:
        model = Book
        load_instance = True

    id = ma.auto_field(dump_only=True)
    bookname = ma.auto_field()
    author_id = ma.auto_field()

In [4]:
###
#
# Error part
#
###

@app.errorhandler(400)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 400,
                "message":"Bad request"
            }
        ]}), 400


@app.errorhandler(404)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 404,
                "message":"Element not found"
            }
        ]}), 404


@app.errorhandler(409)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 409,
                "message":"Added element is now exist"
            }
        ]}), 409


@app.errorhandler(401)
def not_found(err):
    return jsonify({
        "errors": [
            {
                "code": 401,
                "message": "Unauthorized"
            }
        ]}), 401

In [5]:
from marshmallow import ValidationError

ns = api.namespace('books', description='Bookshelf operations')

book = api.model('Book', {
    'id': fields.Integer(readonly=True, description='The book unique identifier'),
    'bookname': fields.String(required=True, description='The book name'),
    'author_id': fields.String(required=True, description='The book author')
})

@ns.route('/')
class BookList(Resource):
    '''Shows a list of all books, and lets you POST to add new book'''
    @ns.doc('list_books')
    @ns.marshal_list_with(book)
    @jwt_required()
    def get(self):
        '''Fetch a given resource'''
        return [BookSchema().dump(b) for b in Book.query.all()]
    
    @ns.doc('create_todo')
    @ns.expect(book)
    @ns.marshal_with(book)
    @jwt_required()
    def post(self):
        try:
            book = BookSchema().load(api.payload)
        except ValidationError as err:
            return abort(400)
        db.session.add(book)
        db.session.commit()
        return BookSchema().dump(book), 201


@ns.route('/<int:id>')
@ns.response(404, 'Todo not found')
@ns.param('id', 'The book identifier')
class Books(Resource):
    '''Show a single book and lets you update and delete them'''
    @ns.doc('get_book')
    @ns.marshal_list_with(book)
    @jwt_required()
    def get(self, id):
        '''Fetch a given resource'''
        book = Book.query.filter_by(id=id).first_or_404()
        return BookSchema().dump(book), 200
    
    @ns.doc('put_book')
    @ns.marshal_list_with(book)
    @jwt_required()
    def put(self, id):
        book = Book.query.filter_by(id=id).first_or_404()
        try:
            book = BookSchema().load(payload, instance=book)
        except ValidationError as err:
            return abort(400)
        db.session.commit()
        return BookSchema().dump(book), 200
    
    @ns.doc('delete_book')
    @ns.marshal_list_with(book)
    @jwt_required()
    def delete(self, id):
        book = Book.query.filter_by(id=id).first_or_404()
        db.session.delete(book)
        db.session.commit()
        return BookSchema().dump(book), 200

---
Check all actions **without** auth

In [6]:
from pprint import pprint

with app.test_client() as client:
    response = client.get('/books/')
    print('Get all books')
    pprint(response)
    pprint(response.get_json())

    payload = {'bookname': 'DDD', 'author_id': '3'}
    response = client.post('/books/', json=payload)
    print('Add some book to shelf')
    print(response)
    pprint(response.get_json())

    response = client.get('/books/2')
    print('Get some book from shelf')
    print(response)
    pprint(response.get_json())

    payload = {'bookname': 'TTT', 'author_id': '2'}
    response = client.put('/books/3', json=payload)
    print('Update some book from bookshels')
    print(response)
    pprint(response.get_json())

    response = client.delete('/books/1')
    print('Delete some book from boookshelf')
    print(response)
    pprint(response.get_json())


Authorization Required. Request does not contain an access token
Authorization Required. Request does not contain an access token
Authorization Required. Request does not contain an access token
Authorization Required. Request does not contain an access token
Authorization Required. Request does not contain an access token


Get all books
<Response streamed [401 UNAUTHORIZED]>
{'description': 'Request does not contain an access token',
 'error': 'Authorization Required',
 'status_code': 401}
Add some book to shelf
<Response streamed [401 UNAUTHORIZED]>
{'description': 'Request does not contain an access token',
 'error': 'Authorization Required',
 'status_code': 401}
Get some book from shelf
<Response streamed [401 UNAUTHORIZED]>
{'description': 'Request does not contain an access token',
 'error': 'Authorization Required',
 'status_code': 401}
Update some book from bookshels
<Response streamed [401 UNAUTHORIZED]>
{'description': 'Request does not contain an access token',
 'error': 'Authorization Required',
 'status_code': 401}
Delete some book from boookshelf
<Response streamed [401 UNAUTHORIZED]>
{'description': 'Request does not contain an access token',
 'error': 'Authorization Required',
 'status_code': 401}


---
Check all actions **with** auth

In [7]:
from pprint import pprint

with app.test_client() as client:
    payload = {'username':'user_2', 'password':'bbb'}
    response = client.post('/auth', json=payload)
    print(response)
    pprint(response.get_json())
    token = response.get_json()
    headers = {'Authorization': 'JWT ' + token['access_token']}
    
    response = client.get('/books/', headers=headers)
    print('Get all books')
    pprint(response)
    pprint(response.get_json())

    payload = {'bookname': 'DDD', 'author_id': '3'}
    response = client.post('/books/', json=payload, headers=headers)
    print('Add some book to shelf')
    print(response)
    pprint(response.get_json())

    response = client.get('/books/2', headers=headers)
    print('Get some book from shelf')
    print(response)
    pprint(response.get_json())

    payload = {'bookname': 'TTT', 'author_id': '2'}
    response = client.put('/books/3', headers=headers)
    print('Update some book from bookshels')
    print(response)
    pprint(response.get_json())

    response = client.delete('/books/1', headers=headers)
    print('Delete some book from boookshelf')
    print(response)
    pprint(response.get_json())


<Response streamed [200 OK]>
{'access_token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE1OTkzMjA1MDgsImlhdCI6MTU5OTMyMDIwOCwibmJmIjoxNTk5MzIwMjA4LCJpZGVudGl0eSI6Mn0.4TTRT2167-O7wAup1XT4PBMCsPclMJSpK81yI1Yn38k'}
Get all books
<Response streamed [200 OK]>
[{'author_id': '1', 'bookname': 'book_1', 'id': 1},
 {'author_id': '1', 'bookname': 'book_2', 'id': 2},
 {'author_id': '2', 'bookname': 'book_3', 'id': 3},
 {'author_id': '2', 'bookname': 'book_4', 'id': 4},
 {'author_id': '2', 'bookname': 'book_5', 'id': 5},
 {'author_id': '3', 'bookname': 'book_6', 'id': 6}]
Add some book to shelf
<Response streamed [201 CREATED]>
{'author_id': '3', 'bookname': 'DDD', 'id': 7}
Get some book from shelf
<Response streamed [200 OK]>
{'author_id': '1', 'bookname': 'book_2', 'id': 2}
Update some book from bookshels
<Response streamed [200 OK]>
{'author_id': '2', 'bookname': 'TTT', 'id': 3}
Delete some book from boookshelf
<Response streamed [200 OK]>
{'author_id': '1', 'bookname': 'book_1', 'id': 1}

# Asyncio
requriments: loguru

In [2]:
from time import sleep
from loguru import logger

def foo_sync():
    logger.info('Start foo sync')
    sleep(.1)
    logger.info('Stop foo sync')
    
def boo_sync():
    logger.info('Start boo sync')
    sleep(.1)
    logger.info('Stop boo sync')
    
foo_sync()
boo_sync()

2021-02-21 14:12:37.097 | INFO     | __main__:foo_sync:5 - Start foo sync
2021-02-21 14:12:37.208 | INFO     | __main__:foo_sync:7 - Stop foo sync
2021-02-21 14:12:37.209 | INFO     | __main__:boo_sync:10 - Start boo sync
2021-02-21 14:12:37.318 | INFO     | __main__:boo_sync:12 - Stop boo sync


In [51]:
# %autoawait asyncio

import asyncio
from time import sleep
from loguru import logger


async def foo_async():
    print('Start foo sync')
    await asyncio.sleep(5)
    print('Stop foo sync')
    
    
async def boo_async():
    print('Start boo async')
    await asyncio.sleep(5)
    print('Stop boo async')
    

tasks = [foo_async(), boo_async()]
coro = asyncio.wait(tasks)
loop = asyncio.get_event_loop()
loop.create_task(coro)

<Task pending coro=<wait() running at c:\users\illusion of control\appdata\local\programs\python\python37\lib\asyncio\tasks.py:361>>

Start foo sync
Start boo async
Stop foo sync
Stop boo async


In [43]:
import random
from time import sleep
import asyncio


def task(pid):
    """Synchronous non-deterministic task.
    """
    sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)


async def task_coro_1(pid):
    logger.info('Start boo async')
    await asyncio.sleep(1)
    logger.info('Stop boo async')
    
async def task_coro_2(pid):
    logger.info('Start boo async')
    await asyncio.sleep(1)
    logger.info('Stop boo async')



def synchronous():
    for i in range(1, 10):
        task(i)


async def asynchronous():
    tasks = [task_coro_1, task_coro_2]
    await asyncio.wait(tasks)


print('Synchronous:')
synchronous()

ioloop = asyncio.get_event_loop()
print('Asynchronous:')
ioloop.create_task(asynchronous())

Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:


<Task pending coro=<asynchronous() running at <ipython-input-43-889dca19ca20>:30>>