### Install & import necessary packages

In [4]:
!pip install --upgrade openai --quiet
!pip install --upgrade pymongo --quiet
!pip install --upgrade email-validator --quiet

In [5]:
import pandas as pd
import numpy as np
import os
import re
import json
import time
import requests
import email_validator
import warnings

from openai import OpenAI
from pymongo import MongoClient
from urllib.request import urlopen

warnings.filterwarnings("ignore")

### Read in CSVs

We'll read in two CSVs of leads: AllLeads and human_data. The first contains leads produced by a run of the Lead Automator programmatic tool, while the second contains leads collected manually by the 10K team.

In [6]:
# Load the leads to process into a DataFrame.
# The path below is an empty placeholder; fill it in before running this cell.
leads = pd.read_csv("") # Enter the dataset path or URL here

FileNotFoundError: [Errno 2] No such file or directory: ''

### Define Variables + Functions

Here, we'll define constants, initialize clients, and create all the sub-functions for processing leads that we will use in the main process_leads() function.

In [None]:
# Placeholder name; rows whose first or last name equals it are dropped
# by the "Nulls" stage.
NO_NAMES_INDICATOR = "School Contact"

# Substrings that mark a last name as invalid.
INVALID_NAME_TERMS = [
    "school",
    "department",
]

# Bucket labels accepted from the GPT response; anything else becomes "Unknown".
BUCKETS = [
    "Sports",
    "Auxiliary",
    "Instructor",
    "Stem",
    "Administration",
    "Advisory",
    "Clerical",
    "Counseling",
]

# Lower-case titles that are not usable as a name on their own.
PREFIXES = [
    "dr",
    "ms",
    "mr",
    "mrs",
    "miss",
]

# Output files: processing log and cleaned leads
LOG_FILE = "LeadProcessingLog.txt"
OUTPUT_FILE = "AllLeadsFiltered.csv"

# Download the prompt template used for role bucketing. The context manager
# closes the HTTP response as soon as the body has been read.
with urlopen("https://raw.githubusercontent.com/bamartin1618/p4_data/main/LeadProcessingPrompt.txt") as _prompt_response:
    prompt_bytes_string = _prompt_response.read()
PROMPT = prompt_bytes_string.decode("utf-8")

# Initialize OpenAI API
# A key pasted here always wins. While the placeholder is still empty, a key
# already exported as OPENAI_API_KEY is kept instead of being overwritten
# with an empty string.
_notebook_api_key = "" # TODO: Replace with your API key
if _notebook_api_key or "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = _notebook_api_key
client = OpenAI()

# Initialize email validation testing
# One caching resolver is shared by every validate_email() call.
resolver = email_validator.caching_resolver(timeout=10)
email_validator.CHECK_DELIVERABILITY = True

# Processing Functions
def collect_metadata(df, style):
    """
    Collects metadata as a JSON-serializable dictionary of percentages.
    ARGS:
        - df: DataFrame
        - style: string, either "NULL" or "ROLE"
    RETURNS:
        - "NULL": {column: percentage of null values}, rounded to 2 decimals,
          keeping only columns whose rounded percentage is above 0
        - "ROLE": {bucket: percentage of rows in that bucket}, rounded to
          2 decimals (requires a "Bucket" column)
    RAISES:
        - ValueError: if style is neither "NULL" nor "ROLE"
    """

    if style == "NULL":
        null_share = round((df.isnull().sum() / df.shape[0]) * 100, 2)
        return null_share[null_share > 0].to_dict()

    if style == "ROLE":
        by_role = df.groupby("Bucket").size()
        return round((by_role / by_role.sum()) * 100, 2).to_dict()

    # Previously an unknown style fell through to an unbound local variable.
    raise ValueError('style must be "NULL" or "ROLE", got {!r}'.format(style))


def validate_email(email):
    """
    Validates an email address and returns its normalized form.

    The address is first stripped of double quotes and pre-screened with a
    regex; addresses containing "webmaster" are rejected as sysadmin
    mailboxes. Survivors are passed to email_validator together with the
    shared DNS `resolver`.
    ARGS:
        - email: string (any other type is treated as invalid)
    RETURNS:
        - the normalized address, or "invalid_email" if any check fails
    """

    # Non-string cells (NaN, numbers, None) cannot hold an address.
    if not isinstance(email, str):
        return "invalid_email"

    email_cleaned = email.replace('"', "")
    email_pattern = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,3}"

    looks_like_email = re.match(email_pattern, email_cleaned) is not None
    is_sysadmin = "webmaster" in email_cleaned  # Sysadmin email
    if not looks_like_email or is_sysadmin:
        return "invalid_email"

    try:
        # Validate the cleaned address, i.e. the same text the regex accepted.
        validity = email_validator.validate_email(email_cleaned, dns_resolver=resolver)
    except email_validator.EmailNotValidError:
        return "invalid_email"

    return validity.normalized

def bucket_roles(roles, count):
    """
    Bucket roles into one of 8 categories: Sports, Auxiliary, Stem, Instructor, Administration, Advisory, Clerical, Counseling.

    Sends a single chat-completion request and splits the reply on ";".
    ARGS:
      - roles: String of roles separated by ";"
      - count: Number of roles in `roles`; also formatted into PROMPT
    RETURNS:
      - List of exactly `count` entries. Each entry is a member of BUCKETS or
        "Unknown". If the request fails, every entry is None.
    """

    # Local import: the file only imports the `OpenAI` class at the top.
    import openai

    try:

        chat = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": PROMPT.format(count)},
                {"role": "user", "content": roles}
            ],
        )

        chat_response = chat.choices[0].message.content
        list_of_buckets = chat_response.split(";")

    except openai.APITimeoutError:
        # The OpenAI client raises its own timeout type, not the one from
        # `requests`. Return here: there is no response left to parse.
        print("Request timed out.")
        return [None] * count

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return [None] * count

    cleaned_buckets = []
    for messy_bucket in list_of_buckets:
        clean = messy_bucket.strip()
        cleaned_buckets.append(clean if clean in BUCKETS else "Unknown")

    # Force exactly `count` answers: drop extras, pad what is missing.
    cleaned_buckets = cleaned_buckets[:count]
    cleaned_buckets += ["Unknown"] * (count - len(cleaned_buckets))

    return cleaned_buckets

def process_chunk(df):
    """
    Role buckets one chunk of rows.
    ARGS:
      - df: A chunk of rows from the original dataframe with a "Role" column
    RETURNS:
      - A copy of df with an added "Bucket" column; the input is not modified
    """

    # 1. Collect all roles as a String "Role_1;Role_2;..."
    #    str() keeps the join from failing on non-string cells.
    roles = [str(role) for role in df["Role"]]
    role_string = ";".join(roles)

    # 2. GPT it and get a response
    buckets = bucket_roles(role_string, len(roles))

    # 3. Place them in a new column on a copy. The chunk handed in is a
    #    slice of a larger frame, so it should not be written to.
    df = df.copy()
    df["Bucket"] = buckets
    return df

def _clean_first_name(first_name):
    """Returns (cleaned first name or NaN, case label) for a string first name."""

    # Keep letters and spaces only, so that "Dr." matches the "dr" prefix.
    letters_only = re.sub(r"[^a-zA-Z ]", "", first_name.lower().strip())

    # split() without arguments ignores the leading/repeated spaces that
    # removed punctuation can leave behind.
    parts = letters_only.split()

    if not parts:
        # Nothing usable is left once non-letters are removed.
        return np.nan, "None"

    if len(parts) == 1:
        only = parts[0]
        if len(only) == 1 or only in PREFIXES:
            return np.nan, "Initial / Prefix"
        return only.capitalize(), "None"

    # Several words: skip a leading title, otherwise keep the first word.
    if parts[0] in PREFIXES:
        return parts[1].capitalize(), "Prefix / Multiple Names"
    return parts[0].capitalize(), "Multiple Names"


def clean_names(row, indicator):
    """
    Cleans the first or the last name of a row.
    ARGS:
        - row: Dictionary (or Series) with "FirstName" and "LastName"
        - indicator: "first" cleans FirstName; any other value cleans LastName
    RETURNS:
        - (name, case): the cleaned name (NaN when unusable) and a label
          describing which rule fired. Non-string names are returned
          unchanged with case "None".
    The cleaned value is also written back to `row`.
    """

    if indicator == "first":

        first_name = row["FirstName"]
        if not isinstance(first_name, str):
            return first_name, "None"

        cleaned, case = _clean_first_name(first_name)
        row["FirstName"] = cleaned
        return cleaned, case

    last_name = row["LastName"]
    if not isinstance(last_name, str):
        return last_name, "None"

    lowered = last_name.lower().strip()

    if any(term in lowered for term in INVALID_NAME_TERMS):
        candidate, case = None, "Invalid Terms"

    elif lowered in PREFIXES:
        # The last-name column only holds a title, so the name stored in
        # FirstName is used as the last name instead.
        first_name = row["FirstName"]
        candidate = first_name.strip() if isinstance(first_name, str) else None
        case = "Prefix"
        # NOTE(review): under DataFrame.apply this write is only guaranteed
        # to affect the row object passed in -- confirm whether the caller
        # expects FirstName to be cleared in the frame as well.
        row["FirstName"] = np.nan

    else:
        candidate, case = lowered, "None"

    if candidate:
        # If multiple last names, capitalize each
        separator = "-" if "-" in lowered else " "
        cleaned = separator.join(part.capitalize() for part in candidate.split(separator))
    else:
        # Invalid, missing or blank: report NaN so the row can be filtered.
        cleaned = np.nan

    row["LastName"] = cleaned
    return cleaned, case


def process(df, process):
    """
    Applies one cleaning stage to the leads collected by the Lead Automator.
    ARGS:
        - df: DataFrame
        - process: string, one of "Nulls", "Duplicates", "Validity", "Roles"
          or "Names"; any other value leaves df unchanged
    RETURNS:
        - (df, impact): the processed DataFrame and {process: rows removed}
    The DataFrame passed in is not modified.
    """

    # Grab current size
    current_size = df.shape[0]

    # Various processes
    if process == "Nulls":

        # A lead needs an email and a last name (a missing last name also
        # covers the case where both names are missing).
        df = df.dropna(subset=["Email", "LastName"])
        df = df.loc[(df["FirstName"] != NO_NAMES_INDICATOR) & (df["LastName"] != NO_NAMES_INDICATOR)]

    elif process == "Duplicates":

        df = df.drop_duplicates(subset=["Email"])

    elif process == "Validity":

        df = df.copy()
        df["Email"] = df["Email"].apply(validate_email)
        df = df[df["Email"] != "invalid_email"]

    elif process == "Roles":

        with_role = df.dropna(subset=["Role"])
        without_role = df[df["Role"].isnull()].copy()
        without_role["Bucket"] = None

        chunk_size = 7
        processed_chunks = [
            process_chunk(with_role.iloc[start:start + chunk_size])
            for start in range(0, len(with_role), chunk_size)
        ]

        frames = [without_role]
        if processed_chunks:
            # pd.concat raises on an empty list, so only concatenate when at
            # least one row actually had a role.
            bucketed = pd.concat(processed_chunks, ignore_index=True)
            bucketed = bucketed[~bucketed["Bucket"].isin(["Sports", "Auxiliary", "Clerical"])]
            frames.insert(0, bucketed)

        df = pd.concat(frames, ignore_index=True)

        # Leads without a role get a generic one.
        df["Role"] = df["Role"].fillna("Faculty Contact")

    elif process == "Names":

        # TODO: Implement logging for case

        df = df.copy()
        df[["FirstName", "FirstNameCase"]] = df.apply(clean_names, axis=1, indicator="first", result_type="expand")
        df[["LastName", "LastNameCase"]] = df.apply(clean_names, axis=1, indicator="last", result_type="expand")

        df = df[~df["LastName"].isnull()]

    # Grab new size
    new_size = df.shape[0]

    impact = {process: current_size - new_size}
    return df, impact

### Process Leads

Using all the functions we've defined above, we'll process our leads to turn our messy input data into clean, actionable data. We'll collect and print statistics while doing so.

In [None]:
# Primary Processing Function
def process_leads(df):
    """
    Runs every cleaning stage over the leads, logging statistics to LOG_FILE
    and saving the cleaned leads to OUTPUT_FILE.
    ARGS:
        - df: DataFrame
    """

    # (stage name handed to process(), text logged in front of its impact)
    stages = (
        ("Nulls", "\nNumber of NULL leads: "),
        ("Validity", "\nNumber of Invalid leads: "),
        ("Duplicates", "\nNumber of Duplicate leads: "),
        ("Roles", "\nNumber of invalid roles: "),
        ("Names", "\nNumber of invalid names: "),
    )

    with open(LOG_FILE, "w") as log:

        log.write("Processing leads...\n")

        # Size and null statistics of the raw input
        log.write("Total unprocessed leads before processing: {}".format(df.shape[0]))
        log.write("\nMetadata for NULL leads: " + json.dumps(collect_metadata(df, "NULL")))

        # Treat "Unknown" and "" as missing values
        df = df.replace({"Unknown": np.nan, "": np.nan})

        # Run the stages in order, logging how many rows each one removed
        for stage, label in stages:
            df, impact = process(df, stage)
            log.write(label + json.dumps(impact))

        # Share of each name-cleaning case among the surviving leads
        distributions = []
        for column in ("FirstNameCase", "LastNameCase"):
            counts = df[column].value_counts()
            distributions.append((counts / counts.sum()).to_dict())
        first_name_case_distributions, last_name_case_distributions = distributions

        log.write("\nDistribution Of First Name Cases: " + json.dumps(first_name_case_distributions))
        log.write("\nDistribution Of Last Name Cases: " + json.dumps(last_name_case_distributions))

        log.write("\nMetadata for role buckets: " + json.dumps(collect_metadata(df, "ROLE")))

        # Final size
        log.write("\nTotal leads after processing: {}".format(df.shape[0]))
        log.write("\nFinished processing leads.\n")

    # The case columns were only needed for the log
    df = df.drop(columns=["FirstNameCase", "LastNameCase"])

    # Write new leads to CSV
    df.to_csv(OUTPUT_FILE, index=False)

# Run the full pipeline on the loaded leads; this writes LOG_FILE and OUTPUT_FILE.
process_leads(leads)

### Calculate Quality of Leads

After processing, it's a good idea to get a numerical value of how "perfect" each lead is. Currently, we define a perfect lead as follows:

1. The FirstName value is NOT blank, meaning the contact has a viable First and Last Name. (Contacts with an invalid / blank last name are filtered out earlier)

2. The Bucket value is NOT "Unknown", meaning the contact's "Role" is something that GPT could assign a viable Bucket to, i.e. it is a realistic Role value.

In [None]:
# Write code to calculate "quality" of leads

MAX_QUALITY = 2 # Highest possible score: calculate_quality() awards at most two points

def calculate_quality(row):
    """
    Calculates the quality of a lead.
    ARGS:
        - row: Dictionary (or Series) with "FirstName" and "Bucket"
    RETURNS:
        - int score: +1 for a non-null FirstName, +1 for a Bucket that is
          neither null nor "Unknown"
    """

    quality_score = 0

    # If we have a first name
    if not pd.isna(row["FirstName"]):
        quality_score += 1

    # If we have a role that is bucketable: the bucket must exist AND be
    # something other than "Unknown". (Joining the two negations with `or`
    # is always true and awarded the point to every lead.)
    bucket = row["Bucket"]
    if not pd.isna(bucket) and bucket != "Unknown":
        quality_score += 1

    return quality_score

# Score every cleaned lead and log the share of perfect ones.
leads_filtered = pd.read_csv(OUTPUT_FILE)
total_leads = leads_filtered.shape[0]

if total_leads:
    leads_filtered["Quality"] = leads_filtered.apply(calculate_quality, axis=1)
    perfect_lead_percentage = leads_filtered[leads_filtered["Quality"] == MAX_QUALITY].shape[0] / total_leads
    perfect_lead_percentage = round(perfect_lead_percentage * 100, 2)
else:
    # No leads survived processing: avoid dividing by zero and report 0.
    leads_filtered["Quality"] = pd.Series(dtype="int64")
    perfect_lead_percentage = 0.0

with open(LOG_FILE, "a") as log:
    log.write("\nQuality of filtered leads: {}".format(perfect_lead_percentage))

In [None]:
def mongoimport(csv_path, db_name, coll_name, db_url):
    """
    Replaces the contents of a MongoDB collection with the rows of a CSV file.
    ARGS:
        - csv_path: path of the CSV file to upload
        - db_name: name of the target database
        - coll_name: name of the target collection (dropped before upload)
        - db_url: MongoDB connection URL
    RETURNS:
        - "Successful Upload" on success, otherwise "Error: <message>".
          Errors are reported through the return value, not raised.
    """

    try:
        # Load the data as a JSON compatible form
        data = pd.read_csv(csv_path)
        payload = data.to_dict("records")

        # Check before touching the database, so an empty file can never
        # wipe the existing collection and then fail on the insert.
        if not payload:
            return "Error: {}".format("no rows to upload from {}".format(csv_path))

        # The context manager closes the connection when the upload ends,
        # whether it succeeded or not.
        with MongoClient(db_url) as mongo_client:
            coll = mongo_client[db_name][coll_name]

            # Empty collection and upload
            coll.drop()
            coll.insert_many(payload)

        return "Successful Upload"

    except Exception as e:

        return "Error: {}".format(str(e))

# Upload the cleaned leads file to MongoDB.
csv_path = "AllLeadsFiltered.csv"
db_name, coll_name = "Lead_Automator_Leads", "All_Leads_Filtered"
db_url = ""  # Add your database URL here.

response = mongoimport(csv_path, db_name, coll_name, db_url)

# mongoimport reports failures as a message instead of raising; show it.
if not response == "Successful Upload":
    print(response)