# Parsing with Simych

## Подготовительные действия

In [1]:
import sys

In [2]:
# Для определения версии Python в Jupyter Notebook
!python --version

Python 3.12.7


In [3]:
# Прямой запрос к пути интерпретатора Python
# import sys
print("Python interpreter path:", sys.executable)

Python interpreter path: C:\ProgramData\anaconda3\python.exe


In [None]:
# Как определить версию Python, на которой запускается скрипт
# import sys
print(sys.version)

In [None]:
# Если необходимо узнать только номер версии
# import sys
print(sys.version_info)

## Определение исходных и финишных папок

In [5]:
import os
import random
import glob
from pathlib import Path
import re

In [6]:
# Пример определения пути к домашней папке
downloads_path = str(Path.home() / 'Downloads')
home_path = str(Path.home())

In [7]:
# Напечатали полный путь к домашней папке
print(home_path)
print(downloads_path)

C:\Users\User
C:\Users\User\Downloads


In [10]:
path = "C:/Users/User/my_learning/parsing_cases/notebooks/schedule_test.py"

In [11]:
print(path)

C:/Users/User/my_learning/parsing_cases/notebooks/schedule_test.py


## Автоматическая проверка через Python

### Проверка наличия API у сайта с помощью Python

In [2]:
import requests

def check_api(site_url):
    # Популярные пути к API (можно расширить список)
    api_endpoints = [
        "/api/v1/news",
        "/api/news",
        "/graphql",
        "/wp-json/wp/v2/posts"  # Для WordPress
    ]
    
    found_apis = []
    for endpoint in api_endpoints:
        url = f"{site_url.rstrip('/')}{endpoint}"
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                print(f"🎯 Найдено API: {url}")
                found_apis.append(url)
        except requests.exceptions.RequestException:
            continue
    
    if not found_apis:
        print("🔍 API не обнаружено. Возможно, сайт использует внутренние запросы (AJAX).")

In [None]:
# Пример вызова
check_api("https://example.com")

In [4]:
# Пример вызова
check_api("https://vc.ru/new")

🔍 API не обнаружено. Возможно, сайт использует внутренние запросы (AJAX).


## Пример кода для теста

In [6]:
import requests
from bs4 import BeautifulSoup

In [None]:
url = "https://vc.ru/new"
headers = {"User-Agent": "Mozilla/5.0"}

response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")

# Извлечение заголовков (уточните селекторы!)
for article in soup.select(".content-title"):
    title = article.get_text(strip=True)
    print(title)

In [12]:
url = "https://vc.ru/new"
headers = {"User-Agent": "Mozilla/5.0"}

response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")
articles = soup.find('div', class_ = 'content-list')
divs = articles.find_all('div', class_='content--short')

# Извлечение заголовков (уточните селекторы!)
for article in divs:
    title = article.find('div', class_='content-title').text.strip() if article.find('div', class_='content-title') else 'Absent'
    print(title)

Трюизмы: как тебя заставляют соглашаться, не спрашивая
Telegram против остальных: где продвигать свой продукт в 2025 году
Google добавила ИИ-функции в «Таблицы», выпустила инструменты для разработки ИИ-агентов и анонсировала Gemini 2.5 Flash
Как создать продающие текста или как заставить клиента купить Вашу услугу или товар: от слов к действию 🌟
Американский фондовый рынок и крипта ответили ростом на тарифную «паузу» для стран, которые не стали вводить зеркальные меры против США
Трамп объявил о «немедленном» повышении пошлин для Китая до 125%
Чем старше — тем лучше: почему Брэд Питт с годами становится только красивее?
«Бот запрашивает то, что уже известно властям»: Горелкин ответил на подозрения в обмане блогеров с помощью Telegram-бота для маркировки
Instagram* начал тестировать «заблокированные» Reels — пользователи смогут их увидеть, если введут «секретный код»
Канал под ключ: из автомеханика во владельца бизнеса за пол года
CJM в маркетинге. Российская версия карты пути клиента ил

## V1 Улучшенный тестовый код

In [14]:
import os
import requests
from bs4 import BeautifulSoup

In [16]:
# Настройки
URL = "https://vc.ru/new"
HEADERS = {"User-Agent": "Mozilla/5.0"}
TEMP_HTML_PATH = "data/interim/vc_ru_temp.html"  # Путь для сохранения HTML

# Создаем папку, если её нет
os.makedirs(os.path.dirname(TEMP_HTML_PATH), exist_ok=True)

def save_html(content, path):
    """Сохраняет HTML во временный файл."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(content)

def load_html(path):
    """Загружает HTML из файла (если существует)."""
    if os.path.exists(path):
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()
    return None

# Парсинг (с кешированием HTML)
html_content = load_html(TEMP_HTML_PATH)
if not html_content:
    try:
        response = requests.get(URL, headers=HEADERS, timeout=10)
        response.raise_for_status()  # Проверка на ошибки HTTP
        html_content = response.text
        save_html(html_content, TEMP_HTML_PATH)
        print("✅ HTML сохранен в файл для тестирования.")
    except Exception as e:
        print(f"🚨 Ошибка при загрузке страницы: {e}")
        exit()

soup = BeautifulSoup(html_content, 'html.parser')
articles = soup.find('div', class_='content-list')
if not articles:
    print("🚨 Не найдена секция с новостями!")
    exit()

# Извлечение данных
for article in articles.find_all('div', class_='content--short'):
    title = article.find('div', class_='content-title').get_text(strip=True) if article.find('div', class_='content-title') else 'N/A'
    print(f"Заголовок: {title}")

    # Дополнительные данные (добавьте свои селекторы)
    author = article.find('a', class_='author__name').get_text(strip=True) if article.find('a', class_='author__name') else 'N/A'
    print(f"Автор: {author}\n")

✅ HTML сохранен в файл для тестирования.
Заголовок: Как настроить отчетность и аналитику для рекламных кампаний?
Автор: Директолог который пишет

Заголовок: Все доходности облигаций: какие бывают и чем отличаются от доходност��й депозитов
Автор: igotosochi

Заголовок: Делаю много, а результата нет
Автор: Ульяна Майорова

Заголовок: Кейс: связали 1С:Бухгалтерия и AmoCRM — теперь счета выставляются автоматически
Автор: Артем про продажи

Заголовок: ⭐️281% годовых на облигациях – много это, или мало?
Автор: Кот.Финанс

Заголовок: 💣 Облигации как ловушка: почему даже “безрисковый” инструмент может разорить инвестора. ТОП-3 важных ошибок
Автор: Суцкевер Семен - Fond&Flow

Заголовок: Трюизмы: как тебя заставляют соглашаться, не спраш��вая
Автор: Евгений Клочков

Заголовок: «Мне донатят за ламповость». Как я стал стримером после неудачного стартапа, и зарабатываю 70к в месяц.
Автор: Батюшка Бизнесменский

Заголовок: Курс биткоина вырос на 12% за сутки. Это разворот? Или обновление минимумов п

### 🔍 **Что улучшено:**
1. **Кеширование HTML**  
   - Сайт не будет запрашиваться при каждом запуске — данные берутся из локального файла `vc_ru_temp.html`.  
   - Если файла нет, код загрузит HTML и сохранит его автоматически.

2. **Обработка ошибок**  
   - Проверка HTTP-статуса (`raise_for_status()`).  
   - Защита от отсутствия элементов (`.find()` + тернарные операторы).  

3. **Гибкость**  
   - Легко добавить новые поля (достаточно уточнить селекторы).  


## V2 Исправленный код с единообразными селекторами

In [18]:
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from urllib.parse import urljoin  # Для объединения URL

In [20]:
# Настройки
URL = "https://vc.ru/new"
HOST = "https://vc.ru"
HEADERS = {"User-Agent": "Mozilla/5.0"}
TEMP_HTML_PATH = "data/interim/vc_ru_temp.html"

### Вот полная реализация функции `load_or_fetch_html()`,
которая кеширует HTML во временный файл и обрабатывает ошибки:

In [26]:
import os
import requests
from urllib.parse import urljoin

def load_or_fetch_html(url, cache_path, headers=None, timeout=10):
    """
    Загружает HTML из кеша (если есть) или скачивает с сайта.
    
    Параметры:
        url (str): URL страницы для парсинга
        cache_path (str): Путь для сохранения/загрузки HTML
        headers (dict): Заголовки HTTP-запроса
        timeout (int): Таймаут запроса в секундах
        
    Возвращает:
        str: HTML-контент
    """
    # Создаем папку для кеша, если её нет
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    
    # Пытаемся загрузить из кеша
    if os.path.exists(cache_path):
        with open(cache_path, 'r', encoding='utf-8') as f:
            print(f"⚡ Используется кешированный HTML: {cache_path}")
            return f.read()
    
    # Если кеша нет - скачиваем
    try:
        print(f"🌐 Загружаем данные с {url}")
        response = requests.get(
            url,
            headers=headers or {},
            timeout=timeout
        )
        response.raise_for_status()  # Проверка на ошибки HTTP
        
        # Сохраняем в кеш
        with open(cache_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        print(f"✅ HTML сохранен в кеш: {cache_path}")
        
        return response.text
        
    except requests.exceptions.RequestException as e:
        print(f"🚨 Ошибка при загрузке страницы: {e}")
        raise  # Можно заменить на return None для мягкой обработки

### Как это работает:
1. **Проверка кеша**:
   - Если файл `data/interim/vc_ru_temp.html` существует, читает его.
   - Если нет — скачивает страницу и сохраняет в этот файл.

2. **Обработка ошибок**:
   - Ловит проблемы с интернетом/доступом к сайту.
   - Проверяет HTTP-статус (404, 500 и др.).

3. **Безопасность**:
   - Автоматически создает папки для кеша.
   - Использует `utf-8` для корректного сохранения текста.

### Пример использования:
```python
html_content = load_or_fetch_html(
    url="https://vc.ru/new",
    cache_path="data/interim/vc_ru_temp.html",
    headers={"User-Agent": "Mozilla/5.0"}
)
```

### Дополнительно можно добавить:
Срок жизни кеша (например, перезагружать, если файл старше 1 часа):

In [None]:
import time
MAX_CACHE_AGE = 3600  # 1 час в секундах

if os.path.exists(cache_path) and 
   (time.time() - os.path.getmtime(cache_path)) < MAX_CACHE_AGE:
    # Используем кеш

### Сжатие HTML для экономии места:

In [None]:
import gzip
with gzip.open(cache_path + '.gz', 'wt', encoding='utf-8') as f:
    f.write(html)

In [22]:
def parse_articles(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    articles = soup.find('div', class_='content-list')
    if not articles:
        raise ValueError("Секция с новостями не найдена")

    results = []
    for article in articles.find_all('div', class_='content--short'):
        # Основные данные
        data = {
            'title': get_text_or_none(article, 'div.content-title'),
            'author': get_text_or_none(article, 'a.author__name'),
            'date': get_text_or_none(article, 'div.content-header__date'),
            'shorts': get_text_or_none(article, 'div.block-text'),
            'link': get_link(article, 'a.content__link', HOST),
            'category': extract_category(get_link(article, 'a.content__link', HOST)),
            'parse_datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        results.append(data)
    return results

In [28]:
# Пример использования
if __name__ == "__main__":
    html_content = load_or_fetch_html(URL, TEMP_HTML_PATH, HEADERS)
    articles_data = parse_articles(html_content)
    print(f"Найдено статей: {len(articles_data)}")

⚡ Используется кешированный HTML: data/interim/vc_ru_temp.html


NameError: name 'get_text_or_none' is not defined

In [None]:

# Вспомогательные функции
def get_text_or_none(parent, selector):
    element = parent.select_one(selector)
    return element.get_text(strip=True) if element else None

def get_link(parent, selector, host):
    element = parent.select_one(selector)
    return urljoin(host, element['href']) if element and 'href' in element.attrs else None

def extract_category(url):
    if not url:
        return None
    parts = url.replace(HOST, '').strip('/').split('/')
    return parts[0] if parts else None

## V3 Исправленная версия с полным набором функций

In [2]:
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from urllib.parse import urljoin

### --- Вспомогательные функции ---

In [3]:
def get_text_or_none(parent, selector):
    """Извлекает текст из элемента или возвращает None, если элемент не найден."""
    element = parent.select_one(selector)
    return element.get_text(strip=True) if element else None

def get_link(parent, selector, host):
    """Извлекает URL ссылки, объединяя с host."""
    element = parent.select_one(selector)
    return urljoin(host, element['href']) if element and 'href' in element.attrs else None

def extract_category(url):
    """Извлекает категорию из URL (например, 'marketing' из 'https://vc.ru/marketing/123')."""
    if not url:
        return None
    parts = url.replace(HOST, '').strip('/').split('/')
    return parts[0] if parts else None

def load_or_fetch_html(url, cache_path, headers=None, timeout=10):
    """Загружает HTML из кеша или скачивает с сайта."""
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    if os.path.exists(cache_path):
        with open(cache_path, 'r', encoding='utf-8') as f:
            return f.read()
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        with open(cache_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Ошибка: {e}")
        return None

In [28]:
# --- Основной парсер ---
def parse_articles(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    articles = soup.find('div', class_='content-list')
    if not articles:
        raise ValueError("Секция с новостями не найдена")

    results = []
    for article in articles.find_all('div', class_='content--short'):
        data = {
            'title': get_text_or_none(article, 'div.content-title'),
            'author': get_text_or_none(article, 'a.author__name'),
            'date': get_text_or_none(article, 'div.content-header__date'), #div.content-header__date
            'shorts': get_text_or_none(article, 'div.block-text'),
            'link': get_link(article, 'a.content__link', HOST),
            'category': extract_category(get_link(article, 'a.content__link', HOST)),
            'parse_datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        results.append(data)
    return results

#### Исправленная функция 

In [33]:
def parse_articles(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    articles = soup.find('div', class_='content-list')
    if not articles:
        raise ValueError("Секция с новостями не найдена")

    results = []
    for article in articles.find_all('div', class_='content--short'):
        # Ищем тег <time> внутри статьи
        time_tag = article.find('time')
        
        # Извлекаем дату из атрибута datetime (формат ISO 8601)
        if time_tag and time_tag.has_attr('datetime'):
            date_iso = time_tag['datetime']  # "2025-04-12T09:43:34.000Z"
            # Конвертируем в нужный формат (опционально)
            date_formatted = datetime.fromisoformat(date_iso.replace('Z', '+00:00')).strftime('%d.%m.%Y в %H:%M')
        else:
            date_iso = None
            date_formatted = None

        data = {
            'title': get_text_or_none(article, 'div.content-title'),
            'author': get_text_or_none(article, 'a.author__name'),
            'date': date_formatted,  # Формат "12.04.2025 в 12:43"
            # 'date_iso': date_iso, # Сохраняем оригинальный ISO-формат
            # 'date_formatted': date_formatted,  # Формат "12.04.2025 в 12:43"
            'shorts': get_text_or_none(article, 'div.block-text'),
            'link': get_link(article, 'a.content__link', HOST),
            'category': extract_category(get_link(article, 'a.content__link', HOST)),
            'parse_datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        results.append(data)
    return results

### --- Конфигурация --- Запуск функции

In [34]:

# --- Конфигурация ---
URL = "https://vc.ru/new"
HOST = "https://vc.ru"
HEADERS = {"User-Agent": "Mozilla/5.0"}
# TEMP_HTML_PATH = "data/interim/vc_ru_temp.html"
TEMP_HTML_PATH = "data/interim/dynamic_page.html"

# --- Запуск ---
if __name__ == "__main__":
    html_content = load_or_fetch_html(URL, TEMP_HTML_PATH, HEADERS)
    if html_content:
        articles_data = parse_articles(html_content)
        print(f"Найдено статей: {len(articles_data)}")
        for article in articles_data[:3]:  # Вывод первых 3 статей для проверки
            print(article)

Найдено статей: 24
{'title': 'Почему так сложно пилить видеоконтент и что с этим делать', 'author': 'Наталия Потёмина', 'date': '12.04.2025 в 09:43', 'shorts': 'Я не умею говорить, смотря в камеру — и мне часто приходится это делать. Методом самоонализа я поняла, что со мной не так. А вы расскажите, что с вами.', 'link': 'https://vc.ru/life/1920915-slozhnosti-sozdaniya-videokontenta-i-puti-ikh-resheniya', 'category': 'life', 'parse_datetime': '2025-04-12 17:09:19'}
{'title': 'Как продвигать Telegram-канал без бюджета и привлекать подписчиков и клиентов с помощью бота', 'author': 'Артем Жуков', 'date': '12.04.2025 в 09:07', 'shorts': 'Можно ли в 2025 развивать Telegram-канал компании без бюджета на рекламу? Да — если подключить бота со специальной механикой. В кейсе показываю, как разработали чат-бота с программой лояльности и за две недели получили 25% рост канала за счёт имеющейся аудитории и без затрат на трафик.', 'link': 'https://vc.ru/marketing/1918599-prodvizhenie-telegram-kanala

### Ключевые исправления:
#### Добавлены все недостающие функции:

get_text_or_none
get_link
extract_category
load_or_fetch_html

#### Глобальные константы:
HOST теперь корректно используется в extract_category.

#### Проверка работы:
Код выводит первые 3 статьи для визуальной проверки.

### Автоматизация расписания (простой вариант)
Создайте файл run_parser.py:

In [None]:
import schedule
import time

def job():
    print("Парсинг запущен...")
    # Ваш код парсинга и экспорта

schedule.every().day.at("09:00").do(job)

while True:
    schedule.run_pending()
    time.sleep(60)

#### Запускайте его в фоне через nohup python run_parser.py & (Linux/macOS).

### Пример экспорта в CSV/JSON

In [6]:
import json
import csv

In [7]:
def save_to_csv(data, filename):
    with open(filename, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)

def save_to_json(data, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

In [9]:
# Использование
save_to_csv(articles_data, 'data/output/vc_ru_articles.csv')
save_to_json(articles_data, 'data/output/vc_ru_articles.json')

## V4 - V1 DYNAMIC_PARCE - 1. Динамическая загрузка с Selenium

#### Цель:
Прокрутить ленту vc.ru на 10 "страниц" (или до достижения лимита новостей).
Имитировать поведение человека: случайные задержки, плавная прокрутка.
Защита от бана: User-Agent, куки, ограничение скорости.

In [1]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import time
import random

In [29]:
# Настройки
URL = "https://vc.ru/new"
SCROLL_PAUSE = random.uniform(2, 5)  # Случайные задержки
MAX_SCROLLS = 10  # Лимит прокруток

def dynamic_parse():
    # chrome_options = Options()
    # chrome_options.add_argument("--headless")  # Режим без GUI (для сервера)
    # chrome_options.add_argument(f"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")

    # Автоматическая установка ChromeDriver
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
    
    # driver = webdriver.Chrome(service=Service('chromedriver'), options=chrome_options)
    driver.get(URL)

    for _ in range(MAX_SCROLLS):
        # Плавная прокрутка вниз
        driver.execute_script("window.scrollBy(0, 2000);")
        time.sleep(SCROLL_PAUSE)
        
        # Проверка на "конец ленты" (если есть кнопка "Показать ещё")
        try:
            more_button = driver.find_element(By.CSS_SELECTOR, ".button-more")
            more_button.click()
            time.sleep(SCROLL_PAUSE)
        except:
            pass

    html_content = driver.page_source
    driver.quit()
    return html_content

### Использование

In [30]:
# Использование
html_content = dynamic_parse()
articles_data = parse_articles(html_content)  # Ваша существующая функция

In [None]:
# Что добавить - Рандомизация действий:
if random.random() > 0.7:  # 30% chance to hover over elements
    element = driver.find_element(By.CSS_SELECTOR, random.choice([".content-title", ".author__name"]))
    webdriver.ActionChains(driver).move_to_element(element).pause(1).perform()

# Сохранение сессии (куки) между запусками.

### Устранение ошибки NoSuchDriverException - настройка ChromeDriver

In [25]:
# 1. Установите ChromeDriver автоматически (рекомендуемый способ)
# Используйте webdriver-manager для автоматической загрузки драйвера:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Автоматическая установка ChromeDriver
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

### V2 DYNAMIC_PARCE - Полный рабочий код с обработкой ошибок

In [None]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import time
import random

In [18]:
def dynamic_parse():
    try:
        # Автоматическая настройка ChromeDriver
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
        
        # Или ручной путь (раскомментируйте, если автоматически не работает)
        # driver = webdriver.Chrome(service=Service(r'ПУТЬ_К_CHROMEDRIVER'))

        driver.get("https://vc.ru/new")
        
        for _ in range(10):
            driver.execute_script("window.scrollBy(0, 2000);")
            time.sleep(random.uniform(2, 5))
            
        html_content = driver.page_source
        return html_content
        
    except Exception as e:
        print(f"Ошибка: {e}")
        return None
    finally:
        if 'driver' in locals():
            driver.quit()

HTML успешно получен!


In [26]:
# Проверка
if __name__ == "__main__":
    html = dynamic_parse()
    if html:
        print("HTML успешно получен!")

HTML успешно получен!


### 2. Обновление CSV/JSON с дедупликацией

#### Алгоритм:
Загружаем существующие данные из файла.
Объединяем со свежими данными.
Удаляем дубликаты по title или link.
Сохраняем обновлённый файл.

In [27]:
import pandas as pd

def update_data(new_data, csv_path="data/output/articles.csv", json_path="data/output/articles.json"):
    # Загрузка старых данных (если файл есть)
    try:
        old_df = pd.read_csv(csv_path)
    except FileNotFoundError:
        old_df = pd.DataFrame()

    # Объединение + удаление дубликатов
    combined_df = pd.concat([old_df, pd.DataFrame(new_data)])
    combined_df.drop_duplicates(subset=["title"], keep="last", inplace=True)  # Или "link"

    # Сохранение
    combined_df.to_csv(csv_path, index=False, encoding='utf-8')
    combined_df.to_json(json_path, orient="records", force_ascii=False, indent=2)
    print(f"Данные обновлены. Уникальных записей: {len(combined_df)}")

In [21]:
# Обновление CSV/JSON с дедупликацией
if __name__ == "__main__":
    update_data(articles_data)

Данные обновлены. Уникальных записей: 36


In [31]:
# Обновление CSV/JSON с дедупликацией
if __name__ == "__main__":
    update_data(articles_data, csv_path="data/output/vc_ru_articles.csv", json_path="data/output/vc_ru_articles.json")

Данные обновлены. Уникальных записей: 36


### Следующие шаги:
#### Настроим Selenium:
Установите драйвер (ChromeDriver).
Проверим работу dynamic_parse() на тестовой прокрутке.

#### Интеграция с вашим кодом:
Заменим load_or_fetch_html() на dynamic_parse().
Добавим обработку ошибок (например, если сайт блокирует запросы).

#### Тестирование дедупликации:
Создадим тестовые CSV/JSON с дубликатами.
Проверим, что update_data() корректно объединяет файлы.

In [None]:
# Важно для Selenium:
# Для macOS/Windows установите:
pip install selenium webdriver-manager

In [None]:
# Драйвер можно настроить автоматически:
from webdriver_manager.chrome import ChromeDriverManager
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

## V5 - Совмещаем все функции в один блок

### Libraries - Импортирование библиотек

In [1]:
# Для вспомогательных функций и основного парсера
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from urllib.parse import urljoin

# Для экспорта/импорта временных данных
import json
import csv

# Для работы с динамическими сайтами
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.action_chains import ActionChains
from webdriver_manager.chrome import ChromeDriverManager
import time
import random

# Обновление CSV/JSON с дедупликацией
import pandas as pd

# Google Sheets
from gspread import Client, Spreadsheet, Worksheet, service_account, exceptions
from typing import Optional, List, Dict

### Усовершенствованная функция по импортированию библиотек

In [1]:
imports = [
        "import os",
        "import requests",
        "from bs4 import BeautifulSoup",
        "from datetime import datetime",
        "from urllib.parse import urljoin",
        "import json",
        "import csv",
        "from selenium import webdriver",
        "from selenium.webdriver.common.by import By",
        "from selenium.webdriver.chrome.service import Service",
        "from selenium.webdriver.chrome.options import Options",
        "import time",
        "import random",
        "import pandas as pd"
    ]

In [2]:
import time
import importlib

In [3]:
# import time
# import importlib

def import_and_measure_improved(import_statement):
    """
    Импортирует библиотеку или модуль и измеряет время выполнения операции.
    :param import_statement: Строка с командой импорта (например, "import requests" или "from telebot import types").
    """
    start_time = time.time()  # Запуск таймера

    try:
        # Выполняем команду импорта
        exec(import_statement)
        end_time = time.time()  # Остановка таймера

        # Вычисление времени выполнения
        elapsed_time = end_time - start_time
        minutes = int(elapsed_time // 60)
        seconds = int(elapsed_time % 60)

        # Вывод результата
        print(f"\t'{import_statement}' успешно завершено!\n"
              f"\tВремя выполнения операции: "
              f"минут: {minutes}, секунд: {seconds}")
    except Exception as e:
        print(f"Ошибка при импортировании '{import_statement}': {e}")

In [4]:
# Пример использования усовершенствованной функции IMPORT_AND_MEASURE_IMPROVED
if __name__ == "__main__":
    """
    imports = [
        "import time",
        "import importlib"
    ]
    """
    total_start_time = time.time()  # Запуск общего таймера
    count = 1 # Счетчик операций
    print("Start immorting of libraries !")
    
    for imp in imports:
        print(f"{count}. Выполняется операция '{imp.upper()}' ...")
        import_and_measure_improved(imp)
        count += 1

    total_end_time = time.time()  # Остановка общего таймера
    total_elapsed_time = total_end_time - total_start_time
    total_minutes = int(total_elapsed_time // 60)
    total_seconds = int(total_elapsed_time % 60)

    print(f"\nОбщее время импортирования библиотек: "
          f"минут: {total_minutes}, секунд: {total_seconds}")

Start immorting of libraries !
1. Выполняется операция 'IMPORT OS' ...
	'import os' успешно завершено!
	Время выполнения операции: минут: 0, секунд: 0
2. Выполняется операция 'IMPORT REQUESTS' ...
	'import requests' успешно завершено!
	Время выполнения операции: минут: 0, секунд: 7
3. Выполняется операция 'FROM BS4 IMPORT BEAUTIFULSOUP' ...
	'from bs4 import BeautifulSoup' успешно завершено!
	Время выполнения операции: минут: 0, секунд: 1
4. Выполняется операция 'FROM DATETIME IMPORT DATETIME' ...
	'from datetime import datetime' успешно завершено!
	Время выполнения операции: минут: 0, секунд: 0
5. Выполняется операция 'FROM URLLIB.PARSE IMPORT URLJOIN' ...
	'from urllib.parse import urljoin' успешно завершено!
	Время выполнения операции: минут: 0, секунд: 0
6. Выполняется операция 'IMPORT JSON' ...
	'import json' успешно завершено!
	Время выполнения операции: минут: 0, секунд: 0
7. Выполняется операция 'IMPORT CSV' ...
	'import csv' успешно завершено!
	Время выполнения операции: минут

### --- Вспомогательные функции ---

In [5]:
def get_text_or_none(parent, selector):
    """Извлекает текст из элемента или возвращает None, если элемент не найден."""
    element = parent.select_one(selector)
    return element.get_text(strip=True) if element else None

def get_link(parent, selector, host):
    """Извлекает URL ссылки, объединяя с host."""
    element = parent.select_one(selector)
    return urljoin(host, element['href']) if element and 'href' in element.attrs else None

def extract_category(url):
    """Извлекает категорию из URL (например, 'marketing' из 'https://vc.ru/marketing/123')."""
    if not url:
        return None
    parts = url.replace(HOST, '').strip('/').split('/')
    return parts[0] if parts else None

def load_or_fetch_html(url, cache_path, headers=None, timeout=10):
    """Загружает HTML из кеша или скачивает с сайта."""
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    if os.path.exists(cache_path):
        with open(cache_path, 'r', encoding='utf-8') as f:
            return f.read()
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        with open(cache_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Ошибка: {e}")
        return None

In [6]:
# --- Основной парсер ---
def parse_articles(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    articles = soup.find('div', class_='content-list')
    if not articles:
        raise ValueError("Секция с новостями не найдена")

    results = []
    for article in articles.find_all('div', class_='content--short'):
        data = {
            'title': get_text_or_none(article, 'div.content-title'),
            'author': get_text_or_none(article, 'a.author__name'),
            'date': get_text_or_none(article, 'div.content-header-date'),
            'shorts': get_text_or_none(article, 'div.block-text'),
            'link': get_link(article, 'a.content__link', HOST),
            'category': extract_category(get_link(article, 'a.content__link', HOST)),
            'parse_datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        results.append(data)
    return results

### --- Конфигурация --- Запуск функции

In [7]:
# --- Конфигурация ---
URL = "https://vc.ru/new"
HOST = "https://vc.ru"
HEADERS = {"User-Agent": "Mozilla/5.0"}
TEMP_HTML_PATH = "data/interim/vc_ru_temp.html"

In [11]:
# --- Запуск 3 ---
if __name__ == "__main__":
    html_content = load_or_fetch_html(URL, TEMP_HTML_PATH, HEADERS)
    if html_content:
        articles_data = parse_articles(html_content)
        print(f"Найдено статей: {len(articles_data)}")
        for article in articles_data[:3]:  # Вывод первых 3 статей для проверки
            print(article)

Найдено статей: 12
{'title': 'Повышение конверсии и прибыли за 10 пунктов: как начать зарабатывать больше, благодаря одной статье', 'author': 'Хорош в маркетинге', 'date': None, 'shorts': 'Вы хотите зарабатывать больше? Вы хотите повысить конверсии ��а своем сайте? Тогда, усаживайтесь поудобнее и дочитывайте до конца. Ведь в конце будет бонусный самый важный пункт по моему мнению. Я думаю, что Вы согласитесь со мной', 'link': 'https://vc.ru/marketing/1921259-kak-povysit-konversii-i-pribyl-na-sayte-10-strategiy', 'category': 'marketing', 'parse_datetime': '2025-04-12 07:36:11'}
{'title': 'Google уволила «сотни сотрудников» в подразделении, которое занимается Android иPixel', 'author': 'Таня Боброва', 'date': None, 'shorts': 'В начале 2025-го компания предлагала сотрудникам уйти по собственному желанию в рамках оптимизации.', 'link': 'https://vc.ru/hr/1921136-google-uvolila-sotrudnikov-iz-podrazdeleniya-android-i-pixel', 'category': 'hr', 'parse_datetime': '2025-04-12 07:36:11'}
{'title'

In [13]:
# import os
# from bs4 import BeautifulSoup

# --- Запуск 2 ---
if __name__ == "__main__":
    html_content = load_or_fetch_html(URL, TEMP_HTML_PATH, HEADERS)
    if html_content:
        articles_data = parse_articles(html_content)
        print(f"Найдено статей: {len(articles_data)}")
        for article in articles_data[:3]:  # Вывод первых 3 статей для проверки
            print(article)

Найдено статей: 12
{'title': 'Рабочий vs Предприниматель', 'author': 'Ирина Курбанова', 'date': None, 'shorts': 'Что общего между рабочим, вкалывающим по 12 часов день на стройке, и предпринимателем, погрязшем в текучке?', 'link': 'https://vc.ru/opinions/1915659-rabochiy-i-predprinimatel', 'category': 'opinions', 'parse_datetime': '2025-04-11 12:25:01'}
{'title': 'OpenAI обновила «память» у ChatGPT — бот теперь может запоминать всечаты', 'author': 'Таня Боброва', 'date': None, 'shorts': 'И, например, напомнить содержание разговора, который был несколько дней назад.', 'link': 'https://vc.ru/chatgpt/1918921-obnovlenie-pamyati-chatgpt-ot-openai', 'category': 'chatgpt', 'parse_datetime': '2025-04-11 12:25:01'}
{'title': 'Настольный мерч: в какие игры можно интегрировать бренд?', 'author': 'Станислав Грушевский', 'date': None, 'shorts': None, 'link': 'https://vc.ru/marketing/1919121-nastolnyy-merch-dlya-brendov-idei-integratsii-v-igry', 'category': 'marketing', 'parse_datetime': '2025-04-11

### Пример экспорта в CSV/JSON

In [12]:
import json
import csv

In [13]:
def save_to_csv(data, filename):
    with open(filename, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)

def save_to_json(data, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

In [14]:
# Использование
save_to_csv(articles_data, 'data/output/vc_ru_articles.csv')
save_to_json(articles_data, 'data/output/vc_ru_articles.json')

## V6 DYNAMIC_PARCE - 1. Динамическая загрузка с Selenium

#### Цель:
Прокрутить ленту vc.ru на 10 "страниц" (или до достижения лимита новостей).
Имитировать поведение человека: случайные задержки, плавная прокрутка.
Защита от бана: User-Agent, куки, ограничение скорости.

In [15]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import time
import random

### Использование

### V2 DYNAMIC_PARCE - Полный рабочий код с обработкой ошибок

In [None]:
# from selenium import webdriver
# from selenium.webdriver.chrome.service import Service
# from selenium.webdriver.common.by import By
# from webdriver_manager.chrome import ChromeDriverManager
# import time
# import random

In [18]:
def dynamic_parse():
    try:
        # Автоматическая настройка ChromeDriver
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
        
        # Или ручной путь (раскомментируйте, если автоматически не работает)
        # driver = webdriver.Chrome(service=Service(r'ПУТЬ_К_CHROMEDRIVER'))

        driver.get("https://vc.ru/new")
        
        for _ in range(10):
            driver.execute_script("window.scrollBy(0, 2000);")
            time.sleep(random.uniform(2, 5))
            
        html_content = driver.page_source
        return html_content
        
    except Exception as e:
        print(f"Ошибка: {e}")
        return None
    finally:
        if 'driver' in locals():
            driver.quit()

In [19]:
# Проверка
if __name__ == "__main__":
    html = dynamic_parse()
    if html:
        print("HTML успешно получен!")

Ошибка: name 'ChromeDriverManager' is not defined


#### 2. Обновление CSV/JSON с дедупликацией

#### Алгоритм:
Загружаем существующие данные из файла.
Объединяем со свежими данными.
Удаляем дубликаты по title или link.
Сохраняем обновлённый файл.

In [25]:
import pandas as pd

def update_data(new_data, csv_path="data/output/articles.csv", json_path="data/output/articles.json"):
    # Загрузка старых данных (если файл есть)
    try:
        old_df = pd.read_csv(csv_path)
    except FileNotFoundError:
        old_df = pd.DataFrame()

    # Объединение + удаление дубликатов
    combined_df = pd.concat([old_df, pd.DataFrame(new_data)])
    combined_df.drop_duplicates(subset=["title"], keep="last", inplace=True)  # Или "link"

    # Сохранение
    combined_df.to_csv(csv_path, index=False, encoding='utf-8')
    combined_df.to_json(json_path, orient="records", force_ascii=False, indent=2)
    print(f"Данные обновлены. Уникальных записей: {len(combined_df)}")

In [26]:
# Обновление CSV/JSON с дедупликацией
if __name__ == "__main__":
    update_data(articles_data)

Данные обновлены. Уникальных записей: 48


In [27]:
# Обновление CSV/JSON с дедупликацией
if __name__ == "__main__":
    update_data(articles_data, csv_path="data/output/vc_ru_articles.csv", json_path="data/output/vc_ru_articles.json")

Данные обновлены. Уникальных записей: 12


In [41]:
articles_data

[{'title': 'Поисковая выдача Telegram: первичные результаты тестирования нового плейсмента',
  'author': 'Владислав Ануфриев',
  'date': None,
  'shorts': 'Я сходил к нашему отделу по работе с Telegram Ads, чтобы узнать результаты первых тестов по поисковой рекламе в мессенджере и поделиться с вами их наблюдениями.',
  'link': 'https://vc.ru/marketing/1920047-reklama-v-telegram-rezultaty-testirovaniya-novogo-formata-poiska',
  'category': 'marketing',
  'parse_datetime': '2025-04-11 12:44:57'},
 {'title': 'Как я влез в строительный бизнес и начал приводить по 300 целевых заявок в месяц с ключевой ставкой в 21% и уже приблизился к нервному срыву...',
  'author': 'Георгий Ягелло I themedia',
  'date': None,
  'shorts': 'Самый пострадавший бизнес 2025 года - стройка. Так как ей, еще ни одному бизнесу не доставалось, процентная ставка буквально уничтожает все потуги. Единственный шанс это система бесперебойной системы лидогенерации и вот как я к ней пришел.',
  'link': 'https://vc.ru/marke

#### Обновленная функция

In [None]:
import pandas as pd
from datetime import datetime

def update_data(
    new_data: list[dict],
    csv_path: str = "data/output/articles.csv",
    json_path: str = "data/output/articles.json"
) -> None:
    """
    Обновляет данные, сохраняя первую дату появления записи.
    
    Параметры:
        new_data (list[dict]): Свежие данные (каждый элемент — словарь с полями, включая 'parse_datetime').
        csv_path (str): Путь к CSV-файлу.
        json_path (str): Путь к JSON-файлу.
    """
    try:
        # Загрузка старых данных (если файл существует)
        try:
            old_df = pd.read_csv(csv_path)
            old_df["parse_datetime"] = pd.to_datetime(old_df["parse_datetime"])
        except FileNotFoundError:
            old_df = pd.DataFrame()

        # Конвертация новых данных в DataFrame
        new_df = pd.DataFrame(new_data)
        new_df["parse_datetime"] = pd.to_datetime(new_df["parse_datetime"])

        # Объединение данных
        combined_df = pd.concat([old_df, new_df], ignore_index=True)

        # Удаление дубликатов с сохранением новых данных + оригинальной даты первого появления
        combined_df["first_seen"] = combined_df.groupby("title")["parse_datetime"].transform("min")
        combined_df = combined_df.sort_values("parse_datetime", ascending=False).drop_duplicates("title", keep="first")

        # Восстанавливаем исходную дату первого появления
        combined_df["parse_datetime"] = combined_df["first_seen"]
        combined_df = combined_df.drop(columns="first_seen")

        # Сохранение
        combined_df.to_csv(csv_path, index=False, encoding='utf-8')
        combined_df.to_json(json_path, orient="records", force_ascii=False, indent=2)
        print(f"✅ Данные обновлены. Уникальных записей: {len(combined_df)}")

    except Exception as e:
        print(f"🚨 Ошибка: {e}")

# Пример вызова
new_articles = [
    {
        "title": "Новость 1",
        "author": "Автор 1",
        "parse_datetime": "2025-01-10 12:00:00",  # Новая дата парсинга
        # ...остальные поля
    },
    # ...
]
update_data(new_articles)

## V7 - Финальная версия функций для Selenium

In [1]:
# Загрузка библиотек
# Для вспомогательных функций и основного парсера
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from urllib.parse import urljoin

# Для экспорта/импорта временных данных
import json
import csv

# Для работы с динамическими сайтами
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.action_chains import ActionChains
from webdriver_manager.chrome import ChromeDriverManager
import time
import random

# Обновление CSV/JSON с дедупликацией
import pandas as pd

# Google Sheets
from gspread import Client, Spreadsheet, Worksheet, service_account, exceptions
from typing import Optional, List, Dict

# Библиотеки для ТелеБОТа
import telebot
# для указание типов
from telebot import types
# токен лежит в файле config.py
import config

77. Hello Nick!
88. Notebook testing SCHEDULE
111. Token_&_CHAT_ID - Gut !!


### HUMAN_LIKE_INTERACTION - Имитирует действия человека

In [2]:
def human_like_interaction(driver):
    """Имитирует действия человека: плавная прокрутка, hover, случайные паузы."""
    # Плавная прокрутка (3-5 сек)
    scroll_pause = random.uniform(3, 5)
    scroll_amount = random.randint(800, 1200)
    driver.execute_script(f"window.scrollBy(0, {scroll_amount});")
    time.sleep(scroll_pause)

    # Случайный hover на элементы (30% вероятность)
    if random.random() > 0.7:
        elements = driver.find_elements(By.CSS_SELECTOR, ".content-title, .author__name, .content-header__item")
        if elements:
            element = random.choice(elements)
            ActionChains(driver).move_to_element(element).pause(1).perform()
            time.sleep(random.uniform(1, 2))

### DYNAMIC_PARSE - Динамически загружает контент в локальный html файл

In [3]:
def dynamic_parse(url="https://vc.ru/new", max_scrolls=10):
    """
    Динамически загружает контент с имитацией поведения человека.
    
    Параметры:
        url (str): URL для парсинга
        max_scrolls (int): Максимальное число прокруток
        
    Возвращает:
        str: HTML-контент или None при ошибке
    """
    try:
        # Настройка драйвера
        service = Service(ChromeDriverManager().install())
        options = webdriver.ChromeOptions()
        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_argument(f"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(90, 115)}.0.0.0 Safari/537.36")
        
        driver = webdriver.Chrome(service=service, options=options)
        driver.get(url)

        # Счетчик прокрутки
        count = 1

        # Основной цикл прокрутки
        for _ in range(max_scrolls):
            human_like_interaction(driver)
            print(f"\tПрокрутка: {count}")
            count+=1
            # Проверка на кнопку "Показать ещё" (если есть)
            try:
                more_btn = driver.find_element(By.CSS_SELECTOR, ".button-more")
                if more_btn.is_displayed():
                    more_btn.click()
                    time.sleep(random.uniform(2, 4))
            except:
                pass
                
        print(f"\tПрокручено: {count} раз!")
        print("\n✅ 1. HTML сохранён с динамическим контентом!")
        return driver.page_source

    except Exception as e:
        print(f"🚨 Ошибка в dynamic_parse(): {type(e).__name__} - {str(e)}")
        return None
    finally:
        if 'driver' in locals():
            driver.quit()

In [4]:
# Пример использования
if __name__ == "__main__":
    # html = dynamic_parse()
    html = dynamic_parse(max_scrolls=50)
    if html:
        with open("data/interim/dynamic_page.html", "w", encoding="utf-8") as f:
            f.write(html)
        print("✅ 1. HTML сохранён с динамическим контентом!")
        print("Скрипт отработал:", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

Прокрутка: 1
Прокрутка: 2
Прокрутка: 3
Прокрутка: 4
Прокрутка: 5
Прокрутка: 6
Прокрутка: 7
Прокрутка: 8
Прокрутка: 9
Прокрутка: 10
Прокрутка: 11
Прокрутка: 12
Прокрутка: 13
Прокрутка: 14
Прокрутка: 15
Прокрутка: 16
Прокрутка: 17
Прокрутка: 18
Прокрутка: 19
Прокрутка: 20
Прокрутка: 21
Прокрутка: 22
Прокрутка: 23
Прокрутка: 24
Прокрутка: 25
Прокрутка: 26
Прокрутка: 27
Прокрутка: 28
Прокрутка: 29
Прокрутка: 30
Прокрутка: 31
Прокрутка: 32
Прокрутка: 33
Прокрутка: 34
Прокрутка: 35
Прокрутка: 36
Прокрутка: 37
Прокрутка: 38
Прокрутка: 39
Прокрутка: 40
Прокрутка: 41
Прокрутка: 42
Прокрутка: 43
Прокрутка: 44
Прокрутка: 45
Прокрутка: 46
Прокрутка: 47
Прокрутка: 48
Прокрутка: 49
Прокрутка: 50
Прокручено: 51 раз!
✅ HTML сохранён с динамическим контентом!
Скрипт отработал: 2025-04-27 22:09:43


### PARSE_ARTICLES - Запуск основной функции парсинга

#### --- Вспомогательные функции --- GET_TEXT - GET_LINK - EXTRACT_CATEGORY - LOAD_OR_FETCH_HTML

In [13]:
def get_text_or_none(parent, selector):
    """Извлекает текст из элемента или возвращает None, если элемент не найден."""
    element = parent.select_one(selector)
    return element.get_text(strip=True) if element else None

def get_link(parent, selector, host):
    """Извлекает URL ссылки, объединяя с host."""
    element = parent.select_one(selector)
    return urljoin(host, element['href']) if element and 'href' in element.attrs else None

def extract_category(url):
    """Извлекает категорию из URL (например, 'marketing' из 'https://vc.ru/marketing/123')."""
    if not url:
        return None
    parts = url.replace(host, '').strip('/').split('/')
    return parts[0] if parts else None

def load_or_fetch_html(url, cache_path, headers=None, timeout=10):
    """Загружает HTML из кеша или скачивает с сайта."""
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    if os.path.exists(cache_path):
        with open(cache_path, 'r', encoding='utf-8') as f:
            return f.read()
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        with open(cache_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Ошибка: {e}")
        return None

#### Исправленная функция

In [5]:
def parse_articles(html_content, host):
    soup = BeautifulSoup(html_content, 'html.parser')
    articles = soup.find('div', class_='content-list')
    if not articles:
        raise ValueError("Секция с новостями не найдена")

    results = []
    for article in articles.find_all('div', class_='content--short'):
        # Ищем тег <time> внутри статьи
        time_tag = article.find('time')
        
        # Извлекаем дату из атрибута datetime (формат ISO 8601)
        if time_tag and time_tag.has_attr('datetime'):
            date_iso = time_tag['datetime']  # "2025-04-12T09:43:34.000Z"
            # Конвертируем в нужный формат (опционально)
            date_formatted = datetime.fromisoformat(date_iso.replace('Z', '+00:00')).strftime('%d.%m.%Y в %H:%M')
        else:
            date_iso = None
            date_formatted = None

        data = {
            'title': get_text_or_none(article, 'div.content-title'),
            'author': get_text_or_none(article, 'a.author__name'),
            'date': date_formatted,  # Формат "12.04.2025 в 12:43"
            # 'date_iso': date_iso, # Сохраняем оригинальный ISO-формат
            # 'date_formatted': date_formatted,  # Формат "12.04.2025 в 12:43"
            'shorts': get_text_or_none(article, 'div.block-text'),
            'link': get_link(article, 'a.content__link', host),
            'category': extract_category(get_link(article, 'a.content__link', host)),
            'parse_datetime': datetime.now().strftime('%d.%m.%Y. %H:%M:%S')
        }
        results.append(data)

    print("\n✅ 2. Парсинг новостей прошёл успешно!")
    return results

#### --- Конфигурация --- Запуск функции

In [9]:
# --- Конфигурация ---
# URL = "https://vc.ru/new"
HOST = "https://vc.ru"
# HEADERS = {"User-Agent": "Mozilla/5.0"}
# TEMP_HTML_PATH = "data/interim/vc_ru_temp.html"

In [28]:
# Запуск осовной функции
# HOST = "https://vc.ru"
# articles_data = parse_articles(html, HOST)

### UPDATE_DATA - Обновление CSV/JSON с дедупликацией

#### Алгоритм:
Загружаем существующие данные из файла.
Объединяем со свежими данными.
Удаляем дубликаты по title или link.
Сохраняем обновлённый файл.

In [6]:
# import pandas as pd

def update_data(new_data, csv_path="data/output/articles.csv", json_path="data/output/articles.json"):
    # Загрузка старых данных (если файл есть)
    try:
        old_df = pd.read_csv(csv_path)
    except FileNotFoundError:
        old_df = pd.DataFrame()

    # Объединение + удаление дубликатов
    combined_df = pd.concat([old_df, pd.DataFrame(new_data)])
    combined_df.drop_duplicates(subset=["title"], keep="last", inplace=True)  # Или "link"

    # Сохранение
    combined_df.to_csv(csv_path, index=False, encoding='utf-8')
    combined_df.to_json(json_path, orient="records", force_ascii=False, indent=2)
    print(f"Данные обновлены. Уникальных записей: {len(combined_df)}")

In [12]:
# Обновление CSV/JSON с дедупликацией
if __name__ == "__main__":
    update_data(articles_data)

Данные обновлены. Уникальных записей: 72


In [27]:
# Обновление CSV/JSON с дедупликацией
if __name__ == "__main__":
    update_data(articles_data, csv_path="data/output/vc_ru_articles.csv", json_path="data/output/vc_ru_articles.json")

Данные обновлены. Уникальных записей: 12


In [13]:
# Обновление CSV/JSON с дедупликацией
# if __name__ == "__main__":
#     update_data(articles_data, csv_path="data/output/vc_ru_articles.csv", json_path="data/output/vc_ru_articles.json")

#### Вот модернизированная функция `update_data()`, которая:
1. Сохраняет **первую дату создания** записи (`first_seen`),  
2. Удаляет дубликаты, оставляя **новые данные**, но с оригинальной датой первого появления.

In [40]:
# import pandas as pd
# from datetime import datetime

def update_data(
    new_data: list[dict],
    csv_path: str = "data/output/articles.csv",
    json_path: str = "data/output/articles.json"
) -> None:
    """
    Обновляет данные, сохраняя первую дату появления записи.
    
    Параметры:
        new_data (list[dict]): Свежие данные (каждый элемент — словарь с полями, включая 'parse_datetime').
        csv_path (str): Путь к CSV-файлу.
        json_path (str): Путь к JSON-файлу.
    """
    try:
        # Загрузка старых данных (если файл существует)
        try:
            old_df = pd.read_csv(csv_path)
            old_df["parse_datetime"] = pd.to_datetime(old_df["parse_datetime"])
        except FileNotFoundError:
            old_df = pd.DataFrame()

        # Конвертация новых данных в DataFrame
        new_df = pd.DataFrame(new_data)
        new_df["parse_datetime"] = pd.to_datetime(new_df["parse_datetime"])

        # Объединение данных
        combined_df = pd.concat([old_df, new_df], ignore_index=True)

        # Удаление дубликатов с сохранением новых данных + оригинальной даты первого появления
        combined_df["first_seen"] = combined_df.groupby("title")["parse_datetime"].transform("min")
        combined_df = combined_df.sort_values("parse_datetime", ascending=False).drop_duplicates("title", keep="first")

        # Восстанавливаем исходную дату первого появления
        combined_df["parse_datetime"] = combined_df["first_seen"]
        combined_df = combined_df.drop(columns="first_seen")

        # Сохранение
        combined_df.to_csv(csv_path, index=False, encoding='utf-8')
        combined_df.to_json(json_path, orient="records", force_ascii=False, indent=2)
        print(f"✅ Данные обновлены. Уникальных записей: {len(combined_df)}")

    except Exception as e:
        print(f"🚨 Ошибка в update_data(): {e}")

In [None]:
# Пример вызова
new_articles = [
    {
        "title": "Новость 1",
        "author": "Автор 1",
        "parse_datetime": "2025-01-10 12:00:00",  # Новая дата парсинга
        # ...остальные поля
    },
    # ...
]
update_data(new_articles)

##### 🔍 **Ключевые изменения:**
1. **Сохранение первой даты (`first_seen`)**  
   - Группировка по `title` и поиск минимальной даты с помощью `groupby() + transform("min")`.  
2. **Приоритет новых данных**  
   - Сортировка по `parse_datetime` (новые записи выше) + `drop_duplicates(keep="first")`.  
3. **Восстановление оригинальной даты**  
   - После дедупликации возвращаем `first_seen` в `parse_datetime`. 

#### Доработанная функция update_data(), для работы с форматом даты "%d.%m.%Y. %H:%M:%S"

In [6]:
# import pandas as pd
# from datetime import datetime

def update_data(
    new_data: list[dict],
    csv_path: str = "data/output/articles.csv",
    json_path: str = "data/output/articles.json"
) -> None:
    """
    Обновляет данные, сохраняя первую дату появления записи.
    Работает с форматом даты: "%d.%m.%Y. %H:%M:%S".
    """
    try:
        # Загрузка старых данных (если файл существует)
        try:
            old_df = pd.read_csv(csv_path)
            # Конвертируем дату из строки в datetime с указанием формата
            old_df["parse_datetime"] = pd.to_datetime(
                old_df["parse_datetime"],
                format="%d.%m.%Y. %H:%M:%S",
                dayfirst=True
            )
        except FileNotFoundError:
            old_df = pd.DataFrame()

        # Конвертация новых данных в DataFrame
        new_df = pd.DataFrame(new_data)
        new_df["parse_datetime"] = pd.to_datetime(
            new_df["parse_datetime"],
            format="%d.%m.%Y. %H:%M:%S",
            dayfirst=True
        )

        # Объединение данных
        combined_df = pd.concat([old_df, new_df], ignore_index=True)

        # Удаление дубликатов с сохранением новых данных + оригинальной даты первого появления
        combined_df["first_seen"] = combined_df.groupby("title")["parse_datetime"].transform("min")
        combined_df = combined_df.sort_values("parse_datetime", ascending=False).drop_duplicates("title", keep="first")

        # Восстанавливаем исходную дату первого появления и конвертируем обратно в строку
        combined_df["parse_datetime"] = combined_df["first_seen"].dt.strftime("%d.%m.%Y. %H:%M:%S")
        combined_df = combined_df.drop(columns="first_seen")

        # Сохранение
        combined_df.to_csv(csv_path, index=False, encoding='utf-8')
        combined_df.to_json(json_path, orient="records", force_ascii=False, indent=2)
        print(f"\n✅ 3. Данные в CSV/JSON файлах обновлены. Уникальных записей: {len(combined_df)}")

    except Exception as e:
        print(f"🚨 Ошибка в update_data(): {e}")

In [None]:
# Пример вызова
new_articles = [
    {
        "title": "Новость 1",
        "author": "Автор 1",
        "parse_datetime": "27.04.2025. 22:12:18",  # Ваш текущий формат
        # ...остальные поля
    },
    # ...
]
update_data(new_articles)

### V1 - Код для экспорта в Google Sheets

In [18]:
from gspread import Client, Spreadsheet, Worksheet, service_account, exceptions
from typing import Optional, List, Dict
import json

In [31]:
def export_to_google_sheets(
    json_path: str,
    spreadsheet_id: str,
    sheet_name: str = "News",
    credentials_path: str = "credentials.json"
) -> None:
    """
    Экспортирует данные из JSON в Google Sheets с использованием gspread.

    Параметры:
        json_path (str): Путь к JSON-файлу с данными.
        spreadsheet_id (str): ID таблицы из URL.
        sheet_name (str): Название листа (по умолчанию "News").
        credentials_path (str): Путь к credentials.json.

    Исключения:
        exceptions.GSpreadException: При ошибках API.
    """
    try:
        # 1. Инициализация клиента
        gc: Client = service_account(filename=credentials_path)

        # 2. Загрузка данных из JSON
        with open(json_path, 'r', encoding='utf-8') as f:
            data: List[Dict] = json.load(f)  # Предполагаем, что JSON — список словарей

        if not data:
            print("⚠️ Нет данных для экспорта.")
            return

        # 3. Открытие таблицы
        spreadsheet: Spreadsheet = gc.open_by_key(spreadsheet_id)
        
        try:
            worksheet: Worksheet = spreadsheet.worksheet(sheet_name)
        except exceptions.WorksheetNotFound:
            worksheet: Worksheet = spreadsheet.add_worksheet(title=sheet_name, rows=len(data)+1, cols=10)

        # 4. Подготовка данных
        headers: List[str] = list(data[0].keys())
        values: List[List] = [headers] + [list(item.values()) for item in data]

        # 5. Запись в таблицу
        worksheet.clear()
        worksheet.update(values)
        print(f"✅ Данные успешно экспортированы в лист '{sheet_name}'")
        print(f"🔗 URL таблицы: https://docs.google.com/spreadsheets/d/{spreadsheet_id}")

    except exceptions.GSpreadException as e:
        print(f"🚨 Ошибка Google Sheets API: {e}")
    except Exception as e:
        print(f"🚨 Неожиданная ошибка: {e}")

In [32]:
# Пример вызова
export_to_google_sheets(
    json_path="data/output/vc_ru_articles.json",
    spreadsheet_id="1XQBizX0YVDsZ7PaHlNvnEVLwyOxUZuFJo0447ku6Qdg",
    sheet_name="VC.ru News"
)

✅ Данные успешно экспортированы в лист 'VC.ru News'
🔗 URL таблицы: https://docs.google.com/spreadsheets/d/1XQBizX0YVDsZ7PaHlNvnEVLwyOxUZuFJo0447ku6Qdg


In [33]:
# Пример вызова
export_to_google_sheets(
    json_path="data/output/articles.json",
    spreadsheet_id="1XQBizX0YVDsZ7PaHlNvnEVLwyOxUZuFJo0447ku6Qdg",
    sheet_name="VC.ru News"
)

✅ Данные успешно экспортированы в лист 'VC.ru News'
🔗 URL таблицы: https://docs.google.com/spreadsheets/d/1XQBizX0YVDsZ7PaHlNvnEVLwyOxUZuFJo0447ku6Qdg


### V2 - Как сделать "умное" обновление Google Sheets?

#### Как сделать "умное" обновление Google Sheets?
Вот модифицированная версия функции, которая:
Сначала загружает текущие данные из Google Sheets.
Объединяет их с новыми (удаляя дубликаты).
Обновляет лист целиком.

### V2 - EXPORT_TO_GOOGLE_SHEETS - Код для "умного" обновления Google Sheets

In [None]:
# from gspread import Client, Spreadsheet, Worksheet, service_account, exceptions
# from typing import Optional, List, Dict
# import json

In [7]:
def export_to_google_sheets(
    json_path: str,
    spreadsheet_id: str,
    sheet_name: str = "News",
    credentials_path: str = "credentials.json"
) -> None:
    try:
        gc = service_account(filename=credentials_path)
        spreadsheet = gc.open_by_key(spreadsheet_id)
        
        # Загрузка новых данных из JSON
        with open(json_path, 'r', encoding='utf-8') as f:
            new_data = json.load(f)  # list[dict]

        if not new_data:
            print("⚠️ Нет новых данных для экспорта.")
            return

        # Загрузка старых данных из Google Sheets
        try:
            worksheet = spreadsheet.worksheet(sheet_name)
            old_data = worksheet.get_all_records()  # list[dict]
        except exceptions.WorksheetNotFound:
            old_data = []
            worksheet = spreadsheet.add_worksheet(title=sheet_name, rows=len(new_data)+1, cols=10)

        # Объединение + дедупликация (по полю 'title')
        combined_data = old_data + new_data
        unique_data = []
        seen_titles = set()
        for item in reversed(combined_data):  # Чтобы keep='last'
            if item['title'] not in seen_titles:
                seen_titles.add(item['title'])
                unique_data.append(item)
        unique_data.reverse()  # Вернём исходный порядок

        # Подготовка данных для записи
        headers = list(unique_data[0].keys())
        values = [headers] + [list(item.values()) for item in unique_data]

        # Обновление листа
        worksheet.clear()
        worksheet.update(values)
        print(f"\n✅ 4. Данные обновлены с дедупликацией. Уникальных записей: {len(unique_data)}")
        # print("\tСкрипт 'EXPORT_TO_GOOGLE_SHEETS' отработал:", datetime.now().strftime('%H:%M:%S'))

    except exceptions.GSpreadException as e:
        print(f"🚨 Ошибка Google Sheets API: {e}")

    return len(unique_data)

In [19]:
# Пример вызова
export_to_google_sheets(
    json_path="data/output/articles.json",
    spreadsheet_id="1XQBizX0YVDsZ7PaHlNvnEVLwyOxUZuFJo0447ku6Qdg",
    sheet_name="VC.ru News"
)

✅ Данные обновлены с дедупликацией. Уникальных записей: 151
Скрипт отработал: 2025-04-29 20:21:56


151

## Тренируемся на скачанной динамической странице

In [14]:
def load_html(path):
    """Загружает HTML из файла (если существует)."""
    if os.path.exists(path):
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()
    return None

In [15]:
# Парсинг (с кешированием HTML)
TEMP_HTML_PATH = "data/interim/dynamic_page.html"  # Путь для сохранения HTML
html = load_html(TEMP_HTML_PATH)

In [17]:
# Запуск осовной функции
host = "https://vc.ru"
articles_data = parse_articles(html, host)


✅ 2. Парсинг новостей прошёл успешно!


In [18]:
# Вызов функции обновления csv/json
update_data(articles_data)


✅ 3. Данные в CSV/JSON файлах обновлены. Уникальных записей: 175


# Запуск всех функций парсинга

## START_MESSAGE - Отправление сообщения ТелеБОТу о начале работы парсера

In [8]:
# Отправление сообщения ТелеБОТу о начале работы парсера
def start_message(chat_id):
    bot = telebot.TeleBot(config.token)
    message_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"✅ 0. Бот запущен. Никаких действий не требуется! Время: {message_time}")
    
    bot.send_message(chat_id, f"Запуск парсера новостей с VC.RU:\nНачало парсинга: {message_time}\nИдёт парсинг сайта ...")
    bot.stop_polling()  # Останавливаем бота

In [None]:
# Запуск функции START_MESSAGE()
if __name__ == "__main__":
    start_message(chat_id)

## FINISH_MESSAGE - Отправление сообщения ТелеБОТу об окончании работы парсера

In [9]:
# Отправление сообщения ТелеБОТу об окончании работы парсера
def finish_message(chat_id, ad, ud):
    bot = telebot.TeleBot(config.token)
    message_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"\n✅ 5. Парсинг завершен: {message_time}\n\tПолучено {ad} записей\n\tУникальных записей: {ud}")
    
    bot.send_message(chat_id, f"Окончание парсинга: {message_time}\nПолучено {ad} записей\nУникальных записей: {ud}")
    bot.stop_polling()  # Останавливаем бота

In [21]:
# Запуск функции START_MESSAGE()
# if __name__ == "__main__":
#     finish_message(chat_id, len(articles_data), len(unique_data))

## START_PARSING - Функция запуска парсинга (основная объединяющая функция)

In [11]:
def start_parsing():
    # Константы:
    chat_id = "1938719365"
    
    # Запуск стартового сообщения ТелеБОТу
    start_message(chat_id)
    
    # Запуск сохранения динамического сайта в HTML
    html = dynamic_parse(max_scrolls=50)
    # --- ДЛЯ ТЕСТОВ ---
    # html = dynamic_parse()
    if html:
        with open("data/interim/dynamic_page.html", "w", encoding="utf-8") as f:
            f.write(html)
        # print("✅ 1. HTML сохранён с динамическим контентом!")
        print("\tСкрипт 'DYNAMIC_PARSE' отработал:", datetime.now().strftime('%H:%M:%S'))

    # Запуск осовной функции
    host = "https://vc.ru"
    articles_data = parse_articles(html, host)
    if articles_data:
        # print("\n✅ 2. Парсинг новостей прошёл успешно!")
        print("\tСкрипт 'PARSE_ARTICLES' отработал:", datetime.now().strftime('%H:%M:%S'))

    # Обновление CSV/JSON с дедупликацией
    update_data(articles_data)
    # print("\n3. ✅ CSV/JSON обновлены!")
    print("\tСкрипт 'UPDATE_DATA' отработал:", datetime.now().strftime('%H:%M:%S'))

    # Обновление Google Sheets
    count_unique_data = export_to_google_sheets(
        json_path="data/output/articles.json",
        spreadsheet_id="1XQBizX0YVDsZ7PaHlNvnEVLwyOxUZuFJo0447ku6Qdg",
        sheet_name="VC.ru News"
    )
    print("\tGoogle Sheets обновлены!")
    print("\tСкрипт 'EXPORT_TO_GOOGLE_SHEETS' отработал:", datetime.now().strftime('%H:%M:%S'))

    # Сообщение ТелеБОТу о завершению парсинга
    finish_message(chat_id, len(articles_data), count_unique_data)

In [11]:
# Запуск функции START_PARSING - выдало ошибку превышения времени ожидания - НАДО ОТРАБОТАТЬ !!
if __name__ == "__main__":
    start_parsing()

Бот запущен. Никаких действий не требуется! Время: 2025-04-29 20:06:34
🚨 Ошибка в dynamic_parse(): ReadTimeoutError - HTTPConnectionPool(host='localhost', port=53985): Read timed out. (read timeout=120)


TypeError: object of type 'NoneType' has no len()

In [12]:
# Запуск функции START_PARSING
if __name__ == "__main__":
    start_parsing()

✅ 0. Бот запущен. Никаких действий не требуется! Время: 2025-04-29 21:53:39
	Прокрутка: 1
	Прокрутка: 2
	Прокрутка: 3
	Прокрутка: 4
	Прокрутка: 5
	Прокрутка: 6
	Прокрутка: 7
	Прокрутка: 8
	Прокрутка: 9
	Прокрутка: 10
	Прокрутка: 11
	Прокрутка: 12
	Прокрутка: 13
	Прокрутка: 14
	Прокрутка: 15
	Прокрутка: 16
	Прокрутка: 17
	Прокрутка: 18
	Прокрутка: 19
	Прокрутка: 20
	Прокрутка: 21
	Прокрутка: 22
	Прокрутка: 23
	Прокрутка: 24
	Прокрутка: 25
	Прокрутка: 26
	Прокрутка: 27
	Прокрутка: 28
	Прокрутка: 29
	Прокрутка: 30
	Прокрутка: 31
	Прокрутка: 32
	Прокрутка: 33
	Прокрутка: 34
	Прокрутка: 35
	Прокрутка: 36
	Прокрутка: 37
	Прокрутка: 38
	Прокрутка: 39
	Прокрутка: 40
	Прокрутка: 41
	Прокрутка: 42
	Прокрутка: 43
	Прокрутка: 44
	Прокрутка: 45
	Прокрутка: 46
	Прокрутка: 47
	Прокрутка: 48
	Прокрутка: 49
	Прокрутка: 50
	Прокручено: 51 раз!

✅ 1. HTML сохранён с динамическим контентом!
	Скрипт 'DYNAMIC_PARSE' отработал: 21:59:54


NameError: name 'HOST' is not defined

## 🔄 **Автоматизация расписания (простой вариант)**

3. **Расписание**:
   - Для ежедневного парсинга предлагаю:
     - **Windows**: Планировщик задач + `.bat`-файл
     - **Linux/macOS**: Cron
     - Альтернатива: GitHub Actions (бесплатно для редких запусков)

In [1]:
import schedule
import time

In [8]:
#Автоматизация расписания (простой вариант)
import schedule_test.py
# Создайте файл `run_parser.py`:
# ```python
# import schedule
# import time

def job():
    print("Парсинг запущен...")
    # Ваш код парсинга и экспорта
    
# schedule.every().day.at("09:00").do(job)
schedule.every(10).seconds.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
# ```
# Запускайте его в фоне через `nohup python run_parser.py &` (Linux/macOS).

ModuleNotFoundError: No module named 'schedule_test.py'; 'schedule_test' is not a package

### Testing SCHEDULE

In [2]:
# Ниже приведен пример скрипта, который выполняет функцию hello_world() каждые 10 секунд:
# import schedule
# import time
 
def hello_world():
    print("Hello, World!")
 
schedule.every(10).seconds.do(hello_world)
 
while True:
    schedule.run_pending()
    time.sleep(1)

Hello, World!
Hello, World!
Hello, World!


KeyboardInterrupt: 

#### Расширенные возможности планирования
Библиотека schedule предоставляет различные методы для более гибкого планирования задач. Некоторые из них:

every(interval).seconds
every(interval).minutes
every(interval).hours
every().day.at(time)
every().monday.at(time)
every().wednesday.at(time)
every().friday.at(time)

In [None]:
# Пример задачи, выполняющейся каждый день в 10:00:

# import schedule
# import time
 
def daily_task():
    print("Running daily task")
 
schedule.every().day.at("10:00").do(daily_task)
 
while True:
    schedule.run_pending()
    time.sleep(1)

## PostgreSQL & Python

### Пример подключения к PostgreSQL из Python

In [4]:
# !pip install psycopg2

Defaulting to user installation because normal site-packages is not writeable
Collecting psycopg2
  Downloading psycopg2-2.9.10-cp312-cp312-win_amd64.whl.metadata (5.0 kB)
Downloading psycopg2-2.9.10-cp312-cp312-win_amd64.whl (1.2 MB)
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   --------- ------------------------------ 0.3/1.2 MB ? eta -:--:--
   --------- ------------------------------ 0.3/1.2 MB ? eta -:--:--
   ------------------ --------------------- 0.5/1.2 MB 621.2 kB/s eta 0:00:02
   ------------------ --------------------- 0.5/1.2 MB 621.2 kB/s eta 0:00:02
   --------------------------- ------------ 0.8/1.2 MB 588.4 kB/s eta 0:00:01
   --------------------------- ------------ 0.8/1.2 MB 588.4 kB/s eta 0:00:01
   ------------------------------------ --- 1.0/1.2 MB 599.4 kB/s eta 0:00:01
   ---------------------------

In [5]:
import psycopg2
from psycopg2 import sql

In [7]:
# import psycopg2
# from psycopg2 import sql

# Подключение
conn = psycopg2.connect(
    dbname="news_parser",
    user="nikart_parser",
    password="arteeva12",
    host="localhost",  # или IP удалённого сервера
    port="5432"
)

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc2 in position 67: invalid continuation byte

### Создание таблицы

In [None]:
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS news (
        id SERIAL PRIMARY KEY,
        title TEXT NOT NULL,
        author TEXT,
        date TIMESTAMP,
        link TEXT UNIQUE  # Защита от дубликатов
    )
""")

### Вставка данных

In [None]:
cursor.execute("""
    INSERT INTO news (title, author, date, link)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (link) DO NOTHING  # Пропустить дубликаты
""", ("Заголовок", "Автор", "2023-01-01", "https://example.com"))

conn.commit()
cursor.close()
conn.close()