# Azure Postgres Apache AGE feature for Graph workloads

Author: Srikanth Sridhar, Microsoft, Azure Databases Global Black Belt (GBB)

#### This notebook uses a Python virtual environment.

To create a Python environment in VS Code, see [Python environments in VS Code](https://code.visualstudio.com/docs/python/environments).

•	Refer to the requirements.txt file for dependencies.

•	The AgeFreighter library is used to create Graph nodes and edges in Postgres.

•	Ensure you have permission to write the input files, and create a .env file containing the Postgres connection string.

#### This section uses Python's Faker library to create sample customers and transactions.

•	This code section can save the customer and transaction information as either a CSV or a Parquet file.

•	If you choose the Parquet file format, you must define an explicit schema, because the AgeFreighter library requires it.

•	To generate more customers or transactions, adjust the number of customers (`num_customers`) and the per-customer transaction loop range accordingly.

In [None]:
from faker import Faker
import random
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os
from datetime import datetime

# Shared Faker instance used by the generator helpers and the customer list below.
fake = Faker()

# Output file name (written under ./dataset/). Its extension selects the format:
# '.csv' is written with DataFrame.to_csv, anything else as Parquet; the next
# cell also derives the graph name and the AgeFreighter loader class from it.
# Uncomment / comment the below variable definition, based on the required input file format CSV / Parquet.
# save_as_file = 'customer_transactions.csv'
save_as_file = 'customer_transactions.parquet'

def generate_sort_code():
    """Return a fake sort code string shaped like '80-10-NN', NN in 10..99."""
    last_pair = random.randint(10, 99)
    return f"80-10-{last_pair}"

def generate_account_number():
    """Return a fake 8-character account number string.

    The Faker ``numerify`` pattern keeps the two leading zeros literally and
    replaces each ``%`` with a digit 1-9 and each ``#`` with a digit 0-9.
    """
    account_pattern = '00%%###%'
    return fake.numerify(account_pattern)

def generate_transaction(customer, existing_accounts):
    """Build one synthetic transaction row sent by ``customer``.

    When ``existing_accounts`` is non-empty, about half the time the payee
    account is drawn from it; otherwise a new account number is generated.
    The payee is never the sender's own account, so no row is a
    self-transfer (``account_x == account_y``).

    Args:
        customer: sender record with the keys 'name', 'customerID',
            'address', 'email', 'phoneNumber', 'account' and 'sortCode'.
        existing_accounts: sequence of account numbers already in use; only
            read here (``random.choice`` needs a sequence), never modified.

    Returns:
        dict: one flat row. ``*_x`` keys describe the sender and ``*_y`` keys
        the recipient; ``fromAccount``/``toAccount`` repeat
        ``account_x``/``account_y``.
    """
    own_account = customer['account']

    to_account = None
    if existing_accounts and random.random() < 0.5:
        candidate = random.choice(existing_accounts)
        # existing_accounts may include the sender's own account (the caller
        # seeds it with every customer's account); drawing it would create a
        # self-transfer, so fall back to a new account instead. Checking the
        # single draw keeps this O(1) rather than filtering the whole list.
        if candidate != own_account:
            to_account = candidate
    if to_account is None:
        to_account = generate_account_number()
        # Re-draw in the unlikely case the new number equals the sender's.
        while to_account == own_account:
            to_account = generate_account_number()

    # NOTE(review): apart from account_y, the recipient fields (*_y) are always
    # freshly faked, even when to_account belongs to an existing customer, so
    # they will not match that customer's own record.
    return {
        'name_x': customer['name'],
        'customerID_x': customer['customerID'],
        'address_x': customer['address'],
        'email_x': customer['email'],
        'phoneNumber_x': customer['phoneNumber'],
        'account_x': own_account,
        'sortCode_x': customer['sortCode'],
        'fromAccount': own_account,
        'amount': round(random.uniform(10.0, 10000.0), 2),
        'currency': 'USD',
        'transactionID': fake.uuid4(),
        'toAccount': to_account,
        'dateTime': fake.date_time_this_year().strftime("%Y-%m-%d %H:%M:%S"),
        'name_y': fake.name(),
        'customerID_y': fake.uuid4(),
        'address_y': fake.address().replace("\n", ", "),
        'email_y': fake.email(),
        'phoneNumber_y': fake.phone_number(),
        'account_y': to_account,
        'sortCode_y': generate_sort_code()
    }

def format_elapsed_time(elapsed_seconds):
    """Format a duration in seconds as ``HH:MM:SS.ss``.

    The value is rounded to whole centiseconds *before* it is split into
    hours, minutes and seconds. As a result, 59.999 renders as ``00:01:00.00``
    instead of the invalid ``00:00:60.00`` that rounding at format time gives.

    Args:
        elapsed_seconds: duration in seconds (int or float), e.g. the result
            of ``timedelta.total_seconds()``.

    Returns:
        str: zero-padded ``HH:MM:SS.ss``. Hours are not capped at 99. A
        negative duration gets a leading ``-``.
    """
    total_centis = int(round(elapsed_seconds * 100))
    # Format the magnitude and prefix the sign; divmod on a negative value
    # would otherwise yield a nonsensical "-1:59:..." style result.
    sign = "-" if total_centis < 0 else ""
    total_centis = abs(total_centis)

    hours, remainder = divmod(total_centis, 360_000)  # 100 * 3600
    minutes, centis = divmod(remainder, 6_000)        # 100 * 60
    seconds, fraction = divmod(centis, 100)
    return f"{sign}{hours:02}:{minutes:02}:{seconds:02}.{fraction:02}"

start_time = datetime.now()

# Generate a list of customers, adjust num_customers accordingly to your needs
num_customers = 20
customers = [{
    'name': fake.name(),
    'customerID': fake.uuid4(),
    'address': fake.address().replace("\n", ", "),
    'email': fake.email(),
    'phoneNumber': fake.phone_number(),
    'account': generate_account_number(),
    'sortCode': generate_sort_code()
} for _ in range(num_customers)]

# Generate transactions for each customer, adjust transactions_per_customer according to your needs.
# Each generated transaction gets its own transactionID.
transactions_per_customer = 5
transactions = []
# existing_accounts stays a list because generate_transaction() draws from it
# with random.choice(). known_accounts mirrors it as a set so the "is this a new
# account?" check is O(1) instead of a linear scan, which would make the whole
# loop quadratic once num_customers is scaled up.
existing_accounts = [customer['account'] for customer in customers]
known_accounts = set(existing_accounts)
for customer in customers:
    for _ in range(transactions_per_customer):
        transaction = generate_transaction(customer, existing_accounts)
        transactions.append(transaction)
        to_account = transaction['toAccount']
        if to_account not in known_accounts:
            known_accounts.add(to_account)
            existing_accounts.append(to_account)

# Convert to DataFrame (one row per transaction)
df = pd.DataFrame(transactions)

# Write under ./dataset/, creating the directory if it does not exist
output_dir = os.path.join(".", "dataset")
file_path = os.path.join(output_dir, save_as_file)
os.makedirs(output_dir, exist_ok=True)

# Save to CSV or Parquet
if save_as_file.endswith('.csv'):
    df.to_csv(file_path, index=False)
else:
    # Explicit schema for the Parquet file (required by AgeFreighter per this notebook's notes)
    schema = pa.schema([
        ('name_x', pa.string()),
        ('customerID_x', pa.string()),
        ('address_x', pa.string()),
        ('email_x', pa.string()),
        ('phoneNumber_x', pa.string()),
        ('account_x', pa.string()),
        ('sortCode_x', pa.string()),
        ('fromAccount', pa.string()),
        ('amount', pa.float64()),
        ('currency', pa.string()),
        ('transactionID', pa.string()),
        ('toAccount', pa.string()),
        ('dateTime', pa.string()),
        ('name_y', pa.string()),
        ('customerID_y', pa.string()),
        ('address_y', pa.string()),
        ('email_y', pa.string()),
        ('phoneNumber_y', pa.string()),
        ('account_y', pa.string()),
        ('sortCode_y', pa.string())
    ])

    # Convert DataFrame to Arrow Table; preserve_index=False keeps the pandas
    # index out of the file, mirroring index=False in the CSV branch.
    table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

    # Save to Parquet file
    pq.write_table(table, file_path)

end_time = datetime.now()
print(f"Generated customer transactions and saved to {file_path}")

elapsed_time = end_time - start_time
elapsed_time_str = format_elapsed_time(elapsed_time.total_seconds())
print(f"Total time taken to generate the input file HH:MM:SS: {elapsed_time_str}")

#### This section uses AgeFreighter, an open-source Python library, to create the Apache AGE graph schema, nodes, and edges in Postgres.

•	For more information on the AgeFreighter library, see [AGEFreighter](https://pypi.org/project/agefreighter/).

•	Create a config folder containing an environment (.env) file with the Postgres connection string, and assign that file's path to the "env_name" variable.

•	Sample Postgres connection string: PG_CONNECTION_STRING="host=your-postgres-hostname port=5432 dbname=postgres user=pgadmin password=your-postgres-db-password"

In [None]:
from agefreighter import Factory
from dotenv import dotenv_values
import time

env_name = "./config/postgres_config.env" # using postgres_config.env template, change to your own .env file name
config = dotenv_values(env_name)

connection_string = config['PG_CONNECTION_STRING']
# print(connection_string)
graph_name = "customer_transactions_fraud_csv" if save_as_file.endswith('.csv') else "customer_transactions_fraud_parq"

# Create the AgeFreighter instance accordingly for CSV or Parquet file input
class_name = "CSVFreighter" if save_as_file.endswith('.csv') else "ParquetFreighter"
instance = Factory.create_instance(class_name)

await instance.connect(
        dsn=connection_string,
        max_connections=64,
        min_connections=4
    )

start_time = datetime.now()

# Use the respective AgeFreighter instance for CSV or Parquet file input
if save_as_file.endswith('.csv'):
    await instance.load(
        csv_path=file_path,
        graph_name=graph_name,
        start_v_label="name_x",
        start_id="customerID_x",
        start_props=["name_x","address_x","email_x","phoneNumber_x","account_x","sortCode_x","fromAccount","amount","currency","transactionID","toAccount","dateTime"],
        edge_type="TRANSACTIONS",
        end_v_label="name_y",
        end_id="customerID_y",
        end_props=["name_y","address_y", "email_y", "phoneNumber_y", "account_y", "sortCode_y"],
        drop_graph=True,    
        use_copy=True,        
        create_graph=True,
    )
else:
    await instance.load(
            parquet_path=file_path,
            graph_name=graph_name,
            start_v_label="name_x",
            start_id="customerID_x",
            start_props=["name_x","address_x","email_x","phoneNumber_x","account_x","sortCode_x","fromAccount","amount","currency","transactionID","toAccount","dateTime"],
            edge_type="TRANSACTIONS",
            end_v_label="name_y",
            end_id="customerID_y",
            end_props=["name_y","address_y", "email_y", "phoneNumber_y", "account_y", "sortCode_y"],
            drop_graph=True,
            use_copy=True,
            create_graph=True
        )

end_time = datetime.now()
elapsed_time = end_time - start_time
elapsed_time_str = format_elapsed_time(elapsed_time.total_seconds())
print(f"Total time taken to create AGE Graph from input file {save_as_file} with nodes and edges, HH:MM:SS: {elapsed_time_str}")

#### This section provides sample Apache AGE queries for finding inbound and outbound edges.

•	After running the cells to create the input file and ingest the data into Postgres, connect to your Postgres server using psql, pgAdmin, or any other tool of your choice.

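•	Ensure that the graph schema named after the graph_name used above (customer_transactions_fraud_csv or customer_transactions_fraud_parq) has been created, and use that name in place of 'your_graph_name' below.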

•	Open the query tool and execute the following queries:

```sql
SET search_path = ag_catalog, "$user", public;

-- Inbound edges query
SELECT * FROM cypher('your_graph_name', $$
MATCH in_edge = (a)<-[ie]-(c)
RETURN a.name_y, label(a), count(in_edge)
$$) AS (id agtype, label agtype, in_edges agtype);

-- Outbound edges query
SELECT * FROM cypher('your_graph_name', $$
MATCH out_edge = (a)-[oe]->(b)                         
RETURN a.name_x, label(a), count(out_edge)                
$$) AS (id agtype, label agtype, out_edges agtype);

```