In [10]:
import pandas as pd
from minio import Minio
import re
import requests
import traceback
import inspect
from pprint import pprint
import os
import datetime as dt
import hvac
import yaml


In [11]:
class bcolors:
    """ANSI escape sequences for colouring and styling console output.

    Wrap text as ``f"{bcolors.OKGREEN}message{bcolors.ENDC}"``; ``ENDC``
    resets the terminal back to its default rendition.
    """

    # --- Reset -----------------------------------------------------------
    ENDC = '\033[0m'

    # --- Text attributes -------------------------------------------------
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # --- Foreground colours (codes 91-96) --------------------------------
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
    OKCYAN = '\033[96m'

In [18]:
class ParquetFileObjectsDownload:

    def __init__(self, cache_dir:str="d:/cache/", clear_cache:bool=False) -> None:
        if clear_cache:
            print("-" * 50)
            print(f"Clearing cache: {cache_dir}")

            for root, dirs, files in os.walk(cache_dir):
                for file in files:
                    if file.endswith(".parquet"):
                        os.remove(f"{root}/{file}")
                        print(f"Removed {root}/{file}")

            print("-" * 50 + "\n")

        self.__cache_dir = cache_dir
        # Get secret information for Minio Client
        self.__secrets = self.read_secret(mount_point="localhost",
                                          path="minio/jetbrains")
        self.__minio_client = self.__create_minio_client__()["data"]
        self.__objects_list = self.create_object_list()
        self.download_objects(self.__objects_list, self.__cache_dir)

    @staticmethod
    def read_secret(mount_point: str, path: str) -> dict:
        ca_path = os.environ.get("SSL_CERT_FILE")
        client = hvac.Client(verify=ca_path)
        print(f"client is authenticated: {client.is_authenticated}")
        resp = client.secrets.kv.read_secret(mount_point=mount_point, path=f'/{path}')
        secret = resp['data']['data']

        return secret

    @staticmethod
    def __read_yaml_query__(yaml_dir:str=(os.getcwd())) -> dict|None:
        try:
            with open(os.path.join(yaml_dir, "query.yaml"), "r") as f:
                data = (yaml.safe_load(f))["QUERY"]

                return data

        except Exception as e:
            print(e)
            return None

    def __create_minio_client__(self):
        try:
            minio_client = Minio(
                endpoint=re.sub("https?://", "", self.__secrets["url"]),
                secure=True,
                access_key=self.__secrets["accessKey"],
                secret_key=self.__secrets["secretKey"]
            )

            return {
                "data": minio_client,
                "result": {
                    "status_code": 200,
                    "message": "minio client created"
                }
            }
        except Exception as e:
            return {
                "result": {
                    "status_code": 500,
                    "message": traceback.format_exc()
                }
            }

    def create_object_list(self, days: int = 7) -> list:

        print("\n" + "-" * 50)
        query= self.__read_yaml_query__()
        pprint(f"QUERY: {query}", depth=1, compact=True, width=100)

        print("-" * 50 + "\n")

        bucket_name = query["bucket_name"].lower()
        product = query["product"].lower()
        subject = query["subject"].lower()
        source = query["source"].lower()
        start_date = pd.to_datetime(query["start_date"])
        end_date = pd.to_datetime(query["end_date"])

        if (start_date is None) | (end_date is None):
            start_date = dt.datetime.now() - dt.timedelta(days=days)
            end_date = dt.datetime.now()

        if self.__minio_client.bucket_exists(bucket_name):
            print(f'{bcolors.OKGREEN}Bucket [ {bucket_name.upper()} ] Exists!{bcolors.ENDC}\n')
        else:
            print(f'{bcolors.WARNING}Bucket [ {bucket_name.upper()} ] Does Not Exist!{bcolors.ENDC}\n')

        # Initialize an empty list
        date_list = []

        if end_date < start_date:
            print(f'{bcolors.FAIL}[start_date] date must be newer than the [end_date]!{bcolors.ENDC}')
            print(f'{bcolors.WARNING}[SWAPPING DATES]{bcolors.ENDC}')
            start_date, end_date = end_date, start_date

        # Loop through the range of dates and append to the list
        while start_date <= end_date:
            date_list.append({
                "year": start_date.strftime('%Y'),
                "month": start_date.strftime('%m'),
                "day": start_date.strftime('%d')
            })
            start_date += dt.timedelta(days=1)

        # List objects information recursively whose names starts with
        # "my/prefix/".

        objects_list = []

        for date in date_list:

            prefix = f'{product}/{subject}/{source}/{date["year"]}/{date["month"]}/{date["day"]}/'

            objects = self.__minio_client.list_objects(
                bucket_name=bucket_name,
                prefix=prefix,
                recursive=True
            )
            for obj in objects:
                objects_list.append({
                    "bucket_name": bucket_name,
                    "year": date["year"],
                    "month": date["month"],
                    "day": date["day"],
                    "prefix": prefix,
                    "filename": re.sub(prefix, "", obj.object_name),
                    "object_name": obj.object_name
                })

        return objects_list

    def download_objects(self, objects_list: list, export_path: str, max_results: int = 1) -> None:
        # print(objects_list)
        i = 1
        print("-" * 50)
        print(f'Attempting File DOWNLOAD: \n')
        for obj in objects_list:
            print(f'{i:03d}: {obj["object_name"]}')
            response = None
            try:
                response = self.__minio_client.get_object(
                    bucket_name=obj["bucket_name"],
                    object_name=obj["object_name"]
                )
                # Read data from response.
                with open(f"{export_path}/{obj['filename']}", "wb") as f:
                    f.write(response.data)
                
                if i == max_results:
                    break
            finally:
                if response:
                    response.close()
                    response.release_conn()
                i = i + 1
        print("-" * 50 + "\n")

In [19]:
# Build the downloader. The constructor does all the work: with
# clear_cache=True it first removes the cached *.parquet files, then lists and
# downloads objects into the default cache directory.
object_download = ParquetFileObjectsDownload(clear_cache=True)

--------------------------------------------------
Clearing cache: d:/cache/
Removed d:/cache//api_2024_01_05_054913_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_05_055125_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_010520_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_011829_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_013327_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_013426_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_031826_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_204440_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_205052_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_212206_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_213852_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_215158_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_222200_datto_rmm_devices.parquet
Removed d:/cache//api_2024_01_10_232314_datto_rmm_devices.parquet

In [20]:
# Show the instance repr to confirm the object was created.
object_download

<__main__.ParquetFileObjectsDownload at 0x18c77779e50>

In [None]:
# Collect the parquet files currently in the cache directory. Other file types
# are skipped so that pd.read_parquet is never handed a non-parquet file.
parquet_files = []

for root, dirs, files in os.walk("d:/cache"):
    for file in files:
        if file.endswith(".parquet"):
            parquet_files.append(os.path.join(root, file))

# os.walk order is filesystem dependent; sort for a reproducible load order.
parquet_files.sort()


In [None]:
# TODO: Find the best way to build a large, updatable collection of records (an array of dictionaries). Would sqlite3 or MongoDB be a good fit? I want something disposable per query. Is this similar to how Athena keeps query results in a flat database?

In [None]:
from cachetools import cached, TTLCache

# Keep at most 100 entries; each entry expires after 86400 seconds (24 hours).
cache = TTLCache(maxsize=100, ttl=86400)


@cached(cache)
def extract_article_content(url, timeout=30):
    """Fetch ``url`` and return the raw response body, memoised in ``cache``.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up; without a
            timeout a stalled server would block the notebook indefinitely.

    Returns:
        The response body as bytes.

    Raises:
        requests.HTTPError: For 4xx/5xx responses. Raising keeps error pages
            out of the cache, where they would otherwise be served for a day.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.content

In [None]:
# Read each collected file into its own DataFrame, using pyarrow-backed dtypes.
dataframes = []

for file in parquet_files:
    df = pd.read_parquet(file, dtype_backend="pyarrow")
    dataframes.append(df)

In [None]:
# Stack the per-file frames into one. ignore_index=True gives the result a
# fresh 0..n-1 index; otherwise each file's own index would repeat and row
# labels would be ambiguous.
# NOTE(review): assumes the per-file index carries no meaning - confirm.
df_concat = pd.concat(dataframes, ignore_index=True)

In [None]:
# Re-read the QUERY section of query.yaml; its fields are used below to name
# the CSV export. The result is None if the file could not be read.
query = object_download.__read_yaml_query__()

In [None]:
# Display the loaded query for inspection.
query

In [None]:
# Export the combined frame, named by UTC timestamp plus the query's
# product/subject/source. datetime.utcnow() is deprecated; an aware UTC "now"
# formats to the same string.
df_concat.to_csv(
    f'd:/exports/{dt.datetime.now(dt.timezone.utc).strftime("%Y_%m_%d_%H%M%S")}_{query["product"]}_{query["subject"]}_{query["source"]}.csv',
    index=False)

In [None]:
# Keep rows whose siteName contains the literal text "tnConnect".
# regex=False matches the text literally; na=False treats rows with a missing
# siteName as non-matching instead of putting NA into the mask.
df_tnconnect = df_concat[df_concat["siteName"].str.contains("tnConnect", regex=False, na=False)]

In [None]:
# Show the tnConnect rows where lastSeen is earlier than lastAuditDate.
df_tnconnect[df_tnconnect["lastSeen"] < df_tnconnect["lastAuditDate"]]