In [None]:
#| default_exp datalake

## Datalake interface

In [None]:
#| exporti
#| hide

import boto3
import polars as pl
import s3fs
import json
from dateutil.parser import parse

from io import BytesIO

from lakeinterface.config import ConfigManager


In [None]:
#| exporti


def most_recent(keys, prefix):
    """
    Return the key in `keys` whose timestamp path segment is the most recent.

    The timestamp of a key is whatever remains once `prefix`, `data.parquet`
    and every `/` have been removed from it; that remainder is parsed with
    `dateutil.parser.parse`.

    The winning key is returned exactly as it was listed instead of being
    rebuilt from a date reformatted as YYYYMMDD, so the result always names an
    object that was actually listed, whatever timestamp format was used when
    it was written and whether or not `prefix` ends with a `/`.

    Parameters
    ----------
    keys : iterable of S3 object keys listed under `prefix`
    prefix : the S3 prefix that is stripped from each key

    Raises
    ------
    ValueError
        If `keys` is empty (raised by `max`). Any error raised by `parse` for
        a key without a parseable timestamp propagates unchanged.
    """
    def timestamp_of(key):
        # Strip everything except the timestamp segment before parsing
        stamp = key.replace(prefix, '').replace('data.parquet', '').replace('/', '')
        return parse(stamp)

    return max(keys, key=timestamp_of)


In [None]:
#| export
#| hide

class S3ObjectNotFound(Exception):
    """Raised when S3 reports that no object exists for the requested key."""


class Datalake(object):
    """
    A class to wrap interface to an AWS S3 datalake
    Implemented as a singleton to reduce number of live sessions
    ...

    Note: `__new__` always hands back the same instance, but Python still runs
    `__init__` on it for every `Datalake(...)` call, so the most recent call's
    config and profile replace the earlier ones.

    Attributes
    ----------
    session: a boto3 session
    s3 : a boto3 S3 client
    fs : an s3fs filesystem used to write parquet files
    bucket : S3 bucket location of lake

    Methods
    -------
    __init__(config, profile_name='default'):
        Initializes the AWS S3 client using AWS profile_name and dict of parameters from ConfigManager

    get_object(key):
        Core method for loading objects using boto3 S3 client

    load_csv(key, separator=',', skiprows=None, line_terminator=None):
        Loads csv object with S3 key = key

    load_json(key):
        Loads json object with S3 key = key

    load_parquet(key):
        Loads parquet object with S3 key = key as a dataframe

    list_objects(prefix):
        Lists the keys of all objects with S3 prefix = prefix

    save_json(path, data, timestamp=None):
        Saves json object to specified path with an optional timestamp that will be inserted into path

    put_object(key, data, metadata=None):
        Core method for saving objects using boto3 S3 client

    most_recent(prefix):
        For a given S3 prefix returns the key of the object that has the most recent timestamp

    put(path, df, timestamp=None):
        Saves a dataframe as parquet to specified path with an optional timestamp that will be inserted into path

    get(path):
        Loads the most recent parquet object under the specified path as a dataframe

    """
    _instance = None

    def __new__(cls, config, profile_name='default'):
        """Create the single shared instance on first use and return it afterwards."""
        if cls._instance is None:
            cls._instance = super(Datalake, cls).__new__(cls)
        return cls._instance

    def __init__(self, config, profile_name='default'):
        """
        Open the boto3 session, S3 client and s3fs filesystem for `profile_name`.

        Parameters
        ----------
        config : mapping that provides the lake's bucket name under 'bucket'
        profile_name : AWS profile used for both boto3 and s3fs
        """
        self.session = boto3.session.Session(profile_name=profile_name)

        self.bucket = config.get('bucket')
        self.s3 = self.session.client('s3')
        self.fs = s3fs.S3FileSystem(profile=profile_name)

    def get_object(self, key):
        """
        Fetch the raw S3 response for `key` from the lake's bucket.

        Raises
        ------
        S3ObjectNotFound
            If S3 reports the error code 'NoSuchKey'
        Exception
            Any other error is re-raised unchanged
        """
        try:
            return self.s3.get_object(Bucket=self.bucket, Key=key)
        except Exception as e:
            # Not every exception has a `response` attribute; look the error code
            # up defensively so unrelated failures propagate as themselves
            # instead of being masked by an AttributeError.
            response = getattr(e, 'response', None)
            error_code = None
            if isinstance(response, dict):
                error_code = response.get('Error', {}).get('Code')

            if error_code == 'NoSuchKey':
                raise S3ObjectNotFound('No S3 object with key = %s' % key) from e
            raise

    def load_csv(self, key, separator=',', skiprows=None, line_terminator=None):
        """
        Load the csv object stored at `key` as a polars dataframe.

        Parameters
        ----------
        key : S3 key of the csv object
        separator : column separator passed to polars
        skiprows : number of leading rows to skip (polars `skip_rows`); ignored when falsy
        line_terminator : single end-of-line character (polars `eol_char`); polars' default is used when falsy
        """
        obj = self.get_object(key)

        # polars names these options `skip_rows` and `eol_char`; only pass the
        # ones the caller actually set so polars' own defaults stay in effect.
        options = {'separator': separator}
        if skiprows:
            options['skip_rows'] = skiprows
        if line_terminator:
            options['eol_char'] = line_terminator

        # Read the streamed body into memory first, as load_parquet does
        return pl.read_csv(BytesIO(obj['Body'].read()), **options)

    def load_json(self, key):
        """Load the json object stored at `key` and return the decoded value."""
        obj = self.get_object(key)
        return json.loads(obj['Body'].read())

    def load_parquet(self, key):
        """Load the parquet object stored at `key` as a polars dataframe."""
        obj = self.get_object(key)
        return pl.read_parquet(BytesIO(obj['Body'].read()))

    def get(self, path):
        """
        Load the most recent parquet object found under `path`.

        Returns None when no object can be identified for `path`; errors
        raised while loading an identified object propagate to the caller.
        """
        try:
            key = self.most_recent(path)
        except Exception as e:
            # `key` is not bound when most_recent fails, so report the path
            print(f'No objects found with path: {path}. {e}')
            return None

        # most_recent signals "nothing usable" with None; do not ask S3 for it
        if key is None:
            return None

        return self.load_parquet(key)

    def list_objects(self, prefix):
        """Return the keys of every object in the bucket whose key starts with `prefix`."""
        paginator = self.s3.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=self.bucket, Prefix=prefix)

        # A page carries no 'Contents' entry when nothing matches the prefix
        return [obj['Key'] for page in pages for obj in page.get('Contents', [])]

    def save_json(self, path, data, timestamp=None):
        """
        Serialize `data` as json and store it under `path`.

        The key is `{path}/timestamp={timestamp}/data.json` when a timestamp is
        given and `{path}/data.json` otherwise. Returns True on success.
        """
        if timestamp:
            key = f'{path}/timestamp={timestamp}/data.json'
        else:
            key = f'{path}/data.json'

        return self.put_object(key, json.dumps(data))

    def put_object(self, key, data, metadata=None):
        """
        Store `data` at `key` in the lake's bucket.

        Parameters
        ----------
        key : S3 key to write
        data : body of the object
        metadata : optional dict sent as the object's S3 metadata; omitted from the request when empty

        Returns
        -------
        True when S3 answers with HTTP status 200

        Raises
        ------
        Exception
            For any failure, including a non-200 status; the original error is
            attached as the cause
        """
        request = {'Bucket': self.bucket, 'Key': key, 'Body': data}
        if metadata:
            request['Metadata'] = metadata

        try:
            resp = self.s3.put_object(**request)
            status_code = resp['ResponseMetadata']['HTTPStatusCode']
            if status_code == 200:
                return True
            else:
                raise Exception(f'Unknown error. Status code: {status_code}')
        except Exception as e:
            raise Exception(f'Unknown error in put object for {key}. {str(e)}') from e

    def put(self, path, df, timestamp=None):
        """
        Write `df` as parquet through s3fs.

        The key is `{path}/{timestamp}/data.parquet` when a timestamp is given
        and `{path}/data.parquet` otherwise. Returns None.
        """
        if timestamp:
            key = f'{path}/{timestamp}/data.parquet'
        else:
            key = f'{path}/data.parquet'

        with self.fs.open(f'{self.bucket}/{key}', mode='wb') as f:
            df.write_parquet(f)

        return

    def most_recent(self, prefix):
        """
        Return the key of the most recent object under `prefix`.

        With a single match that key is returned as is. With several matches
        the module-level `most_recent` helper picks one by timestamp. Returns
        None (after printing a message) when nothing matches or when the
        helper cannot pick a key.
        """
        matched_objects = self.list_objects(prefix=prefix)

        if len(matched_objects) == 0:
            print(f'No objects found for prefix {prefix}')
            return None

        if len(matched_objects) == 1:
            return matched_objects[0]

        try:
            # Inside a method this name resolves to the module-level function
            return most_recent(matched_objects, prefix)
        except Exception:
            print(f'Multiple objects found for prefix {prefix}. Unable to find most recent.')
            return None

    


In [None]:
# Display the Datalake class docstring
print(Datalake.__doc__)


    A class to wrap interface to an AWS S3 datalake
    Implemented as a singleton to reduce number of live sessions
    ...

    Attributes
    ----------
    session: a boto3 session
    s3 : a boto3 S3 client
    bucket : S3 bucket location of lake
    
    Methods
    -------
    __init__(config, profile='default'):
        Initializes the AWS S3 client using AWS profile_name and dict of parameters from ConfigManager
    
    get_object(key):
        Core method for loading objects using boto3 S3 client
    
    load_csv(key, delimiter=',', skiprows=None, line_terminator=None):
        Loads csv object with S3 prefix = key
    
    load_json(key):
        Loads json object with S3 prefix = key
        
    list_objects(prefix):
        Lists all objects with S3 prefix = key
    
    save_json(path, data, timestamp=None):
        Saves json object to specified path with an optional timestamp that will be inserted into path
    
    put_object(key, data, metadata={}):
        Core m

In [None]:
# Fetch the 'bankdata' configuration through a ConfigManager built with the 'personal' profile
cfgmgr = ConfigManager(profile='personal')
cfg = cfgmgr.fetch_config('bankdata')

In [None]:
# Build the Datalake from the fetched config; profile_name is the AWS profile used for the session
lake = Datalake(cfg, profile_name='personal')

In [None]:
# Small two-column frame used as test data for the round trip below
d = dict(col1=[1, 2], col2=[3, 4])
df = pl.DataFrame(d)
df

col1,col2
i64,i64
1,3
2,4


In [None]:
# No timestamp is given, so the frame is written to test/example1/data.parquet
lake.put('test/example1', df)

In [None]:
# List every key in the bucket that starts with 'test'
lake.list_objects(prefix='test')

['test/example1/data.parquet', 'test/example2/data.parquet']

In [None]:
# Read the parquet object under test/example1 back as a dataframe
lake.get('test/example1')

col1,col2
i64,i64
1,3
2,4


### Testing polars

There are multiple ways to have polars load and save data from S3. The cells below try out each approach in order to decide which one the Datalake class defined above should use.

In [None]:
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import time

In [None]:
# Stand-alone s3fs filesystem and bucket name used by the experiments below
fs = s3fs.S3FileSystem(profile='personal')
bucket = lake.bucket

#### Reading parquet file I

Doesn't work with scan

In [None]:
path = 'test/example1/data.parquet'

# perf_counter is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() follows the wall clock and can jump.
t0 = time.perf_counter()

# Read the whole parquet file through pyarrow, then convert to polars
dataset = pq.ParquetDataset(f"s3://{bucket}/{path}", filesystem=fs)
df = pl.from_arrow(dataset.read())
print(time.perf_counter() - t0)

3.8260788917541504


In [None]:
# Display the frame read in the previous cell
df

col1,col2,__index_level_0__
i64,i64,i64
1,3,0
2,4,1


#### Reading parquet file II

using pyarrow dataset to specify format

In [None]:
# perf_counter is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() follows the wall clock and can jump.
t0 = time.perf_counter()

# Declare the format explicitly so polars can scan the pyarrow dataset lazily
dataset2 = ds.dataset(f"s3://{bucket}/{path}", filesystem=fs, format='parquet')
df_parquet = pl.scan_pyarrow_dataset(dataset2)

# collect() is inside the timed region so the actual read is measured
print(df_parquet.collect().head())
print(time.perf_counter() - t0)

shape: (2, 3)
┌──────┬──────┬───────────────────┐
│ col1 ┆ col2 ┆ __index_level_0__ │
│ ---  ┆ ---  ┆ ---               │
│ i64  ┆ i64  ┆ i64               │
╞══════╪══════╪═══════════════════╡
│ 1    ┆ 3    ┆ 0                 │
│ 2    ┆ 4    ┆ 1                 │
└──────┴──────┴───────────────────┘
2.759658098220825


#### Reading parquet file III

using boto3 get_object

This approach doesn't allow scanning, but it also works for csv and json files, and it appears to be quicker.

This is the first choice and an easy switch.

In [None]:
# perf_counter is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() follows the wall clock and can jump.
t0 = time.perf_counter()

# Fetch the raw object with boto3 and let polars parse the bytes from memory
resp = lake.get_object('test/example2/data.parquet')
df = pl.read_parquet(BytesIO(resp['Body'].read()))

print(time.perf_counter() - t0)

1.2696380615234375


In [None]:
# Same boto3 get_object approach applied to a tab-separated text file
csv_pth = 'banks/call_reports/raw/20210331/FFIEC CDR Call Bulk POR 03312021.txt'

resp = lake.get_object(csv_pth)
# Read the body into memory and parse it with a tab separator
df = pl.read_csv(BytesIO(resp['Body'].read()), separator='\t')


In [None]:
# Show the first rows of the csv frame loaded above
df.head()

IDRSSD,FDIC Certificate Number,OCC Charter Number,OTS Docket Number,Primary ABA Routing Number,Financial Institution Name,Financial Institution Address,Financial Institution City,Financial Institution State,Financial Institution Zip Code,Financial Institution Filing Type,Last Date/Time Submission Updated On
i64,i64,i64,i64,i64,str,str,str,str,i64,i64,str
37,10057,0,16553,61107146,"""BANK OF HANCOC…","""12855 BROAD ST…","""SPARTA""","""GA""",31087,51,"""2021-04-12T14:…"
242,3850,0,0,81220537,"""FIRST COMMUNIT…","""260 FRONT STRE…","""XENIA""","""IL""",62899,51,"""2021-04-16T14:…"
279,28868,0,2523,311972526,"""MINEOLA COMMUN…","""215 W BROAD ""","""MINEOLA""","""TX""",75773,51,"""2021-05-03T08:…"
354,14083,0,0,101107475,"""BISON STATE BA…","""223 MAIN STREE…","""BISON""","""KS""",67520,51,"""2021-04-30T04:…"
457,10202,0,0,91208332,"""LOWRY STATE BA…","""400 FLORENCE A…","""LOWRY""","""MN""",56349,51,"""2021-04-30T15:…"


#### Writing parquet file

In [None]:
# Small two-column frame to write back to S3
d = dict(col1=[1, 2], col2=[3, 4])
df = pl.DataFrame(d)
df

col1,col2
i64,i64
1,3
2,4


In [None]:
# perf_counter is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() follows the wall clock and can jump.
t0 = time.perf_counter()

# Write the frame as parquet through an s3fs file handle
with fs.open(f'{bucket}/test/example3/data.parquet', mode='wb') as f:
    df.write_parquet(f)

print(time.perf_counter() - t0)

0.6675841808319092


In [None]:
# Read back the file written in the previous cell to confirm the write worked
path = 'test/example3/data.parquet'
dataset2 = ds.dataset(f"s3://{bucket}/{path}", filesystem=fs, format='parquet')
df_parquet = pl.scan_pyarrow_dataset(dataset2)
print(df_parquet.collect().head())

shape: (2, 2)
┌──────┬──────┐
│ col1 ┆ col2 │
│ ---  ┆ ---  │
│ i64  ┆ i64  │
╞══════╪══════╡
│ 1    ┆ 3    │
│ 2    ┆ 4    │
└──────┴──────┘
