## Import library

In [14]:
from pyspark import SparkContext
import math

## Create spark contex

In [15]:
sc = SparkContext.getOrCreate()

## Read file

In [16]:
raw_data = sc.textFile("file:///home/cap/hadoop/hadoop-3.4.1/train.csv")

## Preprocess data

Base on the analysed from structure API we only choose 5 features ("passenger_count", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude") for building decision tree

In [17]:
# remove header and read data
header = raw_data.first()

lst_header = header.split(',')
dict_header = {}
for i, head in enumerate(lst_header):
    dict_header[head] = i
dict_header
choosen_header = ["passenger_count", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]

data = raw_data.filter(lambda line: line != header)
parsedData = data.map(\
                lambda line: [x for x in line.split(",")])

# check missing value
def has_missing(row):
    for item in row:
        if item is None:
            return True
        if isinstance(item, str) and (item == "" or item.lower() in ['na', 'null', 'nan']):
            return True
    return False
missing_rows = parsedData.filter(has_missing).count()
print(f"Rows with missing values: {missing_rows}")

# remove duplicate values
rdd_data = parsedData.map(lambda cols: (tuple(cols[:-1]), cols[-1])).distinct()
rdd_data = rdd_data.map(lambda x: list((list(*x[:-1],), x[-1])))


# prepare data in proper format
def prepare_features(fields):
    features = [float(fields[0][dict_header[i]]) for i in choosen_header]
    label = float(fields[-1]  )            
    return (features, label)

rdd_data = rdd_data.map(prepare_features).cache()



[Stage 1:>                                                          (0 + 6) / 6]

Rows with missing values: 0


                                                                                

All numeric values have been archived 

In [18]:
rdd_data.take(1)

25/04/11 20:01:09 WARN BlockManager: Task 13 already completed, not releasing lock for rdd_8_0
                                                                                

[([1.0,
   -73.9790267944336,
   40.763938903808594,
   -74.00533294677734,
   40.710086822509766],
  2124.0)]

Remove outlier in `trip_duration`

In [19]:
# percentile
def calculate_quantiles(rdd, quantiles=[0.25, 0.75]):
    durations = rdd.map(lambda x: x[1]).collect()  
    durations_sorted = sorted(durations)
    n = len(durations_sorted)
    
    if n == 0:
        return [0.0] * len(quantiles)
    

    quantile_values = []
    for q in quantiles:
        index = int(n * q)
        if index >= n:
            index = n - 1
        quantile_values.append(durations_sorted[index])
    
    return quantile_values

# Q1  Q3
Q1, Q3 = calculate_quantiles(rdd_data, [0.25, 0.75])

# IQR
IQR = Q3 - Q1

# bound
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# remove outliers
clean_data = rdd_data.filter(lambda x: lower_bound <= x[1] <= upper_bound).cache()


original_count = rdd_data.count()
clean_count = clean_data.count()
print(f"Original len: {original_count}")
print(f"After remove outliers: {clean_count}")
print(f"Removed row: {original_count - clean_count}")


[Stage 9:>                                                          (0 + 6) / 6]

Original len: 1458644
After remove outliers: 1384424
Removed row: 74220


                                                                                

In [20]:
# sorted and devided the feature into n equal parts to get threshold
def get_thresholds(rdd, feature_idx, num_thresholds=5):
    values = rdd.map(lambda x: x[0][feature_idx]).collect()
    sorted_values = sorted(values)
    step = len(sorted_values) // (num_thresholds + 1)
    return [sorted_values[i * step] for i in range(1, num_thresholds + 1)]


thresholds = {
    0: [1, 2, 3, 4, 5],  # passenger_count
    1: get_thresholds(clean_data, 1),  # pickup_longitude
    2: get_thresholds(clean_data, 2),  # pickup_latitude
    3: get_thresholds(clean_data, 3),  # dropoff_longitude
    4: get_thresholds(clean_data, 4)   # dropoff_latitude
}



def calculate_variance(rdd):
    count = rdd.count()
    if count < 10:  # Ngăn nút quá nhỏ
        return 0.0, 0.0, count
    sum_duration = rdd.map(lambda x: x[1]).reduce(lambda x, y: x + y)
    mean = sum_duration / count
    variance = rdd.map(lambda x: (x[1] - mean)**2).reduce(lambda x, y: x + y) / count
    return variance, mean, count



# funct to find the best split
def find_best_split(rdd, features_idx, thresholds):
    best_split = None
    best_variance = float("inf")
    best_left_rdd = None
    best_right_rdd = None
    best_mean_left = None
    best_mean_right = None
    best_variance_reduction = 0.0

    parent_var, _, parent_count = calculate_variance(rdd)

    for idx in features_idx:
        for threshold in thresholds[idx]:
            left_rdd = rdd.filter(lambda x: x[0][idx] <= threshold)
            right_rdd = rdd.filter(lambda x: x[0][idx] > threshold)

            left_var, left_mean, left_count = calculate_variance(left_rdd)
            right_var, right_mean, right_count = calculate_variance(right_rdd)

            if left_count < 10 or right_count < 10:
                continue

            total_count = left_count + right_count
            total_variance = (left_count / total_count) * left_var + (right_count / total_count) * right_var
            variance_reduction = parent_var - total_variance

            if total_variance < best_variance:
                best_variance = total_variance
                best_split = (idx, threshold)
                best_left_rdd = left_rdd
                best_right_rdd = right_rdd
                best_mean_left = left_mean
                best_mean_right = right_mean
                best_variance_reduction = variance_reduction

    return best_split, best_left_rdd, best_right_rdd, best_mean_left, best_mean_right, best_variance_reduction




def build_tree(rdd, depth, max_depth, features_idx, thresholds, feature_counts=None):
    if feature_counts is None:
        feature_counts = {i: 0.0 for i in features_idx}
    
    if depth >= max_depth or rdd.count() < 10:
        _, mean, count = calculate_variance(rdd)
        return {"leaf": True, "value": mean, "count": count}, feature_counts

    split, left_rdd, right_rdd, mean_left, mean_right, variance_reduction = find_best_split(rdd, features_idx, thresholds)
    if split is None:
        _, mean, count = calculate_variance(rdd)
        return {"leaf": True, "value": mean, "count": count}, feature_counts

    feature_idx, threshold = split
    total_count = rdd.count()
    feature_counts[feature_idx] += variance_reduction * total_count  # Cộng dồn tầm quan trọng

    left_node, feature_counts = build_tree(left_rdd, depth + 1, max_depth, features_idx, thresholds, feature_counts)
    right_node, feature_counts = build_tree(right_rdd, depth + 1, max_depth, features_idx, thresholds, feature_counts)

    return {
        "leaf": False,
        "feature_idx": feature_idx,
        "threshold": threshold,
        "left": left_node,
        "right": right_node,
        "count": total_count
    }, feature_counts



def print_tree(node, depth=0, feature_names=None):
    indent = "  " * depth
    if node["leaf"]:
        print(f"{indent}Leaf: predict={node['value']:.2f}, samples={node['count']}")
        return
    feature = feature_names[node["feature_idx"]] if feature_names else f"feature_{node['feature_idx']}"
    print(f"{indent}Node: {feature} <= {node['threshold']:.4f}, samples={node['count']}")
    print_tree(node["left"], depth + 1, feature_names)
    print_tree(node["right"], depth + 1, feature_names)



def predict(tree, features):
    if tree["leaf"]:
        return tree["value"]
    feature_idx = tree["feature_idx"]
    threshold = tree["threshold"]
    if features[feature_idx] <= threshold:
        return predict(tree["left"], features)
    else:
        return predict(tree["right"], features)
    


# Hàm tính RMSE và R²
def calculate_metrics(rdd, tree):
    predictions = rdd.map(lambda x: (predict(tree, x[0]), x[1]))
    n = predictions.count()
    if n == 0:
        return float("inf"), 0.0

    # Tính RMSE
    squared_errors = predictions.map(lambda x: (x[0] - x[1])**2)
    mse = squared_errors.reduce(lambda x, y: x + y) / n
    rmse = math.sqrt(mse)

    # Tính R²
    mean_y = predictions.map(lambda x: x[1]).reduce(lambda x, y: x + y) / n
    ss_tot = predictions.map(lambda x: (x[1] - mean_y)**2).reduce(lambda x, y: x + y)
    ss_res = squared_errors.reduce(lambda x, y: x + y)
    r2 = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0

    return rmse, r2

                                                                                

In [21]:
# Xây dựng cây
features_idx = [0, 1, 2, 3, 4]
max_depth = 5
tree, feature_counts = build_tree(clean_data, 0, max_depth, features_idx, thresholds)

# In cây
print("Cấu trúc cây quyết định:")
print_tree(tree, feature_names=choosen_header)

# Tính Feature Importances
total_importance = sum(feature_counts.values())
feature_importances = {choosen_header[i]: (count / total_importance if total_importance > 0 else 0.0)
                      for i, count in feature_counts.items()}
print("\nFeature Importances:")
for feature, importance in feature_importances.items():
    print(f"{feature}: {importance:.4f}")

# Tính RMSE và R² trên tập train
train_rmse, train_r2 = calculate_metrics(clean_data, tree)
print(f"\nTrain RMSE: {train_rmse:.2f}")
print(f"Train R²: {train_r2:.4f}")



                                                                                

Cấu trúc cây quyết định:
Node: dropoff_latitude <= 40.7272, samples=1384424
  Node: dropoff_latitude <= 40.7272, samples=1153720
    Node: dropoff_latitude <= 40.7272, samples=1153720
      Node: dropoff_latitude <= 40.7272, samples=1153720
        Node: dropoff_latitude <= 40.7272, samples=1153720
          Leaf: predict=739.25, samples=1153720
          Leaf: predict=0.00, samples=0
        Leaf: predict=0.00, samples=0
      Leaf: predict=0.00, samples=0
    Leaf: predict=0.00, samples=0
  Node: pickup_latitude <= 40.7632, samples=230704
    Leaf: predict=0.00, samples=0
    Node: pickup_latitude <= 40.7632, samples=230704
      Leaf: predict=0.00, samples=0
      Node: pickup_latitude <= 40.7632, samples=230704
        Leaf: predict=0.00, samples=0
        Node: pickup_latitude <= 40.7632, samples=230704
          Leaf: predict=0.00, samples=0
          Leaf: predict=693.96, samples=230704

Feature Importances:
passenger_count: 0.0000
pickup_longitude: 0.0000
pickup_latitude: 0.577

## Preprocess test set

In [30]:
raw_test = sc.textFile("file:///home/cap/hadoop/hadoop-3.4.1/test.csv")

# remove header and read data
header = raw_test.first()

lst_header = header.split(',')
dict_header = {}
for i, head in enumerate(lst_header):
    dict_header[head] = i
dict_header
choosen_header = ["passenger_count", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]

data = raw_test.filter(lambda line: line != header)
parsedData = data.map(\
                lambda line: [x for x in line.split(",")])

# check missing value
def has_missing(row):
    for item in row:
        if item is None:
            return True
        if isinstance(item, str) and (item == "" or item.lower() in ['na', 'null', 'nan']):
            return True
    return False
missing_rows = parsedData.filter(has_missing).count()
print(f"Rows with missing values: {missing_rows}")

# remove duplicate values
rdd_data = parsedData.map(lambda cols: (tuple(cols[:-1]))).distinct()
rdd_data = rdd_data.map(lambda x: (list(x)))


# prepare data in proper format
def prepare_features(fields):
    features = [float(fields[dict_header[i]]) for i in choosen_header]      
    return (features)

rdd_data = rdd_data.map(prepare_features).cache()





Rows with missing values: 0


                                                                                

## Illustrate low-level implementation.
Using 2 row from test set to implement and run code to check the result

Row 1

In [32]:
print(f'rdd: {rdd_data.take(1)[0]}, predict: {predict(tree, rdd_data.take(1)[0])}')

rdd: [1.0, -73.9974365234375, 40.73758316040039, -73.98616027832031, 40.729522705078125], predict: 0.0


25/04/11 21:32:20 WARN BlockManager: Task 8314 already completed, not releasing lock for rdd_1423_0
25/04/11 21:32:20 WARN BlockManager: Task 8315 already completed, not releasing lock for rdd_1423_0


- Here dropoff_latitude = 40.7295 > 40.7272 so it will go to the right node
- Next pickup_latitude = 40.73758 <= 40.7632 so it will go to the left node
- The left node is a leaf so predict value is 0

Row 53

In [41]:
x = rdd_data.take(53)[-1]
print(f'rdd: {x}, predict: {predict(tree, x)}')

rdd: [1.0, -73.99176788330078, 40.764888763427734, -73.9789810180664, 40.7445182800293], predict: 693.9621549691379


25/04/11 22:24:38 WARN BlockManager: Task 8325 already completed, not releasing lock for rdd_1423_0


- Here dropoff_latitude = 40.7445 > 40.7272 so it will go to the right node
- Next pickup_latitude = 40.76488 <= 40.7632 so it will go to the right node
- The pickup_latitude = 40.76488 <= 40.7632 so it will go to the right node
- The pickup_latitude = 40.76488 <= 40.7632 so it will go to the right node
- The pickup_latitude = 40.76488 <= 40.7632 so it will go to the right node
- The predict value is 693.90

In [42]:
# Đóng Spark
sc.stop()