In [None]:
# Query ipecho.net/plain, which is expected to echo back this runtime's public IP
# (presumably needed to allow-list the runtime on the database host — confirm).
!curl ipecho.net/plain
# Install the PyMySQL driver; the connection URL used by this notebook is `mysql+pymysql://`.
!pip install PyMySQL

In [None]:
import os
from glob import glob
import pandas as pd
import sqlalchemy
import sqlalchemy.orm

In [None]:
# Mount Google Drive into the Colab filesystem at /content/gdrive.
# This import is Colab-specific, so the cell only runs inside Google Colab.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# MySQL connection URL in SQLAlchemy format (PyMySQL driver).
# SECURITY: the fallback literal embeds credentials and a host address in the
# notebook. Set the BIBLOSPHERE_DB_URL environment variable to override it, and
# remove the literal before sharing or committing this notebook.
CONNECTION_STRING = os.environ.get(
    'BIBLOSPHERE_DB_URL',
    'mysql+pymysql://biblosphere:biblosphere@35.223.45.184/biblosphere',
)

In [None]:
# Directory holding the source CSV files (located on the mounted Google Drive)
DATASET_PATH = '/content/gdrive/MyDrive/Biblosphera/datasets_biblioteki/datasets_2'

In [None]:
# Run a single SQL statement
def execute_query(query):
    """Execute one SQL statement against the database and return its rows.

    Parameters
    ----------
    query : str or SQLAlchemy executable
        The statement to run. Plain strings are wrapped in ``sqlalchemy.text``,
        so a ``:name`` token inside the string is treated as a bind parameter.

    Returns
    -------
    list
        The rows produced by the statement, or an empty list when the
        statement returns no rows (for example DDL).

    Notes
    -----
    A fresh engine is created for every call and disposed afterwards, so no
    pooled connections outlive the call.
    """
    engine = sqlalchemy.create_engine(CONNECTION_STRING, echo=False)
    try:
        statement = sqlalchemy.text(query) if isinstance(query, str) else query
        # engine.begin() commits on success, rolls back on error and always
        # releases the connection, even when execute() raises.
        with engine.begin() as connection:
            queryResult = connection.execute(statement)
            # Rows must be materialised while the connection is still open.
            return list(queryResult) if queryResult.returns_rows else []
    finally:
        engine.dispose()

In [None]:
# Load every CSV file matching a mask into a single DataFrame
def create_dataset_from_files(datasetPath, fileMask, originalColumnsList, columnsLis):
    """Read all matching CSV files and return them as one renamed DataFrame.

    Parameters
    ----------
    datasetPath : str
        Directory that is searched for files.
    fileMask : str
        Glob pattern (relative to ``datasetPath``) selecting the files.
    originalColumnsList : list of str
        Column names to read from each file.
    columnsLis : list of str
        New column names, positionally matched to ``originalColumnsList``.

    Returns
    -------
    pandas.DataFrame
        Rows of all files stacked in sorted file-name order, with columns
        renamed to ``columnsLis``. Each file keeps its own row index (the
        result is not re-indexed). When no file matches, an empty frame with
        the renamed columns is returned.

    Raises
    ------
    ValueError
        If a file lacks one of the requested columns, or the two column lists
        differ in length.
    """
    sourceColumns = list(originalColumnsList)
    frames = [
        # `usecols` keeps the file's own column order, so reorder explicitly:
        # the rename below is positional.
        pd.read_csv(path, sep=';', encoding='cp1251', usecols=sourceColumns)[sourceColumns]
        # Sorted so the row order does not depend on filesystem listing order.
        for path in sorted(glob(os.path.join(datasetPath, fileMask)))
    ]
    if frames:
        # A single concat avoids the quadratic cost of growing a frame in a loop.
        dfResult = pd.concat(frames)
    else:
        dfResult = pd.DataFrame(columns=sourceColumns)
    # Use the function's own parameter rather than a same-looking global name.
    dfResult.columns = list(columnsLis)
    return dfResult

In [None]:
def prepare_age_to_convert(x):
    """Pick the highest age limit from a string such as ``'6+ ; 12+'``.

    Parameters
    ----------
    x : str
        One or more age labels separated by ``' ; '``; ``'+'`` signs are removed.

    Returns
    -------
    str
        The label with the largest numeric value, still as a string (the
        caller converts it to an integer dtype). Labels are compared as
        numbers, so ``'12'`` ranks above ``'6'``. Non-numeric labels rank
        below every numeric one; if nothing is numeric the first label is
        returned.
    """
    def _numeric_rank(label):
        # Numeric labels sort by value; anything else sorts below them.
        token = label.strip()
        return (1, int(token)) if token.isdecimal() else (0, 0)

    candidates = x.replace('+', '').split(' ; ')
    return max(candidates, key=_numeric_rank)

In [None]:
# Append a DataFrame to a database table
def save_table_at_database(tableName, df):
    """Append the rows of ``df`` to the table ``tableName``.

    Parameters
    ----------
    tableName : str
        Name of the target table; rows are appended (``if_exists='append'``).
    df : pandas.DataFrame
        Data to write. The DataFrame index is not written.

    Notes
    -----
    The engine is created with ``echo=True``, so SQLAlchemy logs the emitted
    SQL. The connection is closed and the engine disposed even when the write
    fails.
    """
    engine = sqlalchemy.create_engine(CONNECTION_STRING, echo=True)
    try:
        # The context manager closes the connection even if to_sql() raises.
        with engine.connect() as connection:
            df.to_sql(tableName, connection, if_exists='append', index=False)
    finally:
        engine.dispose()

In [None]:
# Create the empty tables (one CREATE TABLE statement per table, run in order)
for query in (
    'CREATE TABLE readers (readerId BIGINT, dateOfBirth DATE, PRIMARY KEY (readerId))',
    'CREATE TABLE catalog (recId BIGINT, author TEXT, title TEXT, age SMALLINT UNSIGNED, PRIMARY KEY (recId))',
    'CREATE TABLE funds (fundId BIGINT, recId BIGINT, siglaId BIGINT, PRIMARY KEY (fundId))',
    'CREATE TABLE circulation (id BIGINT, recId BIGINT, readerId BIGINT, bookpointId BIGINT, PRIMARY KEY (id))',
):
    execute_query(query)

In [None]:
# Build the dataset for the `catalog` table
fileMask = 'cat*.csv'
originalColumnsList = ['recId', 'aut', 'title', 'ager']
columnsList = ['recId', 'author', 'title', 'age']
catalogDF = (
    create_dataset_from_files(DATASET_PATH, fileMask, originalColumnsList, columnsList)
    .sort_values('recId')
    .reset_index(drop=True)
)
# Missing text becomes an empty string; a missing age becomes '0' before the
# label is reduced to a single value and cast to an integer dtype.
catalogDF = catalogDF.assign(
    author=catalogDF['author'].fillna(''),
    title=catalogDF['title'].fillna(''),
    recId=catalogDF['recId'].astype('uint32'),
    age=catalogDF['age'].fillna('0').apply(prepare_age_to_convert).astype('uint8'),
)
catalogDF.info()
print(catalogDF.head())

In [None]:
# Build the dataset for the `funds` table
fileMask = 'fund_*.csv'
originalColumnsList = ['fundID', 'catalogueRecordID', 'siglaID']
columnsList = ['fundId', 'recId', 'siglaId']
fundsDF = create_dataset_from_files(DATASET_PATH, fileMask, originalColumnsList, columnsList)
# Cast the id columns to compact unsigned dtypes, then order rows by fundId.
fundsDF = (
    fundsDF
    .astype({'fundId': 'uint32', 'recId': 'uint32', 'siglaId': 'uint16'})
    .sort_values('fundId')
    .reset_index(drop=True)
)
fundsDF.info()
print(fundsDF.head())
print(fundsDF['siglaId'].max())

In [None]:
# Build the dataset for the `circulation` table
fileMask = 'circulaton_*.csv'
originalColumnsList = ['circulationID', 'catalogueRecordID', 'readerID', 'bookpointID']
columnsList = ['id', 'recId', 'readerId', 'bookpointId']
# Order rows by id first, then cast the id columns to compact unsigned dtypes.
circulationDF = (
    create_dataset_from_files(DATASET_PATH, fileMask, originalColumnsList, columnsList)
    .sort_values('id')
    .reset_index(drop=True)
    .astype({'id': 'uint32', 'recId': 'uint32', 'readerId': 'uint32', 'bookpointId': 'uint16'})
)
circulationDF.info()
print(circulationDF.head())
print(circulationDF['bookpointId'].max())

In [None]:
# Build the dataset for the `readers` table
columnsList = ['readerId', 'dateOfBirth']
# The file is read without a header row; only its first two columns are used.
readerDF = (
    pd.read_csv(f'{DATASET_PATH}/readers.csv', sep=';', encoding='cp1251', header=None, usecols=[0, 1])
    .set_axis(columnsList, axis='columns')
    .sort_values('readerId')
    .reset_index(drop=True)
)
# Unparseable dates become NaT and are then replaced by the mean date of birth.
readerDF['dateOfBirth'] = pd.to_datetime(readerDF['dateOfBirth'], errors='coerce')
meanDateOfBirth = readerDF['dateOfBirth'].mean()
readerDF = readerDF.assign(
    dateOfBirth=readerDF['dateOfBirth'].fillna(meanDateOfBirth),
    readerId=readerDF['readerId'].astype('uint32'),
)

In [None]:
# Write the prepared datasets into their tables, in this order
for tableName, tableDF in (
    ('catalog', catalogDF),
    ('funds', fundsDF),
    ('circulation', circulationDF),
    ('readers', readerDF),
):
    save_table_at_database(tableName, tableDF)

In [None]:
# Create the secondary indexes (text columns are indexed by prefix length)
for query in (
    'CREATE INDEX readerId_index ON circulation (readerId)',
    'CREATE INDEX recId_index ON funds (recId)',
    'CREATE INDEX siglaId_index ON funds (siglaId)',
    'CREATE INDEX author_index ON catalog (author(16)) USING BTREE',
    'CREATE INDEX title_index ON catalog (title(50)) USING BTREE',
):
    execute_query(query)