# RDD Lineage Caching Microbenchmark

In [1]:
import time
from functools import wraps
from typing import Callable, TypeVar, Any

T = TypeVar('T')

def time_function(func: Callable[..., T]) -> Callable[..., T]:
    """Wrap *func* so that every call prints how long it took.

    The duration is measured with ``time.perf_counter()``, a monotonic,
    high-resolution clock intended for measuring intervals. Differences of
    ``time.time()`` readings can be distorted when the system clock is
    adjusted while the benchmark is running.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func``, prints ``"<name> executed in <seconds> seconds"`` and
        returns ``func``'s result. Nothing is printed if ``func`` raises.
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start

        print(f"{func.__name__} executed in {elapsed:.6f} seconds")
        return result

    return wrapper

In [2]:
# IMPORTANT: This code assumes you have already completed the preparation steps in getting-started.ipynb.

from ndn_compute_client import NdnComputeClient
# Shared client handle used by all later cells to create datasets.
# NOTE(review): presumably http://localhost:5214 is the local compute service
# brought up during the preparation steps -- confirm host/port.
client = NdnComputeClient('http://localhost:5214')

In [4]:
# Create a dataset handle for the events log through the shared client.
dataset = client.create_dataset("appA/events.log.jsonl")
# Apply a per-row computation (sum of 333 products sin(random) * cos(random);
# the row's own contents are never read) and call .cache() on the result.
# The benchmark functions below build this same transform again.
# NOTE(review): this looks like artificial CPU load plus a warm-up of the
# transform's cache before timing -- confirm that is the intent, since it
# would affect both benchmark variants.
# NOTE(review): the inline __import__ calls presumably keep the lambda
# self-contained so it can be shipped to remote workers -- confirm.
# This expression is left as the last statement so the notebook displays its value.
dataset.transform(lambda df: df.apply(lambda row: sum([__import__("math").sin(__import__("random").random()) * __import__("math").cos(__import__("random").random()) for _ in range(333)]), axis=1)).cache()

<ndn_compute_client.ndn_compute_client.NdnComputeClient.create_dataset.<locals>.Dataset at 0x107c04dd0>

In [5]:
@time_function
def fetch_without_intermediate_lineage():
    """Collect two results derived from one intermediate dataset, uncached.

    Builds a transform -> map pipeline over ``appA/events.log.jsonl``, then
    collects two further maps (``+ 1`` and ``+ 2``) of that intermediate
    dataset and prints the shape of each collected result. ``.cache()`` is
    never called in this variant.
    """
    source = client.create_dataset("appA/events.log.jsonl")
    # Per-row sum of 333 random sin*cos products; the row itself is not read.
    expensive = source.transform(lambda df: df.apply(lambda row: sum([__import__("math").sin(__import__("random").random()) * __import__("math").cos(__import__("random").random()) for _ in range(333)]), axis=1))
    # String length of each mapped element: the step both results share.
    intermediate = expensive.map(lambda r: len(str(r)))

    plus_one = intermediate.map(lambda r: r + 1).collect()
    plus_two = intermediate.map(lambda r: r + 2).collect()

    for collected in (plus_one, plus_two):
        print(collected.shape)

In [18]:
@time_function
def fetch_with_intermediate_lineage():
    """Collect two results derived from one intermediate dataset, cached.

    Builds a transform -> map pipeline over ``appA/events.log.jsonl``, calls
    ``.cache()`` on the intermediate dataset, then collects two further maps
    (``+ 1`` and ``+ 2``) of it and prints the shape of each collected result.
    """
    source = client.create_dataset("appA/events.log.jsonl")
    # Per-row sum of 333 random sin*cos products; the row itself is not read.
    expensive = source.transform(lambda df: df.apply(lambda row: sum([__import__("math").sin(__import__("random").random()) * __import__("math").cos(__import__("random").random()) for _ in range(333)]), axis=1))
    # String length of each mapped element: the step both results share.
    uncached = expensive.map(lambda r: len(str(r)))
    # Both collects below start from whatever .cache() returns.
    intermediate = uncached.cache()

    plus_one = intermediate.map(lambda r: r + 1).collect()
    plus_two = intermediate.map(lambda r: r + 2).collect()

    for collected in (plus_one, plus_two):
        print(collected.shape)

In [19]:
# Trial run WITHOUT an intermediate cache: prints both result shapes, then the elapsed time.
fetch_without_intermediate_lineage()

(439417, 1)
(439417, 1)
fetch_without_intermediate_lineage executed in 93.137566 seconds


In [20]:
# Trial run WITH the intermediate dataset cached: prints both result shapes, then the elapsed time.
fetch_with_intermediate_lineage()

(439417, 1)
(439417, 1)
fetch_with_intermediate_lineage executed in 81.193584 seconds


## Results:

Note that NFD (the NDN Forwarding Daemon) has to be restarted between trials, so each trial was run individually rather than in a loop. All times are in seconds.

| Trial     | No intermediate cache | With intermediate cache |
|-----------|-----------------------|-------------------------|
| 1         | 93.005059             | 82.221597               |
| 2         | 93.009149             | 81.884614               |
| 3         | 93.138039             | 82.068494               |
| 4         | 93.061103             | 80.967617               |
| 5         | 93.137566             | 81.193584               |
| **Mean**  | 93.07                 | 81.67                   |
| **Stdev** | 0.07                  | 0.55                    |