# BlazingSQL vs. Apache Spark 

This notebook runs the ETL phase of one of our popular workloads twice: first with [BlazingSQL + RAPIDS AI](https://blazingdb.com), then with Apache Spark + PySpark.

In this notebook, we will cover: 
- How to set up [BlazingSQL](https://blazingsql.com) and the [RAPIDS AI](https://rapids.ai/) suite.
- How to read and query CSV files with cuDF and BlazingSQL.
- How BlazingSQL compares against Apache Spark (analyzing over 20M records).


## Setup
### Environment Sanity Check 

RAPIDS packages (BlazingSQL included) require an NVIDIA GPU with the Pascal architecture or newer. On Colab, this means a T4 GPU instance.

The cell below will let you know what type of GPU you've been allocated, and how to proceed.

In [1]:
# capture the nvidia-smi report
colab_smi = !nvidia-smi

# extract the GPU name from the report table
try:
    my_gpu = ' '.join(colab_smi[7].split()[2:4])
# no GPU attached, so the expected table row is missing
except IndexError:
    raise Exception("\nPlease make sure you've configured Colab to request a GPU instance type.\n\n"
                    "At top of Colab, try: Runtime -> Change runtime type -> Hardware accelerator -> GPU -> Save\n")

# not allocated a compatible GPU
if my_gpu not in ('Tesla T4', 'Tesla P100-PCIE...', 'GeForce GTX'):
    # allocated K80
    if my_gpu == 'Tesla K80':
        raise Exception("\nYou've been allocated a K80 instance\n\n"
                    "Unfortunately, this demo requires a T4 instance\n\n"
                    "At top of Colab, try: Runtime -> Reset all runtimes...\n")
    else:
        raise Exception(f"\nYou've achieved wizardry.\nYour GPU is {my_gpu}\nPlease inform info@blazingsql.com")

# allocated a compatible GPU
else:
    print('Woo! You got the right kind of GPU!')

Woo! You got the right kind of GPU!
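
The check above relies on the IPython `!` shell magic and on a fixed row of the `nvidia-smi` table. As a rough alternative sketch, the same idea can be written in plain Python by asking `nvidia-smi` for the GPU name directly. The helper names (`SUPPORTED_GPU_MARKERS`, `is_supported_gpu`, `query_gpu_names`) are hypothetical and not part of BlazingSQL or RAPIDS, and the list of accepted names simply mirrors the cell above rather than any official compatibility list.

```python
import subprocess

# Assumption: these name fragments mirror the GPUs accepted by the cell above.
SUPPORTED_GPU_MARKERS = ("Tesla T4", "Tesla P100", "GeForce GTX")


def is_supported_gpu(name):
    """Return True if the GPU name contains one of the accepted fragments."""
    return any(marker in name for marker in SUPPORTED_GPU_MARKERS)


def query_gpu_names():
    """Return the GPU names reported by nvidia-smi, or [] if none are visible."""
    try:
        completed = subprocess.run(
            ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            check=True,
        )
    except (OSError, subprocess.CalledProcessError):
        # nvidia-smi is missing or failed: treat this as "no GPU attached"
        return []
    return [line.strip() for line in completed.stdout.splitlines() if line.strip()]


gpu_names = query_gpu_names()
if not gpu_names:
    print("No NVIDIA GPU detected; request a GPU runtime first.")
for gpu_name in gpu_names:
    status = "supported" if is_supported_gpu(gpu_name) else "not supported by this demo"
    print(f"{gpu_name}: {status}")
```

Matching on a name fragment is only a heuristic. For example, `GeForce GTX` also matches cards older than Pascal, so a stricter check would need the GPU's compute capability.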


## Installs 

Below you will find three code blocks:
1. The first installs miniconda.
2. The second installs RAPIDS AI and sets up the system environment. 
3. The third installs BlazingSQL.

### Miniconda

In [0]:
# install miniconda
!wget -c https://repo.continuum.io/miniconda/Miniconda3-4.5.4-Linux-x86_64.sh
!chmod +x Miniconda3-4.5.4-Linux-x86_64.sh
!bash ./Miniconda3-4.5.4-Linux-x86_64.sh -b -f -p /usr/local

### RAPIDS AI

In [None]:
# install RAPIDS packages
!conda install -q -y --prefix /usr/local -c nvidia -c rapidsai \
  -c numba -c conda-forge -c pytorch -c defaults \
  cudf=0.9 cuml=0.9 cugraph=0.9 python=3.6 cudatoolkit=10.0

# set environment vars
import sys, os, shutil
sys.path.append('/usr/local/lib/python3.6/site-packages/')
os.environ['NUMBAPRO_NVVM'] = '/usr/local/cuda/nvvm/lib64/libnvvm.so'
os.environ['NUMBAPRO_LIBDEVICE'] = '/usr/local/cuda/nvvm/libdevice/'

# copy .so files to current working dir
for fn in ['libcudf.so', 'librmm.so']:
    shutil.copy('/usr/local/lib/'+fn, os.getcwd())

### BlazingSQL

In [None]:
# Install BlazingSQL for CUDA 10.0
! conda install -q -y --prefix /usr/local -c conda-forge -c defaults -c nvidia -c rapidsai \
   -c blazingsql/label/cuda10.0 -c blazingsql \
   blazingsql-calcite blazingsql-orchestrator blazingsql-ral blazingsql-python

!pip install flatbuffers

## Import packages and create Blazing Context
You can think of the BlazingContext much like a Spark Context: it stores information such as the FileSystems you have registered and the Tables you have created. If you have issues running this cell, restart the runtime and run it again.

In [2]:
from blazingsql import BlazingContext
import cudf

bc = BlazingContext()

BlazingContext ready


### Load & Query Table

In [3]:
# takes a few minutes to download
!wget https://blazingsql-colab.s3.amazonaws.com/netflow_data/nf-chunk2.csv

--2019-10-22 02:31:46--  https://blazingsql-colab.s3.amazonaws.com/netflow_data/nf-chunk2.csv
Resolving blazingsql-colab.s3.amazonaws.com (blazingsql-colab.s3.amazonaws.com)... 52.216.66.0
Connecting to blazingsql-colab.s3.amazonaws.com (blazingsql-colab.s3.amazonaws.com)|52.216.66.0|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2725056295 (2.5G) [text/csv]
Saving to: ‘nf-chunk2.csv.1’


2019-10-22 02:34:06 (18.6 MB/s) - ‘nf-chunk2.csv.1’ saved [2725056295/2725056295]



In [4]:
%%time
# Load the CSV into a GPU DataFrame (gdf)
netflow_gdf = cudf.read_csv('nf-chunk2.csv')

CPU times: user 148 ms, sys: 181 ms, total: 329 ms
Wall time: 339 ms
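
The `%%time` magic reports timings one cell at a time, which makes it awkward to line up the BlazingSQL and Spark numbers afterwards. One possible way to gather wall-clock times in a single place is a small context manager like the sketch below. The names `timings` and `stage_timer` are hypothetical helpers, not part of either library, and the timed workload is a stand-in for a real load or query step.

```python
import time
from contextlib import contextmanager

timings = {}  # hypothetical registry: stage label -> wall-clock seconds


@contextmanager
def stage_timer(label):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


# Stand-in workload; in the notebook this would wrap a load or query step.
with stage_timer("example: sum of squares"):
    total = sum(i * i for i in range(100_000))

for label, seconds in timings.items():
    print(f"{label}: {seconds * 1000:.1f} ms")
```

Wrapping each stage of both engines with the same helper would give one dictionary of comparable wall times instead of scattered cell outputs.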


In [5]:
%%time
# Create a BlazingSQL table from the GPU DataFrame (no copy is made)
bc.create_table('netflow', netflow_gdf)

CPU times: user 28.6 ms, sys: 0 ns, total: 28.6 ms
Wall time: 66.5 ms


<pyblazing.apiv2.sql.Table at 0x7fe618998160>

In [6]:
%%time
sql = '''
SELECT
  a.firstSeenSrcIp as source,
  a.firstSeenDestIp as destination,
  count(a.firstSeenDestPort) as targetPorts,
  SUM(a.firstSeenSrcTotalBytes) as bytesOut,
  SUM(a.firstSeenDestTotalBytes) as bytesIn,
  SUM(a.durationSeconds) as durationSeconds,
  MIN(parsedDate) as firstFlowDate,
  MAX(parsedDate) as lastFlowDate,
  COUNT(*) as attemptCount
  FROM
  main.netflow a
  GROUP BY
  a.firstSeenSrcIp,
  a.firstSeenDestIp
  '''

result = bc.sql(sql,['netflow']).get()

# print(result.columns)

result_gdf = result.columns
edges_df = result_gdf.to_pandas()
print(edges_df.head(10))

NOTE: You no longer need to send a table list to the .sql() funtion
          source      destination  targetPorts  bytesOut  bytesIn  \
0     172.10.0.6     172.10.1.251            1        74        0   
1    172.20.2.75  239.255.255.250            4      1050        0   
2   172.30.2.127        10.0.0.14            1       454      633   
3   172.30.1.160  239.255.255.250           22      4550        0   
4    172.30.2.68        172.0.0.1            1       270        0   
5    172.30.1.27        10.0.0.13            1       454      633   
6    172.30.1.11        172.0.0.1            2       450      450   
7    172.30.1.49  239.255.255.250           16      3850        0   
8   172.20.1.244  239.255.255.250            2       700        0   
9  10.170.32.181       172.20.0.4           10    132500  4609780   

   durationSeconds        firstFlowDate         lastFlowDate  attemptCount  
0                0  2013-04-03 06:47:57  2013-04-03 06:47:57             1  
1                6
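
To make the aggregation concrete, the sketch below runs the same `GROUP BY` query on three made-up rows using Python's built-in `sqlite3` module. The column names come from the query above; the sample values and the in-memory SQLite table are illustrative assumptions and have nothing to do with how BlazingSQL or Spark execute the query. An `ORDER BY` is added only to make the printed order deterministic.

```python
import sqlite3

# Hypothetical sample rows; the values are made up for illustration only.
# (srcIp, destIp, destPort, srcBytes, destBytes, durationSeconds, parsedDate)
rows = [
    ("172.10.0.6", "172.10.1.251", 80, 74, 0, 0, "2013-04-03 06:47:57"),
    ("172.20.2.75", "239.255.255.250", 1900, 350, 0, 2, "2013-04-03 06:36:19"),
    ("172.20.2.75", "239.255.255.250", 1900, 700, 0, 4, "2013-04-03 06:36:27"),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE netflow (
        firstSeenSrcIp TEXT, firstSeenDestIp TEXT, firstSeenDestPort INTEGER,
        firstSeenSrcTotalBytes INTEGER, firstSeenDestTotalBytes INTEGER,
        durationSeconds INTEGER, parsedDate TEXT)"""
)
conn.executemany("INSERT INTO netflow VALUES (?, ?, ?, ?, ?, ?, ?)", rows)

sql = """
SELECT
  a.firstSeenSrcIp AS source,
  a.firstSeenDestIp AS destination,
  COUNT(a.firstSeenDestPort) AS targetPorts,
  SUM(a.firstSeenSrcTotalBytes) AS bytesOut,
  SUM(a.firstSeenDestTotalBytes) AS bytesIn,
  SUM(a.durationSeconds) AS durationSeconds,
  MIN(parsedDate) AS firstFlowDate,
  MAX(parsedDate) AS lastFlowDate,
  COUNT(*) AS attemptCount
FROM netflow a
GROUP BY a.firstSeenSrcIp, a.firstSeenDestIp
ORDER BY source
"""

# One output row per (source, destination) pair
edges = conn.execute(sql).fetchall()
for edge in edges:
    print(edge)
```

Note that `COUNT(a.firstSeenDestPort)` counts rows with a non-null port rather than distinct ports, so `targetPorts` equals `attemptCount` whenever every flow has a destination port.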

## Apache Spark

In [7]:
%%time
# Install Spark 
# Note: pip installs the latest PySpark release (2.4.4 in the run below; originally tested with 2.4.1 in April 2019)

!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
Collecting py4j==0.10.7 (from pyspark)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=184d337b621c50c97cd08c1e1463a04c26f37bec88f54f221030a1c9b78e12c7
  Stored in directory: /home/rodrigo/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully b

In [8]:
%%time
#I copied this cell's snippet from another Google Colab by Luca Canali here: https://colab.research.google.com/github/LucaCanali/sparkMeasure/blob/master/examples/SparkMeasure_Jupyter_Colab_Example.ipynb

from pyspark.sql import SparkSession

# Create Spark Session
# This example runs Spark locally; change the master to YARN or Kubernetes if available
# This example downloads sparkMeasure 0.13 for Scala 2.11 from Maven Central

spark = SparkSession \
 .builder \
 .master("local[*]") \
 .appName("PySpark Netflow Benchmark code") \
 .config("spark.jars.packages","ch.cern.sparkmeasure:spark-measure_2.11:0.13")  \
 .getOrCreate()

CPU times: user 47.5 ms, sys: 15.5 ms, total: 63 ms
Wall time: 6.97 s


### Load & Query Table

In [9]:
%%time

netflow_df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('nf-chunk2.csv')

CPU times: user 2.58 ms, sys: 772 µs, total: 3.35 ms
Wall time: 2.97 s


In [10]:
%%time
netflow_df.createOrReplaceTempView('netflow')

CPU times: user 1.35 ms, sys: 405 µs, total: 1.76 ms
Wall time: 151 ms


In [11]:
%%time
sql = '''
SELECT
  a.firstSeenSrcIp as source,
  a.firstSeenDestIp as destination,
  count(a.firstSeenDestPort) as targetPorts,
  SUM(a.firstSeenSrcTotalBytes) as bytesOut,
  SUM(a.firstSeenDestTotalBytes) as bytesIn,
  SUM(a.durationSeconds) as durationSeconds,
  MIN(parsedDate) as firstFlowDate,
  MAX(parsedDate) as lastFlowDate,
  COUNT(*) as attemptCount
  FROM
  netflow a
  GROUP BY
  a.firstSeenSrcIp,
  a.firstSeenDestIp
  '''

edges_df = spark.sql(sql)

edges_df.show()

+-------------+---------------+-----------+--------+-------+---------------+-------------------+-------------------+------------+
|       source|    destination|targetPorts|bytesOut|bytesIn|durationSeconds|      firstFlowDate|       lastFlowDate|attemptCount|
+-------------+---------------+-----------+--------+-------+---------------+-------------------+-------------------+------------+
|  172.10.1.13|239.255.255.250|         15|    2975|      0|              6|2013-04-03 06:36:19|2013-04-03 06:36:27|          15|
| 172.10.1.232|      172.0.0.1|          1|     180|    180|              0|2013-04-03 06:36:45|2013-04-03 06:36:45|           1|
| 172.10.1.238|239.255.255.250|          2|     700|      0|              6|2013-04-03 06:36:44|2013-04-03 06:36:51|           2|
|  172.10.1.35|      172.0.0.1|          1|     270|      0|              0|2013-04-03 06:36:21|2013-04-03 06:36:21|           1|
| 172.10.2.137|      172.0.0.1|          1|      90|     90|              0|2013-04-03 06: