In [3]:
from sklearn.impute import SimpleImputer
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# Function for comparing different approaches
def score_dataset(X_train, X_valid, y_train, y_valid,methode='RandomForestRegressor'):
    """Score one preprocessing approach by validation mean absolute error.

    Parameters
    ----------
    X_train, y_train : features and target used to fit the model.
    X_valid, y_valid : features and target used to score it.
    methode : name of the model to use. Only 'RandomForestRegressor' is
        implemented.

    Returns
    -------
    The mean absolute error of the fitted forest on the validation data, or
    the placeholder string 'working on it, please wait' for any other
    ``methode`` value.
    """
    # Guard clause: every model other than the random forest is a placeholder.
    if methode != 'RandomForestRegressor':
        return 'working on it, please wait'

    # Fixed seed so that repeated runs give the same score.
    forest = RandomForestRegressor(n_estimators=100, random_state=0)
    forest.fit(X_train, y_train)
    return mean_absolute_error(y_valid, forest.predict(X_valid))

def impute(X_train:pd.DataFrame,X_test:pd.DataFrame)->tuple:
    """Fill missing values (e.g. ``np.nan``) in two frames with one imputer.

    A default ``SimpleImputer()`` is fitted on ``X_train`` and the same
    fitted imputer is then applied to ``X_test``, so the fill values come
    from the training data only.

    Parameters
    ----------
    X_train : frame the imputer is fitted on and transformed.
    X_test : frame that is transformed with the already-fitted imputer.

    Returns
    -------
    tuple
        ``(imputed_X_train, imputed_X_test)`` as new DataFrames carrying the
        same column names and the same row index as the inputs. The inputs
        are not modified.
    """
    my_imputer = SimpleImputer()  # SimpleImputer(strategy='mean')

    # The imputer returns a bare array: restore both the column names and the
    # row index, otherwise rows no longer line up with the target / other frames.
    imputed_X_train = pd.DataFrame(
        my_imputer.fit_transform(X_train),
        index=X_train.index,
        columns=X_train.columns,
    )
    imputed_X_valid = pd.DataFrame(
        my_imputer.transform(X_test),
        index=X_test.index,
        columns=X_test.columns,
    )
    return imputed_X_train, imputed_X_valid

def dropTypes(df:pd.DataFrame,exclude:str|list)->pd.DataFrame:
    return df.select_dtypes(exclude=exclude)



# `.fit()` || `.fit_transform()` || `.transform()`

In [4]:
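# fit_transform is typically used to learn the parameters of a feature-engineering or preprocessing step and to apply them to that same dataset.
# transform is typically used to apply the previously learned parameters to a new dataset.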

from sklearn.preprocessing import StandardScaler,OrdinalEncoder,OneHotEncoder

def scaling(argScaler:str):
    """Demonstrate ``fit_transform`` versus ``transform`` on toy data.

    Parameters
    ----------
    argScaler : name of a transformer class available in the notebook's
        global namespace (e.g. ``'StandardScaler'``). The class is
        instantiated with no arguments.

    Returns
    -------
    tuple
        ``(scaled_test_data, scaled_train_data)`` -- note the order: the
        transformed test data comes first.

    Raises
    ------
    ValueError
        If ``argScaler`` is not a plain identifier.
    NameError
        If no global with that name exists.
    """
    # Resolve the class by name rather than eval()-ing the string: eval would
    # execute arbitrary code embedded in the argument.
    if not isinstance(argScaler, str) or not argScaler.isidentifier():
        raise ValueError(f"argScaler must be a plain class name, got {argScaler!r}")
    try:
        scaler_cls = globals()[argScaler]
    except KeyError:
        raise NameError(f"name {argScaler!r} is not defined") from None

    # Create the scaler object.
    scaler = scaler_cls()

    # Training data: learn the parameters and apply them in one step.
    train_data = [[170, 70], [180, 80], [160, 60]]
    scaled_train_data = scaler.fit_transform(train_data)

    # Test data: only apply the parameters learned from the training data.
    test_data = [[165, 75], [175, 85]]
    scaled_test_data = scaler.transform(test_data)
    return scaled_test_data,scaled_train_data

# Demo call: builds the scaler named 'StandardScaler' and returns
# (scaled test data, scaled train data), in that order.
scaling('StandardScaler')


(array([[-0.61237244,  0.61237244],
        [ 0.61237244,  1.83711731]]),
 array([[ 0.        ,  0.        ],
        [ 1.22474487,  1.22474487],
        [-1.22474487, -1.22474487]]))

In [5]:
def dropMissingCols(X:pd.DataFrame,Xtest:pd.DataFrame)->None:
    """Drop, in place, every column that has a missing value in ``X``.

    The columns are chosen by looking at ``X`` only; the same columns are
    then removed from both ``X`` and ``Xtest``. Both frames are mutated and
    nothing is returned.
    """
    incomplete = []
    for name in X.columns:
        if X[name].isnull().any():
            incomplete.append(name)

    # X first, then Xtest -- same order as the columns were decided on.
    for frame in (X, Xtest):
        frame.drop(columns=incomplete, inplace=True)

In [6]:
def filterUsefulColsForOrdinalEncoder(X_train,X_valid):
    """Split the object-dtype columns into ordinal-encodable and problematic.

    A column is considered safe when every value seen in ``X_valid`` also
    occurs in ``X_train``; otherwise an encoder fitted on the training data
    would meet unknown categories in the validation data.

    Parameters
    ----------
    X_train, X_valid : DataFrames sharing the same categorical columns.

    Returns
    -------
    tuple
        ``(good_label_cols, bad_label_cols)``, both in the column order of
        ``X_train``. The two lists are also printed.
    """
    # Categorical columns in the training data
    object_cols = [col for col in X_train.columns if X_train[col].dtype == "object"]

    # Columns that can be safely ordinal encoded
    good_label_cols = [col for col in object_cols
                       if set(X_valid[col]).issubset(set(X_train[col]))]

    # Problematic columns that will be dropped from the dataset. Built with a
    # comprehension (not a set difference) so the order is deterministic.
    safe = set(good_label_cols)
    bad_label_cols = [col for col in object_cols if col not in safe]

    print('Categorical columns that will be ordinal encoded:', good_label_cols)
    print('\nCategorical columns that will be dropped from the dataset:', bad_label_cols)
    return good_label_cols, bad_label_cols


def applyOrdinalEncoder():
# Apply ordinal encoder 
    oe=OrdinalEncoder() # Your code here
    label_X_train=pd.DataFrame(oe.fit_transform(label_X_train)) # note that you should convert this into pd.DataFrame
    label_X_valid=pd.DataFrame(oe.fit_transform(label_X_valid))

In [7]:
def howManyUniqueValues():
    """Show ``DataFrame.nunique`` on a small fixed example.

    Returns the number of distinct values in each column of a 3x2 demo frame
    with columns 'A' and 'B'.
    """
    demo = pd.DataFrame([[4, 4], [5, 1], [6, 1]], columns=['A', 'B'])
    distinct_per_column = demo.nunique()
    return distinct_per_column

def intermediateFuncs(X_train,object_cols):
    """Count the unique values of each listed column, sorted ascending.

    Parameters
    ----------
    X_train : DataFrame holding the columns.
    object_cols : names of the (categorical) columns to inspect.

    Returns
    -------
    list of tuple
        ``(column, number_of_unique_values)`` pairs sorted by the count, i.e.
        by the second tuple element. Columns with equal counts keep the
        order they have in ``object_cols``.

    Sample output recorded in the original notebook::

        [('Street', 2), ('Utilities', 2), ('CentralAir', 2), ('LandSlope', 3),
         ..., ('Exterior1st', 15), ('Exterior2nd', 16), ('Neighborhood', 25)]
    """
    # Number of unique entries in each column with categorical data
    d = {col: X_train[col].nunique() for col in object_cols}

    # Sort by the second element of each (column, count) pair and hand the
    # result back; previously the sorted list was computed and discarded.
    return sorted(d.items(), key=lambda x: x[1])

In [8]:
# enumerate() pairs every element with its zero-based position.
names = ['Alice', 'Bob', 'Cindy']
[*enumerate(names)]

[(0, 'Alice'), (1, 'Bob'), (2, 'Cindy')]

In [9]:
# Square every number in the list.
ints = [1, 2, 3, 5, 7, 9, 10]
power = [value ** 2 for value in ints]
power

[1, 4, 9, 25, 49, 81, 100]

In [10]:
def isEven(x):
    """Return True when ``x`` is divisible by 2, otherwise False."""
    return bool(x % 2 == 0)
# Keep only the members of `ints` for which isEven() is true.
evenNums = [number for number in ints if isEven(number)]
evenNums

[2, 10]

In [11]:
# Sort (number, label) tuples by their first element.
complexList = [(0, 'admin'), (777, 'Fynn'), (77, 'Fiona'), (888, 'Finkenstr. 8')]
sorted(complexList, key=lambda pair: pair[0])


[(0, 'admin'), (77, 'Fiona'), (777, 'Fynn'), (888, 'Finkenstr. 8')]

In [12]:
# (column, number of unique values) pairs for the categorical columns,
# already sorted by cardinality.
complexStruct = [
    ('Street', 2), ('Utilities', 2), ('CentralAir', 2),
    ('LandSlope', 3), ('PavedDrive', 3),
    ('LotShape', 4), ('LandContour', 4), ('ExterQual', 4), ('KitchenQual', 4),
    ('MSZoning', 5), ('LotConfig', 5), ('BldgType', 5), ('ExterCond', 5), ('HeatingQC', 5),
    ('Condition2', 6), ('RoofStyle', 6), ('Foundation', 6), ('Heating', 6),
    ('Functional', 6), ('SaleCondition', 6),
    ('RoofMatl', 7),
    ('HouseStyle', 8),
    ('Condition1', 9), ('SaleType', 9),
    ('Exterior1st', 15),
    ('Exterior2nd', 16),
    ('Neighborhood', 25),
]


def isGreaterThan10(tup):
    """Return True when the second element of ``tup`` exceeds 10."""
    return bool(tup[1] > 10)


# How many categorical variables in the training data
# have cardinality greater than 10?
high_cardinality_numcols = sum(1 for pair in complexStruct if isGreaterThan10(pair))


def isNeighborhood(tup):
    """Return True when the first element of ``tup`` is 'Neighborhood'."""
    return bool(tup[0] == 'Neighborhood')


# How many columns are needed to one-hot encode the
# 'Neighborhood' variable in the training data?
num_cols_neighborhood = next(pair for pair in complexStruct if isNeighborhood(pair))[1]


In [13]:

def applyOHE(X_train,X_valid):
    # Apply one-hot encoder to each column with categorical data
    # Columns that will be one-hot encoded
    
    # Categorical columns in the training data
    object_cols = [col for col in X_train.columns if X_train[col].dtype == "object"]
    
    low_cardinality_cols = [col for col in object_cols if X_train[col].nunique() < 10]

    # Columns that will be dropped from the dataset
    high_cardinality_cols = list(set(object_cols)-set(low_cardinality_cols))

    print('Categorical columns that will be one-hot encoded:', low_cardinality_cols)
    print('\nCategorical columns that will be dropped from the dataset:', high_cardinality_cols)
    OH_encoder = OneHotEncoder(handle_unknown='ignore', sparse=False)
    OH_cols_train = pd.DataFrame(OH_encoder.fit_transform(X_train[low_cardinality_cols]))
    OH_cols_valid = pd.DataFrame(OH_encoder.transform(X_valid[low_cardinality_cols]))

    # One-hot encoding removed index; put it back
    OH_cols_train.index = X_train.index
    OH_cols_valid.index = X_valid.index

    # Remove categorical columns (will replace with one-hot encoding)
    num_X_train = X_train.drop(object_cols, axis=1)
    num_X_valid = X_valid.drop(object_cols, axis=1)

    # Add one-hot encoded columns to numerical features
    OH_X_train = pd.concat([num_X_train, OH_cols_train], axis=1)
    OH_X_valid = pd.concat([num_X_valid, OH_cols_valid], axis=1)

    # Ensure all columns have string type
    OH_X_train.columns = OH_X_train.columns.astype(str)
    OH_X_valid.columns = OH_X_valid.columns.astype(str)

# `sklearn.pipeline.Pipeline`

### Benefit of a pipeline: preprocessing and model fitting are applied automatically

In [14]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer  # handle missing data
from sklearn.preprocessing import OneHotEncoder

def preprocessingWithPipeline(numerical_cols, categorical_cols, X_train, y_train, X_valid, y_valid, n_estimators=100, random_state=42):
    """Preprocess, fit a random forest and score it, all in one pipeline.

    Parameters
    ----------
    numerical_cols : columns imputed with the 'constant' strategy.
    categorical_cols : columns imputed with the most frequent value and then
        one-hot encoded.
    X_train, y_train : data the whole pipeline is fitted on.
    X_valid, y_valid : data used for prediction and scoring.
    n_estimators, random_state : forwarded to ``RandomForestRegressor``.

    Returns
    -------
    tuple
        ``(preds, score)``: the predictions for ``X_valid`` and their mean
        absolute error against ``y_valid``. The score is also printed.
    """
    # Numerical data. Available strategies: 'mean', 'median',
    # 'most_frequent', 'constant'. With strategy='constant' missing values
    # are replaced with fill_value (usable with strings or numeric data).
    fill_numeric = SimpleImputer(strategy='constant', fill_value=None)

    # Categorical data: fill gaps with the most frequent value of the column,
    # then one-hot encode, ignoring categories unseen during fitting.
    encode_categorical = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore')),
    ])

    # Route each column group to its own transformer.
    column_steps = [
        ('num', fill_numeric, numerical_cols),
        ('cat', encode_categorical, categorical_cols),
    ]

    # Preprocessing and model bundled together, so that fit() and predict()
    # each run the preprocessing automatically.
    full_pipeline = Pipeline(steps=[
        ('preprocessor', ColumnTransformer(transformers=column_steps)),
        ('model', RandomForestRegressor(n_estimators=n_estimators,
                                        random_state=random_state)),
    ])

    # Preprocess the training data and fit the model.
    full_pipeline.fit(X=X_train, y=y_train)

    # Preprocess the validation data and predict.
    preds = full_pipeline.predict(X=X_valid)

    # Evaluate the model.
    score = mean_absolute_error(y_true=y_valid, y_pred=preds)
    print('MAE:', score)
    return preds, score

In [15]:
from sklearn.model_selection import train_test_split
def standardTrainTestSplitter(Xtrain,Xtest,ytrain=None,ytest=None,**split_options):
    """Thin wrapper around ``train_test_split``.

    Despite their names, the positional parameters are simply the arrays to
    split: each one is forwarded to ``train_test_split`` and split into a
    train part and a test part. The common call is therefore
    ``standardTrainTestSplitter(X, y, random_state=0)``.

    Parameters
    ----------
    Xtrain, Xtest : first two arrays to split (typically features and target).
    ytrain, ytest : optional further arrays; omitted (``None``) arrays are
        not forwarded.
    **split_options : keyword options passed on unchanged, e.g.
        ``test_size``, ``train_size``, ``random_state``, ``stratify``.

    Returns
    -------
    Whatever ``train_test_split`` returns for the forwarded arrays.
    """
    arrays = [arr for arr in (Xtrain, Xtest, ytrain, ytest) if arr is not None]
    return train_test_split(*arrays, **split_options)

# Cross Validation: 
## `sklearn.model_selection.cross_val_score()`

In [16]:
from sklearn.model_selection import cross_val_score

# Multiply by -1 since sklearn calculates *negative* MAE


def crossValidationScoreWithPipeLine(X, y, my_pipeline: "Pipeline | None" = None):
    """Cross-validate a pipeline and report its mean absolute errors.

    Parameters
    ----------
    X, y : features and target passed to ``cross_val_score``.
    my_pipeline : estimator to evaluate. When omitted, a default pipeline is
        built: ``SimpleImputer()`` followed by
        ``RandomForestRegressor(n_estimators=50, random_state=0)``.

    Returns
    -------
    The per-fold MAE scores (5 folds). They are also printed.
    """
    # Build the default only when the caller supplied nothing; a supplied
    # pipeline must be used as given.
    if my_pipeline is None:
        my_pipeline = Pipeline(steps=[
            ('preprocessor', SimpleImputer()),
            ('model', RandomForestRegressor(n_estimators=50, random_state=0)),
        ])

    # Multiply by -1 since the 'neg_mean_absolute_error' scorer yields
    # *negative* MAE.
    scores = -1 * cross_val_score(my_pipeline, X, y,
                                  cv=5,
                                  scoring='neg_mean_absolute_error')

    print("MAE scores:\n", scores)
    return scores