# Export Monitoring Pool Data

## 1. Import

In [None]:
import requests
import pandas as pd
import json
import pycelonis
import gc
import os
from pycelonis import get_celonis
from pycelonis.ems import ExportType
from pycelonis.pql import PQL, PQLColumn, PQLFilter, OrderByColumn
from pycelonis.utils.parquet import read_parquet
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor
from IPython.display import clear_output

## 2. Connection Variables

In [None]:
# Open a Celonis connection authenticated with an APP_KEY-type key
celonis = get_celonis(key_type="APP_KEY")

## 3. Global Parameters

In [None]:
# Global parameters shared by the export (section 5) and upload (section 6) steps.

# Data Pool: Monitoring Pool
MONITORING_POOL_ID = 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee'  # <= Insert ID here

# Local directory where the exported parquet files are stored
PATH = '../dir/Monitoring_Pool_Data'  # <= Insert PATH here

# Column that is left out of every export query
IGNORE_FIELD = '_CELONIS_CHANGE_DATE'

# Maximum number of rows requested per export (page size for limit/offset)
CHUNK_SIZE = 500_000

# Number of worker threads used by the download step
MAX_THREADS = 10

## 4. Delete Files from the Previous Execution

In [None]:
# Clean before start: make sure the output directory exists (the download
# step writes into it), then remove parquet files left over from a previous
# run so stale chunks are not mixed with the new export.
os.makedirs(PATH, exist_ok=True)
for file in os.listdir(PATH):
    file_path = os.path.join(PATH, file)
    if file.endswith('.parquet') and os.path.isfile(file_path):
        os.remove(file_path)
    #endif
#endfor

## 5. Download Data From Monitoring Pool

In [None]:
# Get Monitoring Pool
data_pool = celonis.data_integration.get_data_pool(MONITORING_POOL_ID)

# Build PQL Queries List: one query per table selecting every column except
# IGNORE_FIELD, tagged with the id of the data model that owns the table.
queries = []

# Table names already queued. Output files and target tables are keyed by
# table name only, so a second table with the same name would overwrite the
# first one's files while both downloads run in parallel.
queued_tables = set()

# Data Models From 'Monitoring Pool'
data_models = data_pool.get_data_models()

for data_model in tqdm(data_models):
    # Tables from each Data Model
    tables = data_model.get_tables()

    for table in tables:
        if table.name in queued_tables:
            print('Skipping Duplicate Table:', table.name)
            continue
        #endif

        # Columns to export (IGNORE_FIELD is left out)
        export_cols = [col for col in table.get_columns() if col.name != IGNORE_FIELD]

        if not export_cols:
            # A query without columns has nothing to export (and would
            # presumably be rejected by the export API), so skip the table.
            print('Skipping Table (no exportable columns):', table.name)
            continue
        #endif

        query = PQL(distinct=False)
        for col in export_cols:
            # Format Query String: "TABLE"."COLUMN"
            query += PQLColumn(name=col.name, query=f'"{table.name}"."{col.name}"')
        #endfor

        # Append Query into Queries List
        queries.append(
            {
                'data_model_id': data_model.id,
                'table_name': table.name,
                'query': query,
            }
        )
        queued_tables.add(table.name)
    #endfor
#endfor

In [None]:
# Registry of every parquet file written by the download step; each entry is
# a dict with the keys 'table_name', 'index' and 'file'.
generated_files = list()

# Download Parquet Function
def download_parquet(obj):
    """Export one Monitoring Pool table into numbered parquet files under PATH.

    The table is paged with ``limit``/``offset`` in steps of CHUNK_SIZE rows.
    Every non-empty chunk returned by an export is written to
    ``<PATH>/<table_name>_<n>.parquet`` (``n`` counts up from 1) and recorded
    in the global ``generated_files`` list. The first file is written even
    when it holds no rows, so the upload step can still create the (empty)
    target table. Paging stops after the first page that returns fewer than
    CHUNK_SIZE rows.

    Args:
        obj: dict with the keys 'data_model_id', 'table_name' and 'query'
            (a PQL object) as built in the query-building cell. The query's
            ``limit``/``offset`` attributes are overwritten in place.
    """
    import io

    # Resolve the Monitoring Pool explicitly instead of using the global
    # ``data_pool``: section 6 rebinds that name to the target pool, which
    # would make a re-run of this step look up the model in the wrong pool.
    monitoring_pool = celonis.data_integration.get_data_pool(MONITORING_POOL_ID)
    data_model = monitoring_pool.get_data_model(obj['data_model_id'])

    # Config Data Export Object
    table_name = obj['table_name']
    my_query = obj['query']
    limit = CHUNK_SIZE
    offset = 0
    count = 1

    while True:
        # Request the next page of rows
        my_query.limit = limit
        my_query.offset = offset
        data_export = data_model.create_data_export(query=my_query, export_type=ExportType.PARQUET)

        print('Extracting Table:', table_name)
        data_export.wait_for_execution()

        page_rows = 0
        for chunk in data_export.get_chunks():
            # getvalue() returns the whole buffer regardless of stream position
            payload = chunk.getvalue()

            # Detect the end of the table by row count, not byte size: two
            # data chunks can have identical sizes, and an empty page is only
            # reliably recognised by having zero rows.
            chunk_rows = len(read_parquet(io.BytesIO(payload)))
            page_rows += chunk_rows

            if chunk_rows == 0 and count > 1:
                # Nothing to store; only the first file is kept when empty
                continue
            #endif

            # Write File
            file = f'{table_name}_{count}.parquet'
            file_path = os.path.join(PATH, file)
            with open(file_path, 'wb') as f:
                f.write(payload)
            #endwith

            # Register the file only after it has been written
            generated_files.append({'table_name': table_name, 'index': count, 'file': file})

            # Increment File Counter
            count += 1
        #endfor

        # Release the export and the last chunk before requesting the next page
        chunk = payload = data_export = None
        gc.collect()

        # A short (or empty) page means the end of the table was reached
        if page_rows < limit:
            break
        #endif

        # Increment offset
        offset += limit
    #endwhile

    clear_output(wait=True)
#enddef

In [None]:
# Execute Threads: export every table in parallel.
# An exception raised in a worker is stored on its future and is lost unless
# ``result()`` is called, so check every future and report failed tables.
download_failures = []
with ThreadPoolExecutor(MAX_THREADS) as executor:
    futures = [(executor.submit(download_parquet, q), q['table_name']) for q in queries]
    for future, table_name in futures:
        try:
            future.result()
        except Exception as err:
            download_failures.append(table_name)
            print('Failed Table:', table_name, '-', repr(err))
        #endtry
    #endfor
#endwith

if download_failures:
    raise RuntimeError(f'Export failed for {len(download_failures)} table(s): {download_failures}')
#endif

## 6. Upload the Exported Monitoring Pool Files to a Target Celonis Data Pool

In [None]:
# Create Generated Files Data Frame: one row per parquet file, ordered so
# that each table's index 1 file (which creates the table) comes first.
# Explicit columns keep sort_values working even when no file was generated.
generated_files_df = pd.DataFrame(generated_files, columns=['table_name', 'index', 'file'])
generated_files_df = generated_files_df.sort_values(by=['table_name', 'index'])
generated_files_df.head()

In [None]:
# Create Aggregation with Max Number of Files: one row per table (indexed by
# table_name) holding the highest file index generated for it.
# The column is selected explicitly: ``GroupBy.max``'s first positional
# parameter is ``numeric_only``, not a column name.
files_index_df = generated_files_df.groupby('table_name')[['index']].max()
files_index_df.head()

In [None]:
# Destination pool that receives the exported tables ('TARGET POOL')
target_pool = 'ffffffff-gggg-hhhh-iiii-jjjjjjjjjjjj'  # <= Insert ID here

# NOTE: this rebinds the global ``data_pool`` (the Monitoring Pool until now);
# push_chunk creates/appends tables in whatever ``data_pool`` refers to.
data_pool = celonis.data_integration.get_data_pool(
    target_pool
)

# Display the pool object as the cell output
data_pool

In [None]:
# Push Local Data Into Destination Data Pool 'TARGET POOL'
# Use fewer worker threads for the upload step (rebinds the global MAX_THREADS)
MAX_THREADS = 2

def push_table(table_name):
    """Upload every generated parquet file of ``table_name`` to the target pool.

    Files are pushed one after another in the row order of
    ``generated_files_df`` (sorted by table name and index), so the file with
    index 1 (re)creates the table before later files are appended to it.
    """
    table_files = generated_files_df.loc[generated_files_df['table_name'] == table_name]
    rows = zip(table_files['table_name'], table_files['index'], table_files['file'])
    for name, file_index, file_name in rows:
        push_chunk(name, file_index, file_name)
    #endfor

    # Clear Output
    clear_output(wait=True)
#enddef

def push_chunk(table, index, file):
    """Upload one exported parquet file into ``table`` of the global ``data_pool``.

    Args:
        table: name of the table in the target data pool.
        index: 1-based file number of this chunk. Index 1 (re)creates the
            table (``drop_if_exists=True``, ``force=True``); any other index
            appends to the existing table.
        file: file name relative to PATH.

    Files with index > 1 that contain no rows are skipped, since appending
    zero rows adds nothing to the table.
    """
    # Set File Path
    file_path = os.path.join(PATH, file)

    # Print file info: kilobytes below 0.1 MB, megabytes otherwise
    size_bytes = os.path.getsize(file_path)
    size_mb = size_bytes / 1024 ** 2
    display_name = os.path.basename(file)
    if size_mb < 0.1:
        print('File:', display_name, '(', round(size_bytes / 1024, 1), 'kb )')
    else:
        print('File:', display_name, '(', round(size_mb, 1), 'mb )')
    #endif

    # Open file and convert Parquet into Data Frame
    with open(file_path, 'rb') as f:
        table_df = read_parquet(f)
    #endwith

    if index == 1:
        # First chunk: overwrite the table
        data_pool.create_table(
            table_df,
            table_name=table,
            drop_if_exists=True,
            force=True
        )
    elif table_df.empty:
        # Later chunk without rows: nothing to append
        print('Skipping Empty File:', display_name)
    else:
        # Later chunk: append to the table created by index 1
        dp_table = data_pool.get_table(table)
        dp_table.append(table_df)
    #endif

    # Free memory
    del table_df
    gc.collect()
#enddef

In [None]:
# Upload tables in parallel (one task per table; push_table sends the chunks
# of a table sequentially). Check every future so that failed uploads are
# reported instead of being silently swallowed by the executor.
upload_failures = []
with ThreadPoolExecutor(MAX_THREADS) as executor:
    futures = [(executor.submit(push_table, table_name), table_name) for table_name in files_index_df.index]
    for future, table_name in futures:
        try:
            future.result()
        except Exception as err:
            upload_failures.append(table_name)
            print('Failed Table:', table_name, '-', repr(err))
        #endtry
    #endfor
#endwith

if upload_failures:
    raise RuntimeError(f'Upload failed for {len(upload_failures)} table(s): {upload_failures}')
#endif