In [None]:
import datetime as dt
import json
import os
import sys
import time

import luigi
import pandas as pd
import yaml

In [None]:
def __find_config_docs__(path):
    for root, dirs, files in os.walk(path):
        for file in files:

            if file.endswith(".yaml"):
                data = root + "/" + file

    return {
        "data": data,
        "result": {
            "status_message": "Success",
            "status_code": 200
        }
    }

In [None]:
def __task_configs__(config_doc):
    yaml_jobs = []
    try:
        with open(config_doc["data"], "r") as stream:
            yaml_config = yaml.safe_load(stream)

        return {
            "data": yaml_config,
            "result": {
                "status_message": "Success",
                "status_code": 200,

            }
        }

    except Exception as e:
        return {
            "result": {
                "status_code": 400,
                "error": str(e)
            }
        }

In [None]:
# Locate the YAML job definition under the current directory and parse it.
# (This previously called the undefined `__yaml_jobs__`; the parser defined
# above is `__task_configs__`.)
config_file = __task_configs__(__find_config_docs__("./"))

print(config_file)


In [None]:

def __add_data_timestamp__(tasks):
    try:
        stamped_tasks = []

        utc_now = dt.datetime.utcnow()  # datetime for cataloging

        data_timestamps = {

            "_IN_DATA_TIMESTAMP": utc_now.strftime('%Y-%m-%d %H:%M:%S'),  # timestamp found inside data
            "_OUT_DATA_TIMESTAMP": utc_now.strftime('%Y_%m_%d_%H%M%S'),  # timestamp found in file
            "_YEAR_DATA_TIMESTAMP": f'{utc_now.year}',  # folder and path dates
            "_MONTH_DATA_TIMESTAMP": f'{utc_now.month:02d}',  # folder and path dates
            "_DAY_DATA_TIMESTAMP": f'{utc_now.day:02d}',  # folder and path dates

        }

        for task in tasks:
            updated_task = task.copy()
            updated_task["TIMESTAMPS"] = data_timestamps
            stamped_tasks.append(updated_task)

        return {
            "data": stamped_tasks,
            "result": {
                "status_message": "Success",
                "status_code": 200
            }
        }

    except Exception as e:
        return {
            "result": {
                "status_message": str(e),
                "status_code": 500
            }
        }


In [None]:

def lambda_handler(event, context):
    """Handler entry point for the pipeline (currently a stub).

    NOTE(review): as written this only binds two locals and returns ``None``.
    The extract/transform/load cells later in the notebook read ``context``,
    ``results_list`` and ``task_config`` at module level. They look like they
    were meant to live inside this function. TODO confirm and move them in.
    Those cells index ``context[0]``..``context[3]`` as task configs, so
    ``context`` is presumably a list of task dicts rather than a runtime
    context object; verify against the caller.

    :param event: handler event payload (unused).
    :param context: see the note above (unused here).
    :return: None.
    """
    task_config, results_list = {}, []

In [None]:
class ExtractDattoAPI(luigi.Task):
    """Luigi task for the Datto RMM API extract step (work in progress).

    The methods were previously written at column 0, which left the class
    body empty (IndentationError) and made them module-level functions. They
    are now indented into the class.

    NOTE(review): ``run`` is still a no-op, so the task never writes the
    target returned by ``output``. TODO implement it, presumably with the
    logic from the "API [EXTRACT]" cell, before scheduling this task.
    """

    def output(self):
        """Return the local file target this task is expected to produce."""
        # NOTE(review): "raw_data." ends in a bare dot; this looks like an
        # unfinished filename/extension. TODO confirm the intended path.
        return luigi.LocalTarget("raw_data.")

    def run(self):
        """Placeholder; does nothing yet."""
        pass


In [None]:

#####################  API [EXTRACT] - datto_rmm - devices #########################################################
# Pull the device list from the Datto RMM API into `df`, record the step
# result, and tag every row with its source and extraction timestamp.
# NOTE(review): `context` and `results_list` are only bound as locals inside
# `lambda_handler` in this notebook; confirm where the module-level values come from.

# Reset first. If `context[0]` itself fails, the except branch must not hit an
# undefined (NameError) or stale task config.
task_config = {}
try:
    task_config = context[0]
    start = time.perf_counter()
    print(announce_start(task_config))

    # create DataFrame from API
    datto = DattoRMM(task_config)

    data = datto.create_devices_dataframe()
    result = data["result"]
    results_list.append(result)
    df = data["data"]

    # add marker columns
    df['_SOURCE_PRODUCT'] = task_config["DETAILS"]["PRODUCT"]
    df['_SOURCE_SUBJECT'] = task_config["DETAILS"]["SUBJECT"]
    df['_SOURCE_ORIGIN'] = task_config["ORIGIN"]["TYPE"]
    df['_UTC_EXTRACTION_DATETIME'] = task_config["TIMESTAMPS"]["_IN_DATA_TIMESTAMP"]

except Exception as e:
    result = {
        # .get() so a config without DETAILS/TITLE cannot raise a second
        # exception here and hide the original error.
        "task_name": task_config.get("DETAILS", {}).get("TITLE"),
        "status_code": 500,
        "message": f"Error: {e}",
    }
    print(result)
    # sys.exit instead of the site-provided `exit`, which is intended for the
    # interactive shell and not for programs.
    sys.exit(1)

end = time.perf_counter()
print(announce_end(task_config, start, end, result))
print("*" * 150)

In [None]:


#####################  DataFrame [TRANSFORM] - datto_rmm - devices  ################################################
# Run the device transform over `df` (replacing it with the transformed frame)
# and record the step result like the extract/load steps do.

# Reset so a failure reading `context[1]` is not reported under the previous
# step's task name.
task_config = {}
try:

    task_config = context[1]
    start = time.perf_counter()
    print(announce_start(task_config))

    data = transform_devices_dataframe(df)
    result = data["result"]
    # Previously missing: every other step appends its result, and the final
    # report prints `results_list`.
    results_list.append(result)
    df = data["data"]

except Exception as e:
    result = {
        # .get() keeps error reporting from raising a second exception.
        "task_name": task_config.get("DETAILS", {}).get("TITLE"),
        "status_code": 500,
        "message": f"Error: {e}",
    }

    print(result)
    # sys.exit instead of the site-provided `exit` (interactive-shell helper).
    sys.exit(1)
end = time.perf_counter()
print(announce_end(task_config, start, end, result))
print("*" * 150)

In [None]:


####################### Minio [LOAD] - datto_rmm - devices  ########################################################
# Upload the transformed `df` to Minio and record the step result.

# Reset so a failure reading `context[2]` is not reported under the previous
# step's task name.
task_config = {}
try:
    task_config = context[2]
    start = time.perf_counter()
    print(announce_start(task_config))

    minio = MinioLoad(df, task_config)
    data = minio.upload_to_minio()
    result = data["result"]
    results_list.append(result)

except Exception as e:
    result = {
        # .get() keeps error reporting from raising a second exception.
        "task_name": task_config.get("DETAILS", {}).get("TITLE"),
        "status_code": 500,
        "message": f"Error: {e}",
    }
    print(result)
    # sys.exit instead of the site-provided `exit` (interactive-shell helper).
    sys.exit(1)
# Take the end time BEFORE announcing. The previous order passed the prior
# cell's stale `end`, so the reported duration was wrong.
end = time.perf_counter()
print(announce_end(task_config, start, end, result))
print("*" * 150)


In [None]:


####################### Postgres [LOAD] - datto_rmm - devices  #####################################################
# Load the transformed `df` into Postgres and record the step result.

# Reset so a failure reading `context[3]` is not reported under the previous
# step's task name.
task_config = {}
try:

    task_config = context[3]
    start = time.perf_counter()
    print(announce_start(task_config))
    postgres = PostgresLoad(df, task_config)
    data = postgres.load_to_postgres()

    result = data["result"]
    results_list.append(result)

except Exception as e:
    result = {
        # .get() keeps error reporting from raising a second exception.
        "task_name": task_config.get("DETAILS", {}).get("TITLE"),
        "status_code": 500,
        "message": f"Error: {e}",
    }
    print(result)
    # sys.exit instead of the site-provided `exit` (interactive-shell helper).
    sys.exit(1)
end = time.perf_counter()
print(announce_end(task_config, start, end, result))
print("*" * 150)

In [None]:

#####################  ********** FINAL RESULTS ************   #####################################################
# Print every collected step result as indented JSON, then exit with status 0.

print("#" * 75)
print("\nFINAL RESULTS\n")
print("--------------------------------")

for result in results_list:
    print("\n")
    print(json.dumps(result, indent=4))
    print("---------")

print("\n")
print("#" * 75)

# sys.exit instead of the site-provided `exit`, which is meant for the
# interactive shell only.
sys.exit(0)
