In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler

class RandomForestImputer:
    """Mean-impute numeric feature columns, then train a random-forest classifier.

    The fitted pipeline has three stages:

    * ``Imputer``: writes ``<col>_imputed`` for every column in ``featuresCols``.
    * ``VectorAssembler``: packs the imputed columns into ``features``.
    * ``RandomForestClassifier``: predicts ``labelCol`` into ``predictionCol``.
    """

    def __init__(self, featuresCols, labelCol, predictionCol="prediction"):
        self.featuresCols, self.labelCol, self.predictionCol = featuresCols, labelCol, predictionCol

    def fit(self, dataset):
        """Fit the imputation + classification pipeline on ``dataset``; returns ``self``."""
        imputed_names = [f"{name}_imputed" for name in self.featuresCols]
        self.pipeline = Pipeline(stages=[
            Imputer(inputCols=self.featuresCols, outputCols=imputed_names),
            VectorAssembler(inputCols=list(imputed_names), outputCol="features"),
            RandomForestClassifier(
                labelCol=self.labelCol,
                featuresCol="features",
                predictionCol=self.predictionCol,
            ),
        ])
        self.model = self.pipeline.fit(dataset)
        return self

    def transform(self, dataset):
        """Apply the fitted pipeline model to ``dataset`` (``fit`` must run first)."""
        fitted = self.model
        return fitted.transform(dataset)


In [None]:
# Demo: fit whichever RandomForestImputer definition is currently live in the
# kernel, then score the same data with it.
# NOTE(review): `dataset` is not created in any visible cell; it must already
# exist in the kernel (presumably a Spark DataFrame with col1..col3 and `label`)
# -- TODO confirm and create it explicitly so Restart & Run All works.
imputer = RandomForestImputer(
    featuresCols=["col1", "col2", "col3"],
    labelCol="label",
)
imputer_model = imputer.fit(dataset)
imputed_dataset = imputer_model.transform(dataset)


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer

class RandomForestImputer:
    """Encode string features, mean-impute numeric ones, and train a random-forest classifier.

    Pipeline built by ``fit``:

    * one ``StringIndexer`` + ``OneHotEncoder`` pair per string-typed feature
      (``<col>_index`` -> ``<col>_vec``);
    * an ``Imputer`` over the non-string features (``<col>_imputed``), if any;
    * a ``VectorAssembler`` into ``features`` (in ``featuresCols`` order);
    * a ``RandomForestClassifier`` predicting ``labelCol`` into ``predictionCol``.
    """

    def __init__(self, featuresCols, labelCol, predictionCol="prediction"):
        self.featuresCols = featuresCols
        self.labelCol = labelCol
        self.predictionCol = predictionCol

    def _categorical_columns(self, dataset):
        """Return the feature columns whose Spark SQL type name is ``string``."""
        # A DataType object never compares equal to a plain str, so the previous
        # `dataType == "string"` test was always False. Compare the type name instead.
        return [c for c in self.featuresCols
                if dataset.schema[c].dataType.typeName() == "string"]

    def fit(self, dataset):
        """Build and fit the pipeline on ``dataset``; returns ``self``."""
        categorical = self._categorical_columns(dataset)
        numeric = [c for c in self.featuresCols if c not in categorical]

        stages = []
        for c in categorical:
            # handleInvalid="keep" maps null/unseen categories to an extra index
            # instead of raising -- missing values are expected input for an imputer.
            stages.append(StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep"))
            stages.append(OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_vec"))

        # Only numeric columns go through the Imputer. Skip it entirely when there
        # are none rather than building an Imputer with no input columns.
        if numeric:
            stages.append(Imputer(inputCols=numeric, outputCols=[f"{c}_imputed" for c in numeric]))

        assembled = [f"{c}_vec" if c in categorical else f"{c}_imputed" for c in self.featuresCols]
        stages.append(VectorAssembler(inputCols=assembled, outputCol="features"))
        stages.append(RandomForestClassifier(labelCol=self.labelCol, featuresCol="features",
                                             predictionCol=self.predictionCol))

        self.pipeline = Pipeline(stages=stages)
        self.model = self.pipeline.fit(dataset)
        return self

    def transform(self, dataset):
        """Apply the fitted pipeline model to ``dataset`` (``fit`` must run first)."""
        return self.model.transform(dataset)


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.regression import RandomForestRegressor

class RandomForestImputer:
    """Train a random-forest regressor and fill nulls in feature columns with its mean prediction.

    ``fit`` one-hot encodes string features (``<col>_index`` -> ``<col>_vec``),
    assembles them with the raw non-string features into ``features``, and fits
    a ``RandomForestRegressor`` predicting ``labelCol`` into ``predictionCol``.

    NOTE(review): VectorAssembler's default ``handleInvalid='error'`` raises on
    null numeric inputs, so rows with missing non-string features will fail in
    ``fit``/``transform`` before any filling happens -- confirm the inputs.
    """

    def __init__(self, featuresCols, labelCol, predictionCol="prediction"):
        self.featuresCols = featuresCols
        self.labelCol = labelCol
        self.predictionCol = predictionCol

    def _categorical_columns(self, dataset):
        """Return the feature columns whose Spark SQL type name is ``string``."""
        # DataType objects never equal plain strings; compare the type name instead.
        return [c for c in self.featuresCols
                if dataset.schema[c].dataType.typeName() == "string"]

    def fit(self, dataset):
        """Build and fit the encoding + regression pipeline; returns ``self``."""
        categorical = self._categorical_columns(dataset)
        stages = []
        for c in categorical:
            # Null/unseen categories go to an extra index instead of raising.
            stages.append(StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep"))
            stages.append(OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_vec"))

        assembler = VectorAssembler(inputCols=[f"{c}_vec" if c in categorical else c for c in self.featuresCols],
                                    outputCol="features")
        stages.append(assembler)
        # The stage was always named `regressor` and RandomForestRegressor is
        # imported for it; a classifier was built by mistake.
        regressor = RandomForestRegressor(labelCol=self.labelCol, featuresCol="features",
                                          predictionCol=self.predictionCol)
        stages.append(regressor)

        self.pipeline = Pipeline(stages=stages)
        self.model = self.pipeline.fit(dataset)
        return self

    def transform(self, dataset):
        """Score ``dataset``, then fill nulls in nullable feature columns with the mean prediction.

        Returns the scored DataFrame unchanged when no feature column is nullable,
        or when the prediction column has no non-null values to average.
        """
        dataset = self.model.transform(dataset)
        targets = [c for c in self.featuresCols if dataset.schema[c].nullable]
        if not targets:
            return dataset
        # Loop-invariant: filling feature columns does not change the prediction
        # mean, so compute it once instead of once per column.
        prediction_mean = dataset.agg({self.predictionCol: "mean"}).first()[0]
        if prediction_mean is None:
            return dataset  # na.fill rejects None as a replacement value
        return dataset.na.fill({c: prediction_mean for c in targets})


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.regression import RandomForestRegressor

class RandomForestImputer:
    """Train a random-forest regressor, then fill nulls in feature columns with simple statistics.

    ``fit`` one-hot encodes string features (``<col>_index`` -> ``<col>_vec``),
    assembles them with the raw non-string features into ``features``, and fits
    a ``RandomForestRegressor`` predicting ``labelCol`` into ``predictionCol``.
    ``transform`` scores the data and then fills nulls in each nullable feature
    column: ``double`` columns get their mean, ``string`` columns their most
    frequent non-null value. Other types are left untouched.

    NOTE(review): VectorAssembler's default ``handleInvalid='error'`` raises on
    null numeric inputs, so rows with missing non-string features will fail in
    the pipeline before the filling step -- confirm the inputs.
    """

    def __init__(self, featuresCols, labelCol, predictionCol="prediction"):
        self.featuresCols = featuresCols
        self.labelCol = labelCol
        self.predictionCol = predictionCol

    def _categorical_columns(self, dataset):
        """Return the feature columns whose Spark SQL type name is ``string``."""
        # DataType objects never equal plain strings; compare the type name instead.
        return [c for c in self.featuresCols
                if dataset.schema[c].dataType.typeName() == "string"]

    def fit(self, dataset):
        """Build and fit the encoding + regression pipeline; returns ``self``."""
        categorical = self._categorical_columns(dataset)
        stages = []
        for c in categorical:
            # Null/unseen categories go to an extra index instead of raising.
            stages.append(StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep"))
            stages.append(OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_vec"))

        assembler = VectorAssembler(inputCols=[f"{c}_vec" if c in categorical else c for c in self.featuresCols],
                                    outputCol="features")
        stages.append(assembler)
        regressor = RandomForestRegressor(labelCol=self.labelCol, featuresCol="features",
                                          predictionCol=self.predictionCol)
        stages.append(regressor)

        self.pipeline = Pipeline(stages=stages)
        self.model = self.pipeline.fit(dataset)
        return self

    @staticmethod
    def _fill_value(dataset, col, type_name):
        """Replacement value for nulls in ``col``, or None when there is nothing to fill with."""
        if type_name == "double":
            return dataset.agg({col: "mean"}).first()[0]
        if type_name == "string":
            # Drop nulls first: otherwise NULL itself can be the most frequent value.
            top = (dataset.na.drop(subset=[col])
                   .groupBy(col).count()
                   .sort("count", ascending=False)
                   .first())
            return None if top is None else top[0]
        return None

    def transform(self, dataset):
        """Score ``dataset`` and fill nulls in its nullable feature columns (see class docstring)."""
        dataset = self.model.transform(dataset)
        fills = {}
        for col in self.featuresCols:
            field = dataset.schema[col]
            if not field.nullable:
                continue
            # Compare the type *name*; DataType objects never equal plain strings,
            # which made both fill branches unreachable before.
            value = self._fill_value(dataset, col, field.dataType.typeName())
            if value is not None:  # na.fill rejects None replacements
                fills[col] = value
        # Each statistic depends only on its own column, so one batched fill is
        # equivalent to filling column by column.
        return dataset.na.fill(fills) if fills else dataset


In [None]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.sql.functions import isnan, when, count, avg

class RandomForestImputer:
    """Fill missing feature values with a random-forest regressor's prediction.

    A ``RandomForestRegressor`` is trained to predict ``label_col`` from the
    one-hot encoded ``categorical_cols`` plus the raw ``continuous_cols``.
    ``transform`` then replaces every missing feature cell (null or empty
    string) with the model's prediction for that row, cast to the column type.

    NOTE(review): this substitutes a prediction of the *label* into *feature*
    columns, which is what the original per-row code attempted. Confirm this is
    the intended imputation strategy; for string columns the numeric prediction
    is written as its string form.
    NOTE(review): ``OneHotEncoderEstimator`` (imported by this cell) exists only
    in Spark 2.3-2.4; ``VectorAssembler(handleInvalid=...)`` needs Spark >= 2.4.
    """

    def __init__(self, categorical_cols, continuous_cols, label_col):
        self.categorical_cols = categorical_cols
        self.continuous_cols = continuous_cols
        self.label_col = label_col

    def _feature_cols(self):
        """All feature columns, categorical first (the order the assembler uses)."""
        return list(self.categorical_cols) + list(self.continuous_cols)

    def fit(self, dataset):
        """Fit the encoding + regression pipeline and return ``self``.

        Training uses only rows where every feature and the label are non-null,
        so the forest never sees missing inputs. ``handleInvalid="keep"`` on the
        indexers and the assembler lets ``transform`` still score rows that
        contain nulls (null categories -> extra bucket, null numerics -> NaN)
        instead of raising.
        """
        encoded_cols = [f"{c}_encoded" for c in self.categorical_cols]
        self.string_indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep")
                                for c in self.categorical_cols]
        self.encoder = OneHotEncoderEstimator(inputCols=[f"{c}_index" for c in self.categorical_cols],
                                              outputCols=encoded_cols)
        self.assembler = VectorAssembler(inputCols=encoded_cols + list(self.continuous_cols),
                                         outputCol="features", handleInvalid="keep")
        self.regressor = RandomForestRegressor(featuresCol="features", labelCol=self.label_col)

        self.pipeline = Pipeline(stages=self.string_indexers + [self.encoder, self.assembler, self.regressor])
        complete_rows = dataset.na.drop(subset=self._feature_cols() + [self.label_col])
        self.model = self.pipeline.fit(complete_rows)
        # Return self like the other imputers in this notebook (previously returned None).
        return self

    def transform(self, dataset):
        """Score ``dataset`` and replace missing feature cells with the prediction.

        The scored (unfilled) frame is kept on ``self.fitted_dataset`` and the
        filled result on ``self.filled_dataset``, which is also returned.
        """
        # The previous version mapped a Python function over Rows and called
        # Row.fillna / Row.withColumn / model.transform inside it; none of those
        # work on executors, so every call failed. This does the same substitution
        # with column expressions instead.
        self.fitted_dataset = self.model.transform(dataset)
        prediction_col = self.regressor.getPredictionCol()
        filled = self.fitted_dataset
        for c in self._feature_cols():
            current = filled[c]
            # Same "missing" test as before: null, or the empty string.
            is_missing = current.isNull() | (current.cast("string") == "")
            replacement = filled[prediction_col].cast(filled.schema[c].dataType)
            filled = filled.withColumn(c, when(is_missing, replacement).otherwise(current))
        self.filled_dataset = filled
        return self.filled_dataset


In [None]:
class ImputeMissingValuesWithRandomForest:
    """Impute missing values in selected pandas columns with per-column models.

    ``fit`` works in three steps:

    1. Record which cells of the declared feature columns are missing.
       ``self.missing_values_dict`` maps row label -> {column: NaN}.
    2. Build a provisional fill. Numerical columns get their mean. Categorical
       columns are converted to ``category`` dtype and get their mode.
    3. For every feature column that had missing cells, train a model on the
       rows where that column was observed. The model predicts the column from
       the other provisionally-filled feature columns, with categoricals entering
       as integer codes. Its predictions are written into the missing cells.

    As in the original implementation, the DataFrame passed to ``fit`` is
    updated in place: each declared feature column is replaced by its imputed
    version. Other columns are left untouched.

    Args:
        categorical_features: column names imputed with a classifier.
        numerical_features: column names imputed with a regressor.
        classifier_factory: zero-argument callable returning an object with
            ``fit(X, y)`` / ``predict(X)``; used for categorical columns.
        regressor_factory: same contract; used for numerical columns.

    NOTE(review): when a factory is omitted, the class falls back to whatever
    ``RandomForestClassifier`` / ``RandomForestRegressor`` are bound to in the
    notebook namespace. Earlier cells bind those names to *pyspark* estimators,
    which do not accept pandas input. Pass scikit-learn estimators explicitly,
    e.g. ``sklearn.ensemble.RandomForestClassifier``.
    """

    def __init__(self, categorical_features, numerical_features,
                 classifier_factory=None, regressor_factory=None):
        self.categorical_features = categorical_features
        self.numerical_features = numerical_features
        self.classifier_factory = classifier_factory
        self.regressor_factory = regressor_factory
        self.missing_values_dict = {}

    def _make_model(self, categorical):
        """Instantiate the estimator used to impute one column."""
        if categorical:
            factory = self.classifier_factory
            if factory is None:
                factory = RandomForestClassifier
        else:
            factory = self.regressor_factory
            if factory is None:
                factory = RandomForestRegressor
        return factory()

    def fit(self, dataframe):
        """Impute the declared feature columns of ``dataframe`` in place; returns ``self``."""
        categorical = list(self.categorical_features)
        features = list(self.numerical_features) + categorical

        # 1. Record missing cells BEFORE filling anything. The previous version
        #    recorded them after the mean/mode fill, so it never found any.
        #    isna() works for every dtype; np.isnan raises on strings/categories.
        missing = dataframe[features].isna()
        self.missing_values_dict = {}
        for col in features:
            for idx in dataframe.index[missing[col].to_numpy()]:
                self.missing_values_dict.setdefault(idx, {})[col] = float("nan")

        # 2. Provisional mean/mode fill on a copy. Plain pandas replaces the
        #    undefined SimpleImputer.
        filled = dataframe[features].copy()
        for col in self.numerical_features:
            filled[col] = filled[col].fillna(filled[col].mean())
        for col in categorical:
            filled[col] = filled[col].astype("category")
            mode = filled[col].mode()
            if not mode.empty:
                filled[col] = filled[col].fillna(mode.iloc[0])

        # Numeric design matrix: categorical columns enter as their integer codes.
        encoded = filled.copy()
        for col in categorical:
            encoded[col] = filled[col].cat.codes

        # 3. One model per incomplete column, predicting it from the other
        #    features. Previously a single 'target' model was fed a row with one
        #    column dropped, which cannot match its training shape.
        imputed = filled.copy()
        for col in features:
            is_missing = missing[col]
            if not is_missing.any() or is_missing.all():
                continue  # nothing to impute, or no observed rows to learn from
            predictors = encoded.drop(columns=[col])
            if predictors.shape[1] == 0:
                continue  # no other feature to learn from; keep the mean/mode fill
            model = self._make_model(col in categorical)
            model.fit(predictors[~is_missing], filled.loc[~is_missing, col])
            imputed.loc[is_missing, col] = model.predict(predictors[is_missing])

        # Write the results back into the caller's frame (the original mutated it too).
        for col in features:
            dataframe[col] = imputed[col]
        return self