<a href="https://colab.research.google.com/github/Kmaralla/AgentGPT/blob/main/pyairbyte_notebooks/PyAirbyte_Basic_Features_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PyAirbyte Demo

Below is a pre-release demo of PyAirbyte that syncs accounting data from Xero into a local DuckDB cache and analyzes it with Pandas and SQL.


## Install PyAirbyte


In [11]:
# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv

# Install PyAirbyte
%pip install --quiet airbyte

## Locating your Data Source

To see what data sources are available, you can check [our docs](https://docs.airbyte.com/using-airbyte/airbyte-lib/getting-started) or run the following:


In [12]:
# Import PyAirbyte
import airbyte as ab

# Show all available connectors
ab.get_available_connectors()

['destination-astra',
 'destination-aws-datalake',
 'destination-chroma',
 'destination-convex',
 'destination-duckdb',
 'destination-firebolt',
 'destination-firestore',
 'destination-google-sheets',
 'destination-milvus',
 'destination-motherduck',
 'destination-pgvector',
 'destination-pinecone',
 'destination-qdrant',
 'destination-rabbitmq',
 'destination-sftp-json',
 'destination-snowflake-cortex',
 'destination-sqlite',
 'destination-timeplus',
 'destination-typesense',
 'destination-vectara',
 'destination-weaviate',
 'source-7shifts',
 'source-activecampaign',
 'source-adjust',
 'source-agilecrm',
 'source-aha',
 'source-airbyte',
 'source-aircall',
 'source-airtable',
 'source-akeneo',
 'source-algolia',
 'source-alpha-vantage',
 'source-amazon-ads',
 'source-amazon-seller-partner',
 'source-amazon-sqs',
 'source-amplitude',
 'source-apify-dataset',
 'source-appcues',
 'source-appfigures',
 'source-appfollow',
 'source-apple-search-ads',
 'source-appsflyer',
 'source-apptivo'
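The connector list is long and mixes sources with destinations. The sketch below narrows it by kind and keyword. It assumes only that names follow the `source-` and `destination-` prefix convention visible in the output above. `filter_connectors` is a hypothetical helper, not part of PyAirbyte. In the notebook you would pass it the result of `ab.get_available_connectors()`.

```python
def filter_connectors(names, kind="source", keyword=""):
    """Return connector names of the given kind whose name contains keyword.

    kind is "source" or "destination"; matching relies on the
    "source-"/"destination-" naming prefix seen in the connector list.
    """
    prefix = f"{kind}-"
    return sorted(
        name for name in names
        if name.startswith(prefix) and keyword.lower() in name.lower()
    )


# A few names copied from the connector list above
sample = [
    "destination-duckdb",
    "destination-sqlite",
    "source-amazon-ads",
    "source-amazon-seller-partner",
    "source-airtable",
]

print(filter_connectors(sample, kind="source", keyword="amazon"))
# ['source-amazon-ads', 'source-amazon-seller-partner']
print(filter_connectors(sample, kind="destination"))
# ['destination-duckdb', 'destination-sqlite']
```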

## Load the Source Data using PyAirbyte


Create and install a source connector:


In [13]:
import airbyte as ab

# Create and install the source:
source: ab.Source = ab.get_source("source-xero")

Writing `source-xero` logs to file: /tmp/airbyte/logs/source-xero/source-xero-log-JFXRBZ1BV.log


In [19]:
# Configure the source.
# Credentials are read with ab.get_secret() (from environment variables or Colab secrets)
# instead of being hard-coded, so they don't leak when the notebook is shared.
# The secret names below are placeholders: use the names you stored them under.
source.set_config(
    config={
        "credentials": {
            "auth_type": "oauth2_confidential_application",
            "client_id": ab.get_secret("XERO_CLIENT_ID"),
            "client_secret": ab.get_secret("XERO_CLIENT_SECRET")
        },
        "tenant_id": "your_tenant_id_here",
        "start_date": "2023-01-01T00:00:00Z",  # Move this earlier to pull more historical data
        # The keys below may be ignored if they are not part of the connector's spec;
        # check them against source.print_config_spec().
        "page_size": 100,  # Records per request
        "report_types": [
            "ProfitAndLoss",
            "BalanceSheet"
        ],
        # Optional report parameters
        "report_params": {
            "date_from": "2022-03-01",
            "date_to": "2025-12-31",
            "periods": 12,  # Number of periods to compare
            "timeframe": "MONTH"  # MONTH, QUARTER, or YEAR
        }
    }
)

# Streams to sync (a subset of the streams the connector offers)
streams_to_sync = [
    "invoices",
    "accounts",
    "bank_transactions",
    "bank_transfers",
    "contacts",
    "credit_notes",
    "manual_journals",
    "overpayments",
    "payments",
    "prepayments",
    "tracking_categories",
    # "balances",
    # "journals"
]

# Verify the config and credentials by running `check`:
source.check()

# Select the streams and read them into the local cache
source.select_streams(streams_to_sync)
read_result: ab.ReadResult = source.read()


------------------------------------------------

### Sync Progress: `source-xero -> DuckDBCache`

**Started reading from source at `02:01:27`:**

- Read **109** records over **3.8 seconds** (28.6 records/s, 0.02 MB/s).

- Received records for 3 streams:
  - 50 contacts
  - 1 invoices
  - 58 accounts

- Cached **109** records into 3 local cache file(s).

- Finished reading from source at `02:01:36`.

**Started cache processing at `02:01:36`:**

- Processed **3** cache file(s) over **3.6 seconds**.

- Completed cache processing for 3 streams:
  - invoices
  - contacts
  - accounts

- Finished cache processing at `02:01:39`.


**Sync completed at `02:01:39`. Total time elapsed: 12 seconds**

------------------------------------------------
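Hard-coding OAuth client credentials in a notebook makes them easy to leak when the notebook is shared. The sketch below assembles the Xero config from environment variables instead. The variable names (`XERO_CLIENT_ID`, `XERO_CLIENT_SECRET`, `XERO_TENANT_ID`) and the `build_xero_config` helper are hypothetical. The config keys mirror the ones used in the configuration cell above; they have not been checked against the connector's spec.

```python
import os


def build_xero_config(env=os.environ, start_date="2023-01-01T00:00:00Z"):
    """Build a source-xero style config dict from environment variables.

    Raises KeyError naming every missing variable, so a misconfigured
    notebook fails before any connector call is made.
    """
    required = {
        "client_id": "XERO_CLIENT_ID",          # hypothetical variable names
        "client_secret": "XERO_CLIENT_SECRET",
        "tenant_id": "XERO_TENANT_ID",
    }
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")

    return {
        "credentials": {
            "auth_type": "oauth2_confidential_application",
            "client_id": env[required["client_id"]],
            "client_secret": env[required["client_secret"]],
        },
        "tenant_id": env[required["tenant_id"]],
        "start_date": start_date,
    }


# Usage with a stand-in mapping instead of the real environment
fake_env = {"XERO_CLIENT_ID": "id-123", "XERO_CLIENT_SECRET": "s3cret", "XERO_TENANT_ID": "t-1"}
config = build_xero_config(env=fake_env)
print(config["tenant_id"])  # t-1
```

In the notebook, the result would be passed as `source.set_config(config=build_xero_config())`.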


## Read Data from the PyAirbyte Cache

Once data is read, we can work with the resulting streams directly. For example, `to_pandas()` returns a Pandas DataFrame, and `to_sql_table()` returns a SQLAlchemy `Table` object that we can use to run SQL queries. The cells below also use the cache's `get_pandas_dataframe()` method, which loads a stream into a DataFrame by name.


In [15]:
import pandas as pd

# Load the cached "accounts" stream as a DataFrame
df = read_result.cache.get_pandas_dataframe("accounts")


def print_names(accounts):
    """Print one account name per line."""
    for name in accounts['name']:
        print(name)


# Print the chart of accounts in balance-sheet order.
# The accounts stream lists accounts, not balances, so this shows structure only.
# Only some Xero account types are covered; types such as PREPAYMENT, INVENTORY,
# NONCURRENT, and LIABILITY do not appear in any section below.
def create_balance_sheet(df):
    print("\n=== BALANCE SHEET ===\n")

    # ASSETS
    print("ASSETS")
    print("-" * 50)

    # Bank accounts, with their account numbers where available
    bank_accounts = df[df['type'] == 'BANK']
    print("\nBank Accounts:")
    for _, row in bank_accounts.iterrows():
        number = row.get('bankaccountnumber')
        print(f"{row['name']}: {number if pd.notna(number) else 'No account number'}")

    print("\nCurrent Assets:")
    print_names(df[(df['class'] == 'ASSET') & df['type'].isin(['CURRENT', 'BANK'])])

    print("\nFixed Assets:")
    print_names(df[(df['class'] == 'ASSET') & (df['type'] == 'FIXED')])

    # LIABILITIES
    print("\nLIABILITIES")
    print("-" * 50)
    print("\nCurrent Liabilities:")
    print_names(df[df['type'] == 'CURRLIAB'])
    print("\nLong Term Liabilities:")
    print_names(df[df['type'] == 'TERMLIAB'])

    # EQUITY
    print("\nEQUITY")
    print("-" * 50)
    print_names(df[df['class'] == 'EQUITY'])

    # Revenue and expense accounts, for reference
    print("\nREVENUE ACCOUNTS")
    print("-" * 50)
    print_names(df[df['type'] == 'REVENUE'])

    print("\nEXPENSE ACCOUNTS")
    print("-" * 50)
    print_names(df[df['type'] == 'EXPENSE'])

    print("\n=== ACCOUNT TYPE SUMMARY ===")
    print(df.groupby('type')['name'].count())


# Print the chart of accounts
create_balance_sheet(df)

# Count accounts per class and type
print("\n=== DETAILED ACCOUNT ANALYSIS ===")
account_summary = df.groupby(['class', 'type']).size().unstack(fill_value=0)
print("\nAccount Distribution:")
print(account_summary)

# Show the first 10 accounts by account code
print("\n=== FIRST 10 ACCOUNTS BY CODE ===")
print(df.sort_values('code')[['code', 'name', 'type', 'class']].head(10))


=== BALANCE SHEET ===

ASSETS
--------------------------------------------------

Bank Accounts:
Business Bank Account: 0908007006543
Business Savings Account: 121314121314

Current Assets:
Business Bank Account
Business Savings Account
Accounts Receivable
Prepayments

Fixed Assets:
Office Equipment
Less Accumulated Depreciation on Office Equipment
Computer Equipment
Less Accumulated Depreciation on Computer Equipment

LIABILITIES
--------------------------------------------------

Current Liabilities:
Accounts Payable
Unpaid Expense Claims
Sales Tax
Employee Tax Payable
Superannuation Payable
Income Tax Payable
Revenue Received in Advance
Historical Adjustment
Suspense
Clearing Account
Rounding
Tracking Transfers
Owner A Drawings
Owner A Funds Introduced

Long Term Liabilities:
Loan

EQUITY
--------------------------------------------------
Retained Earnings
Owner A Share Capital

REVENUE ACCOUNTS
--------------------------------------------------
Sales
Other Revenue
Interest Income
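The listing above picks out only a handful of Xero account types (`BANK`, `CURRENT`, `FIXED`, `CURRLIAB`, `TERMLIAB`, `REVENUE`, `EXPENSE`). Accounts of any other type appear in no section at all. The sketch below uses a lookup table to assign every account to a report section, and anything it does not recognize goes to an explicit "Unmapped" bucket. The mapping reflects Xero's account type codes as understood here and should be checked against the Xero Accounting API reference before you rely on it. `SECTION_BY_TYPE` and `group_accounts` are hypothetical names.

```python
from collections import defaultdict

# Assumed mapping from Xero account "type" codes to report sections
SECTION_BY_TYPE = {
    "BANK": "Current Assets",
    "CURRENT": "Current Assets",
    "INVENTORY": "Current Assets",
    "PREPAYMENT": "Current Assets",
    "FIXED": "Fixed Assets",
    "NONCURRENT": "Non-current Assets",
    "CURRLIAB": "Current Liabilities",
    "LIABILITY": "Other Liabilities",
    "TERMLIAB": "Long Term Liabilities",
    "EQUITY": "Equity",
    "REVENUE": "Revenue",
    "SALES": "Revenue",
    "OTHERINCOME": "Revenue",
    "EXPENSE": "Expenses",
    "OVERHEADS": "Expenses",
    "DIRECTCOSTS": "Expenses",
    "DEPRECIATN": "Expenses",
}


def group_accounts(accounts):
    """Group account names by section; unknown types land in 'Unmapped'."""
    sections = defaultdict(list)
    for account in accounts:
        section = SECTION_BY_TYPE.get(account["type"], "Unmapped")
        sections[section].append(account["name"])
    return dict(sections)


# Hypothetical rows shaped like the cached accounts stream (lowercase keys)
rows = [
    {"name": "Business Bank Account", "type": "BANK"},
    {"name": "Inventory", "type": "INVENTORY"},
    {"name": "Loan", "type": "TERMLIAB"},
    {"name": "Sales", "type": "REVENUE"},
    {"name": "Mystery", "type": "UNKNOWN"},
]
for section, names in group_accounts(rows).items():
    print(section, names)
```

With the DataFrame, `df['type'].map(SECTION_BY_TYPE).fillna('Unmapped')` adds the same section label as a column.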


In [18]:
import pandas as pd

# Load the invoice and bank transaction streams from the cache
invoices_df = read_result.cache.get_pandas_dataframe("invoices")
bank_df = read_result.cache.get_pandas_dataframe("bank_transactions")

print("=== FINANCIAL METRICS ===")

# Calculate invoice metrics
try:
    # Convert the amount columns to floats (unparseable values become NaN)
    for col in ['total', 'amountpaid', 'amountcredited', 'amountdue']:
        invoices_df[col] = pd.to_numeric(invoices_df[col], errors='coerce')

    # Count only posted invoices; drafts, voided, and deleted invoices are excluded
    posted = invoices_df['status'].isin(['AUTHORISED', 'PAID'])

    # Revenue (ACCREC = Accounts Receivable = sales invoices)
    total_revenue = invoices_df[posted & (invoices_df['type'] == 'ACCREC')]['total'].sum()

    # Expenses (ACCPAY = Accounts Payable = bills)
    total_expenses = invoices_df[posted & (invoices_df['type'] == 'ACCPAY')]['total'].sum()

    print("\nFrom Invoices:")
    print(f"Total Revenue (from Sales Invoices): ${total_revenue:,.2f}")
    print(f"Total Expenses (from Bills): ${total_expenses:,.2f}")
    # Invoiced sales minus bills: a rough proxy, not an accrual-basis P&L
    print(f"Net Income: ${total_revenue - total_expenses:,.2f}")

    # Accounts receivable: the amount still due on approved, unpaid sales invoices
    accounts_receivable = invoices_df[
        (invoices_df['type'] == 'ACCREC') &
        (invoices_df['status'] == 'AUTHORISED')
    ]['amountdue'].sum()

    print(f"\nAccounts Receivable (Outstanding): ${accounts_receivable:,.2f}")

    # Show totals and counts by invoice type and status
    print("\nInvoice Status Breakdown:")
    status_breakdown = invoices_df.groupby(['type', 'status'])['total'].agg(['sum', 'count'])
    print(status_breakdown)

except Exception as e:
    print(f"Error in invoice calculations: {e}")

# Calculate bank transaction metrics
try:
    bank_df['total'] = pd.to_numeric(bank_df['total'], errors='coerce')

    # Money in and money out. Xero also has types such as RECEIVE-TRANSFER and
    # SPEND-PREPAYMENT, which are not counted here.
    bank_income = bank_df[bank_df['type'] == 'RECEIVE']['total'].sum()
    bank_expenses = bank_df[bank_df['type'] == 'SPEND']['total'].sum()

    print("\nFrom Bank Transactions:")
    print(f"Total Bank Income: ${bank_income:,.2f}")
    print(f"Total Bank Expenses: ${bank_expenses:,.2f}")
    print(f"Net Bank Movement: ${bank_income - bank_expenses:,.2f}")

    # Show totals and counts by transaction type
    print("\nBank Transaction Type Breakdown:")
    bank_breakdown = bank_df.groupby('type')['total'].agg(['sum', 'count'])
    print(bank_breakdown)

except Exception as e:
    print(f"Error in bank calculations: {e}")

# Invoice totals by status (all invoice types)
print("\nPayment Status Summary:")
print(invoices_df.groupby('status')['total'].sum())

=== FINANCIAL METRICS ===

From Invoices:
Total Revenue (from Sales Invoices): $0.00
Total Expenses (from Bills): $18.90
Net Income: $-18.90

Accounts Receivable (Outstanding): $0.00

Invoice Status Breakdown:
                    sum  count
type   status                 
ACCPAY AUTHORISED  18.9      1

From Bank Transactions:
Total Bank Income: $0.00
Total Bank Expenses: $0.00
Net Bank Movement: $0.00

Bank Transaction Type Breakdown:
Empty DataFrame
Columns: [sum, count]
Index: []

Payment Status Summary:
status
AUTHORISED    18.9
Name: total, dtype: float64
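Summing `total` over every invoice also counts drafts and voided or deleted invoices. The sketch below shows the same revenue, expense, and receivables arithmetic on plain dictionaries. It assumes Xero's invoice statuses (`DRAFT`, `SUBMITTED`, `AUTHORISED`, `PAID`, `VOIDED`, `DELETED`) and treats only `AUTHORISED` and `PAID` invoices as posted. It uses `Decimal` so that currency sums don't pick up float rounding. `invoice_metrics` and the sample rows are hypothetical.

```python
from decimal import Decimal

# Assumption: only approved or paid invoices count toward the totals
POSTED = {"AUTHORISED", "PAID"}


def invoice_metrics(invoices):
    """Return sales, bills, net, and outstanding receivables as Decimals."""
    sales = bills = receivable = Decimal("0")
    for inv in invoices:
        if inv["status"] not in POSTED:
            continue
        total = Decimal(str(inv["total"]))
        if inv["type"] == "ACCREC":      # sales invoice
            sales += total
            receivable += Decimal(str(inv["amountdue"]))
        elif inv["type"] == "ACCPAY":    # bill
            bills += total
    return {"sales": sales, "bills": bills, "net": sales - bills, "receivable": receivable}


sample = [
    {"type": "ACCREC", "status": "AUTHORISED", "total": "100.00", "amountdue": "40.00"},
    {"type": "ACCREC", "status": "PAID", "total": "50.00", "amountdue": "0.00"},
    {"type": "ACCREC", "status": "VOIDED", "total": "999.00", "amountdue": "0.00"},
    {"type": "ACCPAY", "status": "AUTHORISED", "total": "18.90", "amountdue": "18.90"},
]
m = invoice_metrics(sample)
print(f"Sales: ${m['sales']:,.2f}  Bills: ${m['bills']:,.2f}  Net: ${m['net']:,.2f}")
# Sales: $150.00  Bills: $18.90  Net: $131.10
print(f"Receivable: ${m['receivable']:,.2f}")  # Receivable: $40.00
```

In the DataFrame version, the equivalent filter is `invoices_df['status'].isin(['AUTHORISED', 'PAID'])`.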


In [20]:
import pandas as pd

# Balance sheet exported from Xero (upload the file to the notebook first;
# reading .xlsx files requires openpyxl)
excel_content = pd.read_excel("Demo_Company__Global___Balance_Sheet.xlsx")

try:
    invoices_df = read_result.cache.get_pandas_dataframe("invoices")

    # Parse dates and amounts, then keep invoices dated on or before Dec 31, 2024
    invoices_df['date'] = pd.to_datetime(invoices_df['date'])
    invoices_df['total'] = pd.to_numeric(invoices_df['total'], errors='coerce')
    invoices_df = invoices_df[invoices_df['date'] < '2025-01-01']

    # Print Excel data for comparison
    print("=== Balance Sheet from Excel ===")
    print(excel_content.head())

    # Sales invoice totals are not total assets; this is the amount invoiced to date
    print("\n=== Xero Data as of Dec 31, 2024 ===")
    total_invoiced = invoices_df[invoices_df['type'] == 'ACCREC']['total'].sum()
    print(f"Total Sales Invoiced: ${total_invoiced:,.2f}")

except Exception as e:
    print(f"Error processing data: {e}")


Monthly Revenue Breakdown:
Series([], Freq: M, Name: total, dtype: float64)

Monthly Expenses Breakdown:
Series([], Freq: M, Name: total, dtype: float64)

Total Revenue (2023-2024): $0.00
Total Expenses (2023-2024): $0.00
Net Income (2023-2024): $0.00
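`pd.to_datetime` works here only if the `date` column holds ISO-style strings. The Xero Accounting API itself encodes many dates in the .NET JSON form `/Date(1518685950940+0000)/`, which is milliseconds since the Unix epoch followed by an offset. It is not confirmed here whether the connector converts these before caching, so check a few raw values first. The sketch below handles both forms and applies a cutoff that includes the whole of the cutoff day. `parse_xero_date` and `as_of` are hypothetical helpers. As an assumption, the parser treats the millisecond value as UTC and ignores the `+HHMM` offset.

```python
import re
from datetime import date, datetime, timedelta, timezone

# Matches the .NET JSON date form, e.g. /Date(1518685950940+0000)/
_NET_DATE = re.compile(r"/Date\((-?\d+)([+-]\d{4})?\)/")


def parse_xero_date(value):
    """Parse '/Date(ms+HHMM)/' or an ISO-8601 string into a UTC-aware datetime."""
    match = _NET_DATE.fullmatch(value)
    if match:
        millis = int(match.group(1))
        # Assumption: the millisecond value is UTC; the offset is ignored
        return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=millis)
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def as_of(rows, cutoff):
    """Keep rows whose date falls on or before the cutoff day (inclusive)."""
    return [row for row in rows if parse_xero_date(row["date"]).date() <= cutoff]


rows = [
    {"id": 1, "date": "/Date(1735603200000+0000)/"},  # 2024-12-31 00:00 UTC
    {"id": 2, "date": "2025-01-02T00:00:00"},
    {"id": 3, "date": "2024-12-31T18:30:00Z"},
]
print([row["id"] for row in as_of(rows, date(2024, 12, 31))])  # [1, 3]
```

Comparing calendar days instead of timestamps keeps invoices timestamped later on December 31 inside the cutoff.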


## Creating graphs

PyAirbyte integrates with Pandas, which integrates with `matplotlib` as well as many other popular libraries. We can use this as a means of quickly creating graphs.
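Most quick charts start from a mapping of categories to counts. The standard-library sketch below prepares that mapping from cached records, such as the number of accounts per Xero account type, before the data goes to a plotting library. `counts_by` is a hypothetical helper. With a DataFrame, `df['type'].value_counts()` does the same job.

```python
from collections import Counter


def counts_by(records, key):
    """Count records per value of `key`, most common first (ties sorted alphabetically)."""
    counts = Counter(record[key] for record in records)
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))


# Hypothetical account-type values
rows = [{"type": t} for t in ["CURRLIAB", "EXPENSE", "CURRLIAB", "BANK", "EXPENSE", "CURRLIAB"]]

# Crude text bar chart of the counts
for label, count in counts_by(rows, "type"):
    print(f"{label:<10} {'#' * count} {count}")
```

To plot the result, unpack it with `labels, values = zip(*counts_by(rows, "type"))` and pass it to `plt.bar(labels, values)`.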


In [10]:
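%pip install --quiet matplotlib

import matplotlib.pyplot as plt

# Load the synced accounts stream as a DataFrame
accounts_df = read_result["accounts"].to_pandas()

# Bar chart of how many accounts exist for each Xero account type
accounts_df["type"].value_counts().plot(kind="bar", edgecolor="black")
plt.title("Accounts by Type")
plt.xlabel("Account type")
plt.ylabel("Number of accounts")
plt.tight_layout()
plt.show()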

## Working in SQL

Since data is cached in a local DuckDB database, we can query the data with SQL.

There are several ways to do this. Below, we use the [JupySQL Extension](https://jupysql.ploomber.io/en/latest/user-guide/template.html).


In [None]:
# Install JupySQL to enable SQL cell magics
%pip install --quiet jupysql
# Load JupySQL extension
%load_ext sql
# Configure max row limit (optional)
%config SqlMagic.displaylimit = 200

In [None]:
# Get the SQLAlchemy 'engine' object for the cache
engine = read_result.cache.get_sql_engine()
# Pass the engine to JupySQL
%sql engine

In [None]:
# Get table objects for the 'invoices' and 'accounts' streams
invoices_table = read_result.cache["invoices"].to_sql_table()
accounts_table = read_result.cache["accounts"].to_sql_table()
display([invoices_table.fullname, accounts_table.fullname])

In [None]:
%%sql
-- Show the most recent invoices by invoice date:
SELECT invoices.type, invoices.status, invoices.total, invoices.amountdue, invoices."date"
FROM {{ invoices_table.fullname }} AS invoices
ORDER BY invoices."date" DESC
LIMIT 10

In [None]:
# List all tables in the cache
%sqlcmd tables
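You can try the same join, sort, and limit pattern without DuckDB or JupySQL. The sketch below uses Python's built-in `sqlite3` module as a stand-in for the DuckDB cache. The `contacts` and `invoices` tables, their columns, and the rows are all hypothetical and only loosely modeled on Xero invoice and contact records.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE contacts (contactid TEXT PRIMARY KEY, name TEXT);
    CREATE TABLE invoices (invoicenumber TEXT, contactid TEXT, total REAL, date TEXT);
    INSERT INTO contacts VALUES ('c1', 'Acme Ltd'), ('c2', 'Globex');
    INSERT INTO invoices VALUES
        ('INV-001', 'c1', 120.0, '2024-11-02'),
        ('INV-002', 'c2', 18.9, '2024-12-15'),
        ('INV-003', 'c1', 75.5, '2024-12-30');
    """
)

# Most recent invoices with the contact name, newest first
query = """
    SELECT invoices.invoicenumber, contacts.name, invoices.total, invoices.date
    FROM invoices
    JOIN contacts ON contacts.contactid = invoices.contactid
    ORDER BY invoices.date DESC
    LIMIT 2
"""
recent = conn.execute(query).fetchall()
for row in recent:
    print(row)
conn.close()
```

ISO-8601 date strings sort correctly as text, so `ORDER BY` works here even though the column has no date type.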