In [2]:
import snowflake.connector
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import datetime as dt

In [3]:
# Display and plotting defaults for this notebook.
pd.options.display.max_columns = 1000
pd.options.display.max_rows = 100
plt.rcParams["figure.figsize"] = (10,8)
# matplotlib 3.6 renamed its bundled seaborn styles to "seaborn-v0_8-*" and 3.8
# removed the old names, so use whichever name this installation provides.
_darkgrid_style = (
    "seaborn-v0_8-darkgrid"
    if "seaborn-v0_8-darkgrid" in plt.style.available
    else "seaborn-darkgrid"
)
plt.style.use(_darkgrid_style)

In [4]:
import os

# Open a Snowflake session through browser-based SSO (authenticator="externalbrowser").
# The login is taken from the SNOWFLAKE_USER environment variable when it is set,
# so the notebook is not tied to one analyst's account; otherwise the original
# user is used. `cur` is the cursor shared by the data-loading cells.
con = snowflake.connector.connect(
    user=os.environ.get("SNOWFLAKE_USER", "tliang@endpointclosing.com"),
    account="endpoint",
    authenticator="externalbrowser",
    role="SNOWFLAKE_DATA_ENGINEERING",
    warehouse="DATAENGINEERING_WH"
)
cur = con.cursor()

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...


### load the datasets

In [5]:
# lead: the full Salesforce lead table; reset_index keeps the fetched frame's
# index as an extra column
query = "SELECT * FROM LAKE.SALESFORCE.LEAD"
lead = (
    cur.execute(query)
    .fetch_pandas_all()
    .reset_index()
)

In [6]:
# two-letter postal code -> full name; entries marked "territory" are not states
states = {
    'AK': 'Alaska',
    'AL': 'Alabama',
    'AR': 'Arkansas',
    'AS': 'American Samoa',  # territory
    'AZ': 'Arizona',
    'CA': 'California',
    'CO': 'Colorado',
    'CT': 'Connecticut',
    'DC': 'District of Columbia',
    'DE': 'Delaware',
    'FL': 'Florida',
    'GA': 'Georgia',
    'GU': 'Guam',  # territory
    'HI': 'Hawaii',
    'IA': 'Iowa',
    'ID': 'Idaho',
    'IL': 'Illinois',
    'IN': 'Indiana',
    'KS': 'Kansas',
    'KY': 'Kentucky',
    'LA': 'Louisiana',
    'MA': 'Massachusetts',
    'MD': 'Maryland',
    'ME': 'Maine',
    'MI': 'Michigan',
    'MN': 'Minnesota',
    'MO': 'Missouri',
    'MP': 'Northern Mariana Islands',  # territory
    'MS': 'Mississippi',
    'MT': 'Montana',
    'NC': 'North Carolina',
    'ND': 'North Dakota',
    'NE': 'Nebraska',
    'NH': 'New Hampshire',
    'NJ': 'New Jersey',
    'NM': 'New Mexico',
    'NV': 'Nevada',
    'NY': 'New York',
    'OH': 'Ohio',
    'OK': 'Oklahoma',
    'OR': 'Oregon',
    'PA': 'Pennsylvania',
    'PR': 'Puerto Rico',  # territory
    'RI': 'Rhode Island',
    'SC': 'South Carolina',
    'SD': 'South Dakota',
    'TN': 'Tennessee',
    'TX': 'Texas',
    'UT': 'Utah',
    'VA': 'Virginia',
    'VI': 'Virgin Islands',  # territory
    'VT': 'Vermont',
    'WA': 'Washington',
    'WI': 'Wisconsin',
    'WV': 'West Virginia',
    'WY': 'Wyoming'
}

# lower-cased full name -> postal code, built once so each lookup is O(1)
# instead of scanning `states` for every row
_state_name_to_code = {name.lower(): code for code, name in states.items()}

def state_std(x:str) -> str:
    """Standardize a free-text state value to a two-letter code.

    Surrounding whitespace is ignored. Then:
    - non-string input (None, NaN from pandas) -> ""
    - any two-letter alphabetic value -> upper-cased as-is (it is NOT checked
      against ``states``)
    - a full name from ``states``, case-insensitive -> its two-letter code
    - anything else -> ""
    """
    # NaN arrives as a float, on which len() would raise
    if not isinstance(x, str):
        return ""

    x = x.strip()
    if len(x) == 2 and x.isalpha():
        return x.upper()
    return _state_name_to_code.get(x.lower(), "")


In [7]:
# standardized two-letter state code for every lead (element-wise state_std)
lead["STATE_STD"] = lead["STATE"].map(state_std)

In [8]:
# non-deleted Washington leads whose STATUS is anything other than "New"
wa_lead = lead[
    lead["IS_DELETED"].eq(False)
    & lead["STATE_STD"].eq("WA")
    & lead["STATUS"].ne("New")
]

In [9]:
# task: tasks whose WHO_ID is a lead with a Washington-like STATE.
# The SQL prefilter only narrows the download. It also matches leads that
# wa_lead excludes (deleted, still "New", or STATE values state_std does not map
# to "WA"), so the result is restricted to wa_lead's IDs, the same way lead
# history and listings are restricted. trim() keeps the prefilter a superset of
# WA leads whose STATE has stray whitespace.
query = "SELECT * FROM lake.salesforce.task WHERE WHO_ID IN (SELECT ID from lake.salesforce.lead WHERE (lower(trim(STATE)) LIKE 'wa%'))"
wa_task_raw = cur.execute(query).fetch_pandas_all().reset_index()
wa_task = wa_task_raw[wa_task_raw["WHO_ID"].isin(wa_lead["ID"])]

In [10]:
# lead_hist: the full lead field-history table; reset_index keeps the fetched
# frame's index as an extra column
query = "SELECT * FROM LAKE.SALESFORCE.LEAD_HISTORY"
lead_hist = (
    cur.execute(query)
    .fetch_pandas_all()
    .reset_index()
)

In [11]:
# non-deleted history rows belonging to the selected Washington leads
wa_lead_hist = lead_hist[
    lead_hist["IS_DELETED"].eq(False)
    & lead_hist["LEAD_ID"].isin(wa_lead["ID"])
]

In [12]:
# listing: the full custom listing table; reset_index keeps the fetched frame's
# index as an extra column
query = "SELECT * FROM LAKE.SALESFORCE.LISTING_C"
listing = (
    cur.execute(query)
    .fetch_pandas_all()
    .reset_index()
)

In [13]:
# listings linked (via LEAD_C) to the selected Washington leads
wa_lead_listing = listing.loc[listing["LEAD_C"].isin(wa_lead["ID"])]

### rollback data using cutoff date

#### rollback_wa_lead

In [14]:
# bounds of the cutoff window used by the rollback and cutoff cells below
cutoff_date_start, cutoff_date_end = dt.date(2021, 7, 1), dt.date(2022, 1, 1)

In [15]:
# leadConverted, created, leadMerged: these fields only record ineffective value changes (None to None), so they are excluded
# Owner: has no matching column in the lead table, so it is excluded
# ownerAssignment: needs an extra filter on DATA_TYPE == EntityId

# function to standardize the field name
import re

def field_std(x:str) -> str:
    """Map a LEAD_HISTORY FIELD name to the matching lead-table column name.

    Names with an explicit override (e.g. "ownerAssignment" -> "OWNER_ID") use
    it. Any other name is converted from camelCase to SNAKE_CASE: an underscore
    is inserted at each lower/digit -> upper boundary and each letter -> digit
    boundary, the result is upper-cased, and "__" is collapsed to "_".
    Examples: "FirstName" -> "FIRST_NAME", "Brokerage1__c" -> "BROKERAGE_1_C".
    Names without case boundaries ("Status", "Listing_Agent__c") are only
    upper-cased and de-doubled, exactly as before.
    """
    field_replace_dict = {"MobilePhone": "MOBILE_PHONE",
                          "LeadSource": "LEAD_SOURCE",
                          "Brokerage1__c": "BROKERAGE_1_C",
                          "ownerAssignment": "OWNER_ID",
                          "HasOptedOutOfEmail": "HAS_OPTED_OUT_OF_EMAIL"}

    if x in field_replace_dict:
        # override values are already in final form
        return field_replace_dict[x]

    # camelCase -> snake_case so e.g. "FirstName" matches the FIRST_NAME column
    # instead of producing a non-existent FIRSTNAME
    snake = re.sub(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Za-z])(?=[0-9])", "_", x)
    return snake.upper().replace("__", "_")

In [16]:
# rollback function
# rolls lead field values back using lead_hist and writes the old values into the matching columns of the returned lead dataset

# steps:
# set the cutoff window: cutoff_date_start, cutoff_date_end
# for each lead_id, search lead_hist forward within the cutoff window, find the earliest change of each field, take its old_value, and write it into the related column
# return the dataset

def rollback_lead(input_lead_df:pd.DataFrame,
                  lead_hist_df:pd.DataFrame,
                  cutoff_date_start:dt.date,
                  cutoff_date_end:dt.date) -> pd.DataFrame:
    """Return a copy of the leads with fields rolled back via the lead history.

    For each lead, the earliest history record of every field whose
    CREATED_DATE falls on a date in [cutoff_date_start, cutoff_date_end]
    (both ends inclusive) is found. Its OLD_VALUE is written into the column
    named ``field_std(FIELD)``.

    Args:
        input_lead_df: lead rows; must have an ``ID`` column.
        lead_hist_df: lead-history rows with LEAD_ID, CREATED_DATE (datetime),
            FIELD, DATA_TYPE and OLD_VALUE columns.
        cutoff_date_start: first history date considered (inclusive).
        cutoff_date_end: last history date considered (inclusive).

    Returns:
        A new DataFrame with the same index and rows as ``input_lead_df``;
        the input is not modified. A standardized field name that is not an
        existing column is appended as a new column.
    """
    if input_lead_df.empty:
        # keep the column layout (the row-by-row version returned a frame with no columns)
        return input_lead_df.copy()

    created_on = lead_hist_df["CREATED_DATE"].dt.date
    # NOTE(review): the end date is inclusive here, while the task/history/listing
    # cutoffs use `< cutoff_date_end` -- confirm which is intended.
    in_scope = (
        lead_hist_df["LEAD_ID"].isin(input_lead_df["ID"])
        & (created_on >= cutoff_date_start)
        & (created_on <= cutoff_date_end)
    )
    subset_hist_df = lead_hist_df[in_scope]

    # drop events that do not refer back to a lead column. Salesforce spells the
    # conversion event "leadConverted"; the misspelled variant is kept as well so
    # this filter remains a superset of the earlier one.
    filter_values = ["leadConverted", "leadConvertted", "created", "leadMerged", "Owner"]
    subset_hist_df = subset_hist_df[~subset_hist_df["FIELD"].isin(filter_values)]

    # ownerAssignment rows come with more than one DATA_TYPE. Only the "EntityId"
    # rows hold owner record ids, which is what OWNER_ID stores; the other rows
    # (presumably owner display names) would overwrite the id, so drop them.
    is_owner_assignment = subset_hist_df["FIELD"] == "ownerAssignment"
    subset_hist_df = subset_hist_df[~is_owner_assignment | (subset_hist_df["DATA_TYPE"] == "EntityId")]

    # assign() returns a new frame, avoiding SettingWithCopyWarning on the filtered slice
    subset_hist_df = subset_hist_df.assign(FIELD_STD=subset_hist_df["FIELD"].apply(field_std))

    # earliest change per (lead, field). The stable sort keeps the first-listed
    # record on timestamp ties, matching groupby(...).idxmin().
    first_changes = (
        subset_hist_df
        .sort_values("CREATED_DATE", kind="mergesort")
        .drop_duplicates(subset=["LEAD_ID", "FIELD_STD"], keep="first")
    )

    # lead id -> {column: value before its first in-window change}
    replacements = {}
    for lead_id, field, old_value in zip(first_changes["LEAD_ID"],
                                         first_changes["FIELD_STD"],
                                         first_changes["OLD_VALUE"]):
        replacements.setdefault(lead_id, {})[field] = old_value

    # build all rows once; concatenating one-row frames inside a loop is quadratic
    records = []
    for row in input_lead_df.to_dict("records"):
        row.update(replacements.get(row["ID"], {}))
        records.append(row)

    return pd.DataFrame(records, index=input_lead_df.index)

In [17]:
# Washington leads with their fields rolled back over the cutoff window
cutoff_wa_lead = rollback_lead(
    input_lead_df=wa_lead,
    lead_hist_df=wa_lead_hist,
    cutoff_date_start=cutoff_date_start,
    cutoff_date_end=cutoff_date_end,
)

#### cutoff_wa_task

In [21]:
# inspect how ACTIVITY_DATE was fetched (object dtype -> parsed before comparing)
wa_task["ACTIVITY_DATE"].dtype

dtype('O')

In [22]:
# tasks whose activity date is strictly before the cutoff end; rows whose
# date parses to NaT compare False and are dropped
cutoff_wa_task = wa_task.loc[
    pd.to_datetime(wa_task["ACTIVITY_DATE"]).dt.date < cutoff_date_end
]

#### cutoff_wa_lead_hist

In [23]:
# history rows created strictly before the cutoff end date
cutoff_wa_lead_hist = wa_lead_hist.loc[
    wa_lead_hist["CREATED_DATE"].dt.date.lt(cutoff_date_end)
]

#### cutoff_wa_lead_listing

In [28]:
# listings whose active date is strictly before the cutoff end date
cutoff_wa_lead_listing = wa_lead_listing.loc[
    pd.to_datetime(wa_lead_listing["ACTIVE_DATE_C"]).dt.date.lt(cutoff_date_end)
]

### output the data

In [43]:
# cutoff_wa_lead.to_csv("../data/cutoff_wa_lead.csv")

In [44]:
# cutoff_wa_task.to_csv("../data/cutoff_wa_task.csv")

In [47]:
# cutoff_wa_lead_hist.to_csv("../data/cutoff_wa_lead_hist.csv")

In [46]:
# cutoff_wa_lead_listing.to_csv("../data/cutoff_wa_lead_listing.csv")