In [7]:
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime, Float, select, Date, JSON
import logging
import os
import sys
from math import ceil
from tqdm import tqdm
from pandas.util.testing import assert_frame_equal

from wikibaseintegrator import wbi_core, wbi_login, wbi_login
from wikibaseintegrator.wbi_config import config as wbi_config

In [13]:
logging.getLogger().setLevel(logging.INFO)

In [8]:
BRICKIT_BD_LOGIN = 'postgres'
BRICKIT_BD_PASSWORD = 'pass'
BRICKIT_BD_HOST = 'host'
WIKIBASE_HOST = '84.201.142.182'

WIKIBASE_LOGIN = 'WikibaseAdmin'
WIKIBASE_PASSWORD = 'WikibaseDockerAdminPass'

connection_string = f'postgresql://{BRICKIT_BD_LOGIN}:{BRICKIT_BD_PASSWORD}@{BRICKIT_BD_HOST}:5432/holybricks'
engine_pg = create_engine(connection_string)

wbi_config['MEDIAWIKI_API_URL'] = f'http://{WIKIBASE_HOST}:8181/api.php'
wbi_config['SPARQL_ENDPOINT_URL'] = f'http://{WIKIBASE_HOST}:8989/bigdata/sparql'
wbi_config['WIKIBASE_URL'] = 'http://wikibase.svc'

In [None]:
# Легаси, уже умеем нормально
ITEMS_DICT = {
    'I': {
        'Brickit Company': 'Q1',
        'Brickit Image': 'Q2',
        'Brickit Part': 'Q3'

    },
    'P': {
        'instance of': 'P1',
        'Image URL': 'P2',
        'Image ID': 'P3',
        'Part Name': 'P4',
        'Part Tag': 'P5',
        'Part Image': 'P6',
        'Part Num': 'P7',
        'Part Child': 'P8'
    }    
}

# Забрать какие-то данные из Brickit

In [None]:
df_images = pd.read_sql(f"""
SELECT id, public_url
FROM staging.manual_images
WHERE initial_entity_type = 'part'
    """, con = engine_pg)


df_parts = pd.read_sql(f"""
SELECT part_num, "name", tag, part_cat_id, child_part_nums, image_id
FROM staging.synthetic_parts
    """, con = engine_pg)

# Залить сущности

In [None]:
# Костыль для связки наша сущность - ID в Wikibase/
# Предполагаем, что это можно взять при помощи SPARQL
# А пока что по результатам циклов ниже в эти датафреймы дописывается ID Wikibase и сохраняются в csv
# df_images = pd.read_csv('./df_images.csv')
# df_parts = pd.read_csv('./df_parts.csv')

In [None]:
login_instance = wbi_login.Login(user=WIKIBASE_LOGIN, pwd=WIKIBASE_PASSWORD)   

In [None]:
# Загрузить 100 изображений из всех
for _, img_i in df_images[:100].iterrows():
    data = [
        wbi_core.Url(str(img_i['public_url']), prop_nr=ITEMS_DICT['P']['Image URL']),
        wbi_core.String(str(img_i['id']), prop_nr=ITEMS_DICT['P']['Image ID']),
        wbi_core.ItemID(ITEMS_DICT['I']['Brickit Image'], prop_nr=ITEMS_DICT['P']['instance of'])
    ]
    item = wbi_core.ItemEngine(new_item=True, data=data,core_props=set())
    
    # Этот метод в библиотеке из коробки не работает. 
    # Надо либо закомментить в библиотеке в wbi_core.ItemEngine.set_label() условие после "Skip set_label if the item already have one and if_exists is at 'KEEP'"
    # Либо просто не проставлять лейблы
    item.set_label('img_' + str(img_i['id']), if_exists='REPLACE')
    
    r = item.write(login_instance)    
    
    df_images.loc[df_images.id == img_i['id'], 'entity_id'] = r

In [None]:
df_images.to_csv('./df_images.csv', index = False) #Обновляем "базу знаний" про связки с id Wikibase

In [None]:
# Заливка деталей
df_parts_img = df_parts[df_parts.image_id.isin(df_images[~df_images.entity_id.isnull()].id)] #только тех, для которых мы уже залили картинки
for _, part_i in df_parts_img.iterrows():
    part_img = part_i['image_id']
    img_Q = df_images[df_images.id == part_img].reset_index().at[0, 'entity_id']
    
    data = [
        wbi_core.String(str(part_i['name']), prop_nr=ITEMS_DICT['P']['Part Name']),
        wbi_core.String(str(part_i['tag']), prop_nr=ITEMS_DICT['P']['Part Tag']),
        wbi_core.String(str(part_i['part_num']), prop_nr=ITEMS_DICT['P']['Part Num']),
        wbi_core.ItemID(ITEMS_DICT['I']['Brickit Part'], prop_nr=ITEMS_DICT['P']['instance of']),
        wbi_core.ItemID(img_Q, prop_nr=ITEMS_DICT['P']['Part Image'])
    ]
    item = wbi_core.ItemEngine(new_item=True, data=data,core_props=set())
    
    # Этот метод в библиотеке из коробки не работает. 
    item.set_label('part_num_' + str(part_i['part_num']), if_exists='REPLACE')
    
    r = item.write(login_instance)    
    
    df_parts.loc[df_parts.part_num == part_i['part_num'], 'entity_id'] = r

In [None]:
df_parts.to_csv('./df_parts.csv', index = False) #Обновляем "базу знаний" про связки с id Wikibase

# Эвотор

In [155]:
# Выгрузка схемы базы из Эвотора
'''
select col.column_id, 
       col.owner as schema_name,
       col.table_name, 
       col.column_name, 
       col.data_type, 
       col.data_length, 
       col.data_precision, 
       col.data_scale, 
       col.nullable
from sys.all_tab_columns col
inner join sys.all_tables t on col.owner = t.owner 
                              and col.table_name = t.table_name
-- excluding some Oracle maintained schemas
where col.owner not in ('ANONYMOUS','CTXSYS','DBSNMP','EXFSYS', 'LBACSYS', 
   'MDSYS', 'MGMT_VIEW','OLAPSYS','OWBSYS','ORDPLUGINS', 'ORDSYS','OUTLN', 
   'SI_INFORMTN_SCHEMA','SYS','SYSMAN','SYSTEM','TSMSYS','WK_TEST','WKSYS', 
   'WKPROXY','WMSYS','XDB','APEX_040000', 'APEX_PUBLIC_USER','DIP', 
   'FLOWS_30000','FLOWS_FILES','MDDATA', 'ORACLE_OCM', 'XS$NULL',
   'SPATIAL_CSW_ADMIN_USR', 'SPATIAL_WFS_ADMIN_USR', 'PUBLIC')  
order by col.owner, col.table_name, col.column_id;
'''

df_e = pd.read_csv('./evotor_schemas.csv')

In [None]:
# Возьмём самые ходовые схемы для тестов
schema_list = [
    'AIRFLOW', 
    'BIGDATA_LOADER', 
    'EVOTOR_ANALYTICS', 
    'EVOTOR_BIGDATA', 
    'EVOTOR_MARKET_REPL',
    'EVOTOR_REPORTS',
    'EVOTOR_CRM'
]
df_e = df_e[df_e.SCHEMA_NAME.isin(schema_list)]

In [76]:
# Какие properties будут нужны 
prop_list = ['Field', 'Description', 'Schema','Table','located in','Data Type','Data Length']
prop_df = get_items_by_label(prop_list, item_type = 'P')
prop_df = {i['label']: i['item'] for _, i in prop_df.iterrows()}

## Создание схем

In [None]:
Q_database = get_items_by_label(['dwh'], item_type = 'Q').at[0, 'item']
for schema in schema_list:
    data = [
        wbi_core.ItemID(Q_database, prop_nr=prop_df['located in'])
    ]
    item = wbi_core.ItemEngine(new_item=True, data=data,core_props=set())

    item.set_label(schema, if_exists='REPLACE')

    r = item.write(login_instance)

## Создание таблиц

In [None]:
for schema in tqdm(schema_list):
    print(schema)
    Q_schema = get_items_by_label([schema], item_type = 'Q').at[0, 'item'] # ID объекта DWH
    df_schema = df_e[df_e.SCHEMA_NAME == schema]
    
    # Таблицы
    for table in tqdm(df_schema.TABLE_NAME.unique()):
        if '#' in table:
            continue
        
        df_table = df_schema[df_schema.TABLE_NAME == table]
        
        fields = []
        # Квалифаеры
        for _, field in df_table.iterrows(): 
            qualifiers = [
                wbi_core.String(field['DATA_TYPE'], prop_nr=prop_df['Data Type'], is_qualifier = True),
                wbi_core.String(str(field['DATA_LENGTH']), prop_nr=prop_df['Data Length'], is_qualifier = True),
            ]
            # Поля
            fields.append(wbi_core.String(field['COLUMN_NAME'], prop_nr=prop_df['Field'], qualifiers=qualifiers))
                
        data = [wbi_core.ItemID(Q_schema, prop_nr=prop_df['located in'])]
        data.extend(fields)
        
        item = wbi_core.ItemEngine(new_item=True, data=data,core_props=set())

        item.set_label(schema + '.' + table, if_exists='REPLACE')

        r = item.write(login_instance)

## Перечислить таблицы в схемах

In [None]:
for schema in schema_list:
    print(schema)
    Q_schema = get_items_by_label([schema], item_type = 'Q').at[0, 'item'] # ID объекта DWH

    df_schema = df_e[(df_e.SCHEMA_NAME == schema) & (~df_e.TABLE_NAME.str.contains('#'))]
    df_schema['lables'] = df_schema.SCHEMA_NAME + '.' + df_schema.TABLE_NAME
    
    lables_list = list(set(df_schema.lables))
    
    batch = 25
    Q_tables = []
    for i in tqdm(range(ceil(len(lables_list) / batch)), desc = 'batches'):
        lables_list_i = lables_list[batch*i : batch*(i+1)]
        Q_tables_i = get_items_by_label(lables_list_i, item_type = 'Q').item.to_list()
        Q_tables.extend(Q_tables_i)
        
    data = [wbi_core.ItemID(Q_i, prop_nr=prop_df['Table']) for Q_i in Q_tables]

    item = wbi_core.ItemEngine(new_item=False, item_id = Q_schema, data=data,core_props=set())

    item.write(login_instance)

---
# Классы

In [3]:
def get_items_by_label(label_list:list, item_type:str, is_unique:bool = True, is_notnull:bool = True):
    '''
    По переданному списку лейблов находит entity_id в базе Wikibase. 
    
    label_list: список искомых лейблов
    item_type: тип искомого объекта. Если не указано, то любой объект. Если указано:
        "P" - Property
        "Q" - Item
    is_unique: если True, то вернёт ошибку, если найдено больше одного значения
    is_notnull: если True, то вернёт ошибку, если не найдено ни одного значения
    '''
    
    query = """
        SELECT DISTINCT ?item ?itemLabel
        WHERE {{
          ?item rdfs:label ?itemLabel. 

          VALUES ?itemLabel {{ {label_filter} }}
        }}""".format(label_filter = ' '.join([f'\"{i}\"@en' for i in label_list]))
    
    result = wbi_core.ItemEngine.execute_sparql_query(query)
    result_list = [[i['itemLabel']['value'], i['item']['value'].replace('http://wikibase.svc/entity/', '')] 
                   for i in result['results']['bindings']]
    
    df = pd.DataFrame(result_list, columns = ['label', 'item'])
        
    if item_type in ('P', 'Q'):
        df = df[df.item.str.contains(item_type)] 

    df_check = df.groupby('label').count()
    if is_unique and df_check.item.max() > 1:
        r = df[df.label.isin(df_check[df_check.item > 1].index.to_list())].sort_values(by = 'label')
        logging.info(f"entity_id определён неоднозначно: \n{r}")
        return None
    elif is_notnull and len(set(label_list) - set(df.label)) > 0:
        r = set(label_list) - set(df.label)
        logging.info(f"entity_id не найден: \n{r}!")
        return None
    else:
        return df

In [18]:
login_instance = wbi_login.Login(user=WIKIBASE_LOGIN, pwd=WIKIBASE_PASSWORD)   

In [24]:
class WikiDatabase():
    def __init__(self):
        pass
    

    def fetch_schemas()
        return self.resolved_schemas 
    
    def push_to_wb(self):
        pass

In [None]:
class WikiSchema():    
    def __init__(self):
        pass
    

    def fetch_tables()
        return self.resolved_tables 
    
    def push_to_wb(self):
        pass

In [236]:
class WikiTable():
    @staticmethod
    def get_wb_schema(Q:str, properties_df:pd.DataFrame, login_instance:wbi_login.Login) -> str:
        '''
        Вернуть entity_id схемы, в которой находится таблица с entity_id = Q
        '''
        
        query = '''
            SELECT ?schema ?schema_name WHERE {{
                ?schema wdt:{table} wd:{Q} .
                ?schema rdfs:label ?schema_name .
            }}'''.format(
                Q = Q,
                table = properties_df['Table'],        
            )
        result = wbi_core.ItemEngine.execute_sparql_query(query)
        
        result_list = [[i['schema_name']['value'], i['schema']['value'].replace('http://wikibase.svc/entity/', '')]
                       for i in result['results']['bindings']]

        result_df = pd.DataFrame(result_list, columns = ['schema_name', 'schema'])
        
        if result_df.shape[0] > 1:
            raise Exception(f'Table with entity_id {Q} finded in several schemas: \n{result_df.shema.to_list()}')
        elif result_df.shape[0] == 0:
            raise Exception(f'Table with entity_id {Q} not finded in schemas!')
        else:
            return (result_df.at[0, 'schema_name'], result_df.at[0, 'schema'])
    
    
    @staticmethod    
    def get_wb_fields(Q:str, properties_df:pd.DataFrame, login_instance:wbi_login.Login) -> pd.DataFrame:
        query = '''
            SELECT  ?COLUMN_NAME ?DATA_TYPE ?DATA_LENGTH ?DESCRIPTION
            WHERE
            {{
                 wd:{Q} p:{field} ?statement.
                 ?statement ps:{field} ?COLUMN_NAME.
                 
                 # CORE FIELDS
                 ?statement pq:{data_type} ?DATA_TYPE.
                 ?statement pq:{data_length} ?DATA_LENGTH.
                 
                 # CUSTOM FIELD
                 OPTIONAL {{?statement pq:{descrition} ?DESCRIPTION.}}  
            }}        
        '''.format(
            Q = Q,
            field = properties_df['Field'],
            data_type = properties_df['Data Type'],
            data_length = properties_df['Data Length'],
            descrition = properties_df['Description'] 
        )   
        
        result = wbi_core.ItemEngine.execute_sparql_query(query)

        wb_fields_df = []
        for bind in result['results']['bindings']:
            wb_fields_df.append({k: v['value'] for k, v in bind.items()})
        wb_fields_df = pd.DataFrame(wb_fields_df)        
        
        return wb_fields_df
    

    def __fetch_fields(self):        
        if self.new_item:
            self.resolved_fields = df_input.copy()
        elif self.df_input.shape[0] == 0:
            logging.info('No input dataframe, can`t fetch!')
        else:
            try: 
                assert_frame_equal(
                    self.df_input[['COLUMN_NAME', 'DATA_TYPE', 'DATA_LENGTH']].astype(str), 
                    self.wb_fields[['COLUMN_NAME', 'DATA_TYPE', 'DATA_LENGTH']].astype(str), 
                    check_like = False
                )
                logging.info(f'Table {self.name} has no changes, skipping update.')
            except:
                logging.info("""
                    {columns} columns in new table
                    {added} colums are added
                    {deleted} columns are deleted              
                """.format(
                    columns = self.df_input.shape[0],
                    added = self.df_input[~self.df_input.COLUMN_NAME.isin(self.wb_fields.COLUMN_NAME)].shape[0],
                    deleted = self.wb_fields[~self.wb_fields.COLUMN_NAME.isin(self.df_input.COLUMN_NAME)].shape[0]                    
                ))
                
                self.resolved_fields = df_input.merge(self.wb_fields[['COLUMN_NAME', 'DESCRIPTION']],
                                                     how = 'left', left_on = 'COLUMN_NAME', right_on = 'COLUMN_NAME')
            
            return self.resolved_fields    
    
    
    def __init__(self, name:str, properties_df:pd.DataFrame, login_instance:wbi_login.Login, df_input = pd.DataFrame()):
        self.name = name
        self.login_instance = login_instance
        self.properties_df = properties_df
        self.df_input = df_input
        
        Q_df = get_items_by_label([name], item_type = 'Q')
        if Q_df is None:
            logging.info(f'No such table: {name}! New one will be created.')
            if self.df_input.shape[0] == 0:
                raise Exception('Cannot create new item from empty input DataFrame!')
            self.new_item = True                      
            self.Q = None
            schema_name = df_input.SCHEMA_NAME.unique()
            if schema_name.shape[0] > 1:
                raise Exception('Ambiguous schema_name: {schema_name} ')
            else:
                self.schema_name = schema_name[0]
            
            self.Q_schema = get_items_by_label([self.schema_name], item_type = 'Q').at[0, 'item']
        else:
            self.new_item = False
            self.Q = Q_df.at[0, 'item']
            self.schema_name, self.Q_schema = self.get_wb_schema(self.Q, self.properties_df, self.login_instance)
            self.wb_fields = self.get_wb_fields(self.Q, self.properties_df, self.login_instance)
        
        self.__fetch_fields()
        logging.info("""
            Table {name} (entity_id: {Q}), schema {schema_name} (entity_id: {Qs})
        """.format(
            name = self.name,
            Q = self.Q,
            schema_name = self.schema_name,
            Qs = self.Q_schema
        ))
    
    
    def push_to_wiki(self):
        # TO DO: проверка на дубликаты полей
        # А это можно на SPARQL сделать? Хотя зачем. 
        self.resolved_fields

        fields = []
        # Квалифаеры
        for _, field in self.resolved_fields.iterrows(): 
            qualifiers = [
                wbi_core.String(field['DATA_TYPE'], prop_nr=prop_df['Data Type'], is_qualifier = True),
                wbi_core.String(str(field['DATA_LENGTH']), prop_nr=prop_df['Data Length'], is_qualifier = True),
            ]
            description_i =  field['DESCRIPTION']
            if description_i:
                qualifiers.append(
                    wbi_core.String(str(description_i), prop_nr=prop_df['Description'], is_qualifier = True),
                )
            
            # Поля
            fields.append(wbi_core.String(field['COLUMN_NAME'], prop_nr=prop_df['Field'], qualifiers=qualifiers))

        data = [wbi_core.ItemID(self.Q_schema, prop_nr=prop_df['located in'])]
        data.extend(fields)

        item = wbi_core.ItemEngine(new_item=self.new_item, data=data,core_props=set())

        if self.new_item:
            item.set_label(schema + '.' + table, if_exists='REPLACE')

        r = item.write(login_instance)

In [240]:
t2 = WikiTable('AIRFLOW.TEST_2', prop_df, login_instance, df_input[['SCHEMA_NAME', 'COLUMN_NAME', 'DATA_TYPE', 'DATA_LENGTH']])

INFO:root:entity_id не найден: 
{'AIRFLOW.TEST_2'}!
INFO:root:No such table: AIRFLOW.TEST_2! New one will be created.
INFO:root:
            Table AIRFLOW.TEST_2 (entity_id: None), schema AIRFLOW (entity_id: Q203)
        


In [241]:
t1 = WikiTable('AIRFLOW.TEST_1', prop_df, login_instance, df_input[['SCHEMA_NAME', 'COLUMN_NAME', 'DATA_TYPE', 'DATA_LENGTH']])

INFO:root:
                    1 columns in new table
                    0 colums are added
                    1 columns are deleted              
                
INFO:root:
            Table AIRFLOW.TEST_1 (entity_id: Q254), schema AIRFLOW (entity_id: Q203)
        


# Прочие знания

In [None]:
# API поиска
wbi_core.ItemEngine.get_search_results('part_num_10793') 

In [None]:
# Дескрипшн
set_description(self, description, lang=None, if_exists='REPLACE'):

In [None]:
# Обновление существующих айтемов
data = [
    wbi_core.ItemID(img_Q, prop_nr = ITEMS_DICT['P']['Part Image'])
]
item = wbi_core.ItemEngine(new_item=False, item_id = 'Q1234', data=data,core_props=set())
r = item.write(login_instance)  