In [0]:
%run ./utils

In [0]:
%run ./_setup

In [0]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
)
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
import jdc
from itertools import chain
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
import numpy as np
from numpy.typing import ArrayLike
from sparkdl.xgboost import XgboostRegressor
from scipy import stats, special

  hook(module)


In [0]:
# Type alias for the regressor classes this notebook works with; it is used as
# the annotation of the `estimator` argument of `tune_param`.
ModelRegressor = Union[XgboostRegressor, RandomForestRegressor]

Out[31]: {'EdLevel': {'Primary/elementary school': 1.0,
  'Secondary school (e.g. American high school, German Realschule or Gymnasium, etc.)': 2.0,
  'Associate degree (A.A., A.S., etc.)': 3.0,
  'Some college/university study without earning a degree': 4.0,
  'Something else, Professional degree (JD, MD, etc.)': 5.0,
  'Bachelor’s degree (B.A., B.S., B.Eng., etc.)': 6.0,
  'Master’s degree (M.A., M.S., M.Eng., MBA, etc.)': 7.0,
  'Other doctoral degree (Ph.D., Ed.D., etc.)': 8.0},
 'Age1stCode': {'Younger than 5 years': 1.0,
  '5 - 10 years': 2.0,
  '11 - 17 years': 3.0,
  '18 - 24 years': 4.0,
  '25 - 34 years': 5.0,
  '35 - 44 years': 6.0,
  '45 - 54 years': 7.0,
  '55 - 64 years': 8.0,
  'Older than 64 years': 9.0},
 'OrgSize': {'Just me - I am a freelancer, sole proprietor, etc.': 1.0,
  '2 to 9 employees': 2.0,
  '10 to 19 employees': 3.0,
  '20 to 99 employees': 4.0,
  '100 to 499 employees': 5.0,
  'I don’t know': 6.0,
  '500 to 999 employees': 7.0,
  '1,000 to 4,999 employees

In [0]:
# Name of the regression target column. It is the default column for the
# Yeo-Johnson transform helpers and is excluded from the assembled features.
TARGET_COL = "ConvertedCompYearly"
# Default `cols` argument of `sum_splitted_cols`: each entry is passed to
# `sum_cols_group_by_prefix` as a column-name prefix.
# NOTE(review): presumably these are the multi-select survey questions that were
# split into one column per answer upstream - confirm against the utils notebook.
COLS_TO_SPLIT = [
    "DevType",
    "LanguageHaveWorkedWith",
    "LanguageWantToWorkWith",
    "DatabaseHaveWorkedWith",
    "DatabaseWantToWorkWith",
    "PlatformHaveWorkedWith",
    "PlatformWantToWorkWith",
    "WebframeHaveWorkedWith",
    "WebframeWantToWorkWith",
    "MiscTechHaveWorkedWith",
    "MiscTechWantToWorkWith",
    "ToolsTechHaveWorkedWith",
    "ToolsTechWantToWorkWith",
    "NEWCollabToolsHaveWorkedWith",
    "NEWCollabToolsWantToWorkWith",
    "NEWStuck",
]

In [0]:
def sum_splitted_cols(*, df: DataFrame, cols: List[str] = COLS_TO_SPLIT) -> DataFrame:
    """Run ``sum_cols_group_by_prefix`` once for every prefix in ``cols``.

    The prefixes are processed in order and the DataFrame returned by one call
    is the input of the next, so the result accumulates the effect of every call.

    :param df: A pyspark.sql.dataframe.DataFrame object.
    :param cols: Column-name prefixes of the splitted column groups; defaults to COLS_TO_SPLIT.
    :return: The DataFrame returned by the last call (``df`` itself when ``cols`` is empty).
    """
    summed_df = df
    for prefix in cols:
        summed_df = sum_cols_group_by_prefix(df=summed_df, col_prefix=prefix)
    return summed_df

In [0]:
def _set_df(
    *, cols_to_drop: Optional[List] = None, selected_features: bool = False
) -> DataFrame:
    """Set the dataset to run features engineering and to train on.

    Exactly one parquet file is read and cached: ``selected_features.parquet``
    when ``selected_features`` is True, ``enriched_survey.parquet`` otherwise.
    Every cell equal to the string "NA" is replaced by null in the result.

    :param cols_to_drop: Column names to drop from the loaded data; None or an
        empty list keeps every column.
    :param selected_features: Load the feature-selected dataset instead of the
        full enriched survey.
    :return: A pyspark.sql.dataframe.DataFrame object with pre-processed and selected data.
    """
    # Pick the file first so the unused dataset is never read nor marked for caching.
    file_name = (
        "selected_features.parquet" if selected_features else "enriched_survey.parquet"
    )
    base_df = spark.read.parquet(f"s3a://{S3_PROCESS_PATH}{file_name}").cache()

    df = base_df.drop(*cols_to_drop) if cols_to_drop else base_df
    # The survey encodes missing answers as the literal string "NA"; turn them into nulls.
    return df.select(
        [
            f.when(f.col(c) == "NA", None).otherwise(f.col(c)).alias(c)
            for c in df.columns
        ]
    )

In [0]:
def yeojohnson_transform_train(
    *, df: DataFrame, col_name: str = TARGET_COL
) -> Tuple[ArrayLike, float]:
    """Fit a Yeo-Johnson transform on one column and transform its values.

    The column is collected to the driver, flattened to a 1-D array and passed
    to ``scipy.stats.yeojohnson`` without a lambda, so SciPy estimates it.

    :param df: A pyspark.sql.dataframe.DataFrame object.
    :param col_name: Name of the column to transform; defaults to TARGET_COL.
    :return: The transformed values as a Python list, and the fitted lambda.
    """
    collected_rows = df.select(col_name).collect()
    flat_vals = np.array(collected_rows).reshape(-1)  # one value per row -> 1-D
    yj_vals, fitted_lmbda = stats.yeojohnson(flat_vals)
    return yj_vals.tolist(), fitted_lmbda

In [0]:
def yeojohnson_transform_test(
    *, df: DataFrame, lmda: float, col_name: str = TARGET_COL
) -> ArrayLike:
    """Apply a Yeo-Johnson transform with a given lambda to one column.

    The column is collected to the driver, flattened to a 1-D array and passed
    to ``scipy.stats.yeojohnson`` together with ``lmda``.

    :param df: A pyspark.sql.dataframe.DataFrame object.
    :param lmda: The lambda to apply (e.g. the one fitted on the train set).
    :param col_name: Name of the column to transform; defaults to TARGET_COL.
    :return: The transformed values as a Python list.
    """
    collected_rows = df.select(col_name).collect()
    flat_vals = np.array(collected_rows).reshape(-1)  # one value per row -> 1-D
    yj_vals = stats.yeojohnson(flat_vals, lmbda=lmda)
    return yj_vals.tolist()

In [0]:
def yeojohnson_transform_on_train_test(
    *,
    df: DataFrame,
    lmda: Union[Literal[None], float] = None,
    col_name: str = TARGET_COL,
):
    """Add a Yeo-Johnson transformed copy of ``col_name`` to ``df``.

    When ``lmda`` is None (train set) the lambda is fitted on ``col_name``;
    otherwise (test set) the given lambda is applied as is. The transformed
    value is computed row by row from the value of ``col_name`` itself, so the
    result does not depend on the row order or partitioning of ``df``.

    :param df: A pyspark.sql.dataframe.DataFrame object.
    :param lmda: Lambda to apply, or None to fit it on ``df``.
    :param col_name: Name of the column to transform; defaults to TARGET_COL.
    :return: A tuple of the DataFrame with the extra double column
        ``transformed_<col_name>`` and the lambda that was used.
    """
    if lmda is None:
        # Only the fitted lambda is needed here; pass col_name through so the
        # fit happens on the same column that is transformed below.
        _, lmda = yeojohnson_transform_train(df=df, col_name=col_name)

    lam = float(lmda)
    eps = float(np.spacing(1.0))  # same lambda == 0 / lambda == 2 tolerance SciPy uses
    x = f.col(col_name).cast("double")

    # Yeo-Johnson for x >= 0: ((x + 1)^lam - 1) / lam, or log(x + 1) when lam == 0.
    if abs(lam) < eps:
        non_negative_branch = f.log1p(x)
    else:
        non_negative_branch = (f.pow(x + 1.0, lam) - 1.0) / lam

    # Yeo-Johnson for x < 0: -((1 - x)^(2 - lam) - 1) / (2 - lam), or -log(1 - x) when lam == 2.
    if abs(lam - 2.0) < eps:
        negative_branch = -f.log1p(-x)
    else:
        negative_branch = -(f.pow(1.0 - x, 2.0 - lam) - 1.0) / (2.0 - lam)

    transformed_df = df.withColumn(
        f"transformed_{col_name}",
        f.when(x >= 0, non_negative_branch).otherwise(negative_branch),
    )
    return transformed_df, lmda

In [0]:
def get_inversed_yeojohnson_df(
    *, df: DataFrame, lmda: float, col_name: str = "prediction"
) -> DataFrame:
    """Build a lookup DataFrame mapping transformed values back to the original scale.

    The values of ``col_name`` are collected to the driver and passed through
    the inverse of the Yeo-Johnson transform (not the Box-Cox inverse, which
    differs from it by one for non-negative values and is wrong for negative ones).

    :param df: A pyspark.sql.dataframe.DataFrame object holding ``col_name``.
    :param lmda: The lambda that was used for the forward transform.
    :param col_name: Name of the Yeo-Johnson transformed column.
    :return: A DataFrame with one row per row of ``df`` and two columns:
        ``col_name`` and ``inversed_<col_name>``.
    """
    col_vals = np.array(df.select(col_name).collect(), dtype=float).reshape(
        -1
    )  # for 1-D array
    lam = float(lmda)
    eps = np.spacing(1.0)  # same lambda == 0 / lambda == 2 tolerance SciPy uses

    inversed_vals = np.empty_like(col_vals)
    # The forward transform keeps the sign, so the sign of the transformed
    # value tells which branch has to be inverted.
    non_negative = col_vals >= 0
    negative = ~non_negative

    if abs(lam) < eps:
        inversed_vals[non_negative] = np.expm1(col_vals[non_negative])
    else:
        inversed_vals[non_negative] = (
            np.power(col_vals[non_negative] * lam + 1.0, 1.0 / lam) - 1.0
        )

    if abs(lam - 2.0) < eps:
        inversed_vals[negative] = -np.expm1(-col_vals[negative])
    else:
        inversed_vals[negative] = 1.0 - np.power(
            (lam - 2.0) * col_vals[negative] + 1.0, 1.0 / (2.0 - lam)
        )

    rows = list(zip(col_vals.tolist(), inversed_vals.tolist()))
    return spark.createDataFrame(rows, schema=[col_name, f"inversed_{col_name}"])

In [0]:
# Apply the inverse Yeo-Johnson transformation to the column
def inverse_yeojohnson(
    *, df: DataFrame, lmda: float, col_name: str = "prediction"
) -> DataFrame:
    """Replace ``col_name`` by its inverse Yeo-Johnson value, rounded to 0 decimals.

    The transformed values are kept in a new ``transformed_<col_name>`` column,
    and ``col_name`` ends up holding the rounded value on the original scale.
    The number of rows of ``df`` is preserved.

    :param df: A pyspark.sql.dataframe.DataFrame object holding ``col_name``.
    :param lmda: The lambda that was used for the forward transform.
    :param col_name: Name of the Yeo-Johnson transformed column.
    :return: ``df`` with the columns ``transformed_<col_name>`` and ``col_name``.
    """
    # The lookup has one row per row of df. Without de-duplication, k rows
    # sharing the same value would each match k lookup rows and the join would
    # return k * k rows for them.
    inversed_df = get_inversed_yeojohnson_df(
        df=df, lmda=lmda, col_name=col_name
    ).dropDuplicates([col_name])
    joined_df = df.join(inversed_df, col_name, "left")
    joined_df = joined_df.withColumnRenamed(
        col_name, f"transformed_{col_name}"
    ).withColumnRenamed(f"inversed_{col_name}", col_name)
    # round the column
    joined_df = joined_df.withColumn(col_name, f.round(joined_df[col_name], 0))
    return joined_df

In [0]:
def get_indexers(*, cat_cols: List[str]) -> List[StringIndexer]:
    """Create one StringIndexer per categorical column.

    Each indexer reads column ``<name>`` and writes ``<name>Index``, with
    ``handleInvalid="keep"``.

    :param cat_cols: A list of categorical columns.
    :return indexers: List[StringIndexer], in the order of ``cat_cols``.
    """
    indexers = []
    for cat_col in cat_cols:
        indexers.append(
            StringIndexer(
                inputCol=cat_col, outputCol=f"{cat_col}Index", handleInvalid="keep"
            )
        )
    return indexers

In [0]:
def get_encoders(*, indexers: List[StringIndexer]) -> List[OneHotEncoder]:
    """Create one OneHotEncoder per StringIndexer.

    Each encoder reads the output column of its indexer and writes a column
    with the same name suffixed by ``Encoded``.

    :param indexers: List[StringIndexer]
    :return encoders: List[OneHotEncoder], in the order of ``indexers``.
    """
    encoders = []
    for indexer in indexers:
        indexed_col = indexer.getOutputCol()
        encoders.append(
            OneHotEncoder(inputCol=indexed_col, outputCol=f"{indexed_col}Encoded")
        )
    return encoders

In [0]:
def get_assembler(
    *, cols_dict: Dict[str, List[str]], encoders: List[OneHotEncoder]
) -> VectorAssembler:
    """The function vectorize the features.

    The feature columns are, in order: the output columns of ``encoders``, then
    ``cols_dict["bin_cols"]``, then ``cols_dict["num_cols"]``. TARGET_COL is
    left out wherever it appears; a dataset without the target column (e.g.
    data to score) is accepted as well.

    :param cols_dict: Dict[str, List[str]] with at least the keys "bin_cols" and "num_cols".
    :param encoders: List[OneHotEncoder]
    :return assembler: VectorAssembler writing to the "features" column.
    """
    encoded_cols = [encoder.getOutputCol() for encoder in encoders]
    candidate_cols = chain(encoded_cols, cols_dict["bin_cols"], cols_dict["num_cols"])
    # Filter instead of list.remove(): no ValueError when the target is absent,
    # and the target can never leak into the features even if listed twice.
    features_cols = [c for c in candidate_cols if c != TARGET_COL]
    assembler = VectorAssembler(
        inputCols=features_cols, outputCol="features", handleInvalid="keep"
    )
    return assembler

In [0]:
def get_pipe_stages(*, df: DataFrame) -> List[Any]:
    """Build the feature-transformation stages of the ML pipeline.

    The columns of ``df`` are grouped with ``get_cols_by_dtypes``; the
    categorical ones get an indexer and an encoder each, and a single
    assembler gathers all the features.

    :param df: A pyspark.sql.dataframe.DataFrame
    :return stages: All the indexers, then all the encoders, then the assembler.
    """
    cols_by_dtype = get_cols_by_dtypes(df=df)
    cat_indexers = get_indexers(cat_cols=cols_by_dtype["cat_cols"])
    cat_encoders = get_encoders(indexers=cat_indexers)
    features_assembler = get_assembler(cols_dict=cols_by_dtype, encoders=cat_encoders)
    return [*cat_indexers, *cat_encoders, features_assembler]

In [0]:
def _set_rf_reg_param_grid(
    *,
    estimator: RandomForestRegressor,
    maxDepth: List[int] = [5],
    maxBins: List[int] = [32],
    minInstancesPerNode: List[int] = [1],
    minInfoGain: List[float] = [0.0],
    checkpointInterval: List[int] = [10],
    subsamplingRate: List[float] = [1.0],
    numTrees: List[int] = [20],
    minWeightFractionPerNode: List[float] = [0.0],
) -> Dict[str, List[Any]]:
    """Define a grid of hyperparameters to test for a RandomForestRegressor.

    Every argument is the list of candidate values for the estimator param of
    the same name. Each default holds a single value, so calling the function
    with the estimator only produces a grid with one combination.

    :param estimator: RandomForestRegressor whose params are tuned.
    :param maxDepth: Maximum depth of each tree.
    :param maxBins: Maximum number of bins used to discretize continuous features.
    :param minInstancesPerNode: Minimum number of instances each child must have after a split.
    :param minInfoGain: Minimum information gain for a split to be considered.
    :param checkpointInterval: Checkpoint interval, in iterations.
    :param subsamplingRate: Fraction of the training data used for learning each tree.
    :param numTrees: The number of trees to train.
    :param minWeightFractionPerNode: Minimum fraction of the weighted sample count each child must have after a split.
    :return paramGrid: The result of ``ParamGridBuilder.build()``, i.e. one param map per combination of values.
    """
    # (param, candidate values) pairs, registered in this order.
    grid_spec = [
        (estimator.maxDepth, maxDepth),
        (estimator.maxBins, maxBins),
        (estimator.minInstancesPerNode, minInstancesPerNode),
        (estimator.minInfoGain, minInfoGain),
        (estimator.checkpointInterval, checkpointInterval),
        (estimator.subsamplingRate, subsamplingRate),
        (estimator.numTrees, numTrees),
        (estimator.minWeightFractionPerNode, minWeightFractionPerNode),
    ]
    builder = ParamGridBuilder()
    for param, values in grid_spec:
        builder.addGrid(param, values)
    paramGrid = builder.build()
    return paramGrid

In [0]:
def _set_xgb_reg_param_grid(
    *,
    estimator: XgboostRegressor,
    colsample_bytree: List[float] = [1.0],
    gamma: List[float] = [0.0],
    learning_rate: List[float] = [0.3],
    max_depth: List[int] = [6],
    min_child_weight=[1],
    n_estimators: List[int] = [100],
    subsample: List[float] = [1.0],
) -> Dict[str, List[Any]]:
    """Define a grid of hyperparameters to test for an XgboostRegressor.

    Every argument is the list of candidate values for the estimator param of
    the same name. Each default holds a single value, so calling the function
    with the estimator only produces a grid with one combination.

    :param estimator: XgboostRegressor
    :param colsample_bytree: The subsample ratio of columns when constructing each tree. Subsampling occurs once for every tree constructed.
    :param gamma: Minimum loss reduction required to make a further partition on a leaf node of the tree. The larger gamma is, the more conservative the algorithm will be.
    :param learning_rate: Step size shrinkage used in the update to prevent overfitting.
    :param max_depth: The depth of each decision tree.
    :param min_child_weight: Minimum sum of instance weight (hessian) needed in a child.
    :param n_estimators: The numbers of trees used by the algorithm.
    :param subsample: Subsample ratio of the training instances. Setting it to 0.5 means that XGBoost would randomly sample half of the training data prior to growing trees, which helps prevent overfitting. Subsampling occurs once in every boosting iteration.
    :return paramGrid: The result of ``ParamGridBuilder.build()``.
    """
    # (param, candidate values) pairs, registered in this order.
    grid_spec = [
        (estimator.colsample_bytree, colsample_bytree),
        (estimator.gamma, gamma),
        (estimator.learning_rate, learning_rate),
        (estimator.max_depth, max_depth),
        (estimator.min_child_weight, min_child_weight),
        (estimator.n_estimators, n_estimators),
        (estimator.subsample, subsample),
    ]
    builder = ParamGridBuilder()
    for param, values in grid_spec:
        builder.addGrid(param, values)
    return builder.build()

In [0]:
def tune_param(
    *,
    estimator: ModelRegressor,
    metric_name: str = "rmse",
    param_grid: Dict[str, List[Any]],
    numFolds: int = 5
) -> CrossValidator:
    """Tune parameters for best result of the defined evaluation metric.

    The CrossValidator compares the true labels with predicted values for each
    combination of parameters, and calculates this value to determine the best
    model. The returned CrossValidator is configured but not fitted.

    :param estimator: ModelRegressor; its label and prediction columns are the ones evaluated.
    :param metric_name: The evaluation metric for the best result check
    :param param_grid: A grid of hyperparameters to test
    :param numFolds: Number of k-fold for splitting the data
    :return cv: CrossValidator
    """
    evaluator = RegressionEvaluator(
        metricName=metric_name,
        labelCol=estimator.getLabelCol(),
        predictionCol=estimator.getPredictionCol(),
    )
    # Declare the CrossValidator, which performs the model tuning.
    cv = CrossValidator(
        estimator=estimator,
        evaluator=evaluator,
        estimatorParamMaps=param_grid,
        # Without this the argument was ignored and Spark's own default was used.
        numFolds=numFolds,
        # Keeps every fitted sub-model on the resulting CrossValidatorModel.
        collectSubModels=True,
    )
    return cv