# Apache Arrow

-  Development platform for in-memory analytics
-  Process and move data faster
-  Standardized, language-independent columnar format for flat and hierarchical data
-  Zero-copy shared memory and RPC-based data movement


- Arrow overview diagram:
     https://www.tutorialandexample.com/wp-content/uploads/2019/06/Introduction-to-Apache-Arrow.png
     
     
References:

https://arrow.apache.org/docs/index.html
     

# Arrow Columnar Format

- Language agnostic in-memory data structure

- Data adjacency for sequential access (scans)
- Constant-time (O(1)) random access
- SIMD- and vectorization-friendly
- No "pointer swizzling" needed, enabling zero-copy access in shared memory

Source:
    https://arrow.apache.org/docs/format/Columnar.html

# PyArrow

- Memory Management
- Data Types 
- Compute Functions
- Plasma In-memory Object store
- NumPy Integration
- Pandas Integration

- Parquet and Arrow


# Memory management

-  Buffers
-  Memory Pools
-  Input/Output Streams
-  On-disk and memory-mapped files

# Memory View and Buffer protocol

- The buffer protocol exposes an object's underlying memory to other code without copying it
- `memoryview` gives Python code access to that memory
- Let's look at some examples



In [2]:
data = b'hello world'
m = memoryview(data)
m.obj


b'hello world'

In [3]:
print(f'{m[0]=}, {bytes(m[0:1])=}')

m[0]=104, bytes(m[0:1])=b'h'


In [4]:
# mutate the underlying data through a memoryview
data = bytearray('abcefgh', encoding='utf-8')
m = memoryview(data)
print(f'{m[-1]=}')

# some assignments
k = m[-3:]  # slicing a memoryview is zero-copy: k shares memory with data
m[-1] = 72  # ord('H'), capital H

print(f'data after update = {data}')

print(bytes(k))

m[-1]=104
data after update = bytearray(b'abcefgH')
b'fgH'


In [5]:
# pyarrow.Buffer

import pyarrow as pa

data = b'abcdefgh'
data_buffer = pa.py_buffer(data)

z = data_buffer[-3:]
z.to_pybytes()

b'fgh'

In [6]:
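# Buffers are allocated from memory pools

new_buffer = pa.allocate_buffer(4000)
print(pa.total_allocated_bytes())  # can exceed 4000: allocations are rounded up (here to a multiple of 64)

new_buffer

#new_buffer[0] = None  # fails: pa.Buffer does not support item assignment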

4032


<pyarrow.lib.Buffer at 0x7fd110b8f5b0>

In [7]:
# Reading from a buffer-like object through an Arrow input stream

buffer_1 = memoryview(b"hello world")
stream = pa.input_stream(buffer_1)
print(stream.read(5))


b'hello'


In [8]:
# Writing to and reading from an in-memory buffer

write = pa.BufferOutputStream()
write.write(b'hello world')

# getvalue() finalizes the output stream and returns its contents as a pa.Buffer
buffer = write.getvalue()

print('Type of buffer=', type(buffer))

reader = pa.BufferReader(buffer)
reader.read_at(6, 5)  # read_at(nbytes, offset): 6 bytes starting at offset 5

# write.write(b'hello world, again')  # fails: getvalue() has already closed the stream



Type of buffer= <class 'pyarrow.lib.Buffer'>


b' world'

# Data Types and In-Memory Data Model

-  Columnar data structures = type metadata + memory buffers
-  pyarrow.DataType - the logical type of the values
-  pyarrow.Schema - collection of named fields (pyarrow.Field = name + pyarrow.DataType)
-  pyarrow.Array - atomic, contiguous columnar data structure composed of Arrow buffer objects
-  pyarrow.RecordBatch - collection of equal-length Array objects conforming to a particular schema
-  pyarrow.Table - each column is a pyarrow.ChunkedArray, i.e. a sequence of pyarrow.Array chunks




# Supported Types (Richer than what you store in Pandas/NumPy)

-  Fixed-length primitives - integers, floats, timestamps, etc.
-  Variable-length primitives (strings, binary)
-  Nested types (list, struct, union)
-  Dictionary types (we won't cover these)
  
 

In [9]:
# Examples of types

print(pa.int32())
print(pa.float64())

int32
double


In [10]:
# Using these types - Define Fields


# Types used to define schema (metadata)

f1 = pa.field('country_count', pa.int32())



In [11]:
# Group a collection of fields into a struct type

f1 = pa.field('idbbunique', pa.int32())
f2 = pa.field('price', pa.float64())
f3 = pa.field('data_date', pa.date64())
# fields can also be nested, e.g. pa.field('prices', pa.list_(pa.float64()))

new_struct =  pa.struct([f1, f2, f3])

new_struct


StructType(struct<idbbunique: int32, price: double, data_date: date64[ms]>)

In [12]:
# One can also define schemas (to be used with Table/RecordBatch)

schema = pa.schema([('idbb_unique', pa.int32()), ('price', pa.float64()), ('data_date', pa.date64())])
schema

# schemas can also contain nested types (struct, list)



idbb_unique: int32
price: double
data_date: date64[ms]

In [13]:
int_arr = pa.array([100, 200, 300, None], type=pa.uint32())
print(f'{int_arr=}')

float_arr = pa.array([100.1, 200.0, 300.0, None])
print(f'{float_arr=}')

# check out the NULLs
print(f'{float_arr[-1]=}')

float_arr.null_count

int_arr=<pyarrow.lib.UInt32Array object at 0x7fd10a4faf40>
[
  100,
  200,
  300,
  null
]
float_arr=<pyarrow.lib.DoubleArray object at 0x7fd10a4fafa0>
[
  100.1,
  200,
  300,
  null
]
float_arr[-1]=<pyarrow.DoubleScalar: None>


1

In [14]:
# Slicing is a zero-copy operation, since Arrays are immutable

float_arr[1:3]

<pyarrow.lib.DoubleArray object at 0x7fd110ae6ee0>
[
  200,
  300
]

# Time to talk about None and NaNs!

In [15]:
x = int_arr[-1]

x.is_valid


False

# Union/Category


# Record Batches

- Collection of equal-length array instances
- Remember: arrays are just contiguous elements of primitives, structs, unions, etc.


In [16]:
# all arrays must have the same length
data = [
    pa.array([1,2,3,4]),
    pa.array(['foo', 'bar','baz', None]),
    pa.array([True, None, False, True])
]
batch = pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2'])



print(f'{batch.num_columns=}, {batch.num_rows=}, {batch.schema=}, {batch[1]}')


batch.num_columns=3, batch.num_rows=4, batch.schema=f0: int64
f1: string
f2: bool, [
  "foo",
  "bar",
  "baz",
  null
]


# Slicing into Record Batches

-  Zero-copy!

In [17]:
print(f'{batch[0:2][1]=}, {batch[0:2][0]=}')

batch[0:2][1]=<pyarrow.lib.StringArray object at 0x7fd10a2829a0>
[
  "foo",
  "bar"
], batch[0:2][0]=<pyarrow.lib.Int64Array object at 0x7fd10a2829a0>
[
  1,
  2
]


# Tables

- Groups RecordBatches into one logical unit
- Record batches arriving from different sources (e.g. other processes) can be combined without copying or reorganizing their data

In [18]:
batches = [batch] * 5
table = pa.Table.from_batches(batches)

table[0]

<pyarrow.lib.ChunkedArray object at 0x7fd110aee9a0>
[
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ]
]

In [19]:
table_1 = [table]*2

table_all = pa.concat_tables(table_1)
table_all[0]

<pyarrow.lib.ChunkedArray object at 0x7fd10a289c70>
[
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ]
]

In [21]:
# Nested columns
# all columns in a table or record batch must have the same length

a = pa.array([1,2,3,4], type=pa.uint16())

In [None]:
# NumPy Integration


In [None]:
# pandas DataFrame vs. Arrow Table

- Both require every column to have the same length
- Arrow columns (arrays) can hold nested types such as struct and list as first-class types

In [None]:
# The plasma in-memory data store

In [None]:
# relationship to Parquet

In [None]:
# More examples

In [22]:
# a single int64 column, repeated in two record batches
data = [
    pa.array([1,2,3,4]),
]
batch = pa.RecordBatch.from_arrays(data, ['f0'])

table = pa.Table.from_batches([batch]*2)

# split_blocks=True: one pandas block per column, which can reduce peak memory
# self_destruct=True: frees Arrow memory during conversion; do not use `table` afterwards
df = table.to_pandas(split_blocks=True, self_destruct=True)
df.loc[0, 'f0'] = 1000

df



     f0
0  1000
1     2
2     3
3     4
4     1
5     2
6     3
7     4
7,4


In [63]:
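import pyarrow.compute as pc

total = pc.sum(batch[0])              # Int64Scalar: 10

b = pc.multiply(batch[0], batch[0])   # element-wise square
print(b)

pc.add(batch[0], batch[0])            # [2, 4, 6, 8] (not displayed: not the last expression)

pc.equal(batch[0], 1)                 # [true, false, false, false]

opts = pc.SetLookupOptions(value_set=pa.array([1]))
pc.is_in(data[0], options=opts)       # [true, false, false, false]

pa.scalar(1)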

[
  1,
  4,
  9,
  16
]


<pyarrow.Int64Scalar: 1>

# Feather vs Parquet

- https://stackoverflow.com/questions/48083405/what-are-the-differences-between-feather-and-parquet

In [81]:
# Example: simple returns for three securities

import pyarrow as pa
import pyarrow.compute as pc

schema = pa.schema([('sec1', pa.float64()),
                    ('sec2', pa.float64()),
                    ('sec3', pa.float64())])

# one price series per security, stored as float64 so divide() is not integer division
data = [pa.array([101, 102, 99], type=pa.float64()),
        pa.array([201, 200, 195], type=pa.float64()),
        pa.array([10, 20, 15], type=pa.float64())
       ]

batch = pa.RecordBatch.from_arrays(data, schema=schema)


def compute_returns(old, new):
    """Element-wise simple return: new / old - 1."""
    return pc.subtract(pc.divide(new, old), 1)


returns = []
for i in range(batch.num_columns):
    price = batch[i]          # price series of security i
    price_old = price[:-1]    # zero-copy slices
    price_new = price[1:]
    returns.append(compute_returns(price_old, price_new))

returns_table = pa.RecordBatch.from_arrays(returns, schema=schema)


returns_table.to_pandas()

       sec1      sec2  sec3
0  0.009901 -0.004975  1.00
1 -0.029412 -0.025000 -0.25


https://arrow.apache.org/blog/2017/10/15/fast-python-serialization-with-ray-and-arrow/