## Ingest Data From s3 to BTrDB
This notebook will demonstrate how to subclass `CloudMixin` to create a `DataParser` that can load data from AWS s3.

In [1]:
import os
import s3fs
import btrdb
import pandas as pd

from pgimport.csv_parser import CSVParser
from pgimport.ingest import DataIngestor
from pgimport.cloud import S3Mixin, S3File

### Creating a Custom File
We can extend the `File` object to include a `count`, which can be passed to a `DataIngestor` and allows for a progress bar during ingestion.  

In [2]:
class S3ProgBarFile(S3File):
    def __init__(self, path, count=True, header=False):
        super().__init__(path)
        # TODO: this works but its slow, need to find a more efficient way to do this
        if count:
            header_rows = 1 if header else 0
            df = pd.read_csv(path)
            self.count = (len(df) - header_rows) * (len(df.columns) - 1)

### Creating a Custom DataParser
We can create a custom `DataParser` by extending the `CSVParser` and combining it with the `S3Mixin`, which provides a connection to s3 by assigning a `client` attribute that is equivalent to `boto3.client("s3")`  

In [3]:
class S3CSVParser(S3Mixin, CSVParser):
    """
    Parameters
    -----------
    collection_prefix: str
        prefix to add to all streams' collection names
    regex: str
        regex string to use to retrieve collection from file name
    metadata: dict or str
        either a dict of metadata or a str filename referring to a yaml/json metadata file
    meta_func:
        callback function to use to map metadata to Stream objects
    bucket: str
        s3 bucket that contains raw data files
    s3_prefix: str
        (optional) subdirectory within bucket that contains raw data files 
    **kwargs: dict
        (optional) key/value pairs specifying required AWS credentials. Can be left as None
        if credentials are stored as environtment variables
    """
    def __init__(self, collection_prefix=None, regex=None, metadata=None, meta_func=None, bucket=None, s3_prefix=None, **kwargs):
        # init CSVParser
        CSVParser.__init__(self, collection_prefix=collection_prefix, regex=regex, metadata=metadata, meta_func=meta_func)
        # establish attrs that are specific to this class
        self.bucket = bucket
        self.s3_prefix = s3_prefix
        # init S3Mixin, connects to s3 and provides a handle via self.client
        S3Mixin.__init__(self, **kwargs)
        self.client = self.connect()
    
    def collect_files(self):
        return [S3ProgBarFile(fpath, header=True) for fpath in self.list_objects(self.bucket, prefix=self.s3_prefix)]

### Ingest s3 Data 
Next we will use the `S3CSVParser` to load data from s3, create `Stream` objects, and pass them to the `DataIngestor` for insertion into BTrDB.

In [5]:
# bucket = "your-bucket"
# subdir = "any-directory-within-bucket"
bucket = "pt-ni4ai-atc"
prefix = "test_csvs"

# explicity setting credentials is optional, you can also just store them as env vars
aws_creds = {
    "aws_access_key_id": os.environ["AWS_ACCESS_KEY_ID"],
    "aws_secret_access_key": os.environ["AWS_SECRET_ACCESS_KEY"],
    "aws_session_token": os.environ["AWS_SESSION_TOKEN"]
}

s3 = S3CSVParser(collection_prefix="test_ingest", bucket=bucket, s3_prefix=prefix, **aws_creds)
files = s3.collect_files()
total_points = sum([f.count for f in files])

print(f"Found {len(files)} files")
print(f"Files contain {total_points} points")

Found 4 files
Files contain 6480000 points


In [6]:
# replace with your BTrDB credentials
conn = btrdb.connect(os.environ["BTRDB_ENDPOINTS"], apikey=os.environ["BTRDB_API_KEY"])

ingestor = DataIngestor(conn, total_points=total_points)
for streams in s3.create_streams(files):
    ingestor.ingest(streams)

6480060it [05:40, 24656.75it/s]                             