In [None]:
import os
from uuid import uuid4
import pandas as pd
from IPython.core.interactiveshell import InteractiveShell

os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "miniominio"

ENDPOINT_URL = "http://0.0.0.0:9000"

InteractiveShell.ast_node_interactivity = "all"

In [None]:
!tree

In [None]:
!docker-compose up -d

* What is it
  * why would I use it
  * core building blocks
  * use examples
* installation
* core features
  * FileSystem
    * local
    * s3
  * DataIO
    * parquet
    * dsv
      * types
  * Bonus: JsonLogger
  
* notes
  * no pre-checks - be careful what you try to read, write
  * copy between bucket - possible but not always optimal
  * test-covered but not perfect, found a bug during preparation

This notebook demonstrates the capabilities of the `data-toolz` package, an open-source Python package for handling filesystem I/O that also provides convenient pandas wrapper features.

# What is [data-toolz](https://pypi.org/project/data-toolz/)?
`data-toolz` is an open-source Python package providing convenient access to I/O operations on both the local filesystem and cloud storage (currently AWS S3 is supported), as well as a layer for accessing data-like objects (`parquet`, `dsv`).

## Why use it
The package was created to standardize common, recurring I/O operations and to minimize boilerplate code.

Most "data processes" involve:
* reading input
* processing
* writing output

Let's look at a simple example of reading a file, processing it and storing the results locally

In [None]:
# reading/writing local files, using standard library

with open("example-bucket/data.txt") as file:
    data = [int(item) for item in file]

processed = list((item, f"hello {item} from local") for item in data)

with open("example-bucket/processed-local.txt", mode="wt") as file:
    for item in processed:
        size = file.write(f"{item}\n")

In [None]:
!cat example-bucket/processed-local.txt

And now the same operation in a cloud environment

In [None]:
# reading/writing s3 files, using boto3

import boto3

s3_client = boto3.client("s3", endpoint_url=ENDPOINT_URL)

obj = s3_client.get_object(Bucket="example-bucket", Key="data.txt")
data = [int(item) for item in obj["Body"].read().decode("utf-8").split()]

processed = list((item, f"hello {item} from s3") for item in data)

body = "".join(f"{item}\n" for item in processed).encode("utf-8")
response = s3_client.put_object(Bucket="example-bucket", Key="processed-s3.txt", Body=body)

In [None]:
!cat example-bucket/processed-s3.txt

And now the same steps using `data-toolz`

In [None]:
from datatoolz.filesystem import FileSystem

fs = FileSystem()  # equivalent to `FileSystem("local")`

with fs.open("example-bucket/data.txt") as file:
    data = [int(item) for item in file]

processed = list((item, f"hello {item} from local datatoolz") for item in data)

with fs.open("example-bucket/processed-local-dt.txt", mode="wt") as file:
    for item in processed:
        size = file.write(f"{item}\n")

In [None]:
!cat example-bucket/processed-local-dt.txt

In [None]:
fs = FileSystem("s3", endpoint_url=ENDPOINT_URL)

with fs.open("example-bucket/data.txt") as file:
    data = [int(item) for item in file]

processed = list((item, f"hello {item} from s3 datatoolz") for item in data)

with fs.open("example-bucket/processed-s3-dt.txt", mode="wt") as file:
    for item in processed:
        size = file.write(f"{item}\n")

In [None]:
!cat example-bucket/processed-s3-dt.txt

`data-toolz` lets you abstract away the read and write steps and use the same code for both local development and cloud deployment.
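One way to take advantage of this is to write the processing step once against any object that exposes `open(path, mode=...)` and a `name`, then pass in whichever file system applies. The sketch below is illustrative only: `process_numbers` and `BuiltinOpenFS` are hypothetical names, and `BuiltinOpenFS` merely stands in for `FileSystem()` so the snippet runs without the package installed.

```python
import os
import tempfile


class BuiltinOpenFS:
    """Hypothetical stand-in exposing the same `open(path, mode)` call used above."""

    name = "stand-in"

    def open(self, path, mode="rt"):
        return open(path, mode)


def process_numbers(fs, src, dst):
    """Read integers from `src`, greet each one, and write the result to `dst`."""
    with fs.open(src, mode="rt") as file:
        data = [int(line) for line in file if line.strip()]

    processed = [(item, f"hello {item} from {fs.name}") for item in data]

    with fs.open(dst, mode="wt") as file:
        for item in processed:
            file.write(f"{item}\n")
    return processed


# demo in a temporary directory so nothing in the working tree is touched
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "data.txt")
    dst = os.path.join(tmp, "processed.txt")
    with open(src, "wt") as file:
        file.write("1\n2\n3\n")

    result = process_numbers(BuiltinOpenFS(), src, dst)
    with open(dst) as file:
        written = file.read()
```

With `data-toolz` installed, the same function could presumably be called with `FileSystem()` or `FileSystem("s3", endpoint_url=ENDPOINT_URL)` in place of the stand-in.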

## Core building blocks

Source code can be found on GitHub: https://github.com/grzegorzme/data-toolz.
The package is a wrapper around [fsspec](https://filesystem-spec.readthedocs.io/en/latest/) and [s3fs](https://s3fs.readthedocs.io/en/latest/).

* `datatoolz.filesystem.FileSystem` - the main entry point for accessing the file system layer, based on `fsspec.AbstractFileSystem`

* `datatoolz.io.DataIO` - class for handling I/O operations on datasets, accessed via two main methods:
  * `read(path, ...)`

  * `write(dataframe, path, ...)`
  
* `datatoolz.logging.JsonLogger` - utility class used for structured logging

# Installation

`data-toolz` is indexed on [PyPI](https://pypi.org/project/data-toolz/), and the latest version can be installed via `pip`:

```bash
pip install data-toolz
```

# Feature overview

## FileSystem (`datatoolz.filesystem.FileSystem`)

This is the main entry point for accessing the file system layer, based on `fsspec.AbstractFileSystem`. It can be used to perform common file system operations such as:
* opening/writing files
* listing/deleting files/folders
* and a few more, depending on the underlying implementation


Initialisation:

```python
from datatoolz.filesystem import FileSystem

fs = FileSystem()                                       # simple instance pointing to local file system
fs = FileSystem("local")                                # same as above
fs = FileSystem("s3")                                   # pointer to s3 service
fs = FileSystem("s3", assumed_role="arn:aws:iam::123456789012:role/my-role")  # s3 with custom access role
fs = FileSystem("s3", endpoint_url="https://s3.amazonaws.com")  # custom endpoint url (including scheme) passed to the service client
```


### Writing and reading a file
* `open` - open file in text/binary read/write mode

In [None]:
from datatoolz.filesystem import FileSystem

fs_name = "local"

fs = FileSystem(fs_name, endpoint_url=ENDPOINT_URL)

with fs.open("example-bucket/example.txt", mode="wt") as file:
    size = file.write(f"Hello {fs.name}!")

with fs.open("example-bucket/example.txt", mode="rt") as file:
    file.read()

### Basic operations

* `ls` - list contents of folder

* `mkdir` - create folder

* `rm` - remove file/folder


In [None]:
from datatoolz.filesystem import FileSystem

fs_name = "local"
fs = FileSystem(fs_name, endpoint_url=ENDPOINT_URL)

PATH = f"example-bucket/new-directory-{str(uuid4())[:4]}"

In [None]:
# create a new directory
fs.mkdir(PATH)
f"Contents of {PATH}: {[item['name'] for item in fs.ls(PATH)]}"

In [None]:
# write some files
for i in range(3):
    with fs.open(f"{PATH}/{i}.txt", mode="wt") as file:
        size = file.write(f"Hello {i}")

f"After adding files: {[item['name'] for item in fs.ls(PATH)]}"

In [None]:
# remove file
fs.rm(f"{PATH}/1.txt")

f"After removing single file: {[item['name'] for item in fs.ls(PATH)]}"

In [None]:
# remove whole folder
fs.rm(PATH, recursive=True)

# check the folder was removed
[item["name"] for item in fs.ls("example-bucket") if item["type"] == "directory"]

### Other operations

* `exists` - check if object exists

* `copy` / `cp` - copy object between two locations in the file system

* `move` / `mv` - move object between two locations in the file system

* `walk` - walk the directory tree (see standard library method `os.walk`)

* `isdir` - check if path is a directory

* `disk_usage` / `du` - get disk usage of a path


As the `FileSystem` object inherits from its base class, there are many more methods available (some may be restricted to the `local` or `s3` type).
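Of the methods listed above, `walk` is described by analogy with `os.walk`. For reference, this is what the standard-library version yields on a small temporary tree. The file and folder names are made up for the example, and the snippet uses only `os` and `tempfile`, not `data-toolz`.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    # build a tiny tree: a.txt at the top level and sub/b.txt below it
    os.makedirs(os.path.join(tmp, "sub"))
    for rel in ("a.txt", os.path.join("sub", "b.txt")):
        with open(os.path.join(tmp, rel), "wt") as file:
            file.write("x")

    # each step yields (root, directory names, file names), top-down by default
    tree = [
        (os.path.relpath(root, tmp), sorted(dirs), sorted(files))
        for root, dirs, files in os.walk(tmp)
    ]

for root, dirs, files in tree:
    print(root, dirs, files)
```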

In [None]:
fs_name = "s3"
fs = FileSystem(fs_name, endpoint_url=ENDPOINT_URL)

In [None]:
# let's check two paths
for file_name in ["example-bucket/example.txt", "example-bucket/non-existent.txt"]:
    f"{file_name} exists: {fs.exists(file_name)}"

In [None]:
# move file
fs.mv("example-bucket/example.txt", "example-bucket/non-existent.txt")

for file_name in ["example-bucket/example.txt", "example-bucket/non-existent.txt"]:
    f"{file_name} exists: {fs.exists(file_name)}"

In [None]:
# copy file
fs.cp("example-bucket/non-existent.txt", "example-bucket/example.txt")

for file_name in ["example-bucket/example.txt", "example-bucket/non-existent.txt"]:
    f"{file_name} exists: {fs.exists(file_name)}"

In [None]:
# cleanup
fs.rm("example-bucket/non-existent.txt")

In [None]:
# let's check if path is a directory
for path in ["example-bucket/example.txt", "example-bucket"]:
    f"{path} is a directory: {fs.isdir(path)}"

In [None]:
# walk path
for r, d, f in fs.walk("example-bucket"):
    r
    d
    f
    "-----"

In [None]:
# check disk usage
f"File disk usage: {fs.du('example-bucket/data.txt')}"
f"Folder disk usage: {fs.du('example-bucket')}"

### Task
How to copy/move files between two `s3` buckets?

In [None]:
# copy file `./example-bucket/data.txt` to `s3://example-bucket/new.txt`
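One possible approach, sketched under assumptions: open the source on one file system object and the destination on another, then stream the bytes across. `copy_across` and `BuiltinOpenFS` are hypothetical names; `BuiltinOpenFS` stands in for both a local and an `s3` `FileSystem` instance so the snippet runs offline. Streaming through the client is simple but, as the notes at the top say, not always optimal for bucket-to-bucket copies.

```python
import os
import shutil
import tempfile


class BuiltinOpenFS:
    """Hypothetical stand-in for a FileSystem instance; only `open` is needed."""

    def open(self, path, mode="rb"):
        return open(path, mode)


def copy_across(src_fs, src_path, dst_fs, dst_path, chunk_size=1024 * 1024):
    """Stream a file from one file system object to another in binary chunks."""
    with src_fs.open(src_path, mode="rb") as src, dst_fs.open(dst_path, mode="wb") as dst:
        shutil.copyfileobj(src, dst, chunk_size)


with tempfile.TemporaryDirectory() as tmp:
    src_path = os.path.join(tmp, "data.txt")
    dst_path = os.path.join(tmp, "new.txt")
    with open(src_path, "wb") as file:
        file.write(b"1\n2\n3\n")

    # with data-toolz the two arguments would be separate FileSystem instances,
    # e.g. one local and one created with "s3" and an endpoint_url (assumption)
    copy_across(BuiltinOpenFS(), src_path, BuiltinOpenFS(), dst_path)

    with open(dst_path, "rb") as file:
        copied = file.read()
```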





### AWS role-based access

`FileSystem` lets you access an `s3` bucket via a provided role. This is useful when your current role does not have direct access to a bucket but is allowed to assume a role which does.

```python
fs = FileSystem(
    "s3", 
    endpoint_url=ENDPOINT_URL,
    assumed_role="arn:aws:iam::123456789012:role/my-role"
)
```

Alternatively, if you need to jump through an "assume chain", pass a list of roles:

```python
fs = FileSystem(
    "s3", 
    endpoint_url=ENDPOINT_URL,
    assumed_role=["arn:aws:iam::123456789012:role/role-1", "arn:aws:iam::123456789012:role/role-2"]
)
```

Note that the roles are assumed in succession, so each role in the provided list must be assumable by the one before it.

The assumed credentials are automatically refreshed in case your application runs for longer than one hour (the AWS default for the assume-role action).
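To make the ordering requirement concrete, the chain can be pictured as a fold over the role list, where each step uses the credentials returned by the previous one. This is a conceptual sketch, not the package's implementation: `assume_chain` and `fake_assume` are hypothetical, and in a real setup the `assume` callable would wrap an STS `AssumeRole` call.

```python
def assume_chain(role_arns, assume, initial_credentials):
    """Assume each role in order, using the credentials from the previous step."""
    credentials = initial_credentials
    for arn in role_arns:
        credentials = assume(credentials, arn)
    return credentials


def fake_assume(credentials, role_arn):
    """Stand-in for an STS call: records which identity assumed which role."""
    return {"role": role_arn, "assumed_by": credentials["role"]}


final = assume_chain(
    ["arn:aws:iam::123456789012:role/role-1", "arn:aws:iam::123456789012:role/role-2"],
    fake_assume,
    {"role": "caller"},
)
print(final)
```

The final credentials belong to `role-2` and were obtained by `role-1`, which is why `role-2` must trust `role-1` rather than the original caller.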

## DataIO (`datatoolz.io.DataIO`)
`DataIO` is a wrapper class for reading data files into, and writing them from, a `pandas.DataFrame`.

It exposes two main methods:
  * `read(path, ...)`
  
  * `write(dataframe, path, ...)`

Initialisation:
```python
from datatoolz.io import DataIO
from datatoolz.filesystem import FileSystem

dio = DataIO()                     # basic instance pointing to local file system


fs = FileSystem(...)
dio = DataIO(filesystem=fs)        # instance pointing to a predefined file system (local/s3)


def my_partition_transformer(prefix, partitions, values, suffix):
    return "string"
dio = DataIO(
    partition_transformer=my_partition_transformer
)                                  # instance with a custom `partition_transformer` callable

```

### Basic writing and reading parquet
* `write(..., filetype="parquet")` - the file type is specified via the `filetype` argument; `parquet` is the default value if `filetype` is omitted

* `read(..., filetype="parquet")` - same as above

In [None]:
df = pd.DataFrame({"col1": [1, 2, 1], "col2": ["a", "b", "c"]})
df

from datatoolz.io import DataIO
dio = DataIO()

In [None]:
# write data to a given path(!)
dio.write(df, "my-parquet-file")

# read data from a given path(!)
df_read = dio.read("my-parquet-file")
df_read

# note the created folder/prefix - default settings do not result in idempotent operations!
!tree && rm -rf my-parquet-file

### Advanced writing
* `write(..., suffix=...)` - specify a custom output suffix

* `write(..., partition_by=...)` - specifies output partitioning

In [None]:
df = pd.DataFrame({"col1": [1, 2, 1], "col2": ["a", "b", "c"]})
df

from datatoolz.io import DataIO
dio = DataIO()

In [None]:
# with `suffix=""` the dataframe will be written under `path` as a single file
dio.write(df, path="my-parquet-file", suffix="")
dio.read("my-parquet-file")

!tree && rm -rf my-parquet-file

In [None]:
# with `suffix="string"` the dataframe will be written under `path/suffix` as a single file
dio.write(df, path="my-parquet-file", suffix="my-file")
dio.read("my-parquet-file")

!tree && rm -rf my-parquet-file

In [None]:
# with `suffix=["string1", "string2", ...]` the dataframe will be written in `path`
# under multiple files listed in `suffix` (uniform split)
dio.write(df, path="my-parquet-file", suffix=["my-file-1", "my-file-2"])
dio.read("my-parquet-file")

# dio.read("my-parquet-file/my-file-1")

!tree && rm -rf my-parquet-file

In [None]:
# with `partition_by=["field"]` the output in `path` will be additionally split by partition value
dio.write(df, path="my-parquet-file", partition_by=["col1"])
dio.read("my-parquet-file")

dio.read("my-parquet-file/col1=1")

!tree && rm -rf my-parquet-file

In [None]:
# the output can be partitioned by multiple fields `partition_by=["field1", "field2", ...]`
dio.write(df, path="my-parquet-file", partition_by=["col1", "col2"])
dio.read("my-parquet-file")

dio.read("my-parquet-file/col1=1")

!tree && rm -rf my-parquet-file

In [None]:
# you can overwrite the default partition_transformer resulting in different output path building e.g.
# * default: `my-parquet-file/col1=1/1649112618853172000-25105f49-24dd-443b-b6bf-a5ca8f7a18d9`
# * custom: `my-parquet-file/1/fixed-name`

def custom_partition_transformer(prefix, partitions, values, suffix):
    partition_part = "/".join(map(str, values))
    return f"{prefix}/{partition_part}/fixed-name"

dio = DataIO(partition_transformer=custom_partition_transformer)

dio.write(df, path="my-parquet-file", partition_by=["col1"])
dio.read("my-parquet-file")

!tree && rm -rf my-parquet-file
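For contrast, a transformer that reproduces the hive-style `col1=1` layout from the comment above might look like the following. It is a sketch under assumptions: it treats `partitions` and `values` as parallel sequences of column names and values, and `suffix` as a single string, which is inferred from the example paths rather than taken from the package's documentation. `hive_style_transformer` is a hypothetical name.

```python
def hive_style_transformer(prefix, partitions, values, suffix):
    """Build `prefix/col=value/.../suffix`, skipping empty segments."""
    # assumption: `partitions` and `values` line up one-to-one
    partition_part = "/".join(
        f"{name}={value}" for name, value in zip(partitions, values)
    )
    segments = [prefix, partition_part, suffix]
    return "/".join(segment for segment in segments if segment)


path = hive_style_transformer("my-parquet-file", ["col1"], [1], "fixed-name")
print(path)  # my-parquet-file/col1=1/fixed-name
```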

In [None]:
# additionally the partition columns can be dropped from output to reduce redundancy
# this process is NOT REVERSIBLE by default!!!
dio = DataIO()

dio.write(df, path="my-parquet-file", partition_by=["col1"], drop_partitions=True)
dio.read("my-parquet-file/col1=1")

!tree && rm -rf my-parquet-file

### Other file types

Besides `parquet`, the following types are handled:
* `write(..., filetype="jsonlines")`
* `write(..., filetype="dsv")`
* `write(..., filetype="dsv", **pandas_kwargs)`

Both types support compression via `gzip=True`.

`pandas_kwargs` are passed to the `pandas.DataFrame.to_csv` method.

In [None]:
df = pd.DataFrame({"col1": [1, 2, 1], "col2": ["a", "b", "c"]})
df

from datatoolz.io import DataIO
dio = DataIO()

In [None]:
# write gzip-ed jsonlines
dio.write(df, path="my-data.json.gz", filetype="jsonlines", gzip=True, suffix="")
dio.read("my-data.json.gz", filetype="jsonlines", gzip=True)

!tree && gunzip my-data.json.gz && cat my-data.json && rm my-data.json
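For readers unfamiliar with the format: a gzip-ed JSON Lines file holds one JSON object per line and is compressed as a whole. The stdlib-only sketch below writes and reads such a file for the same three rows. It illustrates the file format, not how `DataIO` is implemented.

```python
import gzip
import json
import os
import tempfile

rows = [{"col1": 1, "col2": "a"}, {"col1": 2, "col2": "b"}, {"col1": 1, "col2": "c"}]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "my-data.json.gz")

    # one JSON document per line, written through a gzip text stream
    with gzip.open(path, mode="wt", encoding="utf-8") as file:
        for row in rows:
            file.write(json.dumps(row) + "\n")

    # reading reverses the steps: decompress, then parse line by line
    with gzip.open(path, mode="rt", encoding="utf-8") as file:
        rows_read = [json.loads(line) for line in file]

print(rows_read)
```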

In [None]:
# write tab-separated (default) file
dio.write(df, path="my-data.txt", filetype="dsv", suffix="")
dio.read("my-data.txt", filetype="dsv")

!tree && cat my-data.txt && rm my-data.txt

In [None]:
# write |-separated file without header
dio.write(df, path="my-data.txt", filetype="dsv", sep="|", header=None, suffix="")
dio.read("my-data.txt", filetype="dsv", sep="|", header=None)

!tree && cat my-data.txt && rm my-data.txt