In [None]:
from pyspark.sql import DataFrame, Row
from functools import reduce
from pyspark.sql.types import *
from pyspark.sql.functions import *
from graphframes import *
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
import uuid

In [None]:
# Spark settings for this session: executor memory, driver memory and the
# cap on total cores. `SparkConf.set` returns the conf, so calls can be chained.
conf = (
    SparkConf()
    .set("spark.executor.memory", "2g")
    .set("spark.driver.memory", "12g")
    .set("spark.cores.max", "6")
)

# Reuse an already-running SparkContext if one exists; otherwise start one
# with the settings above.
sc = SparkContext.getOrCreate(conf)

# SQL entry point built on top of the context.
spark = SQLContext(sc)

In [None]:
class MatchPruner:
    """Split match suggestions into "keep" and "pruned" sets.

    The pruning rules are driven by three ``config`` keys that the methods
    read: ``'seller_type'``, ``'match_type'`` and ``'cardinality'``.
    """

    def __init__(self, mdw: DataFrame, match_suggestion: DataFrame, config: dict) -> None:
        """Store the inputs unchanged.

        Args:
            mdw: Frame the other methods join against to decide what counts
                as "audited" (presumably the audited-match table -- confirm
                with the caller).
            match_suggestion: Frame of candidate matches.
            config: Rule settings; see the class docstring for the keys read.
        """
        self.mdw = mdw
        self.match_suggestion = match_suggestion
        self.config = config

    @staticmethod
    def _split_by(frame: DataFrame, condition):
        """Return ``(rows where condition holds, rows where its negation holds)``."""
        # NOTE(review): under SQL three-valued logic a row whose condition
        # evaluates to NULL (e.g. columns left NULL by a left join) ends up in
        # neither frame. This matches the original filters -- confirm intended.
        return frame.filter(condition), frame.filter(~condition)

    @staticmethod
    def _summarise_audited(audited_matches: DataFrame, match_types: list) -> DataFrame:
        """Aggregate audited competitor seller types per base product.

        Keeps rows whose ``match_type`` is one of ``match_types``, groups by
        ``base_sku_uuid``, ``base_seller_type`` and ``match_type``, and joins
        the distinct ``audited_match_comp_seller_type`` values with ``'_'``.
        A ``status`` column of ``"pruned_by_" + base_sku_uuid`` is appended.
        """
        return (
            # Column.isin is required here: Python's `in` would try to turn a
            # Column into a bool and raise.
            audited_matches.filter(col('match_type').isin(*match_types))
            .select('base_sku_uuid', 'base_seller_type', 'audited_match_comp_seller_type', 'match_type')
            .groupBy('base_sku_uuid', 'base_seller_type', 'match_type')
            .agg(
                concat_ws('_', collect_set(col("audited_match_comp_seller_type")))
                .alias("audited_match_comp_seller_type")
            )
            # Build the label per row; formatting a Column into an f-string
            # would only embed the Column's repr as a constant.
            .withColumn('status', concat(lit("pruned_by_"), col("base_sku_uuid")))
        )

    def preprocessing_matches(self, matches: DataFrame):
        """Return ``matches`` with ``seller_type`` and ``match_type`` lower-cased.

        NULL ``match_type`` values are passed through unchanged.
        """
        match_type = col("match_type")
        return (
            matches.withColumn("seller_type", lower(col("seller_type")))
            .withColumn(
                "match_type",
                when(match_type.isNotNull(), lower(match_type)).otherwise(match_type),
            )
        )

    def prune_1p_3p_match_suggestions(self, match_suggestion: DataFrame):
        """Split suggestions by ``seller_type`` according to ``config['seller_type']``.

        Returns:
            ``(filtered_matches, pruned_matches)``. For ``'1p'`` the 1p rows are
            kept and the 3p rows are pruned; for ``'3p'`` it is the reverse.
            Pruned rows get a ``STATUS`` column naming the rule. For any other
            setting nothing is pruned and ``pruned_matches`` is ``None``.
        """
        wanted = self.config['seller_type']
        if wanted not in ('1p', '3p'):
            return match_suggestion, None

        unwanted = '3p' if wanted == '1p' else '1p'
        filtered_matches = match_suggestion.filter(col('seller_type') == wanted)
        pruned_matches = match_suggestion.filter(col('seller_type') == unwanted).withColumn(
            "STATUS", lit(f"pruned_by_seller_type_{wanted}")
        )
        return filtered_matches, pruned_matches

    def separate_audited_unaudited_base_product_matches(self, match_suggestion: DataFrame, mdw: DataFrame):
        """Split suggestions by whether their ``base_sku_uuid`` appears in ``mdw``.

        Returns:
            ``(unaudited_base_product_matches, audited_base_product_matches)``:
            a left-anti join and an inner join on ``base_sku_uuid``.
        """
        unaudited_base_product_matches = match_suggestion.join(mdw, 'base_sku_uuid', 'left_anti')
        audited_base_product_matches = match_suggestion.join(mdw, 'base_sku_uuid', 'inner')
        return unaudited_base_product_matches, audited_base_product_matches

    def separate_audited_and_unaudited_matches(self, audited_base_product_matches: DataFrame, mdw: DataFrame):
        """Split audited-base-product suggestions by whether the pair is in ``mdw``.

        Returns:
            ``(similar_matches, exact_matches, audited_base_product_unaudited_matches)``.
            The first two are per-base-product summaries of the audited pairs
            with ``match_type`` in ``{'equivalent', 'similar'}`` and
            ``{'exact'}`` respectively; the third holds the suggestions whose
            ``pair_id`` is not present in ``mdw``.
        """
        # NOTE(review): if both inputs carry columns with the same name (other
        # than pair_id), later references such as `match_type` may be
        # ambiguous -- depends on schemas not visible here.
        audited_matches = audited_base_product_matches.join(mdw, 'pair_id', 'inner')
        similar_matches = self._summarise_audited(audited_matches, ['equivalent', 'similar'])
        exact_matches = self._summarise_audited(audited_matches, ['exact'])
        audited_base_product_unaudited_matches = audited_base_product_matches.join(mdw, 'pair_id', 'left_anti')
        return similar_matches, exact_matches, audited_base_product_unaudited_matches

    def prune_match_based_on_match_type(self, similar_matches: DataFrame, exact_matches: DataFrame, audited_base_product_unaudited_matches: DataFrame):
        """Apply the match-type / seller-type / cardinality rules.

        Args:
            similar_matches: Summary of audited similar/equivalent matches.
            exact_matches: Summary of audited exact matches.
            audited_base_product_unaudited_matches: Suggestions that contain
                only unaudited matches.

        Returns:
            ``(pruned_suggestion, match_suggestion)``. Either element is
            ``None`` when the selected rule produces nothing for it, and both
            are ``None`` when the configuration matches no rule.
        """
        unaudited = audited_base_product_unaudited_matches
        pruned_suggestion = None
        match_suggestion = None
        match_type = self.config['match_type']

        if match_type == 'exact':
            seller_type = self.config['seller_type']
            if seller_type == '1p':
                # Everything is pruned; no join or cardinality lookup needed.
                return unaudited, None
            if seller_type not in ('3p', '1p_or_3p', '1p_over_3p'):
                return None, None

            # Left join keeps suggestions whose base product has no audited
            # exact match; their audit columns are NULL.
            joined = unaudited.join(exact_matches, "base_sku_uuid", "left")
            cardinality = self.config['cardinality']
            audited_types = col("audited_match_comp_seller_type")
            audited_1p_and_comp_1p = audited_types.contains("1p") & (col('comp_seller_type') == '1p')

            if seller_type == '3p':
                if cardinality == '1':
                    pruned_suggestion, match_suggestion = self._split_by(
                        joined, audited_types.contains("3p")
                    )
                elif cardinality == 'n':
                    match_suggestion = joined

            elif seller_type == '1p_or_3p':
                if cardinality == '1':
                    pruned_suggestion = joined
                elif cardinality == 'n':
                    pruned_suggestion, match_suggestion = self._split_by(joined, audited_1p_and_comp_1p)

            else:  # '1p_over_3p'
                if cardinality == '1':
                    # Only a 1p suggestion for a product audited as 3p-only
                    # is kept; everything else is pruned.
                    keep = (audited_types == "3p") & (col('comp_seller_type') == '1p')
                    match_suggestion, pruned_suggestion = self._split_by(joined, keep)
                elif cardinality == 'n':
                    pruned_suggestion, match_suggestion = self._split_by(joined, audited_1p_and_comp_1p)

        elif match_type in ('similar', 'exact_or_similar'):
            # Original author's note: audited_base_product_unaudited_matches
            # only contains 1p match suggestions here (not verifiable from this code).
            exact_and_similar_match = exact_matches.union(similar_matches)
            seller_type = self.config['seller_type']

            if seller_type in ('1p', '3p'):
                # Only compare against audits that include this seller type.
                reference = exact_and_similar_match.filter(
                    col("audited_match_comp_seller_type").contains(seller_type)
                )
            elif seller_type in ('1p_or_3p', '1p_over_3p'):
                reference = exact_and_similar_match
            else:
                return None, None

            # NOTE(review): `reference` can hold several rows per base product
            # (one per match_type), so this join may duplicate suggestions.
            joined = unaudited.join(reference, "base_sku_uuid", "left")
            cardinality = self.config['cardinality']

            if cardinality == '1':
                condition = col("match_type") == 'exact'
                if seller_type == '1p_over_3p':
                    audited_types = col('audited_match_comp_seller_type')
                    condition = condition & (
                        audited_types.contains('1p')
                        | ((audited_types == '3p') & (col('comp_seller_type') == '3p'))
                    )
                pruned_suggestion, match_suggestion = self._split_by(joined, condition)
            elif cardinality == 'n':
                match_suggestion = joined

        return pruned_suggestion, match_suggestion
        
    