<a href="https://colab.research.google.com/github/bilgrami/gen-ai-smart-support-agent/blob/feature%2F00-kb-articles/Data_Ingestion_API.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Ingestion API Implementation

This notebook sets up a simple data ingestion system using Delta Lake and PySpark. It allows you to register datasets, start ingestion jobs, and check their status.

## Step 1: Install Required Packages

Install PySpark, the Delta Lake Python bindings (`delta-spark`), and the AWS, Google Cloud, Azure, and Oracle Cloud storage SDKs. The versions are pinned because `delta-spark` 3.2.0 is built for Spark 3.5. Run this cell:


In [1]:
!pip install pyspark==3.5.1 delta-spark==3.2.0 boto3 google-cloud-storage azure-storage-blob oci

Collecting pyspark==3.5.1
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Collecting delta-spark==3.2.0
  Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Collecting boto3
  Downloading boto3-1.34.140-py3-none-any.whl (139 kB)
Collecting azure-storage-blob
  Downloading azure_storage_blob-12.20.0-py3-none-any.whl (392 kB)
Collecting oci
  Downloading oci-2.129.1-py3-none-any.whl (26.8 MB)
Collecting botocore<1.35.0,>=1.34.140 (from boto3)


## Step 2: Import Libraries and Set Up Spark

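This step imports the required libraries and starts a Spark session with Delta Lake enabled. Apache Spark is a distributed engine for processing large datasets. The two `config` settings register Delta's SQL extension and catalog, and `configure_spark_with_delta_pip` adds the matching Delta Lake JARs to the session.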

In [2]:
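import os
import json
from pyspark.sql import SparkSession
# Explicit imports instead of wildcards: `from pyspark.sql.functions import *`
# shadows Python builtins such as sum, max, and round
from pyspark.sql.types import StructType, StructField, StringType
from delta import configure_spark_with_delta_pip, DeltaTable

# Enable Delta Lake: register its SQL extension and make DeltaCatalog the session catalog
builder = SparkSession.builder.appName("DeltaLakeQuickStart") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip adds the Delta Lake package to the builder before starting Spark
spark = configure_spark_with_delta_pip(builder).getOrCreate()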


## Step 3: Set Up Storage Provider

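`get_file_path` turns a relative path into a location Spark can read: a directory under `/content` (the Colab working directory) for `local`, or a URI with the provider's scheme (`s3a://`, `gs://`, `wasbs://`, `oci://`) for cloud storage. Reading cloud URIs also requires the matching Hadoop connector and credentials to be configured on the Spark session, which this notebook does not do.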

In [3]:
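import os

def get_file_path(path, provider='local'):
    """Map a relative path to a provider-specific location that Spark can read."""
    if provider == 'local':
        # /content is the working directory in Google Colab
        return os.path.join('/content', path)
    elif provider == 'aws':
        # Amazon S3 through the Hadoop S3A connector: bucket/key
        return f"s3a://{path}"
    elif provider == 'gcp':
        # Google Cloud Storage: bucket/object
        return f"gs://{path}"
    elif provider == 'azure':
        # Azure Blob Storage: container@account.blob.core.windows.net/path
        return f"wasbs://{path}"
    elif provider == 'oracle':
        # OCI Object Storage: bucket@namespace/path
        return f"oci://{path}"
    else:
        raise ValueError(f"Unsupported storage provider: {provider}")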
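The bare scheme prefixes above leave the expected shape of `path` implicit, and it differs by provider. The hypothetical helper below (not part of the notebook) builds full URIs from a bucket or container and an object key, assuming the standard layouts used by the Hadoop S3A, GCS, Azure `wasbs`, and OCI HDFS connectors:

```python
def cloud_uri(provider, bucket, key, account=None, namespace=None):
    """Hypothetical helper: build a full object-store URI for Spark."""
    key = key.lstrip("/")  # avoid a double slash after the bucket
    if provider == "aws":
        return f"s3a://{bucket}/{key}"
    if provider == "gcp":
        return f"gs://{bucket}/{key}"
    if provider == "azure":
        # Azure Blob Storage needs the storage account name as well as the container
        if account is None:
            raise ValueError("azure requires a storage account name")
        return f"wasbs://{bucket}@{account}.blob.core.windows.net/{key}"
    if provider == "oracle":
        # OCI Object Storage needs the tenancy's Object Storage namespace
        if namespace is None:
            raise ValueError("oracle requires an Object Storage namespace")
        return f"oci://{bucket}@{namespace}/{key}"
    raise ValueError(f"Unsupported storage provider: {provider}")

# The result could be passed to spark.read.format("delta").load(...)
print(cloud_uri("aws", "my-bucket", "lake/ingestion_logs"))
print(cloud_uri("azure", "lake", "ingestion_logs", account="myaccount"))
```

Taking the account and namespace as separate arguments makes a missing value fail early with a clear message instead of producing a URI the connector cannot resolve.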

## Step 4: Create Functions for Working with Data Tables

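`delta_operation` wraps the three table operations the notebook needs: `read` loads a Delta table (or returns an empty DataFrame if none exists yet), `write` overwrites the table, and `upsert` merges new rows into an existing table on the `id` column, creating the table if it does not exist.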

In [4]:
from delta import DeltaTable
from pyspark.sql.types import StructType

def delta_operation(path, mode='read', data=None, provider='local'):
    full_path = get_file_path(path, provider)
    if mode == 'read':
        if DeltaTable.isDeltaTable(spark, full_path):
            return spark.read.format("delta").load(full_path)
        # No table yet: return an empty DataFrame with no columns
        return spark.createDataFrame([], schema=StructType([]))
    elif mode == 'write':
        # Replace the table contents
        data.write.format("delta").mode("overwrite").save(full_path)
    elif mode == 'upsert':
        if DeltaTable.isDeltaTable(spark, full_path):
            # MERGE on `id`: update matching rows, insert new ones.
            # `data` must therefore contain an `id` column.
            delta_table = DeltaTable.forPath(spark, full_path)
            delta_table.alias("old").merge(
                data.alias("new"),
                "old.id = new.id"
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        else:
            # The first upsert creates the table
            data.write.format("delta").mode("overwrite").save(full_path)
    else:
        raise ValueError(f"Unsupported mode: {mode}")
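To make the `upsert` semantics concrete: `whenMatchedUpdateAll()` overwrites every column of a target row whose `id` matches a source row, and `whenNotMatchedInsertAll()` appends source rows whose `id` is new. The pure-Python sketch below mimics that behavior on lists of dicts; it is an illustration only, not how Delta executes a MERGE. Delta raises an error when several source rows match the same target row; the sketch is stricter and rejects any duplicate source key.

```python
def merge_upsert(target, source, key="id"):
    """Mimic MERGE ... WHEN MATCHED UPDATE * WHEN NOT MATCHED INSERT * on lists of dicts."""
    source_keys = [row[key] for row in source]
    if len(source_keys) != len(set(source_keys)):
        raise ValueError(f"duplicate {key!r} values in source")
    updates = {row[key]: row for row in source}
    matched = set()
    result = []
    for row in target:
        if row[key] in updates:
            # Matched: replace all columns with the source row's values
            matched.add(row[key])
            result.append(dict(updates[row[key]]))
        else:
            result.append(dict(row))
    # Not matched: insert the remaining source rows
    result.extend(dict(row) for row in source if row[key] not in matched)
    return result

old = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bo"}]
new = [{"id": 2, "name": "Bob"}, {"id": 3, "name": "Cy"}]
print(merge_upsert(old, new))
# [{'id': 1, 'name': 'Ann'}, {'id': 2, 'name': 'Bob'}, {'id': 3, 'name': 'Cy'}]
```

Row order here is only for readability; a Delta table makes no ordering guarantee.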

## Step 5: Create API Functions

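These functions form the core of the API. `get_ingestion_status` reads the `ingestion_logs` Delta table and optionally filters it by job or dataset ID; `start_ingestion` records a new job with status `running`. The `register_dataset` function called in Step 7 is not defined in any cell shown here and must be supplied before that step will run.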

In [14]:
from pyspark.sql.functions import col
from delta import DeltaTable

def get_ingestion_status(job_id=None, dataset_id=None):
    table_path = get_file_path("ingestion_logs")
    if not DeltaTable.isDeltaTable(spark, table_path):
        return []  # No jobs have been started yet

    ingestion_logs = spark.read.format("delta").load(table_path)

    if job_id:
        ingestion_logs = ingestion_logs.filter(col("jobId") == job_id)
    if dataset_id:
        ingestion_logs = ingestion_logs.filter(col("datasetId") == dataset_id)

    # Normalize startTime/endTime to strings. withColumn replaces the existing
    # column, whereas select("*", ... .alias("startTime")) would duplicate it.
    # Casting a string (or a null) to string is a no-op, so no type check is needed.
    selected_logs = (
        ingestion_logs
        .withColumn("startTime", col("startTime").cast("string"))
        .withColumn("endTime", col("endTime").cast("string"))
    )

    return selected_logs.toPandas().to_dict(orient="records")

from pyspark.sql.types import StructType, StructField, StringType
from delta import DeltaTable
from datetime import datetime

def start_ingestion(ingestion_info):
    # Schema of the ingestion_logs table; times are stored as ISO-8601 strings
    schema = StructType([
        StructField("jobId", StringType(), True),
        StructField("datasetId", StringType(), True),
        StructField("status", StringType(), True),
        StructField("startTime", StringType(), True),
        StructField("endTime", StringType(), True),
        StructField("errorMessage", StringType(), True)
    ])

    table_path = get_file_path("ingestion_logs")
    if not DeltaTable.isDeltaTable(spark, table_path):
        # First run: create an empty Delta table with the log schema
        spark.createDataFrame([], schema=schema).write.format("delta").save(table_path)

    # Build the log entry for this job
    current_time = datetime.now().isoformat()
    new_log = spark.createDataFrame([(
        ingestion_info['jobId'],
        ingestion_info['datasetId'],
        "running",
        current_time,
        None,
        None
    )], schema=schema)

    # Upsert on jobId so each job has exactly one row. (union + dropDuplicates
    # keeps an arbitrary row when a jobId is resubmitted.)
    DeltaTable.forPath(spark, table_path).alias("old").merge(
        new_log.alias("new"),
        "old.jobId = new.jobId"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    # sourceFilePath is not read here: this function only records the job
    return {"status": "success", "message": "Ingestion job started", "jobId": ingestion_info['jobId']}
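Step 7 calls `register_dataset`, which no cell here defines. The sketch below is a hypothetical, Spark-free stand-in that returns the response shape seen in Step 7's output (`status`, `message`, `datasetId`). The in-memory `DATASET_REGISTRY`, the required-field check, and the `"error"` status are all assumptions; a Delta-backed version would persist the record in a table instead.

```python
# Hypothetical in-memory registry: datasetId -> registration payload.
# Not persisted: it is lost when the Python process restarts.
DATASET_REGISTRY = {}

REQUIRED_FIELDS = ("datasetId", "datasetName")  # assumption, not from the API spec

def register_dataset(dataset_info, registry=None):
    """Hypothetical stand-in for the register_dataset function used in Step 7."""
    if registry is None:
        registry = DATASET_REGISTRY
    missing = [field for field in REQUIRED_FIELDS if not dataset_info.get(field)]
    if missing:
        return {
            "status": "error",
            "message": "Missing required field(s): " + ", ".join(missing),
            "datasetId": dataset_info.get("datasetId"),
        }
    # Registering the same datasetId again replaces the earlier entry
    registry[dataset_info["datasetId"]] = dict(dataset_info)
    return {
        "status": "success",
        "message": "Dataset registered successfully",
        "datasetId": dataset_info["datasetId"],
    }

print(register_dataset({"datasetId": "DS001", "datasetName": "SampleDataset"}))
# {'status': 'success', 'message': 'Dataset registered successfully', 'datasetId': 'DS001'}
```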


## Step 6: Set Up Initial Data

This step seeds two Delta tables: `api_documentation`, which holds the API's OpenAPI 3.0 specification as a single JSON string, and `kb_articles`, which holds four knowledge base articles.

In [15]:
# API Documentation
api_doc = {
  "openapi": "3.0.0",
  "info": {
    "title": "Data Ingestion API",
    "version": "1.0.0",
    "description": "API for registering datasets and managing data ingestion into the silver area of the data lake."
  },
  "paths": {
    "/api/table": {
      "post": {
        "summary": "Register a new dataset",
        "requestBody": {
          "required": True,
          "content": {
            "application/json": {
              "schema": {
                "$ref": "#/components/schemas/DatasetRegistration"
              }
            }
          }
        },
        "responses": {
          "200": {
            "description": "Successful registration",
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/RegistrationResponse"
                }
              }
            }
          }
        }
      },
      "get": {
        "summary": "Retrieve registered datasets",
        "parameters": [
          {
            "name": "datasetName",
            "in": "query",
            "schema": {
              "type": "string"
            }
          }
        ],
        "responses": {
          "200": {
            "description": "List of registered datasets",
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/DatasetList"
                }
              }
            }
          }
        }
      }
    },
    "/api/ingestion": {
      "post": {
        "summary": "Trigger ingestion process",
        "requestBody": {
          "required": True,
          "content": {
            "application/json": {
              "schema": {
                "$ref": "#/components/schemas/IngestionRequest"
              }
            }
          }
        },
        "responses": {
          "200": {
            "description": "Ingestion job started",
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/IngestionResponse"
                }
              }
            }
          }
        }
      },
      "get": {
        "summary": "Retrieve ingestion job status",
        "parameters": [
          {
            "name": "jobId",
            "in": "query",
            "schema": {
              "type": "string"
            }
          },
          {
            "name": "datasetId",
            "in": "query",
            "schema": {
              "type": "string"
            }
          }
        ],
        "responses": {
          "200": {
            "description": "List of ingestion jobs",
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/JobStatusList"
                }
              }
            }
          }
        }
      }
    }
  },
  "components": {
    "schemas": {
      "DatasetRegistration": {
        "type": "object",
        "properties": {
          "datasetName": {"type": "string"},
          "sourceFileOptions": {
            "type": "object",
            "properties": {
              "delimiter": {"type": "string"},
              "hasHeader": {"type": "boolean"},
              "fileType": {"type": "string", "enum": ["csv", "json", "parquet", "xls", "orc"]}
            }
          },
          "sourceColumns": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "name": {"type": "string"},
                "ordinalNumber": {"type": "integer"},
                "dataType": {"type": "string"},
                "sampleData": {"type": "string"}
              }
            }
          },
          "targetColumns": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "name": {"type": "string"},
                "ordinalNumber": {"type": "integer"},
                "dataType": {"type": "string"},
                "sampleData": {"type": "string"},
                "sourceColumnName": {"type": "string"},
                "transformationFunction": {"type": "string"}
              }
            }
          },
          "computeRowChecksum": {"type": "boolean"},
          "computeColumnChecksum": {"type": "boolean"},
          "computeLineage": {"type": "boolean"}
        }
      },
      "RegistrationResponse": {
        "type": "object",
        "properties": {
          "status": {"type": "string"},
          "message": {"type": "string"},
          "datasetId": {"type": "string"}
        }
      },
      "DatasetList": {
        "type": "object",
        "properties": {
          "datasets": {
            "type": "array",
            "items": {
              "$ref": "#/components/schemas/DatasetRegistration"
            }
          }
        }
      },
      "IngestionRequest": {
        "type": "object",
        "properties": {
          "datasetId": {"type": "string"},
          "sourceFilePath": {"type": "string"}
        }
      },
      "IngestionResponse": {
        "type": "object",
        "properties": {
          "status": {"type": "string"},
          "message": {"type": "string"},
          "jobId": {"type": "string"}
        }
      },
      "JobStatusList": {
        "type": "object",
        "properties": {
          "jobs": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "jobId": {"type": "string"},
                "datasetId": {"type": "string"},
                "status": {"type": "string", "enum": ["running", "completed", "failed"]},
                "startTime": {"type": "string"},
                "endTime": {"type": "string"},
                "errorMessage": {"type": "string"}
              }
            }
          }
        }
      }
    }
  }
}

api_doc_df = spark.createDataFrame([(json.dumps(api_doc),)], ["doc"])
delta_operation("api_documentation", "write", api_doc_df)

# KB Articles
kb_articles = {
    "articles": [
      {
        "kbArticleId": "TS001",
        "AppName": "pipeline",
        "AppVersion": "1",
        "ArticleTitle": "Table Setup Guide",
        "Content": "This guide provides information on setting up tables for data ingestion using our Data Ingestion API.",
        "StepByStepInstructions": "1. Prepare Your Dataset\n2. Register the Dataset\n3. Verify Registration\n4. Configure Checksums and Lineage",
        "OwnerTeamName": "ingestion-team",
        "TeamContact": "Jane Doe",
        "WrittenBy": "John Smith",
        "ArticleVersion": "1.0",
        "CreatedOn": "2024-07-01",
        "CreatedBy": "John Smith",
        "ModifiedOn": "2024-07-01",
        "ModifiedBy": "John Smith",
        "IssueCategory": "table-setup"
      },
      {
        "kbArticleId": "IN001",
        "AppName": "pipeline",
        "AppVersion": "1",
        "ArticleTitle": "Data Ingestion Guide",
        "Content": "This guide covers the process of ingesting data into the silver area of the data lake using our Data Ingestion API.",
        "StepByStepInstructions": "1. Prepare for Ingestion\n2. Trigger Ingestion\n3. Monitor Ingestion Progress\n4. Verify Ingested Data",
        "OwnerTeamName": "ingestion-team",
        "TeamContact": "Jane Doe",
        "WrittenBy": "Alice Johnson",
        "ArticleVersion": "1.0",
        "CreatedOn": "2024-07-02",
        "CreatedBy": "Alice Johnson",
        "ModifiedOn": "2024-07-02",
        "ModifiedBy": "Alice Johnson",
        "IssueCategory": "ingestion"
      },
      {
        "kbArticleId": "JS001",
        "AppName": "job-runner",
        "AppVersion": "2",
        "ArticleTitle": "Job Submission Guide",
        "Content": "This guide explains how to submit and manage data ingestion jobs using our Data Ingestion API.",
        "StepByStepInstructions": "1. Prepare Job Parameters\n2. Submit the Job\n3. Track Job Progress\n4. Handle Job Completion",
        "OwnerTeamName": "consumption-team",
        "TeamContact": "Bob Wilson",
        "WrittenBy": "Emma Brown",
        "ArticleVersion": "1.0",
        "CreatedOn": "2024-07-03",
        "CreatedBy": "Emma Brown",
        "ModifiedOn": "2024-07-03",
        "ModifiedBy": "Emma Brown",
        "IssueCategory": "job-submission"
      },
      {
        "kbArticleId": "CF001",
        "AppName": "query-engine",
        "AppVersion": "3",
        "ArticleTitle": "Configuration Guide",
        "Content": "This guide covers the configuration options available for dataset registration and ingestion jobs in our Data Ingestion API.",
        "StepByStepInstructions": "1. Configure Dataset Options\n2. Set Up Column Configurations\n3. Enable Data Quality Options\n4. Configure Ingestion Job Parameters",
        "OwnerTeamName": "support",
        "TeamContact": "Charlie Green",
        "WrittenBy": "David Lee",
        "ArticleVersion": "1.0",
        "CreatedOn": "2024-07-04",
        "CreatedBy": "David Lee",
        "ModifiedOn": "2024-07-04",
        "ModifiedBy": "David Lee",
        "IssueCategory": "configuration"
      }
    ]
  }

kb_articles_df = spark.createDataFrame(kb_articles['articles'])
delta_operation("kb_articles", "write", kb_articles_df)
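Because the specification is stored as one JSON string, anything that consumes it, such as a support agent, has to parse it first. A minimal sketch with a hypothetical helper name that lists each operation's HTTP method, path, and summary from an OpenAPI document. In the notebook you could pass it the stored string, e.g. `delta_operation("api_documentation").first()["doc"]`; here a small inline spec keeps the example self-contained.

```python
import json

# Operation keys allowed in an OpenAPI 3.0 Path Item object
HTTP_METHODS = {"get", "put", "post", "delete", "patch", "head", "options", "trace"}

def list_endpoints(doc_json):
    """Return (METHOD, path, summary) for every operation in an OpenAPI document."""
    doc = json.loads(doc_json)
    endpoints = []
    for path, item in doc.get("paths", {}).items():
        for method, operation in item.items():
            # Path items may also hold non-operation keys such as "parameters"
            if method in HTTP_METHODS:
                endpoints.append((method.upper(), path, operation.get("summary", "")))
    return endpoints

spec = json.dumps({
    "openapi": "3.0.0",
    "paths": {
        "/api/ingestion": {
            "post": {"summary": "Trigger ingestion process"},
            "get": {"summary": "Retrieve ingestion job status"},
        }
    },
})
for method, path, summary in list_endpoints(spec):
    print(method, path, "-", summary)
```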


## Step 7: Use the API

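With everything in place, the example below registers a dataset, starts an ingestion job, and checks its status. Note that `start_ingestion` only records the job as `running` in `ingestion_logs`: it does not read `sourceFilePath`, and nothing in the notebook moves a job to `completed` or `failed`.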

In [16]:
# Register a dataset. hasHeader and ordinalNumber use the boolean and integer
# types declared in the DatasetRegistration schema (Step 6).
dataset_info = {
    "datasetId": "DS001",
    "datasetName": "SampleDataset",
    "sourceFileOptions": {
        "delimiter": ",",
        "hasHeader": True,
        "fileType": "csv"
    },
    "sourceColumns": [
        {"name": "id", "ordinalNumber": 1, "dataType": "int", "sampleData": "1"},
        {"name": "name", "ordinalNumber": 2, "dataType": "string", "sampleData": "John Doe"}
    ],
    "targetColumns": [
        {"name": "id", "ordinalNumber": 1, "dataType": "int", "sampleData": "1", "sourceColumnName": "id"},
        {"name": "full_name", "ordinalNumber": 2, "dataType": "string", "sampleData": "John Doe", "sourceColumnName": "name"}
    ],
    "computeRowChecksum": True,
    "computeColumnChecksum": False,
    "computeLineage": True
}
result = register_dataset(dataset_info)
print("Register dataset result:", result)



# Start ingestion
ingestion_info = {
    "jobId": "JOB001",
    "datasetId": "DS001",
    "sourceFilePath": "/path/to/source/file.csv"
}

result = start_ingestion(ingestion_info)
print("Start ingestion result:", result)

# Get ingestion status
status = get_ingestion_status(job_id="JOB001")
print("Ingestion status:", status)

Register dataset result: {'status': 'success', 'message': 'Dataset registered successfully', 'datasetId': 'DS001'}
Start ingestion result: {'status': 'success', 'message': 'Ingestion job started', 'jobId': 'JOB001'}
Ingestion status: [{'jobId': 'JOB001', 'datasetId': 'DS001', 'status': 'running', 'startTime': '2024-07-06 06:18:33.96634', 'endTime': None, 'errorMessage': None}]
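The `DatasetRegistration` schema in Step 6 declares `hasHeader` and the `compute*` flags as booleans, `ordinalNumber` as an integer, and restricts `fileType` to five values. Python dicts do not enforce any of this, so payloads such as `"hasHeader": "true"` or `"ordinalNumber": "1"` are easy to write by mistake. A minimal, hand-written check (hypothetical helper; a JSON Schema validator would be a more complete option) that reports such mismatches before a payload is registered:

```python
FILE_TYPES = {"csv", "json", "parquet", "xls", "orc"}  # enum from DatasetRegistration

def check_registration(info):
    """Return a list of type problems in a DatasetRegistration-style payload."""
    problems = []
    options = info.get("sourceFileOptions", {})
    if "hasHeader" in options and not isinstance(options["hasHeader"], bool):
        problems.append("sourceFileOptions.hasHeader must be a boolean")
    if "fileType" in options and options["fileType"] not in FILE_TYPES:
        problems.append(f"sourceFileOptions.fileType must be one of {sorted(FILE_TYPES)}")
    for list_name in ("sourceColumns", "targetColumns"):
        for i, column in enumerate(info.get(list_name, [])):
            ordinal = column.get("ordinalNumber")
            # bool is a subclass of int in Python, so exclude it explicitly
            if ordinal is not None and (isinstance(ordinal, bool) or not isinstance(ordinal, int)):
                problems.append(f"{list_name}[{i}].ordinalNumber must be an integer")
    for flag in ("computeRowChecksum", "computeColumnChecksum", "computeLineage"):
        if flag in info and not isinstance(info[flag], bool):
            problems.append(f"{flag} must be a boolean")
    return problems

bad = {
    "datasetName": "SampleDataset",
    "sourceFileOptions": {"delimiter": ",", "hasHeader": "true", "fileType": "csv"},
    "sourceColumns": [{"name": "id", "ordinalNumber": "1", "dataType": "int"}],
}
print(check_registration(bad))
# ['sourceFileOptions.hasHeader must be a boolean', 'sourceColumns[0].ordinalNumber must be an integer']
```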


## Step 8: Clean Up

When you're done, it's good practice to close the Spark session to free up resources.

In [None]:
spark.stop()