# Initialization an syncronization with Nimble API
*By Dmytro Mendzhul, Aug 2023.*

This is a Jupiter notebook for testing approaches when solving test task from Nimble.<br> 
It contains only database initaalization and syncronization approach.

### Start PostgreSQL [OPTIONAL]
Skip this step if PostgreSQL is already running on port 5432

In [None]:
%%bash
. ~/.bashrc
docker-compose -f ../deploy/docker-compose-db.yml --project-directory . up --build

## Create `contact` table
### Define helper functions:

In [3]:
import psycopg # Alternatively, SQLAlchemy can be used, but the task has condition not to use ORM
from yarl import URL

def run_sql_command(sql: str, db_url: URL) -> None:
    """Runs a PSQL command.

    Args:
        sql (str): The SQL command to run.
        db_url (URL): database URL.

    Returns:
        None.
    """

    with psycopg.connect(conninfo=str(db_url)) as conn, \
        conn.cursor() as cursor:
        try:
            cursor.execute(sql)
            conn.commit()
        finally:
            conn.close()

### [SKIP] (for convenience) drop `contact` table:

In [4]:
from nimblesync.settings import settings

# Remove 'contact' table
try:
    run_sql_command('DROP TABLE IF EXISTS contact', settings.db_url)
except Exception as ex:
    print(ex)

### Create `contact` table:

Some columns explanation:
- `id` - autoincremented
- `external_id` - for Nimble API resource identifiers, indexed
- `removed` - value '`t`' means external contact is no longer exist on API, default `f`

In [5]:
from nimblesync.settings import settings

create_contact_table_sql = """
CREATE TABLE IF NOT EXISTS contact (
    id SERIAL PRIMARY KEY,
    external_id VARCHAR ( 50 ) UNIQUE,
    first_name VARCHAR ( 255 ),
    last_name VARCHAR ( 255 ),
    email VARCHAR ( 255 ),
    removed BOOLEAN DEFAULT FALSE NOT NULL,
    textsearchable_index_col tsvector
        GENERATED ALWAYS AS (to_tsvector('english',
            coalesce(first_name, '') || ' ' || 
            coalesce(last_name, '') || ' ' || 
            coalesce(email, '')
        )) STORED
);
"""

# Run
run_sql_command(create_contact_table_sql, settings.db_url)

## Import initial data from CSV
### Constants:

In [6]:
SEED_DATA_FILE_PATH = '../seed/Nimble Contacts - Sheet1.csv'

### [OPTIONAL] Observe initial data

In [7]:
import pandas as pd
df_initial = pd.read_csv(SEED_DATA_FILE_PATH)
print(df_initial.head())
print(f'...\nTOTAL: {len(df_initial)}')
del(df_initial)

  first name      last name                               Email
0       Oleg         Mishyn               mystylename@gmail.com
1        Ken  Underwood III  kenneth.underwood@yahoofinance.com
2      kitty          akbar                      asd1@gmail.com
3      Craig       Jamieson           craig@salesresultsllc.com
4    Francis          Hoang                    fqjunk@gmail.com
...
TOTAL: 10


### Seed initial data to database:

Note that seed CSV file can contain large amount of data.
Therefore, to perform import efficiently, PostgreSQL 'COPY' command is used.

In [8]:
import psycopg
from nimblesync.settings import settings

def seed_contacts(csv_path: str, check_if_empty: bool = True, verbose: bool = False) -> None:

    with open(csv_path, "r") as file, \
        psycopg.connect(conninfo=str(settings.db_url)) as conn, \
        conn.cursor() as cursor:
        try:            
            run_import = True

            if check_if_empty:
                cursor.execute('SELECT COUNT(*) FROM contact')
                count = cursor.fetchone()[0]
                if count > 0:
                    run_import = False
                    if verbose:
                        print(f'Found {count} records in \'contact\' table. Skipping seed.')
            
            if run_import:
                with cursor.copy("""
                            COPY contact(
                                first_name,
                                last_name,
                                email)
                            FROM stdin (format csv, delimiter ',', quote '\"')""") as copy:
                    copy.write(file.read())

                conn.commit()
                if verbose:
                    print(f'Successfully imported seed data into \'contact\' table.')
        finally:
            conn.close()


seed_contacts(SEED_DATA_FILE_PATH, verbose=True)

Successfully imported seed data into 'contact' table.


## Load data from Nimble API
### Constants:

In [9]:
from nimblesync.settings import settings

NIMBLE_CONTACTS_IDS_ENDPOINT = settings.nimble_contacts_ids_endpoint
NIMBLE_CONTACTS_ENDPOINT = settings.nimble_contacts_endpoint
NIMBLE_TOKEN = settings.nimble_token
NIMBLE_HEADERS = { 'Authorization': 'Bearer ' + NIMBLE_TOKEN }
SUCCESS_STATUS_CODE = 200
BATCH_SIZE = 10 # can be increased, but for test purpose let it be small
DEFAULT_BATCH_SIZE = 30

### Define functions:

In [10]:
from typing import Iterator
import requests

def get_params(page: int, fields: [str] = None) -> dict[str, any]:
    """Generate parameters for a query.

    Args:
        page (int): The page number.
        fields ([str]): Fields to return.

    Returns:
        dict[str, any]:A dictionary of parameters.
    """
    
    params = {'page': page}
    
    if BATCH_SIZE != DEFAULT_BATCH_SIZE:
        params['per_page'] = BATCH_SIZE

    if (fields is not None):
        params['fields'] = ','.join(fields)

    return params


def get_field(resource: dict[str, any], field: str) -> str:
    """Gets the field value of contact resource

    Args:
        resource (dict[str, any]): The contact from Nimble API.
        field (str): The field name.

    Returns:
        str: The value of the field.
    """
    return next(iter(resource['fields'].get(field, [])), {}).get('value')

# [OPTIONAL]
def load_all_contact_ids(verbose: bool = False) -> set[str]:
    """Loads all contact IDs from the Nimble API.

    Args:
        verbose (bool, optional): Whether to print status messages. Defaults to False.

    Returns:
        set[str]: A set of contact IDs
    """
    
    page = 0
    contact_ids = set()

    while True:
        page += 1
    
        params = get_params(page)
        response_API = requests.get(NIMBLE_CONTACTS_IDS_ENDPOINT, headers=NIMBLE_HEADERS, params=params)
        
        if verbose:
            print(f'GET ids (page={page}): status={response_API.status_code}')

        if (response_API.status_code != SUCCESS_STATUS_CODE):
            break

        page_json = response_API.json()
    
        page_contact_ids = page_json['resources']
        contact_ids.update(page_contact_ids)

        pages = page_json['meta']['pages']
    
        if page >= pages:
            break
    
    if verbose:
        print(f'loaded {len(contact_ids)} unique IDs.')

    return contact_ids

# [OPTIONAL]
def load_all_contacts(verbose: bool = False) -> dict[str, any]:
    """Loads all contacts from the Nimble API.

    Args:
        verbose (bool, optional): Whether to print status messages. Defaults to False.

    Returns:
        dict[str, any]: A dictioraty of contacts.
    """
    
    page = 0
    contacts = {}

    while True:
        page += 1
    
        params = get_params(page, ['first name','last name','email'])
        response_API = requests.get(NIMBLE_CONTACTS_ENDPOINT, headers=NIMBLE_HEADERS, params=params)

        if verbose:
            print(f'GET contacts (page={page}): status={response_API.status_code}')

        if (response_API.status_code != SUCCESS_STATUS_CODE):
            break

        page_json = response_API.json()
        
        contacts.update({x['id']: {
            'external_id':x['id'],
            'first_name':get_field(x, 'first name'),
            'last_name':get_field(x, 'last name'),
            'email':get_field(x, 'email')
        } for x in page_json['resources']})

        pages = page_json['meta']['pages']
    
        if page >= pages:
            break
    
    if verbose:
        print(f'loaded {len(contacts)} contacts.')
    
    return contacts


def load_paginated_contacts(verbose: bool = False) -> Iterator[list[tuple[str, ...]]]:
    """Loads all contacts from the Nimble API by pages.

    Args:
        verbose (bool, optional): Whether to print status messages. Defaults to False.

    Returns:
        dict[str, any]: A dictioraty of contacts.
    """
    
    page = 0
    total = 0

    while True:
        page += 1
    
        params = get_params(page, ['first name','last name','email'])
        response_API = requests.get(NIMBLE_CONTACTS_ENDPOINT, headers=NIMBLE_HEADERS, params=params)

        if verbose:
            print(f'GET contacts (page={page}): status={response_API.status_code}')

        if (response_API.status_code != SUCCESS_STATUS_CODE):
            break

        page_json = response_API.json()
        
        chunk = [(
            str(x['id']),
            get_field(x, 'first name'),
            get_field(x, 'last name'),
            get_field(x, 'email')
        ) for x in page_json['resources']]

        yield chunk

        total += len(chunk)
        pages = page_json['meta']['pages']
    
        if page >= pages:
            break
    
    if verbose:
        print(f'loaded {len(total)} contacts')



### [OPTIONAL] Load all contact IDs from Nimble API:

In [10]:
nimble_contact_ids = load_all_contact_ids(verbose=True)
print('Example:', nimble_contact_ids.pop())

GET ids (page=1): status=200
GET ids (page=2): status=200
GET ids (page=3): status=200
GET ids (page=4): status=200
loaded 34 unique IDs.
Example: 64ca0fa3d1d39db980b9d418


### [OPTIONAL] Load all contacts from Nimble API:

In [328]:
nimble_contacts = load_all_contacts(verbose=True)
print("Example:", list(nimble_contacts.values())[-1])

GET contacts (page=1): status=200
GET contacts (page=2): status=200
GET contacts (page=3): status=200
GET contacts (page=4): status=200
loaded 34 contacts.
Example: {'external_id': '64ca0fa7d1d39db980b9d458', 'first_name': 'Randy', 'last_name': 'Smith', 'email': 'smith.r@samsung.com'}


### [OPTIONAL] Read external contacts stored in db:

In [13]:
import psycopg
from yarl import URL
from nimblesync.settings import settings

def get_external_contacts_from_db(db_url: URL) -> dict[str, dict[str, any]]:
    """ Read all external contacts (with external_id) from database

    Args:
        db_url (URL): database URL.

    Returns:
        dict[str, dict[str, any]]: Dictionary of contacts keyed by external_id
    """

    query_sql = '''
    SELECT id, external_id, first_name, last_name, email, removed
    FROM contact
    WHERE external_id IS NOT NULL;
    '''
    contacts = {}

    with psycopg.connect(conninfo=str(db_url)) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query_sql)
            entities = cursor.fetchall()
            contacts = {str(x[1]): {
                'id':x[0],
                'external_id':x[1],
                'first name':x[2],
                'last name':x[3],
                'email':x[4],
                'removed':x[5]
            } for x in entities}
            
    return contacts

# Run
current_contacts = get_external_contacts_from_db(settings.db_url)

print("Total:", len(current_contacts))
print("Example:", list(current_contacts.values())[-1] if current_contacts else None)

Total: 34
Example: {'id': 45, 'external_id': '64ca0fa7d1d39db980b9d458', 'first name': 'Randy', 'last name': 'Smith', 'email': 'smith.r@samsung.com', 'removed': False}


## Update database from Nimble API
Now let's copy data to temp table and perform UPSERT and 'REMOVE' (with status) operatons

Note that using COPY (bulk insert) is fastest way to populate postgres table.<br> 
This is psycopg3 implementation, more details here: https://www.psycopg.org/psycopg3/docs/basic/copy.html<br> 
In psycopg2 it would be cursor.copy_from with buffer: https://hakibenita.com/fast-load-data-python-postgresql<br> 
Another performance benchmarking here:<br> 
https://github.com/NaysanSaran/pandas2postgresql/blob/master/notebooks/Psycopg2_Bulk_Insert_Speed_Benchmark.ipynb

In [12]:
import psycopg
from nimblesync.settings import settings

# Load contacts from Nimble API as lists of tuples, page by page:
# (this will run slower, due to API response)
chunks = load_paginated_contacts()
# To use pre-loaded data from bloks above, use instead:
# chunks = [[tuple(x.values()) for x in nimble_contacts.values()]]

def update_contacts_with_external_data(chunks: Iterator[list[tuple[str, ...]]]) -> None:
    """Loads contacts page-by-page to temp table, then upserts into 'contact' table and marks missing records as removed

    Args:
        chunks (Iterator[list[tuple[str, ...]]]): Paginated contact records from Nimble API
    """

    with psycopg.connect(conninfo=str(settings.db_url)) as conn, \
        conn.cursor() as cursor:
        try:
            # Create temp table
            cursor.execute("""
                CREATE TEMP TABLE tmp_contact(
                    external_id VARCHAR ( 50 ),
                    first_name VARCHAR ( 255 ),
                    last_name VARCHAR ( 255 ),
                    email VARCHAR ( 255 ))
                    ON COMMIT DROP;""")
            
            # Bulk copy to temp table
            for chunk in chunks:
                with cursor.copy("""
                    COPY tmp_contact(
                        external_id,
                        first_name,
                        last_name,
                        email)
                    FROM stdin;""") as copy:
                    for row in chunk:
                        copy.write_row(row)
        
        # Add new records and update existing ones if there are changes
        # Also, sets removed=FALSE if a previously deleted record reappears
            cursor.execute("""
            INSERT INTO contact (
                external_id,
                first_name,
                last_name,
                email)
            SELECT
                external_id,
                first_name,
                last_name,
                email
            FROM tmp_contact
            ON CONFLICT (external_id)  DO UPDATE  SET
                first_name=EXCLUDED.first_name,
                last_name=EXCLUDED.last_name,
                email=EXCLUDED.email,
                removed=FALSE
            WHERE
                contact.first_name != EXCLUDED.first_name OR
                contact.last_name != EXCLUDED.last_name OR
                contact.email != EXCLUDED.email OR
                contact.removed = TRUE;""")
        
        # Mark as removed contacts that are no longer returned by the API
        # (additional index can improve this update, as shown in Appendix 1)
            cursor.execute("""
            CREATE UNIQUE INDEX tmp_contact_external_id_key ON tmp_contact (external_id);
            UPDATE contact c
            SET removed = TRUE
            WHERE
                c.external_id IS NOT NULL AND
                NOT EXISTS (
                    SELECT FROM tmp_contact t
                    WHERE t.external_id = c.external_id
                );""")

            conn.commit()
        except (Exception, psycopg.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()
        finally:
            conn.close()

update_contacts_with_external_data(chunks)
print('Updated successfully!')

Updated successfully!


## Search for contact

In [14]:
import psycopg
from yarl import URL
from nimblesync.settings import settings

def search_for_contacts(db_url: URL, search: str, limit: int = 10, offset: int = 0) -> list[dict[str, any]]:
    """ Read all external contacts (with external_id) from database

    Args:
        db_url (URL): database URL.
        search (str): Search text.

    Returns:
        dict[str, dict[str, any]]: Dictionary of contacts keyed by external_id
    """

    query_sql = '''
    SELECT id, external_id, first_name, last_name, email, removed
    FROM contact
    WHERE textsearchable_index_col @@ to_tsquery(%s)
    LIMIT %s OFFSET %s;'''
    
    res = []

    with psycopg.connect(conninfo=str(db_url)) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query_sql, (search, limit, offset))
            entities = cursor.fetchall()
            res = [{
                'id':x[0],
                'external_id':x[1],
                'first name':x[2],
                'last name':x[3],
                'email':x[4],
                'removed':x[5]
            } for x in entities]
            
    return res

# Run
found_contacts = search_for_contacts(settings.db_url, 'Ken | Smith')

for record in found_contacts:
    print(record)

{'id': 3, 'external_id': None, 'first name': 'Ken', 'last name': 'Underwood III', 'email': 'kenneth.underwood@yahoofinance.com', 'removed': False}
{'id': 9, 'external_id': None, 'first name': 'Ken', 'last name': 'Gray', 'email': 'kgray@cc-techgroup.com', 'removed': False}
{'id': 45, 'external_id': '64ca0fa7d1d39db980b9d458', 'first name': 'Randy', 'last name': 'Smith', 'email': 'smith.r@samsung.com', 'removed': False}


### Appendix 1
Query plan comparison with and without additional index on temp table:


      nimblesync=# CREATE TABLE tmp2 AS TABLE contact;
      SELECT 45
      nimblesync=# EXPLAIN UPDATE contact c
                  SET removed = TRUE
                  WHERE
                  c.external_id IS NOT NULL AND
                  NOT EXISTS (
                        SELECT FROM tmp2 t
                        WHERE t.external_id = c.external_id
                  );
                                    QUERY PLAN                                 
      ----------------------------------------------------------------------------
      Update on contact c  (cost=10.90..21.45 rows=1 width=1683)
      ->  Hash Anti Join  (cost=10.90..21.45 rows=1 width=1683)
            Hash Cond: ((c.external_id)::text = (t.external_id)::text)
            ->  Seq Scan on contact c  (cost=0.00..10.40 rows=40 width=1676)
                  Filter: (external_id IS NOT NULL)
            ->  Hash  (cost=10.40..10.40 rows=40 width=124)
                  ->  Seq Scan on tmp2 t  (cost=0.00..10.40 rows=40 width=124)
      (7 rows)

      nimblesync=# CREATE UNIQUE INDEX tmp2_external_id_key ON tmp2 (external_id);
      CREATE INDEX
      nimblesync=# EXPLAIN UPDATE contact c
                  SET removed = TRUE
                  WHERE
                  c.external_id IS NOT NULL AND
                  NOT EXISTS (
                        SELECT FROM tmp2 t
                        WHERE t.external_id = c.external_id
                  );
                                    QUERY PLAN                                 
      ---------------------------------------------------------------------------
      Update on contact c  (cost=2.01..12.56 rows=1 width=1683)
      ->  Hash Anti Join  (cost=2.01..12.56 rows=1 width=1683)
            Hash Cond: ((c.external_id)::text = (t.external_id)::text)
            ->  Seq Scan on contact c  (cost=0.00..10.40 rows=40 width=1676)
                  Filter: (external_id IS NOT NULL)
            ->  Hash  (cost=1.45..1.45 rows=45 width=124)
                  ->  Seq Scan on tmp2 t  (cost=0.00..1.45 rows=45 width=124)
      (7 rows)

### Convert this notebook to HTML:

In [15]:
%%bash
. ~/.bashrc
jupyter nbconvert --to html sync.ipynb

[NbConvertApp] Converting notebook sync.ipynb to html
[NbConvertApp] Writing 342759 bytes to sync.html
