In [6]:
import polars as pl
import pandas as pd
import geopandas as gpd
from datetime import datetime
import os
import csv
import re
from typing import List, Dict, Any, Tuple

# ------------------------------
# DATE PARSER
# ------------------------------
def parse_date(d):
    """
    Parse a date written in one of several common layouts.

    Args:
        d: The date as a string, or an int such as ``20251231``. Any other
            object is converted with ``str()`` first. Leading and trailing
            whitespace is ignored.

    Returns:
        The date as an 8-character ``YYYYMMDD`` string (a ``str``, not an int).

    Raises:
        ValueError: If the value matches none of the supported layouts.
    """
    text = str(d).strip()

    # Order matters: the first layout that matches wins, so an ambiguous value
    # such as "01/02/2025" is read day-first because "%d/%m/%Y" comes before
    # "%m/%d/%Y".
    date_formats = (
        "%Y%m%d", "%Y-%m-%d", "%Y%m-%d",
        "%d-%m-%Y", "%d/%m/%Y", "%Y/%m/%d", "%m/%d/%Y",
        "%m-%d-%Y", "%d.%m.%Y",
    )

    for fmt in date_formats:
        try:
            return datetime.strptime(text, fmt).strftime("%Y%m%d")
        except ValueError:
            continue

    raise ValueError(f"Could not parse date: {d}")

# ------------------------------
# TIME PARSER
# ------------------------------
def parse_time(t: str) -> str:
    """
    Parse a time string and return it as ``HH:MM:SS``.

    Accepted inputs (after trimming, lower-casing):
      - 12-hour clock with an ``am``/``pm`` suffix, e.g. ``"11:59:59 PM"``.
      - Colon-separated fields, 1 to 3 of them; missing or empty fields
        count as 0, e.g. ``"7:1"`` -> ``"07:01:00"``.
      - Digits only: 1-2 digits are hours, 4 digits are ``HHMM`` and
        6 digits are ``HHMMSS``.

    Hours may go up to 47 so that times past midnight of the service day
    can be expressed.

    Args:
        t: The time as a string (an int is converted with ``str()``).

    Returns:
        The zero-padded ``HH:MM:SS`` string.

    Raises:
        ValueError: If the value cannot be parsed, a field is negative,
            minutes or seconds exceed 59, or hours exceed 47.
    """
    if isinstance(t, int):
        t = str(t)
    t = t.strip().lower().replace(',', '.')

    # Handle AM/PM
    ampm_match = re.match(r'(\d{1,2}):?(\d{1,2})?:?(\d{1,2})?\s*(am|pm)', t)
    if ampm_match:
        h, m, s, meridiem = ampm_match.groups()
        h, m, s = int(h), int(m or 0), int(s or 0)
        if meridiem == 'pm' and h < 12:
            h += 12
        if meridiem == 'am' and h == 12:
            h = 0
    elif ':' in t:
        parts = t.split(':')
        if len(parts) > 3:
            # More than HH:MM:SS cannot be a time; do not guess.
            raise ValueError(f"Could not parse time: {t}")
        parts = parts + [''] * (3 - len(parts))
        h, m, s = (int(p) if p else 0 for p in parts)
    elif t.isdigit() and len(t) in (1, 2, 4, 6):
        # Fixed-width digits: H / HH, HHMM or HHMMSS. Slices past the end of
        # the string are empty and therefore count as 0.
        h = int(t[:2])
        m = int(t[2:4] or 0)
        s = int(t[4:6] or 0)
    else:
        raise ValueError(f"Could not parse time: {t}")

    if h < 0 or not 0 <= m <= 59 or not 0 <= s <= 59:
        raise ValueError(f"Invalid time value: {t}")
    if h > 47:
        raise ValueError(f"Hour value over 47: {h}")
    return f"{h:02}:{m:02}:{s:02}"


# ------------------------------
# SCHEMA DEFINITION
# ------------------------------
def get_df_schema_dict(path: str) -> Tuple[Dict[str, Any], List[str]]:
    if "stops.txt" in str(path):
        schema_dict = {"stop_id": str, "stop_name": str, "stop_lat": float, "stop_lon": float}
        mandatory_cols = ["stop_id", "stop_lat", "stop_lon"]
    elif "trips.txt" in str(path):
        schema_dict = {"route_id": str, "service_id": str, "trip_id": str}
        mandatory_cols = ["route_id", "service_id", "trip_id"]
    elif "stop_times.txt" in str(path):
        schema_dict = {
            "trip_id": str,
            "arrival_time": "time",
            "departure_time": "time",
            "stop_id": str,
            "stop_sequence": int
        }
        mandatory_cols = ["trip_id", "arrival_time", "departure_time", "stop_id"]
    elif "routes.txt" in str(path):
        schema_dict = {"route_id": str, "agency_id": str, "route_short_name": str,
                       "route_long_name": str, "route_type": int}
        mandatory_cols = ["route_id", "route_type"]
    elif "calendar.txt" in str(path):
        schema_dict = {
            "service_id": str, "monday": int|bool, "tuesday": int|bool, "wednesday": int|bool,
            "thursday": int|bool, "friday": int|bool, "saturday": int|bool, "sunday": int|bool,
            "start_date": "date", "end_date": "date"
        }
        mandatory_cols = ["service_id", "monday", "tuesday","wednesday","thursday","friday","saturday","sunday","start_date","end_date"]
    else:
        raise Exception(f"File {path} not implemented.")
    return schema_dict, mandatory_cols

# ------------------------------
# CSV FORMAT DETECTION
# ------------------------------
def detect_csv_format(sample_text: str, max_lines: int = 1) -> Dict[str, Any]:
    """
    Guess the CSV dialect of a text sample.

    ``csv.Sniffer`` is tried first. If it cannot decide, a simple fallback
    picks the candidate delimiter whose per-line count is most uniform,
    preferring the one that occurs more often when several are equally
    uniform. Delimiters that never occur are not considered.

    Args:
        sample_text: Text to inspect (typically the start of the file).
        max_lines: Number of leading lines of ``sample_text`` to look at.

    Returns:
        A dict with the keys ``"delimiter"``, ``"quotechar"``,
        ``"doublequote"`` and ``"float_point"`` (``"."`` or ``","``).
    """
    candidate_delims = ",;\t|"
    lines = sample_text.strip().splitlines()[:max_lines]
    sample = "\n".join(lines)
    try:
        # Sniffer documents `delimiters` as a string of candidate characters.
        dialect = csv.Sniffer().sniff(sample, delimiters=candidate_delims)
        delimiter, quotechar, doublequote = dialect.delimiter, dialect.quotechar, dialect.doublequote
    except csv.Error:
        non_blank = [ln for ln in lines if ln.strip()]
        delim_scores = {}
        for d in candidate_delims:
            counts = [ln.count(d) for ln in non_blank]
            if counts and max(counts) > 0:
                spread = max(counts) - min(counts)
                mean_count = sum(counts) / len(counts)
                # Lower spread first, then higher mean count (hence the minus).
                delim_scores[d] = (spread, -mean_count)
        delimiter = min(delim_scores, key=delim_scores.get) if delim_scores else ','
        quote_candidates = ['"', "'"]
        qcounts = {q: sample.count(q) for q in quote_candidates}
        quotechar = max(qcounts, key=qcounts.get) if max(qcounts.values()) > 0 else '"'
        doublequote = (quotechar*2) in sample
    # Decimal separator: whichever of "1.5" / "1,5" style numbers is more common.
    dot_nums = len(re.findall(r'\d+\.\d+', sample))
    comma_nums = len(re.findall(r'\d+,\d+', sample))
    float_point = '.' if dot_nums >= comma_nums else ','
    return {"delimiter": delimiter, "quotechar": quotechar, "doublequote": doublequote, "float_point": float_point}

# ------------------------------
# TRY PARSE SINGLE LINE
# ------------------------------
def try_parse_line(line: str, config: Dict[str, Any], expected_cols: int = None) -> Tuple[List[str]|None, str|None, str|None, bool, str|None]:
    try:
        parsed = next(csv.reader([line], delimiter=config["delimiter"], quotechar=config["quotechar"], doublequote=config["doublequote"]))
    except Exception as e:
        fixed_line = re.sub(r'(?<=\w)"(?=\w)', "'", line)
        try:
            parsed_fixed = next(csv.reader([fixed_line], delimiter=config["delimiter"], quotechar=config["quotechar"], doublequote=config["doublequote"]))
            return parsed_fixed, "Quotation error", "replaced embedded \" with '", False, fixed_line
        except Exception:
            return None, f"Quotation error: {e}", "excluded", True, None
    detected_cols = len(parsed)
    if expected_cols is not None and detected_cols != expected_cols:
        fixed_line = re.sub(r'(?<=\w)"(?=\w)', "'", line)
        try:
            parsed_fixed = next(csv.reader([fixed_line], delimiter=config["delimiter"], quotechar=config["quotechar"], doublequote=config["doublequote"]))
            if len(parsed_fixed) == expected_cols:
                return parsed_fixed, f"expected {expected_cols} cols, got {detected_cols}", "replaced embedded \" with '", False, fixed_line
        except Exception:
            pass
        return None, f"expected {expected_cols} cols, got {detected_cols}", "excluded", True, None
    return parsed, None, None, False, line

# ------------------------------
# VALIDATE & LOAD CSV
# ------------------------------
def validate_and_load_csv(path: str, header: bool = True, csv_text=None) -> Tuple[pl.DataFrame|None, pl.DataFrame, Dict[str,Any]|str]:
    """
    Load a GTFS table, repairing or excluding rows that do not validate.

    The text is split into lines, each line is parsed with ``try_parse_line``
    and the schema columns returned by ``get_df_schema_dict`` are converted
    value by value. Rows with a value that cannot be converted are dropped.

    Args:
        path: Path of the file; its name selects the schema. The file is only
            read when ``csv_text`` is ``None``.
        header: Whether the first parsable line holds the column names.
            NOTE: without a header row the function cannot name the columns
            and returns the "No header found" result.
        csv_text: Optional file content to use instead of reading ``path``.

    Returns:
        ``(df, errors_df, info)``. On success ``df`` is the cleaned table,
        ``errors_df`` has the columns ``line_number`` (1-based line in the
        text), ``content``, ``error``, ``fix`` and ``excluded``, and ``info``
        is the detected CSV configuration. If no header is found or mandatory
        columns are missing, ``df`` is ``None`` and ``errors_df`` holds a
        single ``error`` column; in the missing-columns case ``info`` is the
        error message.

    Raises:
        FileNotFoundError: If ``csv_text`` is ``None`` and ``path`` does not exist.
    """
    schema_dict, mandatory_cols = get_df_schema_dict(path)

    if csv_text is None:
        if not os.path.exists(path):
            raise FileNotFoundError(path)
        # utf-8-sig reads plain UTF-8 too and drops a leading byte-order mark,
        # which would otherwise end up inside the first column name.
        with open(path, encoding="utf-8-sig") as f:
            csv_text = f.read()
    elif csv_text.startswith("\ufeff"):
        csv_text = csv_text[1:]

    lines = csv_text.splitlines()
    config = detect_csv_format(csv_text)

    parsed_rows = []
    row_line_numbers = []  # file line of each entry in parsed_rows
    expected_cols = None
    header_row = None

    # Errors are collected column-wise and turned into a DataFrame once.
    error_schema = {
        "line_number": pl.Int64,
        "content": pl.Utf8,
        "error": pl.Utf8,
        "fix": pl.Utf8,
        "excluded": pl.Boolean,
    }
    error_cols = {name: [] for name in error_schema}

    def record_error(line_number, content, error, fix, excluded):
        error_cols["line_number"].append(line_number)
        error_cols["content"].append(content)
        error_cols["error"].append(error)
        error_cols["fix"].append(fix)
        error_cols["excluded"].append(excluded)

    for i, line in enumerate(lines, start=1):
        parsed, error, fix, excluded, _used_line = try_parse_line(line, config, expected_cols)
        if parsed is not None and expected_cols is None:
            expected_cols = len(parsed)

        if header and parsed is not None and header_row is None:
            header_row = parsed
            expected_cols = len(header_row)
            continue

        if parsed is not None:
            parsed_rows.append(parsed)
            row_line_numbers.append(i)

        if error:
            record_error(i, line, error, fix, excluded)

    if not header_row:
        return None, pl.DataFrame([{"error": "No header found"}]), config

    present_cols = set(header_row)
    missing = [c for c in mandatory_cols if c not in present_cols]
    if missing:
        err_str = f"Mandatory cols {missing} not in file"
        return None, pl.DataFrame([{"error": err_str}]), err_str

    # parsed_rows is a list of records, so state the orientation explicitly
    # instead of letting polars infer it.
    df = pl.DataFrame(parsed_rows, schema=header_row, orient="row")

    rows_to_exclude = set()
    for col, dtype in schema_dict.items():
        if col not in df.columns:
            continue

        # Numeric columns cannot hold the original text next to numbers, so
        # blank or unparsable cells become nulls there.
        is_numeric = dtype is int or dtype is float or dtype == (int | bool)
        new_vals = []

        for idx, val in enumerate(df[col].to_list()):
            if val is None or str(val).strip() == '':
                new_vals.append(None if is_numeric else val)
                continue

            try:
                if dtype == "date":
                    parsed_val = parse_date(str(val))
                elif dtype == "time":
                    parsed_val = parse_time(str(val))
                elif dtype == int | bool:
                    sval = str(val).strip().lower()
                    if sval in ("true","1"):
                        parsed_val = 1
                    elif sval in ("false","0"):
                        parsed_val = 0
                    else:
                        parsed_val = int(float(sval))
                elif dtype == int:
                    parsed_val = int(float(val))
                elif dtype == float:
                    parsed_val = float(val)
                else:
                    parsed_val = str(val)

            except Exception as e:
                # Report the line of the file, not the row index of the frame.
                record_error(
                    row_line_numbers[idx],
                    str(val),
                    f"Parse failed for column '{col}': {e}",
                    "excluded",
                    True,
                )
                rows_to_exclude.add(idx)
                new_vals.append(None if is_numeric else val)
                continue

            # Track modifications
            if str(parsed_val) != str(val):
                record_error(
                    row_line_numbers[idx],
                    str(val),
                    f"Value in column '{col}' modified after parsing",
                    f"{val} -> {parsed_val}",
                    False,
                )

            new_vals.append(parsed_val)

        df = df.with_columns(pl.Series(col, new_vals))

    if rows_to_exclude:
        df = df.filter(~pl.arange(0, df.height).is_in(list(rows_to_exclude)))

    errors_df = pl.DataFrame(error_cols, schema=error_schema)
    return df, errors_df, config


# -------------------------
# Example usage
# -------------------------
if __name__ == "__main__":
    # Small in-memory calendar.txt sample mixing boolean spellings and date layouts.
    # NOTE(review): this sample is not passed to validate_and_load_csv below
    # (no csv_text= argument), so the call reads the file at the given path.
    csv_text = '''service_id,monday,tuesday,wednesday,thursday,friday,saturday,sunday,start_date,end_date
weekday,True,True,True,True,True,false,False,2025-10-01,20251231
weekend,False,False,False,False,False,True,True,20251005,20251231
'''

    # Validate a stop_times.txt file addressed by a machine-specific absolute path.
    df, errors_df, cfg = validate_and_load_csv("/home/miguel/Documents/Proyectos/PTLevelofService/accessibility/UrbanAccessAnalyzer/no_sync/cambridge_massachusetts_us/gtfs_files/mdb11__amtrak/stop_times.txt", header=True)

    print("Detected config:", cfg)
    print("\nErrors:")
    print(errors_df)
    print("\nLoaded data:")
    print(df)
    # NOTE(review): validate_and_load_csv returns df=None on its error paths,
    # in which case df.shape would fail.
    print("\nLoaded shape:", df.shape)

  df = pl.DataFrame(parsed_rows, schema=header_row)


KeyboardInterrupt: 

In [None]:
# Notebook cell: evaluate `df` so that the notebook displays it.
df

In [3]:
import os
# splitext only separates the extension; the directory part stays in the first element.
file, ext = os.path.splitext("a/b/c.txt")    # file = "a/b/c", ext = ".txt"
ext

'.txt'

In [5]:
# str(None) is the literal text 'None', not an empty string.
str(None)

'None'

In [None]:
#     csv_text = '''stop_id,stop_name,x,stop_lat,stop_lon,y,z,a,b,c 
#     839,"Rt. 340 at Pine Hill Farms",,40.0386666852,-76.1498668113,,,0,,0 
# 839,"Rt. 340 at Pine Hill Farms",,40.0386666852,-76.1498668113,,,0,,0
# 1441;"Rt. 30 at Rhoades Auto";;40.0141079301;-76.1507757729;;;0;;0 
# 1,"Rt. 30 at King"s",,40.0144107263,-76.1523472101,,,0,,0
# 1441,"Rt. 30 at Rhoades Auto",,40.0141079301,-76.1507757729,,,0,,0
# 1442,"Rt. 30 at After Eight Bed & Breakfast",,40.0138175168,-76.149297822,,,0,,0
# 1443,"Rt. 30 at Paradise Custom Shop",,40.0131079771,-76.1456821665,,,0,,0
# 1444,"Rt. 30 at Paradise Candle",,40.0124615832,-76.1424119972,,,0,,0
# 1446,"Rt. 30 at hex Shop",,40.0116494174,-76.1383344392,,,0,,0
# 839,"Rt. 340 at Pine Hill Farms",,40.0386666852,-76.1498668113,,,0,,0
# 84,"Fellowship Dr. & Friendship Ave.'''  # truncated intentionally

In [7]:
import re

def parse_time(t: str) -> str:
    """
    Parse a time string and return HH:MM:SS.
    Rules:
      - With an am/pm suffix: 12-hour clock, e.g. "11:59:59 PM" -> "23:59:59".
      - With colons (:): parse flexibly (e.g., "7:1" -> "07:01:00");
        at most three fields, empty fields count as 0.
      - Without colons: only 4 or 6 digits are allowed:
            4 digits (HHMM) -> HH:MM:00
            6 digits (HHMMSS) -> HH:MM:SS
      Supports hours up to 47.

    Args:
        t: The time as a string (an int is converted with ``str()``).

    Returns:
        The zero-padded ``HH:MM:SS`` string.

    Raises:
        ValueError: If the value cannot be parsed, a field is negative,
            minutes or seconds exceed 59, or hours exceed 47.
    """
    if isinstance(t, int):
        t = str(t)
    t = t.strip().lower().replace(',', '.')

    # Handle AM/PM
    ampm_match = re.match(r'(\d{1,2}):?(\d{1,2})?:?(\d{1,2})?\s*(am|pm)', t)
    if ampm_match:
        h, m, s, meridiem = ampm_match.groups()
        h, m, s = int(h), int(m or 0), int(s or 0)
        if meridiem == 'pm' and h < 12:
            h += 12
        if meridiem == 'am' and h == 12:
            h = 0
        # Same range check as the other branches.
        if m > 59 or s > 59:
            raise ValueError(f"Invalid time value: {t}")
        if h > 47:
            raise ValueError(f"Hour value over 47: {h}")
        return f"{h:02}:{m:02}:{s:02}"

    # Case 1: String has colons → flexible parsing
    if ':' in t:
        parts = t.split(':')
        if len(parts) > 3:
            # Anything beyond HH:MM:SS is not a time; do not silently drop it.
            raise ValueError(f"Invalid time value: {t}")
        parts = [p if p else '0' for p in parts] + ['0'] * (3 - len(parts))

        h, m, s = map(int, parts)
        # int() accepts a leading sign, so negative fields must be rejected here.
        if min(h, m, s) < 0 or m > 59 or s > 59:
            raise ValueError(f"Invalid time value: {t}")
        if h > 47:
            raise ValueError(f"Invalid hour value: {t} is over 47 hours")

        return f"{h:02}:{m:02}:{s:02}"

    # Case 2: No colons → must be 4 or 6 digits
    digits = ''.join(c for c in t if c.isdigit())
    if not digits:
        raise ValueError(f"Could not parse time: {t}")

    if len(digits) == 4:  # HHMM
        h, m, s = int(digits[:2]), int(digits[2:4]), 0
    elif len(digits) == 6:  # HHMMSS
        h, m, s = int(digits[:2]), int(digits[2:4]), int(digits[4:6])
    else:
        raise ValueError(f"Invalid time format (must be 4 or 6 digits): {t}")

    if m > 59 or s > 59:
        raise ValueError(f"Invalid time value: {t}")
    if h > 47:
        raise ValueError(f"Invalid hour value: {t} is over 47 hours")

    return f"{h:02}:{m:02}:{s:02}"

In [8]:
# Spot checks of parse_time; only the value of the last call is shown by the notebook.
parse_time("30:02:01")    # → '30:02:01'
parse_time("7:01:02")     # → '07:01:02'
parse_time("07:01")       # → '07:01:00'
parse_time("0710")       # → '07:10:00' (4 digits are read as HHMM)
parse_time("11:59:59 PM") # → '23:59:59'
parse_time("1:00:00")   # → '01:00:00'

'01:00:00'

In [9]:
parse_time("7010")    # → raises ValueError: read as HHMM, and hour 70 is over 47

ValueError: Invalid hour value: 7010 is over 47 hours

In [10]:

import polars as pl
import pandas as pd
import geopandas as gpd
from datetime import datetime
import os
import csv
import re
from typing import List, Dict, Any, Tuple
import warnings 

# ------------------------------
# DATE PARSER
# ------------------------------
def parse_date(d):
    """
    Parse a date written in one of several common layouts.

    Args:
        d: The date as a string, or an int such as ``20251231``. Any other
            object is converted with ``str()`` first. Leading and trailing
            whitespace is ignored.

    Returns:
        The date as an 8-character ``YYYYMMDD`` string (a ``str``, not an int).

    Raises:
        ValueError: If the value matches none of the supported layouts.
    """
    text = str(d).strip()

    # Order matters: the first layout that matches wins, so an ambiguous value
    # such as "01/02/2025" is read day-first because "%d/%m/%Y" comes before
    # "%m/%d/%Y".
    date_formats = (
        "%Y%m%d", "%Y-%m-%d", "%Y%m-%d",
        "%d-%m-%Y", "%d/%m/%Y", "%Y/%m/%d", "%m/%d/%Y",
        "%m-%d-%Y", "%d.%m.%Y",
    )

    for fmt in date_formats:
        try:
            return datetime.strptime(text, fmt).strftime("%Y%m%d")
        except ValueError:
            continue

    raise ValueError(f"Could not parse date: {d}")

# ------------------------------
# TIME PARSER
# ------------------------------
def parse_time(t: str) -> str:
    """
    Parse a time string and return HH:MM:SS.
    Rules:
      - With an am/pm suffix: 12-hour clock, e.g. "11:59:59 PM" -> "23:59:59".
      - With colons (:): parse flexibly (e.g., "7:1" -> "07:01:00");
        at most three fields, empty fields count as 0.
      - Without colons: only 4 or 6 digits are allowed:
            4 digits (HHMM) -> HH:MM:00
            6 digits (HHMMSS) -> HH:MM:SS
      Supports hours up to 47.

    Args:
        t: The time as a string (an int is converted with ``str()``).

    Returns:
        The zero-padded ``HH:MM:SS`` string.

    Raises:
        ValueError: If the value cannot be parsed, a field is negative,
            minutes or seconds exceed 59, or hours exceed 47.
    """
    if isinstance(t, int):
        t = str(t)
    t = t.strip().lower().replace(',', '.')

    # Handle AM/PM
    ampm_match = re.match(r'(\d{1,2}):?(\d{1,2})?:?(\d{1,2})?\s*(am|pm)', t)
    if ampm_match:
        h, m, s, meridiem = ampm_match.groups()
        h, m, s = int(h), int(m or 0), int(s or 0)
        if meridiem == 'pm' and h < 12:
            h += 12
        if meridiem == 'am' and h == 12:
            h = 0
        # Same range check as the other branches.
        if m > 59 or s > 59:
            raise ValueError(f"Invalid time value: {t}")
        if h > 47:
            raise ValueError(f"Hour value over 47: {h}")
        return f"{h:02}:{m:02}:{s:02}"

    # Case 1: String has colons → flexible parsing
    if ':' in t:
        parts = t.split(':')
        if len(parts) > 3:
            # Anything beyond HH:MM:SS is not a time; do not silently drop it.
            raise ValueError(f"Invalid time value: {t}")
        parts = [p if p else '0' for p in parts] + ['0'] * (3 - len(parts))

        h, m, s = map(int, parts)
        # int() accepts a leading sign, so negative fields must be rejected here.
        if min(h, m, s) < 0 or m > 59 or s > 59:
            raise ValueError(f"Invalid time value: {t}")
        if h > 47:
            raise ValueError(f"Invalid hour value: {t} is over 47 hours")

        return f"{h:02}:{m:02}:{s:02}"

    # Case 2: No colons → must be 4 or 6 digits
    digits = ''.join(c for c in t if c.isdigit())
    if not digits:
        raise ValueError(f"Could not parse time: {t}")

    if len(digits) == 4:  # HHMM
        h, m, s = int(digits[:2]), int(digits[2:4]), 0
    elif len(digits) == 6:  # HHMMSS
        h, m, s = int(digits[:2]), int(digits[2:4]), int(digits[4:6])
    else:
        raise ValueError(f"Invalid time format (must be 4 or 6 digits): {t}")

    if m > 59 or s > 59:
        raise ValueError(f"Invalid time value: {t}")
    if h > 47:
        raise ValueError(f"Invalid hour value: {t} is over 47 hours")

    return f"{h:02}:{m:02}:{s:02}"

# ------------------------------
# SCHEMA DEFINITION
# ------------------------------
def get_df_schema_dict(path: str) -> Tuple[Dict[str, Any], List[str]]:
    if "stops.txt" in str(path):
        schema_dict = {"stop_id": str, "stop_name": str, "stop_lat": float, "stop_lon": float}
        mandatory_cols = ["stop_id", "stop_lat", "stop_lon"]
    elif "trips.txt" in str(path):
        schema_dict = {"route_id": str, "service_id": str, "trip_id": str}
        mandatory_cols = ["route_id", "service_id", "trip_id"]
    elif "stop_times.txt" in str(path):
        schema_dict = {
            "trip_id": str,
            "arrival_time": "time",
            "departure_time": "time",
            "stop_id": str,
            "stop_sequence": int
        }
        mandatory_cols = ["trip_id", "arrival_time", "departure_time", "stop_id"]
    elif "routes.txt" in str(path):
        schema_dict = {"route_id": str, "agency_id": str, "route_short_name": str,
                       "route_long_name": str, "route_type": int}
        mandatory_cols = ["route_id", "route_type"]
    elif "calendar.txt" in str(path):
        schema_dict = {
            "service_id": str, "monday": int|bool, "tuesday": int|bool, "wednesday": int|bool,
            "thursday": int|bool, "friday": int|bool, "saturday": int|bool, "sunday": int|bool,
            "start_date": "date", "end_date": "date"
        }
        mandatory_cols = ["service_id", "monday", "tuesday","wednesday","thursday","friday","saturday","sunday","start_date","end_date"]
    else:
        raise Exception(f"File {path} not implemented.")
    return schema_dict, mandatory_cols

# ------------------------------
# CSV FORMAT DETECTION
# ------------------------------
def detect_csv_format(sample_text: str, max_lines: int = 1) -> Dict[str, Any]:
    """
    Guess the CSV dialect of a text sample.

    ``csv.Sniffer`` is tried first. If it cannot decide, a simple fallback
    picks the candidate delimiter whose per-line count is most uniform,
    preferring the one that occurs more often when several are equally
    uniform. Delimiters that never occur are not considered.

    Args:
        sample_text: Text to inspect (typically the start of the file).
        max_lines: Number of leading lines of ``sample_text`` to look at.

    Returns:
        A dict with the keys ``"delimiter"``, ``"quotechar"``,
        ``"doublequote"`` and ``"float_point"`` (``"."`` or ``","``).
    """
    candidate_delims = ",;\t|"
    lines = sample_text.strip().splitlines()[:max_lines]
    sample = "\n".join(lines)
    try:
        # Sniffer documents `delimiters` as a string of candidate characters.
        dialect = csv.Sniffer().sniff(sample, delimiters=candidate_delims)
        delimiter, quotechar, doublequote = dialect.delimiter, dialect.quotechar, dialect.doublequote
    except csv.Error:
        non_blank = [ln for ln in lines if ln.strip()]
        delim_scores = {}
        for d in candidate_delims:
            counts = [ln.count(d) for ln in non_blank]
            if counts and max(counts) > 0:
                spread = max(counts) - min(counts)
                mean_count = sum(counts) / len(counts)
                # Lower spread first, then higher mean count (hence the minus).
                delim_scores[d] = (spread, -mean_count)
        delimiter = min(delim_scores, key=delim_scores.get) if delim_scores else ','
        quote_candidates = ['"', "'"]
        qcounts = {q: sample.count(q) for q in quote_candidates}
        quotechar = max(qcounts, key=qcounts.get) if max(qcounts.values()) > 0 else '"'
        doublequote = (quotechar*2) in sample
    # Decimal separator: whichever of "1.5" / "1,5" style numbers is more common.
    dot_nums = len(re.findall(r'\d+\.\d+', sample))
    comma_nums = len(re.findall(r'\d+,\d+', sample))
    float_point = '.' if dot_nums >= comma_nums else ','
    return {"delimiter": delimiter, "quotechar": quotechar, "doublequote": doublequote, "float_point": float_point}

# ------------------------------
# TRY PARSE SINGLE LINE
# ------------------------------
def try_parse_line(line: str, config: Dict[str, Any], expected_cols: int|None = None, header:list|None=None, schema:dict|None=None) -> Tuple[List[str]|None, str|None, str|None, bool]:
    parsed = None
    error = ""
    fix = ""
    try:
        parsed = next(csv.reader([line], delimiter=config["delimiter"], quotechar=config["quotechar"], doublequote=config["doublequote"]))
    except Exception as e:
        fixed_line = re.sub(r'(?<=\w)"(?=\w)', "'", line)
        try:
            parsed = next(csv.reader([fixed_line], delimiter=config["delimiter"], quotechar=config["quotechar"], doublequote=config["doublequote"]))
            error += "Quotation error "
            fix += "replaced embedded \" with ' "
        except Exception:
            return None, f"Quotation error: {e}", "excluded", True
        
    detected_cols = len(parsed)
    if expected_cols is not None and detected_cols != expected_cols:
        fixed_line = re.sub(r'(?<=\w)"(?=\w)', "'", line)
        try:
            parsed = next(csv.reader([fixed_line], delimiter=config["delimiter"], quotechar=config["quotechar"], doublequote=config["doublequote"]))
            if len(parsed) == expected_cols:
                error += f"expected {expected_cols} cols, got {detected_cols} "
                fix += "replaced embedded \" with ' "
        except Exception:
            error += f"expected {expected_cols} cols, got {detected_cols} "
            fix += "excluded "
            return None, error, fix, True
    
    if schema is not None and header is not None and parsed is not None:
        for col_idx, col_name in enumerate(header):
            if col_name not in schema:
                continue

            dtype = schema[col_name]

            val = parsed[col_idx]
            original = val
            parsed_val = val

            # Skip empty values
            if val is None or str(val).strip() == '':
                continue

            try:
                if dtype == "date":
                    parsed_val = parse_date(str(val))
                elif dtype == "time":
                    parsed_val = parse_time(str(val))
                elif dtype == int | bool:
                    sval = str(val).strip().lower()
                    if sval in ("true", "1"):
                        parsed_val = 1
                    elif sval in ("false", "0"):
                        parsed_val = 0
                    else:
                        parsed_val = int(float(sval))
                elif dtype == int:
                    parsed_val = int(float(val))
                elif dtype == float:
                    parsed_val = float(val)
                else:
                    parsed_val = str(val)
            except Exception as e:
                error += f"Parse failed for column '{col_name}': {e} "
                fix += "excluded "
                return None, error, fix, True

            # Track modifications
            if str(parsed_val) != str(original):
                error += f"Value in column '{col_name}' modified after parsing "
                fix += f"{original} -> {parsed_val} "

            # Apply modification directly to parsed list
            parsed[col_idx] = str(parsed_val)


    return parsed, None, None, False


def validate_and_load_csv(path: str, header: bool = True, csv_text=None):
    """
    Load a GTFS table line by line, dropping rows that fail validation.

    Each data line is parsed once with ``try_parse_line``, which also
    normalises the schema columns returned by ``get_df_schema_dict``.

    Args:
        path: Path of the file; its name selects the schema. The file is only
            read when ``csv_text`` is ``None``.
        header: Whether the first line holds the column names.
        csv_text: Optional file content to use instead of reading ``path``.

    Returns:
        ``(parsed_cols_df, errors_df)``. ``parsed_cols_df`` holds one row per
        accepted line: ``line_number`` (1-based, counted after the header)
        plus one string column per header field, or a ``parsed`` list column
        when ``header`` is False. ``errors_df`` has the columns
        ``line_number``, ``error``, ``fix`` and ``excluded`` for every line
        that reported an error.

    Raises:
        Exception: If the header is missing, cannot be parsed, or lacks a
            mandatory column.
    """
    # Get schema info
    schema_dict, mandatory_cols = get_df_schema_dict(path)

    # Read CSV text if not provided
    if csv_text is None:
        # utf-8-sig reads plain UTF-8 too and drops a leading byte-order mark,
        # which would otherwise end up inside the first column name.
        with open(path, encoding="utf-8-sig") as f:
            csv_text = f.read()
    elif csv_text.startswith("\ufeff"):
        csv_text = csv_text[1:]

    lines = csv_text.splitlines()
    config = detect_csv_format(csv_text)

    expected_cols = None
    df_cols = None

    if header:
        if not lines:
            raise Exception(f"Error parsing header of file {path}: file is empty")
        header_line = lines[0]
        lines = lines[1:]
        df_cols, error_msg, fix, error = try_parse_line(header_line, config)
        if error or (df_cols is None):
            raise Exception(f"Error parsing header of file {path}: {error_msg} {fix}")
        elif error_msg is not None:
            warnings.warn(f"Warning parsing header of file {path}: {error_msg} {fix}")

        if mandatory_cols is not None:
            for i in mandatory_cols:
                if i not in df_cols:
                    raise Exception(f"Column {i} not in file {path}")

        expected_cols = len(df_cols)

    # Build initial Polars DataFrame with line content
    lines_df = pl.DataFrame({
        "line_number": range(1, len(lines) + 1),
        # Explicit dtype so that a file without data lines still gives a string column.
        "content": pl.Series("content", lines, dtype=pl.Utf8),
    })

    def parse_to_struct(line):
        # One parse per line; the four results are unpacked into struct fields.
        parsed, line_error, line_fix, excluded = try_parse_line(
            line, config, expected_cols, df_cols, schema_dict
        )
        return {"parsed": parsed, "error": line_error, "fix": line_fix, "excluded": excluded}

    # Parse each line into structured columns
    lines_df = lines_df.with_columns(
        pl.col("content").map_elements(
            parse_to_struct,
            return_dtype=pl.Struct({
                "parsed": pl.List(pl.Utf8),
                "error": pl.Utf8,
                "fix": pl.Utf8,
                "excluded": pl.Boolean,
            })
        ).alias("parsed_struct")
    ).unnest("parsed_struct")

    # Build final DataFrame from the accepted lines only
    kept_df = lines_df.filter(~pl.col("excluded"))
    parsed_cols_df = kept_df.select(['line_number','parsed'])
    if df_cols is not None:
        for i, col_name in enumerate(df_cols):
            parsed_cols_df = parsed_cols_df.with_columns(
                pl.Series(col_name, kept_df["parsed"].list.get(i).cast(pl.Utf8))
            )

        parsed_cols_df = parsed_cols_df.drop('parsed')

    errors_df = lines_df.select(['line_number','error','fix','excluded']).drop_nulls("error")
    failed_rows = errors_df.filter(pl.col("excluded")).height
    if failed_rows > 0:
        warnings.warn(f"{failed_rows} rows of file {path} have failed while parsing.")

    return parsed_cols_df, errors_df

In [11]:
# Load a stop_times.txt file from a machine-specific absolute path; the function
# returns the parsed table and the per-line error report.
df, error_df = validate_and_load_csv("/home/miguel/Documents/Proyectos/PTLevelofService/accessibility/UrbanAccessAnalyzer/no_sync/cambridge_massachusetts_us/gtfs_files/mdb11__amtrak/stop_times.txt", header=True)



In [14]:
# Keeps the rows whose line_number occurs in df's own line_number column.
# NOTE(review): the cell output shows a polars warning suggesting `implode`
# for this kind of is_in call.
df.filter(pl.col("line_number").is_in(df['line_number']))

Please use `implode` to return to previous behavior.

See https://github.com/pola-rs/polars/issues/22149 for more information.
  df.filter(pl.col("line_number").is_in(df['line_number']))


line_number,trip_id,arrival_time,departure_time,stop_id,stop_sequence,pickup_type,drop_off_type,timepoint
i64,str,str,str,str,str,str,str,str
1,"""181909""","""01:00:00""","""01:00:00""","""LAX""","""1""","""0""","""0""","""1"""
2,"""181909""","""01:41:00""","""01:41:00""","""POS""","""2""","""0""","""0""","""1"""
3,"""181909""","""01:54:00""","""01:54:00""","""ONA""","""3""","""0""","""0""","""1"""
4,"""181909""","""03:36:00""","""03:36:00""","""PSN""","""4""","""0""","""0""","""1"""
5,"""181909""","""05:47:00""","""05:47:00""","""YUM""","""5""","""0""","""0""","""1"""
…,…,…,…,…,…,…,…,…
38607,"""215634""","""20:12:00""","""20:12:00""","""JEF""","""6""","""0""","""0""","""1"""
38608,"""215634""","""20:57:00""","""20:57:00""","""HEM""","""7""","""0""","""0""","""1"""
38609,"""215634""","""21:24:00""","""21:24:00""","""WAH""","""8""","""0""","""0""","""1"""
38610,"""215634""","""22:13:00""","""22:13:00""","""KWD""","""9""","""0""","""0""","""1"""


In [16]:
# A path without a directory part is its own basename.
os.path.basename("b")

'b'