# Set-up

## Import Datalogue libraries

Note: the Datalogue SDK must be downloaded and installed before this step will work.

For now, installing it requires access to the SDK through Artifactory.

In [131]:
# Import Datalogue libraries 
from datalogue import *
from datalogue.version import __version__
from datalogue.models.ontology import *
from datalogue.models.datastore_collection import *
from datalogue.models.datastore import *
from datalogue.models.datastore import GCSDatastoreDef 
from datalogue.models.credentials import *
from datalogue.models.stream import *
from datalogue.models.transformations import *
from datalogue.models.transformations.structure import *
from datalogue.dtl import Dtl, DtlCredentials
from datalogue.models.training import DataRef

# Import Datalogue Bag of Tricks
from DTLBagOTricks import DTL as DTLHelper


# Import other useful libraries
from datetime import datetime, timedelta
from os import environ
import pandas
from IPython.display import Image

# Check that the installed SDK version is the one this notebook expects
# The notebook was written against 0.28.3; the last recorded run below shows 0.31.1
# If the SDK is not installed, run `! pip install datalogue` and restart the Jupyter Notebook kernel
# If the wrong version is installed, run `! pip install datalogue --upgrade` and restart the Jupyter Notebook kernel
__version__

'0.31.1'

In [132]:
# Set host, username and password variables.
# The account email and password are read from the DTL_EMAIL and DTL_PASSWORD
# environment variables so that no secret is stored in the notebook.

datalogue_host = "https://internal.dtl.systems"  # for connecting to internal (note)

# datalogue_host = "http://10.2.161.119:3000"  # for connecting to Eric's DGX
email = environ.get("DTL_EMAIL")
password = environ.get("DTL_PASSWORD")
if not email or not password:
    raise RuntimeError(
        "Set the DTL_EMAIL and DTL_PASSWORD environment variables before logging in."
    )

# Log in to Datalogue
BOT = DTLHelper(datalogue_host, email, password)
dtl = BOT.dtl

# Expected output: "Datalogue v[installed SDK version]"
# "Logged in '[host location]' with '[username]' account)"

Datalogue v0.31.1
Logged in 'https://internal.dtl.systems/api' with 'chrisr@datalogue.io' account.


In [133]:
# Kick off the model deployment first, so it has time to become ready while
# the clean-up cells below are running.

from datalogue.models.training import *
import uuid

OntologyId = 'b00f972c-8c68-49c1-8de5-babde571ae92'

# The first training returned for the ontology is the one that gets deployed
ontology_uuid = uuid.UUID(str(OntologyId))
ontology_trainings = dtl.training.get_trainings(ontology_uuid)
trainingId = ontology_trainings[0].id

for identifier in (OntologyId, trainingId):
    print(identifier)

dtl.training.deploy(trainingId, OntologyId)

b00f972c-8c68-49c1-8de5-babde571ae92
b05710cc-62cb-4e59-b337-6fc49fd88433


True

In [134]:
# First, let's clean up the assets this workbook creates from previous runs.
#
# Warning! This deletes every datastore, datastore collection and stream
# collection whose name starts with 'demo-'. Nothing else is deleted: the
# target datastore and the ontology are only looked up.

DEMO_PREFIX = 'demo-'
TARGET_STORE_NAME = 'dtl-demo mfg'
ONTOLOGY_NAME_PREFIX = 'MFG Ontology'

# Bound up front so a missing asset is reported here with a clear message
# instead of surfacing as a NameError in a later cell.
targetDS = None
OntologyID = None

#BOT.server_summary()

# Clear Datastores and Datastore Collections
for store in dtl.datastore.list():
    if store.name == TARGET_STORE_NAME:
        targetDS = store.id

    if store.name.startswith(DEMO_PREFIX):
        dtl.datastore.delete(store.id)

for store_collection in dtl.datastore_collection.list():
    if store_collection.name.startswith(DEMO_PREFIX):
        dtl.datastore_collection.delete(store_collection.id)

# Clear data pipelines
# (lower-case loop names: a top-level loop variable stays bound after the loop
# and would replace any star-imported name spelled the same way)
for stream_coll in dtl.stream_collection.list():
    if stream_coll.name.startswith(DEMO_PREFIX):
        dtl.stream_collection.delete(stream_coll.id)

# Find the ontology (ontologies are never deleted here).
# If several names match, the last one listed wins.
for ontology in dtl.ontology.list():
    if ontology.name.startswith(ONTOLOGY_NAME_PREFIX):
        OntologyID = ontology.id

#BOT.server_summary()

missing_assets = [
    label
    for label, found_id in (
        ("datastore named '" + TARGET_STORE_NAME + "'", targetDS),
        ("ontology whose name starts with '" + ONTOLOGY_NAME_PREFIX + "'", OntologyID),
    )
    if found_id is None
]
if missing_assets:
    raise LookupError("Required asset(s) not found on the server: " + "; ".join(missing_assets))

# After running the above, no 'demo-' datastores, datastore collections or
# stream collections should remain on the server

In [135]:
# truncate the postgres table before writing data

import psycopg2
#from config import Config
 
def connect():
    """Truncate the ``mfg`` table in the demo PostgreSQL database.

    Connects to the server, prints its version, truncates ``mfg`` and commits.
    The commit only happens when every statement succeeded; on any error the
    error is printed and the connection is closed without committing. The
    connection is always closed.

    Connection settings are read from the environment:
    ``DEMO_PG_PASSWORD`` is required; ``DEMO_PG_HOST``, ``DEMO_PG_DATABASE``
    and ``DEMO_PG_USER`` are optional and default to the values this notebook
    previously hardcoded.
    """
    conn = None
    try:
        # The password is never stored in the notebook
        pg_password = environ.get("DEMO_PG_PASSWORD")
        if not pg_password:
            raise RuntimeError("The DEMO_PG_PASSWORD environment variable is not set.")

        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(
            host=environ.get("DEMO_PG_HOST", "34.73.161.131"),
            database=environ.get("DEMO_PG_DATABASE", "demo"),
            user=environ.get("DEMO_PG_USER", "postgres"),
            password=pg_password,
        )

        # the cursor is closed on leaving the block, also when a statement fails
        with conn.cursor() as cur:
            # display the PostgreSQL database server version
            print('PostgreSQL database version:')
            cur.execute('SELECT version()')
            db_version = cur.fetchone()
            print(db_version)

            # truncate the table
            cur.execute('TRUNCATE TABLE mfg')

        # reached only when every statement above succeeded
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            # closing without a commit discards any uncommitted work
            conn.close()
            print('Database connection closed.')


if __name__ == '__main__':
    connect()

Connecting to the PostgreSQL database...
PostgreSQL database version:
('PostgreSQL 11.5 on x86_64-pc-linux-gnu, compiled by gcc (Debian 7.3.0-5) 7.3.0, 64-bit',)
Database connection closed.


## 2. Read Source Files from S3 bucket

In [136]:
from boto.s3.connection import S3Connection

# AWS credentials are read from the standard AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY environment variables so that no key is stored in the
# notebook. When a variable is unset, None is passed and boto falls back to its
# own credential lookup.
BUCKET_NAME = 'datalogue-demo'
BUCKET_URL = "https://" + BUCKET_NAME + ".s3.amazonaws.com/"

conn = S3Connection(
    environ.get('AWS_ACCESS_KEY_ID'),
    environ.get('AWS_SECRET_ACCESS_KEY'),
)
bucket = conn.get_bucket(BUCKET_NAME)

keys = ["store_name", "URL"]
npl_data = []

# One entry per object whose key contains 'data_machine'; the store name gets
# the 'demo-' prefix that the clean-up cell uses to find assets to delete
for key in bucket.list():
    if 'data_machine' in key.name:
        url = BUCKET_URL + key.name
        values = ["demo-" + key.name, url]
        npl_data.append(dict(zip(keys, values)))

print(type(npl_data))
if npl_data:
    print(type(npl_data[0]))
else:
    print("No objects containing 'data_machine' were found in bucket " + BUCKET_NAME)
print(npl_data)


<class 'list'>
<class 'dict'>
[{'store_name': 'demo-mfg/00_data_machine.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/mfg/00_data_machine.csv'}, {'store_name': 'demo-mfg/01_data_machine.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/mfg/01_data_machine.csv'}, {'store_name': 'demo-mfg/02_data_machine.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/mfg/02_data_machine.csv'}, {'store_name': 'demo-mfg/03-data_machine.json', 'URL': 'https://datalogue-demo.s3.amazonaws.com/mfg/03-data_machine.json'}, {'store_name': 'demo-mfg/04-data_machine.json', 'URL': 'https://datalogue-demo.s3.amazonaws.com/mfg/04-data_machine.json'}]


In [137]:

# List the store name of every source found in the bucket
header_lines = ["", "CSV Machine Sources to connect to:", "-------------------------"]
print("\n".join(header_lines))

for source_entry in npl_data:
    print("➜ " + source_entry["store_name"])

print("\n")



CSV Machine Sources to connect to:
-------------------------
➜ demo-mfg/00_data_machine.csv
➜ demo-mfg/01_data_machine.csv
➜ demo-mfg/02_data_machine.csv
➜ demo-mfg/03-data_machine.json
➜ demo-mfg/04-data_machine.json




## 3. Create datastore connections for each file in S3 bucket

In [138]:
# File-name suffix -> file format passed to the HTTP datastore definition
FORMAT_BY_SUFFIX = {
    '.csv': FileFormat.Csv,
    '.xml': FileFormat.Xml,
    '.json': FileFormat.Json,
}

current_stores = []

# Create one HTTP datastore per source file and keep the created object both
# on the source entry (used when building the collection) and in current_stores
for data_store in npl_data:
    store_name = data_store["store_name"]
    file_format = next(
        (fmt for suffix, fmt in FORMAT_BY_SUFFIX.items() if store_name.endswith(suffix)),
        None,
    )
    if file_format is None:
        # Fail with the offending name rather than a KeyError on 'datastore_object'
        raise ValueError(
            "Unsupported file type for '" + store_name + "'; expected one of "
            + ", ".join(sorted(FORMAT_BY_SUFFIX))
        )

    data_store["datastore_object"] = dtl.datastore.create(
        Datastore(
            store_name,
            HttpDatastoreDef(data_store["URL"], file_format),
        )
    )
    current_stores.append(data_store["datastore_object"])

print(type(current_stores))
if current_stores:
    print(type(current_stores[0]))

    # the entry of the last source processed
    print(data_store)


<class 'list'>
<class 'datalogue.models.datastore.Datastore'>
{'store_name': 'demo-mfg/04-data_machine.json', 'URL': 'https://datalogue-demo.s3.amazonaws.com/mfg/04-data_machine.json', 'datastore_object': Datastore(id: 72f11ac2-fb2e-4ab7-9a12-2fc9fc204193, name: 'demo-mfg/04-data_machine.json', alias: None, credential_id: None, definition: <datalogue.models.datastore.HttpDatastoreDef object at 0x000002C5C5CBB898>, samples: None, schema_paths: [], schema_labels: [], schema_nodes: None)}


###           3b. Create datastore for RDBMS target

In [139]:
# host: 34.74.11.127 (use jdbc:postgresql://34.74.11.127:5432/demo for creating target store)
# user: postgres
# pw: <redacted - do not store passwords in the notebook; get it from the team's secret store>

# bug in SDK for v<1.0; to be updated here but created in GUI for now


## 4. Collecting data stores into a collection

This is just used for organization, and uses the command `dtl.datastore_collection.create`.

In [140]:
# Ids of the datastores created for the S3 source files
source_store_ids = [entry["datastore_object"].id for entry in npl_data]

npl_collection = DatastoreCollection(
    name="demo-MFG Collection",
    storeIds=source_store_ids,
    description="Manufacturing data of various formats",
)

dtl.datastore_collection.create(npl_collection)

DatastoreCollection(id: e16e9369-7164-427c-8f9e-677b74b2d6af, name: 'demo-MFG Collection')

In [141]:
# Deploy the model
from datalogue.models.training import *
import uuid

OntologyId = 'b00f972c-8c68-49c1-8de5-babde571ae92'

# Look the training up with the same ontology id that is passed to deploy()
# below, so the lookup and the deployment cannot refer to different ontologies.
trainings = dtl.training.get_trainings(uuid.UUID(str(OntologyId)))
if not trainings:
    raise LookupError("No training found for ontology " + OntologyId)
trainingId = trainings[0].id

print(OntologyId)
print(trainingId)

dtl.training.deploy(trainingId, OntologyId)

b00f972c-8c68-49c1-8de5-babde571ae92
b05710cc-62cb-4e59-b337-6fc49fd88433


True

## 5. Creating a stream


In [142]:
# Fetch the stream target: the datastore whose id the clean-up cell stored in targetDS
target_store_id = targetDS
my_output_store = dtl.datastore.get(target_store_id)
print(my_output_store)

Datastore(id: 5d4f4f1b-55d6-4ef0-823e-d37d7f58dc3f, name: 'dtl-demo mfg', alias: None, credential_id: 970d851a-e74d-4edf-b604-af18ffdaf009, definition: <datalogue.models.datastore.JdbcDatastoreDef object at 0x000002C5C444EA58>, samples: None, schema_paths: [], schema_labels: [], schema_nodes: None)


#### Sample pipeline

In [143]:
# Define the target output schema transformation using 'structure'

# Tags to keep, in output order. Every node uses the same pick strategy and
# data type, and its path is the tag followed by an underscore.
schema_tags = [
    "Machine ID",
    "Part ID",
    "Output Qty",
    "Timestamp",
    "Machine Status",
    "Rejects",
    "City",
    "Line ID",
    "State",
]

std_schema = Structure([
    ClassNodeDescription(
        path=[tag + "_"],
        tag=tag,
        pick_strategy=PickStrategy.HighScore,
        data_type=DataType.String,
    )
    for tag in schema_tags
])

In [144]:
from datalogue.models.training import *
import uuid

# Id of the first training returned for the ontology found by the clean-up cell
ontology_uuid = uuid.UUID(str(OntologyID))
modelUuid = dtl.training.get_trainings(ontology_uuid)[0].id
print(modelUuid)

b05710cc-62cb-4e59-b337-6fc49fd88433


In [145]:
# Define classify transformation

#tx_definition = Definition(    # (List[Transformation], pipelines: List['Definition'], target_datastore )
#            [
#                Classify(training_id = modelUuid, use_context=True, include_classes=False, include_scores=False),
#                std_schema
#            ], # List of transformations
#        [], # pipelines list
#            my_output_store, # target_datastore
#        )

#print(type(tx_definition))
#print(tx_definition)

In [146]:
# Inspect what get_trainings returns when given the ontology id as a plain string
x = dtl.training.get_trainings('b00f972c-8c68-49c1-8de5-babde571ae92')
for shown in (type(x), x[0].id):
    print(shown)


<class 'list'>
b05710cc-62cb-4e59-b337-6fc49fd88433


In [147]:
# Define classify transformation
from datalogue.models.transformations.classify import Classifier, MLMethod

# Classify with the model looked up earlier (modelUuid) rather than a pasted
# id, so this cell follows whichever training the ontology currently has.
# str() keeps the argument a plain string, as it was when the id was a literal.
classify_step = Classify(Classifier([MLMethod(str(modelUuid))]))

tx_definition = Definition(    # (List[Transformation], pipelines: List['Definition'], target_datastore )
    [
        classify_step,
        std_schema
    ],  # List of transformations
    [],  # pipelines list
    my_output_store,  # target_datastore
)

print(type(tx_definition))
print(tx_definition)

<class 'datalogue.models.stream.Definition'>
Pipeline(type: [Classify(classifier: Classifier(default_class: None classification_methods: 
[MLMethod(
 model_id: b05710cc-62cb-4e59-b337-6fc49fd88433, threshold: None
)]), fields_to_target: None, add_class_fields: False, add_score_fields: False), Structure(structure: [ClassNodeDescription(path: ['Machine ID_'], tag: Machine ID, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Part ID_'], tag: Part ID, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Output Qty_'], tag: Output Qty, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Timestamp_'], tag: Timestamp, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Machine Status_'], tag: Machine Status, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Rejects_'], tag: Rejects, strategy: Pick

In [148]:
# Define n stream(s), where n is number of datastore connections created from S3 bucket scan
n = len(current_stores)

# One stream per source datastore; every stream uses the same pipeline definition
list_of_streams = [Stream(source_store, [tx_definition]) for source_store in current_stores]

print(type(list_of_streams))
if list_of_streams:
    print(type(list_of_streams[0]))
    print(list_of_streams[0])

<class 'list'>
<class 'datalogue.models.stream.Stream'>
Stream(type: <datalogue.models.datastore.HttpDatastoreDef object at 0x000002C5C5CBBE10>, pipelines: [Pipeline(type: [Classify(classifier: Classifier(default_class: None classification_methods: 
[MLMethod(
 model_id: b05710cc-62cb-4e59-b337-6fc49fd88433, threshold: None
)]), fields_to_target: None, add_class_fields: False, add_score_fields: False), Structure(structure: [ClassNodeDescription(path: ['Machine ID_'], tag: Machine ID, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Part ID_'], tag: Part ID, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Output Qty_'], tag: Output Qty, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Timestamp_'], tag: Timestamp, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Machine Status_'], tag: Machine Status, strategy: PickStrat

In [149]:
# Show the first source datastore, then how many streams were defined
first_store = current_stores[0]
print(type(first_store))
print(first_store)
print("\n")

stream_count = len(list_of_streams)
print(stream_count)

<class 'datalogue.models.datastore.Datastore'>
Datastore(id: cef0fea6-29fa-4a3d-b819-c263053add37, name: 'demo-mfg/00_data_machine.csv', alias: None, credential_id: None, definition: <datalogue.models.datastore.HttpDatastoreDef object at 0x000002C5C5CBBE10>, samples: None, schema_paths: [], schema_labels: [], schema_nodes: None)


5


In [150]:
# Group all streams into a single named stream collection on the server

collection_name = 'demo-MFGPipeline'
stream_collection = dtl.stream_collection.create(list_of_streams, collection_name)

for shown in (type(stream_collection), stream_collection):
    print(shown)

<class 'datalogue.models.stream_collection.StreamCollection'>
StreamCollection(id: d7b66dd6-34e8-47c6-8dfc-ef0de3ea600a, name: 'demo-MFGPipeline', streams: [StreamMetadata(id: 6ceabe02-2811-477a-8012-1ba8b43937e8, is_ready: False, stream: Stream(type: <datalogue.models.datastore.HttpDatastoreDef object at 0x000002C5C503FC18>, pipelines: [Pipeline(type: [Classify(classifier: Classifier(default_class: None classification_methods: 
[MLMethod(
 model_id: b05710cc-62cb-4e59-b337-6fc49fd88433, threshold: None
)]), fields_to_target: None, add_class_fields: False, add_score_fields: False), Structure(structure: [ClassNodeDescription(path: ['Machine ID_'], tag: Machine ID, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Part ID_'], tag: Part ID, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Output Qty_'], tag: Output Qty, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path:

In [151]:
# Run every stream in the collection; the call's return value is the cell output

collection_id = stream_collection.id
dtl.stream_collection.run(collection_id)

[Job(id: UUID('3cd5362d-f8f4-4eaf-85f7-6f1badd2427c'), stream_id: UUID('7533cac9-3921-4d4e-addf-e28004a12550'), stream_collection_id: UUID('d7b66dd6-34e8-47c6-8dfc-ef0de3ea600a'), status: Scheduled, run_at: datetime.datetime(2019, 11, 27, 14, 51, tzinfo=tzutc()), created_by: UUID('5b333964-8fab-4ab0-9052-25f69fcb8689'), remaining_time_millis: 9223372036854775807, percent_progress: 0, errors: None, ended_at: None,
 Job(id: UUID('157d2dac-ba70-48e0-9ebf-70cfd879e2c7'), stream_id: UUID('4d6b88cf-d0fe-4e6d-a9bb-c9a6cd7bbbe5'), stream_collection_id: UUID('d7b66dd6-34e8-47c6-8dfc-ef0de3ea600a'), status: Scheduled, run_at: datetime.datetime(2019, 11, 27, 14, 51, tzinfo=tzutc()), created_by: UUID('5b333964-8fab-4ab0-9052-25f69fcb8689'), remaining_time_millis: 9223372036854775807, percent_progress: 0, errors: None, ended_at: None,
 Job(id: UUID('7a4aa347-f09e-4378-8927-b6fad8bc7384'), stream_id: UUID('6670100e-0a14-41c6-b943-e7203e249939'), stream_collection_id: UUID('d7b66dd6-34e8-47c6-8dfc-ef