In [None]:
import os
import s3fs
import pandas as pd
import numpy as np

# --- Settings ---
# Source CDN for the TLC trip-data parquet files and the S3 target used by
# download_and_upload() when uploading.
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"
S3_BUCKET = "nyc-tlc-stats"
AWS_REGION = "us-east-1"

# Local cache directory; relative, so it resolves against the current
# working directory of the notebook kernel.
LOCAL_DIR = "../nyc_tlc_data"

CAB_TYPES = ["yellow", "green"]
# Six consecutive months: 2025-04 through 2025-09 inclusive.
MONTHS = [f"2025-{month_no:02d}" for month_no in range(4, 10)]

os.makedirs(LOCAL_DIR, exist_ok=True)
# Constructed with no arguments, so AWS_REGION is not passed to it.
fs = s3fs.S3FileSystem()

# --- Functions ---
def download_and_upload(cab, month, *, download=False, upload=False):
    """Return the local path of one TLC monthly parquet file, optionally fetching/uploading it.

    The file is named ``{cab}_tripdata_{month}.parquet`` and lives in ``LOCAL_DIR``.
    Both side effects are disabled by default, so ``download_and_upload(cab, month)``
    only builds and returns the path.

    Args:
        cab: cab-type prefix of the file name, e.g. "yellow" or "green".
        month: "YYYY-MM" month string used in the file name.
        download: if True and the file is not present locally, fetch it from
            ``BASE_URL`` with curl.
        upload: if True and the object is not present in ``S3_BUCKET``, copy the
            local file there with the AWS CLI.

    Returns:
        Local path of the parquet file (it may not exist when ``download`` is False).

    Raises:
        subprocess.CalledProcessError: curl or ``aws s3 cp`` exited with an error.
    """
    import subprocess

    file_name = f"{cab}_tripdata_{month}.parquet"
    local_path = os.path.join(LOCAL_DIR, file_name)
    s3_path = f"s3://{S3_BUCKET}/{file_name}"

    if download:
        if os.path.exists(local_path):
            print(f"File exists locally: {local_path}")
        else:
            url = f"{BASE_URL}/{file_name}"
            print(f"Downloading {url} → {local_path}...")
            # Download to a temporary name and rename only on success, so an
            # interrupted/failed download never leaves a truncated file that the
            # exists() check above would accept on the next run.
            tmp_path = f"{local_path}.part"
            try:
                # Argument list (no shell); check=True raises on a non-zero exit
                # (curl -f exits non-zero on HTTP errors, -L follows redirects).
                subprocess.run(["curl", "-fSL", url, "-o", tmp_path], check=True)
                os.replace(tmp_path, local_path)
            finally:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)

    if upload:
        if fs.exists(s3_path):
            print(f"File already exists in S3: {s3_path}")
        else:
            print(f"Uploading {local_path} → {s3_path}...")
            subprocess.run(
                ["aws", "s3", "cp", local_path, s3_path, "--region", AWS_REGION],
                check=True,
            )

    return local_path

# --- Load and prepare all monthly files ---
all_dfs = []
for cab in CAB_TYPES:
    for month in MONTHS:
        local_file = download_and_upload(cab, month)
        df_month = pd.read_parquet(local_file)
        # Green-taxi (LPEP) files call their timestamps lpep_*_datetime, while
        # yellow (TPEP) files use tpep_*_datetime. Without this rename the green
        # rows have NaT in the tpep_* columns after concat and are all dropped
        # by the datetime dropna further down. rename() ignores keys that are
        # not present, so yellow frames pass through unchanged.
        df_month = df_month.rename(columns={
            "lpep_pickup_datetime": "tpep_pickup_datetime",
            "lpep_dropoff_datetime": "tpep_dropoff_datetime",
        })
        df_month["cab_type"] = cab
        all_dfs.append(df_month)

# --- Combine, clean and filter ---
df = pd.concat(all_dfs, ignore_index=True)

# Parse pickup/dropoff timestamps; unparseable values become NaT and the
# affected rows are removed.
_datetime_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
for _col in _datetime_cols:
    df[_col] = pd.to_datetime(df[_col], errors="coerce")
df = df.dropna(subset=_datetime_cols)

# Numeric columns: coerce to numbers (invalid values -> NaN), drop rows with
# any missing value among them, then store the count/ID columns as ints.
numeric_cols = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "total_amount",
    "PULocationID",
    "DOLocationID",
]
for _col in numeric_cols:
    df[_col] = pd.to_numeric(df[_col], errors="coerce")
df = df.dropna(subset=numeric_cols)
for _col in ("passenger_count", "PULocationID", "DOLocationID"):
    df[_col] = df[_col].astype(int)


# Outlier filtering: passenger_count within 1..9 inclusive; distance and both
# amounts strictly positive and strictly below their upper bounds.
_keep = (
    df["passenger_count"].ge(1) & df["passenger_count"].le(9)
    & df["trip_distance"].gt(0) & df["trip_distance"].lt(100)
    & df["fare_amount"].gt(0) & df["fare_amount"].lt(1000)
    & df["total_amount"].gt(0) & df["total_amount"].lt(1000)
)
df = df.loc[_keep]

# Add a synthetic driver_id and the trip duration.
# driver_id is randomly assigned (1..100 inclusive, randint's upper bound is
# exclusive), not real data; seeding the global RNG keeps it reproducible.
np.random.seed(42)
df["driver_id"] = np.random.randint(1, 101, size=len(df))
df["trip_time_min"] = (
    df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
).dt.total_seconds() / 60
# Drop trips whose dropoff is not after pickup (zero or negative duration);
# the outlier filter does not check timestamps, so these would otherwise reach
# the output. Filtering after assigning driver_id keeps each surviving row's id
# identical to what it was before this filter was added.
df = df[df["trip_time_min"] > 0]

# Final DataFrame for ClickHouse: the selected columns, in this order.
df_clickhouse_clean = df[[
    "cab_type",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "driver_id",
    "passenger_count",
    "trip_distance",
    "PULocationID",
    "DOLocationID",
    "fare_amount",
    "total_amount",
    "trip_time_min",
]]

# DataFrame.info() writes its summary itself and returns None, so it is called
# directly; wrapping it in print() would also emit a stray "None".
df_clickhouse_clean.info()
print(df_clickhouse_clean.head())
