In [12]:
import asyncio
import os
from typing import List
import pandas as pd
from playwright.async_api import Page, async_playwright
from sqlalchemy import (
    create_engine,
    inspect,
    MetaData,
    Table,
    Column,
    Integer,
    String,
    Text,
    text,
    UniqueConstraint,
)
from sqlalchemy.orm import sessionmaker

In [13]:
DATABASE_URL = "postgresql://postgres:postgres@localhost:8052/postgres"
CSV_PATH = "./csv"
MAX_ITER = 10
RESTART = False
TO_CSV = True

In [14]:
catalog_url_ts = (
    "https://tatu-shop.ru/products/category/rotary-tattoo-machines-pen"
)
catalog_url_28 = "https://28opt.ru/ez-tattoo/"
catalog_url_fx = "https://fenix-tattoo.ru/catalog/tatuirovochnye-mashinki/rotornye-mashinki/"

In [65]:
INFO_28 = []

In [15]:
if RESTART:
    LINKS_TS = []
    INFO_TS = []

    LINKS_28 = []
    INFO_28 = []

    LINKS_FX = []
    INFO_FX = []

    DOWNLOADED_URLS = []

In [16]:
async def get_product_links_ts(
    page: Page,
) -> List[str]:
    await page.goto(catalog_url_ts, wait_until="networkidle")

    all_links = set()
    current_page = 1

    while True:
        page_links = await page.evaluate(
            """
        () => {
            const links = [];
            // Ищем все ссылки на товары в каталоге
            const productElements = document.querySelectorAll('a[href*="/products/"]');
            productElements.forEach(element => {
                const href = element.getAttribute('href');
                if (href && 
                    !href.includes('/category/') && 
                    !href.includes('#') &&
                    href !== '/products' &&
                    href !== '/products/') {
                    const fullUrl = href.startsWith('http') ? href : 'https://tatu-shop.ru' + href;
                    links.push(fullUrl);
                }
            });
            return [...new Set(links)]; // Убираем дубликаты
        }
    """
        )

        if current_page > 9:
            break

        all_links.update(page_links)

        print(f"ts\t{current_page}\t{len(all_links)}")

        current_page += 1
        next_url = f"{catalog_url_ts}?page={current_page}"
        await page.goto(next_url, wait_until="networkidle")

        await page.wait_for_timeout(300)

    return list(all_links)

In [17]:
async def parse_product_ts(
    page: Page,
    array: List,
    urls: List[str],
    max_iter: int | None = None,
):
    count = 0
    for url in urls:
        if url in DOWNLOADED_URLS:
            continue

        await page.goto(url, wait_until="networkidle")

        props = await page.evaluate(
            """
            () => {
                const properties = {};

                // Извлекаем название продукта
                const titleElement = document.evaluate(
                    '/html/body/div[2]/div/div/div[2]/section/section/section/div[1]/div[2]/div/div/article/div[1]/div[1]/div[1]/div[2]/div[3]/h1',
                    document,
                    null,
                    XPathResult.FIRST_ORDERED_NODE_TYPE,
                    null
                ).singleNodeValue;
                const title = titleElement ? titleElement.textContent.trim() : '';

                // Извлекаем цену
                const priceElement = document.evaluate(
                    '/html/body/div[2]/div/div/div[2]/section/section/section/div[1]/div[2]/div/div/article/div[1]/div[1]/div[1]/div[2]/div[4]/div/div/div[1]/span[1]',
                    document,
                    null,
                    XPathResult.FIRST_ORDERED_NODE_TYPE,
                    null
                ).singleNodeValue;
                const price = priceElement ? priceElement.textContent.trim().replace(" ", "") : '';

                // Извлекаем блок с описанием и характеристиками
                const detailsDiv = document.evaluate(
                    '/html/body/div[2]/div/div/div[2]/section/section/section/div[1]/div[2]/div/div/article/div[1]/div[1]/div[2]/div[2]/div[1]/div[2]',
                    document,
                    null,
                    XPathResult.FIRST_ORDERED_NODE_TYPE,
                    null
                ).singleNodeValue;

                if (detailsDiv) {
                    const detailsText = detailsDiv.textContent.trim();

                    let technicalDetailsIndex = detailsText.indexOf("Технические детали:");
                    let headerLength = "Технические детали:".length;
                    
                    if (technicalDetailsIndex === -1) {
                        technicalDetailsIndex = detailsText.indexOf("Технические характеристики:");
                        headerLength = "Технические характеристики:".length;
                    }
                    
                    if (technicalDetailsIndex === -1) {
                        technicalDetailsIndex = detailsText.indexOf("Характеристики:");
                        headerLength = "Характеристики:".length;
                    }
                    
                    if (technicalDetailsIndex === -1) {
                        technicalDetailsIndex = detailsText.indexOf("Тех. характеристики:");
                        headerLength = "Тех. характеристики:".length;
                    }
                    
                    if (technicalDetailsIndex !== -1) {
                        const technicalDetails = detailsText
                            .substring(technicalDetailsIndex + headerLength)
                            .trim();

                        // Разделяем характеристики по символу новой строки или точке
                        const characteristics = technicalDetails.split(/[•]/).map(s => s.trim());
                        characteristics.forEach(pair => {
                            const [name, value] = pair.split(':').map(s => s.trim());
                            if (name && value) {
                                // Удаляем все после точки или переноса строки
                                const cleanValue = value.split('.')[0].split('\\n')[0].trim();
                                if (cleanValue) {
                                    properties[name] = cleanValue;
                                }
                            }
                        });
                    }
                }
                
                properties['Название'] = title;
                properties['Цена'] = price;
                properties['Ссылка'] = window.location.href;

                return {
                    url: window.location.href,
                    properties: properties
                };
            }
            """
        )

        array.append(props)
        DOWNLOADED_URLS.append(props["url"])

        count += 1

        print(f"ts\t{count}\t{len(props["properties"].keys())}")

        if len(props["properties"].keys()) <= 2:
            print(f"ts ALARM:\t{props}")

        if max_iter and count >= max_iter:
            break

        await page.wait_for_timeout(600)

    return array

In [18]:
async def get_product_links_28(
    page: Page,
) -> List[str]:
    await page.goto(catalog_url_28, wait_until="networkidle")

    all_links = await page.evaluate(
        """
        () => {
            const sel = 'a[href*="/ez-tattoo/"]';
            const links = Array.from(document.querySelectorAll(sel))
                .map(a => a.href)
                .filter(h => h && !h.includes('#'));
            return [...new Set(links)];
        }
        """
    )

    print(f"28\t{0}\t{len(all_links)}")

    return all_links

In [19]:
async def parse_product_28(
    page: Page,
    array: List,
    urls: List[str],
    max_iter: int | None = None,
):
    count = 0
    for url in urls:
        if url in DOWNLOADED_URLS:
            continue

        await page.goto(url, wait_until="networkidle")

        props = await page.evaluate(
            """
        () => {
            const properties = {};
            
            const titleElement = document.evaluate(
                '/html/body/div[2]/section/div/div/div/div[2]/div/div/form/div/h1',
                document,
                null,
                XPathResult.FIRST_ORDERED_NODE_TYPE,
                null
            ).singleNodeValue;
            const title = titleElement ? titleElement.textContent.trim() : '';

            const priceElement = document.evaluate(
                '/html/body/div[2]/section/div/div/div/div[2]/div/div/form/div/div/div[2]/div[2]/div/div[1]/div/strong',
                document,
                null,
                XPathResult.FIRST_ORDERED_NODE_TYPE,
                null
            ).singleNodeValue;
            const price = priceElement ? priceElement.textContent.trim().replace(" ", "").replace('q', '') : '';

            const characteristicBlocks = document.evaluate(
                '/html/body/div[2]/section/div/div/div/div[2]/div/div/form/div/div/div[2]/div[3]/div/span[2]/div/div/p',
                document,
                null,
                XPathResult.ORDERED_NODE_SNAPSHOT_TYPE,
                null
            );

            for (let i = 0; i < characteristicBlocks.snapshotLength; i++) {
                const block = characteristicBlocks.snapshotItem(i);
                const text = block.textContent.trim();

                const characteristics = text.split(/[.;]/);
                characteristics.forEach(pair => {
                    const [name, value] = pair.split(':').map(s => s.trim());
                    if (name && value) {
                        properties[name] = value;
                    }
                });
            }
            
            properties['Название'] = title;
            properties['Цена'] = price;
            properties['Ссылка'] = window.location.href;

            return {
                url: window.location.href,
                properties: properties
            };
        }
        """
        )

        array.append(props)
        DOWNLOADED_URLS.append(props["url"])

        count += 1

        print(f"28\t{count}\t{len(props["properties"].keys())}")

        if len(props["properties"].keys()) <= 2:
            print(f"28 ALARM:\t{props}")

        if max_iter and count >= max_iter:
            break

        await page.wait_for_timeout(300)

    return array

In [20]:
async def get_product_links_fx(
    page: Page,
) -> List[str]:
    await page.goto(catalog_url_fx, wait_until="networkidle")

    all_links = set()
    current_page = 1

    while True:
        page_links = await page.evaluate(
            r"""
        () => {
            const catalogBlock = document.querySelector('.catalog_block.items.row.margin0.js_append.ajax_load.block.flexbox.has-bottom-nav');
            if (!catalogBlock) return [];
            
            const productDivs = catalogBlock.querySelectorAll('.item.item-parent.catalog-block-view__item');
            const links = Array.from(productDivs)
                .map(div => {
                    // Ищем ссылку в заголовке продукта
                    const link = div.querySelector('.js-notice-block__title');
                    if (link && link.href) {
                        return link.href;
                    }
                    // Альтернативный поиск - любая ссылка внутри товара
                    const anyLink = div.querySelector('a[href*="/catalog/"]');
                    return anyLink ? anyLink.href : null;
                })
                .filter(h => h !== null)
                .map(h => {
                    // Убираем фрагменты и добавляем #props
                    const url = h.split('#')[0].split('?')[0];
                    // Добавляем oid если есть в оригинальной ссылке
                    const oidMatch = h.match(/oid=(\d+)/);
                    return oidMatch ? url + '?oid=' + oidMatch[1] + '#props' : url + '#props';
                });
            return [...new Set(links)];
        }
           """
        )

        if current_page > 15:
            break

        all_links.update(page_links)

        print(f"fx\t{current_page}\t{len(all_links)}")

        current_page += 1
        next_url = f"{catalog_url_fx}?PAGEN_1={current_page}"
        await page.goto(next_url, wait_until="networkidle")

        await page.wait_for_timeout(300)

    return list(all_links)

In [56]:
async def parse_product_fx(
    page: Page,
    array: List,
    urls: List[str],
    max_iter: int | None = None,
):
    count = 0
    for url in urls:
        if url in DOWNLOADED_URLS:
            continue

        await page.goto(url, wait_until="networkidle")

        props = await page.evaluate(
            r"""
        () => {
            const properties = {};
            
            const items = document.querySelectorAll('.properties-group__item');
            
            items.forEach(item => {
                const nameWrap = item.querySelector('.properties-group__name-wrap');
                const valueWrap = item.querySelector('.properties-group__value-wrap');
                
                if (nameWrap && valueWrap) {
                    const name = nameWrap.textContent.trim();
                    const value = valueWrap.textContent.trim();
                    properties[name] = value;
                }
            });
            
            const compactItems = document.querySelectorAll('.properties__item--compact');
            
            compactItems.forEach(item => {
                const title = item.querySelector('.properties__title');
                const value = item.querySelector('.properties__value');
                
                if (title && value) {
                    const name = title.textContent.trim();
                    const val = value.textContent.trim();
                    if (!properties[name]) {
                        properties[name] = val;
                    }
                }
            });
            
            const title = document.querySelector('h1')?.textContent.trim() || '';
            const priceElement = document.querySelector('.price_value');
            const price = priceElement ? priceElement.textContent.trim().replace(/\s/g, "") : '';
            
            const photoLink = document.querySelector('.detail-gallery-big__link');
            const photoUrl = photoLink ? photoLink.getAttribute('href') : '';
            const photoAbsUrl = photoUrl ? (photoUrl.startsWith('http') ? photoUrl : 'https://fenix-tattoo.ru' + photoUrl) : '';
            
            properties['Название'] = title;
            properties['Цена'] = price;
            properties['Ссылка'] = window.location.href;
            properties['Фотография'] = photoAbsUrl;
            
            return {
                url: window.location.href,
                properties: properties
            };
        }
        """
        )

        array.append(props)
        DOWNLOADED_URLS.append(props["url"])

        count += 1

        print(f"fx\t{count}\t{len(props["properties"].keys())}")

        if len(props["properties"].keys()) <= 2:
            print(f"fx ALARM:\t{props}")

        if max_iter and count >= max_iter:
            break

        await page.wait_for_timeout(300)

    return array

In [58]:
async def main():
    global LINKS_TS, LINKS_28, LINKS_FX, INFO_TS, INFO_28, INFO_FX, MAX_ITER

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        )
        page = await context.new_page()

        print(len(LINKS_TS), len(LINKS_28), len(LINKS_FX))

        LINKS_TS = (
            LINKS_TS if LINKS_TS else await get_product_links_ts(page=page)
        )
        LINKS_28 = (
            LINKS_28 if LINKS_28 else await get_product_links_28(page=page)
        )
        LINKS_FX = (
            LINKS_FX if LINKS_FX else await get_product_links_fx(page=page)
        )

        print(len(LINKS_TS), len(LINKS_28), len(LINKS_FX))

        INFO_TS = (
            INFO_TS
            if len(INFO_TS) > MAX_ITER
            else await parse_product_ts(
                page=page,
                array=INFO_TS,
                urls=LINKS_TS,
                max_iter=MAX_ITER - 1,
            )
        )

        INFO_28 = (
            INFO_28
            if len(INFO_28) >= MAX_ITER
            else await parse_product_28(
                page=page,
                array=INFO_28,
                urls=LINKS_28,
                max_iter=MAX_ITER,
            )
        )

        INFO_FX = (
            INFO_FX
            if len(INFO_FX) >= MAX_ITER
            else await parse_product_fx(
                page=page,
                array=INFO_FX,
                urls=LINKS_FX,
                max_iter=MAX_ITER,
            )
        )


await main()

872 49 297
872 49 297
fx	1	13
fx	2	29
fx	3	8
fx	4	29
fx	5	26
fx	6	23
fx	7	11
fx	8	21
fx	9	10
fx	10	24


In [60]:
val_map = {
    "Название": ["Название"],
    "Ссылка на продукт": ["Ссылка"],
    "Цена": ["Цена"],
    "Фотография": ["Фотография"],
    "Вес": [
        "Вес машинки с батарейкой",
        "Вес машинки",
        "Вес с аккумулятором, г",
        "Вес коробки",
        "Вес",
        "Вес машинки батарейкой",
        "Вес машинки без батарейки",
    ],
    "Время работы": [
        "Время работы",
        "Время работы блока питания",
        "Время работы, часов",
    ],
    "Время зарядки": [
        "Время полной зарядки",
        "Время зарядки",
        "Время зарядки, часов",
    ],
    "Длина хода": [
        "Вылет иглы",
        "Длина хода",
        "Длина хода, мм",
    ],
    "Емкость батареи": [
        "Емкость АКБ",
        "Емкость аккумулятора",
        "Емкость батареи",
        "Емкость аккумулятора, мАч",
    ],
    "Рабочее напряжение": [
        "Вольтаж",
        "Рабочее напряжение",
        "Рекомендуемый вольтаж, В",
    ],
    "Модель": [
        "Модель",
        "Модель машины",
    ],
    "Материал": [
        "Материал",
        "Материал",
        "Материал",
    ],
    "Мощность": [
        "Мощность",
        "Мощность двигателя, В",
        "Потребляемая мощность",
    ],
    "Мотор": [
        "Moтор",
        "Мотор",
        "Мотор",
    ],
    "Производство": [
        "Производcтво",
        "Производство",
        "Производство",
        "Производство",
    ],
    "Гарантия": [
        "Гарантия",
        "Гарантия на блок питания",
        "Гарантия",
    ],
    "Об / мин": [
        "Об / мин",
        "Об/мин",
        "Кол-во оборотов",
        "Оборотов в минуту",
        "Обороты в минуту",
    ],
}

In [61]:
df_ts = pd.DataFrame([x["properties"] for x in INFO_TS])
df_28 = pd.DataFrame([x["properties"] for x in INFO_28])
df_fx = pd.DataFrame([x["properties"] for x in INFO_FX])

In [62]:
def normalize_columns(
    df: pd.DataFrame,
    val_map: dict,
) -> pd.DataFrame:
    """
    Переименовывает колонки датафрейма согласно val_map.
    Если несколько вариантов названий найдены, объединяет их значения.
    Удаляет колонки, которых нет в val_map.
    """
    df_normalized = df.copy()

    result_data = {}

    for standard_name, variants in val_map.items():
        matching_cols = [
            col for col in df_normalized.columns if col in variants
        ]

        if matching_cols:
            combined = df_normalized[matching_cols[0]].copy()
            for col in matching_cols[1:]:
                combined = combined.fillna(df_normalized[col])
            result_data[standard_name] = combined

    df_result = pd.DataFrame(result_data)

    ordered_columns = [
        col for col in val_map.keys() if col in df_result.columns
    ]
    df_result = df_result[ordered_columns]

    return df_result


df_ts_normalized = normalize_columns(df_ts, val_map)
df_28_normalized = normalize_columns(df_28, val_map)
df_fx_normalized = normalize_columns(df_fx, val_map)

print("TS columns:", sorted(df_ts_normalized.columns.to_list()))
print("28 columns:", sorted(df_28_normalized.columns.to_list()))
print("FX columns:", sorted(df_fx_normalized.columns.to_list()))

TS columns: ['Вес', 'Время зарядки', 'Время работы', 'Гарантия', 'Длина хода', 'Емкость батареи', 'Материал', 'Модель', 'Мотор', 'Мощность', 'Название', 'Производство', 'Рабочее напряжение', 'Цена']
28 columns: ['Вес', 'Гарантия', 'Длина хода', 'Емкость батареи', 'Материал', 'Мотор', 'Название', 'Об / мин', 'Производство', 'Рабочее напряжение', 'Цена']
FX columns: ['Вес', 'Время зарядки', 'Время работы', 'Гарантия', 'Длина хода', 'Емкость батареи', 'Материал', 'Мотор', 'Мощность', 'Название', 'Об / мин', 'Производство', 'Ссылка на продукт', 'Фотография', 'Цена']


In [63]:
def create_or_update_table(engine, table_name: str, val_map: dict):
    """
    Создает таблицу с уникальным ограничением на source, название, цену.
    """
    inspector = inspect(engine)
    metadata = MetaData()

    all_columns = list(val_map.keys()) + ["source"]

    if table_name in inspector.get_table_names():
        with engine.connect() as conn:
            result = conn.execute(
                text(
                    f"""
                SELECT constraint_name 
                FROM information_schema.table_constraints 
                WHERE table_name = '{table_name}' 
                AND constraint_type = 'UNIQUE'
                AND constraint_name = 'unique_product'
            """
                )
            )

            has_constraint = result.fetchone() is not None

            if not has_constraint:
                print(
                    f"Dropping table {table_name} to recreate with constraints"
                )
                conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
                conn.commit()
            else:
                existing_columns = {
                    col["name"] for col in inspector.get_columns(table_name)
                }

                for col_name in all_columns:
                    if col_name not in existing_columns:
                        conn.execute(
                            text(
                                f'ALTER TABLE {table_name} ADD COLUMN "{col_name}" TEXT'
                            )
                        )
                        conn.commit()
                        print(f"Added column: {col_name}")
                return

    columns = [Column("id", Integer, primary_key=True, autoincrement=True)]
    columns.extend(
        [Column(col_name, Text, nullable=True) for col_name in all_columns]
    )

    table = Table(
        table_name,
        metadata,
        *columns,
        UniqueConstraint("source", "Название", "Цена", name="unique_product"),
    )
    metadata.create_all(engine)
    print(f"Created table: {table_name} with unique constraint")


def upsert_dataframes_to_db(
    df_ts: pd.DataFrame,
    df_28: pd.DataFrame,
    df_fx: pd.DataFrame,
    database_url: str,
    table_name: str = "tattoo_machines",
):
    """
    Вставляет данные, игнорируя дубликаты.
    """
    engine = create_engine(database_url)
    create_or_update_table(engine, table_name, val_map)

    df_combined = pd.concat([df_ts, df_28, df_fx], ignore_index=True)

    df_combined["source"] = (
        ["tatu-shop"] * len(df_ts)
        + ["28opt"] * len(df_28)
        + ["fenix-tattoo"] * len(df_fx)
    )

    inserted = 0
    skipped = 0

    with engine.connect() as conn:
        for _, row in df_combined.iterrows():
            try:
                param_names = {
                    col: col.replace(" ", "_")
                    .replace("/", "_")
                    .replace("-", "_")
                    for col in df_combined.columns
                }

                placeholders = ", ".join(
                    [f":{param_names[col]}" for col in df_combined.columns]
                )
                columns = ", ".join(
                    [f'"{col}"' for col in df_combined.columns]
                )

                query = text(
                    f"""
                    INSERT INTO {table_name} ({columns})
                    VALUES ({placeholders})
                    ON CONFLICT (source, "Название", "Цена") DO NOTHING
                """
                )

                params = {param_names[col]: val for col, val in row.items()}

                result = conn.execute(query, params)
                if result.rowcount > 0:
                    inserted += 1
                else:
                    skipped += 1

            except Exception as e:
                print(f"Error inserting row: {e}")
                skipped += 1

        conn.commit()

    print(f"Processed {len(df_combined)} rows")
    print(f"Inserted {inserted} new records")
    print(f"Skipped {skipped} duplicate records")
    engine.dispose()

In [64]:
if not TO_CSV:
    upsert_dataframes_to_db(
        df_ts_normalized,
        df_28_normalized,
        df_fx_normalized,
        DATABASE_URL,
    )

else:
    df_combined = pd.concat(
        [
            df_ts_normalized,
            df_28_normalized,
            df_fx_normalized,
        ],
        ignore_index=True,
    )

    df_combined["source"] = (
        ["tatu-shop"] * len(df_ts_normalized)
        + ["28opt"] * len(df_28_normalized)
        + ["fenix-tattoo"] * len(df_fx_normalized)
    )

    os.makedirs(CSV_PATH, exist_ok=True)
    index = max(int(x.split(".")[0]) for x in os.listdir(CSV_PATH)) + 1
    df_combined.to_csv(f"{CSV_PATH}/{index}.csv")

In [None]:
INFO_28

[{'url': 'https://fenix-tattoo.ru/catalog/tatuirovochnye-mashinki/rotornye-mashinki/equaliser-mikron/?oid=45907#props',
  'properties': {'Тип': 'Тату машинка',
   'Тип машинки': 'Роторная',
   'Тип подключения': 'RCA',
   'Мотор': 'Faulhaber',
   'Мощность двигателя, В': '4.5',
   'Производство': 'Польша',
   'Длина хода, мм': '3.5',
   'Рабочий вольтаж, В': '11',
   'Вес, г': '55',
   'Название': 'EQUALISER Mikron Grey',
   'Цена': '38309',
   'Ссылка': 'https://fenix-tattoo.ru/catalog/tatuirovochnye-mashinki/rotornye-mashinki/equaliser-mikron/?oid=45907#props',
   'Фотография': 'https://fenix-tattoo.ru/upload/iblock/771/55smym0njyjty3lifpz6uvkwxsebx2rp.jpg'}},
 {'url': 'https://fenix-tattoo.ru/catalog/tatuirovochnye-mashinki/rotornye-mashinki/inkin-hover-fm-dotwork/?oid=57168#props',
  'properties': {'Тип': 'Беспроводная тату машинка',
   'Тип машинки': 'Роторная',
   'Тип подключения': 'Беспроводной',
   'Назначение': 'Универсальная',
   'Материал': 'Аллюминий',
   'Диаметр, мм': '3