In [1]:
# Analyze LinkedIn profile data

import os
import json
import time
import pprint
from google.cloud import firestore
from google.api_core import retry
from openai import OpenAI

# OpenAI client shared by every cell below; the key is read from the
# OPENAI_API_KEY environment variable (None when the variable is unset).
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))


def show_json(obj):
    """Display an object in the notebook as parsed JSON.

    Args:
        obj: Any object exposing ``model_dump_json()`` that returns a JSON string.
    """
    serialized = obj.model_dump_json()
    display(json.loads(serialized))


# Assume the google-cloud-firestore import is available in the development environment


# Use the local service-account key for Google credentials when the file is
# present; when it is absent the environment is left untouched.
_SERVICE_ACCOUNT_FILE = "vk-linkedin-master-service-account.json"
if os.path.isfile(_SERVICE_ACCOUNT_FILE):
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = _SERVICE_ACCOUNT_FILE

# Firestore client bound to the project and its named database
db = firestore.Client(project="vk-linkedin", database="linkedin")

# Source collection: the 'extracted' profile documents
extracted_ref = db.collection("extracted")

# Target collection: the analysis results
analysis_ref = db.collection("analysis")

In [2]:
def wait_on_run(run, thread, poll_interval=1.0, timeout=None):
    """
    Wait for a run to leave the "queued" / "in_progress" states.

    While the run is queued or in progress, the function sleeps for
    ``poll_interval`` seconds and then retrieves the run again through the
    module-level ``client``. The refreshed run is returned as soon as its
    status is anything else, so no extra sleep happens after it finishes.
    A run that is already finished is returned unchanged without any API call.

    Args:
        run: The run to wait for (needs ``id`` and ``status`` attributes).
        thread: The thread the run belongs to (needs an ``id`` attribute).
        poll_interval (float): Seconds to sleep before each poll. Defaults to 1.0.
        timeout (float | None): Maximum number of seconds to keep polling.
            ``None`` (the default) waits indefinitely.

    Returns:
        The most recently retrieved run, whose status is neither "queued"
        nor "in_progress".

    Raises:
        TimeoutError: If ``timeout`` is set and elapses while the run is still
            queued or in progress.

    Note:
        This function uses the OpenAI API, which requires an API key and may incur costs.
    """

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    while run.status in ("queued", "in_progress"):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Run {run.id} still '{run.status}' after {timeout} seconds"
            )
        # Sleep first: the status just inspected is fresh, so polling again
        # immediately would only burn an API call.
        time.sleep(poll_interval)
        run = client.beta.threads.runs.retrieve(
            thread_id=thread.id,
            run_id=run.id,
        )
    return run

In [3]:
# analyze the summary with the OpenAI Assistant
def analyze_summary(summary, assistant_id="asst_mxA64EdU5qHi9wNVpEmQRQZr") -> dict:
    """
    Analyze the summary with the OpenAI Assistant.

    This function creates a new thread, adds the summary to it as a user
    message and starts a run of the given assistant. It waits for the run via
    ``wait_on_run`` and, only if the run finished with status "completed",
    reads the first message of the thread listing, strips Markdown code fences
    from it and parses the remainder as JSON.

    Args:
        summary (str): The summary to be analyzed.
        assistant_id (str): ID of the assistant to run. Defaults to the
            assistant this notebook was written for.

    Returns:
        dict: The response from the OpenAI Assistant as a dictionary, or None
        when any step fails: thread/run creation, waiting for the run, a run
        that ends in a status other than "completed", reading the reply, JSON
        decoding, or a JSON payload that is not an object. Every failure is
        reported with a printed message; no exception is raised for them.
    """

    try:
        # create a thread
        thread = client.beta.threads.create()

        # add summary to the thread
        client.beta.threads.messages.create(
            thread_id=thread.id, role="user", content=summary
        )

        # create run
        run = client.beta.threads.runs.create(
            thread_id=thread.id,
            assistant_id=assistant_id,
        )
    except Exception as e:
        print(f"Error creating OpenAI thread: {e}")
        return None

    # get the run result
    try:
        run = wait_on_run(run, thread)
    except Exception as e:
        print(f"Error getting results from OpenAI thread: {e}")
        return None

    # A run that failed, expired or was cancelled produced no assistant reply;
    # the newest message would then be the summary that was just sent.
    if run.status != "completed":
        print(f"OpenAI run did not complete (status: {run.status})")
        return None

    # get messages; the first entry of the listing is taken as the reply
    try:
        messages = client.beta.threads.messages.list(thread_id=thread.id)
        response = messages.data[0].content[0].text.value
    except Exception as e:
        print(f"Error reading OpenAI response: {e}")
        return None

    # clean response: drop Markdown code fences around the JSON
    response = response.replace("```json\n", "").replace("```", "")

    # convert response to dictionary
    try:
        res = json.loads(response)
    except json.JSONDecodeError:
        print("Failed to decode JSON: ", response)
        return None

    # Callers look fields up by key, so anything but a JSON object is unusable.
    if not isinstance(res, dict):
        print("Unexpected JSON payload (expected an object): ", response)
        return None
    return res

In [4]:
# helper that shields callers from any exception raised during analysis
def get_analysis(summary):
    """Return the result of ``analyze_summary(summary)``, or None if it raises.

    Any exception is reported with a printed message and swallowed.
    """
    try:
        return analyze_summary(summary)
    except Exception as exc:
        print(f"Error analyzing summary: {exc}")
    return None

In [5]:
# helper that extracts the numeric member distance from a profile document
def get_member_distance(doc_dict) -> int:
    """Return the member distance encoded in a profile document.

    Reads ``doc_dict["memberDistance"]["memberDistance"]`` and converts its
    last character to an integer. If the lookup or the conversion fails for
    any reason, the error is printed and 0 is returned.
    """
    try:
        raw_value = doc_dict["memberDistance"]["memberDistance"]
        # only the final character is converted
        last_char = raw_value[-1:]
        return int(last_char)
    except Exception as exc:
        message = f"Error getting member distance: {exc} "
        print(message)
        return 0

In [6]:
# Process every document of the `extracted` collection: classify its summary
# with the OpenAI Assistant and store the result in the `analysis` collection.

# Retry policy for the Firestore calls below.
# reference https://cloud.google.com/spanner/docs/custom-timeout-and-retry?hl=en
# google.api_core.retry.Retry names its back-off parameters `initial`,
# `maximum`, `multiplier` and `timeout`. It has no retry-count option, so the
# overall `timeout` is what bounds the number of attempts.
my_retry = retry.Retry(
    initial=0.5,  # first back-off delay, in seconds
    multiplier=2,  # double the delay after every failed attempt
    timeout=60,  # give up after 60 seconds overall
)

# Materialise the stream so the documents can be counted before processing.
doc_list = list(extracted_ref.stream(retry=my_retry))

print("number of extracted documents: ", len(doc_list))

# Pretty-print only the first document as a sample of the schema.
# NOTE(review): this output contains personal profile data - clear it before
# sharing or committing the notebook.
print_one = True

# process docs from the list
for doc in doc_list:
    doc_dict = doc.to_dict()

    if print_one:
        pprint.pprint(doc_dict)
        print_one = False

    doc_id = doc.id
    doc_ref = analysis_ref.document(doc_id)

    # A missing or empty summary is treated as "" so that one incomplete
    # profile cannot abort the whole run.
    summary = (doc_dict.get("summary") or "").replace("\n", " ")

    # Read the existing analysis once; it decides whether the (paid) OpenAI
    # analysis has to run again.
    existing = doc_ref.get(retry=my_retry)
    if existing.exists:
        # reprocess only when the summary changed since the last analysis
        existing_data = existing.to_dict() or {}
        reprocess = existing_data.get("summary") != summary
    else:
        # no analysis stored yet
        reprocess = True

    if reprocess:
        # summaries of 50 characters or fewer are not sent for analysis
        if len(summary) > 50:
            # get analysis of summary via OpenAI Assistant
            try:
                analysis = get_analysis(summary)

                if analysis is not None:
                    # store the result only when all expected fields are present
                    if all(
                        key in analysis
                        for key in ("industry", "function", "seniority")
                    ):
                        new_doc = {
                            "profileUrl": doc_dict["profileUrl"],
                            "lh_id": doc_dict["lhId"],
                            "industry": analysis["industry"],
                            "function": analysis["function"],
                            "seniority": analysis["seniority"],
                            "summary": summary,
                            "memberDistance": get_member_distance(doc_dict),
                        }

                        # create (or fully replace) the analysis document
                        doc_ref.set(new_doc, retry=my_retry)

                else:
                    # back off for 10 seconds before moving on to the next document
                    time.sleep(10)
            except Exception as e:
                print(e)
                time.sleep(10)
                continue
    else:
        # refresh the profile fields of an existing, still-valid analysis
        update_doc = {
            "profileUrl": doc_dict["profileUrl"],
            "lh_id": doc_dict["lhId"],
            "summary": summary,
            "memberDistance": get_member_distance(doc_dict),
        }

        # merge=True keeps the stored industry/function/seniority fields;
        # a plain set() would replace the document and erase them.
        doc_ref.set(update_doc, merge=True, retry=my_retry)

number of extracted documents:  27222
{'action_id': 29,
 'action_name': '',
 'action_type': 'SendPersonToWebhook',
 'addToTargetDate': 'March 21, 2024 10:51:02 PM',
 'addToTargetDateISO': '2024-03-22T03:51:02.709Z',
 'campaignMessagingHistory': None,
 'campaign_id': 9,
 'campaign_name': 'Send to Firebase',
 'campaign_type': 1,
 'connect': None,
 'customFields': {},
 'externalIds': [{'createdAt': '2021-01-27T01:35:38.026Z',
                  'externalId': '483788075',
                  'id': 10642,
                  'memberId': 483788075,
                  'personId': 3294,
                  'sentAtToPAS': '2024-03-21T10:21:10.617Z',
                  'type': 'member-id'},
                 {'createdAt': '2021-01-27T01:35:38.026Z',
                  'externalId': '-jananova-',
                  'id': 10643,
                  'personId': 3294,
                  'sentAtToPAS': '2024-03-21T10:21:10.617Z',
                  'type': 'public-id'},
                 {'createdAt': '2021-01-27T01: