# Scaling the Bodo Pipeline

<div class="alert alert-block alert-info">
<b>Warning:</b>
This notebook depends on the Parquet files generated by the notebook <b>01 Preparing the Data</b>. Make sure to run all cells in that notebook before executing this one.
    
In particular, the files required are:
<ul>
    <li><tt>ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016_segmented.parquet</tt></li>
    <li><tt>ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2017_segmented.parquet</tt></li>
</ul>
</div>

This notebook follows on from the notebook **03 Constructing a Pipeline of Functions**. The principal difference is the introduction of a way to *scale* the data, which lets us run timed experiments on various hardware configurations (such as those available on [Bodo's cloud platform](https://platform.bodo.ai/)).

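As usual, we begin with some standard imports. (The functions below use `@bodo.jit(spawn=True)`, so Bodo launches its own parallel workers and no separate IPyParallel session is needed.)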

In [1]:
# Some standard imports
import pandas as pd, numpy as np
import bodo, time

---

## A Tuned Pipeline

This is essentially the same pipeline as in the notebook **03 Constructing a Pipeline of Functions**. There are two principal differences:

+ The function `load_parking_tickets` is replaced by a function `load_parking_tickets_scaled`. The original function produces a DataFrame with about 1.2 million rows; the new function produces a DataFrame with $2^n$ times as many rows by repeatedly concatenating it with itself (i.e., recursive doubling), where $n$ is an input argument.
+ Both the functions `load_parking_tickets_scaled` and `run_pipeline_scaled` accept an input parameter `n` (with default value `n=0`) to control the scale of the data operated on.
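
The doubling loop can be illustrated in plain pandas on a toy DataFrame. This is a minimal sketch: the helper name `scale_by_doubling` and the sample data are hypothetical. After `n` passes the frame holds `len(df) * 2**n` rows, so `n=7` multiplies the row count by 128.

```python
import pandas as pd

def scale_by_doubling(df, n=0):
    """Hypothetical helper: return a DataFrame with 2**n times as many rows as df."""
    for _ in range(n):
        # Each pass concatenates the frame with itself, doubling its row count;
        # ignore_index=True gives a fresh 0..len-1 index instead of repeated labels.
        df = pd.concat([df, df], ignore_index=True)
    return df

base = pd.DataFrame({"Violation Code": [21, 38, 14], "Summons Number": [5, 2, 7]})
for n in range(4):
    scaled = scale_by_doubling(base, n)
    print(n, len(scaled))  # over the loop this prints: 0 3, 1 6, 2 12, 3 24
```

Because the extra rows are exact copies, grouped sums grow by a factor of $2^n$ while the set of distinct group keys stays the same, so the shape of the final aggregated result does not depend on `n`.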

In [2]:
@bodo.jit(spawn=True)
def load_parking_tickets_scaled(n=0):
    """
    Load data and aggregate by day, violation type, and police precinct.
    """

    start = time.time()
    groupby_cols = ['Issue Date','Violation County','Violation Precinct','Violation Code']
    DATA_SRC = 'ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016_segmented.parquet'
    year_2016_df = pd.read_parquet(DATA_SRC)
    year_2016_df = year_2016_df.groupby(groupby_cols, as_index=False)['Summons Number'].count()

    DATA_SRC = 'ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2017_segmented.parquet'
    year_2017_df = pd.read_parquet(DATA_SRC)
    year_2017_df = year_2017_df.groupby(groupby_cols, as_index=False)['Summons Number'].count()

    # concatenate all dataframes into one dataframe
    many_year_df = pd.concat([year_2016_df, year_2017_df])
    
    #### recursively double number of rows in dataframe by concatenation; scales as 2**n
    for _ in range(n): # with the default n=0, the loop body never executes
        many_year_df = pd.concat([many_year_df,many_year_df], ignore_index=True)
    
    end = time.time()
    timing_str = f"\n{'Reading Time:':<42}{end - start:8.3f} sec"
    return many_year_df, timing_str

In [3]:
@bodo.jit(spawn=True, distributed=False)
def load_violation_precincts_codes():
    """
    Load violation codes and precincts information.
    """
    start = time.time()
    violation_codes = pd.read_csv("ParkingData/DOF_Parking_Violation_Codes.csv")
    violation_codes.columns = ['Violation Code','Definition','manhattan_96_and_below','all_other_areas']
    nyc_precincts_df = pd.read_csv("ParkingData/nyc_precincts.csv", index_col='index')
    end = time.time()
    timing_str = f"\n{'Violation and precincts load Time:':<42}{end - start:8.3f} sec"
    return violation_codes, nyc_precincts_df, timing_str

In [4]:
@bodo.jit(spawn=True)
def elim_code_36(main_df):
    """
    Remove undefined violations (code 36)
    """
    start = time.time()
    main_df = main_df[main_df['Violation Code']!=36].sort_values('Summons Number',ascending=False)
    end = time.time()
    timing_str = f"\n{'Eliminate undefined violations time:':<42}{end - start:8.3f} sec"
    return main_df, timing_str

In [6]:
@bodo.jit(spawn=True)
def remove_outliers(main_df):
    """
    Delete entries that have dates outside our dataset dates
    """
    start = time.time()
    main_df = main_df[(main_df['Issue Date'] >= '2016-01-01') & (main_df['Issue Date'] <= '2017-12-31')]
    end = time.time()
    timing_str = f"\n{'Remove outliers time:':<42}{end - start:8.3f} sec"
    return main_df, timing_str
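
The filter in `remove_outliers` compares a datetime column against ISO-format date strings, which pandas converts to timestamps for the comparison. The small plain-pandas sketch below uses toy dates (hypothetical, not from the dataset) to show that this mask matches `Series.between`, whose bounds are inclusive by default.

```python
import pandas as pd

issue_dates = pd.Series(pd.to_datetime(
    ["2015-12-31", "2016-01-01", "2016-07-04", "2017-12-31", "2018-01-01"]))

# Mask as written in remove_outliers: the string bounds are parsed as timestamps.
mask = (issue_dates >= "2016-01-01") & (issue_dates <= "2017-12-31")

# Equivalent inclusive range check.
mask_between = issue_dates.between("2016-01-01", "2017-12-31")

print(issue_dates[mask].dt.strftime("%Y-%m-%d").tolist())
# ['2016-01-01', '2016-07-04', '2017-12-31']
```

One caveat: the upper bound `'2017-12-31'` means midnight, so if the column carried a time of day, a value such as 2017-12-31 15:00 would be excluded.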

In [7]:
@bodo.jit(spawn=True)
def merge_violation_code(main_df, violation_codes):
    """
    Merge violation information in the main_df
    """
    start = time.time()
    # left join main_df and violation_codes df so that there's more info on violation in main_df
    main_df = pd.merge(main_df, violation_codes, on='Violation Code', how='left')
    # cast precincts as integers from floats (inadvertent type change by merge)
    main_df['Violation Precinct'] = main_df['Violation Precinct'].astype(int)
    end = time.time()
    timing_str = f"\n{'Merge time:':<42}{end - start:8.3f} sec"
    return main_df, timing_str

In [8]:
@bodo.jit(spawn=True)
def calculate_total_summons(main_df):
    """
    Calculate the total summonses in dollars for a violation in a precinct on a day
    """
    start = time.time()
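    # create column for the portion of the precinct at or below 96th St.
    n = len(main_df)
    # float array: the weights include 0.75 and 0.5, which an int64 array would truncate to 0
    portion_manhattan_96_and_below = np.empty(n, np.float64)
    # NOTE: To run with plain pandas (without Bodo), use this loop instead.
    # for i in range(n):
    for i in bodo.prange(n):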
        x = main_df['Violation Precinct'].iat[i]
        if x < 22 or x == 23:
            portion_manhattan_96_and_below[i] = 1.0
        elif x == 22:
            portion_manhattan_96_and_below[i] = 0.75
        elif x == 24:
            portion_manhattan_96_and_below[i] = 0.5
        else: #other
            portion_manhattan_96_and_below[i] = 0
    main_df["portion_manhattan_96_and_below"] = portion_manhattan_96_and_below

    #create column for average dollar amount of summons based on location
    main_df['average_summons_amount'] = (main_df['portion_manhattan_96_and_below'] * main_df['manhattan_96_and_below']
                                     + (1 - main_df['portion_manhattan_96_and_below']) * main_df['all_other_areas'])

    #get total summons dollars by multiplying average dollar amount by number of summons given
    main_df['total_summons_dollars'] = main_df['Summons Number'] * main_df['average_summons_amount']
    main_df = main_df.sort_values(by=['total_summons_dollars'], ascending=False)
    end = time.time()
    timing_str = f"\n{'Calculate Total Summons Time:':<42}{end - start:8.3f} sec"
    return main_df, timing_str
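
For comparison, outside Bodo the same per-precinct weights could be computed without an explicit loop. This sketch swaps in NumPy's `np.select` (a vectorized alternative, not the notebook's `bodo.prange` approach); the function name `portion_below_96th` is hypothetical. The result is a floating-point array, which is what allows it to hold the 0.75 and 0.5 weights.

```python
import numpy as np

def portion_below_96th(precincts):
    """Hypothetical vectorized version of the weights in calculate_total_summons."""
    precincts = np.asarray(precincts)
    conditions = [
        (precincts < 22) | (precincts == 23),  # precinct entirely at/below 96th St.
        precincts == 22,                       # three quarters of the precinct
        precincts == 24,                       # half of the precinct
    ]
    choices = [1.0, 0.75, 0.5]
    # np.select uses the first condition that matches; all other precincts get 0.0.
    return np.select(conditions, choices, default=0.0)

print(portion_below_96th([1, 22, 23, 24, 30]).tolist())  # [1.0, 0.75, 1.0, 0.5, 0.0]
```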

In [9]:
@bodo.jit(spawn=True)
def aggregate(main_df):
    '''function that aggregates and filters data
    e.g. total violations by precinct
    '''
    start = time.time()
    filtered_dataset = main_df[['Violation Precinct','Summons Number', 'total_summons_dollars']]
    precinct_offenses_df = filtered_dataset.groupby(by=['Violation Precinct']).sum().reset_index().fillna(0)
    end = time.time()
    timing_str = f"\n{'Aggregate code time:':<42}{end - start:8.3f} sec"
    return precinct_offenses_df, timing_str

---

## Executing the Pipeline



Having defined the functions above, we define a scalable pipeline function. This time, the pipeline accepts a parameter $n$ that controls the size of the computation (i.e., the number of rows is increased by a factor $2^n$ in the call to `load_parking_tickets_scaled`).

In [10]:
@bodo.jit(spawn=True)
def run_pipeline_scaled(n=0):
    start = time.time()
    main_df, out1 = load_parking_tickets_scaled(n)
    violation_codes, nyc_precincts_df, out2 = load_violation_precincts_codes()
    main_df, out3 = elim_code_36(main_df)
    main_df, out4 = remove_outliers(main_df)
    main_w_violation, out5 = merge_violation_code(main_df, violation_codes)
    total_summons, out6 = calculate_total_summons(main_w_violation)
    precinct_offenses_df, out7 = aggregate(total_summons)
    end = time.time()
    out8 = f'Execution time: (run_pipeline_scaled, {n=:1d})'
    out8 = f"\n{54*'='}\n{out8:<42}{end-start:8.3f} sec"
    output_str = ''.join(['\n', out1, out2, out3, out4, out5, out6, out7, out8, '\n'])
    return precinct_offenses_df, output_str
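
Each stage reports its timing with the format spec `{label:<42}{seconds:8.3f} sec`: the label is left-aligned in a 42-character field and the seconds are right-aligned in an 8-character field with three decimals. That yields a 54-character line, matching the `54*'='` separator in `run_pipeline_scaled`. A quick plain-Python check (the helper name `timing_line` and the sample value are illustrative):

```python
def timing_line(label, seconds):
    # Same layout as the timing strings built inside the pipeline stages.
    return f"{label:<42}{seconds:8.3f} sec"

line = timing_line("Merge time:", 0.261)
print(repr(line))
print(len(line), len(54 * "="))  # 54 54
```

A stage taking 10,000 seconds or more would overflow the 8-character field and widen its line.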

In [11]:
%%time

# Executing this cell with the %%time cell magic gives the total time.
# The first time run_pipeline_scaled is executed, all the functions are compiled.
# Notice that "Wall time" is significantly larger than the reported "Execution time".
result, output = run_pipeline_scaled(n=3)
display(result.head())
print(result.shape)
print(output)



|   | Violation Precinct | Summons Number | total_summons_dollars |
|---|---|---|---|
| 0 | 34 | 1204256 | 101339560 |
| 1 | 43 | 1327672 | 92031880 |
| 2 | 120 | 525080 | 36869160 |
| 3 | 163 | 272 | 24080 |
| 4 | 11 | 704 | 69120 |


(283, 3)


```
Reading Time:                                1.518 sec
Violation and precincts load Time:           0.085 sec
Eliminate undefined violations time:         0.358 sec
Remove outliers time:                        0.025 sec
Merge time:                                  0.261 sec
Calculate Total Summons Time:                2.714 sec
Aggregate code time:                         0.205 sec
======================================================
Execution time: (run_pipeline_scaled, n=3)   5.201 sec
```

CPU times: user 665 ms, sys: 93.1 ms, total: 758 ms
Wall time: 41.8 s


Notice that the timings printed in the output capture only the execution time: about 42 seconds of `Wall time` versus about 5 seconds of `Execution time` on this system. The very first time `run_pipeline_scaled` is executed, *all* the functions get compiled. This overhead is most noticeable at smaller scales, where the `Execution time` is relatively small. When we re-execute the cell with the same parameter `n`, the discrepancy shrinks considerably.

In [12]:
%%time
# Executing this cell with the %%time cell magic gives the total time
# If run_pipeline_scaled has already been executed once, there is no compilation time overhead.
result, output = run_pipeline_scaled(n=3)
display(result.head())
print(result.shape)
print(output)

|   | Violation Precinct | Summons Number | total_summons_dollars |
|---|---|---|---|
| 0 | 34 | 1204256 | 101339560 |
| 1 | 43 | 1327672 | 92031880 |
| 2 | 120 | 525080 | 36869160 |
| 3 | 163 | 272 | 24080 |
| 4 | 11 | 704 | 69120 |


(283, 3)


```
Reading Time:                                4.115 sec
Violation and precincts load Time:           0.067 sec
Eliminate undefined violations time:         0.699 sec
Remove outliers time:                        0.029 sec
Merge time:                                  0.814 sec
Calculate Total Summons Time:                2.170 sec
Aggregate code time:                         0.654 sec
======================================================
Execution time: (run_pipeline_scaled, n=3)   8.566 sec
```

CPU times: user 11 ms, sys: 28.7 ms, total: 39.7 ms
Wall time: 21.3 s


This time, all the needed compilation has already been done, so the gap between the `Wall time` and the `Execution time` is much smaller; the remaining difference reflects communication overheads, set-up costs, and such. If we only need to run this code once on a few million rows, the compilation overhead is likely not worth paying. However, as the data grows, the one-time compilation cost quickly becomes small relative to the execution time, and the savings become well worthwhile.
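
To put rough numbers on this, we can treat the first run's `Wall time` minus its `Execution time` as an estimate of the one-time overhead (mostly compilation, though it also includes other start-up costs): about 41.8 - 5.2, or roughly 36.6 seconds, on this system. The sketch below computes the share of total time spent on that overhead as `n` grows. The scaling rule, with execution time growing linearly in the row count $2^n$, is a simplifying assumption for illustration only, not a measurement.

```python
def overhead_fraction(overhead_s, exec_s):
    """Share of total time spent on one-time overhead (compilation, start-up)."""
    return overhead_s / (overhead_s + exec_s)

# Estimates from the first n=3 run above: wall time minus execution time.
exec_n3 = 5.201
overhead = 41.8 - exec_n3

for n in range(3, 8):
    # ASSUMPTION: execution time scales linearly with the number of rows, i.e. with 2**n.
    exec_n = exec_n3 * 2 ** (n - 3)
    share = overhead_fraction(overhead, exec_n)
    print(f"n={n}: execution ~{exec_n:7.1f} s, overhead share {share:.0%}")
```

Under this assumption the overhead drops from most of the total at `n=3` to under a third of it at `n=7`, and it is paid only once per session as long as the argument types stay the same.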

---

## Timing Results

For the most direct apples-to-apples comparison, here are some summaries using the pipeline functions from this notebook as well as those from the notebook **03 Constructing a Pipeline of Functions**.

On a workstation, executing `run_pipeline` yielded the following results:  

```
Reading Time:                              1.675 sec
Violation and precincts load Time:         0.009 sec
Eliminate undefined violations time:       0.050 sec
Remove outliers time:                      0.009 sec
Merge time:                                0.048 sec
Calculate Total Summons Time:              0.182 sec
Aggregate code time:                       0.057 sec
====================================================
Execution time (run_pipeline):             2.115 sec
```

By contrast, the Bodo-compiled `run_pipeline_jitted` further reduces the overall run time, in part by keeping all data structures inside Bodo until the final return (i.e., avoiding the overhead of boxing and unboxing data between stages).

```
Reading Time:                              1.132 sec
Violation and precincts load Time:         0.007 sec
Eliminate undefined violations time:       0.047 sec
Remove outliers time:                      0.004 sec
Merge time:                                0.060 sec
Calculate Total Summons Time:              0.196 sec
Aggregate code time:                       0.014 sec
====================================================
Execution time (run_pipeline_jitted):      1.461 sec
```
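
To compare reports like these programmatically, a small parser can turn the printed output into a dictionary. This is a sketch: the helper `parse_timings` is hypothetical and assumes the `label ... seconds sec` layout shown above. Applied to the two workstation reports, it gives an overall speedup of about 1.45x for `run_pipeline_jitted`.

```python
import re

# Each timing line ends with a decimal number followed by "sec"; the label is everything before it.
TIMING_LINE = re.compile(r"^(?P<label>.*?)\s+(?P<seconds>\d+\.\d+) sec$")

def parse_timings(report):
    """Hypothetical helper: map each stage label to its time in seconds."""
    timings = {}
    for line in report.strip().splitlines():
        match = TIMING_LINE.match(line.strip())
        if match:  # separator lines of '=' do not match and are skipped
            timings[match["label"].rstrip(":")] = float(match["seconds"])
    return timings

run_pipeline_report = """
Reading Time:                              1.675 sec
Violation and precincts load Time:         0.009 sec
Eliminate undefined violations time:       0.050 sec
Remove outliers time:                      0.009 sec
Merge time:                                0.048 sec
Calculate Total Summons Time:              0.182 sec
Aggregate code time:                       0.057 sec
====================================================
Execution time (run_pipeline):             2.115 sec
"""

jitted_report = """
Reading Time:                              1.132 sec
Violation and precincts load Time:         0.007 sec
Eliminate undefined violations time:       0.047 sec
Remove outliers time:                      0.004 sec
Merge time:                                0.060 sec
Calculate Total Summons Time:              0.196 sec
Aggregate code time:                       0.014 sec
====================================================
Execution time (run_pipeline_jitted):      1.461 sec
"""

base = parse_timings(run_pipeline_report)
jitted = parse_timings(jitted_report)

# Per-stage ratios; values below 1.00 mean the stage was slower in the jitted run.
for label in base:
    if label in jitted:
        print(f"{label:<40}{base[label] / jitted[label]:6.2f}x")

total_speedup = base["Execution time (run_pipeline)"] / jitted["Execution time (run_pipeline_jitted)"]
print(f"Overall speedup: {total_speedup:.2f}x")
```

At these sub-second scales, individual stage timings are noisy, so the overall ratio is the more meaningful figure.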


Here we present timings using much more data ($2^7=128$ times as much, or roughly 140 million rows), executed on a Bodo cluster (2 c5.18xlarge instances, with 72 cores). Two runs at this scale gave the following results:

```
Reading Time:                                5.253 sec
Violation and precincts load Time:           0.091 sec
Eliminate undefined violations time:         2.971 sec
Remove outliers time:                        0.188 sec
Merge time:                                 16.986 sec
Calculate Total Summons Time:               45.737 sec
Aggregate code time:                         3.045 sec
======================================================
Execution time: (run_pipeline_scaled, n=7)  94.788 sec
```


```
Reading Time:                                6.101 sec
Violation and precincts load Time:           0.270 sec
Eliminate undefined violations time:         2.843 sec
Remove outliers time:                        0.057 sec
Merge time:                                 10.179 sec
Calculate Total Summons Time:               30.903 sec
Aggregate code time:                         3.286 sec
======================================================
Execution time: (run_pipeline_scaled, n=7)  53.789 sec
```
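
The two `n=7` cluster runs can be broken down the same way. This plain-Python sketch, with the numbers copied from the two reports above, computes each stage's share of the total execution time, plus the share not attributed to any stage (the total minus the sum of the stages). In both runs, `Calculate Total Summons` and `Merge` together account for most of the time.

```python
stages = ["Reading", "Violation and precincts load", "Eliminate undefined violations",
          "Remove outliers", "Merge", "Calculate Total Summons", "Aggregate"]

# (per-stage seconds, reported total seconds) for the two n=7 runs above
runs = {
    "run 1": ([5.253, 0.091, 2.971, 0.188, 16.986, 45.737, 3.045], 94.788),
    "run 2": ([6.101, 0.270, 2.843, 0.057, 10.179, 30.903, 3.286], 53.789),
}

for name, (times, total) in runs.items():
    print(f"{name} (total {total:.3f} sec)")
    for stage, seconds in zip(stages, times):
        print(f"  {stage:<32}{seconds / total:6.1%}")
    # Time in the reported total that none of the stage timers captured.
    unattributed = total - sum(times)
    print(f"  {'(not attributed to a stage)':<32}{unattributed / total:6.1%}")
```

The unattributed time differs sharply between the runs (about 20.5 seconds in the first versus about 0.15 seconds in the second), which accounts for a large part of the gap between their totals.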

---