In [0]:
import pandas as pd

In [0]:
blob_storage_key = dbutils.secrets.get(scope="key-vault-secrets",key="blob-storage-key")

In [0]:
# dbutils.fs.unmount("/mnt/alphaplanexports")
# dbutils.fs.mount(
#   source = "wasbs://alphaplanexports@alphaplanexports.blob.core.windows.net",
#   mount_point = "/mnt/alphaplanexports",
#   extra_configs = {"fs.azure.account.key.alphaplanexports.blob.core.windows.net":dbutils.secrets.get(scope="key-vault-secrets",key="blob-storage-key")})

In [0]:
ansprechpartner_export = spark.read.csv('dbfs:/mnt/alphaplanexports/Ansprechpartner230710.CSV', sep=';', header=True, encoding="latin1")

In [0]:
ansprechpartner_df = ansprechpartner_export.toPandas()

In [0]:
len(ansprechpartner_df)

In [0]:
def is_scientific(phone):
    if 'E+' in str(phone):
        out = True
    else:
        out = False
    return out

In [0]:
is_sc = ansprechpartner_df['Telefon'].apply(is_scientific) | ansprechpartner_df['Telefax'].apply(is_scientific)
len(ansprechpartner_df[is_sc])

In [0]:
def convert_scientific_notation(phone_value):
    phone_string = str(phone_value)
    if ('E+' in phone_string) or ('e+' in phone_string):
        phone_string = phone_string.replace(',','.')
        phone_float = float(phone_string)
        return str(phone_float).split('.')[0]

In [0]:
ansprechpartner_df['Telefon'] = ansprechpartner_df['Telefon'].apply(convert_scientific_notation)
ansprechpartner_df['Telefax'] = ansprechpartner_df['Telefax'].apply(convert_scientific_notation)

In [0]:
ansprechpartner_df = ansprechpartner_df[ansprechpartner_df['Email'].notnull()]

In [0]:
len(ansprechpartner_df)

In [0]:
import re

def extract_tld(email):
    # Define the regex pattern to match the TLD
    pattern = r'@[\w.]+\.(\w+)'

    # Find the TLD using regex
    match = re.search(pattern, email)

    if match:
        tld = match.group(1)
        return tld.lower()
    else:
        return None


### NOK records (not accepted in the upload)

In [0]:
tlds = ansprechpartner_df['Email'].apply(extract_tld) # top level domain

In [0]:
mask1 = tlds.isin([ 'yu', 'bzz', 'czz', 'bizz']) # to be excluded
mask2 =  ansprechpartner_df['Email'].str.contains(r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9-]+\.[A-Za-z]{2,4}$') # to be included
mask3 = ansprechpartner_df['Email'].str.contains(r'@-') == False # to be included

In [0]:
# ansprechpartner_df[mask1]

In [0]:
# ansprechpartner_df[~mask2]

In [0]:
ansprechpartner_df = ansprechpartner_df[~mask1]
ansprechpartner_df = ansprechpartner_df[mask2]
ansprechpartner_df = ansprechpartner_df[mask3]

In [0]:
contacts_records = ansprechpartner_df.to_dict(orient='records')# [:330]

In [0]:
def split_list_into_chunks(lst, chunk_size):
    """
    Split a list into chunks of a specified size.
    
    Args:
        lst (list): The list to split.
        chunk_size (int): The size of each chunk.
    
    Returns:
        list: A list of lists, where each inner list represents a chunk.
    """
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

In [0]:
batches_to_upload = split_list_into_chunks(contacts_records, 100)

In [0]:
def get_batch_of_contacts_for_hubspot_upload(batch_records):
    list_to_upload = [
                        {
                        "email": record["Email"],
                        "properties": [
                                        
                                        {
                                            "property": "k_alphaplan_ansprechpartnerid_hubspot",
                                            "value": record["AnsprechpartnerID"]
                                        },
                                        {
                                            "property": "k_alphaplan_adressnummer_hubspot",
                                            "value": record["AdressNummer"]
                                        },
                                        {
                                            "property": "suchname",
                                            "value": record["SuchName"]
                                        },
                                        {
                                            "property": "firstname",
                                            "value": record["Vorname"]
                                        },
                                        {
                                            "property": "lastname",
                                            "value": record["Name1"]
                                        },
                                        {
                                            "property": "email",
                                            "value": record["Email"]
                                        },
                                        {
                                            "property": "phone",
                                            "value": record["Telefon"]
                                        },
                                        {
                                            "property": "salutation",
                                            "value": record["Anrede"]
                                        },
                                        {
                                            "property": "fax",
                                            "value": record["TelefonMobil"]
                                        },
                                        {
                                            "property": "telefon_privat",
                                            "value": record["TelefonPrivat"]
                                        },
                                        {
                                            "property": "website",
                                            "value": record["Internet"]
                                        },
                                        {
                                            "property": "abteilung",
                                            "value": record["Abteilung"]
                                        },
                                        {
                                            "property": "information",
                                            "value": record["Information"]
                                        },
                                        {
                                            "property": "titel",
                                            "value": record["Titel"]
                                        },
                                        {
                                            "property": "address",
                                            "value": record["Strasse"]
                                        },
                                        {
                                            "property": "zip",
                                            "value": record["Postleitzahl"]
                                        },
                                        {
                                            "property": "city",
                                            "value": record["Ort"]
                                        },
                                        {
                                            "property": "jobtitle",
                                            "value": record["Position"]
                                        }                                  
                                        ]
                        }
                        for record in batch_records
                    ]
    return list_to_upload

In [0]:
import requests
import json

In [0]:
api_upload_key = dbutils.secrets.get(scope="key-vault-secrets",key="upload-api-accesstoken")

In [0]:
def upload_batch_contacts_to_hubspot(list_to_upload, api_upload_key):
    # Set up the API endpoint URL
    url = 'https://api.hubapi.com/contacts/v1/contact/batch/'

    # Convert the contact data to JSON format
    data = json.dumps(list_to_upload)

    # Set up the request headers
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + api_upload_key
    }

    # Make the API request
    response = requests.post(url, headers=headers, data=data)

    # Print the response status code and content
    if response.status_code == 202:
        print("Contacts were successfully imported to Hubspot.")
    elif response.status_code == 409:
        print("Conflict error while imported to Hubspot.")
    elif response.status_code == 400:
        print("there is a problem with the data in the request body.", response.content)    
    else:
        print("An error occurred while importing contacts to Hubspot. Status code: ", response.status_code)

In [0]:
# batch_test = batches_to_upload[0][:30]
# list_to_upload = get_batch_of_contacts_for_hubspot_upload(batch_test)
# upload_batch_contacts_to_hubspot(list_to_upload, api_upload_key)

In [0]:
batches_to_upload[0]

In [0]:
i = 1
for batch in batches_to_upload:
    list_to_upload = get_batch_of_contacts_for_hubspot_upload(batch)
    upload_batch_contacts_to_hubspot(list_to_upload, api_upload_key)
    print('---------processed batch ', i)
    i +=1