# Содержание

* [config.py](#config.py)
* [db.py](#db.py)
* [okved.py](#okved.py)
* [Тестирование](#Тестирование)

# config.py

In [7]:
%%writefile config.py
import os
from sqlalchemy.engine.url import make_url

# Параметры базы данных
### Строка подключения в виде
### postgresql+psycopg2://username:password@host:port/database
### сохранена в качестве переменной среды
db_uri_env_name = "MIDDLE_PYTHON_EDU_DB_SQLALCHEMY_CONN"
DB_URI = os.getenv(db_uri_env_name)
if not DB_URI:
    raise ImportError(
        f"Необходимо добавить переменную среды '{db_uri_env_name}' "
        "со значением вида 'postgresql+psycopg2://username:password@host:port/database', "
        "где username, password, host, port, database - параметры подключения к БД."
        )
db_dict = make_url(DB_URI)
DB_CREDENTIALS = dict(
    host=db_dict.host,
    port=db_dict.port,
    dbname=db_dict.database,
    user=db_dict.username,
    password=db_dict.password,
    )
    
schema = 'hw1'
okved_table = 'okved'
egrul_table = 'telecom_companies'

# Параметры файловой системы
bulk_data_dir = 'bulk_data'
okved_filename = 'okved_2.json.zip'
egrul_filename = 'egrul.json.zip'

okved_filepath = os.path.join(bulk_data_dir, okved_filename)
egrul_filepath = os.path.join(bulk_data_dir, egrul_filename)

# Параметры скрипта
okved_primary_code = '61'
egrul_dtypes = {
    'ogrn': 'int64',
    'inn': 'int64',
    'kpp': 'int64',
    }

Overwriting config.py


# psql.py

In [1]:
%%writefile psql.py
import psycopg2
import psycopg2.extras
from sqlalchemy import create_engine

import pandas as pd
from numpy import NaN

import config

class PsqlConnector():
    """Класс для работы с базой данных PostgreSQL"""
    
    def __init__(self):
        self.db_credentials = config.DB_CREDENTIALS
        self.db_uri = config.DB_URI
    
    def replace_nans(self, df):
        """Замена пустот на None, под формат баз данных PostgreSQL.
        В т.ч. заменяются пустые строки ''.
        
        Аргументы
        ----------
        df : pandas.DataFrame
            Датафрейм для заливки в БД.

        Возвращается
        ----------
        df : pandas.DataFrame
            Датафрейм для заливки в БД с "отформатированными" пустотами.
        """
        df = df.where(pd.notna(df), None)
        df = df.replace({'': None})
        df = df.replace({pd.NaT: None})
        df = df.replace({NaN: None})
        
        return df

    def connect(self):
        """Создание подключения к базе данных PostgreSQL."""
        conn = None
        try:
            if isinstance(self.db_credentials, str):
                conn = psycopg2.connect(self.db_credentials)
            else:
                conn = psycopg2.connect(**self.db_credentials)
        except(Exception, psycopg2.DatabaseError) as error:
            print(error)
            raise error
        return conn
            
    def insert_values(self, df, schema, table, on_conflict_clause=None):
        """Обертка для метода
        psycopg2.extras.execute_values с обработкой подключения.
        
        Аргументы
        ----------
        df : pandas.DataFrame
            Датафрейм для заливки в БД.
        schema: str
            Название схемы БД.
        table : str
            Название таблицы БД.
        on_conflict_clause: str
            Выражение для выполнения т.н. "UPSERT" - игнорирование или обработка данных при возникновении дубликатов.
            Например:
                "ON CONFLICT ON (name) DO NOTHING"
                или
                "ON CONFLICT (name) DO 
                UPDATE SET email = EXCLUDED.email"
        """
        if not on_conflict_clause:
            on_conflict_clause = ''
        
        conn = self.connect()
        cursor = conn.cursor()
        
        try:
            df = self.replace_nans(df)
            tuples = [tuple(x) for x in df.to_numpy()]
            cols = ','.join(list(df.columns))
            query  = f"INSERT INTO {schema}.{table}({cols}) VALUES %s {on_conflict_clause}"
            psycopg2.extras.execute_values(cursor, query, tuples)
            conn.commit()
        except(Exception, psycopg2.DatabaseError) as error:
            print(f"Error: {error}")
            conn.rollback()
            raise error
        finally:
            cursor.close()
            conn.close()

    def read_query(self, query):
        """Обертка для метода pd.read_sql_query
        с обработкой подключения
        
        Аргументы
        ----------
        query : str
            SELECT SQL-запрос.
        """
        engine = create_engine(self.db_uri)
        try:
            df = pd.read_sql_query(query, engine)
            return df
        finally:
            engine.dispose()

    def execute_query(self, query):
        """Метод для выполнения различных запросов, 
        не возвращающих таблицы
        
        Аргументы
        ----------
        query : str
            Любой SQL-запрос.
        """
        conn = self.connect()
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            conn.commit()
        except(Exception, psycopg2.DatabaseError) as error:
            print(f"Error: {error}")
            conn.rollback()
            cursor.close()
            raise error
        finally:
            conn.close()
            
    def execute_sql(self, filepath, encoding='cp1251'):
        """Надстройка на методом self.execute_query
        для выполнения различных запросов из .sql файлов.
        
        Аргументы
        ----------
        filepath : str
            Путь к файлу .sql с запросом.
        encoding: str, default: 'cp1251'
            Кодировка .sql файла.
        """
        with open(filepath, 'r', encoding=encoding) as q:
            query = q.read()
        self.execute_query(query)
            
    def truncate_table(self, schema, table, restart_identity=True):
        """Метод для полной очистки таблицы.
        
        Аргументы
        ----------
        schema: str
            Название схемы БД.
        table : str
            Название таблицы БД.
        restart_identity: bool, default: True
            Удаление с перезапуском счетчика генерируемого идентификатора.
        """
        if restart_identity:
            restart_identity_statement = 'RESTART IDENTITY'
        else:
            restart_identity_statement = ''
        self.execute_query(f"""TRUNCATE TABLE {schema}.{table} {restart_identity_statement}""")

Overwriting psql.py


# app.py

In [9]:
%%writefile app.py
import pandas as pd
import zipfile
from joblib import Parallel, delayed

import config
import psql

db = psql.PsqlConnector()

def time_decorator(func):
    """Декоратор для отображения времени исполнения функции"""
    def decorator(*args, **kwargs):
        start_dttm = pd.to_datetime('today')
        result = func(*args, **kwargs)
        end_dttm = pd.to_datetime('today')
        diff = end_dttm - start_dttm
        print(f'{func.__name__} - время выполнения: {round(diff.total_seconds(), 2)} с')
        return result
    return decorator

def create_db_schema():
    """Создание структура базы данных"""
    db.execute_sql('sql/create_schema_hw1.sql')
    db.execute_sql('sql/recreate_table_hw1.okved.sql')
    db.execute_sql('sql/recreate_table_hw1.telecom_companies.sql')

@time_decorator
def upload_okved():
    """1 задание домашней работы - загрузка данных ОКВЭД"""
    okved = pd.read_json(config.okved_filepath, compression='zip')
    db.insert_values(okved, schema=config.schema, table=config.okved_table)
    
def get_filelist(zip_path, max_files_count=None):
    """Получение списка файлов в архиве
    
    Аргументы
    ----------
    zip_path: str
        Путь к zip-файлу.
    max_files_count: int, default: None
        Количество читаемых файлов (для тестирования).
    """
    with zipfile.ZipFile(zip_path, 'r') as zip_archive:
        filelist = zip_archive.namelist()
    if max_files_count:
        return filelist[:max_files_count]
    else:
        return filelist

def get_okved_code(item):
    """Получение кода ОКВЭД в данных ЕГРЮЛ.
    
    Аргументы
    ----------
    item: dict
        Словарь, ячейка данных в датафрейме.
    """
    if item.get('СвОКВЭД'):
        if item['СвОКВЭД'].get('СвОКВЭДОсн'):
            return item['СвОКВЭД']['СвОКВЭДОсн']['КодОКВЭД']
        
def upload_egrul_job(filename, okved_primary_code, dtypes):
    """Функция для загрузки данных ЕГРЮЛ из одного JSON-файла.
    
    Аргументы
    ----------
    filename: str
        Название JSON-файла в архиве.
    okved_primary_code: str | int
        Фильтруемый код ОКВЭД.
    dtypes: dict
        Словарь-маппинг типов данных.
    """
    with zipfile.ZipFile(config.egrul_filepath, 'r') as zip_archive:
        with zip_archive.open(filename) as f:
            egrul = pd.read_json(f, dtype=dtypes)
            egrul['okved_code'] = egrul['data'].map(get_okved_code)
            egrul = egrul[
                (egrul['okved_code'].str.startswith(f'{okved_primary_code}.', na=False))
                | (egrul['okved_code'] == str(okved_primary_code))
                ]
            egrul['source_filename'] = filename
            egrul = egrul[[
                'ogrn',
                'inn',
                'kpp',
                'name',
                'okved_code',
                'source_filename',
                ]]
            db.insert_values(egrul, schema=config.schema, table=config.egrul_table)

@time_decorator
def upload_egrul():
    """2 задание домашней работы - загрузка данных ЕГРЮЛ"""
    filelist = get_filelist(config.egrul_filepath)
    Parallel(n_jobs=-1)(delayed(upload_egrul_job)(
        filename, config.okved_primary_code, config.egrul_dtypes) for filename in filelist)

def app():
    """Функция для запуска скрипта"""
    create_db_schema()
    upload_okved()
    upload_egrul()
    
if __name__ == "__main__":
    app()

Overwriting app.py


# Тестирование

In [1]:
import config
import app
import psql

db = psql.PsqlConnector()

In [2]:
app.app()

upload_okved - время выполнения: 0.45 с
upload_egrul - время выполнения: 1004.87 с


In [3]:
okved = db.read_query(f'SELECT * FROM {config.schema}.{config.okved_table}')
display(okved.head(10))
display(okved.shape)
display(okved.info())

Unnamed: 0,code,parent_code,section,name,comment,load_dttm
0,01,A,A,"Растениеводство и животноводство, охота и пред...",Эта группировка включает:\n- два основных вида...,2023-06-21 14:06:27.003380
1,01.1,01,A,Выращивание однолетних культур,Эта группировка включает:\n- выращивание однол...,2023-06-21 14:06:27.003380
2,01.11,01.1,A,"Выращивание зерновых (кроме риса), зернобобовы...",Эта группировка включает:\n- все формы выращив...,2023-06-21 14:06:27.003380
3,01.11.1,01.11,A,Выращивание зерновых культур,,2023-06-21 14:06:27.003380
4,01.11.11,01.11.1,A,Выращивание пшеницы,,2023-06-21 14:06:27.003380
5,01.11.12,01.11.1,A,Выращивание ячменя,,2023-06-21 14:06:27.003380
6,01.11.13,01.11.1,A,Выращивание ржи,,2023-06-21 14:06:27.003380
7,01.11.14,01.11.1,A,Выращивание кукурузы,,2023-06-21 14:06:27.003380
8,01.11.15,01.11.1,A,Выращивание овса,,2023-06-21 14:06:27.003380
9,01.11.16,01.11.1,A,Выращивание гречихи,,2023-06-21 14:06:27.003380


(2818, 6)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2818 entries, 0 to 2817
Data columns (total 6 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   code         2818 non-null   object        
 1   parent_code  2796 non-null   object        
 2   section      2818 non-null   object        
 3   name         2818 non-null   object        
 4   comment      1007 non-null   object        
 5   load_dttm    2818 non-null   datetime64[ns]
dtypes: datetime64[ns](1), object(5)
memory usage: 132.2+ KB


None

In [4]:
egrul = db.read_query(f'SELECT * FROM {config.schema}.{config.egrul_table}')
display(egrul.head(10))
display(egrul.shape)
display(egrul.info())
display(egrul['okved_code'].value_counts(dropna=False, normalize=True))

Unnamed: 0,ogrn,inn,kpp,name,okved_code,source_filename,load_dttm
0,1114910000480,4909109000.0,490901001.0,"ООО ""ИТЛАЙН""",61.10,06439.json,2023-06-21 14:06:31.771144
1,1120521000819,521014200.0,52101001.0,"ООО ""ЛЕВАШИ-ТЕЛЕКОМ""",61.1,06849.json,2023-06-21 14:06:31.854981
2,1120529000536,529911500.0,52901001.0,"ООО ""РУСЛАН""",61.10.1,06849.json,2023-06-21 14:06:31.854981
3,1120531000182,531012100.0,53101001.0,"ООО ""ОТЧЕТ-ЦЕНТР""",61.10.4,06849.json,2023-06-21 14:06:31.854981
4,1120545000564,545025500.0,55401001.0,"ООО ""ЕВРОТЕЛЕКОМ""",61.10,06849.json,2023-06-21 14:06:31.854981
5,1120546000519,516011400.0,51601001.0,"ООО ""ПАУТИНА""",61.1,06849.json,2023-06-21 14:06:31.854981
6,1120546000630,546022300.0,54601001.0,"ООО ""БАГАТ-2012""",61.10,06849.json,2023-06-21 14:06:31.854981
7,1127847488053,7811531000.0,781101001.0,"ООО ""СМО СИСТЕМА""",61.30,07388.json,2023-06-21 14:06:31.921148
8,1097746001143,7714773000.0,773401001.0,"ЗАО ""ИНТЕРФОН""",61.10,05575.json,2023-06-21 14:06:31.973851
9,1097746001979,7728691000.0,772801001.0,"ЗАО ""ДЖЕТТЕЛ""",61.1,05575.json,2023-06-21 14:06:31.973851


(20917, 7)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20917 entries, 0 to 20916
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   ogrn             20917 non-null  int64         
 1   inn              20914 non-null  float64       
 2   kpp              20914 non-null  float64       
 3   name             20917 non-null  object        
 4   okved_code       20917 non-null  object        
 5   source_filename  20917 non-null  object        
 6   load_dttm        20917 non-null  datetime64[ns]
dtypes: datetime64[ns](1), float64(2), int64(1), object(3)
memory usage: 1.1+ MB


None

61.10       0.437061
61.10.1     0.189607
61.1        0.117177
61.10.4     0.091791
61.10.9     0.055457
61.20       0.036812
61.90       0.015585
61.10.3     0.013673
61.10.2     0.013243
61.20.2     0.009370
61.2        0.006454
61.20.1     0.003920
61.10.5     0.003681
61.30       0.003538
61.20.3     0.001530
61.20.4     0.000478
61.10.6     0.000191
61.30.1     0.000143
61.20.5     0.000096
61.10.21    0.000096
61.30.2     0.000096
Name: okved_code, dtype: float64