# Библиотека asyncpg

***Ноутбук подготовлен на основе материалов, изложенных в книге "Asyncio и конкурентное программирование на Python" (автор Фаулер М.)***

In [1]:
# Check whether asyncpg is already installed (grep prints nothing when it is not).
!pip freeze | grep asyncpg

In [2]:
# Install asyncpg; -q asks pip for reduced output.
!pip install asyncpg -q

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/3.1 MB[0m [31m7.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━[0m [32m2.0/3.1 MB[0m [31m29.9 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m3.1/3.1 MB[0m [31m38.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m27.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
import os
import glob
import csv
import pandas as pd
from sqlalchemy import create_engine

import asyncpg
from asyncpg import Record
import asyncio

from typing import List,Union

### 1. Подготовка демонстрационного массива данных

In [4]:
%%bash
# Create the working directory for the generated sales files.
# -p keeps the cell re-runnable: it does not fail if the directory already exists.
mkdir -p /content/sales

In [5]:
%%bash
#!/bin/bash
# Make numbered copies of the template sales file. In copy N the first "c_"
# on every line is replaced with "c_N", and the copy is saved as data_N.csv.

# Stop at the first failing command, unset variable or failed pipeline stage
# instead of looping on (e.g. when the template file is missing).
set -euo pipefail

file="data.csv"
source="/content/drive/MyDrive/datasets"
target_folder="/content/sales"
number_of_copies=10

for (( step = 1; step <= number_of_copies; step++ ))
do
    # Write the rewritten copy straight to its final name, so that no
    # intermediate "$target_folder/$file" is left behind if a step fails.
    sed "s/c_/c_$step/" "$source/$file" > "$target_folder/data_$step.csv"
done

### 2. Установка БД

In [6]:
%%capture
%%bash
# Install the PostgreSQL server and start the service
sudo apt-get -y -qq update
sudo apt-get -y -qq install postgresql
sudo service postgresql start

# Set the password `postgres` for the user `postgres`
sudo -u postgres psql -U postgres -c "ALTER USER postgres PASSWORD 'postgres';"

# Recreate the database `db` from scratch: drop it if it exists, then create it
sudo -u postgres psql -U postgres -c 'DROP DATABASE IF EXISTS db;'
sudo -u postgres psql -U postgres -c 'CREATE DATABASE db;'

In [7]:
# Connection settings for the demo database; the cells below read them via os.environ.
%env DB_DEMO_DATABASE_NAME=db
%env DB_DEMO_DATABASE_HOST=localhost
%env DB_DEMO_DATABASE_PORT=5432
%env DB_DEMO_DATABASE_USER=postgres
%env DB_DEMO_DATABASE_PASS=postgres

env: DB_DEMO_DATABASE_NAME=db
env: DB_DEMO_DATABASE_HOST=localhost
env: DB_DEMO_DATABASE_PORT=5432
env: DB_DEMO_DATABASE_USER=postgres
env: DB_DEMO_DATABASE_PASS=postgres


In [8]:
# Connection URL in the form postgresql://user:password@host:port/database,
# assembled from the DB_DEMO_DATABASE_* environment variables.
endpoint = (
    f"postgresql://{os.environ['DB_DEMO_DATABASE_USER']}"
    f":{os.environ['DB_DEMO_DATABASE_PASS']}"
    f"@{os.environ['DB_DEMO_DATABASE_HOST']}"
    f":{os.environ['DB_DEMO_DATABASE_PORT']}"
    f"/{os.environ['DB_DEMO_DATABASE_NAME']}"
)

In [9]:
# Show the assembled URL (note: it includes the password in clear text).
print(endpoint)

postgresql://postgres:postgres@localhost:5432/db


In [10]:
# SQLAlchemy engine for the demo database; select_postgresql() passes it to pandas.read_sql.
con =  create_engine(endpoint)

### 3. Тестирование соединения

In [11]:
async def test_connection():
    """Connect to the demo database, print the server version and disconnect.

    The connection parameters are read from the DB_DEMO_DATABASE_*
    environment variables.
    """
    connection = await asyncpg.connect(host=os.environ['DB_DEMO_DATABASE_HOST'],
                                       port=os.environ['DB_DEMO_DATABASE_PORT'],
                                       user=os.environ['DB_DEMO_DATABASE_USER'],
                                       database=os.environ['DB_DEMO_DATABASE_NAME'],
                                       password=os.environ['DB_DEMO_DATABASE_PASS'])
    try:
        version = connection.get_server_version()
        print(f'Connected! Postgres version is {version}')
    finally:
        # Release the connection even if the code above raises.
        await connection.close()

In [12]:
# Top-level await: the notebook runs the coroutine directly from the cell.
await test_connection()

Connected! Postgres version is ServerVersion(major=10, minor=0, micro=22, releaselevel='final', serial=0)


### 4. Формирование шаблона таблицы Продажи

In [13]:
# DDL for the sales table. IF NOT EXISTS lets the statement be executed
# repeatedly; sale_id is a SERIAL primary key, the other columns are mandatory.
CREATE_SALES_TABLE = \
    """
    CREATE TABLE IF NOT EXISTS sales(
        sale_id SERIAL PRIMARY KEY,
        city TEXT NOT NULL,
        manager TEXT NOT NULL,
        product TEXT NOT NULL,
        val INT NOT NULL
    );"""

In [14]:
async def create_tables():
    """Create the demo tables, printing the status of each DDL statement.

    Opens a connection using the DB_DEMO_DATABASE_* environment variables,
    executes every statement in turn and closes the connection afterwards.
    """
    connection = await asyncpg.connect(host=os.environ['DB_DEMO_DATABASE_HOST'],
                                       port=os.environ['DB_DEMO_DATABASE_PORT'],
                                       user=os.environ['DB_DEMO_DATABASE_USER'],
                                       database=os.environ['DB_DEMO_DATABASE_NAME'],
                                       password=os.environ['DB_DEMO_DATABASE_PASS'])
    statements = [CREATE_SALES_TABLE]

    try:
        print('Creating tables...')
        for statement in statements:
            status = await connection.execute(statement)
            print(status)
        print('Tables in the database are created!')
    finally:
        # Release the connection even if one of the statements fails.
        await connection.close()

In [15]:
# Create the sales table (the DDL uses IF NOT EXISTS).
await create_tables()

Creating tables...
CREATE TABLE
Tables in the database are created!


### 5. Контроль результатов

In [16]:
def select_postgresql(sql):
    """Execute *sql* via the module-level engine ``con`` and return what pandas.read_sql yields."""
    frame = pd.read_sql(sql, con)
    return frame

In [17]:
# Preview query: up to 7 rows of the sales table.
sql = """select s.* from sales as s limit 7"""

In [18]:
# Nothing has been inserted yet, so only the column headers are expected.
select_postgresql(sql)

Unnamed: 0,sale_id,city,manager,product,val


### 6. Запись датасетов с данными о продажах в БД

In [19]:
# Glob pattern matching the generated sales CSV files.
PATH_DATASETS_SALES = '/content/sales/*.csv'

In [20]:
# Paths of all sales CSV files; glob.glob already returns a list
# (in arbitrary order), so no manual append loop is needed.
files_sale_csv = glob.glob(PATH_DATASETS_SALES)

In [21]:
# Peek at the first five discovered paths.
print(files_sale_csv[:5])

['/content/sales/data_9.csv', '/content/sales/data_3.csv', '/content/sales/data_6.csv', '/content/sales/data_10.csv', '/content/sales/data_1.csv']


In [22]:
def read_file_csv(path_file_csv: str) -> List[tuple]:
    """Read a comma-delimited CSV file and return its data rows as tuples.

    The first row is treated as a header and skipped. Every field that
    ``int()`` can parse is converted to ``int``; all other fields stay ``str``.

    Args:
        path_file_csv: Path to the CSV file.

    Returns:
        A list with one tuple per data row; empty if the file has no data rows.
    """

    def convert_string_number(s: str) -> Union[str, int]:
        # Only a failed parse should fall back to the raw text; a bare
        # ``except`` would also swallow unrelated errors such as
        # KeyboardInterrupt.
        try:
            return int(s)
        except ValueError:
            return s

    # newline='' is what the csv module asks for, so that newlines embedded
    # in quoted fields are interpreted correctly.
    with open(path_file_csv, newline='') as r:
        reader = csv.reader(r, delimiter=',')
        next(reader, None)  # skip the header row (no-op for an empty file)
        return [tuple(convert_string_number(el) for el in record) for record in reader]

In [23]:
# Smoke-test the reader on one generated file: print its first five parsed rows.
PATH_TEST_FILE = '/content/sales/data_1.csv'
print(read_file_csv(PATH_TEST_FILE)[:5])

[('c_1', 'm5', 'pr7', 9), ('c_1', 'm6', 'pr1', 8), ('c_1', 'm11', 'pr3', 4), ('c_1', 'm8', 'pr13', 9), ('c_1', 'm14', 'pr14', 9)]


In [24]:
async def insert_sales(path_file_csv:str, connection):
    """Insert every data row of one CSV file into the sales table.

    The rows come from read_file_csv() and are sent over *connection* with
    executemany(); sale_id is left to its DEFAULT. The result of
    executemany() is returned as is.
    """
    rows = read_file_csv(path_file_csv)
    query = "INSERT INTO sales VALUES (DEFAULT, $1, $2, $3, $4)"
    outcome = await connection.executemany(query, rows)
    return outcome

In [25]:
async def main(files_sales:List[str]):
    """Load the given sales CSV files into the database over one connection.

    Args:
        files_sales: Paths of the CSV files; they are inserted one after
            another in the order given.
    """
    connection = await asyncpg.connect(host=os.environ['DB_DEMO_DATABASE_HOST'],
                                       port=os.environ['DB_DEMO_DATABASE_PORT'],
                                       user=os.environ['DB_DEMO_DATABASE_USER'],
                                       database=os.environ['DB_DEMO_DATABASE_NAME'],
                                       password=os.environ['DB_DEMO_DATABASE_PASS'])
    try:
        for current_dataset in files_sales:
            await insert_sales(current_dataset, connection)
    finally:
        # The connection was previously never closed; release it whether or
        # not the inserts succeed.
        await connection.close()

In [26]:
# Insert the rows of every discovered CSV file into the sales table.
await main(files_sale_csv)

### 7. Проверка результатов

In [27]:
# All sales rows recorded for city 'c_1'.
sql = """select * from sales as s where s.city='c_1'"""

In [28]:
# Check the load through pandas: display the rows for city 'c_1'.
select_postgresql(sql)

Unnamed: 0,sale_id,city,manager,product,val
0,41,c_1,m5,pr7,9
1,42,c_1,m6,pr1,8
2,43,c_1,m11,pr3,4
3,44,c_1,m8,pr13,9
4,45,c_1,m14,pr14,9
5,46,c_1,m7,pr8,2
6,47,c_1,m1,pr3,2
7,48,c_1,m14,pr1,3
8,49,c_1,m10,pr5,4
9,50,c_1,m14,pr10,3


In [29]:
async def get_results():
    """Fetch the sales of city 'c_1' with asyncpg and print one line per row.

    Opens a connection using the DB_DEMO_DATABASE_* environment variables and
    closes it afterwards, also when the query or the printing fails.
    """
    connection = await asyncpg.connect(host=os.environ['DB_DEMO_DATABASE_HOST'],
                                       port=os.environ['DB_DEMO_DATABASE_PORT'],
                                       user=os.environ['DB_DEMO_DATABASE_USER'],
                                       database=os.environ['DB_DEMO_DATABASE_NAME'],
                                       password=os.environ['DB_DEMO_DATABASE_PASS'])

    sales_query = """select s.city, s.manager, s.product, s.val 
                     from sales as s 
                     where s.city='c_1'"""
    try:
        results: List[Record] = await connection.fetch(sales_query)
        for sale in results:
            # NOTE: the backslash continuations sit inside the string literal,
            # so the leading whitespace of the continuation lines is part of
            # the printed text (hence the wide gaps in the output).
            print(f'city: {sale["city"]}, \
              manager: {sale["manager"]}, \
              product: {sale["product"]}, \
              val: {sale["val"]}')
    finally:
        # Release the connection even if fetching or printing raises.
        await connection.close()

In [30]:
# Same check as above, this time reading the rows through asyncpg.
await get_results()

city: c_1,               manager: m5,               product: pr7,               val: 9
city: c_1,               manager: m6,               product: pr1,               val: 8
city: c_1,               manager: m11,               product: pr3,               val: 4
city: c_1,               manager: m8,               product: pr13,               val: 9
city: c_1,               manager: m14,               product: pr14,               val: 9
city: c_1,               manager: m7,               product: pr8,               val: 2
city: c_1,               manager: m1,               product: pr3,               val: 2
city: c_1,               manager: m14,               product: pr1,               val: 3
city: c_1,               manager: m10,               product: pr5,               val: 4
city: c_1,               manager: m14,               product: pr10,               val: 3
