Created and developed by Arman Abtahi at Vector Institute, University of Toronto

arman.abtahi@gmail.com

In [None]:
import json
from utils import (
Identity,
Policy,
Events,
OSDataSource,
Data_Catalog_Pipeline,
Data_Flow_Pipeline,
Data_Integration_Pipeline,
schedule_time,
config,path2uri
)

# Parameters

In [3]:
## Environment Values
# NOTE: every "XXXX..." value is a redacted placeholder and must be replaced
# with a real value before this notebook can run.

# Tenancy/bucket settings; serialized to JSON and passed as the first argument
# of every Data Flow application.
env = {
    "compartment_name": "soterlabs",
    "compartment_id": "XXXXXXXXXXXXX",
    "namespace_name": "XXXXXXXXXXXXX",
    "bucket_name": "Arman_Crypto_2023",
    "region": "ca-toronto-1",
    "user": "XXXXXXXXXXXXX",
}

# Settings passed to the NewsData ingestion applications.
NewsData_env = {
    "ND_ingest_path": "data-lake/raw-data/news",
    "api_key_datanews": "XXXXXXXXXXXXX",
    "country": "ca,us",
    "language": "en",
    "q": "Bitcoin",
    "from_date": "2021-04-01",
    "to_date": "2023-03-01",
    "interval_hour": 24,
}

# Settings passed to the CoinWatch ingestion applications.
CoinWatch_env = {
    "CW_ingest_path": "data-lake/raw-data/crypto",
    "api_key_coinwatch": "XXXXXXXXXXXXX",
    "currency": "USD",
    "code": "BTC",
    "from_date": "2021-04-01",
    "to_date": "2023-03-01",
    "interval_hour": 8,
}

# Settings passed to the applications that talk to the ADW database.
adw_env = {
    "ND_ingest_path": "data-lake/raw-data/news",
    "CW_ingest_path": "data-lake/raw-data/crypto",
    "dsn": "armanfs_high",
    "userpwd": "XXXXXX",
    "user": "ADMIN",
    "tnsnames_dir": "application/Wallet_ArmanFS/tnsnames.ora",
    "ewallet_dir": "application/Wallet_ArmanFS/ewallet.pem",
}

# API signing key used to build the OCI configuration.
key_files = {
    "key_file": "ocikey/-03-02-00-27.pem",
    "fingerprint": "XXXXXXXXXXXXX",
}

archive_path = 'application/archive.zip'
logs_bucket_path = "application/logs"

## application path in Object Storage
#history
# Fixed: the two API history scripts previously shared the object names of the
# "latest" scripts below, so uploading the latter overwrote the former.
API_newsdata_history_path = 'application/API_newsdata_history.py'
API_coinwatch_history_path = 'application/API_coinwatch_history.py'
OS2ADW_history_path = 'application/OS2ADW_history.py'
ADW_Feature_Extraction_history_path = 'application/ADW_Feature_Extraction_history.py'
#jobs
API_newsdata_path = 'application/API_newsdata.py'
API_coinwatch_path = 'application/API_coinwatch.py'
OS2ADW_path = 'application/OS2ADW.py'
ADW_Feature_Extraction_path = 'application/ADW_Feature_Extraction.py'
scoring_path = 'application/scoring.py'

## training application path in Object Storage
model_archive_path = 'application/models/archive.zip'
model_artifact_path = 'application/models/artifact'
model_logs_bucket_path = "application/models/logs"
model_training_path = 'application/models/RF_model_training.py'

## ADW wallet path in Object Storage
Wallet_ArmanFS_tnsnames_path = 'application/Wallet_ArmanFS/tnsnames.ora'
Wallet_ArmanFS_ewallet_path = 'application/Wallet_ArmanFS/ewallet.pem'

# JSON-encoded copies of the dictionaries above; these strings are what the
# Data Flow applications receive as command-line arguments.
env_str = json.dumps(env)
NewsData_env_str = json.dumps(NewsData_env)
CoinWatch_env_str = json.dumps(CoinWatch_env)
adw_env_str = json.dumps(adw_env)

## Notification in slack
slack_topic_id = "XXXXXXXX"

# Configuration

In [3]:
# Build the configuration from the environment values and key files, then
# wrap it in the Identity object handed to every pipeline helper below.
# NOTE(review): this rebinds the imported `config` factory to its own return
# value, so re-running this cell calls that return value instead of the factory.
config = config(env, key_files, config_uri="ocikey/config.txt")
identity = Identity(config)



# Policies

In [4]:
policy = Policy(identity, env)

# Data Flow runs may manage objects and buckets.
policy.create(
    name="Arman-dataflowrun-policy",
    statements=[
        "Allow any-user to manage objects in tenancy where ALL { request.principal.type='dataflowrun'}",
        "Allow any-user to manage buckets in tenancy where ALL { request.principal.type='dataflowrun'}",
    ],
    description="Grant access dataflow to manage buckets and objects in data flow run",
)

# Data Integration workspaces may manage objects and buckets.
policy.create(
    name="Arman-data-integration-policy",
    statements=[
        "Allow any-user to manage objects in tenancy where ALL { request.principal.type='disworkspace'}",
        "Allow any-user to manage buckets in tenancy where ALL { request.principal.type='disworkspace'}",
    ],
    description="Grant access to data integration in the tenancy",
)

# Data Integration workspaces may manage Data Flow applications and runs.
policy.create(
    name="Arman-data-integration-dataflow-policy",
    statements=[
        "Allow any-user to manage dataflow-application in tenancy where ALL { request.principal.type='disworkspace'}",
        "Allow any-user to manage dataflow-run         in tenancy where ALL { request.principal.type='disworkspace'}",
        "Allow any-user to read   dataflow-application in tenancy where ALL { request.principal.type='disworkspace'}",
    ],
    description="Grant access to data integration to manage dataflow in the tenancy",
)

# Access needed for event rules and notification topics.
policy.create(
    name="Arman-event-policy",
    statements=[
        "Allow any-user to manage  cloudevents-rules in tenancy",
        "Allow any-user to inspect compartments      in tenancy",
        "Allow any-user to use     tag-namespaces    in tenancy",
        "Allow any-user to use     ons-topic         in tenancy",
    ],
    description="Grant access to events-rules and notifications",
)








<oci.response.Response at 0x7fa2856ffe20>

# Events

In [5]:
events = Events(identity, env)

# Every rule below is created with the Slack topic id as its target.

create_rule_WS_begin_response = events.create_rule(
    display_name="workspace-creation-begins",
    # Fixed: the event type was "createworkspace.begin", which disagrees with
    # the "createdisworkspace" name used by the matching ".end" rule below.
    condition='{"eventType":["com.oraclecloud.dataintegration.createdisworkspace.begin"]}',
    topic_id=slack_topic_id,
    description="send a notification when create-workspace begins",
)
create_rule_WS_end_response = events.create_rule(
    display_name="workspace-creation-ends",
    condition='{"eventType":["com.oraclecloud.dataintegration.createdisworkspace.end"]}',
    topic_id=slack_topic_id,
    description="send a notification when create-workspace ends",
)
create_rule_DF_end_response = events.create_rule(
    display_name="dataflow-run-end",
    condition='{"eventType":["com.oraclecloud.dataflow.createrun.end"]}',
    topic_id=slack_topic_id,
    description="send a notification when data-flow-run ends",
)







# Move applications into Object Storage

In [6]:
# Object Storage helper; create_bucket() is called without arguments, so the
# bucket details presumably come from `env` — confirm in utils.
OSDS = OSDataSource(identity, env)
OSDS.create_bucket()





In [7]:
def _upload_file(local_path, object_path):
    """Read ``local_path`` as bytes and store it at ``object_path``.

    Returns the value returned by ``OSDS.create_object``.
    """
    with open(local_path, 'rb') as file:
        binary_file = file.read()
    return OSDS.create_object(binary_file, object_path)


# ADW wallet files
tnsnames_put_object_response = _upload_file('Wallet_ArmanFS/tnsnames.ora', Wallet_ArmanFS_tnsnames_path)
ewallet_put_object_response = _upload_file('Wallet_ArmanFS/ewallet.pem', Wallet_ArmanFS_ewallet_path)

# History applications
API_newsdata_history_put_object_response = _upload_file(
    'applications/API_newsdata_history.py', API_newsdata_history_path)
# Fixed: this response used to overwrite API_newsdata_history_put_object_response.
API_coinwatch_history_put_object_response = _upload_file(
    'applications/API_coinwatch_history.py', API_coinwatch_history_path)
OS2ADW_history_put_object_response = _upload_file(
    'applications/OS2ADW_history.py', OS2ADW_history_path)
ADW_Feature_Extraction_history_put_object_response = _upload_file(
    'applications/ADW_Feature_Extraction_history.py', ADW_Feature_Extraction_history_path)

# Scheduled ("latest") applications
API_newsdata_put_object_response = _upload_file(
    'applications/API_newsdata.py', API_newsdata_path)
# Fixed: this response used to overwrite API_newsdata_put_object_response.
API_coinwatch_put_object_response = _upload_file(
    'applications/API_coinwatch.py', API_coinwatch_path)
OS2ADW_put_object_response = _upload_file(
    'applications/OS2ADW.py', OS2ADW_path)
ADW_Feature_Extraction_put_object_response = _upload_file(
    'applications/ADW_Feature_Extraction.py', ADW_Feature_Extraction_path)
scoring_put_object_response = _upload_file(
    'applications/scoring.py', scoring_path)

# Training application and its dependency archive
RF_model_training_put_object_response = _upload_file(
    'training_application/RF_model_training.py', model_training_path)
# Fixed: given its own name so the application archive upload below no longer
# overwrites this response.
model_archive_put_object_response = _upload_file(
    'training_application/archive.zip', model_archive_path)

# Dependency archive shared by the non-training applications.
# NOTE(review): the local directory here is 'application/' while the scripts
# above are read from 'applications/' — confirm this path is intended.
archive_put_object_response = _upload_file('application/archive.zip', archive_path)











# Data Integration (Scheduling)

In [8]:
# Data Integration helper, reused by all task/schedule cells below.
DIP = Data_Integration_Pipeline(identity, env)
create_workspace_response = DIP.create_workspace(display_name="Arman_Workspace")





In [32]:
# Create the Data Integration project and application by name.
create_project_response = DIP.create_project(name="dataflow_projects")
create_application_response = DIP.create_application(name="dataflow_applications")





In [33]:
## Time window during which the task schedules are active
# Both bounds are millisecond timestamps; the end bound is requested with
# days=10. NOTE(review): presumably "next full hour" and "+10 days" — confirm
# against schedule_time in utils.
integration_time = schedule_time()
timestamp_schedule_begin = integration_time.next_sharp_hour_timestamp_ms()
timestamp_schedule_end = integration_time.next_sharp_hour_timestamp_ms(days=10)

# Create Data Flow Applications
We create the Data Flow applications first, then either run them directly or attach them to schedules.

In [34]:
# Data Flow helper used to create every application and run below.
DFP = Data_Flow_Pipeline(identity, env)




## <font color='red'>Historical Data</font>

### Ingest history data from NewsData

In [None]:
# Register the Data Flow application that ingests historical NewsData.
newsdata_history_application_response = DFP.create_application(
    display_name='Arman-request_api_newsdata_history',
    # Fixed: use the history script's object path. This was API_newsdata_path,
    # the "latest" script, unlike the other *_history applications.
    file_uri=path2uri(env, API_newsdata_history_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
)
dataflow_history_newsdata_application_id = newsdata_history_application_response.data.id

In [None]:
# Run arguments: the JSON-encoded environment and NewsData settings.
arguments_NewsData_history = [env_str, NewsData_env_str]
newsdata_history_create_run_response = DFP.create_run(
    display_name='run_request_api_newsdata_history',
    arguments=arguments_NewsData_history,
)

### Ingest history data from CoinWatch

In [None]:
# Register the Data Flow application that ingests historical CoinWatch data.
coinwatch_history_application_response = DFP.create_application(
    display_name='Arman-request_api_coinwatch_history',
    # Fixed: use the history script's object path. This was API_coinwatch_path,
    # the "latest" script, unlike the other *_history applications.
    file_uri=path2uri(env, API_coinwatch_history_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
)
dataflow_history_coinwatch_application_id = coinwatch_history_application_response.data.id

In [None]:
# Run arguments: the JSON-encoded environment and CoinWatch settings.
arguments_CoinWatch_history = [env_str, CoinWatch_env_str]
coinwatch_history_create_run_response = DFP.create_run(
    display_name='run_request_api_coinwatch_history',
    arguments=arguments_CoinWatch_history,
)

### Historical Data: Object storage to ADW

In [None]:
# Application arguments: the JSON-encoded environment and ADW settings.
arguments_OS2ADW_history_application = [env_str, adw_env_str]
OS2ADW_history_application_response = DFP.create_application(
    display_name='Arman-OS2ADW-history',
    file_uri=path2uri(env, OS2ADW_history_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
    arguments=arguments_OS2ADW_history_application,
)
OS2ADW_history_application_id = OS2ADW_history_application_response.data.id

In [None]:
# No application id is passed here; presumably DFP runs the application it
# created most recently — confirm in utils.
OS2ADW_history_create_run_response = DFP.create_run(
    display_name='run_OS2ADW_history',
)

### Historical Data: Feature Extraction

In [None]:
# Application arguments: the JSON-encoded environment and ADW settings.
arguments_ADW_Feature_Extraction_history = [env_str, adw_env_str]
ADW_Feature_Extraction_history_application_response = DFP.create_application(
    display_name='Arman-ADW-Feature-Extraction-history',
    file_uri=path2uri(env, ADW_Feature_Extraction_history_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
    arguments=arguments_ADW_Feature_Extraction_history,
)
ADW_Feature_Extraction_history_application_id = (
    ADW_Feature_Extraction_history_application_response.data.id
)

In [None]:

# No application id is passed here; presumably DFP runs the application it
# created most recently — confirm in utils.
ADW_Feature_Extraction_history_create_run_response = DFP.create_run(
    display_name='run_ADW_Feature_Extraction_history',
)

## <font color='red'>Latest Data</font>

### Ingest data from NewsData

In [35]:
# Application arguments: the JSON-encoded environment and NewsData settings.
arguments_NewsData = [env_str, NewsData_env_str]
newsdata_application_response = DFP.create_application(
    display_name='Arman-request_api_newsdata_latest',
    file_uri=path2uri(env, API_newsdata_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
    arguments=arguments_NewsData,
)
dataflow_newsdata_application_id = newsdata_application_response.data.id




In [36]:
# newsdata_create_run_response = DFP.create_run(display_name = 'run_request_api_newsdata_latest')

In [37]:
# Create the task for the NewsData application, then its patch and schedule.
newsdata_create_task_response = DIP.create_task(
    name="newsdata_ingest_task",
    dataflow_application_id=dataflow_newsdata_application_id,
)
newsdata_create_patch_response = DIP.create_patch(name="newsdata_ingest_publish")
# NOTE(review): hour/minute/second semantics are defined by DIP.create_schedule.
newsdata_create_schedule_response = DIP.create_schedule(
    name="newsdata_ingest_schedule",
    hour=1,
    minute=0,
    second=0,
)






In [38]:
# Enable the NewsData task schedule for the window computed earlier.
newsdata_create_task_schedule_response = DIP.create_task_schedule(
    name="newsdata_ingest_task_schedule",
    is_enabled=True,
    start_time_millis=timestamp_schedule_begin,
    end_time_millis=timestamp_schedule_end,
    expected_duration=10,
    expected_duration_unit="MINUTES",
)




### Ingest data from CoinWatch 

In [39]:
# Application arguments: the JSON-encoded environment and CoinWatch settings.
arguments_CoinWatch = [env_str, CoinWatch_env_str]
coinwatch_application_response = DFP.create_application(
    display_name='Arman-request_api_coinwatch_latest',
    file_uri=path2uri(env, API_coinwatch_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
    arguments=arguments_CoinWatch,
)
dataflow_coinwatch_application_id = coinwatch_application_response.data.id




In [40]:
# coinwatch_create_run_response = DFP.create_run(display_name    = 'run_request_api_coinwatch_latest')

In [41]:
# Create the task for the CoinWatch application, then its patch and schedule.
coinwatch_create_task_response = DIP.create_task(
    name="coinwatch_ingest_task",
    # Fixed: this was dataflow_newsdata_application_id (copy-paste), which
    # bound the CoinWatch task to the NewsData application.
    dataflow_application_id=dataflow_coinwatch_application_id,
)
coinwatch_create_patch_response = DIP.create_patch(name="coinwatch_ingest_publish")
# NOTE(review): hour/minute/second semantics are defined by DIP.create_schedule.
coinwatch_create_schedule_response = DIP.create_schedule(
    name="coinwatch_ingest_schedule",
    hour=1,
    minute=0,
    second=0,
)






In [42]:
# Enable the CoinWatch task schedule for the window computed earlier.
coinwatch_create_task_schedule_response = DIP.create_task_schedule(
    name="coinwatch_ingest_task_schedule",
    is_enabled=True,
    start_time_millis=timestamp_schedule_begin,
    end_time_millis=timestamp_schedule_end,
    expected_duration=10,
    expected_duration_unit="MINUTES",
)




### Object Storage to ADW

In [43]:
# Application arguments: the JSON-encoded environment and ADW settings.
arguments_OS2ADW = [env_str, adw_env_str]
OS2ADW_application_response = DFP.create_application(
    display_name='Arman-OS2ADW_latest',
    file_uri=path2uri(env, OS2ADW_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
    arguments=arguments_OS2ADW,
)
OS2ADW_application_id = OS2ADW_application_response.data.id




In [44]:
# OS2ADW_create_run_response = DFP.create_run(display_name='run_OS2ADW_latest')

In [45]:
# Create the task for the OS-to-ADW application, then its patch and schedule.
OS2ADW_create_task_response = DIP.create_task(
    name="OS_2_ADW_task",
    dataflow_application_id=OS2ADW_application_id,
)
OS2ADW_create_patch_response = DIP.create_patch(name="OS_2_ADW_publish")
# minute=15 versus minute=0 on the two ingest schedules.
OS2ADW_create_schedule_response = DIP.create_schedule(
    name="OS_2_ADW_schedule",
    hour=1,
    minute=15,
    second=0,
)






In [46]:
# Enable the OS-to-ADW task schedule for the window computed earlier.
OS2ADW_create_task_schedule_response = DIP.create_task_schedule(
    name="OS_2_ADW_task_schedule",
    is_enabled=True,
    start_time_millis=timestamp_schedule_begin,
    end_time_millis=timestamp_schedule_end,
    expected_duration=10,
    expected_duration_unit="MINUTES",
)




### Feature Store in ADW

In [47]:
# Application arguments: the JSON-encoded environment and ADW settings.
arguments_ADW_Feature_Extraction = [env_str, adw_env_str]
ADW_Feature_Extraction_application_response = DFP.create_application(
    display_name='Arman-ADW-Feature-Extraction',
    file_uri=path2uri(env, ADW_Feature_Extraction_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
    arguments=arguments_ADW_Feature_Extraction,
)
ADW_Feature_Extraction_application_id = ADW_Feature_Extraction_application_response.data.id




In [48]:
# ADW_Feature_Extraction_create_run_response = DFP.create_run(display_name='run_ADW_Feature_Extraction')

In [49]:
# Create the task for the feature-extraction application, then its patch and
# schedule.
Feature_Extraction_create_task_response = DIP.create_task(
    name="Feature_Extraction_task",
    dataflow_application_id=ADW_Feature_Extraction_application_id,
)
Feature_Extraction_create_patch_response = DIP.create_patch(
    name="Feature_Extraction_publish",
)
# minute=30 versus minute=15 on the OS-to-ADW schedule.
Feature_Extraction_create_schedule_response = DIP.create_schedule(
    name="Feature_Extraction_schedule",
    hour=1,
    minute=30,
    second=0,
)






In [51]:
# Enable the feature-extraction task schedule for the window computed earlier.
Feature_Extraction_create_task_schedule_response = DIP.create_task_schedule(
    name="Feature_Extraction_task_schedule",
    is_enabled=True,
    start_time_millis=timestamp_schedule_begin,
    end_time_millis=timestamp_schedule_end,
    expected_duration=10,
    expected_duration_unit="MINUTES",
)




# Training

### Create Data Science Project

In [52]:
import oci

# Create the Data Science project in the configured compartment and keep its id.
# NOTE: `create_project_response` is rebound here; it previously held the
# response of DIP.create_project.
DS = oci.data_science.DataScienceClient(identity.config)
create_project_details = oci.data_science.models.CreateProjectDetails(
    compartment_id=env['compartment_id'],
    display_name='Arman-RF-crypto',
)
create_project_response = DS.create_project(
    create_project_details=create_project_details,
)
project_id = create_project_response.data.id

In [53]:
# Data Science settings, JSON-encoded for the training application.
DS_env = {
    'project_id': project_id,
    'deployment_instance_shape': "VM.Standard.E4.Flex",
}
DS_env_str = json.dumps(DS_env)

### Model Training: Data Flow Application

In [54]:
# Application arguments: the JSON-encoded environment, ADW and Data Science
# settings.
arguments_RF_model_training = [env_str, adw_env_str, DS_env_str]
# The training application uses its own logs location and archive, and sets
# explicit driver/executor shapes.
RF_model_training_application_response = DFP.create_application(
    display_name='Arman-RF-model-training',
    file_uri=path2uri(env, model_training_path),
    logs_bucket_uri=path2uri(env, model_logs_bucket_path),
    archive_uri=path2uri(env, model_archive_path),
    arguments=arguments_RF_model_training,
    driver_shape="VM.Standard.E4.Flex",
    executor_shape="VM.Standard.E4.Flex",
    driver_shape_config={"ocpus": 2, "memoryInGBs": 16},
    executor_shape_config={"ocpus": 2, "memoryInGBs": 16},
)
RF_model_training_application_id = RF_model_training_application_response.data.id




In [55]:
#RF_model_training_create_run_response = DFP.create_run(display_name='run_RF-model-training')

In [56]:
# Create the task for the training application, then its patch and schedule.
RF_model_training_create_task_response = DIP.create_task(
    name="RF_model_training_task",
    dataflow_application_id=RF_model_training_application_id,
)
RF_model_training_create_patch_response = DIP.create_patch(
    name="RF_model_training_publish",
)
# NOTE(review): every other schedule uses hour=1; confirm that hour=24 is
# accepted by DIP.create_schedule (interval versus hour-of-day).
RF_model_training_create_schedule_response = DIP.create_schedule(
    name="RF_model_training_schedule",
    hour=24,
    minute=0,
    second=0,
)






In [57]:
# Enable the training task schedule; expected duration is 30 minutes here
# versus 10 for the other tasks.
RF_model_training_create_task_schedule_response = DIP.create_task_schedule(
    name="RF_model_training_task_schedule",
    is_enabled=True,
    start_time_millis=timestamp_schedule_begin,
    end_time_millis=timestamp_schedule_end,
    expected_duration=30,
    expected_duration_unit="MINUTES",
)




# Scoring

In [58]:
# Application arguments: the JSON-encoded environment and ADW settings.
arguments_scoring = [env_str, adw_env_str]
scoring_application_response = DFP.create_application(
    display_name='Arman-scoring',
    file_uri=path2uri(env, scoring_path),
    logs_bucket_uri=path2uri(env, logs_bucket_path),
    archive_uri=path2uri(env, archive_path),
    arguments=arguments_scoring,
)
scoring_application_id = scoring_application_response.data.id




In [59]:
#scoring_create_run_response = DFP.create_run(display_name='run_scoring')

In [60]:
# Create the task for the scoring application, then its patch and schedule.
scoring_create_task_response = DIP.create_task(
    name="scoring_task",
    dataflow_application_id=scoring_application_id,
)
scoring_create_patch_response = DIP.create_patch(name="scoring_publish")
# minute=45 versus minute=30 on the feature-extraction schedule.
scoring_create_schedule_response = DIP.create_schedule(
    name="scoring_schedule",
    hour=1,
    minute=45,
    second=0,
)






In [61]:
# Enable the scoring task schedule for the window computed earlier.
scoring_create_task_schedule_response = DIP.create_task_schedule(
    name="scoring_task_schedule",
    is_enabled=True,
    start_time_millis=timestamp_schedule_begin,
    end_time_millis=timestamp_schedule_end,
    expected_duration=10,
    expected_duration_unit="MINUTES",
)




# Data Catalog

In [63]:
# Window for the catalog job, from the datetime variants of the schedule
# helper; the end bound is requested with days=10.
catalog_time = schedule_time()
datetime_schedule_begin = catalog_time.next_sharp_hour_datetime()
datetime_schedule_end = catalog_time.next_sharp_hour_datetime(days=10)

In [64]:
DCP = Data_Catalog_Pipeline(identity, env)

# The calls below run in this fixed order; later steps presumably rely on
# state stored by earlier ones inside DCP — confirm in utils.
create_catalog_response = DCP.create_catalog(display_name="Arman-Crypto-Catalog")

create_dynamic_group_response = DCP.create_dynamic_group(
    dynamic_group_name="Arman-data-catalog-dynamic-group",
    description="data catalog dynamic group",
)

# Let the dynamic group created above read the object-family.
create_policy_response = DCP.create_policy(
    name='Arman-data-catalog-dynamic-group-policy',
    statements=[
        "Allow dynamic-group Arman-data-catalog-dynamic-group to read object-family in tenancy",
    ],
    description="Grant access to object storage resources in any compartment in the tenancy",
)

create_data_asset_response = DCP.create_data_asset(
    display_name="Object Storage Data Asset",
    description="Data asset for object storage",
)

create_connection_response = DCP.create_connection(
    display_name="Data Asset Connection",
    description="Connection for data asset",
)

create_job_definition_response = DCP.create_job_definition(
    display_name="Harvest Object Storage Data Asset",
)

# Job created with an "@hourly" cron expression and the window computed earlier.
create_job_response = DCP.create_job(
    time_schedule_begin=datetime_schedule_begin,
    time_schedule_end=datetime_schedule_end,
    schedule_cron_expression="@hourly",
    display_name="Scheduled Harvest Oracle Object Storage Data Asset",
)











In [None]:
#DCP.delete_data_catalog()