Home
Ferris-cli is a Python library that simplifies and standardises interactions with the Ferris.ai platform.
Its main goals are:
- Simplified access to infrastructure-supported capabilities such as logging and metrics capture
- Transparent set-up of infrastructure components required by platform applications
- Mediation across different implementations of the platform cluster:
  - Consul: for configuration storage
  - Kafka cluster: for logging and event transport
  - Elasticsearch: for metrics storage
Ferris-cli can be installed using pip:

```shell
pip install ferris-cli
```
The ferris-cli library uses Consul for configuration management. Before using the platform, set up the following key and values in your Consul host.
Key Path: `ferris.env`

```json
{
  "KAFKA_BOOTSTRAP_SERVER": "kafka-server.example.com",
  "KAFKA_PORT": "9092",
  "HIVE_SERVER": "hiveserver.example.com",
  "HIVE_PORT": "9000",
  "HIVE_USERNAME": "s_DPR-ferris",
  "ELASTICSEARCH_SERVER": "http://elasticsearch:9200",
  "MINIO_HOST": "minio-server.example.com",
  "MINIO_PORT": "8230",
  "MINIO_ACCESS_KEY": "minio"
}
```
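Assuming the python-consul client package, the key above could also be created programmatically. This is a sketch, not part of ferris-cli itself; the `put_ferris_env` helper and all host names are illustrative placeholders.

```python
import json

# The ferris.env configuration shown above; all values are example placeholders.
FERRIS_ENV = {
    "KAFKA_BOOTSTRAP_SERVER": "kafka-server.example.com",
    "KAFKA_PORT": "9092",
    "HIVE_SERVER": "hiveserver.example.com",
    "HIVE_PORT": "9000",
    "HIVE_USERNAME": "s_DPR-ferris",
    "ELASTICSEARCH_SERVER": "http://elasticsearch:9200",
    "MINIO_HOST": "minio-server.example.com",
    "MINIO_PORT": "8230",
    "MINIO_ACCESS_KEY": "minio",
}

def put_ferris_env(host, port, cfg=FERRIS_ENV):
    # Assumption: the python-consul package is available; this is the
    # programmatic equivalent of `consul kv put ferris.env @ferris_env.json`.
    import consul
    client = consul.Consul(host=host, port=port)
    return client.kv.put("ferris.env", json.dumps(cfg))
```

The same result can be achieved with the Consul CLI (`consul kv put ferris.env @ferris_env.json`) or Consul's HTTP KV API.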
Before starting the application, set the following environment variables:
- CONSUL_HOST
- CONSUL_PORT

These can be set in the docker-compose template or the Kubernetes deployment template.
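For a local run outside docker-compose or Kubernetes, the two variables can simply be exported in the shell. The host name below is a placeholder; 8500 is Consul's default HTTP port.

```shell
export CONSUL_HOST=consul.example.com
export CONSUL_PORT=8500
```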
Ferris-cli retrieves the configuration of the cluster environment from Consul: on application launch it reads the 'ferris.env' key to obtain the cluster settings.
The following Kafka topics are used by ferris-cli:
- ferris.events
- ferris.logs
- ferris.metrics
Configuration keys are retrieved from Consul; the consul client library is installed as a dependency of ferris-cli. Configuration key naming is not enforced. Normally an application keeps its application-specific settings under a key it owns, while environment settings referring to platform infrastructure belong in the 'ferris.env' key.
Logging is by default sent to the 'ferris.logs' Kafka topic. The platform uses the Logstash JSON logging format. In most environments the logs are also loaded (separately, not by the CLI) into HDFS/Hive for long-term storage and into Elasticsearch for short-term storage.
Metrics sent through the CLI are stored in the Ferris platform's Elasticsearch server. The CLI implements a decorator-based approach to tagging functions whose performance needs to be tracked.
The CloudEvents API sends CloudEvents to the 'ferris.events' topic of the platform's Kafka cluster.
The Notifications API sends JSON formatted notifications to the 'ferris.notifications' topic of the platform's Kafka cluster. From here the messages are forwarded by the Notifications adapter to the platform specific outbound channels based on the message subject and address configurations.
Example usage:

```python
import logging
import os

from ferris_cli.ferris_cli import ApplicationConfigurator
from ferris_cli.ferris_cli import FerrisKafkaLoggingHandler

# Get a config from Consul
configs = ApplicationConfigurator().get('ferris.apps.dataloader.minio-adapter')
print(configs)

# Set up logging; LogstashFormatterV1 comes from the logstash_formatter package
from logstash_formatter import LogstashFormatterV1

logger = logging.getLogger(os.environ['APP_NAME'])
kh = FerrisKafkaLoggingHandler()
kh.setLevel(logging.INFO)
kh.setFormatter(LogstashFormatterV1())
logger.addHandler(kh)
logger.info('my log message')
```
```python
from ferris_cli.ferris_cli import MetricMessage, MetricsAPI
from ferris_cli.ferris_cli import Notification, NotificationsAPI

# Create and send a MetricMessage
mm = MetricMessage('some_metric', 28)
print(mm.toJSON())
MetricsAPI().send(mm)

# Create and send a Notification
notification = Notification('from@example.com', 'to@example.com', 'the subject', 'the message')
print(notification.toJSON())
NotificationsAPI().send(notification)
```
# CloudEvents API
data = {"schema": schema_name }
event = (
v03.Event()
.SetContentType("application/json")
.SetData(json.dumps(data))
.SetEventID(uuid.uuid1().hex)
.SetSource(os.environ['APP_NAME'])
.SetEventTime("tomorrow")
.SetEventType("ferris.dataloader.data_loaded_to_raw_zone")
)
print(json.dumps(event.Properties()))
cca = CloudEventsAPI()
cca.send(event)
```python
from ferris_cli.ferris_cli import ExecutionTime
from ferris_cli.ferris_cli import MetricMessage
from ferris_cli.ferris_cli import MetricsAPI

e = ExecutionTime()

@e.timeit
def foo(arg1):
    # do_something(arg1)
    return

@e.timeit
def bar():
    # hello_world()
    return

foo("dragons")
bar()

print(e.logtime_data)

# Forward the collected timings as metrics
for k1 in e.logtime_data.keys():
    for k2 in e.logtime_data[k1].keys():
        print(f"{k1}.{k2}={e.logtime_data[k1][k2]}")
        mm = MetricMessage(f"{k1}.{k2}", f"{e.logtime_data[k1][k2]}")
        MetricsAPI().send(mm)

# Sample e.logtime_data output (timings will vary):
# {'foo': {'times_called': 1, 'total_time': 0.0745, 'average_time': 0.0745}, 'bar': {'times_called': 1, 'total_time': 0.0685, 'average_time': 0.0685}}
```