##### Modify Variables in the Widgets at the Top of the Screen
* **Address Column** - The column name where the addresses are stored in the uploaded file
* **Company Name** - The column name where the company names are stored in the uploaded BOM file
* **Input CSV Company Search** - The path of the uploaded file used for company search and match
* **Input CSV Facility Search** - The path of the uploaded file used for facility search and match (needs company/supplier name and address columns)
* **Supplier Name** - The column name where the supplier names are stored in the uploaded file
* **Requirements for Facility Search/Match** - Address Column, Input CSV Facility Search OR table name, and Supplier Name
* **Requirements for Company Search/Match** - Company Name, and Input CSV Company Search OR table name

* **IF IMPORTING FROM Table** - Fill out the relevant Address Column, Company Name, and Supplier Name

In [0]:
# Read the input column names from the notebook widgets (in this order):
#   company_name   <- "Company Name"   widget
#   address_string <- "Address Column" widget
#   facility_name  <- "Supplier Name"  widget
company_name, address_string, facility_name = (
  dbutils.widgets.get(widget_label)
  for widget_label in ("Company Name", "Address Column", "Supplier Name")
)

# Install & Import Packages

In [0]:
# Upgrade numpy before the packages below are installed/imported.
# This line is a notebook `pip` magic/shell command, not plain Python.
pip install --upgrade numpy

In [0]:
# Install each dependency only when importing it fails, so the installs are skipped
# once the packages are already available on the cluster.
try:
  __import__("postal")
except ImportError:
  # Use Conda to install LibPostal (conda-forge packages `libpostal` and `postal`)
  ! conda install -c conda-forge libpostal postal
try:
  __import__("swagger_client")
except ImportError:
  # Install the Atlas API SDK from GitHub; presumably this is what provides the
  # `swagger_client` package imported later in this cell.
  ! pip install git+https://github.com/altana-tech/atlas-api-1.0.71-python-sdk.git

import csv
import json
import random
import requests
import sys
import time
import pandas as pd
from pandas.core.common import SettingWithCopyWarning
import warnings
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
pd.set_option("display.max_rows", 100)
import numpy as np
from tabulate import tabulate
from pyspark.sql import SparkSession
from datetime import datetime

#feature functions
from sklearn.feature_extraction.text import TfidfVectorizer
import re
import textdistance
from fuzzywuzzy import fuzz
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
from math import log2
import sparse_dot_topn.sparse_dot_topn as ct
from scipy.sparse import csr_matrix

from __future__ import print_function
import swagger_client
from swagger_client.rest import ApiException
from pprint import pprint


# Set API Token & Parameters

In [0]:
import os

# NOTE(review): this API token is committed to source in plain text. Rotate it and supply
# the value through the ALTANA_API_TOKEN environment variable (or a Databricks secret
# scope); the literal is only a fallback so existing runs keep working.
API_TOKEN = os.environ.get(
  'ALTANA_API_TOKEN',
  'MTpZdW5uaToxNjM5NDIwODQxOmI0MThmZjdm.ZjI4MTliYWQyZTVmYzIzMTAwNTUyZTg1MzkxMDZkZTM0NjFmMDM0Nw')

# Headers sent with every Atlas API request
api_headers = {'X-Api-Key': API_TOKEN,
               'Accept': 'application/json'}

# Change endpoint parameter to be either 'api' or 'api-staging' (used as the URL host prefix)
endpoint = 'api-staging' #'api'

# Timestamp embedded in saved CSV file names, e.g. 2021_12_13-03_45_10_PM
filetimestamp = datetime.now().strftime("%Y_%m_%d-%I_%M_%S_%p")

# Helper Functions

In [0]:
def _atlas_bulk_request(df, row_id, build_url, parse_response):
  '''
  Call the Atlas API once per row of `df` and left-join the parsed results back onto it.

  Shared engine for company_run_match / company_run_search / facility_run_match /
  facility_run_search.

  Parameters
  ----------
  df: input rows; the index is reset, so the progress counter printed every 100 rows starts at 0
  row_id: column identifying each input row; copied onto every result row and used as the join key
  build_url: callable(row) -> fully encoded request URL for that row
  parse_response: callable(json_payload) -> DataFrame of result rows for one input row

  Output
  ----------
  out_df: `df` left-merged with all parsed results on `row_id`, plus pandas' `_merge` indicator column
  errors: requests.Response objects for every call whose status code was not 200
  '''
  errors = []
  df = df.reset_index(drop=True)
  start_time = pd.Timestamp.now()
  match_results = []
  for ix, row in df.iterrows():
    if ix % 100 == 0:
      print(ix)
    resp = requests.get(build_url(row), headers=api_headers)
    if resp.status_code != 200:
      errors.append(resp)
      continue
    result_df = parse_response(resp.json())
    result_df[row_id] = row[row_id]
    match_results.append(result_df)
  end_time = pd.Timestamp.now()

  if match_results:
    results = pd.concat(match_results)
  else:
    # pd.concat([]) raises ValueError. An empty frame that still carries the join key keeps
    # the merge valid, so every input row comes back with _merge == 'left_only'.
    results = pd.DataFrame({row_id: pd.Series(dtype=df[row_id].dtype)})
  out_df = pd.merge(df, results, on=row_id, how='left', indicator=True)

  # Avoid dividing by zero when the input frame is empty
  if len(df) > 0:
    avg_sec = (end_time - start_time).total_seconds() / len(df)
    print(f'avg api response time in seconds: {avg_sec}')
  return out_df, errors


def _company_url(action, company_name):
  '''Build the Atlas company URL for `action` ('match' or 'search') and one company name.'''
  from urllib.parse import quote
  # Percent-encode everything (including '/', '?', '#', '&') so the name stays inside a single
  # path segment instead of being split or truncated by the URL parser.
  # NOTE(review): the name is sent as a `company_name=<name>` path segment, unlike the facility
  # endpoints which use a query string; confirm this matches the Atlas API spec.
  encoded_name = quote(str(company_name), safe='')
  return f'https://{endpoint}.altana.ai/atlas/v1/company/{action}/company_name={encoded_name}'


def _facility_url(action, company_name, address_str):
  '''Build the Atlas facility URL for `action` ('match' or 'search') and one name/address pair.'''
  from urllib.parse import quote, urlencode
  # Encode the query values; an '&' or '#' inside a name or address would otherwise split or
  # truncate the query string. quote_via=quote keeps spaces as %20.
  query = urlencode({'company_name': str(company_name), 'full_address': str(address_str)},
                    quote_via=quote)
  return f'https://{endpoint}.altana.ai/atlas/v1/facility/{action}?{query}'

#---------------------------------------------------------------------------------------------------------------------------------------------------
def company_run_match(df, company_name_field='receiver_name', row_id='transaction_id'):
  '''
  Run the Atlas company *match* endpoint for every row of `df`.

  Parameters
  ----------
  df: input rows
  company_name_field: column holding the company name to match
  row_id: column identifying each row (join key for the results)

  Output
  ----------
  (out_df, errors): `df` left-merged with one result row per successful call (the JSON payload),
  and the list of non-200 responses
  '''
  return _atlas_bulk_request(
    df, row_id,
    build_url=lambda row: _company_url('match', row[company_name_field]),
    parse_response=lambda payload: pd.DataFrame([payload]))

#---------------------------------------------------------------------------------------------------------------------------------------------------
def company_run_search(df, company_name_field='receiver_name', row_id='transaction_id'):
  '''
  Run the Atlas company *search* endpoint for every row of `df`.

  Parameters
  ----------
  df: input rows
  company_name_field: column holding the company name to search for
  row_id: column identifying each row (join key for the results)

  Output
  ----------
  (out_df, errors): `df` left-merged with up to 30 entries of the response's 'companies' list
  per successful call, and the list of non-200 responses
  '''
  return _atlas_bulk_request(
    df, row_id,
    build_url=lambda row: _company_url('search', row[company_name_field]),
    parse_response=lambda payload: pd.DataFrame(payload['companies']).head(30))
#---------------------------------------------------------------------------------------------------------------------------------------------------

def facility_run_match(df, facility_name_field='receiver_name', address_string_field='receiver_full_address', row_id='transaction_id'):
  '''
  Run the Atlas facility *match* endpoint for every row of `df`.

  Parameters
  ----------
  df: input rows
  facility_name_field: column holding the company/supplier name
  address_string_field: column holding the full address string
  row_id: column identifying each row (join key for the results)

  Output
  ----------
  (out_df, errors): `df` left-merged with one result row per successful call (the JSON payload),
  and the list of non-200 responses
  '''
  return _atlas_bulk_request(
    df, row_id,
    build_url=lambda row: _facility_url('match', row[facility_name_field], row[address_string_field]),
    parse_response=lambda payload: pd.DataFrame([payload]))

#---------------------------------------------------------------------------------------------------------------------------------------------------

def facility_run_search(df, facility_name_field='receiver_name', address_string_field='receiver_full_address', row_id='transaction_id'):
  '''
  Run the Atlas facility *search* endpoint for every row of `df`.

  Parameters
  ----------
  df: input rows
  facility_name_field: column holding the company/supplier name
  address_string_field: column holding the full address string
  row_id: column identifying each row (join key for the results)

  Output
  ----------
  (out_df, errors): `df` left-merged with up to 30 entries of the response's 'facilities' list
  per successful call, and the list of non-200 responses
  '''
  return _atlas_bulk_request(
    df, row_id,
    build_url=lambda row: _facility_url('search', row[facility_name_field], row[address_string_field]),
    parse_response=lambda payload: pd.DataFrame(payload['facilities']).head(30))


#---------------------------------------------------------------------------------------------------------------------------------------------------

def address_filter(df, min_geocode_level=18, street_level_match=True, min_geo_confidence=0.4):
  '''
  Keep facility search results whose input and output geocodes agree closely enough.

  Only rows with address_model_output_level == 'geocode_str' are considered. The nested
  metadata dicts are flattened into columns:
  - `geo_string_address_model_metadata` and `geocoder_metadata` keep their key names.
  - `layer_properties_in` / `layer_properties_out` get `_in` / `_out` suffixes.

  Parameters
  ----------
  df: search results (e.g. the output of facility_run_search)
  min_geocode_level: geo_level_in and geo_level_out must both be strictly greater than this
  street_level_match: when True, street_in must equal street_out. Where BOTH house numbers are
    present they must also be equal; rows missing either house number are kept.
  min_geo_confidence: geo_confidence_in and geo_confidence_out must both be strictly greater than this

  Output
  ----------
  out_df: the filtered, flattened rows (index reset to 0..n-1 before filtering)
  '''
  df_filtered = df[df['address_model_output_level'] == 'geocode_str'].reset_index(drop=True)

  # Flatten the per-row metadata dicts into columns aligned on the fresh 0..n-1 index
  out_df = pd.concat([
    df_filtered,
    pd.DataFrame(list(df_filtered['geo_string_address_model_metadata'])),
    pd.DataFrame(list(df_filtered['geocoder_metadata'])),
  ], axis=1)
  out_df = pd.concat([
    out_df,
    pd.DataFrame(list(out_df['layer_properties_in'])).add_suffix('_in'),
    pd.DataFrame(list(out_df['layer_properties_out'])).add_suffix('_out'),
  ], axis=1)

  if street_level_match:
    out_df = out_df[out_df['street_in'] == out_df['street_out']]
    # The house-number columns only exist if at least one layer_properties dict had the key
    if 'housenumber_in' in out_df.columns and 'housenumber_out' in out_df.columns:
      # Row-wise: only enforce equality when both house numbers are known
      both_known = out_df['housenumber_in'].notna() & out_df['housenumber_out'].notna()
      out_df = out_df[~both_known | (out_df['housenumber_in'] == out_df['housenumber_out'])]

  keep = (
    (out_df['geo_confidence_in'] > min_geo_confidence)
    & (out_df['geo_confidence_out'] > min_geo_confidence)
    & (out_df['geo_level_in'] > min_geocode_level)
    & (out_df['geo_level_out'] > min_geocode_level)
  )
  return out_df[keep]

#---------------------------------------------------------------------------------------------------------------------------------------------------
def awesome_cossim_top(A:csr_matrix, B:csr_matrix, ntop:int, lower_bound:float=0)->csr_matrix:
  '''
  Sparse product A @ B that keeps only the strongest entries of each output row, computed
  by the compiled sparse_dot_topn kernel.

  Parameters
  ----------
  A: "dirty" - companies we are trying to weed out (converted to CSR)
  B: "clean" - company name (query) search (converted to CSR)
  ntop: maximum number of entries stored per row of the product
  lower_bound: only entries above lower_bound are stored

  Output
  ----------
  CSR matrix of shape (A.shape[0], B.shape[1]); for the TF-IDF inputs built in
  find_similarity the entries are cosine similarities between 0 and 1
  '''
  left, right = A.tocsr(), B.tocsr()
  n_rows, n_cols = left.shape[0], right.shape[1]

  index_dtype = np.int32
  capacity = n_rows * ntop  # room for at most ntop stored entries per output row

  # Output buffers that the kernel fills in place
  out_indptr = np.zeros(n_rows + 1, dtype=index_dtype)
  out_indices = np.zeros(capacity, dtype=index_dtype)
  out_data = np.zeros(capacity, dtype=left.dtype)

  def as_index(values):
    return np.asarray(values, dtype=index_dtype)

  ct.sparse_dot_topn(
      n_rows, n_cols,
      as_index(left.indptr), as_index(left.indices), left.data,
      as_index(right.indptr), as_index(right.indices), right.data,
      ntop, lower_bound,
      out_indptr, out_indices, out_data)
  return csr_matrix((out_data, out_indices, out_indptr), shape=(n_rows, n_cols))

#---------------------------------------------------------------------------------------------------------------------------------------------------
def find_similarity(query_name:str, canon_name:str, n_gram:int=3)-> float:
  '''
  Evaluate the cosine similarity of two strings from their character n-gram TF-IDF vectors.

  Both strings are reduced to upper-case alphanumerics before n-grams are taken, so case,
  spaces and punctuation are ignored. The vectorizer is fit on `query_name` only, so only
  n-grams that occur in the query contribute to the score.

  Parameters
  ----------
  query_name: company to search
  canon_name: company name returned by the API that is scored against the query
  n_gram: length of the contiguous character n-grams (default 3). Higher values make matching
    more precise and give lower similarities.

  Output
  ----------
  matches_sim: cosine similarity between 0 and 1. It is 0 when the strings share no n-gram,
    or when either string has fewer than `n_gram` alphanumeric characters.
  '''
  def ngrams(string, n=n_gram):
    string = re.sub(r'[^A-Za-z0-9]+', r'', string).upper()
    return [''.join(gram) for gram in zip(*[string[i:] for i in range(n)])]

  # A string shorter than n_gram yields no n-grams at all. For the query, TfidfVectorizer
  # would raise "empty vocabulary", so report "no similarity" instead.
  if not ngrams(query_name) or not ngrams(canon_name):
    return 0

  # constructs the vectorizer for building the TF-IDF matrix
  vectorizer = TfidfVectorizer(min_df=1, analyzer=ngrams)

  # sparse document-term matrices of the query name (fit) and the canon name (transform)
  tf_idf_matrix_clean = vectorizer.fit_transform([query_name])
  tf_idf_matrix_dirty = vectorizer.transform([canon_name])

  # With no shared n-grams the canon vector is all zeros (an empty sparse matrix), and
  # awesome_cossim_top would throw an error on it.
  if tf_idf_matrix_clean.size == 0 or tf_idf_matrix_dirty.size == 0:
    return 0

  # 1 x 1 product: dirty (1 x vocab) times clean transposed (vocab x 1)
  matches = awesome_cossim_top(tf_idf_matrix_dirty, tf_idf_matrix_clean.transpose(), 1, 0.0)
  return matches.data[0]

### Load Data from Table Stored in Databricks

In [0]:
# Pull the combined supplier input table from Databricks into pandas
bsci_all_supplier = sqlContext.sql("SELECT * FROM bsci_v4.search_match_input_all_suppliers")
# Enable Arrow-based conversion for toPandas() below
# NOTE(review): on Spark 3.x this key is deprecated in favour of
# "spark.sql.execution.arrow.pyspark.enabled" — confirm for the cluster's runtime.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

bsci_all_supplier_df = bsci_all_supplier.toPandas()

# `custom_id` (the positional row index) is the row_id used to join API results back.
# company_df, facility_df and facility_df3 all refer to this same DataFrame.
bsci_all_supplier_df['custom_id'] = bsci_all_supplier_df.index
company_df = bsci_all_supplier_df

# The facility search/match cells read `facility_df`; `facility_df3` is kept as an alias
# for any code still using the old name.
facility_df = bsci_all_supplier_df
facility_df3 = facility_df

# Search by Facility 
Searches by supplier name & address

In [0]:
facility_search_results, errors = facility_run_search(df=facility_df,
                                    facility_name_field=facility_name,
                                    address_string_field=address_string,
                                    row_id='custom_id')

# Save the raw search results as a timestamped CSV
outname_search = 'facility_search_results_dataframe_'+filetimestamp+'.csv'
outdir_search = '/dbfs/FileStore/search_match_data/'
facility_search_results.to_csv(outdir_search+outname_search, index=False, encoding="utf-8")
print(filetimestamp)

# Score how similar each returned company_name is to the supplier name that was searched
# (the column named by the "Supplier Name" widget). Any failure, e.g. a missing/NaN name,
# scores 0.
ngram_list_facility = []
for query, canon in zip(facility_search_results[facility_name], facility_search_results['company_name']):
  try:
    ngram_list_facility.append(find_similarity(query, canon))
  except Exception:
    ngram_list_facility.append(0)

# Attach the scores once, after the loop (results have a 0..n-1 index, same order as the list)
facility_search_results_merged = facility_search_results.copy()
facility_search_results_merged['company_sim_score'] = ngram_list_facility

# Cast every column to str so the Spark DataFrame gets a uniform all-string schema
# (the results include nested metadata dicts)
facility_search_results_merged = facility_search_results_merged.astype(str)

#Convert resulting pandas dataframe to spark dataframe and then save as table
facility_search_results_merged_df_spark = spark.createDataFrame(facility_search_results_merged)
facility_search_results_merged_df_spark.write.mode("overwrite").saveAsTable("bsci_v4.atlas_bulk_facility_search")

# Filtered Facility Search by Geo Confidence
Change min_geo_confidence level to subset on the search results

In [0]:
# Narrow the facility search results to confident, specific geocodes.
# Tune min_geo_confidence / min_geocode_level / street_level_match here.
filtered_search_results = address_filter(
  facility_search_results,
  min_geo_confidence=0.4,
  min_geocode_level=16,
  street_level_match=False,
)
filtered_search_results.head()

# Cast every column to str so the Spark DataFrame gets a uniform all-string schema
filtered_search_results = filtered_search_results.astype(str)

# pandas -> Spark, then overwrite the geo-filtered table
filtered_search_results_df_spark = spark.createDataFrame(filtered_search_results)
filtered_search_results_df_spark.write.mode("overwrite").saveAsTable("bsci_v4.atlas_bulk_facility_search_geo_filtered")

# Match by Facility
Match by supplier name & address

In [0]:
# Run the facility *match* endpoint for every input row. The input column names come from
# the widgets; the API host comes from `endpoint` ('api' vs 'api-staging').
facility_match_results, errors = facility_run_match(
  df=facility_df,
  facility_name_field=facility_name,
  address_string_field=address_string,
  row_id='custom_id',
)

# Cast every column to str so the Spark DataFrame gets a uniform all-string schema
facility_match_results = facility_match_results.astype(str)

# pandas -> Spark, then overwrite the facility match table
facility_match_results_df_spark = spark.createDataFrame(facility_match_results)
facility_match_results_df_spark.write.mode("overwrite").saveAsTable("bsci_v4.atlas_bulk_facility_match")

# Search by Company
Search by company name

In [0]:
company_search_results, errors = company_run_search(df=company_df,
                                    company_name_field=company_name,
                                    row_id='custom_id')

# Score how similar each returned company_name is to the name that was searched (the column
# named by the "Company Name" widget). Any failure, e.g. a missing/NaN name, scores 0.
ngram_list = []
for query, canon in zip(company_search_results[company_name], company_search_results['company_name']):
  try:
    ngram_list.append(find_similarity(query, canon))
  except Exception:
    ngram_list.append(0)

# Attach the scores once, after the loop (results have a 0..n-1 index, same order as the list)
company_search_results_merged = company_search_results.copy()
company_search_results_merged['company_sim_score'] = ngram_list

# Cast every column to str so the Spark DataFrame gets a uniform all-string schema
company_search_results_merged = company_search_results_merged.astype(str)

#Convert resulting pandas dataframe to spark dataframe and then save as table
company_search_results_merged_df_spark = spark.createDataFrame(company_search_results_merged)
company_search_results_merged_df_spark.write.mode("overwrite").saveAsTable("bsci_v4.atlas_bulk_company_search")

# Match by Company
Match by company name

In [0]:
# TODO: check whether the company cosine similarity is available from the company API.
# Also check which rank the match falls in among the search results (number of transactions
# with which facility), and whether it is in the top 10 search results, to build more
# tightly filtered search results.
company_match_results, errors = company_run_match(
  df=company_df,
  company_name_field=company_name,
  row_id='custom_id',
)

# Cast every column to str so the Spark DataFrame gets a uniform all-string schema
company_match_results = company_match_results.astype(str)

# pandas -> Spark, then overwrite the company match table
company_match_results_df_spark = spark.createDataFrame(company_match_results)
company_match_results_df_spark.write.mode("overwrite").saveAsTable("bsci_v4.atlas_bulk_company_match")

#See Cmd 52 for download instructions 

### End of Code