# Creating and Starting Flueme Agent through Cloudera Manager API

Documentation:
* https://cloudera.github.io/cm_api/docs/python-client-swagger/
* https://archive.cloudera.com/cm7/7.0.3/generic/jar/cm_api/swagger-html-sdk-docs/python/README.html
* https://archive.cloudera.com/cm6/6.0.0/generic/jar/cm_api/apidocs/json_ApiServiceList.html


In [1]:
#import cm_client
#from cm_client.rest import ApiException
from pprint import pprint
from requests.auth import HTTPBasicAuth
from codecs import decode
import requests
import json
import time
import datetime

### Necessary Information to execute API
Finding out the CM API version and Cluster Name

In [2]:
cm_server = 'localhost'
cm_port = '7180'
api_uri = 'http://{0}:{1}'.format(cm_server,cm_port)
api_version = ''
cluster_name = ''
free_port = ''
cm_user= 'admin'
cm_pass= 'admin'

In [3]:
def api_cm_get(url):
    response = requests.get(url, auth=HTTPBasicAuth(cm_user, cm_pass))
    services = response.content.decode('UTF-8')
    try:
        return  json.loads(services)
    except ValueError:
        return services
    return result

def api_cm_post(url, body=None):
    response = requests.post(url,json=body, auth=HTTPBasicAuth(cm_user, cm_pass))
    services = response.content.decode('UTF-8')
    print(services)
    try:
        return  json.loads(services)
    except ValueError:
        return services
    return result

def api_cm_delete(url):
    response = requests.delete(url, auth=HTTPBasicAuth(cm_user, cm_pass))
    services = response.content.decode('UTF-8')
    try:
        return  json.loads(services)
    except ValueError:
        return services
    return result

def get_api_version():
    response = api_cm_get(api_uri + '/api/version')
    return response

def get_cluster_name():
    result =  api_cm_get('{uri}/api/{version}/clusters'.format(uri=api_uri, version=api_version))
    return result['items'][0]['name']

In [4]:
api_version = get_api_version()
cluster_name = get_cluster_name()

In [5]:
get_api_version()

'v18'

In [6]:
get_cluster_name()

'cluster'

### Available Port
Methods created to find out the next port available to use the new flume agent

In [7]:
def get_all_flumes():
    results = api_cm_get('{uri}/api/{version}/clusters/{cluster}/services?view=full'
               .format(uri=api_uri, version=api_version, cluster = cluster_name))
    flumes = list(filter(lambda d: d["type"] == 'FLUME', results["items"]))
    flumes_names = [x['name'] for x in flumes ]
    return flumes_names

def get_flume_agents_names(flume_name):
    result = api_cm_get('{uri}/api/{version}/clusters/{cluster}/services/{flume}/roles'
                        .format(uri=api_uri, version=api_version, cluster = cluster_name, flume=flume_name))
    agents = list(filter(lambda d: d["type"] == 'AGENT', result["items"]))
    agents_names = [a["name"] for a in agents]
    return agents_names   

def get_port_list():
    flumes = get_all_flumes()
    #agents = sum([get_flume_agents_names(f) for f in flumes], []) 
    #ports = [a["agent_http_port"] for a in agents["items"]]
    ports_configs = list()
    ports = list()
    for f in flumes:
        agents = get_flume_agents_names(f)
        for a in agents:
            configs_flume = api_cm_get('{uri}/api/{version}/clusters/{cluster}/services/{flume}/roles/{agent}/config?view=full'
                .format(uri=api_uri, version=api_version, cluster=cluster_name, flume=f, agent=a))
            ports_configs.append(list(filter(lambda d : d["name"] == 'agent_http_port', configs_flume["items"])))
    ports_configs = sum(ports_configs,[])
    for p in ports_configs:
        #quando o agente flume está usando a porta default ele nao cria a propriedade com o valor da porta ('value')
        if 'value' in p:
            ports.append(p['value'])
    return ports

'''
Esse método busca todas as portas que está sendo utilizada pelos agentes flumes existentes
e verifica o maior valor, após isso retorna o maior valor + 1.

'''
def get_new_port():
    ports = get_port_list()
    return max(map(int,ports)) + 1

In [8]:
get_all_flumes()

['flume',
 'flume2',
 'flume3',
 'flume_python',
 'flume_pythont2',
 'flume_python2',
 'flume_testcycleexport']

In [9]:
free_port = get_new_port()

### Criar nova instância do Flume e seu Agente

#### Agent Config

In [35]:
flume_agent_conf = '''

################## flume_test ##################

flume_test.sources = src_to_teste
flume_test.channels = channel_to_teste
flume_test.sinks = sink_to_teste

######### Spooldir Source #########
flume_test.sources.src_to_teste.type = spooldir
flume_test.sources.src_to_teste.channels = channel_to_teste
flume_test.sources.src_to_teste.spoolDir = /data/in
flume_test.sources.src_to_teste.fileHeader = true
flume_test.sources.src_to_teste.basenameHeader = true
flume_test.sources.src_to_teste.includePattern = ^.*.xml$
flume_test.sources.src_to_teste.trackerDir = .flume_test_xml
flume_test.sources.src_to_teste.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
flume_test.sources.src_to_teste.deserializer.maxBlobLength = 400000000

######### Memory Channel #########
flume_test.channels.channel_to_teste.type = memory
flume_test.channels.channel_to_teste.capacity = 1073741824

######### HDFS Sink #########
flume_test.sinks.sink_to_teste.type = hdfs
flume_test.sinks.sink_to_teste.hdfs.kerberosPrincipal = hdfs
flume_test.sinks.sink_to_teste.hdfs.kerberosKeytab = /home/user/hdfs.keytab
flume_test.sinks.sink_to_teste.hdfs.fileType = DataStream
flume_test.sinks.sink_to_teste.hdfs.path = /data/teste
flume_test.sinks.sink_to_teste.hdfs.filePrefix = testcycleexport_%Y%m%d
flume_test.sinks.sink_to_teste.hdfs.fileSuffix = .txt
flume_test.sinks.sink_to_teste.hdfs.rollInterval = 300
flume_test.sinks.sink_to_teste.hdfs.idleTimeout= 0
flume_test.sinks.sink_to_teste.hdfs.callTimeout= 120000 
flume_test.sinks.sink_to_teste.hdfs.useLocalTimeStamp = true
flume_test.sinks.sink_to_teste.hdfs.rollSize = 1073741824
flume_test.sinks.sink_to_teste.hdfs.rollCount = 0
flume_test.sinks.sink_to_teste.channel = channel_to_teste
flume_test.sinks.sink_to_teste.hdfs.closeTries = 0
'''

#### Create Flume Config  - API 
Flume API Properties: https://docs.cloudera.com/documentation/enterprise/5-5-x/topics/cm_props_cdh540_flume.html#concept_5.5.x_service_wide_props

In [36]:
flume_name = 'flume_python_new'
flume_host = 'lab-dev-master-00.eastus.cloudapp.azure.com'
agent_name = 'flume_agent_python'

create_flume_cm_config = {
  "items": [
    {
      "name": flume_name,
      "displayName": flume_name,
      "type": "FLUME",
      "config": {
        "items": [
          {
            "name": "hdfs_service",
            "value": "hdfs"
          },
          {
            "name": "flume_env_safety_valve",
            "value": "HADOOP_HOME=/usr/lib/hadoop"
          }
        ]
      },
      "roles": [
        {
          "name": "agent_" + flume_name,
          "type": "AGENT",
          "hostRef": {
            "hostId": flume_host
          },
          "config": {
            "items": [
              {
                "name": "agent_name",
                "value": agent_name
              },
              {
                "name": "agent_config_file",
                "value": flume_agent_conf
              },
              {
                "name": "flume_agent_java_opts",
                "value": "-Xmx6g -Dorg.apache.flume.log.printconfig=true"
              },
              {
                "name": "agent_http_port",
                "value": free_port
              }
            ]
          }
        }
      ]
    }
  ]
}

In [37]:
create_flume_cm_config

{'items': [{'name': 'flume_python_new',
   'displayName': 'flume_python_new',
   'type': 'FLUME',
   'config': {'items': [{'name': 'hdfs_service', 'value': 'hdfs'},
     {'name': 'flume_env_safety_valve',
      'value': 'HADOOP_HOME=/usr/lib/hadoop'}]},
   'roles': [{'name': 'agent_flume_python_new',
     'type': 'AGENT',
     'hostRef': {'hostId': 'lab-dev-master-00.eastus.cloudapp.azure.com'},
     'config': {'items': [{'name': 'agent_name',
        'value': 'flume_agent_python'},
       {'name': 'agent_config_file',
        'value': '\n\n################## flume_test ##################\n\nflume_test.sources = src_to_teste\nflume_test.channels = channel_to_teste\nflume_test.sinks = sink_to_teste\n\n######### Spooldir Source #########\nflume_test.sources.src_to_teste.type = spooldir\nflume_test.sources.src_to_teste.channels = channel_to_teste\nflume_test.sources.src_to_teste.spoolDir = /data/in\nflume_test.sources.src_to_teste.fileHeader = true\nflume_test.sources.src_to_teste.basenam

#### Calling the API to create

In [13]:
def create_new_agent_flume(flume_cm_config):
    response = api_cm_post('{uri}/api/{version}/clusters/{cluster}/services'
                            .format(uri=api_uri,version=api_version, cluster=cluster_name),
                            body = flume_cm_config)    
    if "items" in response:
        print("Agente Flume {0} criado com sucesso!".format(flume_name))
    else:
        print("Falha na criação do agente flume {0}:\n{1}"
              .format(flume_name, json.dumps(response, indent=4)))
           

In [14]:
create_new_agent_flume(create_flume_cm_config)

{
  "items" : [ {
    "name" : "flume_python_new",
    "type" : "FLUME",
    "clusterRef" : {
      "clusterName" : "cluster"
    },
    "serviceUrl" : "http://lab-dev-master-00.eastus.cloudapp.azure.com:7180/cmf/serviceRedirect/flume_python_new",
    "roleInstancesUrl" : "http://lab-dev-master-00.eastus.cloudapp.azure.com:7180/cmf/serviceRedirect/flume_python_new/instances",
    "serviceState" : "STOPPED",
    "configStalenessStatus" : "FRESH",
    "clientConfigStalenessStatus" : "FRESH",
    "maintenanceMode" : false,
    "maintenanceOwners" : [ ],
    "displayName" : "flume_python_new",
    "entityStatus" : "UNKNOWN"
  } ]
}
Agente Flume flume_python_new criado com sucesso!


#### Mensagem possíves ao criar o Flume

* Quando não a erro:

```json
{
      "items" : [ {
            "name" : "flume_python",
            "type" : "FLUME",
            "clusterRef" : {
              "clusterName" : "cluster"
            },
            "serviceUrl" : "http://lab-dev-master-00.eastus.cloudapp.azure.com:7180/cmf/serviceRedirect/flume_pythont",
            "roleInstancesUrl" : "http://lab-dev-master-00.eastus.cloudapp.azure.com:7180/cmf/serviceRedirect/flume_pythont/instances",
            "serviceState" : "STOPPED",
            "configStalenessStatus" : "FRESH",
            "clientConfigStalenessStatus" : "FRESH",
            "maintenanceMode" : false,
            "maintenanceOwners" : [ ],
            "displayName" : "flume_python",
            "entityStatus" : "UNKNOWN"
      } ]
}
```

* Quando da erro:
    * Ja existe um **Flume** com esse nome
    ```json
    {
          "message" : "Duplicate entry 'flume_pythont' for key 'NAME'",
          "causes" : [ "Duplicate entry 'flume_pythont' for key 'NAME'" ]
    }
    ```
    * Ja existe um **Agente Flume** com esse nome
    ```json
    {
          "message" : "Duplicate role name 'agent-test'."
    }
    ```
    * Sintaxe errada no body
    ```json
    {
          "message" : "Unexpected character ('\"' (code 34)): was expecting comma to separate OBJECT entries\n at [Source: org.apache.cxf.transport.http.AbstractHTTPDestination$1@78e45b69; line: 1, column: 290]"
    }
    ```
    * Atributo dentro do dicionario **"config": { "items"** nao existente
    ```json
    {
          "message" : "Unknown configuration attribute 'agent_namex'."
    }
    ```
    

### Check Flume Status

In [15]:
flume_name

'flume_python_new'

In [16]:
def get_flume_status(flume_name):
    try:
        result = api_cm_get('{uri}/api/{version}/clusters/{cluster}/services/{flume}'
                            .format(uri=api_uri,version=api_version, cluster=cluster_name, flume=flume_name))
        return result['serviceState']
    except KeyError:
        print("Agente Flume {0} não existe!".format(flume_name))

In [17]:
get_flume_status('flume_python2')

'STARTED'

### Flume Starting

In [18]:
def start_flume(flume_name):
    #print(datetime.datetime.now())
    response = api_cm_post('{uri}/api/{version}/clusters/{cluster}/services/{flume}/commands/start'
                            .format(uri=api_uri,version=api_version, cluster=cluster_name, flume=flume_name))

    if "active" in response:
        if response["active"] == True:
            while (get_flume_status(flume_name) == 'STARTING'):
                time.sleep(3)
            print("Agente Flume {0} iniciado com sucesso!".format(flume_name))
        else:
            print("Agente Flume {0} já estava iniciado.".format(flume_name))
    else:
        print("Falha na inicialização do agente flume {0}:\n{1}"
              .format(flume_name, json.dumps(response, indent=4)))
    #print(datetime.datetime.now())

In [19]:
start_flume(flume_name)

{
  "id" : 6916,
  "name" : "Start",
  "startTime" : "2020-01-23T14:03:38.785Z",
  "active" : true,
  "serviceRef" : {
    "clusterName" : "cluster",
    "serviceName" : "flume_python_new"
  }
}
Agente Flume flume_python_new iniciado com sucesso!


In [20]:
get_flume_status(flume_name)

'STARTED'

#### Mensagem possíves ao iniciar o Flume

* Quando não a erro:

```json
{
      "id" : 6236,
      "name" : "Start",
      "startTime" : "2020-01-15T01:12:16.854Z",
      "active" : true,
      "serviceRef" : {
            "clusterName" : "cluster",
            "serviceName" : "flume_python"
      }
}
```

* Quando da erro:
    * O Flume ja foi iniciado
    ```json
    {
          "id" : 6239,
          "name" : "Start",
          "startTime" : "2020-01-15T01:12:49.595Z",
          "endTime" : "2020-01-15T01:12:49.595Z",
          "active" : false,
          "success" : false,
          "resultMessage" : "Command Start is not currently available for execution.",
          "serviceRef" : {
                "clusterName" : "cluster",
                "serviceName" : "flume_python"
          }
    }
    ```
    * Flume nao existente
    ```json
    {
          "message" : "Service 'flume_pythontf2' not found in cluster 'cluster'."
    }
    ```
    

In [21]:
def stop_flume(flume_name):
    response = api_cm_post('{uri}/api/{version}/clusters/{cluster}/services/{flume}/commands/stop'
                            .format(uri=api_uri,version=api_version, cluster=cluster_name, flume=flume_name))
    
    if "active" in response:
        if response["active"] == True:
            print("Agente Flume {0} parado com sucesso!".format(flume_name))
        else:
            print("Agente Flume {0} já estava parado.".format(flume_name))
    else:
        print("Falha na execução de parada do agente flume {0}:\n{1}"
              .format(flume_name, json.dumps(response, indent=4)))
    

In [22]:
stop_flume(flume_name)

{
  "id" : 6919,
  "name" : "Stop",
  "startTime" : "2020-01-23T14:04:03.024Z",
  "active" : true,
  "serviceRef" : {
    "clusterName" : "cluster",
    "serviceName" : "flume_python_new"
  }
}
Agente Flume flume_python_new parado com sucesso!


In [23]:
get_flume_status('flume_python')

'STOPPED'

In [24]:
def restart_flume(flume_name):
    stop_flume(flume_name)
    time.sleep(5)
    start_flume(flume_name)

In [25]:
restart_flume(flume_name)

{
  "id" : 6921,
  "name" : "Stop",
  "startTime" : "2020-01-23T14:04:03.647Z",
  "endTime" : "2020-01-23T14:04:03.647Z",
  "active" : false,
  "success" : false,
  "resultMessage" : "At least one role must be started.",
  "serviceRef" : {
    "clusterName" : "cluster",
    "serviceName" : "flume_python_new"
  }
}
Agente Flume flume_python_new já estava parado.
{
  "id" : 6922,
  "name" : "Start",
  "startTime" : "2020-01-23T14:04:08.963Z",
  "active" : true,
  "serviceRef" : {
    "clusterName" : "cluster",
    "serviceName" : "flume_python_new"
  }
}
Agente Flume flume_python_new iniciado com sucesso!


---------------

### Deleting Flume Agent

In [26]:
get_flume_status(flume_name)

'STARTED'

In [27]:
def delete_agent_flume(flume_name):
    if get_flume_status(flume_name) is not 'STOPPED':
        stop_flume(flume_name)
    while (get_flume_status(flume_name) == 'STOPPING'):
        time.sleep(3)
    response = api_cm_delete('{uri}/api/{version}/clusters/{cluster}/services/{flume}'
                            .format(uri=api_uri,version=api_version, cluster=cluster_name, flume=flume_name))
    if "name" in response:
        print("Agente Flume {0} deletado com sucesso!".format(flume_name))
    else:
        print("Falha na remoção do agente flume {0}:\n{1}"
              .format(flume_name, json.dumps(response, indent=4)))

In [28]:
delete_agent_flume(flume_name)

{
  "id" : 6926,
  "name" : "Stop",
  "startTime" : "2020-01-23T14:04:34.261Z",
  "active" : true,
  "serviceRef" : {
    "clusterName" : "cluster",
    "serviceName" : "flume_python_new"
  }
}
Agente Flume flume_python_new parado com sucesso!
Agente Flume flume_python_new deletado com sucesso!
