In [1]:
import os

import findspark
from common.const import DATASET, FILEPATH, STAGING_FILENAME
from common.utils import change_case
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import (DecisionTreeRegressor, FMRegressor,
                                   GBTRegressor, LinearRegression,
                                   RandomForestRegressor)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

In [2]:
# Prefer the SPARK_HOME environment variable so the notebook is not tied to a
# single machine; fall back to the original install path when it is unset/empty.
spark_home = os.environ.get("SPARK_HOME") or "/home/ubuntu/spark-3.2.1-bin-hadoop2.7"
findspark.init(spark_home)

# Reuse an existing session if one is already running, otherwise start "basics".
spark = SparkSession.builder.appName("basics").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 18:15:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/23 18:15:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Switch on Spark SQL's REPL eager-evaluation setting for this session.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [4]:
# Staged parquet file (STAGING_FILENAME.DP) that this notebook starts from.
staged_parquet_path = f"{FILEPATH.TEMP_STAGING_PATH}/{STAGING_FILENAME.DP}.parquet"
df = spark.read.parquet(staged_parquet_path)
df.show()

24/05/23 18:15:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+---------------+---------------+---------------+-----------------+---------------+---------------+-----------------+----------------------+---------------------+------------------+------------------+------------------+-----------------------------+---------+---+----+---------------------+---------------+---------+------------+---------+------------+-----------------------------+--------------+--------------------+---------------------------+----------------------+----------------------------+----------------------------+
|carbon_emission|recycling_metal|recycling_glass|recycling_plastic|recycling_paper|recycling_count|cooking_with_oven|cooking_with_microwave|cooking_with_airfryer|cooking_with_grill|cooking_with_stove|cooking_with_count|monthly_grocery_bill_quantile|body_type|sex|diet|heating_energy_source|social_activity|recycling|cooking_with|transport|vehicle_type|frequency_of_traveling_by_air|waste_bag_size|monthly_grocery_bill|vehicle_monthly_distance_km|waste_bag_weekly_count|how

In [5]:
# Every column except the target is used as a model input.
target_column = change_case(DATASET.TARGET)
feature_columns = list(df.columns)
feature_columns.remove(target_column)

# Pack the input columns into a single "features" vector column.
vec_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
vec_df = vec_assembler.transform(df.alias("vec_df"))
vec_df.show()

# Expose the target under the name "label" for the estimators fitted later.
vec_df = vec_df.withColumnRenamed(target_column, "label")

                                                                                

+---------------+---------------+---------------+-----------------+---------------+---------------+-----------------+----------------------+---------------------+------------------+------------------+------------------+-----------------------------+---------+---+----+---------------------+---------------+---------+------------+---------+------------+-----------------------------+--------------+--------------------+---------------------------+----------------------+----------------------------+----------------------------+--------------------+
|carbon_emission|recycling_metal|recycling_glass|recycling_plastic|recycling_paper|recycling_count|cooking_with_oven|cooking_with_microwave|cooking_with_airfryer|cooking_with_grill|cooking_with_stove|cooking_with_count|monthly_grocery_bill_quantile|body_type|sex|diet|heating_energy_source|social_activity|recycling|cooking_with|transport|vehicle_type|frequency_of_traveling_by_air|waste_bag_size|monthly_grocery_bill|vehicle_monthly_distance_km|waste

In [6]:
# Tree-based regressors whose feature importances are compared in later cells.
ml_models = {
    "dt": DecisionTreeRegressor,
    "gbt": GBTRegressor,
    "rf": RandomForestRegressor,
}

# Fixed seed for the randomized learners and for the cross-validation fold
# assignment, so the importances are repeatable across kernel restarts.
IMPORTANCE_SEED = 42

# Maps "<model>_importance" -> featureImportances vector of the best CV model.
feature_importances = {}
for model_name, ml_model in ml_models.items():
    estimator = ml_model(
        featuresCol="features", labelCol="label", seed=IMPORTANCE_SEED
    )
    grid = (
        ParamGridBuilder()
        .addGrid(estimator.maxDepth, [4, 5, 6])
        .addGrid(estimator.maxBins, [16, 32])
        .build()
    )
    cv = CrossValidator(
        estimator=estimator,
        estimatorParamMaps=grid,
        evaluator=RegressionEvaluator(),
        parallelism=2,
        seed=IMPORTANCE_SEED,
    )
    # Keep the fitted CrossValidatorModel under its own name instead of
    # overwriting the estimator variable.
    cv_model = cv.fit(vec_df)
    feature_importances[f"{model_name}_importance"] = (
        cv_model.bestModel.featureImportances
    )

24/05/23 18:16:12 WARN BlockManager: Block rdd_14_0 already exists on this machine; not re-adding it
24/05/23 18:16:55 WARN BlockManager: Block rdd_526_0 already exists on this machine; not re-adding it
24/05/23 18:17:42 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/23 18:17:42 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
24/05/23 18:18:47 WARN BlockManager: Block rdd_3637_0 already exists on this machine; not re-adding it
                                                                                

In [7]:
# Single source of truth for the importance columns: the same names (and order)
# drive both the row tuples and the schema, so the two cannot drift apart.
importance_names = [f"{model_name}_importance" for model_name in ml_models.keys()]

# One (feature, <importance per model>...) tuple per input column. Values are
# converted to plain Python floats so they match the DoubleType schema fields.
feature_importances_list = [
    (column_name, *(float(value) for value in importances))
    for column_name, *importances in zip(
        feature_columns,
        *(feature_importances[name].toArray() for name in importance_names),
    )
]

fields = [StructField("feature", StringType(), True)]
fields += [StructField(name, DoubleType(), True) for name in importance_names]
schema = StructType(fields)

feature_importances_df = spark.createDataFrame(feature_importances_list, schema=schema)

In [8]:
# Show every feature, ordered by decision-tree importance (highest first).
feature_importances_df.orderBy(col("dt_importance").desc()).show(
    n=len(feature_importances_list),
    truncate=False,
)

[Stage 4427:>                                                       (0 + 2) / 2]

+-----------------------------+---------------------+---------------------+---------------------+
|feature                      |dt_importance        |gbt_importance       |rf_importance        |
+-----------------------------+---------------------+---------------------+---------------------+
|vehicle_type                 |0.5519874209196011   |0.28488889276095086  |0.3656340514304588   |
|frequency_of_traveling_by_air|0.3425347598272173   |0.15695681562999367  |0.2939953059957521   |
|body_type                    |0.059793460341281746 |0.05393755279005459  |0.046813510636606354 |
|sex                          |0.02420603804804404  |0.06562460487686103  |0.02927483200139387  |
|heating_energy_source        |0.014114709095020717 |0.11869522112815555  |0.018472123127870593 |
|waste_bag_size               |0.005608222059463289 |0.11186623972209228  |0.013946747859738212 |
|recycling                    |0.0017553897093718345|0.09667491896020602  |0.01887383630898774  |
|diet               

                                                                                

In [9]:
# Show every feature, ordered by gradient-boosted-tree importance (highest first).
feature_importances_df.orderBy(col("gbt_importance").desc()).show(
    n=len(feature_importances_list),
    truncate=False,
)

+-----------------------------+---------------------+---------------------+---------------------+
|feature                      |dt_importance        |gbt_importance       |rf_importance        |
+-----------------------------+---------------------+---------------------+---------------------+
|vehicle_type                 |0.5519874209196011   |0.28488889276095086  |0.3656340514304588   |
|frequency_of_traveling_by_air|0.3425347598272173   |0.15695681562999367  |0.2939953059957521   |
|heating_energy_source        |0.014114709095020717 |0.11869522112815555  |0.018472123127870593 |
|waste_bag_size               |0.005608222059463289 |0.11186623972209228  |0.013946747859738212 |
|recycling                    |0.0017553897093718345|0.09667491896020602  |0.01887383630898774  |
|sex                          |0.02420603804804404  |0.06562460487686103  |0.02927483200139387  |
|body_type                    |0.059793460341281746 |0.05393755279005459  |0.046813510636606354 |
|cooking_with       

In [10]:
# Show every feature, ordered by random-forest importance (highest first).
feature_importances_df.orderBy(col("rf_importance").desc()).show(
    n=len(feature_importances_list),
    truncate=False,
)

+-----------------------------+---------------------+---------------------+---------------------+
|feature                      |dt_importance        |gbt_importance       |rf_importance        |
+-----------------------------+---------------------+---------------------+---------------------+
|vehicle_type                 |0.5519874209196011   |0.28488889276095086  |0.3656340514304588   |
|frequency_of_traveling_by_air|0.3425347598272173   |0.15695681562999367  |0.2939953059957521   |
|transport                    |0.0                  |0.0018834843169304736|0.18208783306616808  |
|body_type                    |0.059793460341281746 |0.05393755279005459  |0.046813510636606354 |
|sex                          |0.02420603804804404  |0.06562460487686103  |0.02927483200139387  |
|recycling                    |0.0017553897093718345|0.09667491896020602  |0.01887383630898774  |
|heating_energy_source        |0.014114709095020717 |0.11869522112815555  |0.018472123127870593 |
|cooking_with       

In [11]:
# Rank each feature per model (0 = most important) and sum the ranks.
#
# row_number() over an ordered window is used instead of
# monotonically_increasing_id(): the latter only guarantees increasing, unique
# ids, not consecutive ones, so it is not a rank once data spans partitions.
# The feature name is a tie-breaker so equal importances rank deterministically.
ranked_feature_importances_df = feature_importances_df
for model_prefix in ("dt", "gbt", "rf"):
    by_importance = Window.orderBy(
        col(f"{model_prefix}_importance").desc(), col("feature")
    )
    ranked_feature_importances_df = ranked_feature_importances_df.withColumn(
        f"{model_prefix}_rank",
        row_number().over(by_importance) - 1,  # keep the ranks 0-based
    )

# Combined rank: lower means the feature is consistently important across models.
ranked_feature_importances_df = ranked_feature_importances_df.withColumn(
    "rank", col("dt_rank") + col("gbt_rank") + col("rf_rank")
)
ranked_feature_importances_df.sort("rank").show(
    len(feature_importances_list), truncate=False
)

+-----------------------------+---------------------+---------------------+---------------------+-------+--------+-------+----+
|feature                      |dt_importance        |gbt_importance       |rf_importance        |dt_rank|gbt_rank|rf_rank|rank|
+-----------------------------+---------------------+---------------------+---------------------+-------+--------+-------+----+
|vehicle_type                 |0.5519874209196011   |0.28488889276095086  |0.3656340514304588   |0      |0       |0      |0   |
|frequency_of_traveling_by_air|0.3425347598272173   |0.15695681562999367  |0.2939953059957521   |1      |1       |1      |3   |
|body_type                    |0.059793460341281746 |0.05393755279005459  |0.046813510636606354 |2      |6       |3      |11  |
|heating_energy_source        |0.014114709095020717 |0.11869522112815555  |0.018472123127870593 |4      |2       |6      |12  |
|sex                          |0.02420603804804404  |0.06562460487686103  |0.02927483200139387  |3      

In [12]:
# For each model's importance column, keep the target plus every feature with a
# non-zero importance and write that subset to its own staging parquet file.
importance_columns = [
    "dt_importance",
    "gbt_importance",
    "rf_importance",
]
# Same target-name derivation as the vector-assembly cells.
target_column = change_case(DATASET.TARGET)

# The loop variable has its own name so it no longer overwrites the list above.
for importance_column in importance_columns:
    selected_features = [
        row.feature
        for row in feature_importances_df.where(col(importance_column) > 0)
        .select("feature")
        .collect()
    ]
    tranformed_df = df.select([target_column, *selected_features])
    tranformed_df.write.mode("overwrite").parquet(
        f"{FILEPATH.TEMP_STAGING_PATH}/{STAGING_FILENAME.DMA}_{importance_column}.parquet"
    )

                                                                                

In [13]:
# Reload the GBT-selected feature subset; `df` now refers to this reduced frame.
# NOTE(review): this reads a STAGING_FILENAME.DT file, whereas the preceding cell
# writes STAGING_FILENAME.DMA files — confirm the DT file is produced elsewhere
# and is not a stale artifact of an earlier run.
gbt_selected_path = (
    f"{FILEPATH.TEMP_STAGING_PATH}/{STAGING_FILENAME.DT}_gbt_importance.parquet"
)
df = spark.read.parquet(gbt_selected_path)
df.show()

+---------------+---------------+------------------+-----------------------------+---------+---+----+---------------------+---------------+---------+------------+---------+------------+-----------------------------+--------------+--------------------+---------------------------+----------------------+----------------------------+----------------------------+
|carbon_emission|recycling_count|cooking_with_count|monthly_grocery_bill_quantile|body_type|sex|diet|heating_energy_source|social_activity|recycling|cooking_with|transport|vehicle_type|frequency_of_traveling_by_air|waste_bag_size|monthly_grocery_bill|vehicle_monthly_distance_km|waste_bag_weekly_count|how_many_new_clothes_monthly|how_long_internet_daily_hour|
+---------------+---------------+------------------+-----------------------------+---------+---+----+---------------------+---------------+---------+------------+---------+------------+-----------------------------+--------------+--------------------+---------------------------

In [14]:
# Rebuild the feature list for the reloaded frame: all columns but the target.
target_column = change_case(DATASET.TARGET)
feature_columns = list(df.columns)
feature_columns.remove(target_column)

# Assemble the inputs into one "features" vector column.
vec_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
vec_df = vec_assembler.transform(df)
vec_df.show()

# Expose the target under the name "label" for the estimators fitted next.
vec_df = vec_df.withColumnRenamed(target_column, "label")

+---------------+---------------+------------------+-----------------------------+---------+---+----+---------------------+---------------+---------+------------+---------+------------+-----------------------------+--------------+--------------------+---------------------------+----------------------+----------------------------+----------------------------+--------------------+
|carbon_emission|recycling_count|cooking_with_count|monthly_grocery_bill_quantile|body_type|sex|diet|heating_energy_source|social_activity|recycling|cooking_with|transport|vehicle_type|frequency_of_traveling_by_air|waste_bag_size|monthly_grocery_bill|vehicle_monthly_distance_km|waste_bag_weekly_count|how_many_new_clothes_monthly|how_long_internet_daily_hour|            features|
+---------------+---------------+------------------+-----------------------------+---------+---+----+---------------------+---------------+---------+------------+---------+------------+-----------------------------+--------------+------

In [15]:
# 80/20 train/test split with a fixed seed so reruns use the same partition and
# the reported scores are comparable between runs.
train, test = vec_df.randomSplit([0.8, 0.2], seed=42)

In [16]:
# Seed shared by the stochastic estimators and the cross-validation fold
# assignment so model selection is repeatable.
MODEL_SEED = 42

# Candidate regressors; labelCol is spelled out to match the earlier CV cell.
linear = LinearRegression(featuresCol="features", labelCol="label")
fm = FMRegressor(featuresCol="features", labelCol="label", seed=MODEL_SEED)
ran_for = RandomForestRegressor(
    featuresCol="features", labelCol="label", seed=MODEL_SEED
)
ml_models = {
    "linear": linear,
    "fm": fm,
    "random_forest": ran_for,
}

# Hyper-parameter grid per model, keyed like `ml_models`.
param_grid = {
    "linear": ParamGridBuilder()
    .addGrid(linear.regParam, [0, 0.1, 0.01])
    .addGrid(linear.tol, [1e-05, 1e-06])
    .addGrid(linear.loss, ["squaredError", "huber"])
    .build(),
    "fm": ParamGridBuilder()
    .addGrid(fm.regParam, [0, 0.1, 0.01])
    .addGrid(fm.tol, [1e-05, 1e-06])
    .addGrid(fm.factorSize, [7, 8, 9])
    .build(),
    "random_forest": ParamGridBuilder()
    .addGrid(ran_for.maxDepth, [4, 5, 6])
    .addGrid(ran_for.maxBins, [16, 32, 64])
    .build(),
}

# best_models[name]["model"] holds the best model found by 2-fold CV on `train`.
best_models = {}
for model_name, ml_model in ml_models.items():
    cv = CrossValidator(
        estimator=ml_model,
        estimatorParamMaps=param_grid[model_name],
        evaluator=RegressionEvaluator(),
        parallelism=2,
        numFolds=2,
        seed=MODEL_SEED,
    )
    model = cv.fit(train)
    best_models[model_name] = {
        "model": model.bestModel,
    }

24/05/23 18:22:06 WARN Instrumentation: [68a70184] regParam is zero, which might cause numerical instability and overfitting.
24/05/23 18:22:07 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/05/23 18:22:08 WARN Instrumentation: [54577418] regParam is zero, which might cause numerical instability and overfitting.
24/05/23 18:22:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/05/23 18:22:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/05/23 18:22:11 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/05/23 18:22:11 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
24/05/23 18:22:20 WARN Instrumentation: [c5a4e989] regParam is zero, which might cause numerical instability and overfitting.
24/05/23 18:22:22 WARN Instrumentation: [85ffb90a] regParam is zero,

In [17]:
# Print the column names and types of the held-out split.
test.printSchema()

root
 |-- label: integer (nullable = true)
 |-- recycling_count: integer (nullable = true)
 |-- cooking_with_count: integer (nullable = true)
 |-- monthly_grocery_bill_quantile: integer (nullable = true)
 |-- body_type: double (nullable = true)
 |-- sex: double (nullable = true)
 |-- diet: double (nullable = true)
 |-- heating_energy_source: double (nullable = true)
 |-- social_activity: double (nullable = true)
 |-- recycling: double (nullable = true)
 |-- cooking_with: double (nullable = true)
 |-- transport: double (nullable = true)
 |-- vehicle_type: double (nullable = true)
 |-- frequency_of_traveling_by_air: double (nullable = true)
 |-- waste_bag_size: double (nullable = true)
 |-- monthly_grocery_bill: double (nullable = true)
 |-- vehicle_monthly_distance_km: double (nullable = true)
 |-- waste_bag_weekly_count: double (nullable = true)
 |-- how_many_new_clothes_monthly: double (nullable = true)
 |-- how_long_internet_daily_hour: double (nullable = true)
 |-- features: vector 

In [21]:
# Score each selected model on the held-out split and store the metrics under
# best_models[name]["score"].
for model_name, value in best_models.items():
    y_pred = value["model"].transform(test)
    # RegressionMetrics expects (prediction, observation) pairs in that order.
    # Passing (label, prediction) leaves mse/rmse/mae unchanged (symmetric) but
    # makes r2 wrong, because its total sum of squares is taken over the
    # observation side. The label is cast because it is an integer column.
    predictionAndObservations = y_pred.select(
        col("prediction"),
        col("label").cast(DoubleType()).alias("label"),
    ).rdd.map(tuple)
    metrics = RegressionMetrics(predictionAndObservations)
    best_models[model_name]["score"] = {
        "r2": metrics.r2,
        "mse": metrics.meanSquaredError,
        "rmse": metrics.rootMeanSquaredError,
        "mae": metrics.meanAbsoluteError,
    }

In [22]:
# Print, per model, the parameter description of the fitted model followed by
# each stored score group.
for model_name, data in best_models.items():
    print(model_name)
    for key, value in data.items():
        if key != "model":
            # Any non-model entry is a mapping of score name -> score value.
            print(f"\t{key}:")
            for score_name, score_value in value.items():
                print(f"\t\t{score_name}: {score_value}")
            continue
        print(f"\t{key}: {value.explainParams()}")

linear
	model: aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError, current: squaredError)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIte