## Setup

Requires a HubSpot private app whose API access token has the following scopes:

- `crm.schemas.custom.read`
- `crm.objects.custom.read`
- `crm.objects.custom.write`
- `crm.objects.companies.read`
- `crm.schemas.contacts.read`
- `crm.objects.contacts.read`
- `crm.schemas.companies.read`
- `sales-email-read`

The token should be stored in an environment variable called `HUBSPOT_ACCESS_TOKEN`.

You can copy the sample environment file to get started; run the following command from the root of this repository:

```bash
cp .env.sample .env
```

Then open `.env` and fill in your access token.
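
The notebook reads the token straight from `os.environ`, so a missing variable surfaces as a bare `KeyError`. As a minimal sketch, a small helper could fail with a clearer hint instead. The helper name `read_access_token` and its `env` parameter are hypothetical and only for illustration; how the values in `.env` reach the process depends on your own tooling.

```python
import os


def read_access_token(env=os.environ, name="HUBSPOT_ACCESS_TOKEN"):
    """Return the token from the given mapping, or raise with a helpful hint."""
    token = env.get(name, "").strip()
    if not token:
        raise RuntimeError(f"{name} is not set; copy .env.sample to .env and fill it in")
    return token


# usage with a stand-in mapping, so the example runs without a real token
print(read_access_token({"HUBSPOT_ACCESS_TOKEN": "example-token"}))
```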

In [None]:
import os
from pathlib import Path

from hubspot import HubSpot
from hubspot.crm.associations.v4.models import AssociationSpec, BatchInputPublicAssociationMultiPost, PublicAssociationMultiPost

import pandas as pd


ACCESS_TOKEN = os.environ["HUBSPOT_ACCESS_TOKEN"]
ASSOCIATION_TYPES = ["calls", "emails", "meetings", "notes", "tasks"]

hubspot = HubSpot(access_token=ACCESS_TOKEN)

In [None]:
def write_json_records(df: pd.DataFrame, file_path: str):
    """Helper writes the DataFrame into a JSON file."""
    df.to_json(f"data/{file_path}", orient="records", indent=2)

In [None]:
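class RunMode:
    # FULL: reset `data/`, write every intermediate file, and only print batch sizes
    FULL = 0
    # DRY: write the prepared associations and send one association per batch
    DRY = 1
    # LIVE: send every batch in full; no files are written
    LIVE = 2


RUN_MODE = int(os.environ.get("HUBSPOT_VENDORS_RUN_MODE", RunMode.DRY))
print(f"Run mode: {RUN_MODE}")

# make sure the output directory exists before anything is written to it
Path("data").mkdir(exist_ok=True)

if RUN_MODE == RunMode.FULL:
    # start from a clean slate by removing previously written files
    for f in Path("data").iterdir():
        if f.is_file():
            f.unlink()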

## Get vendor data

In this section we request data for the objects we'll be interacting with:

- `companies` (built-in)
- `vendors` (custom)

We start by requesting the set of properties for these objects, both for documentation and as hints for later analysis.

Then we get companies, and filter for those with property `company_type == "Vendor"`.

Finally, we get the vendor custom objects.
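
To illustrate the flatten-then-filter step, here is a small sketch on made-up records. The two companies below are hypothetical and only mimic the nested shape of the dicts produced by `to_dict()`; real records carry more fields.

```python
import pandas as pd

# hypothetical records with the nested `properties` shape
companies = [
    {"id": "1", "properties": {"company_type": "Vendor", "domain": "acme.test", "name": "Acme"}},
    {"id": "2", "properties": {"company_type": "Customer", "domain": "globex.test", "name": "Globex"}},
]

# json_normalize flattens nested keys into dotted column names
companies_df = pd.json_normalize(companies)

# keep only the rows whose company type is "Vendor"
vendor_companies_df = companies_df[companies_df["properties.company_type"] == "Vendor"]
print(vendor_companies_df[["id", "properties.name"]])
```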

In [None]:
# request company properties and read into DataFrame
if RUN_MODE == RunMode.FULL:
    company_props = hubspot.crm.properties.core_api.get_all(object_type="companies", archived=False)
    company_props_df = pd.json_normalize(company_props.to_dict(), "results")
    write_json_records(company_props_df, "company_props.json")

In [None]:
# request companies data
companies = hubspot.crm.companies.get_all(properties=["company_type", "domain", "name"], associations=ASSOCIATION_TYPES)
companies = [c.to_dict() for c in companies]

In [None]:
# read companies data into DataFrame
companies_df = pd.json_normalize(companies)
companies_df["properties.domain"] = companies_df["properties.domain"].astype("category")
if RUN_MODE == RunMode.FULL:
    write_json_records(companies_df, "company_all.json")

In [None]:
# look at the unique company types defined
if RUN_MODE == RunMode.FULL:
    company_types = companies_df["properties.company_type"].unique()
    company_types.tofile("data/company_types.txt", sep=os.linesep)

In [None]:
# filter vendor companies into new DataFrame
vendor_companies_df = companies_df[companies_df["properties.company_type"] == "Vendor"]
if RUN_MODE == RunMode.FULL:
    write_json_records(vendor_companies_df, "company_vendors.json")

In [None]:
# request vendor properties and read into DataFrame
if RUN_MODE == RunMode.FULL:
    vendor_props = hubspot.crm.properties.core_api.get_all(object_type="vendors", archived=False)
    vendor_props_df = pd.json_normalize(vendor_props.to_dict(), "results")
    write_json_records(vendor_props_df, "vendor_props.json")

In [None]:
# request vendor data
vendors = hubspot.crm.objects.get_all("vendors", properties=["domain", "vendor_name"])
vendors = [v.to_dict() for v in vendors]

In [None]:
# read vendor data into DataFrame
vendors_df = pd.json_normalize(vendors)
vendors_df["properties.domain"] = vendors_df["properties.domain"].astype("category")
if RUN_MODE == RunMode.FULL:
    write_json_records(vendors_df, "vendor_all.json")

# filter any custom object vendors missing their domain property
# these were used for testing the vendor object import
vendors_df = vendors_df[~vendors_df["properties.domain"].isna()]
if RUN_MODE == RunMode.FULL:
    write_json_records(vendors_df, "vendor_with_domains.json")

## Get association definitions

In this section we get all definitions for association types on the vendor custom object.
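
As a rough sketch of the shape this produces, the example below tags made-up definitions with a type and a direction and then looks one up by that pair. The ids and names are hypothetical; the real values come from the HubSpot API.

```python
import pandas as pd

# hypothetical definition records for one association type
forward = pd.DataFrame([{"id": "101", "name": "vendor_to_email"}]).assign(type="emails", dir="forward")
reverse = pd.DataFrame([{"id": "102", "name": "email_to_vendor"}]).assign(type="emails", dir="reverse")

# stack both directions into one table
defs = pd.concat([forward, reverse], ignore_index=True)

# direction and type together identify a definition
lookup = defs.set_index(["dir", "type"])["id"]
print(lookup.loc[("reverse", "emails")])
```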


In [None]:
vendor_association_defs = pd.DataFrame(columns=["name", "id", "type"])
for association_type in ASSOCIATION_TYPES:
    # association definitions for the vendor custom object
    # i.e. vendors --> emails
    #      vendors --> meetings
    forward = hubspot.crm.associations.schema.types_api.get_all("vendors", association_type)
    df1 = pd.json_normalize(forward.to_dict(), "results")
    df1["type"] = association_type
    df1["dir"] = "forward"
    # reverse association definitions for the vendor custom object
    # i.e. emails --> vendors
    #      meetings --> vendors
    reverse = hubspot.crm.associations.schema.types_api.get_all(association_type, "vendors")
    df2 = pd.json_normalize(reverse.to_dict(), "results")
    df2["type"] = association_type
    df2["dir"] = "reverse"
    # combine forward and reverse for this association_type
    df = pd.concat((df1, df2))
    # merge with the overall result
    vendor_association_defs = vendor_association_defs.merge(df, how="outer")

if RUN_MODE == RunMode.FULL:
    write_json_records(vendor_association_defs, "vendor_associations.json")

## Vendor matching

This section joins vendor company objects (that have activities) with the corresponding vendor custom object
using a couple of strategies:

- LEFT on domain
- LEFT on name

These results are joined together to allow us to see which company objects could not be matched with a custom object
using either strategy.
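
A minimal sketch of the domain strategy on made-up data follows. The frames and column names are simplified stand-ins for the real ones, and `indicator=True` is used here as an alternative to checking `id_custom` for missing values.

```python
import pandas as pd

# hypothetical vendor companies and vendor custom objects
companies = pd.DataFrame(
    {"id": ["1", "2"], "domain": ["acme.test", "globex.test"], "name": ["Acme", "Globex"]}
)
vendors = pd.DataFrame({"id": ["9"], "domain": ["acme.test"], "vendor_name": ["Acme"]})

# LEFT JOIN on domain keeps every company, matched or not
by_domain = companies.merge(
    vendors, on="domain", how="left", suffixes=("_company", "_custom"), indicator=True
)

# "left_only" marks companies without a matching custom object
unmatched = by_domain[by_domain["_merge"] == "left_only"]
print(unmatched["name"].tolist())
```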


In [None]:
if RUN_MODE == RunMode.FULL:
    # combine the company vendors and custom object vendors into a single DataFrame with all columns
    # using a LEFT JOIN on domain
    # keeps records that have at least company vendor details
    joined_vendors_df_domain = vendor_companies_df.merge(
        vendors_df, on="properties.domain", how="left", suffixes=("_company", "_custom")
    )

    # filter mismatched companies for those with any activities
    # (row-wise: at least one association column holds a value for that company)
    activity_cols_domain = [f"associations.{a}.results" for a in ASSOCIATION_TYPES]
    missing_custom_with_activity_criteria_domain = (
        joined_vendors_df_domain["id_custom"].isna() & joined_vendors_df_domain[activity_cols_domain].notna().any(axis=1)
    )
    missing_custom_with_activity_domain = joined_vendors_df_domain[missing_custom_with_activity_criteria_domain]
    # sort by name
    missing_custom_with_activity_domain = missing_custom_with_activity_domain.sort_values("properties.name")
    # rename joined columns
    renames_domain = {"id_company": "id", "properties.hs_object_id_company": "properties.hs_object_id"}
    renamed_missing_domain = missing_custom_with_activity_domain.rename(columns=renames_domain)
    # select just the columns for later joining
    select_domain = list(renames_domain.values()) + ["properties.company_type", "properties.domain", "properties.name"]
    missing_output_domain = renamed_missing_domain[select_domain]
    write_json_records(missing_output_domain, "company_vendor_mismatched_domain.json")

    # combine the company vendors and custom object vendors into a single DataFrame with all columns
    # using a LEFT JOIN on name
    # keeps records that have at least company vendor details
    joined_vendors_df_name = vendor_companies_df.merge(
        vendors_df, left_on="properties.name", right_on="properties.vendor_name", how="left", suffixes=("_company", "_custom")
    )

    # filter mismatched companies for those with any activities
    # (row-wise: at least one association column holds a value for that company)
    activity_cols_name = [f"associations.{a}.results" for a in ASSOCIATION_TYPES]
    missing_custom_with_activity_criteria_name = (
        joined_vendors_df_name["id_custom"].isna() & joined_vendors_df_name[activity_cols_name].notna().any(axis=1)
    )
    missing_custom_with_activity_name = joined_vendors_df_name[missing_custom_with_activity_criteria_name]
    # sort by name
    missing_custom_with_activity_name = missing_custom_with_activity_name.sort_values("properties.name")
    # rename joined columns
    renames_name = {
        "id_company": "id",
        "properties.hs_object_id_company": "properties.hs_object_id",
        "properties.domain_company": "properties.domain",
    }
    renamed_missing_name = missing_custom_with_activity_name.rename(columns=renames_name)
    # select just the columns for later joining
    select_name = list(renames_name.values()) + ["properties.company_type", "properties.name"]
    missing_name = renamed_missing_name[select_name]
    write_json_records(missing_name, "company_vendor_mismatched_name.json")

    # merge the two DataFrames together
    # these are all the company vendors with activities that didn't have a matching custom vendor object
    # either on domain or name
    merged_missing = missing_output_domain.merge(missing_name, how="outer", on="properties.domain")
    # sort the columns
    sorted_cols = sorted(merged_missing.columns.to_list())
    merged_missing = merged_missing.reindex(columns=sorted_cols)
    # backfill missing values from the nearest column
    # since the columns are sorted, when e.g. column_x is missing, it will be filled from column_y
    merged_missing = merged_missing.replace("", pd.NA).bfill(axis=1)
    # rename joined columns now that there is a value for each
    renames = {
        "properties.name_x": "properties.name",
        "properties.hs_object_id_x": "properties.hs_object_id",
        "id_x": "id",
        "properties.company_type_x": "properties.company_type",
    }
    merged_missing = merged_missing.rename(columns=renames)
    # select a limited list of columns for output
    select = ["properties.domain"] + list(renames.values())
    selected_output = merged_missing[select]
    write_json_records(selected_output, "vendors_merged_missing_custom_with_activity.json")
    # adds a separator row for CSV output into markdown table format
    separator_row = pd.DataFrame([["-----"] * len(select)], columns=select)
    csv_output = pd.concat([separator_row, selected_output], ignore_index=True)
    # escape the pipe character, our separator for markdown table format
    # regex=True so that pipes inside a value are escaped, not only cells equal to "|"
    csv_output = csv_output.replace(r"\|", r"\\|", regex=True)
    # write to CSV into markdown table format
    csv_output.to_csv("data/vendors_merged_missing_custom_with_activity.csv", index=False, sep="|")

## Set up vendor associations

In this section we define association objects to create, between the vendor custom object and each type in `ASSOCIATION_TYPES`.

Vendor company objects serve as the source for existing associations, and we merge these with the `id` of the related vendor
custom object.
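
The core move is turning one row per vendor, holding a list of associated records, into one row per association. The sketch below shows that on made-up data; the column names and ids are hypothetical and much simpler than the real joined frame.

```python
import pandas as pd

# hypothetical joined rows: `id` is the vendor custom object id,
# `emails` holds the company's existing email associations (or nothing)
joined = pd.DataFrame(
    {
        "id": ["9", "10"],
        "emails": [[{"id": "e1"}, {"id": "e2"}], None],
    }
)

# one row per list element; rows without associations become NA and are dropped
pairs = joined.explode("emails").dropna(subset=["emails"])

# pull the associated record id out of each dict
pairs = pairs.assign(to_id=pairs["emails"].map(lambda a: a["id"]))[["id", "to_id"]]
print(pairs.to_dict("records"))
```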


In [None]:
# rename joined columns
renames = {"id_custom": "id", "properties.hs_object_id_custom": "properties.hs_object_id"}
# join the vendor company objects with the vendor custom object records
# using an OUTER JOIN on domain
joined_vendors_df = vendor_companies_df.merge(
    vendors_df, on="properties.domain", how="outer", suffixes=("_company", "_custom")
).rename(columns=renames)

In [None]:
def create_vendor_associations(vendor_company_record: pd.Series, association_type: str):
    """
    Create a list of association objects, from the given vendor company's associations,
    of the given type, for both directions.
    """
    association_results = vendor_company_record[f"associations.{association_type}.results"]
    associations = association_results if isinstance(association_results, list) else None
    vendor_id = vendor_company_record["id"]

    if associations and vendor_id:
        forward = [
            {"from": {"id": vendor_id}, "to": {"id": a["id"]}, "dir": "forward", "type": association_type}
            for a in associations
        ]
        reverse = [
            {"from": {"id": a["id"]}, "to": {"id": vendor_id}, "dir": "reverse", "type": association_type}
            for a in associations
        ]
        return forward + reverse
    else:
        return []

In [None]:
vendor_associations_df = pd.DataFrame(columns=["from.id", "to.id", "dir", "type"])

for association_type in ASSOCIATION_TYPES:
    applied_associations = (
        joined_vendors_df
            # apply the function to each row for this association type
            # each apply returns a list, which are all combined into a series
            .apply(create_vendor_associations, axis=1, association_type=association_type)
            # each row is a list itself, so explode into one row per association
            # rows with empty lists become NA
            .explode()
            # finally, drop NA values
            .dropna()
    )
    if len(applied_associations) > 0:
        # now each row is a dict
        # json_normalize turns this into a DataFrame with columns from the dict keys
        applied_associations_df = pd.json_normalize(applied_associations.to_list())
        # merge with the complete set
        vendor_associations_df = vendor_associations_df.merge(applied_associations_df, how="outer")

# now merge all created associations with the definitions
# to get the definition name and id
# drop NA values for the outer join when no activities of a given type are present
vendor_associations_df = vendor_associations_df.merge(vendor_association_defs, how="outer", on=["dir", "type"]).dropna()

if RUN_MODE < RunMode.LIVE:
    write_json_records(vendor_associations_df, "associations_create.json")

## Create vendor associations

Now that all the data preparation is done, we make the API calls to create the associations.

The API calls are done in batches according to the direction (forward or reverse) and type
of association being created.
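
To make the batching concrete, the sketch below groups a few made-up associations and derives the from/to object names for each batch without calling the API. The ids are hypothetical; `id` stands for the association definition id.

```python
import pandas as pd

# hypothetical prepared associations
associations = pd.DataFrame(
    [
        {"from.id": "9", "to.id": "e1", "dir": "forward", "type": "emails", "id": "101"},
        {"from.id": "9", "to.id": "e2", "dir": "forward", "type": "emails", "id": "101"},
        {"from.id": "e1", "to.id": "9", "dir": "reverse", "type": "emails", "id": "102"},
    ]
)

# one batch per (direction, type, definition id)
batch_sizes = associations.groupby(["dir", "type", "id"]).size()

for (direction, object_type, definition_id), size in batch_sizes.items():
    # forward batches go from vendors to the activity type, reverse batches the other way
    from_object = "vendors" if direction == "forward" else object_type
    to_object = object_type if direction == "forward" else "vendors"
    print(f"{from_object} --> {to_object} (definition {definition_id}): {size}")
```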


In [None]:
# grouping the associations by direction, type, and ID
# each group will be sent in a batch request
vendor_association_groups = vendor_associations_df.groupby(by=["dir", "type", "id"])

for group in vendor_association_groups.groups:
    # group here is a tuple of the grouping column values
    group_dir, group_type, group_id = group
    # required param for each association, identifies the association type being created
    types = [AssociationSpec(association_category="USER_DEFINED", association_type_id=group_id)]
    # get the actual data for this group as a DataFrame
    group_df = vendor_association_groups.get_group(group)
    # convert to the required inputs format: list[PublicAssociationMultiPost]
    inputs = group_df.apply(lambda d: PublicAssociationMultiPost(types, d["from.id"], d["to.id"]), axis=1, result_type="reduce").to_list()

    # now create the batch request for these inputs
    batch = BatchInputPublicAssociationMultiPost(inputs=inputs)
    # decide on the from and to object
    from_object = "vendors" if group_dir == "forward" else group_type
    to_object = group_type if group_dir == "forward" else "vendors"

    if RUN_MODE == RunMode.LIVE:
        # call the API to create associations for the batch
        hubspot.crm.associations.v4.batch_api.create(from_object, to_object, batch)
    elif RUN_MODE == RunMode.DRY:
        # call the API to create a single association from the batch
        batch.inputs = batch.inputs[0:1]
        print(batch.inputs)
        response = hubspot.crm.associations.v4.batch_api.create(from_object, to_object, batch)
        print(response)
    else:
        print(f"{from_object} --> {to_object}: {len(batch.inputs)}")

print("total associations:", vendor_associations_df.shape[0])