# Testing Coframe API with Flask Server

This notebook demonstrates how to interact with the Coframe API through the Flask server interface.

In [None]:
import requests
import json
import pandas as pd
from IPython.display import display, HTML

# Base URLs
APP_BASE_URL = 'http://localhost:5000'  # For local app endpoints
COFRAME_BASE_URL = 'http://localhost:5000/coframe/api'  # For Coframe endpoints

# Helper function to pretty print JSON
def pretty_print(obj):
    if isinstance(obj, dict) or isinstance(obj, list):
        display(HTML(f"<pre>{json.dumps(obj, indent=2)}</pre>"))
    else:
        print(obj)

In [None]:
# Test application info endpoint (local, not Coframe)
def get_app_info():
    response = requests.get(f"{APP_BASE_URL}/info")
    return response.json()

app_info = get_app_info()
pretty_print(app_info)

## Database Statistics

Let's check how much data we have in the database before running queries.

def login(username, password):
    response = requests.post(f"{COFRAME_BASE_URL}/auth/login", json={
        'username': username,
        'password': password
    })

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Login failed: {response.status_code}")
        return response.json()

# Login with credentials
auth_result = login('mrossi', 'hashed_password_here')
pretty_print(auth_result)

## Authentication

First, let's authenticate with the API to get a token.

In [None]:
def login(username, password):
    response = requests.post(f"{COFRAME_BASE_URL}/auth/login", json={
        'username': username,
        'password': password
    })

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Login failed: {response.status_code}")
        return response.json()

# Login with credentials
auth_result = login('mrossi', 'hashed_password_here')
pretty_print(auth_result)

In [None]:
# Extract token from auth_result
token = auth_result.get('token')

# Setup headers for authenticated requests
headers = {
    'Authorization': f'Bearer {token}',
    'Content-Type': 'application/json'
}

In [None]:
# First define execute_query function
def execute_query(query, format='tuples'):
    response = requests.post(f"{COFRAME_BASE_URL}/query", headers=headers, json={
        'query': query,
        'format': format
    })
    return response.json()

# Check database sizes (count records without retrieving them)
def get_table_counts():
    tables = ['User', 'Book', 'Author', 'BookAuthor']
    counts = {}

    for table in tables:
        count_query = {
            "from": table,
            "select": ["count(*) as total"]
        }
        try:
            result = execute_query(count_query, format='tuples')  # Use tuples format

            if result.get('status') == 'success' and 'data' in result:
                # Parse tuples format: [['total'], [[count_value]]]
                if len(result['data']) >= 2 and len(result['data'][1]) > 0:
                    counts[table] = result['data'][1][0][0]  # First row, first column
                else:
                    counts[table] = "Error: No data returned"
            else:
                counts[table] = f"Error: {result.get('message', 'Unknown')}"
        except Exception as e:
            counts[table] = f"Exception: {str(e)}"

    return counts

# Now that we have authentication, let's check database sizes
print("\nDatabase record counts:")
counts = get_table_counts()
for table, count in counts.items():
    if isinstance(count, int):
        print(f"  {table}: {count:,} records")
    else:
        print(f"  {table}: {count}")

def get_all(table, params=None):
    # Add default limit for safety
    default_params = {'limit': 10}
    if params:
        default_params.update(params)
    response = requests.get(f"{COFRAME_BASE_URL}/db/{table}", headers=headers, params=default_params)
    return response.json()

# Get first 10 users only (safe for large datasets)
users = get_all('User', {'limit': 5})
pretty_print(users)

### Retrieve All Users

In [None]:
def get_one(table, id):
    response = requests.get(f"{COFRAME_BASE_URL}/db/{table}/{id}", headers=headers)
    return response.json()

# Get user with ID=1
user = get_one('User', 1)
pretty_print(user)

### Get User by ID

In [None]:
def create(table, data):
    response = requests.post(f"{COFRAME_BASE_URL}/db/{table}", headers=headers, json=data)
    return response.json()

# Create a new book
import datetime

new_book = {
    'title': 'Python for Data Science',
    'isbn': '9781234567890',
    'publication_date': datetime.date.today().isoformat(),
    'price': 29.99,
    'status': 'A'
}

created_book = create('Book', new_book)
pretty_print(created_book)

### Create a New Book

In [None]:
def update(table, id, data):
    response = requests.put(f"{COFRAME_BASE_URL}/db/{table}/{id}", headers=headers, json=data)
    return response.json()

# Update the book's price
if 'data' in created_book and 'id' in created_book['data']:
    book_id = created_book['data']['id']
    updated_book = update('Book', book_id, {'price': 34.99})
    pretty_print(updated_book)

### Update the Book

In [None]:
def delete(table, id):
    response = requests.delete(f"{COFRAME_BASE_URL}/db/{table}/{id}", headers=headers)
    return response.json()

# Delete the book
if 'data' in created_book and 'id' in created_book['data']:
    book_id = created_book['data']['id']
    deleted = delete('Book', book_id)
    pretty_print(deleted)

### Delete the Book

In [None]:
# Query to get books with authors (LIMITED for large datasets)
books_query = {
    "from": "Book",
    "select": [
        "Book.id",
        "Book.title",
        "Book.isbn",
        "Author.first_name",
        "Author.last_name"
    ],
    "joins": [
        {"BookAuthor": "BookAuthor.book_id = Book.id"},
        {"Author": "Author.id = BookAuthor.author_id"}
    ],
    "order_by": ["Book.title"],
    "limit": 5  # Limited to 5 for demo with large datasets
}

books_result = execute_query(books_query, format='records')
pretty_print(books_result)

q = {
  "from": "Book",
  "select": [
    "id",
    "title",
    "CASE WHEN COUNT(DISTINCT Author.id) > 1 THEN 'Various authors' ELSE MIN(Author.first_name) END as first_name",
    "CASE WHEN COUNT(DISTINCT Author.id) > 1 THEN '' ELSE MIN(Author.last_name) END as last_name",
    "isbn",
    "price"
  ],
  "joins": [
    {
      "BookAuthor": "BookAuthor.book_id = Book.id"
    },
    {
      "Author": "Author.id = BookAuthor.author_id"
    }
  ],
  "group_by": [
    "Book.id",
    "Book.title"
  ],
  "order_by": [
    "Book.title"
  ],
  "limit": 10,  # Limited to 10 for large datasets
  "offset": 0
}

books_result = execute_query(q, format='records')
pretty_print(books_result)

In [None]:
# Query to get books with authors
books_query = {
    "from": "Book",
    "select": [
        "Book.id",
        "Book.title",
        "Book.isbn",
        "Author.first_name",
        "Author.last_name"
    ],
    "joins": [
        {"BookAuthor": "BookAuthor.book_id = Book.id"},
        {"Author": "Author.id = BookAuthor.author_id"}
    ],
    "order_by": ["Book.title"],
    "limit": 10
}

books_result = execute_query(books_query, format='records')
pretty_print(books_result)

In [None]:
q = {
  "from": "Book",
  "select": [
    "id",
    "title",
    "CASE WHEN COUNT(DISTINCT Author.id) > 1 THEN 'Various authors' ELSE MIN(Author.first_name) END as first_name",
    "CASE WHEN COUNT(DISTINCT Author.id) > 1 THEN '' ELSE MIN(Author.last_name) END as last_name",
    "isbn",
    "price"
  ],
  "joins": [
    {
      "BookAuthor": "BookAuthor.book_id = Book.id"
    },
    {
      "Author": "Author.id = BookAuthor.author_id"
    }
  ],
  "group_by": [
    "Book.id",
    "Book.title"
  ],
  "order_by": [
    "Book.title"
  ],
  "limit": 10,
  "offset": 0
}

books_result = execute_query(q, format='records')
pretty_print(books_result)

### Convert Query Results to DataFrame

In [None]:
def call_endpoint(operation, params=None):
    response = requests.post(f"{COFRAME_BASE_URL}/endpoint/{operation}", headers=headers, json=params or {})
    return response.json()

books = call_endpoint('books', {})
pretty_print(books)

## Using Generic Endpoint

Let's test the generic endpoint functionality to call any Coframe operation.

In [None]:
def read_file(params=None):
    response = requests.post(f"{COFRAME_BASE_URL}/read_file", headers=headers, json=params or {})
    return response.json()

file = {
    "file_path": "data/book_list.yaml"
}
result = read_file(file)
pretty_print(result)

file = {
    "base_dir": "images",
    "file_path": "python_logo.png",
    "binary_encoding": "base64"
}
result = read_file(file)
pretty_print(result)

file = {
    "base_dir": "~/",
    "file_path": ".bashrc"
}
result = read_file(file)
pretty_print(result)


## Red file from filesystem

Read some file from filesystem, only files in whitelisted directories are allowed

In [None]:
def get_profile():
    response = requests.get(f"{COFRAME_BASE_URL}/profile", headers=headers)
    return response.json()

my_profile = get_profile()
pretty_print(my_profile)

## User Profile Access

Let's test accessing the current user's profile.

In [None]:
def get_profile():
    response = requests.get(f"{COFRAME_BASE_URL}/profile", headers=headers)
    return response.json()

my_profile = get_profile()
pretty_print(my_profile)

# Try to use an invalid token
invalid_headers = {
    'Authorization': 'Bearer invalid_token',
    'Content-Type': 'application/json'
}

response = requests.get(f"{COFRAME_BASE_URL}/profile", headers=invalid_headers)
pretty_print(response.json())

In [None]:
# Try to get a non-existent item
non_existent = get_one('Book', 999999)
pretty_print(non_existent)

In [None]:
# Try to use an invalid token
invalid_headers = {
    'Authorization': 'Bearer invalid_token',
    'Content-Type': 'application/json'
}

response = requests.get(f"{COFRAME_BASE_URL}/profile", headers=invalid_headers)
pretty_print(response.json())

## Batch Operations

Let's test how to perform batch operations.

In [None]:
# Get books with complex filtering using the query endpoint (LIMITED)
filter_query = {
    "from": "Book",
    "select": ["id", "title", "price"],
    "filters": {
        "conditions": [
            {"price": [">=", 35.0]}
        ]
    },
    "order_by": [["price", "desc"]],
    "limit": 5  # Limited to 5 for large datasets
}

filtered_books = execute_query(filter_query)
pretty_print(filtered_books)

## Filtering Records

Let's test filtering records using query parameters.

In [None]:
# Get books with complex filtering using the query endpoint
filter_query = {
    "from": "Book",
    "select": ["id", "title", "price", "status"],
    "filters": {
        "conditions": [
            {"price": [">=", 35.0]},
            {"status": "A"}
        ]
    },
    "order_by": [["price", "desc"]],
    "limit": 10
}

filtered_books = execute_query(filter_query)
pretty_print(filtered_books)

## Performing Aggregations

Let's test performing aggregations with the query endpoint.

In [None]:
# Aggregate query to get book statistics
agg_query = {
    "from": "Book",
    "select": [
        "count(id) as book_count",
        "avg(price) as avg_price",
        "min(price) as min_price",
        "max(price) as max_price",
        "sum(price) as total_price"
    ]
}

book_stats = execute_query(agg_query)
pretty_print(book_stats)

## Working with Relationships

Let's test working with relationships in the database.

In [None]:
# Add an author with unique name to avoid conflicts
import datetime
unique_suffix = datetime.datetime.now().strftime("%H%M%S")

new_author = {
    'first_name': f'TestAuthor{unique_suffix}',
    'last_name': f'LastName{unique_suffix}',
    'nationality': 'Test',
    'birth_date': '1980-10-06',
}

print(f"Creating test author: {new_author['first_name']} {new_author['last_name']}")
author_result = create('Author', new_author)
pretty_print(author_result)

# If author creation was successful, create a book-author relationship
if author_result.get('status') == 'success' and 'data' in author_result and 'id' in author_result['data']:
    author_id = author_result['data']['id']

    # Use the single book we created earlier instead of created_books list
    if created_book.get('status') == 'success' and 'data' in created_book and 'id' in created_book['data']:
        book_id = created_book['data']['id']

        # Create relationship
        book_author = {
            'book_id': book_id,
            'author_id': author_id,
            'notes': f'Created via API test at {datetime.datetime.now()}'
        }

        print(f"Creating relationship between book {book_id} and author {author_id}")
        relation_result = create('BookAuthor', book_author)
        pretty_print(relation_result)
    else:
        print("No book available to create relationship with.")
        print("created_book status:", created_book.get('status', 'Unknown'))
else:
    print("Author creation failed.")
    print("author_result status:", author_result.get('status', 'Unknown'))

## Cleanup

Let's clean up the data we created during testing.

In [None]:
# Cleanup: Delete the test data we created (with error handling)
cleanup_results = []

# Delete the single book we created
try:
    if 'created_book' in locals() and created_book.get('status') == 'success' and 'data' in created_book and 'id' in created_book['data']:
        book_id = created_book['data']['id']
        result = delete('Book', book_id)
        cleanup_results.append(f"Book {book_id}: {result.get('status', 'Unknown')}")
    else:
        cleanup_results.append("Book: No book to delete or creation failed")
except Exception as e:
    cleanup_results.append(f"Book deletion error: {str(e)}")

# Delete the author we created
try:
    if 'author_result' in locals() and author_result.get('status') == 'success' and 'data' in author_result and 'id' in author_result['data']:
        author_id = author_result['data']['id']
        result = delete('Author', author_id)
        cleanup_results.append(f"Author {author_id}: {result.get('status', 'Unknown')}")
    else:
        cleanup_results.append("Author: No author to delete or creation failed")
except Exception as e:
    cleanup_results.append(f"Author deletion error: {str(e)}")

# Show cleanup results
print("Cleanup results:")
for result in cleanup_results:
    print(f"  {result}")