In [None]:
import datetime
import io
import os
import warnings

import lakefs_sdk
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv
from lakefs_sdk.api import objects_api

In [None]:
# Merge variables from a local .env file (if one exists) into os.environ.
load_dotenv()

# lakeFS connection settings. Each value comes from the environment and falls
# back to a hard-coded placeholder when the variable is unset.
# TODO(review): the credential fallbacks are placeholders, not real secrets —
# provide ACCESS_KEY / SECRET_KEY via the environment or .env.
LAKEFS_ACCESS_KEY_ID = os.environ.get("ACCESS_KEY", "access_key")
LAKEFS_SECRET_ACCESS_KEY = os.environ.get("SECRET_KEY", "secret_key")
LAKEFS_ENDPOINT_URL = os.environ.get("LAKEFS_ENDPOINT", "http://lakefsdb:8000")
LAKEFS_REPOSITORY = os.environ.get("LAKEFS_REPOSITORY", "egatdata")
LAKEFS_BRANCH = os.environ.get("LAKEFS_BRANCH", "main")
LAKEFS_FILE_PATH = os.environ.get("LAKEFS_PATH_PARQUET", "egat_realtime_power.parquet")

# Options that pandas forwards to the S3 filesystem: credentials, the lakeFS
# endpoint, and path-style addressing (bucket name in the URL path, not the host).
lakefs_storage_options = dict(
    key=LAKEFS_ACCESS_KEY_ID,
    secret=LAKEFS_SECRET_ACCESS_KEY,
    client_kwargs=dict(endpoint_url=LAKEFS_ENDPOINT_URL),
    config_kwargs=dict(s3=dict(addressing_style="path")),
)

# The repository is used as the bucket segment and the branch as the first
# component of the object key.
lakefs_s3_path = "s3a://" + "/".join([LAKEFS_REPOSITORY, LAKEFS_BRANCH, LAKEFS_FILE_PATH])

In [None]:
def read_from_lakefs_using_s3fs(lakefs_path=lakefs_s3_path, storage_options=lakefs_storage_options):
    """Read a Parquet object from lakeFS through its S3-style URL.

    Args:
        lakefs_path: Object URL; defaults to ``lakefs_s3_path`` from the config cell.
        storage_options: Filesystem options pandas forwards when opening the URL;
            defaults to ``lakefs_storage_options`` from the config cell.

    Returns:
        A pandas DataFrame holding the file's contents.
    """
    # Both defaults are bound when this cell executes: after editing the config
    # cell, re-run this cell as well (or pass the values explicitly).
    frame = pd.read_parquet(path=lakefs_path, storage_options=storage_options)
    return frame

def read_from_lakefs_using_sdk(endpoint_url=LAKEFS_ENDPOINT_URL, access_key=LAKEFS_ACCESS_KEY_ID,
                              secret_key=LAKEFS_SECRET_ACCESS_KEY, repo=LAKEFS_REPOSITORY,
                              branch=LAKEFS_BRANCH, file_path=LAKEFS_FILE_PATH):
    """Download a Parquet object with the lakeFS Python SDK and load it into a DataFrame.

    Args:
        endpoint_url: Value assigned to ``Configuration.host`` for the SDK client.
        access_key: Username for HTTP basic auth.
        secret_key: Password for HTTP basic auth.
        repo: lakeFS repository name.
        branch: Ref (branch, tag or commit) to read from.
        file_path: Object path inside the ref.

    Returns:
        A pandas DataFrame with the object's contents.
    """
    configuration = lakefs_sdk.Configuration()
    # NOTE(review): the lakeFS REST API is normally served under "/api/v1";
    # confirm endpoint_url includes that prefix, since the same env var also
    # feeds the S3-gateway URL used by read_from_lakefs_using_s3fs.
    configuration.host = endpoint_url
    configuration.username = access_key
    configuration.password = secret_key

    # The context manager closes the client's connection pool even on error.
    with lakefs_sdk.ApiClient(configuration) as client:
        objects = objects_api.ObjectsApi(client)
        payload = objects.get_object(repository=repo, ref=branch, path=file_path)

    # lakefs_sdk returns the body as raw bytes (bytearray); older clients
    # returned a file-like object. Accept both shapes.
    data = payload.read() if hasattr(payload, "read") else bytes(payload)

    # Parse in memory instead of via a fixed-name temp file in the CWD, which
    # concurrent runs could clobber and a failed parse would leave behind.
    return pd.read_parquet(io.BytesIO(data))

In [None]:
def save_to_local_parquet(df, file_path=None):
    """Write ``df`` to a local Parquet file (index dropped) and report its size.

    Args:
        df: DataFrame to persist.
        file_path: Destination path. When omitted, the name
            ``local_egat_data_<YYYYmmdd_HHMMSS>.parquet`` is built from the
            current local time (relative to the working directory). Two calls
            within the same second would therefore target the same path.

    Returns:
        Size of the written file in bytes.
    """
    target = file_path
    if target is None:
        timestamp = f"{datetime.datetime.now():%Y%m%d_%H%M%S}"
        target = f"local_egat_data_{timestamp}.parquet"
    df.to_parquet(target, index=False)
    return os.stat(target).st_size

def filter_and_save_data(df, filter_date=None):
    """Optionally keep only rows scraped on one calendar day, then save them locally.

    The output goes through ``save_to_local_parquet`` to
    ``filtered_data.parquet``, or ``filtered_data_<YYYY-MM-DD>.parquet`` when a
    date is given.

    Args:
        df: DataFrame to save. It is not modified; when it has a
            ``scrape_time`` column, that column is parsed with
            ``pd.to_datetime`` on a new frame.
        filter_date: Any value ``pd.to_datetime`` accepts (e.g. ``"2024-01-31"``).
            A falsy value disables filtering.

    Returns:
        Size in bytes of the written file, as returned by ``save_to_local_parquet``.
    """
    if filter_date:
        # Normalise up front so the filename always uses ISO YYYY-MM-DD, even
        # when there is no scrape_time column (a raw value like "2024/01/31"
        # would otherwise inject path separators into the filename).
        filter_date = pd.to_datetime(filter_date).date()

    if 'scrape_time' in df.columns:
        # assign() returns a new frame, so the caller's DataFrame is untouched.
        df = df.assign(scrape_time=pd.to_datetime(df['scrape_time']))
        if filter_date:
            df = df[df['scrape_time'].dt.date == filter_date]
    elif filter_date:
        # Nothing to filter on: make it visible that the "filtered" file
        # actually contains every row.
        warnings.warn(
            f"filter_date={filter_date} ignored: DataFrame has no 'scrape_time' "
            "column; saving unfiltered data.",
            stacklevel=2,
        )

    filter_text = f"_{filter_date}" if filter_date else ""
    file_path = f"filtered_data{filter_text}.parquet"
    return save_to_local_parquet(df, file_path)

def save_with_pyarrow(df, file_path=None, compression='snappy', data_page_size=1024*1024):
    """Write ``df`` to Parquet with explicit pyarrow writer settings and report its size.

    Args:
        df: DataFrame to persist; its index is not written.
        file_path: Destination path. When omitted, the name
            ``pyarrow_egat_data_<YYYYmmdd_HHMMSS>.parquet`` is built from the
            current local time (relative to the working directory).
        compression: Codec passed to ``pq.write_table`` (default ``'snappy'``).
        data_page_size: Target data-page size in bytes passed to
            ``pq.write_table`` (default 1 MiB).

    Returns:
        Size of the written file in bytes.
    """
    if file_path is None:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        file_path = f"pyarrow_egat_data_{timestamp}.parquet"
    # preserve_index=False matches save_to_local_parquet's index=False. With
    # pyarrow's default, a non-RangeIndex (e.g. after row filtering) would be
    # stored as an extra column, so the two savers would produce different schemas.
    table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_table(table, file_path, compression=compression, use_dictionary=True,
                   version='2.6', data_page_size=data_page_size)
    return os.path.getsize(file_path)