In [7]:
import sys
from pathlib import Path
sys.path.insert(0, str(Path.cwd().parent))

from src.data.load_data import load_raw_data
from src.data.data_profiler import check_data
import polars as pl

df_January = load_raw_data(year=2023, month=1)
df_February = load_raw_data(year=2023, month=2)
df_March = load_raw_data(year=2023, month=3)
df_April = load_raw_data(year=2023, month=4)
df_May = load_raw_data(year=2023,month=5)
df_June  = load_raw_data(year=2023, month=6)
df_July = load_raw_data(year=2023, month=7)
df_August = load_raw_data(year=2023, month=8)
df_September  = load_raw_data(year=2023, month=9)
df_October  = load_raw_data(year=2023, month=10)
df_November = load_raw_data(year=2023, month=11)
df_December = load_raw_data(year=2023, month=12)

In [8]:
dfs = [
    df_January, df_February, df_March, df_April, df_May, df_June,
    df_July, df_August, df_September, df_October, df_November, df_December
]

df_full = pl.concat(dfs)

In [9]:
check_data(df_full)


[96m[1m
╔══════════════════════════════════════════════════════════╗
║               DATA PROFILING REPORT                      ║
╚══════════════════════════════════════════════════════════╝
[0m

  БАЗОВАЯ ИНФОРМАЦИЯ

[1mРазмер датасета:[0m
  Строк: 5,719,877
  Колонок: 13
  Размер в памяти: 746.75 MB

[1mТипы данных:[0m
  Datetime(time_unit='us', time_zone=None): 2 колонок
  Float64: 4 колонок
  String: 7 колонок

[1mКолонки:[0m
   1. ride_id                        (String)
   2. rideable_type                  (String)
   3. started_at                     (Datetime(time_unit='us', time_zone=None))
   4. ended_at                       (Datetime(time_unit='us', time_zone=None))
   5. start_station_name             (String)
   6. start_station_id               (String)
   7. end_station_name               (String)
   8. end_station_id                 (String)
   9. start_lat                      (Float64)
  10. start_lng                      (Float64)
  11. end_lat             

In [14]:
import polars as pl

def clean_rides_data(df: pl.DataFrame) -> pl.DataFrame:
    """
    Основная функция очистки данных о поездках
    """

    df_clean = df.clone()
    
    print("Начинаем очистку ...")
    print(f"Исходный размер: {df_clean.shape}")
    
    
    # Убедимся, что временные метки в правильном формате
    df_clean = df_clean.with_columns([
        pl.col("started_at").cast(pl.Datetime(time_unit="us")),
        pl.col("ended_at").cast(pl.Datetime(time_unit="us"))
    ])
    
    CHICAGO_LAT_BOUNDS = (41.6, 42.2)
    CHICAGO_LNG_BOUNDS = (-88.0, -87.5)
    
    coord_condition = (
        pl.col("start_lat").is_between(*CHICAGO_LAT_BOUNDS) &
        pl.col("start_lng").is_between(*CHICAGO_LNG_BOUNDS) &
        pl.col("end_lat").is_between(*CHICAGO_LAT_BOUNDS) &
        pl.col("end_lng").is_between(*CHICAGO_LNG_BOUNDS)
    )
    
    zero_coord_condition = (
        (pl.col("end_lat") != 0) & 
        (pl.col("end_lng") != 0)
    )
    
    df_clean = df_clean.filter(coord_condition & zero_coord_condition)
    print(f"После фильтрации координат: {df_clean.shape}")
    
    # длительность в минутах
    df_clean = df_clean.with_columns([
        ((pl.col("ended_at") - pl.col("started_at")).dt.total_seconds() / 60).alias("duration_minutes")
    ])
    
    time_condition = (
        (pl.col("duration_minutes") > 0) &  # Длительность положительная
        (pl.col("duration_minutes") <= 24 * 60) &  # Не более суток
        (pl.col("ended_at") >= pl.col("started_at"))  # Конец после начала
    )
    
    df_clean = df_clean.filter(time_condition & (pl.col("duration_minutes") >= 1))
    print(f"После фильтрации по времени: {df_clean.shape}")
    
    missing_analysis = df_clean.group_by("rideable_type").agg([
        pl.col("start_station_name").is_null().mean().alias("pct_missing_start_station"),
        pl.col("end_station_name").is_null().mean().alias("pct_missing_end_station"),
        pl.count().alias("count")
    ]).sort("count", descending=True)
    
    print("\nАнализ пропусков по типам самокатов:")
    print(missing_analysis)
    
    # Обычно electric bikes могут не иметь станций (бесдоковые)
    # Но для классических bikes станции должны быть
    
    #флаги для поездок без станций
    df_clean = df_clean.with_columns([
        pl.col("start_station_name").is_null().alias("missing_start_station"),
        pl.col("end_station_name").is_null().alias("missing_end_station")
    ])
    
    
    # Проверяем уникальность ride_id
    duplicate_ids = df_clean.group_by("ride_id").agg(pl.count().alias("count")).filter(pl.col("count") > 1)
    if duplicate_ids.height > 0:
        print(f"\nНайдено {duplicate_ids.height} дубликатов ride_id")
        # Оставляем первую запись для каждого ride_id
        df_clean = df_clean.unique(subset=["ride_id"], keep="first")

    valid_rideable_types = ["electric_bike", "classic_bike", "docked_bike"]
    valid_member_types = ["member", "casual"]
    
    df_clean = df_clean.filter(
        pl.col("rideable_type").is_in(valid_rideable_types) &
        pl.col("member_casual").is_in(valid_member_types)
    )
    
    # Поездки без конечной станции И без конечных координат
    potential_issues = df_clean.filter(
        pl.col("missing_end_station") & 
        ((pl.col("end_lat").is_null()) | (pl.col("end_lng").is_null()))
    )
    
    print(f"\nПотенциальные проблемные поездки (без конечной точки): {potential_issues.height}")
    
    # Поездки с очень большой длительностью
    long_rides = df_clean.filter(pl.col("duration_minutes") > 12 * 60)  # Более 12 часов
    print(f"Поездки длительностью > 12 часов: {long_rides.height}")

    # Добавляем день недели, час и месяц для анализа
    df_clean = df_clean.with_columns([
        pl.col("started_at").dt.weekday().alias("day_of_week"),
        pl.col("started_at").dt.hour().alias("hour_of_day"),
        pl.col("started_at").dt.month().alias("month"),
        pl.col("started_at").dt.date().alias("date")
    ])
    
    df_clean_version = df_clean.filter(
        ~pl.col("missing_start_station") & 
        ~pl.col("missing_end_station")
    )
    
    print(f"\nФинальные размеры:")
    print(f"- Полный очищенный набор: {df_clean.shape}")
    print(f"- Только поездки со станциями: {df_clean_version.shape}")
    
    # отчет об очистке
    report = {
        "initial_rows": df.height,
        "final_rows": df_clean.height,
        "rows_removed": df.height - df_clean.height,
        "pct_removed": round((1 - df_clean.height / df.height) * 100, 2),
        "missing_start_station_pct": round(df_clean["missing_start_station"].mean() * 100, 2),
        "missing_end_station_pct": round(df_clean["missing_end_station"].mean() * 100, 2),
        "avg_duration_minutes": round(df_clean["duration_minutes"].mean(), 2),
        "median_duration_minutes": round(df_clean["duration_minutes"].median(), 2)
    }
    
    print("\nОтчет об очистке:")
    for key, value in report.items():
        print(f"  {key}: {value}")
    
    return df_clean, df_clean_version, report

df_clean, df_clean_with_stations, report = clean_rides_data(df_full)

# Доп анализ аномалий
def analyze_anomalies(df: pl.DataFrame):
    """
    Анализ потенциальных аномалий и краж
    """
    print("\n" + "="*50)
    print("АНАЛИЗ АНОМАЛИЙ")
    print("="*50)
    
    # Поездки без начальной станции
    no_start_station = df.filter(pl.col("missing_start_station"))
    print(f"1. Поездки без начальной станции: {no_start_station.height}")
    print("   Распределение по типам самокатов:")
    print(no_start_station.group_by("rideable_type").agg(pl.count()).sort("count", descending=True))
    
    # Поездки без конечной станции
    no_end_station = df.filter(pl.col("missing_end_station"))
    print(f"\n2. Поездки без конечной станции: {no_end_station.height}")
    print("   Распределение по типам пользователей:")
    print(no_end_station.group_by("member_casual").agg(pl.count()).sort("count", descending=True))
    
    # Поездки с очень короткой/длинной длительностью
    print(f"\n3. Анализ длительности поездок:")
    print(f"   Менее 5 минут: {df.filter(pl.col('duration_minutes') < 5).height}")
    print(f"   Более 3 часов: {df.filter(pl.col('duration_minutes') > 180).height}")
    
    # Проверка круглосуточных поездок
    overnight_rides = df.filter(
        (pl.col("duration_minutes") > 60 * 6) &  # Более 6 часов
        (pl.col("started_at").dt.hour() >= 18) &  # Начало вечером
        (pl.col("ended_at").dt.hour() <= 8)       # Окончание утром
    )
    print(f"\n4. Ночные длительные поездки (>6 часов): {overnight_rides.height}")
    
    # Анализ по дням недели
    print(f"\n5. Распределение поездок по дням недели:")
    day_of_week_stats = df.group_by("day_of_week").agg([
        pl.count().alias("count"),
        pl.col("duration_minutes").median().alias("median_duration")
    ]).sort("day_of_week")
    print(day_of_week_stats)
    
    return {
        "no_start_station": no_start_station,
        "no_end_station": no_end_station,
        "overnight_rides": overnight_rides
    }

# Запускаем анализ
anomalies = analyze_anomalies(df_clean)

# Сохранение очищенных данных
def save_cleaned_data(df_full_clean: pl.DataFrame, df_with_stations: pl.DataFrame):
    """
    Сохранение очищенных данных
    """
    df_full_clean.write_parquet("data/processed/rides_2023_cleaned.parquet")
    df_with_stations.write_parquet("data/processed/rides_2023_with_stations.parquet")   
    df_with_stations.write_csv("data/processed/rides_2023_with_stations_sample.csv")
    
    print("\nДанные сохранены:")
    print("- rides_2023_cleaned.parquet (все очищенные данные)")
    print("- rides_2023_with_stations.parquet (только со станциями)")
    print("- rides_2023_with_stations_sample.csv (выборка для просмотра)")


save_cleaned_data(df_clean, df_clean_with_stations)

Начинаем очистку ...
Исходный размер: (5719877, 13)
После фильтрации координат: (5712878, 13)
После фильтрации по времени: (5563046, 14)

Анализ пропусков по типам самокатов:
shape: (3, 4)
┌───────────────┬───────────────────────────┬─────────────────────────┬─────────┐
│ rideable_type ┆ pct_missing_start_station ┆ pct_missing_end_station ┆ count   │
│ ---           ┆ ---                       ┆ ---                     ┆ ---     │
│ str           ┆ f64                       ┆ f64                     ┆ u32     │
╞═══════════════╪═══════════════════════════╪═════════════════════════╪═════════╡
│ electric_bike ┆ 0.292941                  ┆ 0.304124                ┆ 2837942 │
│ classic_bike  ┆ 0.000013                  ┆ 0.000074                ┆ 2649728 │
│ docked_bike   ┆ 0.0                       ┆ 0.0                     ┆ 75376   │
└───────────────┴───────────────────────────┴─────────────────────────┴─────────┘


(Deprecated in version 0.20.5)
  pl.count().alias("count")
(Deprecated in version 0.20.5)
  duplicate_ids = df_clean.group_by("ride_id").agg(pl.count().alias("count")).filter(pl.col("count") > 1)



Потенциальные проблемные поездки (без конечной точки): 0
Поездки длительностью > 12 часов: 2668

Финальные размеры:
- Полный очищенный набор: (5563046, 20)
- Только поездки со станциями: (4244293, 20)

Отчет об очистке:
  initial_rows: 5719877
  final_rows: 5563046
  rows_removed: 156831
  pct_removed: 2.74
  missing_start_station_pct: 14.94
  missing_end_station_pct: 15.52
  avg_duration_minutes: 15.49
  median_duration_minutes: 9.78

АНАЛИЗ АНОМАЛИЙ
1. Поездки без начальной станции: 831383
   Распределение по типам самокатов:
shape: (2, 2)
┌───────────────┬────────┐
│ rideable_type ┆ count  │
│ ---           ┆ ---    │
│ str           ┆ u32    │
╞═══════════════╪════════╡
│ electric_bike ┆ 831349 │
│ classic_bike  ┆ 34     │
└───────────────┴────────┘

2. Поездки без конечной станции: 863282
   Распределение по типам пользователей:
shape: (2, 2)
┌───────────────┬────────┐
│ member_casual ┆ count  │
│ ---           ┆ ---    │
│ str           ┆ u32    │
╞═══════════════╪════════╡
│ me

(Deprecated in version 0.20.5)
  print(no_start_station.group_by("rideable_type").agg(pl.count()).sort("count", descending=True))
(Deprecated in version 0.20.5)
  print(no_end_station.group_by("member_casual").agg(pl.count()).sort("count", descending=True))
(Deprecated in version 0.20.5)
  pl.count().alias("count"),


   Менее 5 минут: 1108732
   Более 3 часов: 14268

4. Ночные длительные поездки (>6 часов): 848

5. Распределение поездок по дням недели:
shape: (7, 3)
┌─────────────┬────────┬─────────────────┐
│ day_of_week ┆ count  ┆ median_duration │
│ ---         ┆ ---    ┆ ---             │
│ i8          ┆ u32    ┆ f64             │
╞═════════════╪════════╪═════════════════╡
│ 1           ┆ 709983 ┆ 9.166667        │
│ 2           ┆ 801075 ┆ 9.15            │
│ 3           ┆ 813555 ┆ 9.1             │
│ 4           ┆ 836570 ┆ 9.233333        │
│ 5           ┆ 820092 ┆ 9.666667        │
│ 6           ┆ 858281 ┆ 11.45           │
│ 7           ┆ 723490 ┆ 11.316667       │
└─────────────┴────────┴─────────────────┘

Данные сохранены:
- rides_2023_cleaned.parquet (все очищенные данные)
- rides_2023_with_stations.parquet (только со станциями)
- rides_2023_with_stations_sample.csv (выборка для просмотра)


In [15]:
import os
from pathlib import Path

def save_cleaned_data(df_full_clean: pl.DataFrame, df_with_stations: pl.DataFrame):
    """
    Сохранение очищенных данных с созданием директорий
    """
    processed_dir = Path("data/processed")
    processed_dir.mkdir(parents=True, exist_ok=True)
    
    full_path = processed_dir / "rides_2023_cleaned.parquet"
    stations_path = processed_dir / "rides_2023_with_stations.parquet"
    
    df_full_clean.write_parquet(str(full_path))
    df_with_stations.write_parquet(str(stations_path))
    sample_path = processed_dir / "rides_2023_with_stations_sample.csv"
    df_with_stations.head(100000).write_csv(str(sample_path))
    
    print("\nДанные успешно сохранены:")
    print(f"   - {full_path}")
    print(f"   - {stations_path}")
    print(f"   - {sample_path} (первые 100k строк)")
    
    return full_path, stations_path

# Сохраняем данные
full_path, stations_path = save_cleaned_data(df_clean, df_clean_with_stations)


Данные успешно сохранены:
   - data/processed/rides_2023_cleaned.parquet
   - data/processed/rides_2023_with_stations.parquet
   - data/processed/rides_2023_with_stations_sample.csv (первые 100k строк)


In [16]:
def analyze_cleaning_results(df_clean: pl.DataFrame, df_with_stations: pl.DataFrame):
    """
    Глубокий анализ результатов очистки
    """
    print("\n" + "="*60)
    print("ГЛУБОКИЙ АНАЛИЗ РЕЗУЛЬТАТОВ ОЧИСТКИ")
    print("="*60)
    
    print("\n1. РАСПРЕДЕЛЕНИЕ ПО ТИПАМ ПОЛЬЗОВАТЕЛЕЙ:")
    user_analysis = df_clean.group_by("member_casual").agg([
        pl.len().alias("total_rides"),
        (pl.col("duration_minutes").mean() / 60).alias("avg_hours"),
        pl.col("duration_minutes").median().alias("median_minutes"),
        (pl.col("rideable_type") == "electric_bike").mean().alias("pct_electric"),
        (pl.col("missing_start_station")).mean().alias("pct_no_start_station"),
        (pl.col("missing_end_station")).mean().alias("pct_no_end_station")
    ])
    print(user_analysis)
    
    print("\n2. СЕЗОННОСТЬ ПОЕЗДОК ПО МЕСЯЦАМ:")
    monthly_analysis = df_clean.group_by("month").agg([
        pl.len().alias("rides_count"),
        pl.col("duration_minutes").mean().alias("avg_duration_min"),
        (pl.col("member_casual") == "member").mean().alias("pct_members"),
        (pl.col("rideable_type") == "electric_bike").mean().alias("pct_electric")
    ]).sort("month")
    print(monthly_analysis)
    
    print("\n3. АНАЛИЗ ПО ЧАСАМ СУТОК:")
    hourly_analysis = df_clean.group_by("hour_of_day").agg([
        pl.len().alias("rides_count"),
        pl.col("duration_minutes").mean().alias("avg_duration_min"),
        (pl.col("member_casual") == "casual").mean().alias("pct_casual")
    ]).sort("hour_of_day")
    
    max_count = hourly_analysis["rides_count"].max()
    print("\n   Часы с максимальной нагрузкой:")
    for row in hourly_analysis.filter(pl.col("rides_count") > max_count * 0.9).rows():
        hour, count, avg_dur, pct_casual = row
        print(f"   {hour:02d}:00 - {count:7d} поездок, средняя {avg_dur:.1f} мин, {pct_casual*100:.1f}% casual")
    
    print("\n4. ДЛИТЕЛЬНЫЕ ПОЕЗДКИ (более 3 часов):")
    long_rides = df_clean.filter(pl.col("duration_minutes") > 180)
    print(f"   Всего: {long_rides.height} поездок")
    print(f"   Максимальная длительность: {long_rides['duration_minutes'].max()/60:.1f} часов")
    
    long_by_user = long_rides.group_by("member_casual").agg([
        pl.len().alias("count"),
        pl.col("duration_minutes").median().alias("median_hours")
    ])
    print("   По типам пользователей:")
    for row in long_by_user.rows():
        user_type, count, median = row
        print(f"   - {user_type}: {count} поездок, медиана {median/60:.1f} часов")
    
    print("\n5. ТОП-10 ПОПУЛЯРНЫХ МАРШРУТОВ:")
    popular_routes = df_with_stations.group_by([
        "start_station_name", "end_station_name"
    ]).agg([
        pl.len().alias("ride_count"),
        pl.col("duration_minutes").median().alias("median_duration_min")
    ]).sort("ride_count", descending=True).head(10)
    print(popular_routes)
    
    print("\n6. СРАВНЕНИЕ ТИПОВ САМОКАТОВ:")
    bike_comparison = df_clean.group_by("rideable_type").agg([
        pl.len().alias("total_rides"),
        (pl.len() / df_clean.height).alias("pct_of_total"),
        pl.col("duration_minutes").mean().alias("avg_duration_min"),
        (pl.col("member_casual") == "member").mean().alias("pct_members"),
        (pl.col("missing_start_station")).mean().alias("pct_missing_start"),
        (pl.col("missing_end_station")).mean().alias("pct_missing_end")
    ]).sort("total_rides", descending=True)
    print(bike_comparison)
    
    return {
        "user_analysis": user_analysis,
        "monthly_analysis": monthly_analysis,
        "hourly_analysis": hourly_analysis,
        "long_rides": long_rides,
        "popular_routes": popular_routes,
        "bike_comparison": bike_comparison
    }

analytics = analyze_cleaning_results(df_clean, df_clean_with_stations)


ГЛУБОКИЙ АНАЛИЗ РЕЗУЛЬТАТОВ ОЧИСТКИ

1. РАСПРЕДЕЛЕНИЕ ПО ТИПАМ ПОЛЬЗОВАТЕЛЕЙ:
shape: (2, 7)
┌──────────────┬─────────────┬───────────┬──────────────┬──────────────┬─────────────┬─────────────┐
│ member_casua ┆ total_rides ┆ avg_hours ┆ median_minut ┆ pct_electric ┆ pct_no_star ┆ pct_no_end_ │
│ l            ┆ ---         ┆ ---       ┆ es           ┆ ---          ┆ t_station   ┆ station     │
│ ---          ┆ u32         ┆ f64       ┆ ---          ┆ f64          ┆ ---         ┆ ---         │
│ str          ┆             ┆           ┆ f64          ┆              ┆ f64         ┆ f64         │
╞══════════════╪═════════════╪═══════════╪══════════════╪══════════════╪═════════════╪═════════════╡
│ member       ┆ 3563336     ┆ 0.206084  ┆ 8.75         ┆ 0.49794      ┆ 0.147563    ┆ 0.144081    │
│ casual       ┆ 1999710     ┆ 0.350997  ┆ 12.15        ┆ 0.531885     ┆ 0.152806    ┆ 0.174961    │
└──────────────┴─────────────┴───────────┴──────────────┴──────────────┴─────────────┴─────────────