In [1]:
import os
from pathlib import Path
import time
import json

import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
    IntegerType,
    LongType,
    FloatType,
    DoubleType,
    ShortType,
    DecimalType,
)
from pyspark.ml.feature import Bucketizer

os.chdir("..")
print("CWD:", os.getcwd())

BASE_DIR = Path(".").resolve()
DATA_DIR = BASE_DIR / "data"
INDEX_PATH = DATA_DIR / "dataset_index.csv"

NUMERIC_PROFILE_DIR = BASE_DIR / "profiles" / "numeric"
NUMERIC_PROFILE_DIR.mkdir(parents=True, exist_ok=True)

spark = (
    SparkSession.builder
    .appName("NYC_Numeric_Profiling")
    .getOrCreate()
)

index_df = pd.read_csv(INDEX_PATH)
index_records = index_df.to_dict(orient="records")
len(index_records)



CWD: /home/jovyan


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 23:45:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


40

In [2]:
numeric_types = (
    IntegerType,
    LongType,
    FloatType,
    DoubleType,
    ShortType,
    DecimalType,
)

def get_numeric_columns(sdf):
    cols = []
    for field in sdf.schema.fields:
        if isinstance(field.dataType, numeric_types):
            cols.append(field.name)
    return cols


In [3]:
profiling_stats = []

for row in index_records:
    if row.get("download_status") != "ok":
        continue

    dataset_id = row["dataset_id"]
    local_path = row["local_path"]
    full_path = str(BASE_DIR / local_path)

    start = time.time()
    status = "ok"
    error = None

    try:
        sdf = (
            spark.read
            .option("header", True)
            .option("inferSchema", True)
            .csv(full_path)
        )
    except Exception as e:
        status = "read_error"
        error = str(e)
        profiling_stats.append(
            {
                "dataset_id": dataset_id,
                "status": status,
                "error": error,
                "num_rows": None,
                "num_cols": None,
                "seconds": time.time() - start,
                "num_numeric_cols": 0,
            }
        )
        continue

    try:
        num_rows = sdf.count()
    except Exception as e:
        status = "count_error"
        error = str(e)
        num_rows = None

    num_cols = len(sdf.columns)
    numeric_cols = get_numeric_columns(sdf)

    numeric_profile = {}

    for col in numeric_cols:
        try:
            stats_row = sdf.select(
                F.count(F.col(col)).alias("count"),
                F.mean(F.col(col)).alias("mean"),
                F.stddev(F.col(col)).alias("stddev"),
                F.min(F.col(col)).alias("min"),
                F.max(F.col(col)).alias("max"),
            ).first()

            numeric_profile[col] = {
                "count": int(stats_row["count"]) if stats_row["count"] is not None else None,
                "mean": float(stats_row["mean"]) if stats_row["mean"] is not None else None,
                "stddev": float(stats_row["stddev"]) if stats_row["stddev"] is not None else None,
                "min": stats_row["min"],
                "max": stats_row["max"],
            }
        except Exception:
            continue

    for col in numeric_cols:
        col_non_null = sdf.select(F.col(col)).where(F.col(col).isNotNull())
        if col_non_null.limit(1).count() == 0:
            continue

        probs = [i / 10.0 for i in range(11)]
        try:
            quantiles = sdf.approxQuantile(col, probs, 0.01)
        except Exception:
            continue

        splits = sorted(set([q for q in quantiles if q is not None]))
        if len(splits) <= 2:
            continue
        if splits[0] == splits[-1]:
            continue

        try:
            bucketizer = Bucketizer(
                splits=splits,
                inputCol=col,
                outputCol="_bucket_idx",
            )
            bucketed = bucketizer.transform(col_non_null)
            hist_df = (
                bucketed.groupBy("_bucket_idx")
                .agg(F.count("*").alias("count"))
                .orderBy("_bucket_idx")
            )
            hist_rows = hist_df.collect()
            counts = [int(r["count"]) for r in hist_rows]

            numeric_profile[col]["bins"] = splits
            numeric_profile[col]["counts"] = counts
        except Exception:
            continue

    out_obj = {
        "dataset_id": dataset_id,
        "num_rows": num_rows,
        "num_cols": num_cols,
        "numeric_profile": numeric_profile,
    }

    out_path = NUMERIC_PROFILE_DIR / f"{dataset_id}_numeric.json"
    with out_path.open("w") as f:
        json.dump(out_obj, f, indent=2)

    elapsed = time.time() - start

    profiling_stats.append(
        {
            "dataset_id": dataset_id,
            "status": status,
            "error": error,
            "num_rows": num_rows,
            "num_cols": num_cols,
            "seconds": elapsed,
            "num_numeric_cols": len(numeric_cols),
        }
    )


{"ts": "2025-12-07 23:50:19.282", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `Adj`.` FA` cannot be resolved. Did you mean one of the following? [`Adj. FA`, `Exam No`, `List No`, `MI`, `Group No`]. SQLSTATE: 42703", "context": {"file": "line 53 in cell [3]", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o22723.select.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `Adj`.` FA` cannot be resolved. Did you mean one of the following? [`Adj. FA`, `Exam No`, `List No`, `MI`, `Group No`]. SQLSTATE: 42703;\n'Project ['count('Adj. FA) AS count#38168, 'avg('Adj. FA) AS mean#38169, 'stddev('Adj. FA) AS stddev#38170, 'min('Adj. FA) AS min#38171, 'max('Adj. FA) AS max#38172]\n+- Relation [Exa

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `Adj`.` FA` cannot be resolved. Did you mean one of the following? [`Adj. FA`, `Exam No`, `List No`, `MI`, `Group No`]. SQLSTATE: 42703;
'Project ['Adj. FA]
+- Relation [Exam No#37949,List No#37950,First Name#37951,MI#37952,Last Name#37953,Adj. FA#37954,List Title Code#37955,List Title Desc#37956,Group No#37957,List Agency Code#37958,List Agency Desc#37959,List Div Code#37960,Published Date#37961,Established Date#37962,Anniversary Date#37963,Extension Date#37964,Veteran Credit#37965,Parent Lgy Credit#37966,Sibling Lgy Credit#37967,Residency Credit#37968] csv


In [4]:
stats_df = pd.DataFrame(profiling_stats)
stats_df.to_csv(BASE_DIR / "profiles" / "numeric_profiling_times.csv", index=False)

index_df = pd.read_csv(INDEX_PATH)

if "num_rows" not in index_df.columns:
    index_df["num_rows"] = None
if "num_cols" not in index_df.columns:
    index_df["num_cols"] = None

rows_map = stats_df.set_index("dataset_id")["num_rows"].to_dict()
cols_map = stats_df.set_index("dataset_id")["num_cols"].to_dict()

index_df["num_rows"] = index_df["dataset_id"].map(rows_map)
index_df["num_cols"] = index_df["dataset_id"].map(cols_map)

index_df.to_csv(INDEX_PATH, index=False)

stats_df.head()


Unnamed: 0,dataset_id,status,error,num_rows,num_cols,seconds,num_numeric_cols
0,f9bf-2cp4,ok,,478,6,10.406724,0
1,x3bb-kg5j,ok,,277153,6,31.347393,5
2,zt9s-n5aj,ok,,460,6,7.507833,4
3,s3k6-pzi2,ok,,440,462,51.237556,49
4,pd5h-92mc,ok,,64,14,9.17831,8
