In [0]:
# Session-level Arrow settings: switch on Arrow-based execution and switch off
# the fallback option (presumably so Arrow problems surface as errors instead
# of silently taking the non-Arrow path — confirm against the Spark version in use).
# NOTE(review): `spark` is not defined in the visible part of this file; it is
# assumed to be the session object provided by the notebook runtime.
spark.conf.set('spark.sql.execution.arrow.enabled', True)
spark.conf.set('spark.sql.execution.arrow.fallback.enabled', False)
import pandas as pd
import string
import os
from pyspark.sql.window import Window
from pyspark.sql.functions import array, array_distinct, broadcast, coalesce, col, column,collect_set, count, avg,\
                                  concat, concat_ws, create_map, explode, first, lit, lower, upper,lag,\
                                  monotonically_increasing_id, pandas_udf, PandasUDFType,stddev_pop,\
                                  regexp_extract, sha2, size, split, trim, udf, when, round,months_between,\
                                  substring, current_date, datediff, year, current_timestamp, udf, monotonically_increasing_id,\
                                  sum, rank, isnull, regexp_replace,sort_array, collect_list,struct,\
                                  slice, to_date,to_timestamp,arrays_overlap, length, slice, array_except
from pyspark.sql.types import FloatType, StringType, StructField, StructType, IntegerType, DecimalType, NullType,BooleanType,DateType
from pyspark.sql.functions import date_format
from pyspark.sql import DataFrame
from pyspark.ml.feature import HashingTF, IDF, MinHashLSH, NGram, RegexTokenizer
from pyspark.ml.linalg import SparseVector, VectorUDT
from itertools import chain
from functools import reduce
import math as m
import json, pprint
from mlflow.tracking import MlflowClient
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus
import time
import uuid
from decimal import Decimal

In [0]:
@udf("string")
def nonnull_count(*argv):
    """Count how many of the supplied values are not None.

    Only a None check is performed: empty strings and other "blank" values
    are counted as present.

    argv ~ any number of column values
    returns ~ the count as a Python int (the UDF is declared with a "string" return type)
    """
    present = [val for val in argv if val is not None]
    return len(present)

@udf("double")
def weighted_score(*sims):
    """Return the weighted average of the non-null similarities, multiplied by 100.

    sims ~ similarity values passed positionally; position i is paired with weights[i] below.
           None entries are skipped and their weight is left out of the normalizer.
    returns ~ sum(sim * 100 * weight) / sum(weight) over the non-null entries,
              or 0.0 when the accumulated weight is 0 (e.g. every entry is None)

    The w_* weights are module-level names that are not defined in the visible
    part of this file; they must exist before the UDF is evaluated.
    Passing more similarities than there are weights raises IndexError.
    """
    # weight order: [FName, MName, LName, Street, City, County, State, Zip,
    #                Gender, Dob, Age, Race, SSN, MBI, Lic, StateDL]
    weights = [
        float(w) for w in (
            w_FName, w_MName, w_LName,
            w_Street, w_City, w_County, w_State, w_Zip,
            w_Gender, w_Dob, w_Age, w_Race,
            w_SSN, w_MBI, w_Lic, w_StateDL,
        )
    ]
    score_total = float(0)
    weight_total = float(0)
    for position, sim in enumerate(sims):
        if sim is None:
            continue
        score_total += float(sim * 100*weights[position])
        weight_total += float(weights[position])
    if weight_total == 0:
        return float(0)
    return score_total / weight_total

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def lexical_last(v):
    """Grouped-aggregate pandas UDF: return the maximum value of the group.

    v ~ the group's values (expected to be strings, so the maximum is the
        entry that sorts last)
    """
    greatest = v.max()
    return greatest

@udf("string")
def string_first(x, y):
    """Return whichever of the two strings sorts first (y when they are equal).

    None inputs are not handled: comparing None with a string raises TypeError.
    """
    if x < y:
        return x
    return y

@udf("string")
def string_last(x, y):
    """Return whichever of the two strings sorts last (x when they are equal).

    None inputs are not handled: comparing None with a string raises TypeError.
    """
    if x >= y:
        return x
    return y

@udf("double")
def cos_sim(x, y):
    """Return the cosine similarity of two vector columns.

    x, y ~ vectors exposing norm() and dot()
    returns ~ None when either input is None, 0.0 when the product of the
              two L2 norms is 0, otherwise dot(x, y) / (|x| * |y|)
    """
    if x is None or y is None:
        return None
    norm_product = x.norm(2)*y.norm(2)
    if norm_product == 0:
        return float(0)
    return float(x.dot(y)/norm_product)
    
@udf("double")
def jaccard_sim(x, y):
    """Return the weighted jaccard similarity between two sparse vector columns.

    For every index present in either vector, the smaller and the larger of the
    two weights (0 when the index is absent) are squared and accumulated; the
    result is sqrt(sum of squared minima) / sqrt(sum of squared maxima).

    x, y ~ sparse vectors exposing `indices` and `values`
    returns ~ None when either input is None, 0.0 when the denominator is 0,
              otherwise the ratio described above
    """
    if x is None or y is None:
        return None
    # Pair each index with its own value by zipping the two parallel arrays
    # directly. Going through set(indices) first would iterate the indices in
    # hash order, which is not guaranteed to line up with `values`.
    x_weights = dict(zip(x.indices, x.values))
    y_weights = dict(zip(y.indices, y.values))
    min_sq, max_sq = 0, 0
    for idx in x_weights.keys() | y_weights.keys():
        x_val = x_weights.get(idx, 0)
        y_val = y_weights.get(idx, 0)
        min_sq += m.pow(min(x_val, y_val), 2)
        max_sq += m.pow(max(x_val, y_val), 2)
    denominator = m.sqrt(max_sq)
    if denominator == 0:
        return float(0)
    return float(m.sqrt(min_sq)/denominator)

@udf("double")
def relative_numeric_sim(x, y):
    """Return a squared relative closeness score for two single-index sparse vectors.

    The number being compared is the first entry of each vector's `indices`
    (see spvec, which stores a number as the position of its only non-zero entry).

    x, y ~ sparse vectors with at least one index
    returns ~ 0.0 when either input is None;
              (1 - |a - b| / max(a, b)) ** 2 when max(a, b) is not 0;
              None when max(a, b) is 0
    """
    if x is None or y is None:
        return float(0)
    first_number = float(x.indices[0])
    second_number = float(y.indices[0])
    largest = m.fabs(max([first_number, second_number]))
    if largest != 0:
        return m.pow(1.0 - (m.fabs(first_number - second_number) / largest), 2)
    # NOTE(review): a zero denominator yields None (null) here while missing
    # inputs yield 0.0 — the sibling similarity UDFs do the opposite. Confirm
    # which behaviour is intended before changing it.
    return None

def drop_values_below_threshold(vec, threshold):
    """
      Given a vector of token weights, drop all features below the provided weight-threshold.
      The idea is that super-common tokens (SCT) should be given low weights (by IDF) and
      SCTs will cause MinHashLSH trouble, as it will result in giant buckets/bins.
      Further, SCTs don't even contribute much to similarities, by definition, since they have low weights.
      So, we drop them here to avoid the MinHashLSH performance pitfalls

      vec ~ sparse vector exposing `size`, `indices` and `values`
      threshold ~ entries whose value is strictly below this are removed
      returns ~ a new SparseVector of the same size holding only the surviving entries
    """
    kept_indices = []
    kept_values = []
    # Single pass over the parallel (index, value) arrays; this avoids building
    # a list of positions to delete and re-scanning it for every entry.
    for index, weight in zip(vec.indices, vec.values):
        if weight < threshold:
            continue
        kept_indices.append(index)
        kept_values.append(weight)
    # vector size is unchanged, it is based on the global set of all tokens
    return SparseVector(vec.size, kept_indices, kept_values)

# Column-level wrapper so the filter can be applied inside DataFrame expressions.
drop_values_below_threshold_udf = udf(drop_values_below_threshold, VectorUDT())

def add_hashes(df, id="sourceId", var="variable", val="value"):
    """
        Add two SHA-256 hash columns to the dataframe:
          valueId   ~ hash of the trimmed, lower-cased value, so it is shared by all rows
                      with the same value, e.g. 'Luke' and 'Luke'
          featureId ~ hash of the source id concatenated with the variable name

        df ~ dataframe to modify
        id ~ column with original source ids
        var ~ column with the name of the variable (e.g. original source column)
        val ~ column with source values
    """
    normalized_value = lower(trim(col(val)))
    value_hash = sha2(normalized_value, 256)
    feature_hash = sha2(concat(col(id), col(var)), 256)
    with_value_id = df.withColumn("valueId", value_hash)
    return with_value_id.withColumn("featureId", feature_hash)

def drop_bad_values(df, val="value", vid="valueId", fid="featureId"):
    """
        Return only the rows whose value is not blank, i.e. the trimmed value
        differs from the empty string.

        df ~ dataframe to filter
        val ~ column with the original value; the only column inspected
        vid ~ hashed version of val (not used currently)
        fid ~ unique record identifier (not used currently)
    """
    has_content = trim(col(val)) != ""
    return df.where(has_content)

def dedup_values(df, val="value", vid="valueId", fid="featureId"):
    """
        Given a dataframe, return a smaller dataframe where we keep only 1 row per value.
        We need to be deterministic in choosing which row to keep, so for every
        value hash we keep the record id picked by lexical_last.

        df ~ dataframe to modify
        val ~ column with original value (not used currently)
        vid ~ hashed version of val
        fid ~ unique record identifier
        returns ~ the rows of df whose fid was selected as the keeper for its vid
    """
    # Name the aggregate explicitly instead of renaming Spark's auto-generated
    # "lexical_last(<fid>)" column: withColumnRenamed silently does nothing when
    # the generated name differs, which would break the join below.
    keepers = (df
               .groupBy(vid)
               .agg(lexical_last(fid).alias(fid))
               .drop(vid)
              )
    return broadcast(keepers).join(df, fid, "left")

# UDF returning the L1 norm (v.norm(1)) of a vector column as a float.
# featurize() compares it against 0 to discard vectors left with no weight.
vsize = udf(lambda v: float(v.norm(1)),FloatType())

def tokenize(df, mode="default", iCol="value", oCol="tokens", id="featureId", sid=None):
    """
        Return dataframe with new column containing list of tokens for a given input string-like column

        df ~ dataframe to modify
        mode ~ tokenizing approach (default/bigram/trigram/numeric); any other value returns None
        iCol ~ column containing the data to tokenize
        oCol ~ column name to create and store the tokenized data
        id ~ how to uniquely identify each row
        sid ~ (option) source ID to pass through

        Modes:
          numeric ~ wrap the value in a one-element array, e.g. 1 -> [1]
          default ~ split on non-word characters and de-duplicate,
                    e.g. "some cool words" -> ["some", "cool", "words"]
          bigram/trigram ~ split into words, split every word into its characters
                    (repeated characters within a word are de-duplicated first),
                    build character 2-/3-grams per word and collect the distinct
                    n-grams per row. Rows that end up with no n-gram are dropped.
    """
    rcols = [id, oCol] if sid is None else [sid, id, oCol]  # optionally return a sourceId pulled from the "sid" column (for default)
    gcols = [id] if sid is None else [id, sid]  # optionally return a sourceId pulled from the "sid" column (for ngram)

    if mode == "numeric":
        return df.withColumn(oCol, array(iCol)).select(*rcols)

    if mode == "default":
        tk = RegexTokenizer(inputCol=iCol, outputCol=oCol, pattern="\\W")
        return tk.transform(df).withColumn(oCol, array_distinct(col(oCol))).select(*rcols)

    if mode == "bigram":
        ngram_size = 2
    elif mode == "trigram":
        ngram_size = 3
    else:
        # Unknown mode: keep the historical behaviour of returning nothing.
        return None

    tk = RegexTokenizer(inputCol=iCol, outputCol="temp1", pattern="\\W")
    ng = NGram(n=ngram_size, inputCol="temp1", outputCol=oCol)

    # One row per word, each word turned into its array of distinct characters.
    characters = (tk.transform(df)
                  .withColumn("temp1", explode("temp1"))
                  .withColumn("temp1", array_distinct(split("temp1", "")))
                 )
    return (ng.transform(characters)
            .withColumn(oCol, explode(oCol))
            .groupBy(*gcols)
            .agg(collect_set(oCol).alias(oCol))
            # Filter on the caller-supplied output column, not a hard-coded
            # "tokens", so a custom oCol works in the n-gram modes too.
            .filter(size(col(oCol)) > 0)
           )

def spvec(x, vsize=262144):
    """Encode a number as a sparse vector with a single entry of 1.

    The position of that entry is int(x[0]) modulo vsize, so only the first
    element of x is used.
    This is, admittedly, not a great solution to a generic numeric similarity approach.
    We are using this because we are not separating string from numeric features,
    which means we need to return the same type for both: a sparse vector.

    x ~ sequence whose first element is the number to encode
    vsize ~ size of the resulting vector
    """
    hot_index = int(x[0]) % vsize
    return SparseVector(vsize, [hot_index], [1])

# From here on the name refers to the UDF wrapper, not the plain function.
spvec = udf(spvec, VectorUDT())

def numeric_featurize(df, iCol="tokens", oCol="features", id="featureId", sid=None):
    """
        Turn a column of numeric tokens into sparse-vector features via the spvec UDF.

        df ~ dataframe to modify
        iCol ~ column containing the tokens
        oCol ~ column name to create and store the feature vector
        id ~ column that uniquely identifies each row
        sid ~ (option) source ID column to pass through
        returns ~ dataframe with columns (id, oCol), or (id, sid, oCol) when sid is given
    """
    keep = [id, oCol] if sid is None else [id, sid, oCol]
    with_features = df.withColumn(oCol, spvec(col(iCol)))
    return with_features.select(*keep)

def featurize(df, iCol="tokens", oCol="features", id="featureId", idf_threshold=4.0, sid=None, tf=True):
    """
        Given a column containing arrays of tokens, measure the document frequency, weight the tokens and return as a sparse vector

        df ~ dataframe holding the tokenized data
        iCol ~ column containing the tokenized form of the data, e.g. ["some", "cool", "words"]
        oCol ~ this is the name of the column to create that will contain the output
        id ~ column that uniquely identifies each row
        idf_threshold ~ minimum token weight, anything below this will be dropped from the filtered_idfXform DF;
                        None skips the filtered result entirely
        sid ~ (option) source ID column to pass through in the unfiltered result
        tf ~ when True an IDF model is fitted and applied to the hashed term frequencies;
             when False the raw hashed term frequencies are used as the features
        returns ~ tuple (filtered_idfXform, full_idfXform); filtered_idfXform is None when
                  idf_threshold is None, and only carries the id and oCol columns otherwise
    """
    rcols = [id, oCol] if sid is None else [sid, id, oCol]  # optionally return a sourceId pulled from the "sid" column
    raw_col = f"raw_{oCol}"
    raw_cols = [id, raw_col] if sid is None else [sid, id, raw_col]

    htf = HashingTF(inputCol=iCol, outputCol=raw_col)
    fdata = htf.transform(df).select(*raw_cols)

    if tf:
        idfModel = IDF(inputCol=raw_col, outputCol=oCol).fit(fdata)
        full_idfXform = idfModel.transform(fdata).select(*rcols)
    else:
        full_idfXform = fdata.withColumnRenamed(raw_col, oCol)

    if idf_threshold is None:
        # Its possible user desires tokenization/featurization, but no binning
        return None, full_idfXform

    # Work on the caller-supplied output column rather than a hard-coded
    # "features", so a custom oCol is filtered correctly.
    filtered_idfXform = (full_idfXform
                         .select(id, oCol)
                         .withColumn(oCol, drop_values_below_threshold_udf(col(oCol), lit(idf_threshold)))
                         .filter(vsize(col(oCol)) != 0)
                        )
    return filtered_idfXform, full_idfXform  # Return the filtered AND unfiltered result

def binning(df, iCol="features", oCol="hashes", dCol="minHashJaccardDistance", id="featureId", numHashes=10, threshold=0.5):
    """
        Generate candidate pairs with MinHashLSH by self-joining df on approximate similarity.

        df ~ dataframe with data to generate candidate pairs
        iCol ~ column name containing features for comparison approximations
        oCol ~ where to store generated minhashes
        dCol ~ column name to create containing the approximate distance between pairs
        id ~ unique record id
        numHashes ~ number of hash tables to use in MinHashLSH.  Using more yields higher precision, but requires more compute
        threshold ~ distance threshold handed to approxSimilarityJoin, equivalent to (1 - jaccardSimilarity); lower == more restrictive
        returns ~ dataframe with columns <id>1, <id>2 and dCol, one row per candidate pair
    """
    lsh_model = MinHashLSH(inputCol=iCol, outputCol=oCol, numHashTables=numHashes).fit(df)
    joined = lsh_model.approxSimilarityJoin(df, df, threshold, distCol=dCol)

    left_id = col(f"datasetA.{id}")
    right_id = col(f"datasetB.{id}")

    # Keep a single ordering of each pair: drops self-matches and mirrored duplicates.
    ordered_pairs = joined.filter(left_id > right_id)
    return ordered_pairs.select(left_id.alias(f"{id}1"), right_id.alias(f"{id}2"), dCol)

def normCols(DF):
    """For all columns in DF do the following:
         - remove all leading whitespace characters
         - remove all trailing whitespace characters
         - change all letters to lower-case

       DF ~ dataframe to normalize
       returns ~ dataframe with every column replaced by its normalized form
    """
    for c in DF.columns:
        # Strip with anchored alternation. A greedy capture such as
        # '\s*(.*)\s*' keeps the trailing whitespace inside the group (and cuts
        # the value at the first line terminator), so it does not actually trim.
        DF = DF.withColumn(c, lower(regexp_replace(col(c), r'^\s+|\s+$', '')))
    return DF

def map_to_targets(df, mappings, context, source):
    """Apply the configured source-to-target column mappings for one context and one source table.

        The contextConfig.sourceMappings part of the config file contains mappings that need to be applied.
        Each mapping contains a source name ('deltaTable'), a list of source columns ('sourceCols') and, optionally, a target alias ('targetAlias').
        For each mapping matching the given context and source, a column named
        "<context>__<targetAlias>" (or "<context>__<first source column>" when no alias is set)
        is added, holding the source columns joined by a single space with nulls treated as "".
        All source columns consumed by a mapping are dropped from the result.

        df ~ dataframe to modify
        mappings ~ dataframe with columns contextName, deltaTable, sourceCols, targetAlias
        context ~ context name to select mappings for (e.g. Person Names)
        source ~ source table name to select mappings for
    """
    is_relevant = (col("contextName")==context) & (col("deltaTable")==source)
    selected = mappings.filter(is_relevant).select("sourceCols","targetAlias").collect()

    consumed = []
    for source_cols, alias in selected:
        suffix = source_cols[0] if alias is None else alias
        target = context + "__" + suffix
        pieces = [coalesce(c, lit("")) for c in source_cols]
        df = df.withColumn(target, concat_ws(" ", *pieces))
        consumed += source_cols
    return df.drop(*list(set(consumed)))

def melt(df, id_vars, value_vars, var_name="variable", value_name="value"):
    # adapted from https://stackoverflow.com/questions/41670103/how-to-melt-spark-dataframe
    """Convert DataFrame from wide to long format.

        df ~ dataframe to reshape
        id_vars ~ columns to keep as identifiers on every output row
        value_vars ~ columns to unpivot; each becomes one (name, value) row per input row
        var_name ~ name of the output column holding the original column name
        value_name ~ name of the output column holding the value
    """
    # Build map(column name -> column value) from interleaved key/value expressions.
    _vars_and_vals = create_map(
        list(chain.from_iterable(
            (lit(c), col(c)) for c in value_vars
        ))
    )
    # Name the two exploded columns directly. Renaming the default 'key' and
    # 'value' afterwards would also rename any id column with one of those names.
    return df.select(*id_vars, explode(_vars_and_vals).alias(var_name, value_name))

def convertToNull(dfa):
    """Replace blank values with null in every column.

        A value is blank when its trimmed form equals the empty string; all
        other values are kept unchanged.

        dfa ~ dataframe to modify
    """
    for name in dfa.columns:
        is_blank = trim(col(name)) == ''
        dfa = dfa.withColumn(name, when(is_blank, None).otherwise(col(name)))
    return dfa
  
# Wait until the model is ready
def wait_until_ready(model_name, model_version):
    """Poll the model registry until the given model version reports READY.

        The status is fetched up to 60 times with a one second pause after each
        non-READY answer, and printed on every poll. If READY is never seen the
        function simply returns after the last attempt, without raising.

        model_name ~ registered model name
        model_version ~ version of the registered model to check
    """
    client = MlflowClient()
    attempts_left = 60
    while attempts_left > 0:
        attempts_left -= 1
        details = client.get_model_version(name=model_name, version=model_version)
        status = ModelVersionStatus.from_string(details.status)
        print("Model status: %s" % ModelVersionStatus.to_string(status))
        print('#' + '='*100)
        if status == ModelVersionStatus.READY:
            return
        time.sleep(1)
    
    
def load_latest_model(model_name):
    """Load the highest-numbered registered version of a model as a Spark model.

        model_name ~ registered model name to look up
        returns ~ the model returned by mlflow.spark.load_model for that version
        raises ~ ValueError when the registry has no version for model_name
    """
    # The file's visible imports only pull names out of mlflow sub-modules and
    # never bind `mlflow` itself, so import the flavor module used below here.
    import mlflow.spark

    client = MlflowClient()
    model_version_infos = client.search_model_versions(f"name = '{model_name}'")
    versions = [int(model_version_info.version) for model_version_info in model_version_infos]
    if not versions:
        # max() on an empty list raises the same exception type, just with an unhelpful message.
        raise ValueError(f"No registered versions found for model '{model_name}'")
    max_model_version = max(versions)
    print(f'The latest version of pm-gbt model is:  {max_model_version}')
    print('#' + '='*100)

    model_uri = f"models:/{model_name}/{max_model_version}"
    print(f"Model uri is {model_uri}")
    print('#' + '='*100)

    # Registration may still be in progress; give it a chance to finish first.
    wait_until_ready(model_name, max_model_version)

    model = mlflow.spark.load_model(model_uri)
    return model


# Zero-argument UDF that returns a freshly generated UUID4 as a string; it is
# flagged as non-deterministic because every call yields a different value.
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

#user = dbutils.secrets.get("snowflake", "snowflake-databricks-username")
#password = dbutils.secrets.get("snowflake", "snowflake-databricks-pw")
#database = dbutils.secrets.get("snowflake", "snowflake-databricks-database")
#url = dbutils.secrets.get("snowflake", "snowflake-host")

# snowflake connection options
#options = {"sfUrl": url,
 #          "sfUser": user,
 #          "sfPassword": password,
 #          "sfDatabase": database,
 #          "sfSchema": "ET3_LOOKER",
  #         "truncate_table" : "ON",
  #         "usestagingtable" : "OFF"
  #          }

def write_sf_data(df, table_name):
    ''' Write a dataframe from databricks to a snowflake table, overwriting existing data.
        df = pyspark.dataframe
        table_name = snowflake table name in this format [schema].[tablename]
            ie. = ET3_LOOKER.MATCHED

        Connection settings come from the module-level `options` dict.
        NOTE(review): the only definition of `options` visible in this file is
        commented out — confirm it is defined before this function is called.
    '''
    writer = (df.write
              .format("snowflake")
              .options(**options)
              .option("dbtable", table_name)
              .mode('overwrite'))
    writer.save()

def load_sf_data(table):
    ''' Read a snowflake table into a dataframe.
        table = snowflake table name to load

        Connection settings come from the module-level `options` dict; the
        dataframe is returned without being registered as a temp view.
    '''
    reader = (spark.read
              .format("snowflake")
              .options(**options)
              .option("dbtable", table))
    return reader.load()