In [5]:
!pip install duckdb -q

[0m

In [1]:
import os

import duckdb
import pandas as pd
import yaml

In [2]:
# Locations of the raw input file and of the bronze output directory.
RAW = "data/raw/train.parquet"
OUT = "data/bronze/train/"

# Thresholds are taken from the `bronze` section of params.yaml.
with open("params.yaml") as params_file:
    params = yaml.safe_load(params_file)

NULL_THRESH, UNIQ_THRESH = (
    params['bronze']['max_null_frac'],    # e.g. 0.5
    params['bronze']['max_unique_frac'],  # e.g. 0.95
)

# Make sure the output directory exists; a pre-existing one is not an error.
os.makedirs(OUT, exist_ok=True)

# DuckDB connection used by the following cells to query the Parquet file.
con = duckdb.connect()

In [3]:
def quote_identifier(name: str) -> str:
    """Return ``name`` as a double-quoted SQL identifier.

    Embedded double quotes are doubled, so column names that are reserved
    words or contain spaces/special characters can be interpolated safely.
    """
    return '"' + name.replace('"', '""') + '"'


# Column names are the first field of every row returned by DESCRIBE.
columns = con.execute(f"DESCRIBE SELECT * FROM read_parquet('{RAW}')").fetchall()
column_names = [col[0] for col in columns]

# Total number of rows in the raw file.
row_count = con.execute(f"SELECT COUNT(*) FROM read_parquet('{RAW}')").fetchone()[0]
if row_count == 0:
    # Fail early with a clear message instead of a ZeroDivisionError in the loop.
    raise ValueError(f"Файл {RAW} не содержит строк: статистики по колонкам вычислить невозможно.")

# Per-column statistics: one aggregate query per column.
stats = []
for col in column_names:
    col_sql = quote_identifier(col)
    null_count, distinct_count = con.execute(f"""
        SELECT
            COUNT(*) FILTER (WHERE {col_sql} IS NULL) AS null_count,
            COUNT(DISTINCT {col_sql}) AS distinct_count
        FROM read_parquet('{RAW}')
    """).fetchone()

    stats.append({
        "column": col,
        "null_count": null_count,
        "distinct_count": distinct_count,
        "null_fraction": null_count / row_count,
    })

# Preview the first rows; a bare last expression gets the rich table display.
stats_df = pd.DataFrame(stats)
stats_df.head()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

     column  null_count  distinct_count  null_fraction
0   bid_qty           0           45801            0.0
1   ask_qty           0           46819            0.0
2   buy_qty           0          222586            0.0
3  sell_qty           0          223650            0.0
4    volume           0          302667            0.0


In [6]:
# Show the last 30 rows of the per-column statistics table.
stats_df.tail(n=30)

Unnamed: 0,column,null_count,distinct_count,null_fraction
867,X863,0,502336,0.0
868,X864,0,1,0.0
869,X865,0,525729,0.0
870,X866,0,525887,0.0
871,X867,0,1,0.0
872,X868,0,525699,0.0
873,X869,0,1,0.0
874,X870,0,1,0.0
875,X871,0,1,0.0
876,X872,0,1,0.0


In [7]:
import os
import duckdb
import yaml
import pandas as pd
from datetime import datetime

# ----------------------
# 1. Load parameters
# ----------------------
with open("params.yaml") as f:
    params = yaml.safe_load(f)

NULL_THRESH = params['bronze']['max_null_frac']     # e.g. 0.5
UNIQ_THRESH = params['bronze']['max_unique_frac']   # e.g. 0.95
RAW = "data/raw/train.parquet"
OUT = "data/bronze/train/"
os.makedirs(OUT, exist_ok=True)


def quote_identifier(name: str) -> str:
    """Return ``name`` as a double-quoted SQL identifier.

    Embedded double quotes are doubled, so column names that are reserved
    words or contain spaces/special characters can be interpolated safely.
    """
    return '"' + name.replace('"', '""') + '"'


print(f"[{datetime.now()}] Анализ структуры файла: {RAW}")

# ----------------------
# 2. Column analysis
# ----------------------
con = duckdb.connect()
# Column names are the first field of every row returned by DESCRIBE.
columns = con.execute(f"DESCRIBE SELECT * FROM read_parquet('{RAW}')").fetchall()
column_names = [col[0] for col in columns]
row_count = con.execute(f"SELECT COUNT(*) FROM read_parquet('{RAW}')").fetchone()[0]
print(f"[{datetime.now()}] Всего строк: {row_count}, колонок: {len(column_names)}")
if row_count == 0:
    # Fail early with a clear message instead of a ZeroDivisionError in the loop.
    raise ValueError(f"Файл {RAW} не содержит строк: статистики по колонкам вычислить невозможно.")

# One aggregate query per column: NULL count and number of distinct values.
stats = []
for col in column_names:
    col_sql = quote_identifier(col)
    null_count, distinct_count = con.execute(f"""
        SELECT
            COUNT(*) FILTER (WHERE {col_sql} IS NULL) AS null_count,
            COUNT(DISTINCT {col_sql}) AS distinct_count
        FROM read_parquet('{RAW}')
    """).fetchone()

    stats.append({
        "column": col,
        "null_count": null_count,
        "distinct_count": distinct_count,
        "null_fraction": null_count / row_count,
    })

stats_df = pd.DataFrame(stats)
stats_df["distinct_fraction"] = stats_df["distinct_count"] / row_count

# ----------------------
# 3. Select columns by thresholds
# ----------------------
# A column is rejected when either fraction is strictly above its threshold.
# The mask is computed once and reused for both the rejected and kept sets.
exceeds_threshold = (
    (stats_df["null_fraction"] > NULL_THRESH) |
    (stats_df["distinct_fraction"] > UNIQ_THRESH)
)
rejected_cols = stats_df[exceeds_threshold]
selected_cols = stats_df.loc[~exceeds_threshold, "column"].tolist()

print(f"[{datetime.now()}] Отклонено {len(rejected_cols)} колонок по порогам. Оставлено: {len(selected_cols)}")
if not selected_cols:
    raise ValueError("Ни одной подходящей колонки не осталось после фильтрации по порогам.")

# ----------------------
# 4. Read and filter
# ----------------------
print(f"[{datetime.now()}] Чтение выбранных колонок и удаление строк с пустыми значениями...")
select_list = ', '.join(quote_identifier(name) for name in selected_cols)
df = con.execute(f"SELECT {select_list} FROM read_parquet('{RAW}')").df()
before_drop = len(df)
# Only rows in which every selected column is missing are dropped.
df_clean = df.dropna(how="all")
after_drop = len(df_clean)
print(f"[{datetime.now()}] Удалено {before_drop - after_drop} строк, где все значения были пустыми")

# ----------------------
# 5. Save to the bronze layer
# ----------------------
bronze_path = os.path.join(OUT, "filtered.parquet")
df_clean.to_parquet(bronze_path, index=False)
print(f"[{datetime.now()}] Сохранено {after_drop} строк и {len(selected_cols)} колонок в: {bronze_path}")

[2025-06-20 07:16:38.686939] Анализ структуры файла: data/raw/train.parquet
[2025-06-20 07:16:41.888507] Всего строк: 525887, колонок: 897


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[2025-06-20 07:23:43.736378] Отклонено 697 колонок по порогам. Оставлено: 200
[2025-06-20 07:23:43.738359] Чтение выбранных колонок и удаление строк с пустыми значениями...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[2025-06-20 07:24:27.306707] Удалено 0 строк, где все значения были пустыми
[2025-06-20 07:25:14.342452] Сохранено 525887 строк и 200 колонок в: data/bronze/train/filtered.parquet


In [9]:
import os
import duckdb
import yaml
import pandas as pd
import boto3
from datetime import datetime

# ----------------------
# 1. Load parameters
# ----------------------
with open("params.yaml") as f:
    params = yaml.safe_load(f)

NULL_THRESH    = params['bronze']['max_null_frac']     # e.g. 0.5
UNIQ_THRESH    = params['bronze']['max_unique_frac']   # e.g. 0.95
BRONZE_BUCKET  = params['bronze'].get('minio_bucket', 'bronze')
RAW            = "data/raw/train.parquet"
LOCAL_OUT_DIR  = "data/bronze/train/"
BUCKET_PREFIX  = params['bronze'].get('minio_prefix', 'train/')

# Environment variables for MinIO.
# NOTE(review): the fallbacks below are hard-coded credentials; they look like
# local-development defaults — confirm, and set the variables explicitly
# (without relying on the fallbacks) for any shared deployment.
S3_ENDPOINT    = os.getenv('S3_ENDPOINT', 'http://localhost:9000')
S3_ACCESS_KEY  = os.getenv('S3_ACCESS_KEY', 'minioadmin')
S3_SECRET_KEY  = os.getenv('S3_SECRET_KEY', 'minioadmin')

# Create the local bronze directory.
os.makedirs(LOCAL_OUT_DIR, exist_ok=True)


def quote_identifier(name: str) -> str:
    """Return ``name`` as a double-quoted SQL identifier.

    Embedded double quotes are doubled, so column names that are reserved
    words or contain spaces/special characters can be interpolated safely.
    """
    return '"' + name.replace('"', '""') + '"'


print(f"[{datetime.now()}] Анализ структуры файла: {RAW}")

# ----------------------
# 2. Column analysis
# ----------------------
con = duckdb.connect()
# Column names are the first field of every row returned by DESCRIBE.
columns = con.execute(
    f"DESCRIBE SELECT * FROM read_parquet('{RAW}')"
).fetchall()
column_names = [col[0] for col in columns]
row_count = con.execute(
    f"SELECT COUNT(*) FROM read_parquet('{RAW}')"
).fetchone()[0]
print(f"[{datetime.now()}] Всего строк: {row_count}, колонок: {len(column_names)}")
if row_count == 0:
    # Fail early with a clear message instead of a ZeroDivisionError in the loop.
    raise ValueError(f"Файл {RAW} не содержит строк: статистики по колонкам вычислить невозможно.")

# One aggregate query per column: NULL count and number of distinct values.
stats = []
for col in column_names:
    col_sql = quote_identifier(col)
    null_count, distinct_count = con.execute(
        f"""
        SELECT
            COUNT(*) FILTER (WHERE {col_sql} IS NULL) AS null_count,
            COUNT(DISTINCT {col_sql}) AS distinct_count
        FROM read_parquet('{RAW}')
        """
    ).fetchone()
    stats.append({
        "column": col,
        "null_count": null_count,
        "distinct_count": distinct_count,
        "null_fraction": null_count / row_count,
    })

stats_df = pd.DataFrame(stats)
stats_df["distinct_fraction"] = stats_df["distinct_count"] / row_count

# ----------------------
# 3. Select columns by thresholds
# ----------------------
# A column is rejected when either fraction is strictly above its threshold.
# The mask is computed once and reused for both the rejected and kept sets.
exceeds_threshold = (
    (stats_df["null_fraction"] > NULL_THRESH) |
    (stats_df["distinct_fraction"] > UNIQ_THRESH)
)
rejected_cols = stats_df[exceeds_threshold]
selected_cols = stats_df.loc[~exceeds_threshold, "column"].tolist()

print(f"[{datetime.now()}] Отклонено {len(rejected_cols)} колонок. Оставлено: {len(selected_cols)}")
if not selected_cols:
    raise ValueError("Ни одной подходящей колонки не осталось после фильтрации по порогам.")

# ----------------------
# 4. Read and filter rows
# ----------------------
print(f"[{datetime.now()}] Чтение выбранных колонок и удаление пустых строк...")
select_list = ', '.join(quote_identifier(name) for name in selected_cols)
df = con.execute(
    f"SELECT {select_list} FROM read_parquet('{RAW}')"
).df()
before_drop = len(df)
# Only rows in which every selected column is missing are dropped.
df_clean = df.dropna(how="all")
after_drop = len(df_clean)
print(f"[{datetime.now()}] Удалено {before_drop - after_drop} строк")

# ----------------------
# 5. Save locally and to MinIO
# ----------------------
local_path = os.path.join(LOCAL_OUT_DIR, "filtered.parquet")
df_clean.to_parquet(local_path, index=False)
print(f"[{datetime.now()}] Локально сохранено: {local_path}")

# Upload to MinIO via boto3. The local Parquet file has already been written
# at this point; the upload raises if S3_ENDPOINT cannot be reached.
s3 = boto3.client(
    's3',
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY
)
# S3 object keys always use "/" as the separator, so the key is built
# explicitly rather than with os.path.join (which uses "\\" on Windows).
key_prefix = BUCKET_PREFIX.rstrip("/")
remote_key = f"{key_prefix}/filtered.parquet" if key_prefix else "filtered.parquet"
s3.upload_file(Filename=local_path, Bucket=BRONZE_BUCKET, Key=remote_key)
print(f"[{datetime.now()}] Загружено в MinIO: s3://{BRONZE_BUCKET}/{remote_key}")

[2025-06-20 08:04:12.744710] Анализ структуры файла: data/raw/train.parquet
[2025-06-20 08:04:13.245113] Всего строк: 525887, колонок: 897


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[2025-06-20 08:11:20.836587] Отклонено 686 колонок. Оставлено: 211
[2025-06-20 08:11:20.837759] Чтение выбранных колонок и удаление пустых строк...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[2025-06-20 08:12:13.824434] Удалено 0 строк
[2025-06-20 08:12:57.665635] Локально сохранено: data/bronze/train/filtered.parquet


EndpointConnectionError: Could not connect to the endpoint URL: "http://localhost:9000/bronze/train/filtered.parquet?uploads"