<a href="https://colab.research.google.com/github/EricDiGi/Globly-Ingest/blob/main/Invoice_Upload_V2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# @title Import Control for Difficult Libraries
import os, sys

# Bootstrap table: each entry is [module name, shell command to run when that
# module cannot be imported]. The entries are consumed pairwise by safe_import.
# NOTE(review): the connector is pinned to 2.7.9 while snowflake-sqlalchemy is
# installed with --upgrade; confirm the two stay compatible.
imports = [
    ["snowflake.connector", "pip install snowflake-connector-python[pandas]==2.7.9"],
    ["snowflake.sqlalchemy", "pip install --upgrade snowflake-sqlalchemy"]
]

def safe_import(module, command):
  """Import ``module``, running ``command`` first when it is not installed.

  Args:
    module: Dotted module name to import (e.g. ``"snowflake.connector"``).
    command: Shell command line executed through ``os.system`` when the first
      import attempt raises ``ModuleNotFoundError``.

  Raises:
    ModuleNotFoundError: If the module is still missing after ``command`` ran.
  """
  import importlib

  try:
    importlib.import_module(module)
  except ModuleNotFoundError:
    os.system(command)
    # Files created after the interpreter started can be hidden by the import
    # system's cached directory listings; drop those caches before retrying.
    importlib.invalidate_caches()
    importlib.import_module(module)

# Ensure every entry of the bootstrap table is importable, installing on demand.
for mod, cmd in imports:
  safe_import(module=mod, command=cmd)

In [3]:
# @title All the Imports
import pandas as pd
import numpy as np
import random as r
import re
import glob

import yaml
from yaml import SafeLoader

from getpass import getpass

import multiprocessing as mp
from multiprocessing import Process, Lock, Pool


from snowflake.connector.pandas_tools import pd_writer
import snowflake.connector
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

import traceback
import warnings
# NOTE(review): two contradictory filters are installed back to back.
# filterwarnings normally inserts at the front of the filter list, so the later
# "ignore" presumably takes precedence and the "error" line has no effect;
# confirm which behaviour is intended.
warnings.filterwarnings("error")
warnings.filterwarnings("ignore")

import logging
logging.getLogger('snowflake.connector.cursor').setLevel(logging.WARNING) 

In [25]:
# @title Verbose Print & Logging System
# @markdown Log file contains info on which files were uploaded and which weren't
"""
  Valid-bool  Note  Path  Sheet
"""
def vprint(input_string, end="\n", flush=True, log=False):
  """Print a message and optionally append it to the run log.

  The function cooperates with module-level globals that may or may not have
  been created yet:

  * ``verbose_``   - when defined and falsy, console output is suppressed.
  * ``print_lock`` - when defined, held while printing.
  * ``log_file_``  - path of the log file; required for ``log=True`` to write.
  * ``fsys_lock``  - when defined, held while appending to the log file.

  Args:
    input_string: Message to emit.
    end: Terminator appended to the message on the console and in the log.
    flush: Flush ``sys.stdout`` after printing.
    log: Also append the message to ``log_file_`` when that global exists.

  Raises:
    Exception: ``"No log stream available"`` when the log file cannot be
      written (the underlying error is chained).
  """
  shared = globals()

  def _write_log():
    try:
      with open(shared['log_file_'], 'a') as log_stream:
        # str() keeps non-string messages from breaking the concatenation.
        log_stream.write(str(input_string) + end)
        log_stream.flush()
    except Exception as err:
      raise Exception("No log stream available") from err

  def _emit():
    # Verbose unless the flag exists and is falsy.
    if shared.get('verbose_', True):
      print(input_string, end=end)
      if flush:
        sys.stdout.flush()

  if 'print_lock' in shared:
    with shared['print_lock']:
      _emit()
  else:
    _emit()

  # The *name* must be looked up in globals(); looking up the path value never
  # matches, which silently disabled logging.
  if log and 'log_file_' in shared:
    if 'fsys_lock' in shared:
      with shared['fsys_lock']:
        _write_log()
    else:
      _write_log()

In [5]:
# @title Database Login

#-----------------------------------------------------
#               OVERHEAD Variables
#-----------------------------------------------------

# Prompt for the Snowflake credentials and keep them in environment variables;
# Armory.__init__ reads SNOW_USER / SNOW_PASSWORD when it opens the connection.
# getpass is used for the password so it is not echoed.
os.environ['SNOW_USER'] = input("Username: ")
os.environ['SNOW_PASSWORD'] = getpass("Password: ")

Username: eric_digioacchino
Password: ··········


In [6]:
class Armory:
  """Shared toolbox: configuration, regex rule tables and the Snowflake link.

  On construction it reads ``config.yml``, loads the tab-separated rule files
  named under ``tool_files`` (relative to ``logging.parent_dir``) and opens a
  Snowflake connection using the ``SNOW_USER`` / ``SNOW_PASSWORD`` environment
  variables plus the ``snowflake`` section of the config.
  """

  def __init__(self):
    self.candidate_files = None
    #-----------------------------------------------------
    #               CONFIG Helpful Variables
    #-----------------------------------------------------

    # Config YAML (SafeLoader: no arbitrary object construction)
    with open("drive/MyDrive/Invoice Ingestion Suite/config.yml", "r") as stream:
      self.config_ = yaml.load(stream, SafeLoader)

    parent_dir = self.config_["logging"]["parent_dir"]
    tool_files = self.config_['tool_files']

    # Customers: rows of [path regex, customer name]
    self.customer_regex = self._read_tab_rows(parent_dir + tool_files['customer_regex'])

    # Column Names: rows of [target column, source-column regex]
    self.column_name_regex = self._read_tab_rows(parent_dir + tool_files['column_solve'])

    # GLOB Filename Sequences
    with open(parent_dir + tool_files['filename_structs'], "r") as f:
      self.filename_structures = [line.rstrip() for line in f]

    # Database Connection (Snowflake)
    snow = self.config_['snowflake']
    self.conn = snowflake.connector.connect(
        user=os.environ['SNOW_USER'],
        password=os.environ['SNOW_PASSWORD'],
        account=snow['account'],
        warehouse=snow['warehouse'],
        role=snow['role'],
        database=snow['database'],
        schema=snow['schema'],
    )
    # Every engine connection is backed by the single connection opened above.
    self.engine = create_engine(
        snow["_PATH"][0] + snow['account'] + snow["_PATH"][1],
        creator=lambda: self.conn,
    )

  @staticmethod
  def _read_tab_rows(path):
    """Return the tab-separated fields of every non-blank line in ``path``.

    Trailing whitespace is stripped from each field. Blank lines are skipped so
    they cannot yield one-field rows that break ``a, b`` unpacking downstream.
    """
    rows = []
    with open(path, "r") as f:
      for line in f:
        if not line.strip():
          continue
        rows.append([field.rstrip() for field in line.split("\t")])
    return rows

  #-----------------------------------------------------
  #               Helpful Loaders and Processors
  #-----------------------------------------------------

  def load(self):
    """Populate ``candidate_files`` with every path matching a glob pattern."""
    self.candidate_files = []
    for struct in self.filename_structures:
      self.candidate_files.extend(glob.glob(struct, recursive=True))

  def get_customer(self, file_path):
    """Return the customer whose regex first matches ``file_path``, else None."""
    for regex, customer in self.customer_regex:
      if re.search(regex, file_path) is not None:
        return customer
    return None

  def validate(self):
    """Check that every candidate file resolves to a customer.

    Returns:
      True when all files match a customer regex, False otherwise.

    Raises:
      Exception: If ``load`` has not been called yet.
    """
    if self.candidate_files is None:
      raise Exception("No files loaded for validation")

    found_bad = False
    for f in self.candidate_files:
      # Customer regexes are matched against the lower-cased path.
      if self.get_customer(f.lower()) is None:
        found_bad = True
        vprint("Unknown Customer:%s\n"%(f), end="", flush=True)

    if found_bad:
      vprint("Missing Customer, need to debug\n", end="", flush=True)
      return False
    vprint("Rules Compiled for all Customers\n", end="", flush=True)
    return True

  #-----------------------------------------------------
  #               SQL Stuff
  #-----------------------------------------------------
  def upload_to_snowflake(self, data_frame, engine, table_name, chunk=2000, truncate=True, create=False):
    """Append ``data_frame`` to ``table_name`` through ``self.engine``.

    Args:
      data_frame: Frame to upload.
      engine: Unused; kept for backward compatibility (``self.engine`` is used).
      table_name: Destination table.
      chunk: ``chunksize`` forwarded to ``DataFrame.to_sql``.
      truncate: Truncate the table before appending.
      create: (Re)create the table from the frame's columns before appending.
    """
    with self.engine.connect() as con:
      if create:
        # pandas needs a SQLAlchemy connectable here; the raw Snowflake DBAPI
        # connection would be treated as a sqlite3 connection.
        data_frame.head(0).to_sql(name=table_name, con=con, if_exists="replace", index=False, chunksize=chunk)
      if truncate:
        con.execute(f"truncate table {table_name}")
    data_frame.to_sql(name=table_name, con=self.engine, if_exists='append', index=False, chunksize=chunk, method=pd_writer)

  def sqlize(self, df, create=False):
    """Upper-case the column names of ``df`` (in place) and upload it.

    The destination table is ``config_['terraform']['table_name']``; the table
    is never truncated here.
    """
    df.columns = [str(c).upper() for c in df.columns]
    self.upload_to_snowflake(df, self.engine, self.config_['terraform']['table_name'], truncate=False, create=create)
  

In [22]:
class Invoice:
  """One worksheet of invoice data on its way to the database.

  Attributes are set by the constructor: ``df`` (the sheet as a DataFrame),
  ``path`` (source file), ``sheet`` (sheet name) and ``file_num`` (the running
  number used in progress messages).
  """

  def __init__(self, df, path, sheet, iter):
    self.df = df
    self.path = path
    self.sheet = sheet
    self.file_num = iter

  def preprocess(self):
    """Normalise column names, tag the customer and cut trailing text rows.

    Column names are reduced to their alphanumeric characters and upper-cased.
    ``CUSTOMER`` and ``file_path`` columns are added. For each price column
    that exists, the frame is truncated just before the first row holding a
    ``str`` value in that column.
    """
    vprint("preprocessing - %d\n"%(self.file_num), end="", flush=True)

    cols = {c: ''.join(ch for ch in c if ch.isalnum()).upper() for c in self.df.columns}
    self.df.rename(columns=cols, inplace=True)

    # Do some cleaning here
    customer_name = tools.get_customer(self.path.lower())
    # get_customer signals failure with None; the legacy sentinel string is
    # still honoured in case a rule file maps to it.
    if customer_name is None or customer_name == "Customer Regex Failed":
      vprint("Customer name resolution failed:%s\n"%(self.path), end="", flush=True)
    self.df['CUSTOMER'] = customer_name
    self.df["file_path"] = self.path

    specialty_columns = ['RXPRICE', 'DISPENSINGFEE', 'UNITPRICE']
    for col in specialty_columns:
      try:
        type_col = col + 'TYPE'
        self.df[type_col] = self.df[col].apply(type)
        str_rows = self.df.index[self.df[type_col] == str].tolist()
        if str_rows:
          # drop calculations after meaningful data if known to exist
          self.df = self.df.truncate(after=str_rows[0] - 1, axis=0)
      except Exception:
        # Best effort: a missing column or an index that cannot be truncated
        # leaves the frame as it is.
        continue

  def simplify(self):
    """Project the sheet onto the target columns listed in ``tools``.

    For every ``[target, regex]`` rule the matching source columns are
    back-filled across and the first one is kept. Known targets are then cast
    to text or numbers, and ``api_request_timestamp`` is parsed as a UTC
    datetime. The result replaces ``self.df``.

    Returns:
      None. When ``self.df`` is None a failure row is logged and nothing else
      happens.

    Raises:
      KeyError: If no source column resolved to ``api_request_timestamp``.
    """
    if self.df is None:
      vprint("0\t\"Invoice Failed to Resolve\"\t\"%s\"\t\"%s\""%(self.path, self.sheet), log=True, flush=True)
      return None

    vprint("simplifying - %d\n"%(self.file_num), end="", flush=True)

    _columns = self.df.columns.tolist()
    simplified = pd.DataFrame()

    self.df = self.df.convert_dtypes()

    def as_text(df, c):
      return df[c].astype(str)

    def as_number(df, c):
      return pd.to_numeric(df[c], errors="coerce")

    column_types = {
        "api_request_timestamp": as_text,
        "zipcode": as_text,
        "cash_insurance": as_text,
        "dispensed_ndc": as_text,
        "rx_ndc_concat": as_text,
        "rx_number": as_text,
        "dispensing_fee": as_number,
        "rx_price": as_number,
        "unit_price": as_number,
        "invoice_reason": as_text,
        "shipping_cost": as_number,
    }

    for col, reg in tools.column_name_regex:
      matcher = re.compile(reg)
      try:
        matches = [c for c in _columns if matcher.match(c)]
        simplified[col] = self.df[matches].bfill(axis=1).iloc[:, 0]
      except Exception:
        # No matching source column (or an unusable one): skip this target.
        continue

      caster = column_types.get(col)
      if caster is None:
        continue
      try:
        simplified[col] = caster(simplified, col)
      except Exception:
        # Keep the uncast values when the conversion fails.
        continue

    simplified['api_request_timestamp'] = pd.to_datetime(
        simplified['api_request_timestamp'],
        utc=True,
        errors='coerce',
    )

    self.df = simplified
    # 

In [18]:
class CandidateFile:
  """A workbook on disk that may contain an invoice sheet.

  ``load`` scans the workbook; afterwards ``has_valid`` says whether a usable
  sheet was found (its name is in ``sheet`` and its data in ``page``) and
  ``is_bad`` says whether opening or scanning the workbook raised.
  """

  def __init__(self, filepath, iter):
    self.path = filepath
    self.workbook = None
    self.sheet = None
    self.page = None
    self.file_num = iter

    self.has_valid = False
    self.is_bad = False
    # Raw string: "\s" in a plain literal is an invalid escape sequence.
    self.__regex__ = re.compile(r"\s*api.*")

  def load(self):
    """Open the workbook under ``fsys_lock`` and look for a valid sheet."""
    with fsys_lock:
      try:
        with pd.ExcelFile(self.path) as workbook:
          self.has_valid = self.verify_sheets(workbook)
      except Exception:
        # Any failure marks the file as bad; the caller reports it.
        self.is_bad = True

  def verify_sheets(self, workbook):
    """Return True for the first sheet with at least two ``api`` columns.

    A column counts when its lower-cased header matches ``\\s*api.*``. The
    matching sheet's name is stored in ``self.sheet``; ``self.page`` holds the
    sheet read last. Sheets that fail to parse are skipped.

    Args:
      workbook: An open ``pd.ExcelFile``.
    """
    for sheet in workbook.sheet_names:
      try:
        # Parse from the already-open workbook instead of re-opening the file
        # from disk for every sheet.
        self.page = pd.read_excel(workbook, sheet_name=sheet)
        headers = list(self.page.columns.str.lower())
        trimmed = [h for h in headers if self.__regex__.match(h)]
        if len(trimmed) >= 2:
          self.sheet = sheet
          return True
      except Exception:
        continue
    return False

  def transfer_knowledge(self):
    """Wrap the validated sheet in an ``Invoice``.

    Returns:
      An ``Invoice`` when the file loaded cleanly, has a valid sheet and that
      sheet is not empty; otherwise None.
    """
    vprint("Transfering knowledge - %d\n"%(self.file_num), end="", flush=True)
    if not self.is_bad and self.has_valid:
      if self.page is not None and not self.page.empty:
        return Invoice(self.page, self.path, self.sheet, self.file_num)
    return None



In [9]:
class DocumentProcess:
  """Pipeline for one candidate file: load, clean, simplify, upload.

  Log rows written with ``log=True`` are tab-separated
  ``valid-flag, note, path, sheet``.
  """

  def __init__(self, file, n):
    self.candy = CandidateFile(file, n)
    self.file_num = n

  def _run(self):
    """Process the file end to end, reporting progress through ``vprint``.

    Failures are reported and logged rather than raised.
    """
    vprint("Loading - %s"%(self.file_num), flush=True)
    self.candy.load()
    vprint("Loading Complete - %s"%(self.file_num), flush=True)
    if self.candy.is_bad:
      vprint("Rotten Egg - %d"%(self.file_num), flush=True)

    if not self.candy.has_valid:
      # The trailing empty field keeps this row at four columns like every
      # other log row, so the log stays parseable as one table.
      vprint("0\t\"No Sheets Found\"\t\"%s\"\t\"\""%(self.candy.path), log=True, flush=True)
      vprint("No Sheets Here - %d\n"%(self.file_num), end="", flush=True)
      return

    try:
      invoice = self.candy.transfer_knowledge()
      if invoice is None:
        # The validated sheet was empty; record it instead of dropping it.
        vprint("0\t\"Invoice Empty\"\t\"%s\"\t\"%s\""%(self.candy.path, self.candy.sheet), log=True, flush=True)
        return

      invoice.preprocess()
      invoice.simplify()
      vprint("done prepping - %d\n"%(self.file_num), end="", flush=True)

      if invoice.df is None or invoice.df.empty:
        vprint("0\t\"Invoice Empty\"\t\"%s\"\t\"%s\""%(self.candy.path, self.candy.sheet), log=True, flush=True)
        vprint("Invoice is empty for some reason\n", end="", flush=True)
        return

      if self._share(invoice):
        # Only a successful upload is recorded as complete.
        vprint("File %d Complete\n"%(self.file_num), end="", flush=True)
        vprint("1\t\"Upload Complete\"\t\"%s\"\t\"%s\""%(self.candy.path, self.candy.sheet), log=True, flush=True)

    except Exception as e:
      vprint("Sheet %d could not be processed because %s \n %s\n"%(self.file_num, e, traceback.format_exc()), end="", flush=True)

  def _share(self, invoice):
    """Upload ``invoice.df`` while holding ``sql_lock``; return success."""
    with sql_lock:
      try:
        vprint("trying to sqlize - %d\n"%(self.file_num), end="", flush=True)
        tools.sqlize(invoice.df)
        vprint("Shared %d to DB\n"%(self.file_num), end="", flush=True)
      except Exception as e:
        vprint("0\t\"DB Share Failed\"\t\"%s\"\t\"%s\""%(self.candy.path, self.candy.sheet), log=True, flush=True)
        vprint("%s\nCould not send %d to Database because %s\n %s\n"%(invoice.df.dtypes, self.file_num, e, traceback.format_exc()), end="", flush=True)
        return False
    return True
    #vprint(f'Process %s finished working on task %d'%(mp.current_process().name,self.file_num), flush=True)
      

    

In [10]:
def remove_active_globals(input_series):
  """Delete from this module's globals every name listed in ``input_series``.

  Names that are not currently defined are ignored.
  """
  namespace = globals()
  # Resolve the full list of names to drop before deleting anything.
  doomed = [name for name in input_series if name in namespace]
  for name in doomed:
    namespace.pop(name)

In [24]:
class ColdRefresh:
  """Refresh strategy that rebuilds the toolbox and keeps every candidate file."""

  def __init__(self):
    pass

  def refresh(self, log_file):
    """Rebuild the global ``tools`` and point ``log_file_`` at the run log.

    Args:
      log_file: Log file name, appended to the configured
        ``logging.parent_dir``.

    Raises:
      Exception: If the candidate files fail validation.
    """
    global tools, log_file_

    print("ColdRefresh")
    tools = Armory()
    print("Traversing File Structure")
    tools.load()
    print("All Files Explored")

    if not tools.validate():
      raise Exception("Run Backtrace - Tool Set failed to validate")
    vprint("Tool Box is ready\n", end="", flush=True)

    log_file_ = tools.config_["logging"]["parent_dir"] + log_file
    if os.path.exists(log_file_):
      return
    # Create an empty log so later appends have a file to write to.
    with open(log_file_, 'x'):
      pass

class HotRefresh(ColdRefresh):
  """Refresh strategy that skips files already recorded in the run log."""

  def refresh(self, log_file):
    """Rebuild the global ``tools`` and drop candidates found in the log.

    The log is read as a header-less tab-separated table; its third column
    (index 2) holds the file paths that were already handled. When the log
    cannot be read, every candidate file is kept.

    Args:
      log_file: Log file name, appended to the configured
        ``logging.parent_dir``.

    Raises:
      Exception: If the remaining candidate files fail validation.
    """
    global tools, log_file_

    print("HotRefresh")
    tools = Armory()
    print("Traversing File Structure")
    tools.load()

    # Remove seen files
    log_file_ = tools.config_["logging"]["parent_dir"] + log_file
    if os.path.exists(log_file_):
      try:
        log_data = pd.read_table(log_file_, header=None)
        # A set keeps the membership test below cheap for long logs.
        seen_files = set(log_data[2].tolist())
      except pd.errors.EmptyDataError:
        # An empty log simply means nothing has been processed yet.
        pass
      except Exception as err:
        vprint("Could not read log %s (%s); keeping every candidate file\n"%(log_file_, err), end="", flush=True)
      else:
        tools.candidate_files = [p for p in tools.candidate_files if p not in seen_files]
    else:
      with open(log_file_, 'x'):
        pass

    print("All Files Explored")
    if tools.validate():
      vprint("Tool Box is ready\n", end="", flush=True)
    else:
      raise Exception("Run Backtrace - Tool Set failed to validate")





In [12]:
# Module-level slot for the shared Armory instance; empty until a refresh runs.
tools = None

def f(obj):
    """Pool worker: run ``obj._run()`` and hand the same object back."""
    obj._run()
    return obj

def RunIngestionPipe(verbose=True, log=False, log_file="log.txt", random_batch_size=None, max_pooling=3, refresh_method='ColdRefresh', debug=False, debug_list=None):
  """Refresh the toolbox, then process the candidate files in a process pool.

  Args:
    verbose: Stored in the global ``verbose_`` flag read by ``vprint``.
    log: Currently unused by this function.
    log_file: Log file name handed to the refresh strategy.
    random_batch_size: When set (and ``debug`` is False), process only a random
      sample of that many candidate files.
    max_pooling: Number of worker processes.
    refresh_method: Name of a refresh class defined in this module
      (``'ColdRefresh'`` or ``'HotRefresh'``).
    debug: Process ``debug_list`` instead of the discovered candidate files.
    debug_list: Paths to process when ``debug`` is True; None means no files.

  Raises:
    KeyError: If ``refresh_method`` does not name a global.
  """
  global verbose_, log_file_
  global fsys_lock, sql_lock, print_lock
  verbose_ = verbose

  # Do Refresh Stuff Here
  globals()[refresh_method]().refresh(log_file=log_file)

  # NOTE(review): the locks are shared with the workers through module
  # globals, which presumably relies on a fork-style pool start; confirm.
  fsys_lock = Lock()
  sql_lock = Lock()
  print_lock = Lock()

  if debug:
    paths = debug_list if debug_list is not None else []
  elif random_batch_size is not None:
    paths = r.sample(tools.candidate_files, random_batch_size)
  else:
    paths = tools.candidate_files
  candidate_series = [DocumentProcess(path, n) for n, path in enumerate(paths)]

  vprint("Candidates Identified\n", end="", flush=True)
  vprint("Beginning Ingestion\n", end="", flush=True)
  with Pool(max_pooling) as p:
    p.map(f, candidate_series)

In [None]:
# Paths to re-run in debug mode. While this list is empty and debug=True,
# the pipeline refreshes the toolbox but processes no files.
problem_files = [
    ]
    
# Kick off the ingestion run with a cold refresh and seven worker processes.
RunIngestionPipe(
    verbose=True
    , log=True
    , max_pooling=7
    #, random_batch_size=250
    , debug=True
    , debug_list=problem_files
    , refresh_method='ColdRefresh'
  )