# Comparing Data Sources

As a reminder: in the notebook **01 Preparing the Data**, we read in two CSV files and converted them into Parquet format (segmented & unsegmented).

<div class="alert alert-block alert-info">
<b>Warning:</b>
This notebook depends on the Parquet files generated by the notebook <b>01 Preparing the Data</b>. Make sure to run all cells in that notebook before executing this one.
    
In particular, the files required are:
<ul>
    <li><tt>ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016.parquet</tt></li>
    <li><tt>ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016_segmented.parquet</tt></li>
    <li><tt>ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2017.parquet</tt></li>
    <li><tt>ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2017_segmented.parquet</tt></li>
</ul>
</div>
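Before going further, it can help to confirm that those files are actually in place. The following is a minimal sketch, assuming the `ParkingData` directory sits relative to the notebook's working directory; the helper name `find_missing` is hypothetical and not part of the original notebook.

```python
from pathlib import Path

# File names copied from the list above. The location of ParkingData
# relative to the working directory is an assumption.
REQUIRED_FILES = [
    "ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016.parquet",
    "ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016_segmented.parquet",
    "ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2017.parquet",
    "ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2017_segmented.parquet",
]

def find_missing(paths, base_dir="."):
    """Return the entries of `paths` that do not exist under `base_dir`."""
    base = Path(base_dir)
    return [p for p in paths if not (base / p).exists()]

missing = find_missing(REQUIRED_FILES)
if missing:
    print("Missing files (run 01 Preparing the Data first):")
    for path in missing:
        print("  ", path)
else:
    print("All required Parquet files are present.")
```

`Path.exists()` is true for both files and directories, so the check works whether a Parquet dataset was written as a single file or as a folder.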

In [1]:
import numpy as np
import pandas as pd
import time
import datetime
import bodo
import os
import warnings
warnings.filterwarnings('ignore')

---------------------

## Baseline Computations from CSV

First, as a baseline check, let's run the code we want to execute locally, reading from CSV, without multiple compute engines, and *without* Bodo.

In [2]:
def load_parking_tickets():
    """
    Load data and aggregate by day, violation type, and police precinct.
    """
    start = time.time()
    DATA_SRC = 'ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016.csv'
    groupby_cols = ['Issue Date','Violation County','Violation Precinct','Violation Code']
    year_2016_df = pd.read_csv(DATA_SRC, parse_dates=["Issue Date"])
    year_2016_df['Violation County'] = year_2016_df['Violation County'].fillna('NAN')
    year_2016_df = year_2016_df.groupby(groupby_cols, as_index=False)['Summons Number'].count()
    end = time.time()
    print("Reading Time: ", end - start)
    return year_2016_df

In [3]:
main_df = load_parking_tickets()
display(main_df.head())

Reading Time:  37.44481635093689


Unnamed: 0,Issue Date,Violation County,Violation Precinct,Violation Code,Summons Number
0,1970-04-13,NAN,70.0,46,1
1,1970-12-02,K,71.0,46,1
2,1971-10-02,K,73.0,21,1
3,1973-02-26,K,73.0,21,1
4,1973-09-26,NY,20.0,98,1


On the system on which this notebook was executed, running the preceding function—whose principal work is executing `read_csv` followed by a groupby—took about 70 seconds. The output should look something like this:

```
Reading Time:  69.130507230758
  Issue Date Violation County  Violation Precinct  Violation Code     Summons Number
0 1970-04-13              NAN                70.0              46                  1
1 1970-12-02                K                71.0              46                  1
2 1971-10-02                K                73.0              21                  1
3 1973-02-26                K                73.0              21                  1
4 1973-09-26               NY                20.0              98                  1
```

Notice these results are nonsensical (i.e., the dates from the 1970s do not belong in a file that purports to contain records of tickets issued in 2016). This is not a parsing error; if we look carefully, we'll find those dates—and ones from 2069(!)—in the original CSV source file.
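One way to flag such records is to compare each issue date against the expected window. The sketch below uses only the standard library and a few hand-written sample dates. The window (July 1, 2015 to June 30, 2016) assumes the file follows New York City's fiscal-year calendar, and the helper name `outside_fiscal_year` is hypothetical.

```python
from datetime import date

# Assumed window for fiscal year 2016 (an assumption, not stated in the data).
FY_START = date(2015, 7, 1)
FY_END = date(2016, 6, 30)

def outside_fiscal_year(issue_dates, start=FY_START, end=FY_END):
    """Return the dates that fall outside the inclusive [start, end] window."""
    return [d for d in issue_dates if not (start <= d <= end)]

# Hand-written sample. The 1970s dates appear in the output above;
# the 2069 date is an illustrative value, not an actual record.
sample = [
    date(1970, 4, 13),
    date(1973, 9, 26),
    date(2015, 7, 9),
    date(2069, 1, 1),
]

suspicious = outside_fiscal_year(sample)
print(suspicious)
```

The same comparison could be applied to the `Issue Date` column before aggregating, if the out-of-range rows should be excluded.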

Now that we have our files in both CSV and Parquet formats, we can look at what Bodo gains from each.

We'll start by executing code to launch our local `ipyparallel` cluster (not necessary on the [Bodo cloud platform]()).

<div class="alert alert-block alert-info">
<b>Reminder:</b>

Having launched IPyParallel, the `px` magics ensure that code is executed on all engines.
    
1. The `%px` line magic executes a single line of code on all engines.
2. The `%%px` cell magic—placed as the first line of a cell—ensures all lines in a given cell are executed on all engines.
3. Lines executed outside of the context of a `px` line or cell magic execute in the namespace of the default Jupyter kernel.

Consult the [IPyParallel documentation](https://ipyparallel.readthedocs.io/en/latest/) for more information about setting up a local cluster. [Bodo's documentation](https://docs.bodo.ai/2022.2/bodo_parallelism/bodo_parallelism_basics/) provides more information about the parallel execution model.
</div>

In [4]:
# This cell does not need to be run on the Bodo.ai platform and should be skipped
import ipyparallel as ipp
import psutil
n_proc = min(psutil.cpu_count(logical=False), 8)
rc = ipp.Cluster(engines='mpi', n=n_proc).start_and_connect_sync(activate=True)

Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
100%|██████████| 4/4 [00:05<00:00,  1.48s/engine]


With the IPyParallel compute engines initialized, we need first to carry out imports on each engine. Remember, each engine has its own namespace *separate* from the Jupyter notebook session. As such, the imports made a few cells above have yet to be made in the namespaces of the distinct engines.

In [5]:
%%px

import numpy as np
import pandas as pd
import time
import datetime
import bodo
import os

%px: 100%|██████████| 4/4 [00:01<00:00,  2.56tasks/s]


We're now ready to repeat the computation reading from CSV again, but this time using *multiple compute engines* and using *Bodo's JIT compiler*.

In [6]:
%%px

@bodo.jit(distributed=['year_2016_df'], cache=True)
def load_parking_tickets():
    """
    Load data and aggregate by day, violation type, and police precinct.
    """
    start = time.time()
    DATA_SRC = 'ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016.csv'
    groupby_cols = ['Issue Date','Violation County','Violation Precinct','Violation Code']
    year_2016_df = pd.read_csv(DATA_SRC, parse_dates=["Issue Date"])
    year_2016_df['Violation County'] = year_2016_df['Violation County'].fillna('NAN')
    year_2016_df = year_2016_df.groupby(groupby_cols, as_index=False)['Summons Number'].count()
    end = time.time()
    print("Reading Time: ", end - start)
    return year_2016_df
    
main_df_csv = load_parking_tickets()

%px:   0%|          | 0/4 [00:12<?, ?tasks/s]

[stdout:0] Reading Time:  13.600718021392822


%px: 100%|██████████| 4/4 [00:12<00:00,  3.11s/tasks]


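The first few rows of `main_df_csv` should look something like the following. The counts are computed the same way as above, but the rows appear in a different order than in the pandas output: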

```
  Issue Date Violation County  Violation Precinct  Violation Code     Summons Number  
0 2015-07-09                K                  74              46                  1
1 2015-07-09                K                  79              71                 19 
2 2015-07-09              NAN                  94              21                  3
3 2015-07-09                K                  84              21                 53  
4 2015-07-09                K                  84              37                 76 
```

We can display this as follows:

In [7]:
%%px
if bodo.get_rank() == 0:
    display(main_df_csv.head())

[output:0]

Unnamed: 0,Issue Date,Violation County,Violation Precinct,Violation Code,Summons Number
0,2015-07-09,K,74,46,1
1,2015-07-09,K,79,71,19
2,2015-07-09,NAN,94,21,3
3,2015-07-09,K,84,21,53
4,2015-07-09,K,84,37,76


Notice that the dataframe `main_df_csv` is *distributed* across all the engines; that is, it is partitioned by rows into pieces, one stored on each engine. Without the preceding `if` block branching on the condition `bodo.get_rank() == 0`, the first few rows of the partition stored on each engine are displayed:

In [8]:
%%px
display(main_df_csv.head()) # Displays on all engines

[output:0]

Unnamed: 0,Issue Date,Violation County,Violation Precinct,Violation Code,Summons Number
0,2015-07-09,K,74,46,1
1,2015-07-09,K,79,71,19
2,2015-07-09,NAN,94,21,3
3,2015-07-09,K,84,21,53
4,2015-07-09,K,84,37,76


[output:2]

Unnamed: 0,Issue Date,Violation County,Violation Precinct,Violation Code,Summons Number
317020,2015-07-09,K,0,21,5
317021,2015-07-09,NAN,94,41,1
317022,2015-07-09,K,94,70,11
317023,2015-07-09,K,94,21,134
317024,2015-07-09,K,88,71,12


[output:3]

Unnamed: 0,Issue Date,Violation County,Violation Precinct,Violation Code,Summons Number
475246,2015-07-09,K,79,21,86
475247,2015-07-09,K,0,75,1
475248,2015-07-09,K,84,20,46
475249,2015-07-09,K,84,71,26
475250,2015-07-09,K,90,20,26


[output:1]

Unnamed: 0,Issue Date,Violation County,Violation Precinct,Violation Code,Summons Number
158443,2015-07-09,K,88,21,59
158444,2015-07-09,K,94,71,14
158445,2015-06-20,NAN,107,21,2
158446,2015-06-19,Q,109,21,21
158447,2015-06-24,NY,110,98,1


Moreover, on the system on which this notebook was executed, the time elapsed looked something like the following:

```
[stdout:0] Reading Time:  18.125343799
```

We can see it took about 18 seconds to load and execute a groupby on data from the 10-million-row CSV file (rather than over a minute before). Already, Bodo has significantly reduced the time it takes to complete this initial step in our ETL.

---------------------

## Reading from Parquet (unsegmented)

But there's room for improvement yet; let's now execute the same logic, but reading from Parquet files, including another 11 million rows of data, and, of course, compiling with Bodo.

In [9]:
%%px

@bodo.jit(distributed=['many_year_df'], cache=True)
def load_parking_tickets():
    """
    Load data and aggregate by day, violation type, and police precinct.
    """
    start = time.time()
    groupby_cols = ['Issue Date','Violation County','Violation Precinct','Violation Code']
    
    DATA_SRC = 'ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016.parquet'
    year_2016_df = pd.read_parquet(DATA_SRC)
    year_2016_df = year_2016_df.groupby(groupby_cols, as_index=False)['Summons Number'].count()

    DATA_SRC = 'ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2017.parquet'
    year_2017_df = pd.read_parquet(DATA_SRC)
    year_2017_df = year_2017_df.groupby(groupby_cols, as_index=False)['Summons Number'].count()
    
    # concatenate all dataframes into one dataframe
    many_year_df = pd.concat([year_2016_df, year_2017_df])
    end = time.time()
    print("Reading Time: ", end - start)
    return many_year_df
    
main_df_pq = load_parking_tickets()

if bodo.get_rank() == 0:
    display(main_df_pq.head())

For best performance the number of row groups should be greater than the number of workers (4)

For best performance the number of row groups should be greater than the number of workers (4)



%px:   0%|          | 0/4 [00:05<?, ?tasks/s]

[stdout:0] Reading Time:  7.02411413192749


[output:0]

Unnamed: 0,Issue Date,Violation County,Violation Precinct,Violation Code,Summons Number
0,2015-07-09,K,94.0,70,11
1,2015-07-09,K,94.0,21,134
2,2015-07-09,K,84.0,21,53
3,2015-07-09,K,84.0,37,76
4,2015-07-09,K,90.0,21,134


%px: 100%|██████████| 4/4 [00:05<00:00,  1.32s/tasks]


On the same hardware, this took less than 5 seconds, and, remember, this computation used twice as much data. This is possible in part because Parquet uses columnar storage. While the CSV files must be read in their entirety, Parquet allows Bodo to efficiently extract only the subset of columns needed. Notice that the Bodo-compiled result differs slightly from the pandas result in this case: the rows of the output appear in a different order.
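The effect of columnar storage can be illustrated with a toy model. The sketch below is not Parquet's on-disk format; it only counts how many values must be touched to obtain three columns from a row-oriented layout versus a column-oriented one. The sample rows and the names `Other Column A` and `Other Column B` are hypothetical.

```python
# Toy model only: this illustrates column projection and is not a
# description of Parquet's actual on-disk format.

# Three sample rows; "Other Column A"/"Other Column B" are hypothetical
# stand-ins for columns the aggregation never uses.
rows = [
    {"Summons Number": 101, "Issue Date": "2015-07-09", "Violation Code": 46,
     "Other Column A": "x", "Other Column B": "y"},
    {"Summons Number": 102, "Issue Date": "2015-07-09", "Violation Code": 71,
     "Other Column A": "x", "Other Column B": "y"},
    {"Summons Number": 103, "Issue Date": "2015-07-09", "Violation Code": 21,
     "Other Column A": "x", "Other Column B": "y"},
]

def to_columnar(rows):
    """Pivot a list of row dicts into a dict mapping column name to values."""
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns

def values_touched_row_layout(rows):
    """Row-oriented layout: whole rows are read before columns are selected."""
    return sum(len(row) for row in rows)

def values_touched_columnar(columns, wanted):
    """Column-oriented layout: only the requested columns are read."""
    return sum(len(columns[name]) for name in wanted)

wanted = ["Issue Date", "Violation Code", "Summons Number"]
columns = to_columnar(rows)
print("row layout:", values_touched_row_layout(rows))
print("columnar:  ", values_touched_columnar(columns, wanted))
```

In plain pandas, the `columns` argument of `pd.read_parquet` requests this kind of projection explicitly.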

---------------------

## Reading from Parquet (segmented)

Notice that reading the Parquet files above produced warnings:

```
BodoWarning: Total number of row groups in parquet dataset ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016.parquet (1) is too small for effective IO parallelization.
For best performance the number of row groups should be greater than the number of workers (4)
```

Recalling that we created *segmented* Parquet files (using the options `row_group_size=100_000` & `engine='pyarrow'` in the call to `DataFrame.to_parquet`), we can read from those files instead for even greater efficiency when working with multiple engines:

In [10]:
%%px

@bodo.jit(distributed=['many_year_df'], cache=True)
def load_parking_tickets():
    """
    Load data and aggregate by day, violation type, and police precinct.
    """
    start = time.time()
    groupby_cols = ['Issue Date','Violation County','Violation Precinct','Violation Code']
    
    DATA_SRC = 'ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2016_segmented.parquet'
    year_2016_df = pd.read_parquet(DATA_SRC)
    year_2016_df = year_2016_df.groupby(groupby_cols, as_index=False)['Summons Number'].count()

    DATA_SRC = 'ParkingData/Parking_Violations_Issued_-_Fiscal_Year_2017_segmented.parquet'
    year_2017_df = pd.read_parquet(DATA_SRC)
    year_2017_df = year_2017_df.groupby(groupby_cols, as_index=False)['Summons Number'].count()
    
    # concatenate all dataframes into one dataframe
    many_year_df = pd.concat([year_2016_df, year_2017_df])
    end = time.time()
    print("Reading Time: ", end - start)
    return many_year_df
    
main_df_pq_seg = load_parking_tickets()
if bodo.get_rank() == 0:
    display(main_df_pq_seg.head())

%px:   0%|          | 0/4 [00:02<?, ?tasks/s]

[output:0]

Unnamed: 0,Issue Date,Violation County,Violation Precinct,Violation Code,Summons Number
0,2015-07-09,K,94.0,70,11
1,2015-07-09,K,94.0,21,134
2,2015-07-09,K,84.0,21,53
3,2015-07-09,K,84.0,37,76
4,2015-07-09,K,90.0,21,134


%px: 100%|██████████| 4/4 [00:02<00:00,  1.86tasks/s]


With the Parquet files segmented into row groups, Bodo's read process is even more efficient, and the whole computation now takes about 4.3 seconds:

```
[stdout:0] Reading Time:  4.311285018920898
  Issue Date Violation County  Violation Precinct  Violation Code     Summons Number
0 2015-07-09                K                94.0              70                 11
1 2015-07-09                K                94.0              21                134 
2 2015-07-09                K                84.0              21                 53 
3 2015-07-09                K                84.0              37                 76  
4 2015-07-09                K                90.0              21                134
```
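To see why segmentation helps, compare the number of row groups with the number of workers. The sketch below assumes roughly 10 million rows per file (the figure quoted earlier for the CSV), a row group size of 100,000 rows, and 4 workers. The round-robin assignment is an illustration only, not a description of how Bodo actually schedules its reads.

```python
def count_row_groups(n_rows, row_group_size):
    """Number of row groups needed to hold n_rows (ceiling division)."""
    return -(-n_rows // row_group_size)

def groups_per_worker(n_groups, n_workers):
    """Row groups read by each worker under a round-robin assignment.

    Round-robin is an assumption made for illustration.
    """
    counts = [0] * n_workers
    for group in range(n_groups):
        counts[group % n_workers] += 1
    return counts

N_ROWS = 10_000_000   # approximate row count, taken from the text
N_WORKERS = 4         # number of engines started in this notebook

segmented = count_row_groups(N_ROWS, 100_000)
print("segmented:  ", groups_per_worker(segmented, N_WORKERS))

# The unsegmented file holds a single row group, as the warning reported.
print("unsegmented:", groups_per_worker(1, N_WORKERS))
```

With a single row group there is only one unit of work to hand out, so the other workers have nothing of their own to read; with many row groups every worker gets an equal share.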

With only a few small changes to our function—compiling with Bodo and using row-segmented Parquet files—we have reduced a computation from over a minute—on *less* data—to barely 4 seconds.
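The timings quoted in this notebook can be turned into speedup factors with a little arithmetic. The sketch below copies the three "Reading Time" values from the outputs quoted in the text; results on other hardware will differ, and the Parquet run processed two years of data, so the comparison is not like-for-like.

```python
# Timings in seconds, copied from the outputs quoted in the text.
timings = {
    "pandas, CSV": 69.130507230758,
    "Bodo, CSV": 18.125343799,
    "Bodo, segmented Parquet": 4.311285018920898,
}

baseline = timings["pandas, CSV"]

# Speedup relative to the pandas baseline: baseline time / measured time.
speedups = {label: baseline / seconds for label, seconds in timings.items()}

for label, factor in speedups.items():
    print(f"{label}: {factor:.1f}x")
```

On these numbers, compiling with Bodo alone gives a speedup of a little under 4x, and switching to segmented Parquet brings it to about 16x.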

Finally, an important thing to remember is to cleanly shut down the `ipyparallel` session. Generically, this requires inserting a line like the following at the end of a notebook. These mechanics happen behind the scenes on Bodo's cloud platform, so this command is not required when executing this notebook there.

In [11]:
# To stop the cluster run the following command.
rc.cluster.stop_cluster_sync()

Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 1048, 'identifier': 'ipcontroller-1651334243-dq1h-1005'}
Stopping engine(s): 1651334244
engine set stopped 1651334244: {'exit_code': 0, 'pid': 1087, 'identifier': 'ipengine-1651334243-dq1h-1651334244-1005'}


---------------------