In [37]:
import polars as pl
from pathlib import Path
from collections import Counter
import pathlib

RAW_DIR   = Path("raw")
NULLS     = ["\\N", "NULL", ""]

# capturamos solo los encabezados
schema_largos = Counter()     
encabezados   = {}             

for csv in RAW_DIR.rglob("*.csv"):
    if "__MACOSX" in csv.parts:
        continue
    header = pl.read_csv(csv, n_rows=0, null_values=NULLS).columns
    encabezados[csv.name] = header
    schema_largos[len(header)] += 1

print("Distribución de cantidad de columnas por archivo:")
for cols, cantidad in schema_largos.items():
    print(f"  {cols} columnas  en {cantidad} archivos")


Distribución de cantidad de columnas por archivo:
  15 columnas  en 48 archivos


In [38]:
# función snake_case para comparar
def to_snake(s: str) -> str:
    return s.strip().lower().replace(" ", "_")

variantes = {}   # {nombre_normalizado: {variantes_originales}}

for header in encabezados.values():
    for col in header:
        norm = to_snake(col)
        variantes.setdefault(norm, set()).add(col)

print("Columnas detectadas (normalizadas y variantes encontradas):\n")
for norm, originals in sorted(variantes.items()):
    if len(originals) > 1:      # solo mostramos las que cambian
        print(f"- {norm}: {sorted(originals)}")


Columnas detectadas (normalizadas y variantes encontradas):

- birth_year: ['Birth Year', 'birth year']
- end_station_id: ['End Station ID', 'end station id']
- end_station_latitude: ['End Station Latitude', 'end station latitude']
- end_station_longitude: ['End Station Longitude', 'end station longitude']
- end_station_name: ['End Station Name', 'end station name']
- gender: ['Gender', 'gender']
- start_station_id: ['Start Station ID', 'start station id']
- start_station_latitude: ['Start Station Latitude', 'start station latitude']
- start_station_longitude: ['Start Station Longitude', 'start station longitude']
- start_station_name: ['Start Station Name', 'start station name']


In [39]:
RAW_DIR = Path("raw")
NULLS   = ["\\N", "NULL", ""]

# Sinonimospara que coincidan los headers
SYNONYMS = {
    "tripduration": "trip_duration",
    "trip_duration_seconds": "trip_duration",
    "bikeid": "bike_id",
    "starttime": "start_time",
    "started_at": "start_time",
    "stoptime": "stop_time",
    "ended_at": "stop_time",
    "usertype": "user_type",
    "birth year": "birth_year",
}

def normalize(df: pl.DataFrame) -> pl.DataFrame:
    df = df.rename({c: c.strip().lower().replace(" ", "_") for c in df.columns})
    df = df.rename({c: SYNONYMS.get(c, c) for c in df.columns})
    return df

dfs = []
for csv in RAW_DIR.rglob("*.csv"):
    if "__MACOSX" in csv.parts:
        continue
    part = pl.read_csv(csv, null_values=NULLS, infer_schema_length=10_000)
    dfs.append(normalize(part))

#union
df = pl.concat(dfs, how="vertical_relaxed")
print("DataFrame unido:", df.shape)
df.head()


DataFrame unido: (34383842, 15)


trip_duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender
i64,str,str,i64,str,f64,f64,i64,str,f64,f64,i64,str,f64,i64
457,"""2017-10-01 00:00:00""","""2017-10-01 00:07:38""",479,"""9 Ave & W 45 St""",40.760193,-73.991255,478,"""11 Ave & W 41 St""",40.760301,-73.998842,30951,"""Subscriber""",1985.0,1
6462,"""2017-10-01 00:00:20""","""2017-10-01 01:48:03""",279,"""Peck Slip & Front St""",40.707873,-74.00167,307,"""Canal St & Rutgers St""",40.714275,-73.9899,14809,"""Customer""",,0
761,"""2017-10-01 00:00:27""","""2017-10-01 00:13:09""",504,"""1 Ave & E 16 St""",40.732219,-73.981656,350,"""Clinton St & Grand St""",40.715595,-73.98703,28713,"""Subscriber""",1992.0,1
1193,"""2017-10-01 00:00:29""","""2017-10-01 00:20:22""",3236,"""W 42 St & Dyer Ave""",40.758985,-73.9938,3233,"""E 48 St & 5 Ave""",40.757246,-73.978059,16008,"""Customer""",1992.0,2
2772,"""2017-10-01 00:00:32""","""2017-10-01 00:46:44""",2006,"""Central Park S & 6 Ave""",40.765909,-73.976342,469,"""Broadway & W 53 St""",40.763441,-73.982681,14556,"""Customer""",,0


In [40]:
for name, dtype in df.schema.items():
    print(f"{name:<20} : {dtype}")


trip_duration        : Int64
start_time           : String
stop_time            : String
start_station_id     : Int64
start_station_name   : String
start_station_latitude : Float64
start_station_longitude : Float64
end_station_id       : Int64
end_station_name     : String
end_station_latitude : Float64
end_station_longitude : Float64
bike_id              : Int64
user_type            : String
birth_year           : Float64
gender               : Int64


In [41]:
nulls_df = df.null_count()                       

# Lo pasamos a diccionario
nulls_dict = {col: int(nulls_df[col][0]) for col in df.columns}

for col, n in sorted(nulls_dict.items(), key=lambda x: x[1], reverse=True):
    if n > 0:
        print(f"{col:<25} -> {n}")

birth_year                -> 3735425
user_type                 -> 15909


In [42]:
df_clean = (
    df
    # fechas
    .with_columns([
        # start_time
        pl.coalesce([
            pl.col("start_time").cast(pl.Utf8, strict=False)
              .str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S", strict=False, exact=False),
            pl.col("start_time").cast(pl.Utf8, strict=False)
              .str.strptime(pl.Datetime, "%m/%d/%Y %H:%M:%S", strict=False, exact=False)
        ]).alias("start_time"),

        # stop_time
        pl.coalesce([
            pl.col("stop_time").cast(pl.Utf8, strict=False)
              .str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S", strict=False, exact=False),
            pl.col("stop_time").cast(pl.Utf8, strict=False)
              .str.strptime(pl.Datetime, "%m/%d/%Y %H:%M:%S", strict=False, exact=False)
        ]).alias("stop_time"),
    ])
    #birth_year
    .with_columns(
        pl.col("birth_year")
          .cast(pl.Float64, strict=False)          
          .cast(pl.Int64,  strict=False)        
    )
)

for name, dtype in df_clean.schema.items():
    print(f"{name:<20}: {dtype}")

trip_duration       : Int64
start_time          : Datetime(time_unit='us', time_zone=None)
stop_time           : Datetime(time_unit='us', time_zone=None)
start_station_id    : Int64
start_station_name  : String
start_station_latitude: Float64
start_station_longitude: Float64
end_station_id      : Int64
end_station_name    : String
end_station_latitude: Float64
end_station_longitude: Float64
bike_id             : Int64
user_type           : String
birth_year          : Int64
gender              : Int64


In [43]:
import polars as pl

#columnas de tipo texto
str_cols = [c for c, t in df_clean.schema.items() if t == pl.Utf8]

#reemplazar ""  por null y recortar
df_clean = df_clean.with_columns([
    pl.when(
        pl.col(c).cast(pl.Utf8, strict=False).str.strip_chars() == ""
    )
    .then(pl.lit(None))
    .otherwise(
        pl.col(c).cast(pl.Utf8, strict=False).str.strip_chars()
    )
    .alias(c)
    for c in str_cols
])

#conteo de strings vacíos que todavía existan 
vacíos = df_clean.select([
    (pl.col(c) == "").sum().alias(c) for c in str_cols
])

print("Strings vacíos restantes por columna:")
print(vacíos)


Strings vacíos restantes por columna:
shape: (1, 3)
┌────────────────────┬──────────────────┬───────────┐
│ start_station_name ┆ end_station_name ┆ user_type │
│ ---                ┆ ---              ┆ ---       │
│ u32                ┆ u32              ┆ u32       │
╞════════════════════╪══════════════════╪═══════════╡
│ 0                  ┆ 0                ┆ 0         │
└────────────────────┴──────────────────┴───────────┘


In [44]:
#nuevo conteo de vacios
nulls_df = df_clean.null_count()                       

#lo pasamos a diccionario
nulls_dict = {col: int(nulls_df[col][0]) for col in df.columns}

for col, n in sorted(nulls_dict.items(), key=lambda x: x[1], reverse=True):
    if n > 0:
        print(f"{col:<25} -> {n}")

start_time                -> 9420380
stop_time                 -> 9420175
birth_year                -> 3735425
user_type                 -> 15909


In [45]:
#Eliminacion de duplicados
subset = ["start_time", "bike_id", "start_station_id", "stop_time"]

dup_total = len(df_clean) - df_clean.unique(subset=subset).shape[0]
print("Duplicados con clave compuesta:", dup_total)

if dup_total:
    df_clean = df_clean.unique(subset=subset, keep="first")
    print("Filas tras deduplicar:", df_clean.shape[0])


Duplicados con clave compuesta: 6897879
Filas tras deduplicar: 27485963


In [49]:
#reglas simples de coherencia
neg_dur   = (df_clean['trip_duration'] <= 0).sum()
bad_order = (df_clean['stop_time'] < df_clean['start_time']).sum()

print("Validaciones basicas")
print(f"Duración ≤ 0 seg.        : {neg_dur}")
print(f"stop_time < start_time   : {bad_order}")

#quitat filas stop_time < start_time
bad_mask   = df_clean['stop_time'] < df_clean['start_time']
bad_count  = bad_mask.sum()

print("Filas descartadas por inconsistencia temporal :", bad_count)

df_clean = df_clean.filter(~bad_mask)
print("Filas finales tras depurar                 :", df_clean.shape[0])

Validaciones basicas
Duración ≤ 0 seg.        : 0
stop_time < start_time   : 49
Filas descartadas por inconsistencia temporal : 49
Filas finales tras depurar                 : 24962830


In [50]:
row_cnt, col_cnt = df_clean.shape
print("Tamaño del dataset")
print(f"Filas     : {row_cnt:,}")
print(f"Columnas  : {col_cnt}\n")

print("Nulos por columna")
nulls_df = df_clean.null_count()
nulls_dict = {col: int(nulls_df[col][0]) for col in df_clean.columns}

for col, n in sorted(nulls_dict.items(), key=lambda x: x[1], reverse=True):
    if n:                                       
        pct = n * 100 / row_cnt
        print(f"{col:<25} -> {n:>10}  ({pct:5.2f} %)")


Tamaño del dataset
Filas     : 24,962,830
Columnas  : 15

Nulos por columna
birth_year                ->    2509115  (10.05 %)
user_type                 ->      15909  ( 0.06 %)


In [51]:
#Enviar a un parquet
pathlib.Path("clean").mkdir(exist_ok=True) 

df_clean.write_parquet(
    "clean/citibike_phase1.parquet",
    compression="snappy"
)

print("Parquet listo en clean/citibike_phase1.parquet")

Parquet listo en clean/citibike_phase1.parquet
