In [0]:
from pyspark.ml.functions import vector_to_array as toArray
from pyspark.ml.feature   import VectorAssembler
from pyspark.ml.feature   import StringIndexer
from pyspark.ml.feature   import OneHotEncoder
from pyspark.ml.feature   import RobustScaler
from pyspark.ml.feature   import Bucketizer
from pyspark.ml.feature   import Imputer

from pyspark.sql          import functions as F
from pyspark.sql          import Window as W
from pyspark.sql          import types as T

In [0]:
# Run configuration: output recipe name, data locations, the target column
# and the raw tokens that mark a missing value in the CSV.
recipe      = "recipe-A"
projectPath = "/mnt/data-dev/project-adult-income/"
dsPath      = f"{projectPath}dataset.csv"
targetCol   = "class"
naTokens    = ["?"]

In [0]:
# Load the raw CSV and rename columns positionally: the target becomes "y";
# each other column becomes x<idx>CAT when its inferred type is "string" and
# x<idx>NUM otherwise, so later cells can select feature groups by suffix.
df = (spark.read
      .option("inferSchema", True)
      .option("header", True)
      .csv(dsPath))

featureCols = []
for idx, (name, dtype) in enumerate((n, t) for n, t in df.dtypes if n != targetCol):
    suffix = "CAT" if dtype == "string" else "NUM"
    featureCols.append(F.col(name).alias(f"x{idx}{suffix}"))

df      = df.select(F.col(targetCol).alias("y"), *featureCols)
numCols = [name for name in df.columns if "NUM" in name]
catCols = [name for name in df.columns if "CAT" in name]

In [0]:
# Normalise the string columns (categoricals and the target): strip
# surrounding whitespace before looking for NA tokens.
strCols = catCols + ["y"]
for col in strCols: df = df.withColumn(col, F.trim(F.col(col)))

# Replace any raw NA token with a real null. The emptiness check is
# loop-invariant, so it is hoisted. Each column is cast to string for the
# comparison: numeric columns then compare by their string form explicitly
# instead of relying on implicit coercion (under ANSI mode Spark would try
# to cast the token "?" to a number and fail the job).
if naTokens:
    for col in df.columns:
        isNaToken = F.col(col).cast("string").isin(naTokens)
        df = df.withColumn(col, F.when(isNaToken, F.lit(None)).otherwise(F.col(col)))

# Label-encode every string column with ONE multi-column StringIndexer
# (a single fit instead of one fit per column). handleInvalid="keep" lets
# null rows pass through the indexer; they are reset to null afterwards so
# the later imputation step fills them. The select keeps the previous column
# order: untouched columns first, then the indexed columns in strCols order.
idxCols = [f"{col}x" for col in strCols]
indexer = StringIndexer(inputCols=strCols, outputCols=idxCols, handleInvalid="keep").fit(df)
df      = indexer.transform(df)

keptCols    = [F.col(c) for c in df.columns if c not in strCols and c not in idxCols]
indexedCols = [F.when(F.col(src).isNull(), F.lit(None))
                .otherwise(F.col(dst).cast("int"))
                .alias(src)
               for src, dst in zip(strCols, idxCols)]
df = df.select(*keptCols, *indexedCols)

In [0]:
# Hold out ~validProp of the rows of every class for validation.
# y holds StringIndexer indices 0..numClass-1, hence the range() keys.
# The fixed seed makes the split reproducible from run to run.
numClass  = df.select("y").dropDuplicates().count()
validProp = 0.2
splitSeed = 42
valid     = df.sampleBy("y", {c: validProp for c in range(numClass)}, seed=splitSeed).cache()
valid.count()

# valid is materialised first, so train is derived from the cached sample.
# exceptAll keeps duplicate rows with multiset semantics. The previous
# subtract() is EXCEPT DISTINCT: it de-duplicated train and also dropped
# every copy of a row that had one copy in valid.
train = df.exceptAll(valid).cache()
train.count()

# Median-impute the numeric features. The imputer is fitted on the training
# split only and applied to both splits; the imputed "<name>x" outputs then
# replace the raw columns under their original names (appended at the end).
inputCols  = numCols
outputCols = [name + "x" for name in inputCols]
outterCols = [name for name in df.columns if name not in inputCols]
colNameMap = [F.col(imputed).alias(raw) for raw, imputed in zip(inputCols, outputCols)]
imputer    = Imputer(strategy="median", inputCols=inputCols, outputCols=outputCols).fit(train)
train, valid = [imputer.transform(split).select(*outterCols, *colNameMap)
                for split in (train, valid)]

# Fill missing categorical indices with the most frequent value ("mode"),
# learned on train only, then swap the imputed columns back in under their
# original names for both splits.
inputCols  = catCols
outputCols = [f"{name}x" for name in inputCols]
outterCols = [name for name in df.columns if name not in inputCols]
colNameMap = [F.col(src).alias(dst) for src, dst in zip(outputCols, inputCols)]
imputer    = (Imputer()
              .setInputCols(inputCols)
              .setOutputCols(outputCols)
              .setStrategy("mode")
              .fit(train))
train      = imputer.transform(train).select(*outterCols, *colNameMap)
valid      = imputer.transform(valid).select(*outterCols, *colNameMap)

# One-hot encode the categorical indices with an encoder fitted on train
# only (handleInvalid="keep" reserves an extra slot for invalid values).
# Then flatten all per-column vectors into a single array column
# "catFeatures", dropping the individual categorical columns.
inputCols  = catCols
outputCols = [f"{name}x" for name in inputCols]
outterCols = [name for name in df.columns if name not in inputCols]
colNameMap = [F.col(enc).alias(raw) for enc, raw in zip(outputCols, inputCols)]
encoder    = OneHotEncoder(inputCols=inputCols, outputCols=outputCols, handleInvalid="keep").fit(train)

def _oneHotToArray(frame):
    """Encode `frame` and replace its categorical vectors by one concatenated array column."""
    encoded = encoder.transform(frame).select(*outterCols, *colNameMap)
    others  = [name for name in encoded.columns if name not in inputCols]
    flat    = F.concat(*[toArray(name) for name in inputCols]).alias("catFeatures")
    return encoded.select(*others, flat)

train = _oneHotToArray(train)
valid = _oneHotToArray(valid)

# Pack the numeric feature columns into one vector column "numFeatures"
# (needed by the vector-based scaler) and drop the individual columns.
inputCols = numCols
assembler = VectorAssembler(inputCols=inputCols, outputCol="numFeatures")
train, valid = (assembler.transform(split).drop(*inputCols) for split in (train, valid))

# Robust-scale the numeric vector (withCentering=True subtracts the median;
# scaling uses RobustScaler's default quantile range), fitted on train only.
# The scaled vector replaces "numFeatures" and is converted to a plain array.
inputCol = "numFeatures"
scaler   = (RobustScaler(withCentering=True)
            .setInputCol(inputCol)
            .setOutputCol(f"{inputCol}x")
            .fit(train))

def _scaleToArray(frame):
    """Apply the fitted scaler and expose the result as an array column named `inputCol`."""
    scaled = (scaler.transform(frame)
              .drop(inputCol)
              .withColumnRenamed(f"{inputCol}x", inputCol))
    return scaled.withColumn(inputCol, toArray(inputCol))

train = _scaleToArray(train)
valid = _scaleToArray(valid)

In [0]:
# Stack both splits into one frame tagged with a "valid" flag (0 = train,
# 1 = validation), build the model input "x" from the scaled numeric and
# one-hot categorical arrays, and write a single parquet output.
trainFlagged = train.withColumn("valid", F.lit(0))
validFlagged = valid.withColumn("valid", F.lit(1))
# unionByName matches columns by name; plain union() is positional and would
# silently misalign the splits if their column order ever diverged.
dataset = trainFlagged.unionByName(validFlagged)
dataset = dataset.withColumn("x", F.concat("numFeatures", "catFeatures"))
# Round-trip through decimal(19, 4) to round every element to 4 decimal
# places, then store as single-precision floats.
dataset = dataset.withColumn("x", F.col("x").cast(T.ArrayType(T.DecimalType(19, 4))))
dataset = dataset.withColumn("x", F.col("x").cast(T.ArrayType(T.FloatType())))

outputPath = projectPath + recipe + "/train/dataset"
# coalesce(1) so the output directory holds a single parquet part file.
(dataset.drop("numFeatures", "catFeatures")
        .coalesce(1)
        .write.mode("overwrite")
        .parquet(outputPath))