In [1]:
import requests
import time
import sys
import os
import json
import uuid
import psycopg2
from psycopg2 import OperationalError, DatabaseError
from psycopg2.extras import DictCursor
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo
import random
from dotenv import load_dotenv

In [14]:
########################################################
####### Functions for checking services
########################################################

def is_elasticsearch_ready(url="http://localhost:9200"):
    """Report whether the Elasticsearch endpoint answers with HTTP 200.

    Args:
        url: Address to probe with a GET request.

    Returns:
        True if the GET request completes with status code 200; False for any
        other status code or when ``requests`` raises a ``RequestException``
        (the error is printed in that case).
    """
    try:
        reply = requests.get(url, timeout=5)
    except requests.exceptions.RequestException as exc:
        # Connection refused, timeout, etc. are treated as "not ready".
        print(f"Error connecting to Elasticsearch service: {exc}")
        return False
    else:
        return reply.status_code == 200

def is_mage_ready(url="http://localhost:6789"):
    """Report whether the Mage endpoint answers with HTTP 200.

    Args:
        url: Address to probe with a GET request.

    Returns:
        True if the GET request completes with status code 200; False for any
        other status code or when ``requests`` raises a ``RequestException``
        (the error is printed in that case).
    """
    try:
        status = requests.get(url, timeout=5).status_code
    except requests.exceptions.RequestException as exc:
        # Any request-level failure means the service cannot be used yet.
        print(f"Error connecting to Mage service: {exc}")
        return False
    return status == 200
    
def is_grafana_ready(url="http://localhost:3000"):
    """Report whether the Grafana endpoint answers with HTTP 200.

    Args:
        url: Address to probe with a GET request.

    Returns:
        True if the GET request completes with status code 200; False for any
        other status code or when ``requests`` raises a ``RequestException``
        (the error is printed in that case).
    """
    ready = False  # pessimistic default, flipped only on a 200 response
    try:
        reply = requests.get(url, timeout=5)
        ready = reply.status_code == 200
    except requests.exceptions.RequestException as conn_err:
        print(f"Error connecting to Grafana service: {conn_err}")
    return ready


def wait_for_services(max_retries=12, delay=5, checks=None):
    """Poll service readiness checks until they all pass or retries run out.

    With the defaults this makes up to 12 attempts with 5 seconds between
    them (roughly one minute, plus whatever time the checks themselves take).

    Args:
        max_retries: Maximum number of polling attempts.
        delay: Seconds to sleep between two consecutive attempts.
        checks: Iterable of zero-argument callables returning a truthy value
            when their service is ready. Defaults to the Elasticsearch and
            Mage checks defined in this notebook. Checks run in order and
            stop at the first one that fails.

    Returns:
        True as soon as every check passes within one attempt, False if no
        attempt succeeded.
    """
    using_default_checks = checks is None
    if using_default_checks:
        # Resolved at call time so the signature does not depend on definition order.
        checks = (is_elasticsearch_ready, is_mage_ready)
    else:
        # Materialise once: the checks are iterated again on every attempt.
        checks = tuple(checks)

    for attempt in range(1, max_retries + 1):
        # all() over a generator short-circuits, like the original `a() and b()`.
        if all(check() for check in checks):
            if using_default_checks:
                print("All  Elasticsearch and Mage are ready!")
            else:
                print("All services are ready!")
            return True

        if attempt < max_retries:
            print(f"Attempt {attempt}/{max_retries}: Services not ready. Waiting {delay} seconds...")
            time.sleep(delay)
        else:
            # No point sleeping after the final attempt; nothing is retried afterwards.
            print(f"Attempt {attempt}/{max_retries}: Services not ready.")

    print("Max retries reached. Services are not ready.")
    return False

In [16]:
def run_pipeline_populate_elasticsearch(
    url="http://localhost:6789/api/pipeline_schedules/5/pipeline_runs/e148d1b8a12c4c65b6fe0b4b703e196f",
    timeout=30,
):
    """Trigger the Mage pipeline run that populates Elasticsearch with the knowledge base.

    Sends a JSON POST to the pipeline-run trigger endpoint and prints progress
    messages. Failures (connection errors, timeouts, non-2xx responses) are
    caught and printed rather than raised, so the notebook keeps running.

    Args:
        url: Trigger endpoint to POST to. The default is the schedule/token
            pair originally hard-coded here; override it for other
            environments.
        timeout: Seconds passed to ``requests.post`` as its timeout so an
            unresponsive server cannot hang the notebook forever. Pass
            ``None`` to wait indefinitely (the previous behaviour).

    Returns:
        None.
    """
    headers = {"Content-Type": "application/json"}

    print('!----> populate_elasticsearch for magic started', flush=True)

    try:
        response = requests.post(url, headers=headers, timeout=timeout)
        # Turn 4xx/5xx responses into exceptions so they are reported below.
        response.raise_for_status()
        print(f'!----> populate_elasticsearch magic finished with code: {response.status_code}', flush=True)
    except Exception as err:
        # Deliberately best-effort: report the problem but do not propagate it.
        print(f"An unexpected error occurred magic: {err}", flush=True)
        print("Error details magic:", sys.exc_info(), flush=True)
    finally:
        print("!----> Script execution completed magic.", flush=True)

In [10]:
# Load settings from the .env file one directory above the current working
# directory; the call's return value is displayed as this cell's output.
load_dotenv('../.env')

True

In [15]:
# Stop here unless the required services came up within the retry budget.
if not wait_for_services():
    print("Exiting script as services are not ready.")
    raise SystemExit(1)  # same exception sys.exit(1) would raise

All  Elasticsearch and Mage are ready!


In [17]:
# Trigger the pipeline in mage.ai. The function prints its own progress and
# reports errors by printing them instead of raising.

run_pipeline_populate_elasticsearch()

!----> populate_elasticsearch for magic started
!----> populate_elasticsearch magic finished with code: 200
!----> Script execution completed magic.
