In [1]:
import polars as pl

import warnings
import os
# Run from the project root so the relative "data/..." paths in this notebook
# resolve. The root can be overridden with the FINABYSS_ROOT environment
# variable; the original checkout location is kept as the default.
os.chdir(os.environ.get("FINABYSS_ROOT", '/home/denisalpino/dev/FinABYSS'))

In [2]:
warnings.simplefilter("ignore", Warning)

In [None]:
# Lowercased title; every pattern below is written in lowercase to match it.
title_lower = pl.col("title").str.to_lowercase()


def _title_matches(contains=(), starts_with=(), ends_with=()):
    """Build one boolean expression that ORs literal tests on ``title_lower``.

    Args:
        contains: substrings tested with ``str.contains(..., literal=True)``.
        starts_with: prefixes tested with ``str.starts_with``.
        ends_with: suffixes tested with ``str.ends_with``.

    Returns:
        The ``|``-combination of every requested test. At least one pattern
        must be supplied, otherwise there is nothing to combine.
    """
    checks = [title_lower.str.contains(needle, literal=True) for needle in contains]
    checks += [title_lower.str.starts_with(prefix) for prefix in starts_with]
    checks += [title_lower.str.ends_with(suffix) for suffix in ends_with]

    combined = checks[0]
    for check in checks[1:]:
        combined = combined | check
    return combined


# Title patterns that apply only to articles coming from the given source.
_SOURCE_TITLE_RULES = {
    # PR Newswire group
    "PR Newswire": dict(
        contains=[
            "releases monthly portfolio",
            "announce distribution rates",
        ],
        ends_with=["results of operations"],
    ),
    # Business Wire group
    "Business Wire": dict(
        contains=[
            "advisory for",
            "monthly disclosure of the total number of shares and voting rights",
            "early repurchase",
            "early redemption",
            "issue of debt",
            "variable rate fix",
        ],
        starts_with=[
            "eurofins: purchases of own shares",
            "eurofins scientific",
        ],
        ends_with=[
            "short interest report",
            "home member state election",
        ],
    ),
    # Josh Jenke group
    "Josh Jenke, Peoria Journal Star": dict(
        starts_with=["3 of the best"],
    ),
    # Public Record group
    "Public Record": dict(
        contains=["vendors licenses"],
    ),
    # Investing.com group
    "Investing.com": dict(
        starts_with=["watch live:"],
    ),
    # GlobeNewswire group
    "GlobeNewswire": dict(
        starts_with=[
            "key digital",  # questionable: the wording is too general
            "issue of equity",
            "delisting of certificates",
            "conditions for",
        ],
        contains=[
            "buy-back program",
            "disclosure of trading in own shares",
            "voting rights",
            "notification of major holdings",
            "investors calendar",
            "recall",
            "public health alert",
        ],
    ),
}

# Title patterns that apply regardless of the source.
# NOTE(review): "null" is a plain substring test, so it also hits any title
# that merely contains those four letters - confirm that is intended.
_GENERIC_TITLE_RULES = dict(
    starts_with=["aem united states ag tractor and combine report"],
    contains=[
        "net asset value",
        "media advisory",
        "form 8",
        "block listing interim review",
        "holdings in company",
        "holding(s) in company",
        "director/pdmr shareholding",
        "director / pdmr notification",
        "null",
    ],
)

# A row matches when its title hits a generic pattern, or when its source is
# listed above and its title hits one of that source's patterns.
condition_title = _title_matches(**_GENERIC_TITLE_RULES)
for _source_name, _rules in _SOURCE_TITLE_RULES.items():
    condition_title = condition_title | (
        (pl.col("source") == _source_name) & _title_matches(**_rules)
    )

In [None]:
# Lowercased article body; the prefixes below are written in lowercase to match it.
text_lower = pl.col("text").str.to_lowercase()

# GlobeNewswire group: bodies that begin with "attachment".
_globenewswire_attachment = (
    (pl.col("source") == "GlobeNewswire")
    & text_lower.str.starts_with("attachment")
)

# Sarah Ferguson group: bodies that begin with "(sponsored)".
_ferguson_sponsored = (
    (pl.col("source") == "Sarah Ferguson")
    & text_lower.str.starts_with("(sponsored)")
)

# A row matches when either source-specific body rule matches.
condition_text = _globenewswire_attachment | _ferguson_sponsored

In [4]:
# Make sure the output directory exists before anything is written into it.
# makedirs(exist_ok=True) also creates a missing parent "data/" directory and
# avoids the race between a separate exists() check and the creation call.
os.makedirs("data/preprocessed/", exist_ok=True)

In [None]:
# Streaming setup: the chunk size is set on a scoped polars Config.
with pl.Config() as cfg:
    cfg.set_streaming_chunk_size(25_000)

    # Stage 1: scan the raw articles lazily, strip the body text on both
    # sides, and keep only rows whose stripped text is non-empty.
    _articles_nonempty = (
        pl.scan_parquet("data/raw/articles.parquet")
        .with_columns(pl.col("text").str.strip_chars())
        .filter(pl.col("text").str.len_chars() > 0)
    )

    # Stage 2: drop non-representative articles, first by the title rules,
    # then by the body-text rules.
    _articles_relevant = (
        _articles_nonempty
        .filter(~condition_title)
        .filter(~condition_text)
    )

    # Stage 3: parse the "datetime" strings into UTC datetimes.
    _articles_typed = _articles_relevant.with_columns(
        pl.col("datetime").str.to_datetime(
            format="%Y-%m-%dT%H:%M:%S%.fZ", time_zone="UTC"
        )
    )

    # Stage 4: deduplicate on the text only (per the original author, comparing
    # on the "assets" column failed with an internal library error), then sort
    # by datetime ascending.
    # NOTE(review): no `keep=` is passed to unique(), so which of several
    # duplicate rows survives is left to the polars default - confirm that is
    # acceptable for reproducibility.
    _articles_deduped = _articles_typed.unique(subset=["text"]).sort("datetime")

    # Stage 5: write the result with the streaming engine. `plan` receives
    # whatever sink_parquet returns.
    plan = _articles_deduped.sink_parquet(
        "data/preprocessed/articles.parquet", engine="streaming"
    )

In [6]:
# Byte size of the written parquet file, taken from its stat record.
actual_size = os.stat("data/preprocessed/articles.parquet").st_size

# Report the size in units of 1024**3 bytes, with two decimals.
print(f"Real size on disk: {actual_size / 1024**3:.2f} GB")

Real size on disk: 2.03 GB
