### Introduction to PyArrow

*   PyArrow is the Python library for Apache Arrow, a cross-language development platform for in-memory data.
*   Its primary goal is to boost the performance of analytics applications.
*   As part of the Apache Arrow project, PyArrow aims to improve data interoperability across languages and systems.
*   It uses a standardized in-memory columnar data format, which gives a compact, cache-friendly memory layout and also supports nested (complex) data structures.
*   With zero-copy reads, it facilitates quick data sharing between Python and other languages, sidestepping the need for serialization.
*   It supports schemas and metadata, providing data structures that are rich and self-describing.


### PyArrow and Parquet

*   PyArrow offers seamless reading and writing operations for Parquet files.
*   With column pruning, you can selectively read only the necessary columns from a Parquet file, reducing I/O time.


### Apache Arrow

The core feature of Apache Arrow is its in-memory columnar format. This language-agnostic standard is designed to store structured, table-like datasets efficiently in memory. The data format supports a rich set of data types, including nested and user-defined types, making it suitable for analytic databases, data frame libraries, and more.

The Apache Arrow Project





<div align="center">
<img src="https://blog.djnavarro.net/posts/2021-11-19_starting-apache-arrow-in-r/img/with_arrow.jpg" width=700>
</div>

[picture source](https://blog.djnavarro.net/posts/2021-11-19_starting-apache-arrow-in-r/)

In [1]:
# !pip install pyarrow

### PyArrow Data Structures

*   PyArrow offers a suite of low-level data structures and methods optimized for both speed and flexibility.
*   These structures can be used seamlessly across multiple languages.

### Arrow Array

*   An Arrow Array is essentially a column of data stored in an efficient, contiguous block of memory.
*   Unlike Python lists, these arrays are optimized for high-speed operations and can be transferred across languages without incurring serialization costs.

In [2]:
import pyarrow as pa
arrow_array = pa.array([1, 2, 3, 4, 5])
print(type(arrow_array))
print("---------")
print(arrow_array)

<class 'pyarrow.lib.Int64Array'>
---------
[
  1,
  2,
  3,
  4,
  5
]


### Arrow Buffer

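* While not a data structure per se, Arrow Buffers are central to understanding how Arrow works.
* Buffers are contiguous blocks of memory that hold the data for Arrow Arrays.
* You can even access a buffer's content directly. For a primitive array such as `Int64Array`, `buffers()` returns `[validity bitmap, data]`; the validity bitmap is `None` when the array has no nulls, which is why the code below takes index `1`.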




In [3]:
buffer = arrow_array.buffers()[1]
print(buffer)


<pyarrow.Buffer address=0x36eb40300c0 size=40 is_cpu=True is_mutable=True>


In [4]:
byte_data = buffer.to_pybytes()
print(byte_data)

b'\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00'


### Arrow Buffer - Cont'd

* Here, the buffer holds 40 bytes: 5 elements × 8 bytes per `int64` value, stored in the machine's native byte order (little-endian on most hardware).
* You can wrap this buffer in a NumPy array without copying it, showing that Arrow and NumPy can share memory.




In [5]:
import numpy as np 
numpy_array = np.frombuffer(buffer, dtype=np.int64)
numpy_array


array([1, 2, 3, 4, 5])

In [6]:
np.shares_memory(arrow_array, numpy_array)

True

### Arrow Buffer - Cont'd

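* Both `arrow_array` and `numpy_array` share the same underlying data, demonstrating the concept of zero-copy.
* You can confirm this by modifying a value in one array and seeing the change in the other: afterwards, both arrays show the updated value.
  * Arrow arrays are designed to be immutable, so mutating one through a NumPy view like this is only a demonstration and should be avoided in real code.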
    
    

In [7]:
numpy_array[1] = 0
numpy_array

array([1, 0, 3, 4, 5])

In [8]:
arrow_array

<pyarrow.lib.Int64Array object at 0x7f8cd1812b20>
[
  1,
  0,
  3,
  4,
  5
]

### Schema

* A schema in PyArrow defines the structure of tabular data (a Table or Record Batch): the name, data type, and nullability of each column (field), plus optional metadata.
* Schemas are crucial because they set the framework for data manipulation and operations in Arrow.
  * They tell Arrow how to encode and interpret the data.



In [9]:
schema = pa.schema([('column1', pa.int64()), ('column2', pa.string())])
print(schema)

column1: int64
column2: string


### Chunked Array

*   A Chunked Array in PyArrow behaves like a single Arrow Array, but its data is split into smaller arrays called "chunks."
*   This lets you build up or combine data piece by piece without copying everything into one contiguous block of memory.
*   It's commonly used in distributed computing frameworks and data streaming scenarios.

* For example:
  * you could receive data in chunks to optimize throughput
  * you might have multiple nodes in a distributed system each producing Arrow Arrays that are collected and represented as a ChunkedArray by a coordinating node.

* From a user's perspective, a Chunked Array appears as one contiguous sequence of data.




In [10]:
results_node_1 = pa.array([0,1,2,3,4])
results_node_2 = pa.array([5,6,7,8,9,10])
chunked_array = pa.chunked_array([results_node_1, results_node_2])
chunked_array


<pyarrow.lib.ChunkedArray object at 0x7f8cd18329f0>
[
  [
    0,
    1,
    2,
    3,
    4
  ],
  [
    5,
    6,
    7,
    8,
    9,
    10
  ]
]

### Chunked Array - Cont'd

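* You can index a single position or slice a range that spans multiple chunks; the slice is returned as a ChunkedArray that references the original data rather than copying it.
* You can also access individual chunks, which is useful when each chunk is processed independently (for example, in parallel).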

In [11]:
chunked_array[3:6]

<pyarrow.lib.ChunkedArray object at 0x7f8cd18431d0>
[
  [
    3,
    4
  ],
  [
    5
  ]
]

In [12]:
chunked_array.chunk(0)

<pyarrow.lib.Int64Array object at 0x7f8cd1848520>
[
  0,
  1,
  2,
  3,
  4
]

### Table

* A Table in PyArrow is a collection of named columns that share a common schema.
* Each column in the Table is an Arrow ChunkedArray, and all columns have the same length.
* Tables are the natural Arrow structure for dataframe-like data.
* A Table lives in memory on a single machine, but it can be written out across multiple files (for example, partitioned Parquet) for large-scale storage, or serialized to be sent across a network.






In [13]:
column1 = pa.array([0, 1, 2, 3, 4]) 
column2 = pa.array(['a', 'b', 'c', 'd', 'e'])
table = pa.table({'column1': column1, 'column2': column2})  

table

pyarrow.Table
column1: int64
column2: string
----
column1: [[0,1,2,3,4]]
column2: [["a","b","c","d","e"]]

### Record Batch

*   A Record Batch is a collection of equal-length Arrow Arrays (columns), bundled together with a schema.
*   Much like a Chunked Array is a collection of Arrow Arrays, a Table in Apache Arrow can be viewed as a collection of Record Batches.

* Conceptual Relationship
  *   A Record Batch is to a Table what an Arrow Array is to a Chunked Array:
    *   Arrays can be grouped together to form a Chunked Array.
    *   Record Batches can be grouped together to form a Table.




### Record Batch - Cont'd

* Use Cases
  *   The choice between using a Record Batch or a Table often depends on your specific needs. For example:
    *   Streaming data: if you need to process data on the fly, for instance in a streaming application that handles each chunk as it arrives, Record Batches are a good choice.
      *   You can serialize and process each Record Batch independently as it arrives, without waiting for the entire dataset.


In [14]:

column1_array = pa.array([1, 2, 3, 4, 5])
column2_array = pa.array(['a', 'b', 'c', 'd', 'e'])
schema = pa.schema([('column1', pa.int64()), ('column2', pa.string())])

record_batch = pa.record_batch([column1_array, column2_array], schema=schema)
record_batch


pyarrow.RecordBatch
column1: int64
column2: string
----
column1: [1,2,3,4,5]
column2: ["a","b","c","d","e"]

In [15]:
record_batch.columns

[<pyarrow.lib.Int64Array object at 0x7f8cd1848b80>
 [
   1,
   2,
   3,
   4,
   5
 ],
 <pyarrow.lib.StringArray object at 0x7f8cd1848fa0>
 [
   "a",
   "b",
   "c",
   "d",
   "e"
 ]]

In [16]:
record_batch["column1"]

<pyarrow.lib.Int64Array object at 0x7f8cd1848be0>
[
  1,
  2,
  3,
  4,
  5
]

In [17]:

column1_array_new = pa.array([6, 7, 8, 9, 10])
column2_array_new = pa.array(['f', 'g', 'h', 'i', 'j'])
record_batch_new = pa.record_batch([column1_array_new, column2_array_new], schema=schema)


table = pa.Table.from_batches([record_batch, record_batch_new], schema=schema)
table


pyarrow.Table
column1: int64
column2: string
----
column1: [[1,2,3,4,5],[6,7,8,9,10]]
column2: [["a","b","c","d","e"],["f","g","h","i","j"]]

### Record Batch - Cont'd

* In the example above, two Record Batches are combined to create a single Table.
  * This is analogous to how individual Arrow Arrays can be combined to create a Chunked Array.
  * It reinforces the idea that a Record Batch is to a Table what an Arrow Array is to a Chunked Array.


### Dive Into Real Data: Parquet and Memory Efficiency

1.  Let's get hands-on and read Parquet data using Apache Arrow.
2.  Take note: the in-memory size of data in PyArrow is often considerably smaller than that of a pandas DataFrame holding the same data, especially for string columns.
3.  Think of this as a little teaser to whet your appetite for data science goodness.

**Note**: Here, I'm using the `parquet` module from the PyArrow package (`pyarrow.parquet`), which reads and writes Parquet files, among other things.




### Apache Arrow Datasets


*   Datasets in PyArrow let you work with large tabular data, even when it's larger than your machine's memory.
*   They offer lazy data access, meaning you don't have to load the entire dataset into memory.
*   Datasets support data discovery, partitioning, and various file systems such as Amazon S3, Google Cloud Storage, and local storage.
  * S3 and GCS support is built into the standard PyArrow wheels, so no extra packages are needed to read from them.

* Import the dataset library as:

```python
import pyarrow.dataset as ds
```


### Dataset Overview

* Provider: New York City Taxi and Limousine Commission (TLC)
* Data hosted on AWS S3, in the `ursa-labs-taxi-data` bucket provided by Ursa Labs.
* Contains data on millions of taxi and limousine trips in NYC
* Time Period: 2009 to 2019


In [18]:
# **Note**: In the AWS S3 listing, "PRE" stands for "prefix," essentially representing a folder or directory.

!aws s3 ls "s3://ursa-labs-taxi-data/"

                           PRE 2009/
                           PRE 2010/
                           PRE 2011/
                           PRE 2012/
                           PRE 2013/
                           PRE 2014/
                           PRE 2015/
                           PRE 2016/
                           PRE 2017/
                           PRE 2018/
                           PRE 2019/


In [19]:
!aws s3 ls "s3://ursa-labs-taxi-data/2009/"

                           PRE 01/
                           PRE 02/
                           PRE 03/
                           PRE 04/
                           PRE 05/
                           PRE 06/
                           PRE 07/
                           PRE 08/
                           PRE 09/
                           PRE 10/
                           PRE 11/
                           PRE 12/


In [20]:
import pyarrow.dataset as ds
dataset = ds.dataset("s3://ursa-labs-taxi-data/", partitioning=["year", "month"])
dataset

<pyarrow._dataset.FileSystemDataset at 0x7f8cc0c77220>

In [21]:
len(dataset.files)

125

In [22]:
dataset.files[0:10]

['ursa-labs-taxi-data/2009/01/data.parquet',
 'ursa-labs-taxi-data/2009/02/data.parquet',
 'ursa-labs-taxi-data/2009/03/data.parquet',
 'ursa-labs-taxi-data/2009/04/data.parquet',
 'ursa-labs-taxi-data/2009/05/data.parquet',
 'ursa-labs-taxi-data/2009/06/data.parquet',
 'ursa-labs-taxi-data/2009/07/data.parquet',
 'ursa-labs-taxi-data/2009/08/data.parquet',
 'ursa-labs-taxi-data/2009/09/data.parquet',
 'ursa-labs-taxi-data/2009/10/data.parquet']

In [25]:
# Grab the first fragment (a single file) and inspect its partition expression:

frag = next(dataset.get_fragments())
frag.partition_expression

<pyarrow.compute.Expression ((year == 2009) and (month == 1))>

#### Play with a Single File

* Let's read in the data from this single fragment
* Take a look at the data
* List the column names and count the rows
    

In [None]:
%%time
frag_table = frag.to_table()
frag_table

In [None]:
frag_table.column_names

In [None]:
frag_table.num_rows


#### Chunks: The Building Blocks

* Remember how we talked about Arrow Tables having columns that can be split into chunks?
* If you take a look, each column is divided into 216 chunks,
  * confirming that this table is built the way we discussed earlier.
* You can also take just a slice of the data.

In [None]:
frag_table.slice(0, 5)

In [None]:
[frag_table[col_name].num_chunks for col_name in frag_table.column_names]


### The Essentials of Apache Arrow Tables and Record Batches

*   Tables in Apache Arrow are essentially collections of record batches, which `to_batches()` exposes.
*   From a record batch, you can easily pull out columns such as `payment_type`, `fare_amount`, or `tip_amount`.
* Because we're working with a single record batch, managing the data is pretty straightforward.
  * You'll see that each column in this batch holds 65,536 values.


In [None]:
record_batch_3 = frag_table.to_batches()[3]
record_batch_3

In [None]:
record_batch_3.num_rows

In [None]:
record_batch_3["fare_amount"]

In [None]:
record_batch_3['tip_amount']

In [None]:
record_batch_3['payment_type']

#### PyArrow's Computational Capabilities

*   PyArrow separates data storage concerns from computational functionality.    
    *   Structures like Arrow Arrays, Record Batches, and Tables handle data storage and serialization.
    *   For actual data operations, there's the `pyarrow.compute` module.
*   The `pyarrow.compute` module offers a range of functions for filtering, transforming, and aggregating data.    
    *   While it does provide basic operations, it's not a full-blown analytical tool. For more complex tasks, you'd typically use something like Pandas or Spark.

* Let's perform some computations, such as adding each trip's tip and fare, and then finding the maximum and average of that total.


In [None]:
import pyarrow.compute as pc
pc.add(record_batch_3['tip_amount'], record_batch_3['fare_amount'])

* How about finding the maximum total amount for a trip, including the tip?

In [None]:
pc.max(pc.add(record_batch_3['tip_amount'], record_batch_3['fare_amount']))

* And the average?


In [None]:
pc.mean(pc.add(record_batch_3['tip_amount'], record_batch_3['fare_amount']))

* We can also perform operations on string data, like converting the case of `payment_type`, which has been recorded inconsistently.


In [None]:
upper_cased_payment_type = pc.utf8_upper(record_batch_3["payment_type"])
upper_cased_payment_type

* You can then filter data based on whether the payment type was "CASH."


In [None]:
is_cash = pc.equal(upper_cased_payment_type, pa.scalar('CASH'))
is_cash 

In [None]:
filtered_record_batch_3 = pc.filter(record_batch_3, is_cash)
filtered_record_batch_3
filtered_record_batch_3.num_rows


#### Working with Parquet Files

*   You can open Parquet data as a `ParquetDataset` and work with its individual files as fragments.
* Recall that:
    * Each fragment (file) carries its own metadata.
    * That metadata includes statistics, such as min/max values, for each column in each row group.
      * These statistics are most useful when the data is sorted on the columns you query frequently: each row group then covers a narrow range of values, so irrelevant row groups can be skipped.
      * You can save the sorted table to a new Parquet file for faster retrieval.


In [None]:
import pyarrow as pa 
import pyarrow.parquet as pq
dataset = pq.ParquetDataset('s3://ursa-labs-taxi-data/2009/', partitioning=["month"])
dataset

In [None]:
data_table = dataset.fragments[0].to_table() 
sorted_indices = pc.sort_indices(data_table, sort_keys=[("pickup_at", "ascending"), ("fare_amount", "ascending")])
sorted_indices

In [None]:
sorted_table = data_table.take(sorted_indices)


In [None]:
# Write the sorted table out; the cells below read this file back
pq.write_table(sorted_table, 'optimized_parquet_file.parquet', row_group_size=65536)


#### Exploring Sorted Parquet Files

*   When you read the sorted file back into PyArrow, it's easier to work with.
  * We can read the row groups' metadata and look only at the row groups we're interested in.
  * In other words, you can delve into the metadata to understand your data better.



In [None]:
optimized_parquet_file = pq.ParquetFile('optimized_parquet_file.parquet')

rg0_metadata = optimized_parquet_file.metadata.row_group(0)
rg0_metadata.to_dict()




In [None]:
[(i,x["path_in_schema"]) for i, x in enumerate(rg0_metadata.to_dict()["columns"])]


In [None]:
name_2_pos = {x["path_in_schema"]:i for i, x in enumerate(rg0_metadata.to_dict()["columns"])}
name_2_pos

In [None]:
from datetime import datetime
col_idx = name_2_pos['pickup_at']

datetime_obj = datetime.strptime("2009-1-1 14:00:00", "%Y-%m-%d %H:%M:%S")

for i in range(optimized_parquet_file.num_row_groups):
    col_stats = optimized_parquet_file.metadata.row_group(i).column(col_idx).statistics
    if col_stats.min <= datetime_obj <= col_stats.max:
        print(f"found it, it's row_group {i}")
    

### Bonus Questions
* Can you get the average transaction amount between 2:00 and 2:59 PM?

* Which day, on average, has the highest tip?

* Which time of day has the highest tip?

### Resources

1.  [Apache Arrow Homepage](https://arrow.apache.org/)
2.  [PyArrow Documentation](https://arrow.apache.org/docs/python/)
3.  [PyArrow GitHub Repository](https://github.com/apache/arrow/tree/master/python/pyarrow)