# Set-up

## Import Datalogue libraries

Note, you'll need to have downloaded and installed the Datalogue SDK before this step will work.

Right now, to do so you will need to get access through Artifactory.

In [72]:
# Import Datalogue libraries 
from datalogue import *
from datalogue.version import __version__
from datalogue.models.ontology import *
from datalogue.models.datastore_collection import *
from datalogue.models.datastore import *
from datalogue.models.datastore import GCSDatastoreDef 
from datalogue.models.credentials import *
from datalogue.models.stream import *
from datalogue.models.transformations import *
from datalogue.models.transformations.structure import *
from datalogue.dtl import Dtl, DtlCredentials
from datalogue.models.training import DataRef

# Import Datalogue Bag of Tricks
from DTLBagOTricks import DTL as DTLHelper


# Import other useful libraries
from datetime import datetime, timedelta
from os import environ
import pandas
from IPython.display import Image

# Checks the version of the SDK is correct
# The expected version is 0.28.3
# If the SDK is not installed, run `! pip install datalogue` and restart the Jupyter Notebook kernel
# If the wrong versions is installed, run `! pip install datalogue --upgrade` and restart the Jupyter Notebook kernel
__version__

'0.31.1'

In [73]:
# Set host, username and password variables

datalogue_host = "https://internal.dtl.systems"  # for connecting to internal (note)

# datalogue_host = "https://internal.dtl.systems"  # for connecting to internal (note)
# datalogue_host = "http://10.2.161.119:3000"  # for connecting to Eric's DGX
#email = environ.get("DTL_EMAIL")
email = "chrisr@datalogue.io"
#password = environ.get("DTL_PASSWORD")
password = "StreudelSauce1!"

# Log in to Datalogue
BOT = DTLHelper(datalogue_host, email, password)
dtl = BOT.dtl

# Expected output Datalogue v0.28.3
# "Logged in '[host location]' with '[username]' account)"

Datalogue v0.31.1
Logged in 'https://internal.dtl.systems/api' with 'chrisr@datalogue.io' account.


In [74]:
# Deploy the model before the tidy up to give it some time to be ready:

from datalogue.models.training import *
import uuid

OntologyId = '395a2b81-86c5-4f17-9d5f-6f17d4ae84f0'
trainingId = dtl.training.get_trainings(uuid.UUID(str(OntologyId)))[0].id

print(OntologyId)
print(trainingId)

dtl.training.deploy(trainingId, OntologyId)

395a2b81-86c5-4f17-9d5f-6f17d4ae84f0
f1175acb-f2e8-4175-a67f-3b67d57a1623


True

In [75]:
# First, let's clean up the assets this workbook creates from previous runs

# Warning! this will clean all your datastores and data collections and credentials

#BOT.server_summary()

# Clear Datastores and Datastore Collections
for store in dtl.datastore.list():
#    print(store.name, ',', store.name[:5])
    if (store.name == 'dtl-demo npl'):
        targetDS = store.id
        
    if (store.name[:5] == 'demo-'):
        dtl.datastore.delete(store.id)

for store in dtl.datastore_collection.list():
#    print(store.name)
    if (store.name[:5] == 'demo-'):    
        dtl.datastore_collection.delete(store.id)

# Clear data pipelines
for StreamCollection in dtl.stream_collection.list():
    print(StreamCollection, '\n')
    if (StreamCollection.name[:5] == 'demo-'):
        dtl.stream_collection.delete(StreamCollection.id)
    if (StreamCollection.name == 'Unnamed Pipeline'):
        dtl.stream_collection.delete(StreamCollection.id)        

## Clear ontologies
for Ontology in dtl.ontology.list():
#     dtl.ontology.delete(ontology.id)
    if (Ontology.name[:12] == 'NPL Ontology'):
        OntologyID = Ontology.id

print(targetDS)

StreamCollection(id: 69b4cecc-12c4-4af4-a026-5130424dbebe, name: 'Quiz Pipeline: CCohen', streams: [StreamMetadata(id: e2ae1ede-c940-4c4b-be6b-89993f3965b5, is_ready: True, stream: Stream(type: <datalogue.models.datastore.HttpDatastoreDef object at 0x000001E34781CA20>, pipelines: [Pipeline(type: [ElementCountSelection(count: 250)], pipelines: [], target: GCSDatastoreDef(bucket: 'dtl-training', file_name: '250names.csv', file_format: Csv, params: {}))], env: None)), StreamMetadata(id: 28f6ef91-b24b-4d49-92ed-5b4ff79df22b, is_ready: True, stream: Stream(type: <datalogue.models.datastore.VoidDef object at 0x000001E336782320>, pipelines: [Pipeline(type: [], pipelines: [], target: <datalogue.models.datastore.VoidDef object at 0x000001E336782978>)], env: None)), StreamMetadata(id: 03a74bdf-52f5-4334-ac91-77cdcfaa812d, is_ready: True, stream: Stream(type: <datalogue.models.datastore.VoidDef object at 0x000001E3477F4DA0>, pipelines: [Pipeline(type: [], pipelines: [], target: <datalogue.models.

d1babc12-8a60-4398-a5cc-7686c1ff25fc


In [76]:
# truncate the postgres table before writing data

import psycopg2
#from config import Config
 
def connect():
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # read connection parameters
        #params = config()
 
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        #conn = psycopg2.connect(**params)
        conn = psycopg2.connect(host="34.73.161.131",database="demo", user="postgres", password="devout-north-solitude")

      
        # create a cursor
        cur = conn.cursor()
        
   # execute a statement
        print('PostgreSQL database version:')
        cur.execute('SELECT version()')
 
        # display the PostgreSQL database server version
        db_version = cur.fetchone()
        print(db_version)
        
        # truncate the table
        cur.execute('TRUNCATE TABLE npl')
        
       # close the communication with the PostgreSQL
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.commit()
            conn.close()
            print('Database connection closed.')
 
 
if __name__ == '__main__':
    connect()

Connecting to the PostgreSQL database...
PostgreSQL database version:
('PostgreSQL 11.5 on x86_64-pc-linux-gnu, compiled by gcc (Debian 7.3.0-5) 7.3.0, 64-bit',)
Database connection closed.


## 2. Read Source Files from S3 bucket

In [77]:
from boto.s3.connection import S3Connection

conn = S3Connection('AKIAIXM6CXHGHC62R7GA','Gcb34qctsvPoQJGGDrXzmwMbyaCZOg6zY1RFOVQO')
bucket = conn.get_bucket('datalogue-demo')

keys = ["store_name", "URL"]
npl_data = []

for key in bucket.list():
    if 'Loan' in key.name:
        url="https://datalogue-demo.s3.amazonaws.com/" + key.name
        values = ["demo-"+key.name, url]
        npl_data.append(dict(zip(keys, values)))

print(type(npl_data))
print(type(npl_data[0]))

<class 'list'>
<class 'dict'>


In [78]:

print("\nCSV Customers Sources to connect to:\n" "-------------------------")
for data_store in npl_data:
    print("➜ " + data_store["store_name"])
print("\n")



CSV Customers Sources to connect to:
-------------------------
➜ demo-NPL/LoanTape1USD.csv
➜ demo-NPL/LoanTape2USD.csv
➜ demo-NPL/LoanTape3EUR.csv
➜ demo-NPL/LoanTape4GBP.csv
➜ demo-NPL/LoanTape5USD.json




## 3. Create datastore connections for each file in S3 bucket

In [79]:
current_stores = []

for data_store in npl_data:
    data_store["datastore_object"] = dtl.datastore.create(
        Datastore(
            data_store["store_name"],
            HttpDatastoreDef(data_store["URL"], FileFormat.Csv),
        )
    )
    print(data_store)
    current_stores.append(data_store["datastore_object"])

print(type(current_stores))
print(type(current_stores[0]))

{'store_name': 'demo-NPL/LoanTape1USD.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/NPL/LoanTape1USD.csv', 'datastore_object': Datastore(id: 4e681fbd-14ff-4f96-9a59-24fbf2af6ef6, name: 'demo-NPL/LoanTape1USD.csv', alias: None, credential_id: None, definition: <datalogue.models.datastore.HttpDatastoreDef object at 0x000001E3477F49E8>, samples: None, schema_paths: [], schema_labels: [], schema_nodes: None)}
{'store_name': 'demo-NPL/LoanTape2USD.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/NPL/LoanTape2USD.csv', 'datastore_object': Datastore(id: 78a11891-ca4e-40ca-b30b-77b03a3d86d9, name: 'demo-NPL/LoanTape2USD.csv', alias: None, credential_id: None, definition: <datalogue.models.datastore.HttpDatastoreDef object at 0x000001E347CCF470>, samples: None, schema_paths: [], schema_labels: [], schema_nodes: None)}
{'store_name': 'demo-NPL/LoanTape3EUR.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/NPL/LoanTape3EUR.csv', 'datastore_object': Datastore(id: 6c8fd43a-ba94-4d8

###           3b. Create datastore for RDBMS target

In [80]:
# host: 34.74.11.127 (use jdbc:postgresql://34.74.11.127:5432/demo for creating target store)
# user: postgres
# pw: L8am0pO5zjJrFm2O

# bug in SDK for v<1.0; to be updated here but created in GUI for now


## 4. Collecting data stores into a collection

This is just used for organization, and uses the command `dtl.datastore_collection.create`.

In [81]:
print(type(npl_data))
print(type(npl_data[0]))
print(npl_data[0])
for test in npl_data:
#    print(type(test))
    print(test["datastore_object"].id)


<class 'list'>
<class 'dict'>
{'store_name': 'demo-NPL/LoanTape1USD.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/NPL/LoanTape1USD.csv', 'datastore_object': Datastore(id: 4e681fbd-14ff-4f96-9a59-24fbf2af6ef6, name: 'demo-NPL/LoanTape1USD.csv', alias: None, credential_id: None, definition: <datalogue.models.datastore.HttpDatastoreDef object at 0x000001E3477F49E8>, samples: None, schema_paths: [], schema_labels: [], schema_nodes: None)}
4e681fbd-14ff-4f96-9a59-24fbf2af6ef6
78a11891-ca4e-40ca-b30b-77b03a3d86d9
6c8fd43a-ba94-4d89-ae84-735a74f597dd
b1bb5a35-95f5-42c9-be06-6aee051bb571
b4b7b001-ca92-48b2-a002-0e5f776bac57


In [82]:
npl_collection = DatastoreCollection(
  name ="demo-NPL Collection",
  storeIds = [Datastore["datastore_object"].id for Datastore in npl_data],
  description = "NPL tape data of various formats"
)


In [83]:
npl_collection2 = dtl.datastore_collection.create(npl_collection)


## 5. Creating a stream


In [84]:
my_output_store = dtl.datastore.get(targetDS)


In [85]:
print(type(my_output_store))
print(my_output_store)

<class 'datalogue.models.datastore.Datastore'>
Datastore(id: d1babc12-8a60-4398-a5cc-7686c1ff25fc, name: 'dtl-demo npl', alias: None, credential_id: 970d851a-e74d-4edf-b604-af18ffdaf009, definition: <datalogue.models.datastore.JdbcDatastoreDef object at 0x000001E3489C8D68>, samples: None, schema_paths: [], schema_labels: [], schema_nodes: None)


#### Sample pipeline

In [86]:
# Define the target output schema transformation using 'structure'

std_schema = Structure([
        ClassNodeDescription(
            path = ["LoanID_"],
            tag = "Loan ID",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Unpaid_Principal_"],
            tag = "Unpaid Principal Balance",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Orig_Val_"],
            tag = "Origination Value",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Int_Type_"],
            tag = "Interest Type",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Mat_Date_"],
            tag = "Maturity Date",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Currency_"],
            tag = "Currency",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Orig_Country_"],
            tag = "Country of Origination",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Days_Past_Due_"],
            tag = "Days Past Due",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        )
    ]
)

In [87]:
from datalogue.models.training import *
import uuid
OntologyID = '395a2b81-86c5-4f17-9d5f-6f17d4ae84f0'
modelUuid = dtl.training.get_trainings(uuid.UUID(str(OntologyID)))[0].id
print(modelUuid)

f1175acb-f2e8-4175-a67f-3b67d57a1623


In [88]:
# Define classify transformation

#from datalogue.models.transformations import ReplaceLabel

#tx_definition = Definition(    # (List[Transformation], pipelines: List['Definition'], target_datastore )
#            [
#                Classify(training_id = modelUuid, use_context=True, include_classes=False, include_scores=False),
#                std_schema
#            ], # List of transformations
#            [], # pipelines list
#            my_output_store, # target_datastore
#        )

In [89]:
x = dtl.training.get_trainings('395a2b81-86c5-4f17-9d5f-6f17d4ae84f0')
print(type(x))
print(x[0].id)

<class 'list'>
f1175acb-f2e8-4175-a67f-3b67d57a1623


In [90]:
# Define classify transformation
from datalogue.models.transformations.classify import Classifier, MLMethod
tx_definition = Definition(    # (List[Transformation], pipelines: List['Definition'], target_datastore )
            [
                Classify(Classifier([MLMethod('f1175acb-f2e8-4175-a67f-3b67d57a1623')])),
                std_schema
            ], # List of transformations
        [], # pipelines list
            my_output_store, # target_datastore
        )

print(type(tx_definition))
print(tx_definition)

<class 'datalogue.models.stream.Definition'>
Pipeline(type: [Classify(classifier: Classifier(default_class: None classification_methods: 
[MLMethod(
 model_id: f1175acb-f2e8-4175-a67f-3b67d57a1623, threshold: None
)]), fields_to_target: None, add_class_fields: False, add_score_fields: False), Structure(structure: [ClassNodeDescription(path: ['LoanID_'], tag: Loan ID, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Unpaid_Principal_'], tag: Unpaid Principal Balance, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Orig_Val_'], tag: Origination Value, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Int_Type_'], tag: Interest Type, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Mat_Date_'], tag: Maturity Date, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Currency_'], tag: Cu

In [91]:
# Define n stream(s), where n is number of datastore connections created from S3 bucket scan
n = len(current_stores)
i = 1

list_of_streams = []
for i in range(n):
    stream = Stream(current_stores[i], [tx_definition])
    i += 1
    list_of_streams.append(stream)

print(type(list_of_streams))
print(type(list_of_streams[0]))    

<class 'list'>
<class 'datalogue.models.stream.Stream'>


In [92]:
# Put the streams in a collection

stream_collection = dtl.stream_collection.create(
    list_of_streams,
    "demo-NPL pipeline"
)

In [93]:
# Run the Collection

dtl.stream_collection.run(stream_collection.id)

[Job(id: UUID('6a53a0a2-61cf-4924-9176-e8bbd675a736'), stream_id: UUID('9197f3c6-b53d-44de-8916-22ad549bc540'), stream_collection_id: UUID('5e7e2ea5-212d-427e-bd05-32cc45fff302'), status: Scheduled, run_at: datetime.datetime(2019, 11, 22, 10, 58, 33, tzinfo=tzutc()), created_by: UUID('5b333964-8fab-4ab0-9052-25f69fcb8689'), remaining_time_millis: 9223372036854775807, percent_progress: 0, errors: None, ended_at: None,
 Job(id: UUID('afbd9b22-815e-4bd8-a7e9-6c1a37169b61'), stream_id: UUID('6af8aa62-0e3f-477a-8b7d-4b66bb006d3c'), stream_collection_id: UUID('5e7e2ea5-212d-427e-bd05-32cc45fff302'), status: Scheduled, run_at: datetime.datetime(2019, 11, 22, 10, 58, 33, tzinfo=tzutc()), created_by: UUID('5b333964-8fab-4ab0-9052-25f69fcb8689'), remaining_time_millis: 9223372036854775807, percent_progress: 0, errors: None, ended_at: None,
 Job(id: UUID('98596316-eda7-4d65-9262-0cf1c16a9cdc'), stream_id: UUID('ceee7f05-7d24-40cf-8988-730c106c4e3a'), stream_collection_id: UUID('5e7e2ea5-212d-427e