In [1]:
import requests
import json
import time
import os

# 1 Atlas API helpers

In [4]:
# Base URL of the Atlas REST API. The host name is read from the ATLAS_SERVER
# environment variable; a KeyError is raised if it is not set.
baseUrl = "http://%s:21000/api/atlas" % os.environ['ATLAS_SERVER']

# Credentials passed as `auth=(user, password)` on every request. They can be
# overridden through the environment so that real credentials do not have to be
# written into the notebook; the defaults keep the previous "admin"/"admin".
user = os.environ.get("ATLAS_USER", "admin")
password = os.environ.get("ATLAS_PASSWORD", "admin")

# Default cluster name. The lookup helpers take the cluster as an explicit
# argument, so this value is only a convenience for interactive use.
cluster = "Demo"

def getAtlas(path):
    """Issue an authenticated GET against ``<baseUrl>/<path>``.

    path -- resource path (may include a query string) relative to baseUrl.

    Returns the response body decoded from JSON. The HTTP status code is not
    inspected here.
    """
    url = "%s/%s" % (baseUrl, path)
    response = requests.get(url, auth=(user, password))
    return response.json()

def postAtlas(path, content):
    """POST ``content`` as JSON to ``<baseUrl>/<path>`` with authentication.

    path    -- resource path relative to baseUrl.
    content -- JSON-serialisable object sent as the request body.

    Returns the response body decoded from JSON. The HTTP status code is not
    inspected here.
    """
    url = "%s/%s" % (baseUrl, path)
    jsonHeaders = {"Content-Type": "application/json;charset=UTF-8"}
    response = requests.post(url,
                             json=content,
                             auth=(user, password),
                             headers=jsonHeaders)
    return response.json()

def deleteAtlas(path):
    """Issue an authenticated DELETE against ``<baseUrl>/<path>``.

    The target URL is printed before the request is sent.

    path -- resource path (may include a query string) relative to baseUrl.

    Returns the raw ``requests`` response object (not the decoded body).
    """
    url = "%s/%s" % (baseUrl, path)
    # Single-argument print(...) behaves the same as the print statement on
    # Python 2 and is also valid on Python 3.
    print(url)
    response = requests.delete(url, auth=(user, password))
    return response
    
def getHiveTable(qualifiedTable, cluster):
    """Look up a hive_table entity by its qualified name.

    qualifiedTable -- table name in ``db.table`` form.
    cluster        -- cluster name; the lookup value is ``<qualifiedTable>@<cluster>``.

    Returns the decoded JSON response of the entity query.
    """
    qualifiedName = "%s@%s" % (qualifiedTable, cluster)
    query = "entities?type=hive_table&property=qualifiedName&value=%s" % qualifiedName
    return getAtlas(query)

def getHiveDB(dbName, cluster):
    """Look up a hive_db entity by its qualified name.

    dbName  -- database name.
    cluster -- cluster name; the lookup value is ``<dbName>@<cluster>``.

    Returns the decoded JSON response of the entity query.
    """
    qualifiedName = "%s@%s" % (dbName, cluster)
    query = "entities?type=hive_db&property=qualifiedName&value=%s" % qualifiedName
    return getAtlas(query)
    
def createType(content):
    """POST the type definition(s) in ``content`` to the ``types`` resource.

    Returns the decoded JSON response.
    """
    response = postAtlas("types", content)
    return response

def createEntity(content):
    """POST the entity definition(s) in ``content`` to the ``entities`` resource.

    Returns the decoded JSON response.
    """
    response = postAtlas("entities", content)
    return response

def updateEntity(guid, content):
    """POST ``content`` to ``entities/<guid>`` to update an existing entity.

    guid    -- GUID of the entity to update.
    content -- JSON-serialisable entity definition carrying the new values.

    Returns the decoded JSON response.
    """
    entityPath = "entities/%s" % guid
    return postAtlas(entityPath, content)

def deleteEntity(path):
    """Send a DELETE for ``entities?guid=<path>``.

    path -- despite its name, the value is used as the ``guid`` query parameter.

    Returns the raw response object from deleteAtlas.
    """
    query = "entities?guid=%s" % path
    return deleteAtlas(query)


# 2 Create Spark Process Type in Atlas

In [3]:
def _sparkProcessAttribute(name, dataTypeName, multiplicity, isIndexable):
    """Build one attribute definition of the spark_etl_process class type.

    Every attribute is non-unique, non-composite and has no reverse attribute;
    only the four arguments vary between attributes.
    """
    return {
        u'isUnique': False,
        u'name': name,
        u'reverseAttributeName': None,
        u'multiplicity': multiplicity,
        u'dataTypeName': dataTypeName,
        u'isIndexable': isIndexable,
        u'isComposite': False
    }


# Type definition payload: a single class type `spark_etl_process` that
# extends `Process` and adds the ETL-specific attributes listed below.
spark_etl_process = {
    u'enumTypes': [],
    u'structTypes': [],
    u'traitTypes': [],
    u'classTypes': [
        {
            u'hierarchicalMetaTypeName': u'org.apache.atlas.typesystem.types.ClassType',
            u'typeName': u'spark_etl_process',
            u'typeDescription': None,
            u'superTypes': [u'Process'],
            u'attributeDefinitions': [
                _sparkProcessAttribute(u'etl_code', u'string', u'required', True),
                _sparkProcessAttribute(u'packages', u'array<string>', u'optional', False),
                _sparkProcessAttribute(u'startTime', u'date', u'required', False),
                _sparkProcessAttribute(u'endTime', u'date', u'required', False),
                _sparkProcessAttribute(u'userName', u'string', u'optional', True)
            ]
        }
    ]
}

In [None]:
# Send the spark_etl_process type definition to Atlas (POST to the "types"
# resource via createType); the decoded JSON response is the cell's output.
createType(spark_etl_process)

# 3 Preparation

## 3.1 Define the ETL job configuration

In [4]:
# Cluster name; appended as "@<clusterName>" to qualified names.
clusterName = "Demo"

# Source side: the database and the qualified (db.table) names of its tables.
sourceDB = "employees"
sourceTables = tuple("%s.%s" % (sourceDB, name)
                     for name in ("employees", "departments", "dept_emp"))

# Target side: database, table, qualified table name and "name:type" column specs.
targetDB = "default"
targetTableName = "emp_dept_flat3"
targetTable = ".".join([targetDB, targetTableName])
targetColumns = [
    "dept_no:string",
    "emp_no:int",
    "full_name:string",
    "from_date:string",
    "to_date:string",
    "dept_name:string",
]

# Spark code of the job, wrapped in <pre> tags; stored as the process' etl_code.
transformation = """<pre>
val employees = sqlContext.sql("select * from employees.employees")
val departments = sqlContext.sql("select * from employees.departments")
val dept_emp = sqlContext.sql("select * from employees.dept_emp")

val flat = employees.withColumn("full_name", concat(employees("last_name"), lit(", "), employees("first_name")))
.select("full_name", "emp_no")
.join(dept_emp,"emp_no")
.join(departments, "dept_no")
</pre>"""

# Owner recorded on the created entities and creation timestamp of the target table.
owner = "etl"
createTime = "2016-11-25T14:25:48.000Z"

## 3.2 Get GUIDs of involved databases and tables

In [5]:
# Resolve the Atlas GUIDs of the source tables and of both databases.
# Each lookup response carries the GUID under definition -> id -> id.
tableGuids = {}
for table in sourceTables:
    tableDef = getHiveTable(table, clusterName)
    tableGuids[table] = tableDef["definition"]["id"]["id"]

dbGuids = {}
for database in (sourceDB, targetDB):
    dbDef = getHiveDB(database, clusterName)
    dbGuids[database] = dbDef["definition"]["id"]["id"]

guids = {"dbs": dbGuids, "tables": tableGuids}
guids

{'dbs': {'default': u'5c15c3f5-b465-458a-8b52-b188960853d7',
  'employees': u'a9f9d3b8-a42d-4638-a308-16e4fbeb9c29'},
 'tables': {'employees.departments': u'f08b9a13-10f5-494c-8173-87b928a9788b',
  'employees.dept_emp': u'cd0f7a13-13c6-4fc7-8fc6-e3e473ad61a7',
  'employees.employees': u'5110e28a-7954-414b-b375-2c877cf4be80'}}

# 4 Create Atlas Entity for new Hive table

## 4.1 Some Helpers for creating hive entities

In [6]:
def createHiveColumnJson(tableGUID, columnGuid, 
                         databaseName, tableName, clusterName, columnName, columnType, owner):
    """Build the entity definition (a dict) for one hive_column.

    tableGUID    -- GUID of the hive_table the column belongs to.
    columnGuid   -- numeric id for the new column; formatted with "%d", so a
                    float is truncated to its integer part.
    databaseName, tableName, clusterName, columnName
                 -- combined into the qualified name "<db>.<table>.<column>@<cluster>".
    columnType   -- value stored in the column's "type" attribute.
    owner        -- value stored in the column's "owner" attribute.

    Returns the column definition; nothing is sent to Atlas here.
    """
    idClass = "org.apache.atlas.typesystem.json.InstanceSerialization$_Id"

    columnId = {
        "jsonClass": idClass,
        "id": "%d" % columnGuid,
        "version": 0,
        "typeName": "hive_column",
        "state": "ACTIVE"
    }

    columnValues = {
        "name": "%s" % columnName,
        "description": None,
        "qualifiedName": "%s.%s.%s@%s" % (databaseName, tableName, columnName, clusterName),
        "comment": None,
        "owner": "%s" % owner,
        "type": "%s" % columnType,
        # Reference to the owning table.
        "table": {
            "jsonClass": idClass,
            "id": "%s" % tableGUID,
            "version": 0,
            "typeName": "hive_table",
            "state": "ACTIVE"
        }
    }

    return {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "id": columnId,
        "typeName": "hive_column",
        "values": columnValues,
        "traitNames": [],
        "traits": {}
    }

    
def createHiveTableJson(databaseGuid, tableGuid,
                        databaseName, tableName, clusterName, createTime, owner):
    """Build the entity definition (a dict) for a new hive_table without columns.

    databaseGuid -- GUID of the hive_db the table belongs to.
    tableGuid    -- numeric id for the new table; formatted with "%d", so a
                    float is truncated to its integer part.
    databaseName, tableName, clusterName
                 -- combined into the qualified name "<db>.<table>@<cluster>".
    createTime   -- value stored in the "createTime" attribute.
    owner        -- value stored in the "owner" attribute.

    The table type is always "MANAGED_TABLE", "columns" starts empty and
    "lastAccessTime" is a fixed literal timestamp.

    Returns the table definition; nothing is sent to Atlas here.
    """
    idClass = "org.apache.atlas.typesystem.json.InstanceSerialization$_Id"

    tableId = {
        "jsonClass": idClass,
        "id": "%d" % tableGuid,
        "version": 0,
        "typeName": "hive_table",
        "state": "ACTIVE"
    }

    tableValues = {
        "aliases": None,
        "tableType": "MANAGED_TABLE",
        "name": "%s" % tableName,
        "viewExpandedText": None,
        "createTime": "%s" % createTime,
        "description": None,
        "temporary": False,
        # Reference to the owning database.
        "db": {
            "jsonClass": idClass,
            "id": "%s" % databaseGuid,
            "version": 0,
            "typeName": "hive_db",
            "state": "ACTIVE"
        },
        "viewOriginalText": None,
        "retention": 0,
        "qualifiedName": "%s.%s@%s" % (databaseName, tableName, clusterName),
        "columns": [],
        "comment": None,
        "lastAccessTime": "2016-11-02T14:25:48.000Z",
        "owner": "%s" % owner,
        "partitionKeys": None
    }

    return {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "id": tableId,
        "typeName": "hive_table",
        "values": tableValues,
        "traitNames": [],
        "traits": {}
    }

def createUpdateHiveTableJson(tableGuid,
                        databaseName, tableName, clusterName, createTime, owner):
    """Build a partial hive_table definition used to update the "columns" attribute.

    tableGuid -- GUID of the existing table.

    databaseName, tableName, clusterName, createTime and owner are accepted
    but not used: the returned definition only carries the table id and an
    empty "columns" list for the caller to fill in.
    """
    tableId = {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
        "id": "%s" % tableGuid,
        "version": 0,
        "typeName": "hive_table",
        "state": "ACTIVE"
    }

    return {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "id": tableId,
        "typeName": "hive_table",
        "values": {"columns": []},
        "traitNames": [],
        "traits": {}
    }

def createColumnIdJson(columnGuid):
    """Wrap a hive_column id reference for ``columnGuid`` in a dict under the key "id"."""
    columnId = {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
        "id": "%s" % columnGuid,
        "version": 0,
        "typeName": "hive_column",
        "state": "ACTIVE"
    }
    return {"id": columnId}


## 4.2 Create Hive table without columns

In [7]:
# Temporary, negative, time-based id for the not-yet-created table
# (presumably how Atlas recognises a new entity -- TODO confirm).
placeholderTableId = -(time.time()*1000)

hiveTable = createHiveTableJson(guids["dbs"][targetDB], placeholderTableId,
                                targetDB, targetTableName, clusterName, createTime, owner)
newTable = createEntity(hiveTable)

# The GUID assigned by Atlas is the first entry of the "created" list in the response.
newTableGuid = newTable["entities"]["created"][0]
newTableGuid

u'1e6e1a06-c6ec-40e0-821a-ea31177b9bf6'

## 4.3 Create Columns

In [8]:
# Temporary, negative, time-based id; it is decreased by 100 for every column
# so that each column definition in the request gets a distinct id.
newColumnGuid = -(time.time()*1000)
entities = []

for columnSpec in targetColumns:
    newColumnGuid = newColumnGuid - 100
    # Each spec has the form "<name>:<type>".
    columnName, columnType = columnSpec.split(":")
    # Use the configured target database and owner (instead of repeating the
    # literals "default" and "etl") so the columns stay consistent with the
    # table when the job configuration changes.
    hiveColumn = createHiveColumnJson(newTableGuid, newColumnGuid,
                                      targetDB, targetTableName, clusterName, columnName, columnType, owner)
    entities.append(hiveColumn)


# Create all columns with a single request and show the GUIDs Atlas assigned.
newColumns = createEntity(entities)
newColumns["entities"]["created"]

[u'ef9a4538-342e-4a51-9453-7cd142a95a53',
 u'0257f483-3412-44c3-b72c-419956b059a3',
 u'6dd4ae48-5d36-47e0-a974-38de5247e3fd',
 u'e51ea9d7-514d-4f80-b5e7-6ecf544c9313',
 u'76a4a59d-db29-4cd3-8191-edeae449c34c',
 u'e3dc83e4-4742-413a-9bd0-ee4576dd1cf4']

## 4.4 Add Columns to table

Note: This can be executed multiple times (idempotent)

In [9]:
# Build id references for the columns created in the previous step ...
columnIds = []
for createdColumnGuid in newColumns["entities"]["created"]:
    columnIds.append(createColumnIdJson(createdColumnGuid)["id"])

# ... and set them as the "columns" attribute of a partial table definition.
updateHiveTable = createUpdateHiveTableJson(newTableGuid,
                                            targetDB, targetTableName, clusterName, createTime, owner)
updateHiveTable["values"]["columns"] = columnIds

# Post the update and show the GUIDs reported as updated.
updateEntity(newTableGuid, updateHiveTable)["entities"]["updated"]

[u'1e6e1a06-c6ec-40e0-821a-ea31177b9bf6']

# 5 Create Lineage

In [10]:
def createLineageJson(inputTableNames, inputTableGuids, 
                      outputTableName, outputTableGuid, 
                      clusterName, sparkCode, packages, startTime, endTime, owner):
    """Build the entity definition (a dict) of a spark_etl_process linking inputs to an output.

    inputTableNames  -- names of the input tables; joined with "," for the
                        process name and qualified name.
    inputTableGuids  -- GUIDs of the input hive_table entities.
    outputTableName  -- name of the output table (used in name / qualified name).
    outputTableGuid  -- GUID of the output hive_table entity.
    clusterName      -- appended as "@<clusterName>" to the qualified name.
    sparkCode        -- stored in the "etl_code" attribute.
    packages         -- stored in the "packages" attribute as a list of strings.
                        A list/tuple/set is converted element-wise, None becomes
                        an empty list and any other single value becomes a
                        one-element list.
    startTime, endTime -- stored in the "startTime" / "endTime" attributes;
                        startTime is also part of the qualified name.
    owner            -- stored in the "userName" attribute.

    Returns the process definition; nothing is sent to Atlas here. The entity
    id is a negative, time-based number generated on every call.
    """
    idClass = "org.apache.atlas.typesystem.json.InstanceSerialization$_Id"

    def tableReference(guid):
        # Id reference to an existing hive_table entity.
        return {
            "id": "%s" % guid,
            "jsonClass": idClass,
            "state": "ACTIVE",
            "typeName": "hive_table",
            "version": 0
        }

    # "packages" is declared as array<string> in the spark_etl_process type, so
    # send a real list rather than the string representation of one.
    if packages is None:
        packageList = []
    elif isinstance(packages, (list, tuple, set, frozenset)):
        packageList = ["%s" % (package,) for package in packages]
    else:
        packageList = ["%s" % (packages,)]

    joinedInputs = ",".join(inputTableNames)
    newLineageGuid = -(time.time()*1000)

    lineageDef = {
        "id": {
            "id": "%d" % newLineageGuid,
            "jsonClass": idClass,
            "state": "ACTIVE",
            "typeName": "spark_etl_process",
            "version": 0
        },
        "typeName": "spark_etl_process",
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "values": {
            "name": "Spark:%s->%s" % (joinedInputs, outputTableName),
            "qualifiedName": "Spark(%s):%s->%s@%s" % (startTime, joinedInputs, outputTableName, clusterName),
            "inputs": [tableReference(guid) for guid in inputTableGuids],
            "outputs": [tableReference(outputTableGuid)],
            "etl_code": "%s" % sparkCode,
            "packages": packageList,
            "startTime": "%s" % startTime,
            "endTime": "%s" % endTime,
            "userName": "%s" % owner
        },
        "traitNames": [],
        "traits": {}
    }

    return lineageDef



In [11]:
# GUIDs of the source tables, in the same order as sourceTables.
inputTableGuids = []
for sourceTable in sourceTables:
    inputTableGuids.append(guids["tables"][sourceTable])

# Process definition linking the source tables to the new target table; no
# packages are recorded and the start/end times are fixed literals.
lineageDef = createLineageJson(sourceTables, inputTableGuids,
                               targetTableName, newTableGuid,
                               clusterName, transformation, [],
                               "2016-11-25T14:25:41.000Z", "2016-11-25T14:25:48.000Z", owner)

# Create the process entity and show the GUID(s) Atlas assigned.
newLIneage = createEntity(lineageDef)
newLIneage["entities"]["created"]

[u'6ec8e62e-a778-472c-aad6-33f3d254ce04']

# 6 Delete everything again

In [None]:
# Everything created by this notebook: the columns, then the table, then the
# lineage process. Each entity is deleted with its own request.
newGuids = newColumns["entities"]["created"] + [newTableGuid] + newLIneage["entities"]["created"]
for g in newGuids:
    # Single-argument print(...) behaves the same as the print statement on
    # Python 2 and is also valid on Python 3.
    print(deleteEntity(g))
