In [126]:
from kdp_connector import KdpConn
import pandas as pd
import kdp_api
from kdp_api.api import write_api
from kdp_api.api import datasets_api
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta

import os

# This notebook will go over how to:
## 1. Write data into KDP (a. As a new dataset b. Appending to an existing dataset c. Overwriting an existing dataset) from a manual upload or possibly from an automatic process
## 2. Read data from KDP
## 3. Perform 1 and 2 in a flow (forward or backward), making it possible to create dataflows that are connected to external data sources
## using Pandas dataframes. There will be a separate notebook showing Pyspark

### Define a few helper functions

In [43]:
def dfs_equivalent(df1, df2):
    
    '''
    Function to help determine if two dataframes are equivalent. Equivalent meaning that they have the same set
    of columns and contain exactly the same rows, each repeated the same number of times, while the order of
    rows and/or columns may differ.

    Parameters:

    df1, df2 - pandas dataframes to compare.

    Returns True if the dataframes are equivalent, False otherwise (including when their column sets differ).
    '''

    # Different column sets (or row counts) can never be equivalent. Checking columns up front also avoids a
    # KeyError in the merge below when df2 has a column df1 lacks, and a false positive when df1 has extra ones.
    if set(df1.columns) != set(df2.columns) or len(df1) != len(df2):
        return False

    cols = list(df2.columns)
    if not cols:
        # Column-less frames: nothing left to compare beyond the row count checked above.
        return True

    # Pick a helper column name that cannot collide with a real column.
    count_col = '__row_count__'
    while count_col in cols:
        count_col += '_'

    # Count how often each distinct row occurs. Comparing only the number of duplicates plus the distinct rows
    # would wrongly accept e.g. [A, A, B] vs [A, B, B]. dropna=False keeps rows containing NaN as their own group.
    counts1 = df1.groupby(cols, dropna=False, sort=False, observed=True).size().reset_index(name=count_col)
    counts2 = df2.groupby(cols, dropna=False, sort=False, observed=True).size().reset_index(name=count_col)

    # Every (row, multiplicity) pair of one frame must be matched in the other.
    mergeData = pd.merge(counts1, counts2, on=cols + [count_col], how='inner')

    return len(counts1) == len(counts2) == len(mergeData)

In [44]:
def dfs_similar_data(df1, df2):
    
    '''
    Report whether two dataframes are similar: they expose the same column names, with the same dtype for each
    column. Row contents and column order are ignored.

    Returns True when every (column, dtype) pair of each frame is matched in the other, False otherwise.
    '''

    def _schema(frame):
        # One row per column of `frame`: its name and its dtype.
        schema = frame.dtypes.reset_index()
        schema.columns = ['Column', 'Type']
        return schema

    left = _schema(df1)
    right = _schema(df2)
    matched = left.merge(right, how='inner', on=['Column', 'Type'])

    return len(left) == len(right) == len(matched)

In [45]:
def dfs_difference_types(df1, df2):
    '''
    List the (column, dtype) pairs that appear in only one of two dataframes. This is used to explain why two
    dataframes are not similar.

    Returns a dataframe with columns 'Column', 'Type' and '_merge'. '_merge' is 'left_only' for pairs found
    only in df1 and 'right_only' for pairs found only in df2.
    '''

    def _schema(frame):
        # One row per column of `frame`: its name and its dtype.
        schema = frame.dtypes.reset_index()
        schema.columns = ['Column', 'Type']
        return schema

    combined = pd.merge(_schema(df1), _schema(df2), on=['Column', 'Type'], how='outer', indicator=True)
    one_sided = combined['_merge'].isin(['left_only', 'right_only'])

    return combined[one_sided]

In [62]:
def overwrite_to_kdp(data, dataset_id, workspace_id, jwt, batch_size, starting_record_id, equivalenceCheck,
                     host='https://api.dev.koverse.com'):
    '''
    This function provides a way to more directly write over an existing dataset name. It works in three steps:
    a new dataset with the same name is created, the data is ingested (and optionally verified) there, and only
    then is the original dataset deleted. If the ingest or the verification fails, the new dataset is deleted
    instead and the original is left untouched.

    This feature likely should be used for normalizations, as transformations are typically written to another
    dataset.

    Ideally, there would be a way to define a custom dataset ID, so the new dataset could be given the same
    dataset ID as the original and any pointers to the dataset ID would not be disrupted. Slight modifications
    would be required to accommodate the change since two datasets can't have the same ID. A temporary dataset
    ID would need to be created first to ensure the ingest is working properly before deleting the original dataset
    then creating the replacement dataset with the original dataset ID.

    Parameters:

    data - pandas df to write to KDP, the one that was read and normalized/transformed.
    dataset_id - the dataset ID of the dataset originally read into Jupyter (the dataset being replaced).
    workspace_id - matches initial settings
    jwt - matches initial settings
    batch_size - matches initial settings
    starting_record_id - matches initial settings
    equivalenceCheck - boolean option to check if input dataframe and reading from KDP are equivalent (having
    the same rows except in a different order).
    host - KDP API host used for the dataset delete calls. Defaults to the previously hardcoded dev host; pass
    the same host the connector was created with.

    Returns the ID of the newly created replacement dataset.

    Raises Exception if the ingest fails, the equivalenceCheck fails, or a dataset delete call fails.

    Uses the notebook-level `kdp_conn` connector and the `dfs_equivalent` helper.
    '''

    #API Config, used only for deleting datasets
    configuration = kdp_api.Configuration(host=host)
    configuration.access_token = jwt

    def _delete_dataset(target_id):
        # Delete one dataset by ID, turning API errors into the notebook's standard exception.
        with kdp_api.ApiClient(configuration) as api_client:
            try:
                datasets_api.DatasetsApi(api_client).datasets_id_delete(id = target_id)
            except kdp_api.ApiException as e:
                print("Exception : %s\n" % e)
                raise Exception("Error deleting associated dataset id from KDP. See printed error message above.")

    def _check_equivalence(check_dataset_id):
        # Read the freshly written dataset back from KDP and compare it with the input dataframe.
        dfCheck = kdp_conn.read_dataset_to_pandas_dataframe(dataset_id=check_dataset_id,
                                                            jwt=jwt,
                                                            starting_record_id=starting_record_id,
                                                            batch_size=batch_size)
        if dfs_equivalent(dfCheck, data):
            print('equivalenceCheck pass')
            return
        print('equivalenceCheck fail:\n')
        print('Input data length: {}\n'.format(len(data)))
        print('Input data non-duplicated length: {}\n'.format(len(data.drop_duplicates())))
        print('KDP data length: {}\n'.format(len(dfCheck)))
        print('KDP data non-duplicated length: {}\n'.format(len(dfCheck.drop_duplicates())))
        if set(data.columns) == set(dfCheck.columns):
            print('Merge data length: {}'.format(len(pd.merge(data.drop_duplicates(), dfCheck.drop_duplicates(),
                                                              on = list(data.columns), how = 'inner'))))
        else:
            # Merging on mismatched columns would raise a KeyError and hide the actual problem.
            print('Column mismatch. Input-only columns: {} KDP-only columns: {}'.format(
                sorted(set(data.columns) - set(dfCheck.columns), key=str),
                sorted(set(dfCheck.columns) - set(data.columns), key=str)))
        raise Exception('equivalenceCheck failed, if wish to continue, disable this check.')

    #Get current dataset name
    current_name = kdp_conn.get_dataset(dataset_id=dataset_id, jwt=jwt).name

    #Create new dataset with same name on KDP
    new_dataset_id = kdp_conn.create_dataset(name=current_name, workspace_id=workspace_id, jwt=jwt).id

    try:
        #Ingest data into newly created dataset
        try:
            kdp_conn.ingest(data, new_dataset_id, jwt, batch_size)
        except Exception as ingest_error:
            # Surface the real cause instead of swallowing it.
            print('Ingest failed: {}'.format(ingest_error))
            raise Exception('Ingest failed, exiting') from ingest_error

        if equivalenceCheck:
            _check_equivalence(new_dataset_id)
    except Exception:
        # Never leave a half-written or unverified duplicate next to the original dataset.
        print('Attempting to delete newly created dataset')
        _delete_dataset(new_dataset_id)
        print('Newly created dataset {} was successfully deleted.'.format(new_dataset_id))
        raise

    #New dataset is in place (and verified if requested): remove the old one
    print('Ingest successful. Deleting old dataset.')
    _delete_dataset(dataset_id)
    print('Dataset {} was successfully deleted.'.format(dataset_id))

    return new_dataset_id

In [63]:
def write_to_new_kdp(data, new_dataset_name, workspace_id, jwt, batch_size, starting_record_id, equivalenceCheck,
                     host='https://api.dev.koverse.com'):
    
    '''
    This function provides a way to more directly write a dataframe into KDP as a new dataset with Pandas. 
    Would be utilized for transformations.
    
    Parameters:
    
    data - pandas df to write to KDP, the one that was read and normalized/transformed.
    new_dataset_name - name to give the newly created KDP dataset.
    workspace_id - matches initial settings
    jwt - matches initial settings
    batch_size - matches initial settings
    starting_record_id - matches initial settings
    equivalenceCheck - boolean option to check if input dataframe and reading from KDP are equivalent (having
    the same rows except in a different order).
    host - KDP API host used to delete the new dataset if ingest/verification fails. Defaults to the previously
    hardcoded dev host; pass the same host the connector was created with.

    Returns the ID of the newly created dataset.

    Raises Exception if the ingest or the equivalenceCheck fails. In both cases the newly created dataset is
    deleted first, so re-running does not pile up bad copies. Also raises if that delete call fails.

    Uses the notebook-level `kdp_conn` connector and the `dfs_equivalent` helper.
    '''

    def _delete_dataset(target_id):
        # Delete one dataset by ID, turning API errors into the notebook's standard exception.
        configuration = kdp_api.Configuration(host=host)
        configuration.access_token = jwt
        with kdp_api.ApiClient(configuration) as api_client:
            try:
                datasets_api.DatasetsApi(api_client).datasets_id_delete(id = target_id)
            except kdp_api.ApiException as e:
                print("Exception : %s\n" % e)
                raise Exception("Error deleting associated dataset id from KDP. See printed error message above.")

    def _check_equivalence(check_dataset_id):
        # Read the freshly written dataset back from KDP and compare it with the input dataframe.
        dfCheck = kdp_conn.read_dataset_to_pandas_dataframe(dataset_id=check_dataset_id,
                                                            jwt=jwt,
                                                            starting_record_id=starting_record_id,
                                                            batch_size=batch_size)
        if dfs_equivalent(dfCheck, data):
            print('equivalenceCheck pass')
            return
        print('equivalenceCheck fail:\n')
        print('Input data length: {}\n'.format(len(data)))
        print('Input data non-duplicated length: {}\n'.format(len(data.drop_duplicates())))
        print('KDP data length: {}\n'.format(len(dfCheck)))
        print('KDP data non-duplicated length: {}\n'.format(len(dfCheck.drop_duplicates())))
        if set(data.columns) == set(dfCheck.columns):
            print('Merge data length: {}'.format(len(pd.merge(data.drop_duplicates(), dfCheck.drop_duplicates(),
                                                              on = list(data.columns), how = 'inner'))))
        else:
            # Merging on mismatched columns would raise a KeyError and hide the actual problem.
            print('Column mismatch. Input-only columns: {} KDP-only columns: {}'.format(
                sorted(set(data.columns) - set(dfCheck.columns), key=str),
                sorted(set(dfCheck.columns) - set(data.columns), key=str)))
        raise Exception('equivalenceCheck failed, if wish to continue, disable this check.')

    #Create new dataset on KDP
    new_dataset_id = kdp_conn.create_dataset(name=new_dataset_name, workspace_id=workspace_id, jwt=jwt).id

    try:
        #Ingest data into newly created dataset
        try:
            kdp_conn.ingest(data, new_dataset_id, jwt, batch_size)
        except Exception as ingest_error:
            # Surface the real cause instead of swallowing it.
            print('Ingest failed: {}'.format(ingest_error))
            raise Exception('Ingest failed, exiting') from ingest_error

        if equivalenceCheck:
            _check_equivalence(new_dataset_id)
    except Exception:
        # Clean up the partially written / unverified dataset before propagating the error.
        print('Attempting to delete newly created dataset')
        _delete_dataset(new_dataset_id)
        print('Newly created dataset {} was successfully deleted.'.format(new_dataset_id))
        raise

    return new_dataset_id

In [64]:
def write_to_existing_kdp(data, target_dataset_id, workspace_id, jwt, batch_size, starting_record_id, ingestCheck,
                          equivalenceCheck, similarCheck, returnNewData, host='https://api.dev.koverse.com'):
    
    '''
    This function provides a way to more directly write into an existing KDP dataset and append additional rows
    with Pandas. Since this can be potentially dangerous, there are safety checks in place to ensure the integrity
    of the data. The target dataset is only appended to after every enabled check has passed.
    
    Parameters:
    
    data - pandas df to write to KDP, the one that was read and normalized/transformed.
    target_dataset_id - the dataset ID of the existing KDP dataset to append to.
    workspace_id - matches initial settings
    jwt - matches initial settings
    batch_size - matches initial settings
    starting_record_id - matches initial settings
    ingestCheck - boolean option to check if ingest is working properly. The data is first ingested into a
    temporary dataset, which is always deleted afterwards, even if a check fails.
    equivalenceCheck - boolean option to check if input dataframe and reading from KDP are equivalent (having
    the same rows or columns except in a different order). ingestCheck must be enabled for this check to work.
    similarCheck - boolean option to check if the incoming data is similar to the target dataset. Similar
    meaning same column names with the same data types.
    returnNewData - boolean option to return the dataset with the appended data as output
    host - KDP API host used to delete the temporary dataset. Defaults to the previously hardcoded dev host;
    pass the same host the connector was created with.

    Returns the target dataset read back from KDP as a pandas df when returnNewData is True, otherwise None.

    Raises Exception in any of these cases:
    - the ingest check, equivalenceCheck or similarCheck fails (the target dataset is not modified);
    - the append into the target dataset fails (the target may be partially appended);
    - deleting the temporary dataset fails.

    Uses the notebook-level `kdp_conn` connector and the dfs_* helpers.
    '''

    def _delete_dataset(target_id):
        # Delete one dataset by ID, turning API errors into the notebook's standard exception.
        configuration = kdp_api.Configuration(host=host)
        configuration.access_token = jwt
        with kdp_api.ApiClient(configuration) as api_client:
            try:
                datasets_api.DatasetsApi(api_client).datasets_id_delete(id = target_id)
            except kdp_api.ApiException as e:
                print("Exception : %s\n" % e)
                raise Exception("Error deleting associated dataset id from KDP. See printed error message above.")

    def _check_equivalence(check_dataset_id):
        # Read the freshly written dataset back from KDP and compare it with the input dataframe.
        dfCheck = kdp_conn.read_dataset_to_pandas_dataframe(dataset_id=check_dataset_id,
                                                            jwt=jwt,
                                                            starting_record_id=starting_record_id,
                                                            batch_size=batch_size)
        if dfs_equivalent(dfCheck, data):
            print('equivalenceCheck pass')
            return
        print('equivalenceCheck fail:\n')
        print('Input data length: {}\n'.format(len(data)))
        print('Input data non-duplicated length: {}\n'.format(len(data.drop_duplicates())))
        print('KDP data length: {}\n'.format(len(dfCheck)))
        print('KDP data non-duplicated length: {}\n'.format(len(dfCheck.drop_duplicates())))
        if set(data.columns) == set(dfCheck.columns):
            print('Merge data length: {}'.format(len(pd.merge(data.drop_duplicates(), dfCheck.drop_duplicates(),
                                                              on = list(data.columns), how = 'inner'))))
        else:
            # Merging on mismatched columns would raise a KeyError and hide the actual problem.
            print('Column mismatch. Input-only columns: {} KDP-only columns: {}'.format(
                sorted(set(data.columns) - set(dfCheck.columns), key=str),
                sorted(set(dfCheck.columns) - set(data.columns), key=str)))
        raise Exception('equivalenceCheck failed, if wish to continue, disable this check.')

    temp_dataset_id = None
    try:
        #Check if ingest works by writing into a throwaway dataset first
        if ingestCheck:
            tempName = 'Temporary_' + str(np.random.randint(10000000))
            temp_dataset_id = kdp_conn.create_dataset(name=tempName, workspace_id=workspace_id, jwt=jwt).id

            try:
                kdp_conn.ingest(data, temp_dataset_id, jwt, batch_size)
            except Exception as ingest_error:
                # Previously this failure silently skipped the append and reported nothing.
                print('Ingest failed: {}'.format(ingest_error))
                raise Exception('Ingest check failed on temporary dataset; target dataset was not modified.') \
                    from ingest_error

            if equivalenceCheck:
                _check_equivalence(temp_dataset_id)

        if similarCheck:
            #Check if data similar to target dataset data
            targetDf = kdp_conn.read_dataset_to_pandas_dataframe(dataset_id=target_dataset_id,
                                                                 jwt=jwt,
                                                                 starting_record_id=starting_record_id,
                                                                 batch_size=batch_size)

            if dfs_similar_data(targetDf, data):
                print('Similar data check pass')
            else:
                print('Similar data check failed. See below for differences.')
                print(dfs_difference_types(targetDf, data))
                raise Exception('Similar data check failed, if wish to continue, disable similarCheck.')

        #All enabled checks passed: ingest data into KDP dataset via append
        try:
            kdp_conn.ingest(data, target_dataset_id, jwt, batch_size)
        except Exception as ingest_error:
            print('Ingest failed: {}'.format(ingest_error))
            raise Exception('Ingest into target dataset {} failed; it may have been partially appended.'
                            .format(target_dataset_id)) from ingest_error
    finally:
        #Delete temporary dataset if one was created, even when a check above raised
        if temp_dataset_id is not None:
            print('Deleting temporary dataset')
            _delete_dataset(temp_dataset_id)
            print('Temporary dataset {} was successfully deleted.'.format(temp_dataset_id))

    if returnNewData:
        return kdp_conn.read_dataset_to_pandas_dataframe(dataset_id=target_dataset_id,
                                                         jwt=jwt,
                                                         starting_record_id=starting_record_id,
                                                         batch_size=batch_size)
    return None



# Starting from Jupyter (Having dataset of interest to write into KDP)

# Manual Upload

### When you manually upload data into KDP, you most likely want to do one of two things:
### 1. Create a new dataset and upload data into it.
### 2. Append data into a similar existing dataset.

-------------

# 1. Create a new dataset and upload data into it.

### First, initialize settings for KDP connector

In [None]:
################# REPLACE WITH YOUR INFO  #################
# Credentials are read from the environment instead of being hardcoded, so they never end up in version
# control or shared copies of this notebook. (The visible cells below authenticate with `jwt`, not these.)
email = os.getenv('KDP_EMAIL', 'spongebob@koverse.com')
password = os.getenv('KDP_PASSWORD', '')
path_to_ca_file = ''
host = 'https://api.dev.koverse.com'
workspace_id = 'spongebob'
###########################################################

#Connect
kdp_conn = KdpConn(path_to_ca_file=path_to_ca_file, host=host)

### passed from oauth env var in jupyterhub_config
jwt = os.getenv('ACCESS_TOKEN')
if not jwt:
    # Fail fast: every KDP call below needs the token. A missing one would otherwise surface as an opaque
    # authentication error much later.
    raise RuntimeError('ACCESS_TOKEN is not set; it is expected to be provided via jupyterhub_config (OAuth).')

### Read in data

In [105]:
# Load the raw Titanic CSV (path is relative to the kernel's working directory)
df = pd.read_csv(filepath_or_buffer='titanic.csv')

### Some normalizations are required because KDP currently does not accept null values on write, so missing values are replaced with empty strings

In [118]:
def standardize_titanic(data):
    '''
    Prepare the Titanic dataframe for writing to KDP, which does not accept null values.

    - Age is rounded to a whole number and stored as a string without decimals ('35'). Missing or
      non-numeric ages become ''.
    - Missing Cabin and Embarked values become ''.

    Works on a copy, so the caller's dataframe is not modified. It is also idempotent: standardizing
    already-standardized data is a no-op, so re-running the cell is safe.

    Returns the standardized copy.
    '''
    data = data.copy()

    # to_numeric(errors='coerce') accepts both the raw float ages and already-standardized strings ('' -> NaN).
    ages = pd.to_numeric(data['Age'], errors='coerce').round(0)
    data['Age'] = ages.map(lambda age: str(int(age)) if pd.notna(age) and np.isfinite(age) else '')

    data['Cabin'] = data['Cabin'].fillna('')
    data['Embarked'] = data['Embarked'].fillna('')
    return data

In [106]:
# Replace nulls so the frame can be written to KDP
df = df.pipe(standardize_titanic)

### Use the write_to_new_kdp function to write to a new dataset on KDP and return the associated dataset ID

In [107]:
# Settings passed through to the KdpConn ingest/read calls made by the helper functions above.
batch_size = 100000
starting_record_id = ''  # passed to read_dataset_to_pandas_dataframe; presumably '' means "from the start" — TODO confirm


# Write df to a brand-new KDP dataset with write_to_new_kdp and keep the returned dataset ID
dataset_id = write_to_new_kdp(df, 'titanicTest', workspace_id, jwt, batch_size, starting_record_id,
                              equivalenceCheck=True)



equivalenceCheck pass


In [100]:
# Display the ID of the dataset created by write_to_new_kdp above
dataset_id

'c9f01fc9-33f9-48d7-a679-25e8ee30abe1'

# 2. Append data into a similar existing dataset.

In [101]:
# Append the same rows to the dataset created above; with returnNewData=True the combined dataset is returned
df = write_to_existing_kdp(
    df, dataset_id, workspace_id, jwt, batch_size, starting_record_id,
    ingestCheck=True, equivalenceCheck=True, similarCheck=True, returnNewData=True,
)



equivalenceCheck pass




Similar data check pass




Deleting temporary dataset
Temporary dataset a23192f6-9181-4ab1-ab16-c4c3ca15f3dd was successfully deleted.




In [102]:
# Row count of the target dataset after the append (read back by write_to_existing_kdp)
len(df)

1782

# Reading from KDP

### When you want to read something from KDP, it's very likely that you want to do one of two things.

### 1. Transform a dataset and output the results into a new dataset.
### 2. Normalize a dataset and overwrite the dataset, effectively deleting the old one.

-----------
# 1. Transform a dataset and output the results into a new dataset.

### Initialize the connector the same way as before, then copy the dataset ID of the dataset of interest from its URL in KDP and paste it into the variable below (for simplicity, this example reuses the dataset ID from above)

In [None]:
# Paste the dataset ID copied from the KDP URL here. Left commented out so this example reuses the ID from above.
#dataset_id = ''

### Read KDP dataset into a pandas dataframe

In [77]:
# Read the KDP dataset into a pandas dataframe using the same batching settings as the writes
df = kdp_conn.read_dataset_to_pandas_dataframe(
    dataset_id=dataset_id,
    jwt=jwt,
    starting_record_id=starting_record_id,
    batch_size=batch_size,
)



### Perform desired transformations

In [78]:
# 0/1 indicator columns: 1 when the count column is at least 1, else 0
df['AnySibSp'] = (df['SibSp'] >= 1).astype('int64')
df['AnyParch'] = (df['Parch'] >= 1).astype('int64')

### Output results into new dataset on KDP

In [79]:
# Store the transformed frame as a new dataset; dataset_id now refers to the transform output
dataset_id = write_to_new_kdp(
    df, 'titanicTest2', workspace_id, jwt, batch_size, starting_record_id,
    equivalenceCheck=True,
)



equivalenceCheck pass


### The new dataset ID is assigned, so this allows you to easily access and remember the results of this transform. This would be useful if more than one output is created from one or more datasets which are used as inputs in a future step. For those cases it may be worth appending the dataset_id into a list.

# 2. Normalize a dataset and overwrite the dataset, effectively deleting the old one.

### Get dataset ID from KDP and read dataset into a pandas dataframe (For the sake of example using the same dataset ID from above. Under normal circumstances we probably wouldn't normalize and overwrite anything except the initial dataset)

In [80]:
# Read the dataset to normalize back into pandas
df = kdp_conn.read_dataset_to_pandas_dataframe(
    dataset_id=dataset_id,
    jwt=jwt,
    starting_record_id=starting_record_id,
    batch_size=batch_size,
)



In [83]:
# Preview the first rows read back from KDP
df.head()

Unnamed: 0,Embarked,Survived,Pclass,Ticket,PassengerId,Sex,AnySibSp,SibSp,Name,Fare,Parch,AnyParch,Cabin,Age
0,S,1,1,19943,487,female,1,1,"Hoyt, Mrs. Frederick Maxfield (Jane Anne Forby)",90.0,0,0,C93,35.0
1,S,1,1,113781,306,male,1,1,"Allison, Master. Hudson Trevor",151.6,2,1,C22 C26,0.0
2,S,0,3,Fa 265302,155,male,0,0,"Olsen, Mr. Ole Martin",7.3,0,0,,
3,C,1,1,11767,311,female,0,0,"Hays, Miss. Margaret Bechstein",83.2,0,0,C54,24.0
4,S,0,2,28228,419,male,0,0,"Matthews, Mr. William John",13.0,0,0,,30.0


In [82]:
# Normalization step: round fares to one decimal place
df['Fare'] = df['Fare'].round(1)

In [103]:
# Replace the dataset with its normalized version; dataset_id now points at the replacement
dataset_id = overwrite_to_kdp(
    df, dataset_id, workspace_id, jwt, batch_size, starting_record_id,
    equivalenceCheck=True,
)



equivalenceCheck pass
Ingest successful. Deleting old dataset.
Dataset c9f01fc9-33f9-48d7-a679-25e8ee30abe1 was successfully deleted.


# Starting from Jupyter (Having dataset of interest to write into KDP)

# Automatic Upload / External database / data pipeline setups

### With real-time running processes, it's possible to do several different things depending on need. 

### The interval at which data is pulled (daily, weekly, monthly, or otherwise) defines how frequently the read/write process needs to run.

### Here are just a few possible use cases of how something could be set up. No transformations will be used here for simplicity, though they would be used as necessary in reality.

# New data sets ++ - Adding a newer timestamped dataset if it's important to distinguish and separate something by week, month, year, etc.

### Assume that the automatic data pull is somehow set up and is being read into a dataframe. We'll just continue to use a manual upload process

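### Here the data pull could run once a month, so each dataset could be labeled with its month, e.g. "titanicTest_Mar-2022" for March 2022, "titanicTest_Apr-2022" for April 2022, etc. (the code below builds the label with the `%b-%Y` date format)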

In [135]:
### Some automatic data pull process would run 1x month
df = pd.read_csv('titanic.csv')
df = standardize_titanic(df)
datasetDate = datetime.today().strftime('%b-%Y')

dataset_id = write_to_new_kdp(df, 'titanicTest_{}'.format(datasetDate), workspace_id, jwt, batch_size, starting_record_id, 
                          equivalenceCheck = True)




equivalenceCheck pass


### It could also be a good idea to include an analytics summary report of each month as a separate transform of aggregations or custom report as another dataset. Comparisons could be done as per report values from previous month to the new month etc.

# Append ++ - Similar to above, except that separating the data into different datasets is not important, so a single dataset is used instead

### Create the initial dataset, since one is required before anything can be appended. For the first month this would be a manual upload.

In [136]:
df = standardize_titanic(pd.read_csv('titanic.csv'))
datasetDate = datetime.today().strftime('%b-%Y')

# Optional DatasetDate column recording which monthly pull each row came from
df = df.assign(DatasetDate=datasetDate)

# First pull: create the dataset that later pulls append to
dataset_id = write_to_new_kdp(
    df, 'titanicAppendTest', workspace_id, jwt, batch_size, starting_record_id,
    equivalenceCheck=True,
)



equivalenceCheck pass


### Assume that the automatic data pull is somehow set up and is being read into a dataframe. We'll just continue to use a manual upload process

### Assume data pull runs once a month again and want to track the date of the data

In [137]:
df = standardize_titanic(pd.read_csv('titanic.csv'))

# Simulate next month's pull by labelling this batch one month ahead of today
datasetDate = (datetime.today() + relativedelta(months=1)).strftime('%b-%Y')

# DatasetDate is required here because the existing dataset already has it (similarCheck compares schemas)
df = df.assign(DatasetDate=datasetDate)

# Append to the dataset created above and get the combined dataset back
df = write_to_existing_kdp(
    df, dataset_id, workspace_id, jwt, batch_size, starting_record_id,
    ingestCheck=True, equivalenceCheck=True, similarCheck=True, returnNewData=True,
)



equivalenceCheck pass




Similar data check pass




Deleting temporary dataset
Temporary dataset c4068f51-f4fb-4703-96e0-c2f6f15fc8c1 was successfully deleted.




### Since it uses the same dataset_id and the variable may not be saved forever, it's ideal to save the existing dataset_id directly into a script that would perform this process or into a text file.

# Replace ++ - Newer versions of the same datasets would directly replace the existing dataset. May be most useful in cases when reference files/datasets need to be periodically updated

### Create the initial dataset, since one is required before it can be replaced. For the first month this would be a manual upload.

In [138]:
df = standardize_titanic(pd.read_csv('titanic.csv'))

# First version of the reference dataset; later pulls overwrite it
dataset_id = write_to_new_kdp(
    df, 'titanicReplaceTest', workspace_id, jwt, batch_size, starting_record_id,
    equivalenceCheck=True,
)



equivalenceCheck pass


### Assume that the automatic data pull is somehow set up and is being read into a dataframe. We'll just continue to use a manual upload process

In [None]:
df = standardize_titanic(pd.read_csv('titanic.csv'))

# Version currently stored in KDP (the previous pull), kept for comparison with the new pull in df
df2 = kdp_conn.read_dataset_to_pandas_dataframe(
    dataset_id=dataset_id,
    jwt=jwt,
    starting_record_id=starting_record_id,
    batch_size=batch_size,
)

### Now could potentially compare the current version with the new version and directly find the differences between each other or create aggregated/custom reports to find high level differences etc. This could go into a new dataset which could track all the differences between all the datapulls and be part of an Append++ flow series.

In [139]:
# Replace the reference dataset with the new pull; keep the returned (new) dataset ID
dataset_id = overwrite_to_kdp(
    df, dataset_id, workspace_id, jwt, batch_size, starting_record_id,
    equivalenceCheck=True,
)



equivalenceCheck pass
Ingest successful. Deleting old dataset.
Dataset 96446bfa-8fa9-4c8e-965d-e25c7be93bdc was successfully deleted.


### Since the dataset_id changes with every overwrite in the current implementation of KDP4, it would be good to save the current dataset_id directly into a text file so that a script can read the dataset id directly off the text file and write into the file in cases when the script stops and loses track of the variable.

### An alternative is to have an Excel sheet to keep track of which datasets/files are using which dataset ID, which processes (Append++, Replace++, New data++), sources, etc so everything is contained in one centralized file, then for the case of Replace++, the current dataset ID can be overwritten in a specific cell in that file.