In [None]:
# default_exp contacts

# contacts

> Helpers for creating, looking up, and deleting HubSpot contacts.

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#hide
#export
import time
import os
import json
import requests
from dotenv import load_dotenv
from am4894hubspot.search import hubspot_search
load_dotenv()

True

In [None]:
# API key for interactive use in this notebook: taken from the
# HUBSPOT_API_KEY environment variable, falling back to 'demo'.
hapikey = os.getenv('HUBSPOT_API_KEY', 'demo')
#hapikey = 'demo'

In [None]:
#export


def create_contact(contact=None, hapikey=None):
    """
    Create a contact, or return the existing one if its email is already known.

    Parameters
    ----------
    contact : dict, optional
        Mapping of property name -> value; every key is sent as a contact
        property. Must contain an 'email' key. Defaults to a single
        placeholder test email.
    hapikey : str, optional
        HubSpot API key. Falls back to the HUBSPOT_API_KEY environment
        variable, then to 'demo'.

    Returns
    -------
    dict
        The decoded create response (new contact) or the search hit returned
        by `get_contact` (existing contact), with an added 'status' key:
        'created', 'exists' or 'error'. 'error' means the email echoed back
        did not match the requested one, or could not be found in the payload.

    Raises
    ------
    KeyError
        If `contact` has no 'email' key.
    """
    if contact is None:
        contact = dict(email='sometestuser@example.com')
    if hapikey is None:
        hapikey = os.environ.get('HUBSPOT_API_KEY','demo')
    email = contact['email']

    # Pass the resolved key along so the existence check and the lookup hit
    # the same account as the create call below.
    if contact_exists(email, hapikey=hapikey):
        existing = get_contact(email, hapikey=hapikey)
        if existing is None:
            # The second search found nothing even though the first one did.
            return {'status': 'error'}
        # Search hits are expected to carry the email as a plain value under
        # properties.email (this is what the original code compared against).
        found_email = (existing.get('properties') or {}).get('email')
        existing['status'] = 'exists' if found_email == email else 'error'
        return existing

    endpoint = 'https://api.hubapi.com/contacts/v1/contact/'
    url = f'{endpoint}?hapikey={hapikey}'
    headers = {
        "Content-Type": "application/json"
    }
    data = json.dumps({"properties": [{"property": k, "value": v} for k, v in contact.items()]})
    r = requests.post(url=url, data=data, headers=headers)
    response = r.json()

    # A successful create is expected to echo the email under
    # properties.email.value. Payloads without that path (e.g. API errors)
    # are reported as status='error' instead of raising KeyError.
    email_prop = (response.get('properties') or {}).get('email')
    created_email = email_prop.get('value') if isinstance(email_prop, dict) else None
    response['status'] = 'created' if created_email == email else 'error'
    return response

In [None]:
#export


def contact_exists(value, property='email', operator='EQ', hapikey=None) -> bool:
    """
    Tell whether searching for `value` matches at least one contact.

    The arguments are forwarded unchanged to `hubspot_search`; a response
    without a 'total' entry counts as zero matches.
    """
    search_result = hubspot_search(value, property=property, operator=operator, hapikey=hapikey)
    match_count = search_result.get('total', 0)
    return True if match_count > 0 else False

In [None]:
#export


def get_contact(value, property='email', operator='EQ', hapikey=None):
    """
    Get the first contact matching a search, or None if there is no match.

    Parameters
    ----------
    value, property, operator
        Forwarded unchanged to `hubspot_search`.
    hapikey : str, optional
        HubSpot API key. Falls back to the HUBSPOT_API_KEY environment
        variable, then to 'demo'.

    Returns
    -------
    dict or None
        The first entry of the search response's 'results', or None when the
        response reports no matches or carries no results.
    """
    if hapikey is None:
        hapikey = os.environ.get('HUBSPOT_API_KEY','demo')
    r = hubspot_search(value, property=property, operator=operator, hapikey=hapikey)
    # Read the response defensively, like contact_exists does: a payload
    # without 'total' / 'results' means "no contact" rather than a KeyError.
    results = r.get('results') or []
    if r.get('total', 0) > 0 and results:
        return results[0]
    return None

In [None]:
#export 


def delete_contact(vid=None, hapikey=None):
    """
    Delete a contact by its id.

    Parameters
    ----------
    vid : int or str
        Id of the contact to delete. Required, despite the None default kept
        for signature compatibility.
    hapikey : str, optional
        HubSpot API key. Falls back to the HUBSPOT_API_KEY environment
        variable, then to 'demo'.

    Returns
    -------
    The decoded JSON body of the DELETE response.

    Raises
    ------
    ValueError
        If `vid` is None.
    """
    if vid is None:
        # Without this guard the request would target '.../vid/None'.
        raise ValueError("delete_contact requires a contact vid, got None")
    if hapikey is None:
        hapikey = os.environ.get('HUBSPOT_API_KEY','demo')
    endpoint = f'https://api.hubapi.com/contacts/v1/contact/vid/{vid}'
    url = f'{endpoint}?hapikey={hapikey}'
    headers = {
        "Content-Type": "application/json"
    }
    r = requests.delete(url=url, headers=headers)
    return r.json()

In [None]:
#export


def create_contacts(contacts=None, hapikey=None, check_exists=True):
    """
    Create contacts from a list of contacts in one batch request.

    Parameters
    ----------
    contacts : list of dict
        One dict per contact, mapping property name -> value. Each dict must
        contain an 'email' key. Required, despite the None default kept for
        signature compatibility.
    hapikey : str, optional
        HubSpot API key. Falls back to the HUBSPOT_API_KEY environment
        variable, then to 'demo'.
    check_exists : bool, optional
        Currently unused; accepted only for backward compatibility.

    Returns
    -------
    bool
        True when the batch endpoint answers with status code 202.

    Raises
    ------
    ValueError
        If `contacts` is None, or if the endpoint answers with any status
        code other than 202.
    KeyError
        If a contact dict has no 'email' key.
    """
    if contacts is None:
        raise ValueError("contacts must be a list of contact dicts, got None")
    if hapikey is None:
        hapikey = os.environ.get('HUBSPOT_API_KEY','demo')
    url = "https://api.hubapi.com/contacts/v1/contact/batch"
    querystring = {"hapikey": hapikey}
    headers = {"Content-Type": "application/json"}
    # One entry per contact: the email identifies the contact and every key
    # of the dict (email included) is sent as a property.
    payload = json.dumps([
        {
            "email": contact["email"],
            "properties": [{"property": k, "value": v} for k, v in contact.items()],
        }
        for contact in contacts
    ])
    r = requests.post(url, data=payload, headers=headers, params=querystring)
    if r.status_code != 202:
        raise ValueError(f"batch contact creation failed status_code={r.status_code} reason={r.reason}")
    return True

In [None]:
# tests
#
# End-to-end check of the helpers defined above: create a contact, confirm it
# can be found, fetch it, then delete it again. These calls are not mocked,
# so the cell needs network access and a usable API key.

test_email = 'fake_user@example.com'

# create an example minimal contact (email is the only property sent)
contact_data = {
    "email": test_email
}

# create contact; 'exists' is also accepted so the cell can be re-run when a
# previous run left the contact behind
result_create = create_contact(contact_data)
assert result_create['status'] in ['created', 'exists']

# presumably gives the search backend time to pick up the new contact before
# it is queried - TODO confirm the delay that is actually needed
time.sleep(5)

# check it exists
assert contact_exists(test_email) == True

# get contact and make sure the search hit is the one just created
contact = get_contact(test_email)
assert contact['properties']['email'] == test_email

# delete contact, using the id from the search hit, to clean up after the test
result_delete = delete_contact(vid=contact['id'])
assert result_delete['deleted'] == True

## create batch of contacts
#test_contacts = [
#    dict(email='user1@example.com'),
#    dict(email='user2@example.com'),
#]
#assert create_contacts(test_contacts) == True
#time.sleep(5)
## now clean up and delete them
#for test_contact in test_contacts:
#    time.sleep(5)
#    test_email = test_contact['email']
#    test_contact_vid = get_contact(test_email)['id']
#    delete_contact(vid=test_contact_vid)