# ETL Pipeline for Caspio Tables and Views

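This notebook reads the list of ***Caspio*** **Tables** and **Views** stored in the table **main.itb_forecasting.tb_caspio_objects**, retrieves the field definitions of each object through the Caspio REST API, and stores them in the table **main.itb_forecasting.tb_caspio_object_fields**.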

In [0]:
import os

import pandas as pd
import requests
from pyspark.sql import SparkSession
# NOTE(review): the star import shadows Python builtins such as sum, min, max and
# round with their Spark column equivalents; prefer `from pyspark.sql import functions as F`.
from pyspark.sql.functions import *

# Obtain the Spark session used by every Spark read/write in this notebook;
# getOrCreate() reuses the session that is already active when there is one.
spark = SparkSession.builder.getOrCreate()

## Obtain **_Token_** from Caspio

In [0]:
# Define the API endpoint and read the OAuth client credentials.
#
# The client id and secret are secrets and must never live in the notebook
# source, so they are read from the environment instead.
# NOTE(review): on Databricks a secret scope (dbutils.secrets.get) is the usual
# source - wire in whichever mechanism this workspace uses.
# NOTE(review): the credentials that were previously embedded in this cell
# should be treated as exposed and rotated.
token_url = "https://c1acs125.caspio.com/oauth/token"
client_id = os.environ.get("CASPIO_CLIENT_ID")
client_secret = os.environ.get("CASPIO_CLIENT_SECRET")

if not client_id or not client_secret:
    raise RuntimeError(
        "Caspio credentials are not configured: set CASPIO_CLIENT_ID and "
        "CASPIO_CLIENT_SECRET before running this notebook."
    )

# Prepare the data for the token request
data = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret
}

# Make a POST request to get the access token. The timeout keeps an
# unresponsive endpoint from hanging the notebook indefinitely.
response = requests.post(token_url, data=data, timeout=30)

# Check the response status and extract the access token
if response.status_code == 200:
    token_info = response.json()
    access_token = token_info.get("access_token")
else:
    print(f"Failed to retrieve access token: {response.status_code}")
    access_token = None

# Stop here when no token was obtained: every later request would otherwise be
# sent with the header "Bearer None" and fail one by one.
if not access_token:
    raise RuntimeError("No Caspio access token was obtained; aborting the run.")

# Confirm success without echoing the token, which would persist the secret in
# the saved notebook output.
print("Access token retrieved.")

### Constants and Variables

In [0]:
# Root of the Caspio REST API; every request URL in this notebook starts here.
strRestAPI = "https://c1acs125.caspio.com/rest/v2/"

# Query-string fragments that can be appended to a request URL.
strQCount = "?q.select=count(*)"
strQLimit0001 = "?q.limit=1"
strQLimit1000 = "?q.limit=1000"

# Path segments used to build URLs, and the column layout of the field catalogue.
lstOptions = ['tables', 'views']
lstActions = ['records', 'fields']
lstColumns = [
    'ObjectName',
    'FieldName',
    'Type',
    'Description',
    'DisplayOrder',
]

# HTTP headers sent with every REST call; the bearer token comes from the
# token cell above.
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
}

### Get caspio_object table

In [0]:
# Load the catalogue of Caspio objects from its Delta table and bring it into
# pandas so it can be iterated row by row.
strObjectsTable = "main.itb_forecasting.tb_caspio_objects"
dfs_objects = spark.read.format("delta").table(strObjectsTable)
df_objects = dfs_objects.toPandas()

#display(df_objects)

### Dataframe for Caspio Object columns/fields

In [0]:
# Dataframe for Caspio Object columns/fields
#
# Builds df_objectFields with one row per field of every object listed in
# df_objects, laid out with the columns named in lstColumns.

intTimeoutSec = 60  # upper bound per request so a stalled call cannot hang the run


def _fetch_result(strUrl, strLogLabel, strErrLabel):
    """GET strUrl and return the 'Result' entry of the JSON payload.

    The URL is printed before the call. On a non-200 response the failure and
    the URL are printed and an empty list is returned, so the caller simply
    skips that object. A missing or empty 'Result' also yields an empty list.
    """
    print(strLogLabel, strUrl)
    response = requests.get(strUrl, headers=headers, timeout=intTimeoutSec)
    if response.status_code != 200:
        print(f"{strErrLabel}: Failed to retrieve data: {response}")
        print(strUrl)
        return []
    return response.json().get('Result') or []


# Per-object frames are collected here and concatenated once at the end;
# concatenating inside the loop copies the accumulated data on every pass.
lstFrames = []

for _, row in df_objects.iterrows():
    strKind = row['type']
    if strKind not in ('tables', 'views'):
        # Only tables and views are handled; other rows are skipped silently.
        continue

    strObject = row['Result']
    strBaseURL = strRestAPI + strKind + "/" + strObject + "/"

    if strKind == 'tables':
        lstResult = _fetch_result(strBaseURL + lstActions[1], "Data API: ", "TABLES")
        if lstResult:
            df_fields = (
                pd.DataFrame(data=lstResult)[['Name', 'Type', 'Description', 'DisplayOrder']]
                # The API calls the column 'Name'; the catalogue calls it
                # 'FieldName'. Without the rename, table rows end up in a
                # separate 'Name' column and 'FieldName' stays empty for them.
                .rename(columns={'Name': 'FieldName'})
                .assign(ObjectName=strObject)
            )
            lstFrames.append(df_fields[lstColumns])
    else:
        # Views definitions cannot be extracted via the current Caspio REST API:
        # fetch a single record of the view and derive the fields from it.
        lstResult = _fetch_result(
            strBaseURL + lstActions[0] + strQLimit0001, "Views API: ", "VIEWS"
        )
        if lstResult:
            serFields = pd.DataFrame(data=lstResult).dtypes
            # Type is the pandas dtype inferred from the sampled record and
            # DisplayOrder is the 0-based column position.
            # NOTE(review): confirm this is comparable to the DisplayOrder the
            # API returns for tables.
            lstRow = [
                [strObject, strField, str(dtypeField), '', intOrder]
                for intOrder, (strField, dtypeField) in enumerate(serFields.items())
            ]
            lstFrames.append(pd.DataFrame(data=lstRow, columns=lstColumns))

if lstFrames:
    df_objectFields = pd.concat(lstFrames, ignore_index=True)
else:
    # Nothing was retrieved: keep an empty frame with the expected columns.
    df_objectFields = pd.DataFrame(columns=lstColumns)

### Create Delta Table

In [0]:
# Normalise the column dtypes before the frame is handed to Spark:
# int64 columns are cast to float64 and object columns to pandas 'string'.
lstCasts = [('int64', 'float64'), ('object', 'string')]

for strColumn in df_objectFields.columns:
    dtypeCurrent = df_objectFields[strColumn].dtype
    for strFrom, strTo in lstCasts:
        if dtypeCurrent == strFrom:
            df_objectFields[strColumn] = df_objectFields[strColumn].astype(strTo)
            break

In [0]:
# Convert the pandas field catalogue into a Spark DataFrame so that it can be
# written out as a permanent table by the cells below.
dfs_objectFields = spark.createDataFrame(df_objectFields)

In [0]:
# Fully qualified name of the table that receives the field catalogue.
strTable = "main.itb_forecasting.tb_caspio_object_fields"

In [0]:
# Persist the field catalogue as a permanent table in Delta format.
# mode("overwrite") replaces the table contents with this run's data, and the
# mergeSchema option is passed so that the write may evolve the table schema.

writer = dfs_objectFields.write.format("delta")
writer = writer.mode("overwrite").option("mergeSchema", "true")
writer.saveAsTable(strTable)

### Assign permissions

In [0]:
# Assign permissions to the table.
# The table name is taken from strTable so the grant always targets the table
# that was just written, rather than a second hard-coded copy of its name.
spark.sql(f"GRANT SELECT ON TABLE {strTable} TO `KID_Forecasting`")

In [0]:
%sql
--GRANT SELECT ON main.itb_forecasting.tb_caspio_objects TO `KID_Forecasting`;

In [0]:
%sql
-- Switch to the forecasting schema and list its tables.
USE main.itb_forecasting;

SHOW TABLES;

--DESCRIBE SCHEMA EXTENDED main.itb_forecasting;

In [0]:
%sql
-- Number of objects in the catalogue table.
SELECT COUNT(*)
FROM main.itb_forecasting.tb_caspio_objects;

-- Views in the catalogue with priority 1.
SELECT *
FROM main.itb_forecasting.tb_caspio_objects
WHERE priority = 1
  AND type = 'views';