# The DataFrame

The RAPIDS ecosystem is built around the `cudf.DataFrame`, a GPU DataFrame based on the [Apache Arrow](https://github.com/apache/arrow) columnar memory format and shared by all of the different libraries and packages.

There are two libraries specific to data manipulation:
- BlazingSQL: SQL commands on a `cudf.DataFrame`
- cuDF: pandas-like commands on a `cudf.DataFrame`

## BlazingSQL (BSQL)
[GitHub](https://github.com/BlazingDB/blazingsql)

As mentioned in the [Welcome Notebook](../welcome.ipynb), BlazingSQL is a SQL engine built on top of cuDF, the `cudf.DataFrame` package. This means you can easily chain SQL queries together with the rest of the RAPIDS ecosystem in Python to build complex, scalable data pipelines for machine learning, graph analytics, and more.

We'll show off a series of examples that demonstrate the power of BlazingSQL.

#### SQL Query a `cudf.DataFrame`
To start, we always need to create a BlazingContext. The BlazingContext is a live session with the SQL engine that stores information such as created tables, registered storage plugins, and more.

In [None]:
from blazingsql import BlazingContext

bc = BlazingContext()

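Then we can create a cuDF DataFrame the same way we would create a pandas DataFrame (here, by reading a CSV file), and make a BSQL table from it with the `.create_table()` method. This is a zero-copy process: the table uses the DataFrame's existing GPU memory instead of duplicating it, so it is very fast and takes up no additional GPU memory.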

In [None]:
import cudf

# read CSV file into cuDF DataFrame
df = cudf.read_csv('../data/sample_taxi.csv')

# create table from cuDF DataFrame
bc.create_table('taxi', df)
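"Zero-copy" in the `.create_table()` call above means the table refers to data that already exists in memory rather than duplicating it. As a generic, CPU-only illustration of that idea (this is not how BlazingSQL is implemented), Python's built-in `memoryview` wraps an existing buffer without copying it, so changes to the buffer are visible through the view, while a real copy keeps its own data:

```python
# Generic zero-copy illustration (not BlazingSQL internals):
# a memoryview shares the underlying buffer instead of duplicating it.
buffer = bytearray(b"taxi")   # stands in for data already in memory
view = memoryview(buffer)     # no bytes are copied here

buffer[0] = ord("T")          # mutate the original buffer...
print(view.tobytes())         # ...and the view sees the change: b'Taxi'

copy = bytes(buffer)          # by contrast, bytes() makes a real copy
buffer[0] = ord("t")
print(copy)                   # the copy keeps the old value: b'Taxi'
print(view.tobytes())         # the view tracks the buffer: b'taxi'
```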

We can now run a SQL query on that `cudf.DataFrame` with the `.sql()` method, which returns a new `cudf.DataFrame`.

In [None]:
bc.sql('''
       select 
           cast(substring(tpep_pickup_datetime,0,10) || ' 00:00:00' as timestamp) as pickup_date, 
           count(*), 
           avg(trip_distance), 
           avg(fare_amount) 
       from taxi 
       group by
           cast(substring(tpep_pickup_datetime,0,10) || ' 00:00:00' as timestamp)
       order by
           cast(substring(tpep_pickup_datetime,0,10) || ' 00:00:00' as timestamp)
       limit 10
           ''')
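To see what this query computes without a GPU, here is a CPU-only analogue using Python's built-in `sqlite3` (a swapped-in engine, not BlazingSQL) on three hypothetical rows. It truncates each pickup timestamp to its date and reports per-day trip counts and averages. Unlike the query above, it groups by the date string instead of casting back to a timestamp, and it names the aggregate columns:

```python
import sqlite3

# Hypothetical sample rows; column names mirror the taxi query above.
rows = [
    ("2020-01-01 08:15:00", 2.0, 10.0),
    ("2020-01-01 17:40:00", 4.0, 20.0),
    ("2020-01-02 09:05:00", 1.0, 6.0),
]

con = sqlite3.connect(":memory:")
con.execute(
    "create table taxi (tpep_pickup_datetime text, trip_distance real, fare_amount real)"
)
con.executemany("insert into taxi values (?, ?, ?)", rows)

# Keep the first 10 characters (the date; SQLite's substr counts from 1),
# then aggregate per day.
result = con.execute(
    """
    select
        substr(tpep_pickup_datetime, 1, 10) as pickup_date,
        count(*) as trips,
        avg(trip_distance) as avg_distance,
        avg(fare_amount) as avg_fare
    from taxi
    group by pickup_date
    order by pickup_date
    limit 10
    """
).fetchall()
print(result)  # [('2020-01-01', 2, 3.0, 15.0), ('2020-01-02', 1, 1.0, 6.0)]
```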

#### SQL Query a CSV File
We could also save ourselves some time and run `.create_table()` directly on a [supported file format](https://docs.blazingdb.com/docs/text-files).

In [None]:
bc.create_table('taxi', '../data/sample_taxi.csv', header=0)

In [None]:
%%time
bc.sql('select count(*) from taxi')

#### SQL Query a Data Lake (AWS S3)
You can register multiple [Storage Plugins](https://docs.blazingdb.com/docs/connecting-data-sources) on a BlazingContext. These Storage Plugins help BSQL optimize IO requests during query execution. For example, with [Apache Parquet](https://parquet.apache.org/) files on AWS S3, you register an S3 bucket with the `.s3('name', bucket_name='bucket_name')` method; BSQL can then read the file metadata and skip files or partitions that a SQL query doesn't need.

In [None]:
bc.s3('blazingsql-colab', bucket_name='blazingsql-colab')
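Skipping files based on metadata is commonly done with per-column min/max statistics, which Parquet files can store in their footer. The sketch below shows the general idea with hypothetical file names and statistics; it is not BlazingSQL's actual query planner. For a filter such as `trip_distance < 10`, any file whose minimum `trip_distance` is already 10 or more cannot contain a matching row, so it never needs to be read:

```python
# Hypothetical per-file statistics for one column (trip_distance),
# similar to the min/max values a Parquet footer can store.
file_stats = {
    "part_0.parquet": {"min": 0.1, "max": 8.5},
    "part_1.parquet": {"min": 10.2, "max": 45.0},
    "part_2.parquet": {"min": 3.0, "max": 12.7},
}


def files_to_read(stats, upper_bound):
    """Return only the files that could hold rows with value < upper_bound.

    A file whose minimum is already >= upper_bound cannot match the
    predicate, so it is skipped without reading any of its data.
    """
    return [name for name, s in stats.items() if s["min"] < upper_bound]


# Predicate from a query like: where trip_distance < 10
print(files_to_read(file_stats, 10))  # ['part_0.parquet', 'part_2.parquet']
```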

Now we can reference files in AWS S3 with a filesystem path convention `s3://storage_plugin_name/path_to_file...`.

In [None]:
bc.create_table('big_taxi', 's3://blazingsql-colab/yellow_taxi/1_0_0.parquet')
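In the `s3://storage_plugin_name/path_to_file` convention, the name registered with `.s3()` sits where a hostname would appear in a URL, followed by the object's path. A quick way to see the pieces using only Python's standard library (this only parses the string; it does not contact S3 or BlazingSQL):

```python
from urllib.parse import urlparse

path = "s3://blazingsql-colab/yellow_taxi/1_0_0.parquet"
parts = urlparse(path)

print(parts.scheme)            # s3  (the S3 storage plugin prefix)
print(parts.netloc)            # blazingsql-colab  (the registered plugin name)
print(parts.path.lstrip("/"))  # yellow_taxi/1_0_0.parquet  (path to the file)
```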

In [None]:
%%time
bc.sql('select count(*) from big_taxi')

Those are the most commonly used features of BSQL. What makes it such a powerful and extensible tool, though, is that everything it holds in GPU memory is a `cudf.DataFrame`.

Let's now learn about cuDF.

## cuDF
[GitHub](https://github.com/rapidsai/cudf)

As mentioned in the [Welcome Notebook](../welcome.ipynb), cuDF is a pandas-like DataFrame library. cuDF is nearly a drop-in replacement for pandas; the main difference is that it operates on data in GPU memory. We'll walk through some examples that demonstrate how to use cuDF.

Parts of this were borrowed and lightly adapted from [10 Minutes to cuDF and Dask-cudf](https://rapidsai.github.io/projects/cudf/en/0.12.0/10min.html).

Creating a simple `cudf.Series`.

In [None]:
s = cudf.Series([1, 2, 3, None, 4])
s
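cuDF columns follow the Apache Arrow memory layout, so a missing value like the `None` above is not stored as a sentinel value; it is tracked in a separate validity bitmap. The pure-Python sketch below builds such a bitmap by hand to show the idea. It follows the Arrow format's bit ordering (least-significant bit first, 1 meaning valid) but is not cuDF's internal code:

```python
values = [1, 2, 3, None, 4]

# Arrow-style validity bitmap: bit i is 1 if values[i] is valid, 0 if null.
# Bits are numbered from the least-significant end of each byte.
bitmap = bytearray((len(values) + 7) // 8)
for i, v in enumerate(values):
    if v is not None:
        bitmap[i // 8] |= 1 << (i % 8)

print(f"{bitmap[0]:08b}")  # 00010111: bit 3 is clear, so element 3 is null

# The null count is the number of elements whose bit is not set.
null_count = len(values) - sum(bin(b).count("1") for b in bitmap)
print(null_count)  # 1
```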

Creating a `cudf.DataFrame` with 1000 rows.

In [None]:
n = 1000
df = cudf.DataFrame(
    {
        'a': range(n),
        'b': range(500, n + 500),
        'c': range(1000, n + 1000),
    }
)
df

The `cudf.DataFrame` can be treated like a `pandas.DataFrame`.

For example, we can sort by values in descending order.

In [None]:
df.sort_values(by='b', ascending=False)

Selection by position.

In [None]:
df.iloc[0:3, 0:2]

Or performing a left join with `.merge()`.

In [None]:
df_a = cudf.DataFrame()
df_a['key'] = ['a', 'b', 'c', 'd', 'e']
df_a['vals_a'] = [float(i + 10) for i in range(5)]

df_b = cudf.DataFrame()
df_b['key'] = ['a', 'c', 'e']
df_b['vals_b'] = [float(i+100) for i in range(3)]

merged = df_a.merge(df_b, on=['key'], how='left')
merged
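A left join keeps every row of `df_a` and fills `vals_b` only where a matching `key` exists in `df_b`; unmatched rows get a null. The pure-Python sketch below reproduces that logic on the same data, assuming the keys in the right-hand table are unique (a general join emits one output row per match):

```python
# Pure-Python sketch of a left join on 'key' (illustrative only).
left = [("a", 10.0), ("b", 11.0), ("c", 12.0), ("d", 13.0), ("e", 14.0)]
right = [("a", 100.0), ("c", 101.0), ("e", 102.0)]

# Index the right-hand rows by key for constant-time lookups
# (assumes each key appears at most once on the right).
right_by_key = {key: val for key, val in right}

# Every left row is kept; a missing match becomes None (a null in cuDF).
merged = [(key, val_a, right_by_key.get(key)) for key, val_a in left]
for row in merged:
    print(row)
```

This sketch preserves the left table's row order. cuDF's documentation notes that, unlike pandas, join output order is not guaranteed by default, so sorting by `key` before comparing results is a reasonable habit.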

You can also combine BSQL and cuDF in a single data pipeline: the result of a BSQL query is a `cudf.DataFrame` that you can keep working with.

In [None]:
df = bc.sql('select * from taxi where trip_distance < 10')

Let's use the `.describe()` method to better understand taxi rides under 10 miles.

In [None]:
df.describe()

We can also quickly convert to a `pandas.DataFrame` and immediately integrate with anything that supports pandas.

Below is an example with [Matplotlib](https://github.com/matplotlib/matplotlib).

In [None]:
df.to_pandas().plot(kind='scatter', x='passenger_count', y='tip_amount')

## Dask cuDF 

[Docs](https://docs.rapids.ai/api/cudf/stable/dask-cudf.html)

cuDF is a single-GPU library. For multi-GPU cuDF solutions, we use Dask and the dask-cudf package, which can scale cuDF across multiple GPUs on a single machine or across many machines in a cluster.

Dask DataFrame was originally designed to scale pandas, orchestrating many pandas DataFrames spread across many CPUs into a cohesive parallel DataFrame. Because cuDF currently implements only a subset of the pandas API, not all Dask DataFrame operations work with cuDF.
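Conceptually, a Dask DataFrame is many smaller DataFrames (partitions), and many operations become a per-partition step followed by a combine step. The plain-Python sketch below shows that pattern for a mean, using hypothetical numbers; it is not Dask's actual code, but it shows why partitions exchange partial results such as `(sum, count)` rather than their own final answers:

```python
# Hypothetical trip distances split into three partitions, standing in for
# the per-GPU (or per-worker) pieces of one logical DataFrame.
partitions = [
    [1.0, 2.0, 3.0],
    [4.0],
    [5.0, 6.0],
]

# Map step: each partition reports a small partial result (sum, count).
partials = [(sum(p), len(p)) for p in partitions]

# Reduce step: combine the partials into the global result.
total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
print(count, total / count)  # 6 3.5

# Averaging the per-partition means gives the wrong answer when
# partitions have different sizes.
naive = sum(s / c for s, c in partials) / len(partials)
print(naive)  # differs from the true mean of 3.5
```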

In [None]:
import dask_cudf

df = dask_cudf.read_parquet("../data/blobs.parquet")

type(df)

In [None]:
df

Calling `.compute()`, `.head()` or `.tail()` on a `dask_cudf.DataFrame` returns a `cudf.DataFrame`.

In [None]:
df.tail()
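Methods like `.compute()`, `.head()` and `.tail()` are needed because Dask collections are lazy: operations build up a task graph, and nothing runs until a result is requested. The toy class below (hypothetical, not part of Dask) mimics that behavior by recording operations and applying them only when `compute()` is called:

```python
class LazyFrame:
    """Toy stand-in for a lazy collection: records operations, runs them on compute()."""

    def __init__(self, data, ops=()):
        self._data = data
        self._ops = tuple(ops)

    def map(self, fn):
        # Return a new LazyFrame with one more pending operation; nothing runs yet.
        return LazyFrame(self._data, self._ops + (fn,))

    def compute(self):
        # Only now are the recorded operations applied, in order.
        result = list(self._data)
        for fn in self._ops:
            result = [fn(x) for x in result]
        return result


lazy = LazyFrame([1, 2, 3]).map(lambda x: x * 10).map(lambda x: x + 1)
print(lazy.compute())  # [11, 21, 31]
```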

## BlazingSQL Distributed

[Docs](https://docs.blazingdb.com/docs/distributed)

BlazingSQL can easily distribute query execution across multiple GPUs or servers with Dask. You don't have to pass a list of IPs and ports to BSQL; whatever you configure with Dask gives your BlazingContext instance awareness of where all the GPUs or servers are. Check out [blog_posts/distributed_sql_with_dask.ipynb](../blog_posts/distributed_sql_with_dask.ipynb) or [Distributed SQL with Dask](https://blog.blazingdb.com/distributed-sql-with-dask-2979262acc8a?source=friends_link&sk=077319064cd7d9e18df8c0292eb5d33d) for more.

In [None]:
from blazingsql import BlazingContext
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

bc = BlazingContext(dask_client=client, network_interface='lo')

Register a public AWS S3 bucket, then create & query a table (`big_taxi`) from there.

When BlazingSQL runs on multiple GPUs, query results will return as `dask_cudf.DataFrame`s.

In [None]:
bc.s3('blazingsql-colab', bucket_name='blazingsql-colab')

bc.create_table('big_taxi', 's3://blazingsql-colab/yellow_taxi/1_0_0.parquet')

In [None]:
type(bc.sql('select count(*) from big_taxi'))

In [None]:
bc.sql('select * from big_taxi where trip_distance < 10 limit 5').compute()

## That is the DataFrame Tour!
You've seen the basics of the DataFrame and how to interact with it. Now is a good time to experiment with your own data and see how to parse, clean, and extract meaningful insights from it.

Next, we'll look at how to create visualizations, both with popular Python visualization packages and with GPU-accelerated visualization packages.


[Continue to the Data Visualization introductory Notebook](data_visualization.ipynb)