
# PySpark Feature Profiling & Casting (CSV ➜ typed DataFrame)

**Assumptions**
- Each `sn` is a customer/device.
- Each row is one observation of an `sn` at a different point in time.
- CSV columns are initially strings; we cast based on domain hints.
- Analysis includes missingness, unique counts, numeric stats, category rollups, and counter reset diagnostics.


In [None]:
# ==== Parameters (edit these) ====
# Input CSV location; any Spark-readable URI works,
# e.g. "file:/dbfs/FileStore/data.csv" or "s3://bucket/key.csv".
CSV_PATH = "file:/path/to/your.csv"

# Directory that receives the CSV/Parquet outputs.
OUTPUT_DIR = "file:/tmp/pyspark_profiling_outputs"

# Candidate time columns, in priority order.
TIME_COL_CANDIDATES = [
    "load_date",
    "ServiceUptimeTimestamp",
    "ServiceDowntimeTimestamp",
    "5GUptimeTimestamp",
    "5GDowntimeTimestamp",
]

# Identifier column: each sn is a customer/device.
SN_COL = "sn"


In [None]:
# ==== Start Spark ====
from pyspark.sql import SparkSession
# Obtain the session for this notebook (getOrCreate reuses an existing one).
spark = SparkSession.builder.appName("pyspark_feature_profiling").getOrCreate()

from pyspark.sql import functions as F, types as T, Window as W


In [None]:
# ==== Feature Groups ====
# Column lists per feature category. A column may appear under more than one
# category (e.g. "load_date" is listed in two of them).
_SIGNAL_QUALITY = [
    "4GRSRP", "4GRSRQ", "SNR", "4GSignal",
    "5GRSRP", "5GRSRQ", "5GSNR",
    "BRSRP", "RSRQ", "CQI", "PathLoss",
    "PCellID", "CellID",
    "5GEARFCN_DL", "5GEARFCN_UL",
    "EARFCN_DL", "EARFCN_UL",
    "5GPccBand", "5GScc1Band", "5GScc2Band", "5GScc3Band",
    "4GPccBand", "4GScc1Band", "4GScc2Band", "4GScc3Band",
    "B1MeasurementConfigurationBands",
    "CurrentNetwork",
]

_HARDWARE_HEALTH = [
    "4GAntennaTemp", "4GAntennaTempThreshold",
    "5GNRSub6AntennaTemp", "5GNRSub6AntennaTempThreshold",
    "5GModemTempThreshold", "ModemTemp",
    "4GTempFallback", "4GTempFallbackCause",
    "5GServiceThermalDegradation", "5GServiceThermalDegradationCause",
    "CPUUsage", "MemoryAvail", "MemoryPercentFree",
    "load_date", "hr",
]

_THROUGHPUT_DATA = [
    "LTEPDSCHPeakThroughput", "LTEPDSCHThroughput",
    "LTEPUSCHPeakThroughput", "LTEPUSCHThroughput",
    "TxPDCPBytes", "RxPDCPBytes",
    "TotalBytesReceived", "TotalBytesSent",
    "TotalPacketReceived", "TotalPacketSent",
    "5GNRPDSCHThroughput", "5GNRPUSCHThroughput",
    "5GNRPDSCHPeakThroughput", "5GNRPUSCHPeakThroughput",
    "5GNRRxPDCPBytes", "5GNRTxPDCPBytes",
]

_GPS_LOCATION = [
    "GPSLatitude", "GPSLongitude", "GPSAltitude",
    "GPSEnabled", "HomeRoam",
]

_DEVICE_STATIC_INFO = [
    "IMEI", "IMSI", "MDN",
    "sn", "mac", "rowkey",
    "Manufacturer", "ModelName", "HwV", "SwV",
    "MaxMTUSize", "ModemLoggingEnabled", "SIMState",
    "ipv4_ip", "ipv6_ip", "load_date",
]

_UPTIME_DOWNTIME_STATUS = [
    "uptime",
    "ServiceUptime", "ServiceUptimeTimestamp",
    "ServiceDowntime", "ServiceDowntimeTimestamp",
    "5GUptimeTimestamp", "5GDowntimeTimestamp",
    "Status",
]

_MOBILITY_STABILITY = [
    "LTEHandOverAttemptCount", "LTEHandOverFailureCount",
    "LTERACHAttemptCount", "LTERACHFailureCount",
    "LTERadioLinkFailureCount",
    "RRCConnectFailureCount", "RRCConnectRequestCount", "RRCConnectTime",
    "5GNRHandOverAttemptCount", "5GNRHandOverFailureCount",
    "5GNRRadioLinkFailureCount",
    "5GNRRACHAttemptCount", "5GNRRACHFailureCount",
    "5GNRRRCConnectTime",
    "5GNRRRCConnectRequestCount", "5GNRRRCConnectFailureCount",
    "NRSCGChangeCount", "NRSCGChangeFailureCount",
    "NRSCGFailureCount",
]

# Category name -> list of column names.
feature_groups = {
    "signal_quality": _SIGNAL_QUALITY,
    "hardware_health": _HARDWARE_HEALTH,
    "throughput_data": _THROUGHPUT_DATA,
    "gps_location": _GPS_LOCATION,
    "device_static_info": _DEVICE_STATIC_INFO,
    "uptime_downtime_status": _UPTIME_DOWNTIME_STATUS,
    "mobility_stability": _MOBILITY_STABILITY,
}


In [None]:
# ==== Expected Types per Column ====
# Types: numeric | counter | boolean | datetime | categorical | categorical_id | categorical_code | id | list_categorical
# Registry: column name -> expected logical type.
expected_types = {}


def set_types(cols, t):
    """Register every column name in `cols` under the expected type `t`.

    A column registered more than once keeps the most recently given type.
    """
    expected_types.update((name, t) for name in cols)

# (columns, expected type) pairs, registered top to bottom.
_TYPE_ASSIGNMENTS = [
    # signal_quality
    (["4GRSRP", "5GRSRP", "BRSRP", "4GRSRQ", "5GRSRQ", "RSRQ", "SNR", "5GSNR",
      "PathLoss", "CQI", "4GSignal"], "numeric"),
    (["PCellID", "CellID"], "categorical_id"),
    (["5GEARFCN_DL", "5GEARFCN_UL", "EARFCN_DL", "EARFCN_UL"], "categorical_code"),
    (["4GPccBand", "4GScc1Band", "4GScc2Band", "4GScc3Band",
      "5GPccBand", "5GScc1Band", "5GScc2Band", "5GScc3Band"], "categorical"),
    (["B1MeasurementConfigurationBands"], "list_categorical"),
    (["CurrentNetwork"], "categorical"),

    # hardware_health
    (["4GAntennaTemp", "5GNRSub6AntennaTemp", "ModemTemp",
      "4GAntennaTempThreshold", "5GNRSub6AntennaTempThreshold", "5GModemTempThreshold",
      "CPUUsage", "MemoryAvail", "MemoryPercentFree", "hr"], "numeric"),
    (["4GTempFallback", "5GServiceThermalDegradation", "ModemLoggingEnabled"], "boolean"),
    (["4GTempFallbackCause", "5GServiceThermalDegradationCause", "SIMState",
      "Manufacturer", "ModelName"], "categorical"),
    (["load_date"], "datetime"),

    # throughput_data
    (["LTEPDSCHPeakThroughput", "LTEPDSCHThroughput",
      "LTEPUSCHPeakThroughput", "LTEPUSCHThroughput",
      "5GNRPDSCHThroughput", "5GNRPUSCHThroughput",
      "5GNRPDSCHPeakThroughput", "5GNRPUSCHPeakThroughput"], "numeric"),
    (["TxPDCPBytes", "RxPDCPBytes", "TotalBytesReceived", "TotalBytesSent",
      "TotalPacketReceived", "TotalPacketSent",
      "5GNRRxPDCPBytes", "5GNRTxPDCPBytes"], "counter"),

    # gps_location
    (["GPSLatitude", "GPSLongitude", "GPSAltitude"], "numeric"),
    (["GPSEnabled"], "boolean"),
    (["HomeRoam"], "categorical"),

    # device_static_info
    (["IMEI", "IMSI", "MDN", "sn", "mac", "rowkey",
      "ipv4_ip", "ipv6_ip", "HwV", "SwV"], "id"),
    (["MaxMTUSize"], "numeric"),
    (["Manufacturer", "ModelName"], "categorical"),

    # uptime_downtime_status
    (["uptime", "ServiceUptime", "ServiceDowntime"], "counter"),
    (["ServiceUptimeTimestamp", "ServiceDowntimeTimestamp",
      "5GUptimeTimestamp", "5GDowntimeTimestamp"], "datetime"),
    (["Status"], "categorical"),

    # mobility_stability
    (["LTEHandOverAttemptCount", "LTEHandOverFailureCount",
      "LTERACHAttemptCount", "LTERACHFailureCount",
      "LTERadioLinkFailureCount",
      "RRCConnectFailureCount", "RRCConnectRequestCount", "RRCConnectTime",
      "5GNRHandOverAttemptCount", "5GNRHandOverFailureCount",
      "5GNRRadioLinkFailureCount",
      "5GNRRACHAttemptCount", "5GNRRACHFailureCount",
      "5GNRRRCConnectTime",
      "5GNRRRCConnectRequestCount", "5GNRRRCConnectFailureCount",
      "NRSCGChangeCount", "NRSCGChangeFailureCount",
      "NRSCGFailureCount"], "numeric"),
]

for _cols, _expected in _TYPE_ASSIGNMENTS:
    set_types(_cols, _expected)


In [None]:
# ==== Load CSV as strings ====
# Read every column as a string; typed casting happens in a later cell.
df_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)  # keep as strings; we'll cast
    .option("multiLine", False)
    .csv(CSV_PATH)
)

# Trim surrounding whitespace from every column in ONE projection.
# - trim() already yields NULL for NULL input, so no explicit NULL guard is needed.
# - A single select() avoids the deeply nested plan that one withColumn()
#   call per column builds up on wide CSVs.
df_raw = df_raw.select([F.trim(F.col(c)).alias(c) for c in df_raw.columns])

df = df_raw


In [None]:
# ==== Casting Helpers ====
# Numeric: strip non-numeric characters (e.g., ' dBm', '%') then cast
def cast_numeric(col):
    """Return a double-typed expression for the column named `col`.

    Every character other than digits, '.', '-', '+', 'e' and 'E' is removed
    before the cast; text that still is not a valid number casts to NULL.
    """
    cleaned = F.regexp_replace(F.col(col), r"[^\d\.\-\+eE]", "")
    return cleaned.cast("double")

# Boolean: map common variants
# Lower-case spellings recognised as True / False (matching is case-insensitive).
TRUE_SET = ["true", "1", "yes", "y", "on", "enabled", "enable"]
FALSE_SET = ["false", "0", "no", "n", "off", "disabled", "disable"]


def cast_boolean(col):
    """Return a boolean expression for the string column named `col`.

    Values whose lower-cased form is in TRUE_SET map to True, those in
    FALSE_SET map to False, and everything else (including NULL) maps to NULL.
    """
    lowered = F.lower(F.col(col))
    # A NULL input makes both isin() tests NULL, so it falls through to the
    # otherwise() branch; no separate isNull() case is required.
    return (
        F.when(lowered.isin(*TRUE_SET), F.lit(True))
        .when(lowered.isin(*FALSE_SET), F.lit(False))
        .otherwise(F.lit(None).cast("boolean"))
    )

# Datetime: try epoch ms/s, else parse common patterns
def _parse_ts(x):
    """Parse an epoch number or a date/time string into a ``datetime``.

    Returns None for None, blank strings, and anything unparseable.

    Numeric input is treated as a Unix epoch: values above 1e10 are read as
    milliseconds, values above 10000 as seconds, smaller numbers are not
    treated as epochs. Epoch results are timezone-aware (UTC) so the instant
    is preserved; a naive datetime returned from a Python UDF is interpreted
    in the worker's local timezone, which would shift epoch-derived values
    on any worker not running in UTC.

    Non-numeric input is tried against a fixed list of formats and returned
    as a naive datetime (the text carries no timezone information).
    """
    # Imported locally so the function is self-contained when shipped to workers.
    from datetime import datetime, timezone

    if x is None:
        return None
    s = str(x).strip()
    if not s:
        return None

    # Numeric epoch (milliseconds vs seconds decided by magnitude).
    try:
        v = float(s)
    except ValueError:
        v = None
    if v is not None:
        try:
            if v > 1e10:
                return datetime.fromtimestamp(v / 1000.0, tz=timezone.utc)
            if v > 10000:
                return datetime.fromtimestamp(v, tz=timezone.utc)
        except (OverflowError, OSError, ValueError):
            # Out-of-range or infinite epoch: fall through to the format list.
            pass

    # Textual date/time in one of the supported layouts.
    fmts = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
        "%m/%d/%Y %H:%M:%S",
        "%Y-%m-%d",
    )
    for fmt in fmts:
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue
    return None


# Spark UDF wrapper returning a timestamp column.
parse_ts_udf = F.udf(_parse_ts, "timestamp")

def cast_datetime(col):
    """Return the column named `col` passed through ``parse_ts_udf``."""
    source = F.col(col)
    return parse_ts_udf(source)

# List-like categorical: keep raw + add count of tokens (split on common separators)
def add_list_count(df, col, new_col):
    """Add `new_col` holding the number of tokens in the list-like column `col`.

    Tokens are separated by runs of ';', '|', ',' or whitespace. NULL, empty,
    and separator-only values count as 0. Leading and trailing separators are
    stripped first so they do not show up as extra empty tokens.

    Returns the DataFrame with `new_col` added (or replaced if it exists).
    """
    stripped = F.regexp_replace(F.col(col), r"^[;\|,\s]+|[;\|,\s]+$", "")
    is_blank = stripped.isNull() | (stripped == "")
    token_count = F.size(F.split(stripped, r"[;\|,\s]+"))
    return df.withColumn(new_col, F.when(is_blank, F.lit(0)).otherwise(token_count))


In [None]:
# ==== Apply Casting by expected_types ====
df_cast = df

# Collect one replacement expression per column and apply them all in a
# single select(); calling withColumn() once per column nests the plan.
_present_cols = set(df_cast.columns)
_cast_exprs = {}       # column name -> typed replacement expression
_list_count_cols = []  # (source column, derived count column) pairs

for col, et in expected_types.items():
    if col not in _present_cols:
        continue
    if et in ("numeric", "counter"):
        _cast_exprs[col] = cast_numeric(col)
    elif et == "boolean":
        _cast_exprs[col] = cast_boolean(col)
    elif et == "datetime":
        _cast_exprs[col] = cast_datetime(col)
    elif et in ("categorical", "categorical_id", "categorical_code", "id"):
        _cast_exprs[col] = F.col(col).cast("string")
    elif et == "list_categorical":
        _cast_exprs[col] = F.col(col).cast("string")
        _list_count_cols.append((col, f"{col}__count"))
    else:
        # Unknown type: try numeric, then datetime, else keep the raw text.
        # Every attempt reads the ORIGINAL string value; overwriting the
        # column with the numeric cast first would leave nothing for the
        # datetime/string attempts and turn non-numeric values into NULL.
        _cast_exprs[col] = F.coalesce(
            cast_numeric(col).cast("string"),
            cast_datetime(col).cast("string"),
            F.col(col).cast("string"),
        )

# Columns keep their original positions; untouched columns pass through.
df_cast = df_cast.select(
    [_cast_exprs[c].alias(c) if c in _cast_exprs else F.col(c) for c in df_cast.columns]
)

# Derived token-count columns are appended after the casts.
for _src, _count_col in _list_count_cols:
    df_cast = add_list_count(df_cast, _src, _count_col)

df = df_cast


In [None]:
# ==== Determine time column & sort by sn + time ====
# First configured candidate that exists in the data, if any.
time_col = next((c for c in TIME_COL_CANDIDATES if c in df.columns), None)

if time_col is None:
    # Otherwise pick the first column whose name contains 'time' or 'date'.
    time_col = next(
        (c for c in df.columns if "time" in c.lower() or "date" in c.lower()),
        None,
    )

if time_col is not None:
    # Only parse when the column is not a timestamp yet; sending an
    # already-cast timestamp through the Python UDF again is a costly no-op.
    _time_dtype = dict(df.dtypes)[time_col]
    if not _time_dtype.startswith("timestamp"):
        df = df.withColumn(time_col, cast_datetime(time_col))

if SN_COL in df.columns and time_col is not None:
    df_sorted = df.orderBy(
        F.col(SN_COL).asc_nulls_last(),
        F.col(time_col).asc_nulls_last(),
    )
else:
    df_sorted = df

# df_sorted is the cast + sorted DataFrame

# df_sorted is the cast + sorted DataFrame


In [None]:
# ==== Missingness & Unique Counts per Column ====
def is_missing(c):
    """Boolean expression: True where column `c` is NULL or its string form is empty."""
    value = F.col(c)
    is_empty_text = F.when(value.cast("string") == "", True).otherwise(False)
    return value.isNull() | is_empty_text

_profiled_cols = df_sorted.columns
_features_df = spark.createDataFrame([(c,) for c in _profiled_cols], "feature string")

_missing_aggs = [
    F.sum(is_missing(c).cast("long")).alias(f"{c}__miss") for c in _profiled_cols
]
# approx_count_distinct keeps the unique counts scalable.
_unique_aggs = [
    F.approx_count_distinct(F.col(c)).alias(f"{c}__uniq") for c in _profiled_cols
]

# Wide frames (one aggregate column per feature, repeated for every feature
# row). They stay lazy: nothing is computed unless they are used.
summary_missing = _features_df.crossJoin(df_sorted.agg(*_missing_aggs))
summary_unique = _features_df.crossJoin(df_sorted.agg(*_unique_aggs))

# One pass over the data yields the row count and every per-column
# aggregate, instead of one Spark job per column. The aggregates are read
# back by position: [n_rows, miss_0..miss_k-1, uniq_0..uniq_k-1].
_profile_row = df_sorted.agg(
    F.count(F.lit(1)).alias("n_rows"), *_missing_aggs, *_unique_aggs
).collect()[0]

n_total = int(_profile_row[0])
_k = len(_profiled_cols)

_missing_rows = []
_unique_rows = []
for _i, _c in enumerate(_profiled_cols):
    _miss = _profile_row[1 + _i]
    _miss = int(_miss) if _miss is not None else 0  # sum() over zero rows is NULL
    # Leave the percentage undefined on an empty dataset rather than divide by zero.
    _pct = (_miss / n_total) * 100.0 if n_total else None
    _missing_rows.append((_c, _miss, _pct))
    _unique_rows.append((_c, int(_profile_row[1 + _k + _i])))

# Long format: one row per feature.
summary_missing_long = spark.createDataFrame(
    _missing_rows, "feature string, missing_ct long, missing_pct double"
)
summary_unique_long = spark.createDataFrame(
    _unique_rows, "feature string, unique_count long"
)


In [None]:
# ==== Numeric Stats per Numeric Column ====
# Select numeric columns by schema type rather than by dtype string, so
# smallint/tinyint and decimals of any precision are included as well.
numeric_cols = [
    f.name for f in df_sorted.schema.fields if isinstance(f.dataType, T.NumericType)
]
percentiles = [0.01, 0.05, 0.50, 0.95, 0.99]
_pctl_sql = ", ".join(str(p) for p in percentiles)

stats_exprs = []  # aggregate expressions, six per numeric column
long_rows = []    # one struct per numeric column, used for the long reshape
for c in numeric_cols:
    # mean/min/max/percentiles are cast to double so every per-column struct
    # has the same type; F.array() requires identically-typed elements and
    # the inputs can be a mix of int and double columns.
    stats_exprs.extend([
        F.count(F.col(c)).alias(f"{c}__count"),
        F.mean(F.col(c)).cast("double").alias(f"{c}__mean"),
        F.stddev(F.col(c)).cast("double").alias(f"{c}__std"),
        F.min(F.col(c)).cast("double").alias(f"{c}__min"),
        F.max(F.col(c)).cast("double").alias(f"{c}__max"),
        F.expr(f"percentile_approx(`{c}`, array({_pctl_sql}), 10000)")
        .cast("array<double>")
        .alias(f"{c}__pctls"),
    ])
    long_rows.append(
        F.struct(
            F.lit(c).alias("feature"),
            F.col(f"{c}__count").alias("count"),
            F.col(f"{c}__mean").alias("mean"),
            F.col(f"{c}__std").alias("std"),
            F.col(f"{c}__min").alias("min"),
            F.col(f"{c}__max").alias("max"),
            F.col(f"{c}__pctls").alias("percentiles"),
        ).alias(c)
    )

if numeric_cols:
    agg_numeric = df_sorted.agg(*stats_exprs)
    # To long format: one row per numeric feature.
    numeric_stats_long = (
        agg_numeric
        .select(F.explode(F.array(*long_rows)).alias("kv"))
        .select("kv.*")
    )
else:
    # agg() and array() both reject an empty expression list, so produce an
    # empty result with the same columns when there is nothing to profile.
    agg_numeric = None
    numeric_stats_long = spark.createDataFrame(
        [],
        "feature string, count long, mean double, std double, "
        "min double, max double, percentiles array<double>",
    )


In [None]:
# ==== Category Rollups ====
# Create a mapping DataFrame of feature -> category
# Mapping DataFrame of feature -> category (a feature listed under several
# categories yields one row per category).
feat_cat = [
    (feature, category)
    for category, feats in feature_groups.items()
    for feature in feats
]
feat_cat_df = spark.createDataFrame(feat_cat, ["feature", "category"])

# Spark dtypes after casting
schema_df = spark.createDataFrame(df_sorted.dtypes, ["feature", "spark_dtype"])


def _dtype_kind(t):
    """Map a Spark dtype string to numeric / datetime / boolean / categorical.

    Returns None when `t` is None, which is what the left join below produces
    for features listed in feature_groups but absent from the data.
    """
    if t is None:
        return None
    if t.startswith(("double", "float", "int", "bigint", "long", "decimal", "smallint", "tinyint")):
        return "numeric"
    if t.startswith("timestamp"):
        return "datetime"
    if t.startswith("boolean"):
        return "boolean"
    return "categorical"


kind_udf = F.udf(_dtype_kind, "string")

miss_long = summary_missing_long.select("feature", "missing_pct")
cat_base = (
    feat_cat_df.join(schema_df, "feature", "left")
    .withColumn("dtype_kind", kind_udf(F.col("spark_dtype")))
    .join(miss_long, "feature", "left")
)


def _kind_count(kind):
    """Count rows whose dtype_kind equals `kind` (absent features count as 0)."""
    return F.sum(F.when(F.col("dtype_kind") == kind, 1).otherwise(0)).alias(kind)


# Per-category counts of dtype kinds plus median missingness.
# n_listed  = features configured for the category
# n_present = those that actually exist in the data (non-NULL spark_dtype)
cat_summary = cat_base.groupBy("category").agg(
    F.count("spark_dtype").alias("n_present"),
    F.count("*").alias("n_listed"),
    _kind_count("numeric"),
    _kind_count("categorical"),
    _kind_count("datetime"),
    _kind_count("boolean"),
    F.expr("percentile_approx(missing_pct, 0.5)").alias("median_missing_pct"),
)


In [None]:
# ==== Counter Reset Diagnostics (per sn) ====
counter_cols = [
    c for c, t in expected_types.items()
    if t == "counter" and c in df_sorted.columns
]

# One tuple per counter column: (feature, n_deltas, neg_pct, zero_pct)
#   n_deltas = consecutive-row differences available within each sn
#   neg_pct  = % of those differences that are negative (counter went down)
#   zero_pct = % of those differences that are zero (counter did not move)
counter_diag_list = []
if SN_COL in df_sorted.columns and counter_cols:
    # Pick a time column for ordering rows within each sn.
    time_col = next((c for c in TIME_COL_CANDIDATES if c in df_sorted.columns), None)
    if time_col is None:
        time_col = next(
            (c for c in df_sorted.columns if "time" in c.lower() or "date" in c.lower()),
            None,
        )

    if time_col is not None:
        w = W.partitionBy(SN_COL).orderBy(F.col(time_col).asc_nulls_last())

        # Row-to-row delta of every counter, all in one projection.
        _deltas = df_sorted.select(*[
            (F.col(c) - F.lag(F.col(c), 1).over(w)).alias(f"d{i}")
            for i, c in enumerate(counter_cols)
        ])

        # Three aggregates per counter, read back by position after a single job.
        _diag_aggs = []
        for i in range(len(counter_cols)):
            d = F.col(f"d{i}")
            _diag_aggs.extend([
                F.count(d).alias(f"n{i}"),
                F.sum(F.when(d < 0, 1).otherwise(0)).alias(f"neg{i}"),
                F.sum(F.when(d == 0, 1).otherwise(0)).alias(f"zero{i}"),
            ])
        _diag_row = _deltas.agg(*_diag_aggs).collect()[0]

        for i, c in enumerate(counter_cols):
            n_deltas = int(_diag_row[3 * i] or 0)
            n_neg = int(_diag_row[3 * i + 1] or 0)   # sum() over zero rows is NULL
            n_zero = int(_diag_row[3 * i + 2] or 0)
            # Percentages are undefined when no delta could be computed.
            neg_pct = (n_neg / n_deltas) * 100.0 if n_deltas else None
            zero_pct = (n_zero / n_deltas) * 100.0 if n_deltas else None
            counter_diag_list.append((c, n_deltas, neg_pct, zero_pct))

# Always defined; empty when there is no sn column, counter column or time column.
counter_diagnostics = spark.createDataFrame(
    counter_diag_list,
    "feature string, n_deltas long, neg_pct double, zero_pct double",
)


In [None]:
# ==== Optional: Write outputs to disk ====
# Example: write as Parquet (recommended) or CSV (small datasets)
# summary_missing_long.write.mode("overwrite").parquet(f"{OUTPUT_DIR}/missingness_parquet")
# summary_unique_long.write.mode("overwrite").parquet(f"{OUTPUT_DIR}/uniques_parquet")
# numeric_stats_long.write.mode("overwrite").parquet(f"{OUTPUT_DIR}/numeric_stats_parquet")
# cat_summary.write.mode("overwrite").parquet(f"{OUTPUT_DIR}/category_summary_parquet")

# To CSV (coalesce(1) funnels all rows through a single task to produce one file; fine for these small summaries, avoid it for large data)
# summary_missing_long.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{OUTPUT_DIR}/missingness_csv")
# summary_unique_long.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{OUTPUT_DIR}/uniques_csv")
# numeric_stats_long.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{OUTPUT_DIR}/numeric_stats_csv")
# cat_summary.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{OUTPUT_DIR}/category_summary_csv")
