In [1]:
import os
import polars as pl
from deltalake import write_deltalake
import json
import time
import csv
from pathlib import Path


def MakeParquetChunks(
    newly_created_data_file_guid: str,
    origin_filepath: str,
    delta_dir_network_server_path: str,
    save_temporary_deltalake_and_copy: bool
) -> str:
    """
    Convert a CSV file into a Delta Lake table, one bounded chunk at a time.

    Steps:
    - Read the first `max_sample_rows` data rows with Polars to infer the column
      schema and to estimate the Parquet-encoded size of a single row.
    - Derive `rows_per_partition` from that estimate so a chunk targets roughly
      `target_mb_per_partition` MB, never going below `buffer_rows` rows.
    - Repeatedly read `rows_per_partition` rows from the CSV and write each chunk
      to the Delta table (overwrite for the first chunk, append afterwards).

    Args:
        newly_created_data_file_guid: Name of the sub-folder created under
            ``C:\\TemporaryDeltaLake\\`` when the temporary location is used.
        origin_filepath: Path of the CSV file to convert.
        delta_dir_network_server_path: Delta table directory used when
            `save_temporary_deltalake_and_copy` is false.
        save_temporary_deltalake_and_copy: When true, the table is written to the
            temporary local directory instead of the network path. This function
            does not copy the result anywhere afterwards.

    Returns:
        The directory the Delta table was written to.

    Raises:
        FileNotFoundError: If `origin_filepath` is empty or is not an existing file.

    Notes:
        - Every chunk is read with a fresh `pl.read_csv` call that skips the rows
          already consumed, so the reader has to pass over the start of the file
          again for each chunk.
        - If the CSV has a header but no data rows, nothing is written to the
          Delta directory.
    """
    import tempfile  # local import: only needed for the size-estimation scratch file

    target_mb_per_partition: int = 512
    skip_rows: int = 0
    max_sample_rows: int = 10
    buffer_rows: int = 50_000

    # Validate inputs before touching the filesystem.
    if not origin_filepath or not os.path.isfile(origin_filepath):
        raise FileNotFoundError(f"Origin CSV not found: {origin_filepath}")

    # Pick the output location: a local scratch table or the caller-supplied path.
    if save_temporary_deltalake_and_copy:
        delta_dir = "C:\\TemporaryDeltaLake\\" + newly_created_data_file_guid
    else:
        delta_dir = delta_dir_network_server_path

    os.makedirs(delta_dir, exist_ok=True)

    # -----------------------------
    # Sample the head of the CSV to fix the schema
    # -----------------------------
    df_sample = pl.read_csv(origin_filepath, skip_rows=skip_rows, n_rows=max_sample_rows, ignore_errors=True)

    schema_dict = dict(zip(df_sample.columns, df_sample.dtypes))
    base_columns = list(schema_dict)

    # -----------------------------
    # Estimate how many rows fit in one partition
    # -----------------------------
    if df_sample.is_empty():
        # No data rows to measure: avoid dividing by zero and use the minimum chunk size.
        rows_per_partition = buffer_rows
    else:
        # Write the sample into a private scratch directory so the estimate never
        # clobbers a file in the working directory and is cleaned up even on error.
        with tempfile.TemporaryDirectory() as scratch_dir:
            temp_path = os.path.join(scratch_dir, "temp_sample.parquet")
            df_sample.write_parquet(temp_path)
            row_size = os.path.getsize(temp_path) / len(df_sample)
        rows_per_partition = max(int((target_mb_per_partition * 1024 * 1024) / row_size), buffer_rows)
    print(f"Target rows per partition â‰ˆ {rows_per_partition:,} ")

    # -----------------------------
    # Chunked CSV read + Delta write
    # -----------------------------
    first_chunk = True
    pos = 0  # number of data rows already written
    chunk_idx = 1

    while True:
        print(f"Reading rows {pos:,} â†’ {pos + rows_per_partition - 1:,} ",time.time())
        # `skip_rows_after_header` keeps the header line in place and skips only
        # data rows. Using `skip_rows=pos` here would also skip the header, so from
        # the second chunk on a data row would be parsed as the column names.
        # NOTE(review): newer Polars releases rename `dtypes` to `schema_overrides`;
        # switch once the minimum supported Polars version is confirmed.
        df_chunk = pl.read_csv(
            origin_filepath,
            skip_rows=skip_rows,
            skip_rows_after_header=pos,
            n_rows=rows_per_partition,
            dtypes=schema_dict,
            ignore_errors=True
        ).select(base_columns)

        if df_chunk.is_empty():
            break

        write_deltalake(
            delta_dir,
            df_chunk,
            mode="overwrite" if first_chunk else "append",
            schema_mode="merge"
        )
        first_chunk = False
        print(f"â†’ Written partition #{chunk_idx} with {len(df_chunk):,} rows")
        pos += len(df_chunk)
        chunk_idx += 1

    print("\n==========================================")
    print(" FULLY STREAMING CSV â†’ DELTA COMPLETED ðŸŽ‰")
    print(" Delta path:", delta_dir)
    print("==========================================\n")
    return delta_dir


def main():
    """Run the CSV-to-Delta conversion on the hard-coded sample file.

    The temporary-location flag is off, so the table is written straight to
    the Delta directory passed as `delta_dir_network_server_path`.
    """
    sample_csv = "E:\\Dev\\Oxyzo R&D\\RndGit\\OxyzoDummyTemplate.csv"
    delta_target = "E:\\Dev\\Oxyzo R&D\\RndGit\\SourceDeltaLake"
    MakeParquetChunks(
        newly_created_data_file_guid="abc",
        origin_filepath=sample_csv,
        delta_dir_network_server_path=delta_target,
        save_temporary_deltalake_and_copy=False,
    )


# Run the conversion only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

Target rows per partition â‰ˆ 357,866 
Reading rows 0 â†’ 357,865  1764614117.234116
â†’ Written partition #1 with 5 rows
Reading rows 5 â†’ 357,870  1764614117.2510238

 FULLY STREAMING CSV â†’ DELTA COMPLETED ðŸŽ‰
 Delta path: E:\Dev\Oxyzo R&D\RndGit\SourceDeltaLake



  df_chunk = pl.read_csv(


In [4]:
from deltalake import DeltaTable
import pyarrow as pa
import pyarrow.dataset as ds
from enum import Enum

class DataType(Enum):
    """Enumeration for data types.

    The integer values are the column type codes returned by `map_pyarrow_type`.
    """
    # Fallback returned by map_pyarrow_type when no other case matches.
    Unknown = 0
    # Returned for integer and decimal Arrow types.
    Numeric = 1
    # Returned for timestamp and date Arrow types.
    DateTime = 2
    # Returned for floating-point Arrow types.
    Float = 3
    # Returned for string Arrow types.
    String = 4
    # Formula, Blank and Error are never returned by map_pyarrow_type;
    # their meaning is defined outside the code visible here — TODO confirm.
    Formula = 5
    Blank = 6
    # Returned for boolean Arrow types.
    Boolean = 7
    Error = 8

from deltalake import DeltaTable
import pyarrow as pa
import pyarrow.dataset as ds

def map_pyarrow_type(dtype: pa.DataType) -> int:
    """Translate a PyArrow data type into the integer code of a `DataType` member.

    Args:
        dtype: The Arrow type of a column.

    Returns:
        `DataType.<member>.value` for the first matching category:
        integer/decimal -> Numeric, floating -> Float, string (regular or
        large) -> String, boolean -> Boolean, timestamp/date -> DateTime.
        Any other type yields `DataType.Unknown.value`.
    """
    arrow_types = pa.types

    if arrow_types.is_integer(dtype) or arrow_types.is_decimal(dtype):
        return DataType.Numeric.value
    if arrow_types.is_floating(dtype):
        return DataType.Float.value
    # `large_string` is the 64-bit-offset variant of `string`; without this check
    # such columns would fall through and be reported as Unknown.
    if arrow_types.is_string(dtype) or arrow_types.is_large_string(dtype):
        return DataType.String.value
    if arrow_types.is_boolean(dtype):
        return DataType.Boolean.value
    if arrow_types.is_timestamp(dtype) or arrow_types.is_date(dtype):
        return DataType.DateTime.value
    return DataType.Unknown.value

def GetFullDetailsOfDeltaLakeFile(folder_path: str):
    """Scan a Delta table and summarise its row count and per-column statistics.

    The table is read batch by batch (50,000 rows per batch). For every column
    the function records its name, its type code from `map_pyarrow_type`,
    whether any null was seen, and the number of distinct non-null values.

    Args:
        folder_path: Directory of the Delta table to inspect.

    Returns:
        A dict with two keys:
        - "StgDataFileInfo": ``{"RowCount": <total rows>}``
        - "StgDataFileColumnInfo": list of per-column dicts with the keys
          "DataFileColumnName", "ColumnType", "HasNullValue" and
          "DistinctValueCount".

    Notes:
        - Distinct values are accumulated in Python sets, so memory grows with
          the number of distinct values per column.
        - If distinct values cannot be collected for a column (for example the
          values are not hashable), a `RuntimeWarning` is issued once for that
          column and its "DistinctValueCount" only reflects what was collected
          before the failure.
    """
    import warnings
    # Import the submodule explicitly: `pa.compute` is only available as an
    # attribute once `pyarrow.compute` has been imported somewhere.
    import pyarrow.compute as pc

    dt = DeltaTable(folder_path)

    # PyArrow dataset view of the table and its schema.
    dataset = dt.to_pyarrow_dataset()
    arrow_schema = dataset.schema

    row_count = 0
    col_info = {
        f.name: {
            "DataFileColumnName": f.name,
            "ColumnType": map_pyarrow_type(f.type),
            "HasNullValue": False,
            "DistinctValueCount": 0,
        }
        for f in arrow_schema
    }
    # Distinct values seen so far, kept apart from the result dicts until the end.
    distinct_values = {f.name: set() for f in arrow_schema}
    warned_columns = set()

    scanner = ds.Scanner.from_dataset(dataset, columns=None, batch_size=50_000)

    for batch in scanner.to_batches():
        row_count += len(batch)
        for col_name, array in zip(batch.schema.names, batch.columns):
            if array.null_count > 0:
                col_info[col_name]["HasNullValue"] = True
            try:
                uniques = pc.unique(array.drop_null())
                distinct_values[col_name].update(uniques.to_pylist())
            except Exception as exc:
                # Stay best-effort, but report the problem instead of silently
                # returning a distinct count that is too low.
                if col_name not in warned_columns:
                    warned_columns.add(col_name)
                    warnings.warn(
                        f"Distinct values for column {col_name!r} could not be collected "
                        f"({type(exc).__name__}: {exc}); its DistinctValueCount may be too low.",
                        RuntimeWarning,
                        stacklevel=2,
                    )

    for col_name, seen in distinct_values.items():
        col_info[col_name]["DistinctValueCount"] = len(seen)

    return {
        "StgDataFileInfo": {"RowCount": row_count},
        "StgDataFileColumnInfo": list(col_info.values()),
    }

def main():
    """Print the summary of the hard-coded sample Delta table."""
    table_dir = "E:\\Dev\\Oxyzo R&D\\RndGit\\SourceDeltaLake"
    print(GetFullDetailsOfDeltaLakeFile(table_dir))

# Print the table summary only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


{'StgDataFileInfo': {'RowCount': 5}, 'StgDataFileColumnInfo': [{'DataFileColumnName': 'Loan ID', 'ColumnType': 1, 'HasNullValue': True, 'DistinctValueCount': 4}, {'DataFileColumnName': 'Loan No.', 'ColumnType': 4, 'HasNullValue': True, 'DistinctValueCount': 4}, {'DataFileColumnName': 'Org ID', 'ColumnType': 4, 'HasNullValue': True, 'DistinctValueCount': 4}, {'DataFileColumnName': 'Org Name', 'ColumnType': 4, 'HasNullValue': True, 'DistinctValueCount': 0}, {'DataFileColumnName': 'Sanction Date', 'ColumnType': 4, 'HasNullValue': True, 'DistinctValueCount': 3}, {'DataFileColumnName': 'Current max DPD', 'ColumnType': 4, 'HasNullValue': False, 'DistinctValueCount': 2}, {'DataFileColumnName': 'Lifetime max DPD', 'ColumnType': 4, 'HasNullValue': False, 'DistinctValueCount': 4}, {'DataFileColumnName': 'Lifetime max DPD on', 'ColumnType': 4, 'HasNullValue': False, 'DistinctValueCount': 4}, {'DataFileColumnName': 'Total Overdue', 'ColumnType': 4, 'HasNullValue': False, 'DistinctValueCount': 2}, 