In [1]:
import pandas as pd
import requests
import json
from datetime import datetime
import urllib
from sqlalchemy import create_engine, types
from sqlalchemy.pool import NullPool
from hubspot.crm.products import ApiException as ProductsApiException
from hubspot.auth.oauth import ApiException
from validate_email import validate_email
import hubspot
import os
from dotenv import load_dotenv

# Populate os.environ from a .env file before reading settings.
load_dotenv()

# Every setting is mandatory: os.environ[...] raises KeyError for a missing variable.
DATABASE_CONNECTION_URI, ACCESS_TOKEN, client_id, client_secret = (
    os.environ[variable]
    for variable in ("NEW_DB_URL", "ACCESS_TOKEN", "CLIENT_ID", "CLIENT_SECRET")
)

# Module-level engine bound to the configured database URL.
engine = create_engine(DATABASE_CONNECTION_URI)

In [2]:
class HubspotAPI:
    def __init__(self):
        # API KEY
        self.access_token = ACCESS_TOKEN
        self.client = hubspot.Client.create(access_token = self.access_token)
        self.max_results = 1000000

    def raw_export_products(self, properties_dict):
        # Assistant Variables
        results = []
        after = 0

        while str(after).isnumeric() and len(results) < self.max_results:
            try:
                api_response = self.client.crm.products.basic_api.get_page(
                    limit=100,
                    after=after,
                    properties=list(properties_dict.keys()),
                    associations=[],
                    archived=False,
                )
                api_response = api_response.to_dict()
                results.extend(api_response['results'])
                print("Hubspot Products Export has gathered " + str(len(results)) + " Products")
                try:
                    after = api_response['paging']['next']['after']
                except:
                    after = api_response['paging']
            except ProductsApiException as e:
                print("Exception when calling basic_api->get_page: %s\n" % e)
        property_results = []
        for result in results:
            property_results.append(result["properties"])
        df = pd.DataFrame(property_results)
        df.to_csv("raw_hubspot_products.csv", encoding="latin-1", index=False, errors='ignore')

    def handle_raw_hubspot(self, csv_file, general_values, properties_dict, values_dict, date_columns, dtype={}):
        # Csv into Dataframe
        df = pd.read_csv(csv_file + ".csv", encoding="latin-1")
        # Rename Columns
        df.rename(columns=properties_dict, inplace=True)
        # Date Standarization
        date_columns = [char.lower() for char in date_columns]
        date_columns = ["_".join(char.split(" ")) for char in date_columns]
        for date_column in date_columns:
            df[date_column] = pd.to_datetime(df[df[date_column].notna()][date_column], errors='ignore')
            df[date_column] = df[date_column].dt.strftime('%Y-%m-%d %H:%M:%S')
        # Adjust Datatypes
        for col in list(df.columns):
            if col not in date_columns:
                df[col] = df[col].astype(dtype[col], errors='ignore')
        # Rename Values
        values_dict.update(general_values)
        df = df.map(lambda x: str(x) if pd.notnull(x) else '')
        df.replace(values_dict, inplace=True)
        # Export
        df.to_csv(csv_file[4:] + ".csv", encoding="latin-1", index=False, errors='ignore')

    


In [3]:
class SqlAPI:
    """Load cleaned CSV exports into the database at DATABASE_CONNECTION_URI."""

    def __init__(self):
        self.conn = create_engine(DATABASE_CONNECTION_URI)

    @staticmethod
    def _normalize_column(name):
        """Lower-case a column name and replace spaces with underscores."""
        return "_".join(name.lower().split(" "))

    def str_type_into_sqltype(self, datatype_dict):
        """Return a copy of ``datatype_dict`` with dtype strings mapped to SQLAlchemy types.

        Recognised strings are "string", "int64", "float64", "datetime64" and
        "bool". Any other value is passed through unchanged. The input dict is
        not modified.
        """
        # int64 maps to NUMERIC (not an integer type), as in the original mapping.
        factories = {
            "string": types.TEXT,
            "int64": types.NUMERIC,
            "float64": types.FLOAT,
            "datetime64": types.TIMESTAMP,
            "bool": types.BOOLEAN,
        }
        converted = {}
        for column, datatype in datatype_dict.items():
            factory = factories.get(datatype) if isinstance(datatype, str) else None
            converted[column] = factory() if factory is not None else datatype
        return converted

    def insert_df(self, dataframe, table, dtype, index=False, if_exists="replace"):
        """Write ``dataframe`` to ``table`` with ``DataFrame.to_sql``.

        By default the table is replaced. Rows are sent as multi-row INSERTs in
        chunks of 10000.
        """
        dataframe.to_sql(
            name=table,
            con=self.conn,
            index=index,
            if_exists=if_exists,
            method='multi',
            dtype=dtype,
            chunksize=10000
        )

    def update_table(self, table, dtype):
        """Replace ``table`` with the contents of ``<table>.csv``.

        Column names are lower-cased, with spaces turned into underscores. The
        keys of ``dtype`` are normalised the same way so that their SQL types
        still line up with the renamed columns. ``dtype`` itself is not modified.
        """
        print("Update Started " + table)
        df = pd.read_csv(table + ".csv", encoding='latin-1')
        df.columns = [self._normalize_column(column) for column in df.columns]
        # Keys must match the normalised column names, or to_sql won't apply their types.
        sql_types = {
            self._normalize_column(column): sql_type
            for column, sql_type in self.str_type_into_sqltype(dtype).items()
        }
        self.insert_df(df, table, sql_types)
        print("Update Finished " + table)

In [4]:
def dict_into_simple_dict(complex_dict, simple_value):
    """Project a dict of dicts onto one inner field.

    Args:
        complex_dict: Mapping of key -> inner mapping.
        simple_value: Key to read from every inner mapping.

    Returns:
        A new dict with the same keys, in the same order, mapped to
        ``inner[simple_value]``.

    Raises:
        KeyError: if an inner mapping lacks ``simple_value``.
    """
    return {key: inner[simple_value] for key, inner in complex_dict.items()}


def list_of_datetimes(complex_dict):
    """Return, in order, the keys whose inner "datatype" entry is "datetime64"."""
    datatypes = dict_into_simple_dict(complex_dict, "datatype")
    return [name for name, datatype in datatypes.items() if datatype == "datetime64"]

def update_database():
    """Export HubSpot products, clean them, and load them into ``hubspot_products``.

    Reads the schema from ``api_schema_copy.json`` (under ``API`` -> ``Hubspot``).
    Side effects: writes ``raw_hubspot_products.csv`` and ``hubspot_products.csv``
    in the working directory and replaces the ``hubspot_products`` table.
    """
    # Schema file is closed promptly; JSON text is UTF-8.
    with open("api_schema_copy.json", encoding="utf-8") as schema_file:
        api_schema = json.load(schema_file)["API"]
    hubspot_schema = api_schema["Hubspot"]
    product_properties = hubspot_schema["Products"]["Properties"]

    # HubSpot property name -> output column header
    h_products_properties = dict_into_simple_dict(product_properties, "header_name")
    h_products_values = hubspot_schema["Products"]["Values"]
    # Output headers of the properties declared as datetime64
    h_products_datecolumns = [h_products_properties[i] for i in list_of_datetimes(product_properties)]
    # Output header -> dtype string (both dicts iterate the schema in the same order)
    h_products_datatypes = dict(zip(h_products_properties.values(),
                                    dict_into_simple_dict(product_properties, "datatype").values()))

    h_generalvalues = hubspot_schema["General Values"]

    s = SqlAPI()
    h = HubspotAPI()

    # HubSpot export
    h.raw_export_products(h_products_properties)

    # Data cleaning
    h.handle_raw_hubspot("raw_hubspot_products", h_generalvalues, h_products_properties, h_products_values,
                         h_products_datecolumns, h_products_datatypes)

    # Tables to refresh in the DB, each paired with its column dtypes.
    tables = [
        ("hubspot_products", h_products_datatypes),
    ]
    for table, datatypes in tables:
        # Pass a copy so any in-place type conversion cannot leak back into our dict.
        s.update_table(table, dict(datatypes))

In [5]:
update_database()

# Remove the intermediate CSVs written by the export and cleaning steps.
for intermediate_name in ("raw_hubspot_products", "hubspot_products"):
    os.remove(intermediate_name + ".csv")


Hubspot Products Export has gathered 72 Products
Update Started hubspot_products
Update Finished hubspot_products
