In [1]:
import os
import logging
import pandas as pd
from sqlalchemy import create_engine, text
import javaobj  # pip install javaobj-py3

# ---------------- НАСТРОЙКИ ----------------

DB_URI = "postgresql://postgres:smartgrid@172.31.168.2/solar_db"
user_object_id = 70
OUTPUT_DIR = "data"

# -------------------------------------------

os.makedirs(OUTPUT_DIR, exist_ok=True)

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Создаём engine для подключения к PostgreSQL
engine = create_engine(DB_URI)

# ========== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ==========

def deserialize_java_object(binary_value):
    """Десериализация бинарного поля (bytea) в Java-объект через javaobj.loads."""
    if binary_value is None:
        return None
    if isinstance(binary_value, memoryview):
        binary_value = binary_value.tobytes()
    try:
        return javaobj.loads(binary_value)
    except Exception as e:
        logger.error("Грешка при десериализация на Java обект: %s", e)
        return None


def unwrap_value(obj):
    """Если obj имеет атрибут .value (JavaDouble/JavaInteger/JavaString) — вернуть obj.value."""
    return getattr(obj, "value", obj)


def extract_forecast_data(forecast_obj):
    """
    Извлекает из forecast_obj список записей:
      - time   (строка "YYYY-MM-DD HH:MM")
      - temp_c (float)
      - cloud  (int)
    """
    days = getattr(forecast_obj, "forecastday", None)
    if days is None:
        return []
    try:
        days = list(days)
    except Exception as e:
        logger.error("Неуспешно преобразуване на forecastday в списък: %s", e)
        return []

    data = []
    for day in days:
        hours = getattr(day, "hour", None)
        if hours is None:
            continue
        try:
            hours = list(hours)
        except Exception as e:
            logger.error("Неуспешно преобразуване на hour в списък: %s", e)
            continue

        for hour in hours:
            t_raw     = getattr(hour, "time", None)
            temp_raw  = getattr(hour, "tempC", None)
            cloud_raw = getattr(hour, "cloud", None)

            t_str     = unwrap_value(t_raw)
            temp_val  = unwrap_value(temp_raw)
            cloud_val = unwrap_value(cloud_raw)

            try:
                temp_c = float(temp_val) if temp_val is not None else None
            except Exception:
                temp_c = None
            try:
                cloud = int(cloud_val) if cloud_val is not None else None
            except Exception:
                cloud = None

            if t_str:
                data.append({
                    "time": str(t_str),
                    "temp_c": temp_c,
                    "cloud": cloud
                })

    return data


def extract_weather_from_db(user_object_id, prediction_date):
    """
    1) SELECT current_data FROM weather_data
    2) десериализация Java-объекта
    3) извлечение forecast → extract_forecast_data
    4) DataFrame, to_datetime, ресемплинг 15 минут, интерполяция
    Возвращает DataFrame с колонками: time, temp_c, cloud
    """
    sql = text("""
        SELECT current_data
        FROM weather_data
        WHERE user_object_id = :uid
          AND date = :dt
        ORDER BY id
        LIMIT 1
    """)

    with engine.connect() as conn:
        row = conn.execute(sql, {"uid": user_object_id, "dt": prediction_date}).fetchone()

    if not row:
        logger.error("Няма записи за user_object_id=%s на %s", user_object_id, prediction_date)
        return None

    root_obj = deserialize_java_object(row[0])
    if root_obj is None:
        return None

    forecast_obj = getattr(root_obj, "forecast", None)
    if forecast_obj is None:
        logger.error("В десериализирания обект няма атрибут forecast")
        return None

    raw = extract_forecast_data(forecast_obj)
    if not raw:
        logger.error("extract_forecast_data върна празен списък")
        return None

    df = pd.DataFrame(raw)

    try:
        df["time"] = pd.to_datetime(df["time"], format="%Y-%m-%d %H:%M")
    except Exception as e:
        logger.error("Неуспешна конверсия на време: %s", e)
        return None

    df = df.drop_duplicates(subset="time").set_index("time").sort_index()

    for col in ["temp_c", "cloud"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    if df[["temp_c", "cloud"]].isna().all().all():
        logger.error("Няма валидни числа в temp_c или cloud за интерполация")
        return None

    try:
        df15 = df.resample("15min").interpolate(method="linear")
    except Exception as e:
        logger.error("Грешка при ресемплиране: %s", e)
        return None

    df15 = df15.reset_index()
    df15["time"] = df15["time"].dt.strftime("%Y-%m-%d %H:%M")
    return df15

# ========== ОСНОВНАЯ ЛОГИКА: ВЫТАЩИТЬ ВСЁ И В ОДИН ФАЙЛ ==========

# 1. Берём все доступные даты из weather_data для этого объекта
sql_dates = text("""
    SELECT DISTINCT date
    FROM weather_data
    WHERE user_object_id = :uid
    ORDER BY date
""")

with engine.connect() as conn:
    df_dates = pd.read_sql(sql_dates, conn, params={"uid": user_object_id})

available_dates = pd.to_datetime(df_dates["date"]).dt.strftime("%Y-%m-%d").tolist()
print("Найдено дат с погодой:", len(available_dates))
print("Первые 5 дат:", available_dates[:5])

# 2. Итеративно вытаскиваем погоду и складываем в один список
all_dfs = []

for prediction_date in available_dates:
    print(f"Обрабатываем {prediction_date} ...")
    df_day = extract_weather_from_db(user_object_id, prediction_date)

    if df_day is None or df_day.empty:
        print(f"  ⚠ Нет данных (после десериализации) для {prediction_date}")
        continue

    df_day["date"] = prediction_date
    df_day["user_object_id"] = user_object_id

    df_day["time"] = pd.to_datetime(df_day["time"])
    all_dfs.append(df_day)

# 3. Склеиваем всё и сохраняем в один файл
if not all_dfs:
    print("❌ Не удалось собрать ни одного дня погоды.")
else:
    weather_full = pd.concat(all_dfs, ignore_index=True)
    weather_full = weather_full.sort_values("time").reset_index(drop=True)

    out_csv = os.path.join(OUTPUT_DIR, f"weather_uid_{user_object_id}_full.csv")
    out_pq  = os.path.join(OUTPUT_DIR, f"weather_uid_{user_object_id}_full.parquet")

    weather_full.to_csv(out_csv, index=False, encoding="utf-8")
    weather_full.to_parquet(out_pq, index=False)

    print("✅ Готово.")
    print("CSV:", out_csv)
    print("Parquet:", out_pq)
    display(weather_full.head())



Найдено дат с погодой: 296
Первые 5 дат: ['2025-01-28', '2025-01-29', '2025-01-30', '2025-01-31', '2025-02-01']
Обрабатываем 2025-01-28 ...
  ⚠ Нет данных (после десериализации) для 2025-01-28
Обрабатываем 2025-01-29 ...
  ⚠ Нет данных (после десериализации) для 2025-01-29
Обрабатываем 2025-01-30 ...
Обрабатываем 2025-01-31 ...
Обрабатываем 2025-02-01 ...
Обрабатываем 2025-02-02 ...
Обрабатываем 2025-02-03 ...
Обрабатываем 2025-02-04 ...
Обрабатываем 2025-02-05 ...
Обрабатываем 2025-02-06 ...
Обрабатываем 2025-02-07 ...
Обрабатываем 2025-02-08 ...
Обрабатываем 2025-02-09 ...
Обрабатываем 2025-02-10 ...
Обрабатываем 2025-02-11 ...
Обрабатываем 2025-02-12 ...
Обрабатываем 2025-02-13 ...
Обрабатываем 2025-02-14 ...
Обрабатываем 2025-02-15 ...
Обрабатываем 2025-02-16 ...
Обрабатываем 2025-02-17 ...
Обрабатываем 2025-02-18 ...
Обрабатываем 2025-02-19 ...
Обрабатываем 2025-02-20 ...
Обрабатываем 2025-02-21 ...
Обрабатываем 2025-02-22 ...
Обрабатываем 2025-02-23 ...
Обрабатываем 2025-02-24 ..

ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.