In [7]:
%pip install haversine
import pandas as pd
from haversine import haversine
import glob
import s3fs
import unittest
import numpy as np
import os
from dma_functions import get_dma, get_dma_zipcodes

Note: you may need to restart the kernel to use updated packages.


DATASETS (NECESSARY FOR FUNCTIONS)

In [8]:
# Zipcode centroid coordinates, looked up by calc_distance via the ZIP, LAT and LNG columns.
# ZIP is parsed as a string so leading zeros (e.g. "02138") are kept.
centroids_df = pd.read_csv(
    "./data/evaluation_data/zipcode_centroids.csv",
    dtype={"ZIP": str},
)

FUNCTIONS

In [9]:
# Split a sentence into multiple sentences segmented on DMA
def split_by_dma(sentence):
    # If this sentence is one zipcode long, do nothing
    if len(sentence) == 1:
        return [sentence]
    else:
        sentence_split = []
        start_index = 0
        start_dma = get_dma(sentence[start_index])
        for i in range(1, len(sentence)):
            if get_dma(sentence[i]) != start_dma:
                sentence_split.append(sentence[start_index:i])
                start_index = i
                start_dma = get_dma(sentence[start_index])
                # If this is the last zipcode in the sentence, make sure to add it on
                if i == len(sentence) - 1:
                    sentence_split.append([sentence[i]])
            # If this is the last zipcode and it isn't a change in dma, add sentence from start_index to this index
            elif i == len(sentence) - 1:
                sentence_split.append(sentence[start_index:i + 1])
        return sentence_split

# Merge list of sentences based on common DMA
def merge_by_dma(sentences, merged_sentences):
    # Base case: if sentences has no zipcodes left, return merged_sentences
    if (len(sentences) == 0) or (len(sentences) == 1 and len(sentences[0]) == 0):
        return merged_sentences
    # If sentences only has one zipcode left, concatenate to merged_sentences and return
    elif len(sentences) == 1 and len(sentences[0]) != 0:
        return merged_sentences + sentences
    else:
        # Get all indices that have same DMA as first sentence
        start_dma = get_dma(sentences[0])
        same_indices = [0]
        combined_sentence = sentences[0]
        # Combine all same DMA sentences into one
        for i in range(1, len(sentences)):
            if get_dma(sentences[i]) == start_dma:
                same_indices.append(i)
                combined_sentence = combined_sentence + sentences[i]
        merged_sentences = merged_sentences + [combined_sentence]
        # Only use non-same (not examined) indices
        sentences_dropped = [s for s in sentences if sentences.index(s) not in same_indices]
        return merge_by_dma(sentences_dropped, merged_sentences)
    
def convert_four_digit_zip(zipcode):
    # If zip is longer than 4 digits, return same zipcode
    if len(zipcode) > 4:
        return zipcode
    elif len(zipcode) == 4:
        return "0" + zipcode
    else:
        raise Exception('Invalid zipcode/zipcode length')
        
def calc_distance(zip1, zip2):
    """Great-circle distance between the centroids of two zipcodes.

    Both zipcodes are looked up in centroids_df["ZIP"]; the first matching row of each
    supplies its (LAT, LNG) pair, and the distance is computed with haversine().

    Returns:
        The haversine distance, or None if either zipcode has no row in centroids_df.
    """
    first = centroids_df.loc[centroids_df["ZIP"] == zip1]
    second = centroids_df.loc[centroids_df["ZIP"] == zip2]
    if not len(first) or not len(second):
        return None
    point_a = (first["LAT"].values[0], first["LNG"].values[0])
    point_b = (second["LAT"].values[0], second["LNG"].values[0])
    return haversine(point_a, point_b)

# Get negative sample (closest geographic zipcode)
def get_negative(anchor):
    # Get all zipcodes in this zipcode's DMA
    dma_zipcodes = get_dma_zipcodes(get_dma(anchor))
    # Remove anchor from dma_zipcodes
    anchor_index = np.where(dma_zipcodes == anchor)
    dma_zipcodes = np.delete(dma_zipcodes, anchor_index)
    
    # Find smallest distance and zipcode
    min_index = -1
    min_value = float("inf")
    for index, zipcode in enumerate(dma_zipcodes):
        current_distance = calc_distance(anchor, zipcode)
        # Check if current_distance is None ie either the anchor or comparison zipcode doesn't have data
        if current_distance == None:
            continue
        elif current_distance < min_value:
            min_index = index
            min_value = current_distance
    
    return dma_zipcodes[min_index], min_value

MODIFY AND AUGMENT CLICKSTREAM DATA

In [20]:
# Clickstream v2 data: every headerless CSV shard under the S3 prefix, concatenated
# into a single frame with a fresh 0..n-1 index.
s3 = s3fs.S3FileSystem(anon=False)
PATH = "s3://ojo-data-science/glen/clickstream_v2_clean/"
csv_files = s3.glob(os.path.join(PATH, "*.csv"))

# The "sentences" converter drops the first and last character of the field (presumably
# surrounding brackets) and splits the rest on commas; the paths from s3.glob are given
# the "s3://" scheme again before reading.
clickstream_v2 = pd.concat(
    [
        pd.read_csv(
            "s3://" + file_path,
            header=None,
            names=["unique_visit_id", "first_char", "sentences"],
            converters={"sentences": lambda x: x[1:-1].split(",")},
        )
        for file_path in csv_files
    ],
    ignore_index=True,
)

# clickstream_v2.to_csv("clickstream_v2.csv", index=False)

In [None]:
# clean  = every zipcode in the sentence starts with the row's first_char
# length = number of zipcodes in the sentence
# Both columns are built column-wise, so the result does not depend on clickstream_v2
# having a 0..n-1 index. `zipcode[:1]` (rather than `zipcode[0]`) means an empty token,
# e.g. from an empty "[]" sentence, marks the row unclean instead of raising IndexError.
first_chars = clickstream_v2["first_char"].astype(str)
clickstream_v2["clean"] = [
    all(zipcode[:1] == first_char for zipcode in sentence)
    for first_char, sentence in zip(first_chars, clickstream_v2["sentences"])
]
clickstream_v2["length"] = clickstream_v2["sentences"].apply(len)

# clickstream_v2.to_csv("clickstream_v2_augmented.csv", index=False)

In [13]:
# Reload the augmented clickstream. Each "sentences" cell presumably holds a Python list
# repr such as "['02138', '02139']", so the brackets, quotes and spaces are stripped
# before splitting on commas.
clickstream_v2 = pd.read_csv(
    "./data/training_data/clickstream_v2_augmented.csv",
    converters={"sentences": lambda x: x[1:-1].replace("'", "").replace(" ", "").split(',')},
)

# Cut every sentence at each DMA change, then regroup the pieces so each DMA appears once.
clickstream_v2_dma = clickstream_v2.copy()
clickstream_v2_dma["sentences_dma"] = clickstream_v2_dma["sentences"].map(
    lambda sentence: merge_by_dma(split_by_dma(sentence), [])
)

# One output row per (original row, per-DMA sentence) pair.
clickstream_v2_dma = clickstream_v2_dma.explode("sentences_dma", ignore_index=True)
clickstream_v2_dma.to_csv("clickstream_v2_dma.csv", index=False)

GET NEGATIVE ZIPCODES FOR TRANSACTION DATA

In [28]:
# Use Transaction Zipcodes data to run tests; both zipcode columns are read as strings
# so leading zeros are not turned into numbers.
transactions_df = pd.read_csv("./data/evaluation_data/transaction-zipcodes-2022-06-29.csv",
                            dtype={"zipcode": str, "transaction_zipcode": str})

# Clean + preprocess data: drop incomplete rows, then restore leading zeros of 4-digit zips
transactions_df.dropna(inplace=True)
transactions_df["zipcode"] = transactions_df["zipcode"].apply(convert_four_digit_zip)
transactions_df["transaction_zipcode"] = transactions_df["transaction_zipcode"].apply(convert_four_digit_zip)

# Split into fixed-size chunks so the slow negative-zipcode search can be run and
# checkpointed one chunk at a time. Chunk boundaries come from len(transactions_df), so
# every row lands in exactly one chunk however many rows there are. .copy() gives each
# chunk its own data, so adding a column to a chunk later does not touch transactions_df.
CHUNK_SIZE = 1000
dfs = [transactions_df.iloc[start:start + CHUNK_SIZE].copy()
       for start in range(0, len(transactions_df), CHUNK_SIZE)]

In [54]:
# Get a negative zipcode for every transaction, one chunk of `dfs` at a time. Each
# finished chunk is checkpointed to transactions_df_<i>.csv, and all chunks are finally
# concatenated into transactions_df_augmented.csv.
for i, chunk in enumerate(dfs):
    print("Getting dataframe " + str(i))
    neg_zipcodes = []
    for index, row in chunk.iterrows():
        # Keep track of progress (index is the row label carried over from transactions_df)
        if index % 100 == 0:
            print(index)
        negative_data = get_negative(row["zipcode"])
        # get_negative may return None when no negative could be found
        neg_zipcodes.append(None if negative_data is None else negative_data[0])

    # Build a new frame rather than inserting into the chunk in place. Dropping a column
    # left by an earlier run makes the cell safe to re-run (insert raises on a duplicate
    # column name). The min() keeps the insert position within the frame's column count.
    chunk = chunk.drop(columns="negative_zipcode", errors="ignore")
    chunk.insert(min(4, len(chunk.columns)), "negative_zipcode", neg_zipcodes)
    dfs[i] = chunk

    chunk.to_csv("transactions_df_" + str(i) + ".csv", index_label="main_index")

# Merge back into one dataframe
transactions_df = pd.concat(dfs)
transactions_df.to_csv("transactions_df_augmented.csv", index=False)

Getting dataframe 2
2100
2200
2300
2400
2500
2600
2700
2800
2900
3000
Getting dataframe 3
3100
3200
3300
3400
3500
3600
3700
3800
3900
4000
Getting dataframe 4
4100
4200
4300
4400
4500
4600
4700
4800
4900
5000
Getting dataframe 5
5100
5200
5300
5400
5500
5600
5700
5800
5900
6000
Getting dataframe 6
6100
6200
6300
6400
6500
6600
6700
6800
6900
7000
Getting dataframe 7
7100
7200
7300
7400
7500
7600
7700
7800
7900
8000
Getting dataframe 8
8100
8200
8300
8400
8500
8600
8700
8800
8900
9000
Getting dataframe 9


In [90]:
# Check that the augmented file matches a fresh preprocessing of the original transactions
# file: same row count, and the same zipcode / transaction_zipcode values in the same order.
# Prints "bad at <row>" for every mismatching row.
transactions_df = pd.read_csv("./data/evaluation_data/transaction-zipcodes-2022-06-29.csv",
                             dtype={"zipcode": str, "transaction_zipcode": str, "negative_zipcode": str})
transactions_df.dropna(inplace=True)
transactions_df["zipcode"] = transactions_df["zipcode"].apply(convert_four_digit_zip)
transactions_df["transaction_zipcode"] = transactions_df["transaction_zipcode"].apply(convert_four_digit_zip)
transactions_df = transactions_df.reset_index(drop=True)
transactions_df_augmented = pd.read_csv("./data/evaluation_data/transactions_df_augmented.csv",
                             dtype={"zipcode": str, "transaction_zipcode": str, "negative_zipcode": str})

# A length mismatch means rows were lost or duplicated. Report it explicitly instead of
# crashing on an out-of-range .iloc, and compare the overlapping prefix row by row.
if len(transactions_df) != len(transactions_df_augmented):
    print("row count mismatch: original " + str(len(transactions_df))
          + ", augmented " + str(len(transactions_df_augmented)))

key_columns = ["zipcode", "transaction_zipcode"]
n_common = min(len(transactions_df), len(transactions_df_augmented))
original_keys = transactions_df[key_columns].iloc[:n_common].to_numpy()
augmented_keys = transactions_df_augmented[key_columns].iloc[:n_common].to_numpy()
for index in np.flatnonzero((original_keys != augmented_keys).any(axis=1)):
    print("bad at " + str(index))

UNIT TESTS FOR FUNCTIONS

In [6]:
class RunTests(unittest.TestCase):
    """Unit tests for the helpers defined above and for get_dma from dma_functions.

    These run against real lookup data (centroids_df loaded above, plus whatever data
    dma_functions uses), so the expected values depend on those files.
    """

    def test_get_dma(self):
        # Empty or malformed input has no DMA
        self.assertIsNone(get_dma(""))
        self.assertIsNone(get_dma([]))
        self.assertIsNone(get_dma([""]))
        # A 4-digit zip (leading zero missing) is not resolved
        self.assertIsNone(get_dma(["2138"]))
        self.assertIsNone(get_dma("2138"))
        # Both a bare zipcode and a list of zipcodes resolve
        self.assertEqual(get_dma(["78750"]), "Austin TX")
        self.assertEqual(get_dma("78750"), "Austin TX")
        self.assertEqual(get_dma("02138"), "Boston MA-Manchester NH")
        self.assertEqual(get_dma(["02138"]), "Boston MA-Manchester NH")
        self.assertEqual(get_dma(["78750", "78754", "78741"]), "Austin TX")

    def test_split_by_dma(self):
        self.assertEqual(split_by_dma(["78750"]), [["78750"]])
        self.assertEqual(split_by_dma(["78750", "02138"]), [["78750"], ["02138"]])
        self.assertEqual(split_by_dma(["78750", "02138", "78741"]), [["78750"], ["02138"], ["78741"]])
        self.assertEqual(split_by_dma(["78750", "78741", "02138", "02210", "78754", "02108", "02112"]),
                         [["78750", "78741"], ["02138", "02210"], ["78754"], ["02108", "02112"]])
        self.assertEqual(split_by_dma(["78750", "78741", "02138"]), [["78750", "78741"], ["02138"]])
        self.assertEqual(split_by_dma(["02138", "78750", "78741"]), [["02138"], ["78750", "78741"]])
        self.assertEqual(split_by_dma(["02138", "78750", "78741", "02114", "02115", "02116"]),
                         [["02138"], ["78750", "78741"], ["02114", "02115", "02116"]])

    def test_merge_by_dma(self):
        self.assertEqual(merge_by_dma([], []), [])
        self.assertEqual(merge_by_dma([["78750"]], []), [["78750"]])
        self.assertEqual(merge_by_dma([[]], []), [])
        self.assertEqual(merge_by_dma([["78750"], ["02138"]], []), [["78750"], ["02138"]])
        self.assertEqual(merge_by_dma([["78750"], ["02138"], ["78754"]], []), [["78750", "78754"], ["02138"]])
        self.assertEqual(merge_by_dma([["78750"], ["02138"], ["78754"], ["78750"]], []),
                         [["78750", "78754", "78750"], ["02138"]])
        self.assertEqual(merge_by_dma([["78750", "78754"], ["02138", "02210"], ["10007", "10008"]], []),
                         [["78750", "78754"], ["02138", "02210"], ["10007", "10008"]])
        self.assertEqual(merge_by_dma([["78750", "78754"], ["02138", "02210"], ["10007", "10008"], ["02108", "02112", "02116"]], []),
                         [["78750", "78754"], ["02138", "02210", "02108", "02112", "02116"], ["10007", "10008"]])
        self.assertEqual(merge_by_dma([["02110"], ["78750", "78754"], ["02138", "02210"], ["10007", "10008"], ["02108", "02112", "02116"]], []),
                         [["02110", "02138", "02210", "02108", "02112", "02116"], ["78750", "78754"], ["10007", "10008"]])

    def test_convert_four_digit_zip(self):
        self.assertEqual(convert_four_digit_zip("1111"), "01111")
        self.assertEqual(convert_four_digit_zip("0101"), "00101")
        self.assertEqual(convert_four_digit_zip("78750"), "78750")
        # Anything shorter than 4 characters is rejected
        with self.assertRaises(Exception):
            convert_four_digit_zip("123")

    def test_calc_distance(self):
        # Exact comparisons rather than assertFalse/assertTrue, so a None ("no centroid
        # data") result is never mistaken for a zero or a valid distance.
        self.assertEqual(calc_distance("02138", "02138"), 0)
        self.assertEqual(calc_distance("78750", "78750"), 0)
        boston_to_austin = calc_distance("02138", "78750")
        self.assertIsNotNone(boston_to_austin)
        self.assertGreater(boston_to_austin, 0)
        self.assertLess(calc_distance("78750", "78704"), 30)
        self.assertGreaterEqual(calc_distance("02138", "78704"), 30)

    def test_get_negative(self):
        # get_negative scans a whole DMA, so call it once per anchor and reuse the result
        austin_negative = get_negative("78750")
        self.assertIsNotNone(austin_negative)
        self.assertNotEqual(austin_negative[0], "02138")  # never outside the anchor's DMA
        self.assertNotEqual(austin_negative[0], "78750")  # never the anchor itself
        self.assertEqual(austin_negative[0], "78726")
        boston_negative = get_negative("02138")
        self.assertIsNotNone(boston_negative)
        self.assertEqual(boston_negative[0], "02140")

unittest.main(argv=['first-arg-is-ignored'], exit=False)

......
----------------------------------------------------------------------
Ran 6 tests in 5.028s

OK


<unittest.main.TestProgram at 0x7fceec578898>