# Set-up

## Import Datalogue libraries

Note, you'll need to have downloaded and installed the Datalogue SDK before this step will work.

Right now, to do so you will need to get access through Artifactory.

In [124]:
# Import Datalogue libraries 
from datalogue import *
from datalogue.version import __version__
from datalogue.models.ontology import *
from datalogue.models.datastore_collection import *
from datalogue.models.datastore import *
from datalogue.models.datastore import GCSDatastoreDef 
from datalogue.models.credentials import *
from datalogue.models.stream import *
from datalogue.models.transformations import *
from datalogue.models.transformations.structure import *
from datalogue.dtl import Dtl, DtlCredentials
from datalogue.models.training import DataRef

# Import Datalogue Bag of Tricks
from DTLBagOTricks import DTL as DTLHelper


# Import other useful libraries
from datetime import datetime, timedelta
from os import environ
import pandas
from IPython.display import Image

# Checks the version of the SDK is correct
# The expected version is 0.28.3
# If the SDK is not installed, run `! pip install datalogue` and restart the Jupyter Notebook kernel
# If the wrong versions is installed, run `! pip install datalogue --upgrade` and restart the Jupyter Notebook kernel
__version__

'0.31.1'

In [125]:
# Set host, username and password variables
datalogue_host = "https://internal.dtl.systems"  # for connecting to internal (note)

#email = environ.get("DTL_EMAIL")
email = "chrisr@datalogue.io"
#password = environ.get("DTL_PASSWORD")
password = "StreudelSauce1!"

# Log in to Datalogue
BOT = DTLHelper(datalogue_host, email, password)
dtl = BOT.dtl

Datalogue v0.31.1
Logged in 'https://internal.dtl.systems/api' with 'chrisr@datalogue.io' account.


In [126]:
# Deploy the model before the tidy up to give it some time to be ready:

from datalogue.models.training import *
import uuid

OntologyId = 'ce9ced64-dbc6-468e-b86a-e3941c025e37'
trainingId = dtl.training.get_trainings(uuid.UUID(str(OntologyId)))[0].id

print(OntologyId)
print(trainingId)

dtl.training.deploy(trainingId, OntologyId)

ce9ced64-dbc6-468e-b86a-e3941c025e37
ffcc1ec1-89df-4df8-bac4-0db036f9977f


True

In [127]:
# First, let's clean up the assets this workbook creates from previous runs

# Warning! this will clean all your datastores and data collections and credentials

#BOT.server_summary()

# Clear Datastores and Datastore Collections
for store in dtl.datastore.list():
#    print(store.name, ',', store.name[:5])
    if (store.name == 'dtl-PG-cpg'):
        targetDS = store.id
        
    if (store.name[:5] == 'demo-'):
        dtl.datastore.delete(store.id)

for store in dtl.datastore_collection.list():
#    print(store.name)
    if (store.name[:5] == 'demo-'):    
        dtl.datastore_collection.delete(store.id)

# Clear data pipelines
for StreamCollection in dtl.stream_collection.list():
#    print(StreamCollection, '\n')
    if (StreamCollection.name[:5] == 'demo-'):
        dtl.stream_collection.delete(StreamCollection.id)
    if (StreamCollection.name == ''):
        dtl.stream_collection.delete(StreamCollection.id)

## Clear ontologies
for Ontology in dtl.ontology.list():
#     dtl.ontology.delete(ontology.id)
    if (Ontology.name[:12] == 'CPG Ontology'):
        OntologyID = Ontology.id

#BOT.server_summary()


In [128]:
# truncate the postgres table before writing data

import psycopg2
#from config import Config
 
def connect():
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # read connection parameters
        #params = config()
 
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        #conn = psycopg2.connect(**params)
        conn = psycopg2.connect(host="34.73.161.131",database="demo", user="postgres", password="devout-north-solitude")

      
        # create a cursor
        cur = conn.cursor()
        
   # execute a statement
        print('PostgreSQL database version:')
        cur.execute('SELECT version()')
 
        # display the PostgreSQL database server version
        db_version = cur.fetchone()
        print(db_version)
        
        # truncate the table
        cur.execute('TRUNCATE TABLE cpg')
        
       # close the communication with the PostgreSQL
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.commit()
            conn.close()
            print('Database connection closed.')
 
 
if __name__ == '__main__':
    connect()

Connecting to the PostgreSQL database...
PostgreSQL database version:
('PostgreSQL 11.5 on x86_64-pc-linux-gnu, compiled by gcc (Debian 7.3.0-5) 7.3.0, 64-bit',)
relation "cpg" does not exist

Database connection closed.


## 2. Read Source Files from S3 bucket

In [129]:
from boto.s3.connection import S3Connection

conn = S3Connection('AKIAIXM6CXHGHC62R7GA','Gcb34qctsvPoQJGGDrXzmwMbyaCZOg6zY1RFOVQO')
bucket = conn.get_bucket('datalogue-demo')

keys = ["store_name", "URL"]
cpg_data = []

for key in bucket.list():
    if 'retail' in key.name:
        url="https://datalogue-demo.s3.amazonaws.com/" + key.name
        values = ["demo-"+key.name, url]
        cpg_data.append(dict(zip(keys, values)))
            
print(type(cpg_data))
print(len(cpg_data))
print(type(cpg_data[0]))
print(cpg_data)


<class 'list'>
3
<class 'dict'>
[{'store_name': 'demo-CPG/raw_data/1_retail_UK_IRE_NED_NOR.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/CPG/raw_data/1_retail_UK_IRE_NED_NOR.csv'}, {'store_name': 'demo-CPG/raw_data/2_retail_BAH_BRA_RSA.csv', 'URL': 'https://datalogue-demo.s3.amazonaws.com/CPG/raw_data/2_retail_BAH_BRA_RSA.csv'}, {'store_name': 'demo-CPG/raw_data/3_retail_FR_GER.json', 'URL': 'https://datalogue-demo.s3.amazonaws.com/CPG/raw_data/3_retail_FR_GER.json'}]


In [130]:

print("\nCSV Sources to connect to:\n" "-------------------------")
for data_store in cpg_data:
    print("➜ " + data_store["store_name"])
print("\n")



CSV Sources to connect to:
-------------------------
➜ demo-CPG/raw_data/1_retail_UK_IRE_NED_NOR.csv
➜ demo-CPG/raw_data/2_retail_BAH_BRA_RSA.csv
➜ demo-CPG/raw_data/3_retail_FR_GER.json




## 3. Create datastore connections for each file in S3 bucket

In [131]:
current_stores = []

for data_store in cpg_data:
    print(data_store["store_name"], "---", data_store["store_name"][-4:])
    if (data_store["store_name"][-4:] == '.csv'):
        data_store["datastore_object"] = dtl.datastore.create(
            Datastore(
                data_store["store_name"],
                HttpDatastoreDef(data_store["URL"], FileFormat.Csv),))
    if (data_store["store_name"][-4:] == '.xml'):
        data_store["datastore_object"] = dtl.datastore.create(
            Datastore(
                data_store["store_name"],
                HttpDatastoreDef(data_store["URL"], FileFormat.Xml),))
    if (data_store["store_name"][-4:] == 'json'):
        data_store["datastore_object"] = dtl.datastore.create(
            Datastore(
                data_store["store_name"],
                HttpDatastoreDef(data_store["URL"], FileFormat.Json),))

    current_stores.append(data_store["datastore_object"])

#print(type(current_stores))
#print(type(current_stores[0]))
#print(current_stores[0])


demo-CPG/raw_data/1_retail_UK_IRE_NED_NOR.csv --- .csv
demo-CPG/raw_data/2_retail_BAH_BRA_RSA.csv --- .csv
demo-CPG/raw_data/3_retail_FR_GER.json --- json


###           3b. Create datastore for RDBMS target

In [132]:
# host: 34.74.11.127 (use jdbc:postgresql://34.74.11.127:5432/demo for creating target store)
# user: postgres
# pw: L8am0pO5zjJrFm2O

# bug in SDK for v<1.0; to be updated here but created in GUI for now


## 4. Collecting data stores into a collection

This is just used for organization, and uses the command `dtl.datastore_collection.create`.

In [133]:
cpg_collection = DatastoreCollection(
  name ="demo-CPG Collection",
  storeIds = [Datastore["datastore_object"].id for Datastore in cpg_data],
  description = "Manufacturing data of various formats"
)
cpg_collection2 = dtl.datastore_collection.create(cpg_collection)

### 4b. Deploy the model

In [134]:
from datalogue.models.training import *
import uuid

#OntologyId = 'ce9ced64-dbc6-468e-b86a-e3941c025e37'
#trainingId = dtl.training.get_trainings(uuid.UUID(str(OntologyID)))[0].id

print(OntologyId)
print(trainingId)

#dtl.training.deploy(trainingId, OntologyId)

ce9ced64-dbc6-468e-b86a-e3941c025e37
ffcc1ec1-89df-4df8-bac4-0db036f9977f


## 5. Creating a stream


In [135]:
#set target of stream:

my_output_store = dtl.datastore.get(targetDS)
print(my_output_store)

Datastore(id: 2e2db0a4-b4a6-43b1-b9ae-b4c006956731, name: 'dtl-PG-cpg', alias: None, credential_id: 970d851a-e74d-4edf-b604-af18ffdaf009, definition: <datalogue.models.datastore.JdbcDatastoreDef object at 0x000001DE59E03320>, samples: None, schema_paths: [], schema_labels: [], schema_nodes: None)


#### Sample pipeline

In [136]:
# Define the target output schema transformation using 'structure'

std_schema = Structure([
        ClassNodeDescription(
            path = ["InvoiceNo2"],
            tag = "InvoiceNo",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["UnitPrice2"],
            tag = "UnitPrice",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["SKU2"],
            tag = "SKU",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Description2"],
            tag = "Description",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Quantity2"],
            tag = "Quantity",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["CustomerID2"],
            tag = "CustomerID",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["InvoiceDate2"],
            tag = "InvoiceDate",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        ),
        ClassNodeDescription(
            path = ["Country2"],
            tag = "Country",
            pick_strategy = PickStrategy.HighScore,
            data_type = DataType.String
        )
    ]
)

In [137]:
from datalogue.models.training import *
import uuid

modelUuid = dtl.training.get_trainings(uuid.UUID(str(OntologyID)))[0].id
print(modelUuid)

ffcc1ec1-89df-4df8-bac4-0db036f9977f


In [138]:
# Define classify transformation

#tx_definition = Definition(    # (List[Transformation], pipelines: List['Definition'], target_datastore )
#            [
#                Classify(training_id = modelUuid, use_context=True, include_classes=False, include_scores=False),
#                std_schema
#            ], # List of transformations
#        [], # pipelines list
#            my_output_store, # target_datastore
#        )

#print(type(tx_definition))
#print(tx_definition)

In [139]:
x = dtl.training.get_trainings('ce9ced64-dbc6-468e-b86a-e3941c025e37')
print(type(x))
print(x[0].id)

<class 'list'>
ffcc1ec1-89df-4df8-bac4-0db036f9977f


In [140]:
# Define classify transformation
from datalogue.models.transformations.classify import Classifier, MLMethod
tx_definition = Definition(    # (List[Transformation], pipelines: List['Definition'], target_datastore )
            [
                Classify(Classifier([MLMethod('ffcc1ec1-89df-4df8-bac4-0db036f9977f')])),
                std_schema
            ], # List of transformations
        [], # pipelines list
            my_output_store, # target_datastore
        )

print(type(tx_definition))
print(tx_definition)

<class 'datalogue.models.stream.Definition'>
Pipeline(type: [Classify(classifier: Classifier(default_class: None classification_methods: 
[MLMethod(
 model_id: ffcc1ec1-89df-4df8-bac4-0db036f9977f, threshold: None
)]), fields_to_target: None, add_class_fields: False, add_score_fields: False), Structure(structure: [ClassNodeDescription(path: ['InvoiceNo2'], tag: InvoiceNo, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['UnitPrice2'], tag: UnitPrice, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['SKU2'], tag: SKU, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Description2'], tag: Description, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['Quantity2'], tag: Quantity, strategy: PickStrategy.HighScore, dataType: DataType.String), ClassNodeDescription(path: ['CustomerID2'], tag: CustomerID, strategy: PickStrategy.HighS

In [141]:
# Define n stream(s), where n is number of datastore connections created from S3 bucket scan
n = len(current_stores)
i = 1

list_of_streams = []
for i in range(n):
    stream = Stream(current_stores[i], [tx_definition])
    i += 1
    list_of_streams.append(stream)

print(len(list_of_streams), ' streams in collection')

3  streams in collection


In [142]:
# Put the streams in a collection

stream_collection = dtl.stream_collection.create(
    list_of_streams,
    'demo-CPGPipeline'
)

#print(type(stream_collection))
#print(stream_collection)

In [143]:
# Run the Collection

dtl.stream_collection.run(stream_collection.id)

[Job(id: UUID('4215f519-16a3-4253-8d24-86a6522011b8'), stream_id: UUID('f9a32cc6-3b72-437e-badd-4fcce91ea5f8'), stream_collection_id: UUID('bc4b73b9-6d85-43d2-835e-1c49f3169073'), status: Scheduled, run_at: datetime.datetime(2019, 11, 21, 22, 23, 54, tzinfo=tzutc()), created_by: UUID('5b333964-8fab-4ab0-9052-25f69fcb8689'), remaining_time_millis: 9223372036854775807, percent_progress: 0, errors: None, ended_at: None,
 Job(id: UUID('0415a783-355c-42d3-b0ee-56e620faab3d'), stream_id: UUID('3a10fbe9-ccb2-440f-967c-1ca3d822e5fa'), stream_collection_id: UUID('bc4b73b9-6d85-43d2-835e-1c49f3169073'), status: Scheduled, run_at: datetime.datetime(2019, 11, 21, 22, 23, 54, tzinfo=tzutc()), created_by: UUID('5b333964-8fab-4ab0-9052-25f69fcb8689'), remaining_time_millis: 9223372036854775807, percent_progress: 0, errors: None, ended_at: None,
 Job(id: UUID('ec09de69-80b3-4460-b98e-659c03b554a0'), stream_id: UUID('4a731ac8-7119-44d4-a4cc-9a335487766f'), stream_collection_id: UUID('bc4b73b9-6d85-43d2