### Arrow Experiments
https://arrow.apache.org/docs/python/index.html

Arrow Buffers

In [None]:
import pyarrow as pa
# Wrap an existing bytes object in an Arrow buffer.
data = b'abcdefghijklmnopqrstuvwxyz'
buf = pa.py_buffer(data)

In [None]:
buf

In [None]:
# Size of the buffer in bytes.
buf.size

In [None]:
# Expose the buffer through Python's buffer protocol.
memoryview(buf)

In [None]:
# Copy the buffer contents back into a Python bytes object.
buf.to_pybytes()

Memory Pools

In [None]:
# Bytes currently allocated by Arrow's default memory pool.
pa.total_allocated_bytes()

In [None]:
# Allocate a 1024-byte resizable buffer and check how the counter changes.
buf = pa.allocate_buffer(1024, resizable=True)
pa.total_allocated_bytes()

In [None]:
# Grow the buffer to 2048 bytes; the counter should reflect the larger allocation.
buf.resize(2048)
pa.total_allocated_bytes()

Inputs and Outputs - Files and Streams

Input Stream

In [None]:
# Arrow input streams can wrap in-memory buffer objects such as a memoryview.
buf = memoryview(b"some data")
stream = pa.input_stream(buf)
# Read the first 4 bytes.
stream.read(4)

In [None]:
import gzip

# Write a small gzip file so there is something compressed to read back.
with gzip.open('example.gz', 'wb') as f:
    f.write(b'some data\n' * 3)

# pa.input_stream() detects the ".gz" extension and decompresses
# transparently. Using it as a context manager closes the underlying file,
# which the original cell left open.
with pa.input_stream('example.gz') as stream:
    contents = stream.read()
contents

Output Streams

In [None]:
# Write raw bytes through an Arrow output stream; leaving the with-block
# closes the file.
with pa.output_stream('example1.dat') as stream:
    stream.write(b'some data')

# Read the bytes back with plain Python I/O. The original opened the file
# without ever closing it; the with-block releases the handle.
with open('example1.dat', 'rb') as f:
    contents = f.read()
contents

CSV Files

In [None]:
import pandas
import pyarrow as pa
from pyarrow import csv

# Build explicit parse options and pass them to read_csv. The original
# created a ParseOptions object and discarded it, so it had no effect.
parse_options = csv.ParseOptions(delimiter=',')
fn = 'orders.csv'
table = csv.read_csv(fn, parse_options=parse_options)
# Only the last expression of a notebook cell is displayed, so the bare
# `table`, `pa.Table` and `len(table)` lines were silently discarded;
# print the useful information explicitly.
print(table.schema)
print(len(table))
df = table.to_pandas()
df.head()

In [None]:
# Number of threads Arrow uses for parallel (CPU-bound) operations.
pa.cpu_count()

Data Types and In-Memory Data Model
pyarrow types - DataType (type metadata), Schema, Array, RecordBatch (equal-length Arrays plus a Schema), and Table (columns, each made of one or more Arrays)

Metadata for type information

Fixed length primitive - int, float, bool, date
Variable length primitive - string, binary
Nested - List, struct, union
Dictionary - dictionary-encoded (categorical) type

In [None]:
import pyarrow as pa
# A few basic DataType instances.
t1 = pa.int32()
t2 = pa.string()
t3 = pa.binary()
t4 = pa.binary(10)  # binary with a fixed width of 10 bytes
t5 = pa.timestamp('ms')  # timestamp with millisecond unit

In [None]:
t1

In [None]:
# Print the string form of every type defined in the previous cell.
for data_type in (t1, t2, t3, t4, t5):
    print(data_type)

A Field pairs a name with a data type (plus nullability and optional metadata).

In [None]:
# A field: a column name paired with a data type.
f0 = pa.field('int32_field', t1)

In [None]:
f0

In [None]:
# Only the last bare expression of a notebook cell is displayed, so the
# original `f0.name` was silently dropped; print both attributes.
print(f0.name)
print(f0.type)

In [None]:
# Variable-length list type whose items are int32 (t1).
t6 = pa.list_(t1)
t6

A struct is a collection of named fields.

In [None]:
# The struct's child fields, built from (name, type) pairs as pa.field objects.
field_specs = [('s0', t1), ('s1', t2), ('s2', t4), ('s3', t6)]
fields = [pa.field(name, typ) for name, typ in field_specs]

t7 = pa.struct(fields)
print(t7)

In [None]:
# The same struct built directly from (name, type) tuples; it should
# compare equal to t7.
t8 = pa.struct([('s0', t1), ('s1', t2), ('s2', t4), ('s3', t6)])
print(t8)
t8 == t7

A schema is a collection of fields defining the name and type of each column.

In [None]:
# A schema built from (column name, type) pairs.
my_schema = pa.schema([('field0', t1),
                       ('field1', t2),
                       ('field2', t4),
                       ('field3', t6)])
my_schema

Arrays - One block of data

In [None]:
# None becomes a null entry; the type is inferred from the Python values.
arr = pa.array([1, 2, None, 3])
arr

In [None]:
# Request an explicit type instead of relying on inference.
pa.array([1, 2], type=pa.uint16())

In [None]:
arr.type

In [None]:
len(arr)

In [None]:
# Number of null entries (the single None above).
arr.null_count

List Arrays

In [None]:
# List array: an empty list, a null list, and lists containing values/nulls.
nested_arr = pa.array([[], None, [1, 2], [None, 1]])
nested_arr

Struct Arrays - Pass type explicitly

In [None]:
# Struct arrays need the struct type passed explicitly; values may be dicts...
ty = pa.struct([('x', pa.int8()),
                ('y', pa.bool_())])
pa.array([{'x': 1, 'y': True}, {'x': 2, 'y': False}], type=ty)

In [None]:
# ...or tuples ordered like the struct's fields.
pa.array([(3, True), (4, False)], type=ty)

In [None]:
# Alternatively, assemble a StructArray from one child array per field.
xs = pa.array([5, 6, 7], type=pa.int16())
ys = pa.array([False, True, True])
arr = pa.StructArray.from_arrays((xs, ys), names=('x', 'y'))

In [None]:
arr.type

In [None]:
arr

Union Arrays

In [None]:
xs = pa.array([5, 6, 7])
ys = pa.array([False, False, True])
# Sparse union: every child has the union's full length, and `types` picks
# which child supplies each slot (slot 0 from xs, slots 1-2 from ys).
types = pa.array([0, 1, 1], type=pa.int8())
union_arr = pa.UnionArray.from_sparse(types, [xs, ys])

In [None]:
union_arr.type

In [None]:
union_arr

In [None]:
xs = pa.array([5, 6, 7])
ys = pa.array([False, True])
# Dense union: `types` picks the child and `offsets` indexes into that
# child, e.g. slot 3 is xs[1] (6) and slot 2 is ys[1] (True).
types = pa.array([0, 1, 1, 0, 0], type=pa.int8())
offsets = pa.array([0, 0, 1, 1, 2], type=pa.int32())
union_arr = pa.UnionArray.from_dense(types, offsets, [xs, ys])

In [None]:
union_arr.type

In [None]:
union_arr

Dictionary Arrays

In [None]:
# Integer positions into `dictionary`; None produces a null entry.
indices = pa.array([0, 1, 0, 1, 2, 0, None, 2])
dictionary = pa.array(['foo', 'bar', 'baz'])
dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)

dict_array

In [None]:
print(dict_array.type)
# In a notebook only the last bare expression is displayed, so the
# original `dict_array.indices` was silently dropped; print it explicitly.
print(dict_array.indices)
dict_array.dictionary

In [None]:
# A dictionary array converts to a pandas Categorical-backed result.
dict_array.to_pandas()

Record Batches - a collection of equal-length arrays

In [None]:
# Three equal-length (4-element) columns of different types, with nulls.
data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['foo', 'bar', 'baz', None]),
    pa.array([True, None, False, True])
]

In [None]:
data

In [None]:
# Combine the arrays into a batch with column names f0, f1, f2.
batch = pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2'])

In [None]:
batch.num_columns

In [None]:
batch.num_rows

In [None]:
batch.schema

In [None]:
# Integer indexing returns a column (here the second one, f1).
batch[1]

In [None]:
# Slice of 3 rows starting at row 1.
batch2 = batch.slice(1, 3)

In [None]:
batch2[1]

Tables - a single logical dataset that may be made of multiple record batches (array chunks)

In [None]:
# A table made from five copies of the same record batch.
batches = [batch] * 5
table = pa.Table.from_batches(batches)

In [None]:
table

In [None]:
# NOTE: this displays the Table class itself, not the `table` object.
pa.Table

In [None]:
table.num_rows

In [None]:
# First column of the table (a chunked column made of the batches' pieces).
c = table[0]

Can be converted to pandas for processing

In [None]:
c.to_pandas()

In [None]:
# Concatenate the table with itself.
tables = [table] * 2
table_all = pa.concat_tables(tables)

In [None]:
table_all.num_rows

In [None]:
# The concatenated column is expected to keep the chunks of both inputs.
c = table_all[0]
c.num_chunks

Streaming, Serialization and IPC

In [None]:
import pyarrow as pa

# Three equal-length columns used throughout the IPC examples.
data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['foo', 'bar', 'baz', None]),
    pa.array([True, None, False, True])
]
batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
# Only the last bare expression of a cell is displayed; print the row
# count so it is not silently discarded.
print(batch.num_rows)
batch.num_columns

In [None]:
# In-memory sink plus a writer for the Arrow IPC *streaming* format.
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema)

In [None]:
# Append the same batch five times, close the writer, then grab the bytes.
for _ in range(5):
    writer.write_batch(batch)
writer.close()

buf = sink.getvalue()
buf.size

In [None]:
# Re-open the stream from the in-memory buffer.
reader = pa.ipc.open_stream(buf)
# Print the schema: as a bare mid-cell expression it was never displayed.
print(reader.schema)
# The reader is iterable over record batches.
batches = list(reader)
len(batches)

In [None]:
# The round-tripped batch should equal the original.
batches[0].equals(batch)

In [None]:
# Write the batch ten times using the IPC *file* format. The writer is
# closed when the with-block exits, before the buffer is taken.
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
    for _ in range(10):
        writer.write_batch(batch)

buf = sink.getvalue()
buf.size

In [None]:
# Open the IPC file format from the in-memory buffer (allows random access).
reader = pa.ipc.open_file(buf)

In [None]:
# This cell ends in an assignment, so the original bare
# `reader.num_record_batches` was never displayed; print it.
print(reader.num_record_batches)
# Random access to the fourth record batch.
b = reader.get_batch(3)

In [None]:
b.equals(batch)

In [None]:
# Read the whole IPC file straight into a pandas DataFrame.
df = pa.ipc.open_file(buf).read_pandas()
df[:5]

In [None]:
import numpy as np
# 100 random 500x500 arrays keyed by integer (roughly 200 MB of float64 data).
data = {
    i: np.random.randn(500, 500)
    for i in range(100)
}

In [None]:
buf = pa.serialize(data).to_buffer()

In [None]:
type(buf)

In [None]:
buf.size

In [None]:
restored_data = pa.deserialize(buf)
restored_data[0]

In [None]:
import pickle

import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})
# pa.default_serialization_context() was deprecated in pyarrow 2.0 and has
# since been removed. Pickle protocol 5 provides the same "components"
# split: a small in-band payload plus out-of-band data buffers collected
# through buffer_callback.
out_of_band_buffers = []
payload = pickle.dumps(df, protocol=5,
                       buffer_callback=out_of_band_buffers.append)
original_df = pickle.loads(payload, buffers=out_of_band_buffers)
original_df

The following sections have not been tested and do not currently run.
Filesystem Interfaces - Local FileSystem, S3, HDFS

In [None]:
from pyarrow import fs
local = fs.LocalFileSystem()

In [None]:
# Infer both the filesystem and the path inside it from a URI.
s3, path = fs.FileSystem.from_uri("s3://my-bucket")

In [None]:
s3

In [None]:
path

In [None]:
# NOTE: untested (no S3 bucket available).
# pandas.read_table parses delimited *text*, so it cannot read Parquet;
# use pyarrow.parquet.read_table instead.
import pyarrow.parquet as pq

# With a URI, the filesystem is inferred from the "s3://" scheme.
pq.read_table("s3://my-bucket/data.parquet")
# With an explicit filesystem, pass a bucket-relative path. The original
# passed "." as S3FileSystem's first positional argument (access_key);
# with no arguments the credentials are taken from the AWS environment.
s3 = fs.S3FileSystem()
pq.read_table("my-bucket/data.parquet", filesystem=s3)

In [None]:
local = fs.LocalFileSystem()

# Write `table` to a local Arrow IPC file; both the output stream and the
# file writer are closed when their with-blocks exit.
with local.open_output_stream("test.arrow") as sink:
    with pa.ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table)

In [None]:
# Recursively list the entries under the "dataset/" directory.
local.get_file_info(fs.FileSelector("dataset/", recursive=True))

In [None]:
# Only the last bare expression of a cell is displayed; print the info for
# the existing file so it is not silently discarded.
print(local.get_file_info('test.arrow'))
# A missing path is reported via the returned FileInfo, not an exception.
local.get_file_info('non_existent')

In [None]:
from pyarrow import fs
# S3 filesystem bound to a specific AWS region.
s3 = fs.S3FileSystem(region='eu-west-3')

In [None]:
# Read a whole object; the context manager closes the stream afterwards,
# which the original cell never did.
with s3.open_input_stream('my-test-bucket/Dir1/File2') as f:
    contents = f.readall()
contents

Set CLASSPATH to include the Hadoop libraries (run this in a shell before starting Python):
export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob`

In [None]:
import gcsfs
import pyarrow.dataset as ds

# Bind the GCS filesystem to its own name. The original assigned it to
# `fs`, shadowing the `pyarrow.fs` module imported in earlier cells.
gcs = gcsfs.GCSFileSystem(project='my-google-project')

# Read a (possibly partitioned) dataset through the fsspec filesystem.
ds.dataset("data/", filesystem=gcs)

Plasma In-Memory Object Storage

In [None]:
import subprocess

# `plasma_store -s /tmp/plasma` is a shell command, not Python, so the
# original cell was a syntax error. Launch the store as a background
# process instead.
# NOTE: Plasma was deprecated and later removed from pyarrow; this requires
# an older pyarrow release that still ships the `plasma_store` executable.
plasma_store_proc = subprocess.Popen(["plasma_store", "-s", "/tmp/plasma"])

NumPy to Arrow

In [None]:
import numpy as np
import pyarrow as pa
# Convert a NumPy int16 array into an Arrow array.
data = np.arange(10, dtype='int16')
arr = pa.array(data)
arr

Arrow to Numpy

In [None]:
import numpy as np
import pyarrow as pa
arr = pa.array([4, 5, 6], type=pa.int32())
# NOTE(review): for a null-free primitive array to_numpy() is expected to
# return a zero-copy view (hence the name) - confirm against pyarrow docs.
view = arr.to_numpy()
view

Pandas Integration

In [None]:
import pandas as pd
import pyarrow as pa

In [None]:
import pyarrow as pa
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})
# Convert from pandas to Arrow
table = pa.Table.from_pandas(df)
# Convert back to pandas
df_new = table.to_pandas()

# Infer Arrow schema from pandas
schema = pa.Schema.from_pandas(df)

In [None]:
from datetime import date
# A pandas Series of datetime.date objects with one missing value.
s = pd.Series([date(2018, 12, 31), None, date(2000, 1, 1)])
s

In [None]:
# Arrow infers a date type from the date objects.
arr = pa.array(s)
arr.type

In [None]:
arr[0]

In [None]:
# Request the date64 type explicitly instead of the inferred one.
arr = pa.array(s, type='date64')
arr.type

In [None]:
arr.to_pandas()

In [None]:
# date_as_object=False converts to a datetime64 column instead of date objects.
s2 = pd.Series(arr.to_pandas(date_as_object=False))
s2.dtype

Reading CSV files

In [None]:
from pyarrow import csv
# The ".gz" extension makes read_csv decompress the file automatically.
fn = 'orders.csv.gz'
table = csv.read_csv(fn)
table

In [None]:
# NOTE: this displays the Table class itself, not the loaded table.
pa.Table

In [None]:
len(table)

In [None]:
df = table.to_pandas()
df.head()

JSON Reader

In [None]:
from pyarrow import json
# pyarrow's JSON reader expects line-delimited JSON (one object per line).
fn = 'my_data.json'
table = json.read_json(fn)
table

Parquet Reader

In [None]:
import numpy as np
import pandas as pd
import pyarrow as pa
# A small DataFrame with a float column containing NaN, a string column,
# a bool column, and a non-default string index.
df = pd.DataFrame({'one': [-1, np.nan, 2.5],
                   'two': ['foo', 'bar', 'baz'],
                   'three': [True, False, True]},
                   index=list('abc'))
table = pa.Table.from_pandas(df)

In [None]:
import pyarrow.parquet as pq
pq.write_table(table, 'example.parquet')

In [None]:
table2 = pq.read_table('example.parquet')
table2.to_pandas()

In [None]:
# Read only a subset of the columns.
pq.read_table('example.parquet', columns=['one', 'three'])

In [None]:
# read_pandas also restores the pandas index stored in the file metadata.
pq.read_pandas('example.parquet', columns=['two']).to_pandas()

In [None]:
df = pd.DataFrame({'one': [-1, np.nan, 2.5],
                   'two': ['foo', 'bar', 'baz'],
                   'three': [True, False, True]},
                   index=list('abc'))

In [None]:
df

In [None]:
# Drop the pandas index instead of storing it in the Arrow table.
table = pa.Table.from_pandas(df, preserve_index=False)

In [None]:
pq.write_table(table, 'example_noindex.parquet')
t = pq.read_table('example_noindex.parquet')
t.to_pandas()

In [None]:
parquet_file = pq.ParquetFile('example.parquet')
# Print the file metadata: as a bare mid-cell expression it was never shown.
print(parquet_file.metadata)
parquet_file.schema

In [None]:
# Print the row-group count; otherwise only the last expression displays.
print(parquet_file.num_row_groups)
parquet_file.read_row_group(0)

In [None]:
# Each write_table call appends to the same file; three writes are expected
# to produce three row groups.
writer = pq.ParquetWriter('example2.parquet', table.schema)
for i in range(3):
    writer.write_table(table)
writer.close()
pf2 = pq.ParquetFile('example2.parquet')
pf2.num_row_groups

In [None]:
# Same as above, with the writer closed automatically by the with-block.
with pq.ParquetWriter('example3.parquet', table.schema) as writer:
    for i in range(3):
        writer.write_table(table)

In [None]:
parquet_file = pq.ParquetFile('example.parquet')
metadata = parquet_file.metadata

In [None]:
# Read only the file metadata, without loading any data.
metadata = pq.read_metadata('example.parquet')
metadata

In [None]:
metadata.row_group(0)

In [None]:
# Column-chunk metadata for the first column of the first row group.
metadata.row_group(0).column(0)

In [None]:
# read_table's first argument is the file to read. The original passed the
# in-memory Table plus a stray "." (the `columns` argument) and named
# columns that do not exist. read_dictionary lists columns to load as
# DictionaryArray; 'two' is the string column written to example.parquet.
pq.read_table('example.parquet', read_dictionary=['two'])

Arrow can read data from HDFS, Azure blob storage, S3 storage etc. and create an Arrow Array or Table.

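Tabular Datasets - pyarrow.dataset provides a unified interface for reading multi-file (optionally partitioned) tabular datasets from local or remote filesystems, with column projection and row filtering.
Arrow also has CUDA support (pyarrow.cuda) that interoperates with Numba; Numba compiles Python functions via LLVM for parallel execution on CPUs or GPUs.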

In [None]:
import pathlib
import tempfile

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

base = pathlib.Path(tempfile.gettempdir())
dataset_dir = base / "parquet_dataset"
dataset_dir.mkdir(exist_ok=True)
# Create an Arrow table (numpy is imported here so the cell does not depend
# on an earlier cell having imported it).
table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})
# Write it as two Parquet files of five rows each. Table.slice takes
# (offset, length), not (start, stop): slice(5, 10) only worked because
# the length was clipped; slice(5) takes all remaining rows.
pq.write_table(table.slice(0, 5), dataset_dir / "data1.parquet")
pq.write_table(table.slice(5), dataset_dir / "data2.parquet")

In [None]:
import pyarrow.dataset as ds
# Discover every Parquet file in the directory as one logical dataset.
dataset = ds.dataset(base / "parquet_dataset", format="parquet")
dataset

In [None]:
dataset.files

In [None]:
print(dataset.schema.to_string(show_field_metadata=False))

In [None]:
# Materialize the whole dataset as a single table.
dataset.to_table()

In [None]:
dataset.to_table().to_pandas()

In [None]:
# Column projection: read only columns a and b.
dataset = ds.dataset(base / "parquet_dataset", format="parquet")
dataset.to_table(columns=['a', 'b']).to_pandas()

In [None]:
# Row filtering with a field expression.
dataset.to_table(filter=ds.field('a') >= 7).to_pandas()

In [None]:
dataset.to_table(filter=ds.field('c') == 2).to_pandas()

In [None]:
# Expressions can be built and inspected without reading any data.
ds.field('a') != 3

In [None]:
ds.field('a').isin([1, 2, 3])

In [None]:
# Combine comparisons with & (and) / | (or); parentheses are required.
(ds.field('a') > ds.field('b')) & (ds.field('b') > 1)