# Ingest Data from gen_ClientServiceCase Using API

### Install Required Packages

In [None]:
# Install the missing packages
%pip install pyspark
%pip install requests
%pip install azure-identity
%pip install azure-synapse-spark
%pip install python-dotenv

### Session Connection

### Notebook get and save functions

In [None]:
from azure.identity import ClientSecretCredential
from azure.synapse.spark import SparkClient
import os
from dotenv import load_dotenv
import requests
from requests.auth import HTTPBasicAuth
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col, when, lit, to_timestamp


# Load environment variables from the ClientSpace.env file
load_dotenv("ClientSpace.env")

# Azure / Synapse settings, all read from the environment
tenant_id = os.getenv("AZURE_TENANT_ID")
client_id = os.getenv("AZURE_CLIENT_ID")
client_secret = os.getenv("AZURE_CLIENT_SECRET")
workspace_name = os.getenv("SYNAPSE_WORKSPACE_NAME")
lakehouse_name = os.getenv("SYNAPSE_LAKEHOUSE_NAME")

# Create authentication and client objects
credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)
spark_client = SparkClient(endpoint=f"https://{workspace_name}.dev.azuresynapse.net", credential=credential)

# Create a Spark session in Synapse
spark_pool_name = "default"  # Adjust if you use a different pool
# NOTE(review): spark_session is not referenced again in the visible code, and
# the spark_batch_job.create_spark_session call should be confirmed against the
# installed azure-synapse-spark version.
spark_session = spark_client.spark_batch_job.create_spark_session(lakehouse_name, spark_pool_name)
spark = SparkSession.builder.appName("API Data Extraction").getOrCreate()

# API Configuration
url = "https://vensureqa.clientspace.net/next/api/dataform/v3.0/query/gen_ClientServiceCase"
# Credentials are taken from the environment (e.g. ClientSpace.env) when set.
# The literal fallbacks keep existing runs working; they should be removed once
# CLIENTSPACE_API_USERNAME / CLIENTSPACE_API_PASSWORD are defined, so that no
# secret remains in the notebook source.
username = os.getenv("CLIENTSPACE_API_USERNAME", "api_creai")
password = os.getenv("CLIENTSPACE_API_PASSWORD", "Vensure1$")
parquet_path = "abfss://CreaiFabricCapacity@onelake.dfs.fabric.microsoft.com/CreaiLakehouse.Lakehouse/Files/genClientServiceCase.parquet"
table = "genClientServiceCase"

def get_data_from_api(page, pageSize, timeout=60):
    """Fetch one page of gen_ClientServiceCase records from the API.

    Sends a POST to ``url`` with HTTP basic authentication, requesting a
    fixed set of fields sorted by ``CreateDate``.

    Args:
        page: Page number, sent as ``Page`` in the request payload.
        pageSize: Records per page, sent as ``PageSize`` in the payload.
        timeout: Seconds to wait for the server before ``requests`` raises
            a timeout error. Without it a stalled connection would block
            the notebook indefinitely.

    Returns:
        The value stored under ``"Data"`` in the JSON response when the
        status code is 200; otherwise ``None`` after printing the error.
    """
    # Requested fields, in the order they are sent to the API.
    field_names = (
        "fkUserIDAssignedTo",
        "CaseNotes",
        "CaseNumber",
        "crCategory",
        "CreateDate",
        "CaseInfo",
        "fkEmployeeID",
        "fkOwnerUserID",
        "luPriority",
        "CallerName",
        "fkReportedByEmployeeID",
        "Resolution",
        "ResolutionDate",
        "luStatus",
        "Subject",
        "fkCaseTypeID",
    )
    payload = {
        "Fields": [{"Name": name} for name in field_names],
        "SortCol": "CreateDate",
        "Page": page,
        "PageSize": pageSize,
    }
    response = requests.post(
        url,
        auth=HTTPBasicAuth(username, password),
        json=payload,
        timeout=timeout,
    )
    if response.status_code != 200:
        print(f"Request error: {response.status_code} - {response.text}")
        return None
    return response.json()["Data"]
    
def process_and_save_data(data):
    """Type one page of API records and persist it to Parquet and Delta.

    The records are loaded through pandas into a Spark DataFrame and each
    column is cast, parsed or defaulted. The result overwrites the Parquet
    file at ``parquet_path``; that file is then read back and appended to
    the Delta table named by ``table``.

    Args:
        data: Records for a single page, passed directly to ``pd.DataFrame``.
    """
    timestamp_format = "yyyy-MM-dd'T'HH:mm:ss'Z'"

    def as_int(name):
        # A null input casts to null, so no explicit isNotNull guard is needed.
        return col(name).cast("int").alias(name)

    def as_timestamp(name):
        # Values that do not match the format come back as null.
        return to_timestamp(col(name), timestamp_format).alias(name)

    def text_or_unknown(name):
        # Replace missing text with the literal "Unknown".
        return when(col(name).isNotNull(), col(name)).otherwise(lit("Unknown")).alias(name)

    def unchanged(name):
        return col(name).alias(name)

    raw_df = spark.createDataFrame(pd.DataFrame(data))

    # Column order here defines the column order of the written output.
    typed_df = raw_df.select(
        as_int("fkUserIDAssignedTo"),
        text_or_unknown("CaseNotes"),
        as_int("CaseNumber"),
        unchanged("crCategory"),
        as_timestamp("CreateDate"),
        unchanged("CaseInfo"),
        as_int("fkEmployeeID"),
        as_int("fkOwnerUserID"),
        unchanged("luPriority"),
        unchanged("CallerName"),
        as_int("fkReportedByEmployeeID"),
        text_or_unknown("Resolution"),
        as_timestamp("ResolutionDate"),
        unchanged("luStatus"),
        unchanged("Subject"),
        as_int("fkCaseTypeID"),
    )

    # Stage the page as Parquet (replacing the previous page's file), then
    # append what was staged to the Delta table.
    typed_df.write.mode("overwrite").parquet(parquet_path)
    staged_df = spark.read.parquet(parquet_path)
    staged_df.write.mode("append").format("delta").saveAsTable(table)

#region main block
# Paging settings: start page, records per request, and the last page
# number that will be fetched.
page = 1
pageSize = 50
lastPage = 2

# Fetch the first page up front; the loop ends as soon as a fetch returns
# nothing (empty result or a request error reported by get_data_from_api).
data = get_data_from_api(page, pageSize)
while data:
    process_and_save_data(data)

    # A short page means there is no more data; lastPage caps the run.
    if len(data) < pageSize or page >= lastPage:
        break

    # Progress output: the page just completed.
    print(page)

    page += 1
    data = get_data_from_api(page, pageSize)