# Decision Tree Regressor with the PySpark low-level (RDD) API

### Importing necessary packages

In [1]:
from pyspark import SparkContext, SparkConf, RDD
from pyspark.statcounter import StatCounter
from datetime import datetime
import math

## 1. Preparation

(Optional) Set the memory configuration for the PySpark session:

In [2]:
# Spark configuration: give the driver and the executors 8 GB of memory each
config = (
    SparkConf()
    .set("spark.driver.memory", "8g")
    .set("spark.executor.memory", "8g")
)

Let's initialize a Spark session:

In [3]:
# Initialise PySpark. `SparkContext.getOrCreate` is a classmethod that returns the
# already-running context when there is one, so re-running this cell no longer fails
# with "Cannot run multiple SparkContexts at once" (which constructing
# `SparkContext(...)` directly does). The configuration is only applied when a new
# context is actually created.
sc = SparkContext.getOrCreate(conf=config.setAppName('taxi_duration_lowlevel'))

your 131072x1 screen size is bogus. expect trouble
25/04/11 22:27:26 WARN Utils: Your hostname, HP-Envy resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/04/11 22:27:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/11 22:27:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 49294)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/hatake/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/home/hatake/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
  File "/home/hatake/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 271, in accum_updates


Read input files:

In [4]:
def read_csv(filepath: str):
    """Load a CSV file as an RDD of raw data lines plus the list of column names.

    The first line of the file is taken as the header. Every line equal to the
    header is filtered out of the returned RDD, and the header itself is split
    on `,` to produce the column names.
    """
    lines = sc.textFile(filepath)

    # The header is whatever comes first in the file
    header_line = lines.first()

    def is_data_line(line):
        return line != header_line

    return lines.filter(is_data_line), header_line.split(',')


# Load the training and test files together with their column names
raw_train_rdd, train_header = read_csv(filepath='train.csv')
raw_test_rdd, test_header = read_csv(filepath='test.csv')

                                                                                

## 2. Data preprocessing

Filter usable columns:

In [5]:
def extract_column(row: str, header: list[str], excluding_features: list[str]):
    """Turn one raw CSV line into a {column_name: value} dict, leaving out excluded columns."""
    # Fields are separated by `,`
    fields = row.split(',')

    selected = {}
    for position, column in enumerate(header):
        if column in excluding_features:
            continue
        selected[column] = fields[position]

    return selected

Preprocess features into usable form:

In [6]:
def preprocess_data(row: dict, label_col = None):
    """Turn a dict of raw string columns into the numeric record used by the model.

    - Flatten `pickup_datetime` into separate elements (year, month, day, hour, minute, second).
    - Encode `store_and_fwd_flag` to binary values (1 for 'Y', 0 otherwise).
    - Cast strings to numeric values.
    - Add Haversine distance (km) between the pickup and dropoff coordinates.

    The input `row` is left unmodified, so calling this function twice on the
    same dict gives the same result.

    Args:
        row: column name -> raw string value for one trip.
        label_col: name of the label column; when falsy, the label is None.

    Returns:
        {'id': row.get('id'), 'features': list of 14 floats, 'label': float or None}
    """
    # Flatten the datetime string into its component elements
    pickup = datetime.strptime(row['pickup_datetime'], '%Y-%m-%d %H:%M:%S')

    # Encode the flag into a local value instead of writing it back into `row`:
    # overwriting the caller's dict made a second call see 1/0 instead of 'Y'/'N'
    # and silently encode the flag as 0.
    store_and_fwd = 1.0 if row['store_and_fwd_flag'] == 'Y' else 0.0

    # Parse the coordinates once and reuse them for both the features and the distance
    pickup_lon = float(row['pickup_longitude'])
    pickup_lat = float(row['pickup_latitude'])
    dropoff_lon = float(row['dropoff_longitude'])
    dropoff_lat = float(row['dropoff_latitude'])

    # Haversine distance, using an Earth radius of 6371 km
    earth_radius_km = 6371
    lon1, lat1 = math.radians(pickup_lon), math.radians(pickup_lat)
    lon2, lat2 = math.radians(dropoff_lon), math.radians(dropoff_lat)
    dlon, dlat = lon2 - lon1, lat2 - lat1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    # Rounding can push `a` marginally outside [0, 1], which would make sqrt(1 - a) raise
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    distance_km = earth_radius_km * c

    features = [
        float(row['vendor_id']),
        float(pickup.year), float(pickup.month), float(pickup.day), float(pickup.hour),
        float(pickup.minute), float(pickup.second),
        float(row['passenger_count']),
        pickup_lon,
        pickup_lat,
        dropoff_lon,
        dropoff_lat,
        store_and_fwd,
        distance_km,
    ]

    return {'id': row.get('id'), 'features': features, 'label': float(row[label_col]) if label_col else None}

# Parse every raw line into a dict of columns, then into a numeric record.
# `dropoff_datetime` is excluded from the training columns; the test file keeps all of its columns.
train_rdd = (
    raw_train_rdd
    .map(lambda line: extract_column(line, train_header, ['dropoff_datetime']))
    .map(lambda record: preprocess_data(record, 'trip_duration'))
)

test_rdd = (
    raw_test_rdd
    .map(lambda line: extract_column(line, test_header, []))
    .map(lambda record: preprocess_data(record))
)

# Mark both RDDs as cached so later stages can reuse them
train_rdd.cache()
test_rdd.cache()

PythonRDD[7] at RDD at PythonRDD.scala:53

## 3. Model training

### Utility functions

`partition_feature_grouping` groups the rows of a partition by feature, producing for each feature a list of (feature_value, row_label) pairs:

In [7]:
def partition_feature_grouping(partition, usable_features: list):
    """Group one partition by feature.

    Returns an iterator of (feature, [(feature_value, row_label), ...]) pairs,
    one pair per feature seen, with the rows in the order they were read.
    """
    grouped = {}

    for record in partition:
        for feature in usable_features:
            # One bucket per feature, created the first time the feature is seen
            bucket = grouped.setdefault(feature, [])
            bucket.append((record['features'][feature], record['label']))

    return iter(list(grouped.items()))

`find_split_feature` for finding splitting point with maximum variance reduction on the domain of a feature:

In [8]:
def find_split_feature(values: list, parent_info: "StatCounter"):
    """Find the threshold on one feature that maximises the variance reduction of the labels.

    Args:
        values: list of (feature_value, label) pairs, one per row of the node.
        parent_info: label statistics of the node; only `count()`, `sum()`,
            `mean()` and `variance()` are used.

    Returns:
        A tuple (variance_reduction, split_point). Rows with feature value
        <= split_point form the left side. `(-inf, None)` is returned when no
        split exists (fewer than two rows, or all feature values identical).
    """
    no_split = (-float("inf"), None)

    # If not enough data to split, return no split
    parent_count = parent_info.count()
    if parent_count < 2:
        return no_split

    # Sort values by feature value (ascending order)
    sorted_values = sorted(values, key=lambda pair: pair[0])

    # Node-level statistics are loop-invariant, so read them once
    parent_var = parent_info.variance()
    parent_mean = parent_info.mean()
    parent_sum = parent_info.sum()

    # Total sum of squared labels: n * E[X²] = n * (Var(X) + (E[X])²)
    parent_pow_sum = (parent_var + parent_mean ** 2) * parent_count

    # Running statistics of the left side
    left_sum, left_pow_sum, left_count = 0, 0, 0
    best_split = no_split

    # Iterate through possible split points (between row i and row i + 1)
    for i in range(parent_count - 1):
        val, label = sorted_values[i]
        next_val = sorted_values[i + 1][0]

        left_sum += label
        left_pow_sum += label ** 2
        left_count += 1

        # Skip splitting between identical feature values
        if val == next_val:
            continue

        # Right side stats are obtained by subtracting the left side from the node.
        # Both sides are non-empty here: 1 <= left_count <= parent_count - 1.
        right_count = parent_count - left_count
        right_sum = parent_sum - left_sum
        right_pow_sum = parent_pow_sum - left_pow_sum

        # E[X²] - (E[X])² can come out slightly negative through floating-point
        # cancellation; a negative variance would inflate the reduction, so clamp it.
        left_var = max(0.0, left_pow_sum / left_count - (left_sum / left_count) ** 2)
        right_var = max(0.0, right_pow_sum / right_count - (right_sum / right_count) ** 2)

        # Calculate weighted variance reduction
        var_reduction = parent_var - (left_var * left_count + right_var * right_count) / parent_count

        # Update best split if this reduces variance more
        if var_reduction > best_split[0]:
            split_point = (val + next_val) / 2
            # For adjacent (or huge) floats the midpoint can round to `next_val` or
            # overflow; the threshold must keep `val` on the left and `next_val` on
            # the right, so fall back to `val` in that case.
            if not (val <= split_point < next_val):
                split_point = val
            best_split = (var_reduction, split_point)

    return best_split

Helper function `splitter` keeps the rows of the parent dataset whose split-feature value satisfies the given comparison (`operand`) against the splitting point:

In [9]:
def splitter(iterator, split_feature: int, split_point: float, operand):
    """Return an iterator over the rows for which `operand(feature_value, split_point)` is truthy."""
    kept = [
        record for record in iterator
        if operand(record['features'][split_feature], split_point)
    ]
    return iter(kept)

Main class `DecisionTreeRegressor` for building and executing Decision Tree Regressor Algorithm:

The tree is trained with `max_depth=10`, matching the `maxDepth=10` used in sections 3.2.1 and 3.2.2 (other parameters such as `maxBins` and `minInstancesPerNode` have no direct equivalent in this manual implementation).


In [10]:
class DecisionTreeRegressor:
    """Decision tree regressor built on RDDs, choosing splits by maximum variance reduction.

    A fitted tree is stored in `self.rules` as nested dicts: an internal node is
    {'split_feature', 'split_point', 'left', 'right'} and a leaf is
    {'prediction'}. Rows whose feature value is <= split_point belong to the
    left child, all other rows to the right child.
    """

    def __init__(self, max_depth = 1):
        """Initialize an unfitted estimator with the given maximum depth (default 1)."""
        self.max_depth = max_depth
        self.rules = None
        self.num_features = None

    def set_maxDepth(self, depth):
        """Set current maximum depth for the estimator."""
        self.max_depth = depth

    def fit(self, train_rdd: "RDD"):
        """Build the tree recursively on `train_rdd`, up to the current maximum depth.

        Every row is expected to be a dict with a 'features' list and a 'label'.
        Returns `self`.
        """
        self.num_features = len(train_rdd.first()['features'])
        self.usable_features = [i for i in range(self.num_features)]
        sample_size = train_rdd.count()
        print(f"Starting tree construction with {sample_size} samples, max_depth={self.max_depth}")
        self.rules = self.__build_rule_tree_recursive(train_rdd, self.usable_features, sample_size)
        print("Tree construction completed")
        return self

    def transform(self, rdd: "RDD"):
        """Make predictions on an RDD using the decision tree rules.

        Returns an RDD of (row id, predicted value) tuples.

        Raises:
            ValueError: if the estimator has not been fitted.
        """
        # Capture only the rule dict so the closure shipped to the workers does not
        # drag the whole estimator along with it.
        rules = self.rules
        if rules is None:
            raise ValueError("This DecisionTreeRegressor is not fitted yet; call fit() first.")

        def predict_row(row):
            # Start from the root of the decision tree
            node = rules

            # Traverse the tree until reaching a leaf node (which contains a prediction)
            while 'prediction' not in node:
                # Compare the feature value with the split point to decide the direction
                if row['features'][node['split_feature']] <= node['split_point']:
                    node = node['left']  # Go to the left subtree
                else:
                    node = node['right']  # Go to the right subtree

            # Return a tuple of (row ID, predicted value)
            return (row['id'], node['prediction'])

        # Apply prediction to each row in the RDD
        return rdd.map(predict_row)

    def display_rule_tree(self):
        """Recursively display the rules of a Decision Tree model."""
        print("\nDecision Tree Rules:")
        self.__display_tree_recursive(self.rules)

    def __display_tree_recursive(self, rules: dict, indent = 0):
        """Print the subtree rooted at `rules`, indented by `indent` spaces."""
        #Stopping condition
        if not rules:
            return

        if 'prediction' in rules:
            #Is a leaf condition
            print(f"{indent * ' '}Predict: {rules['prediction']:.2f}")
            return

        #Print the splitting point information and call recursion of the left and right child
        print(f"{indent * ' '}If feature[{rules['split_feature']}] <= {rules['split_point']:.2f}")
        self.__display_tree_recursive(rules['left'], indent + 4)
        print(f"{indent * ' '}Else feature[{rules['split_feature']}] > {rules['split_point']:.2f}")
        self.__display_tree_recursive(rules['right'], indent + 4)

    def __label_stats(self, rdd: "RDD"):
        """Return a StatCounter over the labels of `rdd` (one partial counter per partition, merged)."""
        return rdd.mapPartitions(lambda partition: [StatCounter([row['label'] for row in partition])], True)\
                  .reduce(lambda stat1, stat2: stat1.mergeStats(stat2))

    def __make_leaf(self, rdd: "RDD", depth):
        """Build a leaf predicting the mean label of `rdd`, then release its cache.

        The depth-0 RDD is the caller's input and is left cached.
        """
        mean = self.__label_stats(rdd).mean()
        if depth > 0:
            rdd.unpersist()
        return {'prediction' : mean}

    def __find_best_split(self, rdd: "RDD", usable_features: list):
        """Find the best splitting point with maximum variance reduction of the input dataset.

        Returns (split_feature, split_point); split_point is None when no
        feature offers a valid split.
        """
        #Compute the dataset statistics and store into a StatCounter object
        parent_info = self.__label_stats(rdd)

        #Group the (row_feature_value, row_label) pairs of every feature together.
        #The output is keyed by feature, so the input partitioning is not declared as preserved.
        #The grouped RDD is consumed exactly once, so it is not cached.
        grouped_rdd = rdd.mapPartitions(lambda partition: partition_feature_grouping(partition, usable_features))\
                         .reduceByKey(lambda l1, l2: l1 + l2)

        #Find the best splitting point for each feature
        candidates = grouped_rdd.mapValues(lambda values: find_split_feature(values, parent_info)).collect()

        #Nothing to choose from (no usable features or no rows)
        if not candidates:
            return None, None

        #Find the best splitting point for the dataset; ties go to the lowest feature index
        best_split = max(candidates, key=lambda x: (x[1][0], -x[0]))

        return best_split[0], best_split[1][1]

    def __build_rule_tree_recursive(self, parent: "RDD", usable_features: list, sample_size, depth = 0):
        """Recursively build the decision tree by finding the splitting point with maximum variance reduction and split the dataset with this point, up to the maximum depth."""
        print(f"Building tree at depth {depth}, sample size: {sample_size}")
        #Stopping condition
        if sample_size == 0:
            return None

        #Return the mean of un-splitted subset as the prediction if reached the depth limit
        if depth == self.max_depth or sample_size < 2:
            return self.__make_leaf(parent, depth)

        #Find the best splitting point for the input dataset
        split_feature, split_point = self.__find_best_split(parent, usable_features)

        #Return the mean of un-splitted subset as the prediction if no splitting point is valid but has not reached the depth limit yet
        if split_point is None:
            return self.__make_leaf(parent, depth)

        #Split the dataset into two disjoint subsets: `<=` goes left and `>` goes right,
        #the same rule `transform` applies. (Using `>=` on the right would put rows equal
        #to the split point in both children.)
        left_rdd = parent.mapPartitions(lambda iterator: splitter(iterator, split_feature, split_point, lambda a,b: a <= b), True).cache()
        right_rdd = parent.mapPartitions(lambda iterator: splitter(iterator, split_feature, split_point, lambda a,b: a > b), True).cache()

        #Materialize both children while the parent is still cached
        left_sample_size = left_rdd.count()
        right_sample_size = right_rdd.count()

        #A split that leaves one side empty is useless and would put a missing child in the tree
        if left_sample_size == 0 or right_sample_size == 0:
            left_rdd.unpersist()
            right_rdd.unpersist()
            return self.__make_leaf(parent, depth)

        #Un-cache the parent dataset (excluding the input one) now that the children no longer need it
        if depth > 0:
            parent.unpersist()

        #Build the dict of information with recursion call
        return {
            'split_feature' : split_feature,
            'split_point' : split_point,
            'left' : self.__build_rule_tree_recursive(left_rdd, usable_features, left_sample_size, depth + 1),
            'right' : self.__build_rule_tree_recursive(right_rdd, usable_features, right_sample_size, depth + 1)
        }

In [11]:
# Fit a tree with max_depth=10 on the training set, then print the learned rules
estimator = DecisionTreeRegressor(max_depth=10)
model = estimator.fit(train_rdd)
model.display_rule_tree()

25/04/11 22:27:39 WARN BlockManager: Task 2 already completed, not releasing lock for rdd_6_0
                                                                                

Starting tree construction with 1458644 samples, max_depth=10
Building tree at depth 0, sample size: 1458644


                                                                                

Building tree at depth 1, sample size: 1160867


                                                                                

Building tree at depth 2, sample size: 677778


                                                                                

Building tree at depth 3, sample size: 356233


                                                                                

Building tree at depth 4, sample size: 166340


                                                                                

Building tree at depth 5, sample size: 90855


ERROR:root:Exception while sending command.                         (0 + 6) / 6]
Traceback (most recent call last):
  File "/home/hatake/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/hatake/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/hatake/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/hatake/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise 

ConnectionRefusedError: [Errno 111] Connection refused

## 4. Sample predictions for a few test cases (test file)

In [None]:
# Score the test set and show the first five (id, prediction) pairs
predictions_rdd = estimator.transform(test_rdd)
print("Sample Predictions:")
predictions_samples = predictions_rdd.take(5)
for row_id, predicted in predictions_samples:
    print(f"ID: {row_id}, Prediction: {predicted}")

# Shut down the SparkContext
sc.stop()

Sample Predictions:


25/04/11 22:25:39 WARN BlockManager: Task 45 already completed, not releasing lock for rdd_7_0
                                                                                

ID: id3004672, Prediction: 735.6992713204887
ID: id3505355, Prediction: 735.6992713204887
ID: id1217141, Prediction: 735.6992713204887
ID: id2150126, Prediction: 1831.9367882677282
ID: id1598245, Prediction: 735.6992713204887
