# In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from itertools import combinations
import time as t


# In [None]:
def map_to_product(row):
    """
    Emit one ``(product, 1)`` pair for every product listed in a transaction row.

    ``row.transaction`` is split on ';' and each occurrence is emitted
    separately with a value of 1; summing these values per key downstream
    yields the number of occurrences of each product.

    NOTE(review): this reads ``row.transaction`` (lower-case) whereas ``MBA``
    reads ``row.Transaction``; confirm which column name applies here.
    """
    yield from ((item, 1) for item in row.transaction.split(';'))

def reduce_product_by_key(value1, value2):
    """Combine two partial values of the same key by adding them (value1 + value2)."""
    total = value1 + value2
    return total


def format_tuples(pattern):
    """
    Turn a ``(pattern, rules)`` pair into two strings for display.

    A DataFrame column cannot hold tuples of different sizes, so both parts
    are stringified: the pattern through ``tuple`` (e.g. ``[a, b, c]`` ->
    ``'(a, b, c)'``) and the rules through ``str``.
    """
    pattern_text = str(tuple(pattern[0]))
    rules_text = str(pattern[1])
    return pattern_text, rules_text

def map_to_patterns(products, split_function=lambda x: x, max_products_by_pattern=3):
    """
    Emit every itemset of 1..max_products_by_pattern products in one transaction.

    Each itemset is yielded as ``(itemset_tuple, 1)`` so that a downstream
    reduce-by-key counts, for every itemset, how many transactions contain it.

    :param products: the raw transaction (e.g. a list of product ids).
    :param split_function: turns ``products`` into an iterable of products.
        It gives flexibility over the input format, e.g.
        ``lambda s: s.split(';')``. Defaults to the identity.
    :param max_products_by_pattern: largest itemset size emitted (inclusive).

    Products must be hashable and mutually comparable (they are de-duplicated
    and sorted).
    """
    # Canonicalise the transaction: drop repeated products and sort them, so a
    # given itemset always maps to the same tuple key whatever order the
    # products were listed in (COLLECT_LIST gives no ordering guarantee).
    # Without this, (a, b) and (b, a) would be counted as different patterns,
    # and a repeated product would yield bogus itemsets such as (a, a).
    items = sorted(set(split_function(products)))
    for size in range(1, max_products_by_pattern + 1):
        for combo in combinations(items, size):
            yield (combo, 1)

def reduce_product_by_key(value1, value2):
    """
    Merge two partial counts for the same key by adding them (value1 + value2).

    This redefinition rebinds the name, so it is the version in effect for
    code below it (including MBA's reduceByKey).
    """
    merged = value1 + value2
    return merged

from copy import deepcopy

def map_to_subpatterns(pattern):
    """
    Fan a counted pattern out to the keys needed to compute association rules.

    Given ``(items, count)``, yields:
      * ``(items, (None, count))``: the pattern's own count, tagged with None;
      * if the pattern has two or more items, one
        ``(items_without_x, (x, count))`` per item ``x``. This says that the
        sub-pattern ``items_without_x`` extended by ``x`` has count ``count``.

    Grouping these pairs by key brings a pattern's own count together with
    the counts of its one-item extensions.
    """
    items = tuple(pattern[0])
    count = pattern[1]
    yield (items, (None, count))

    if len(items) > 1:
        for i, removed in enumerate(items):
            # Tuple slicing builds the sub-pattern directly. The items are
            # used as hashable keys, so the per-item deepcopy was unnecessary.
            yield (items[:i] + items[i + 1:], (removed, count))

            
def map_to_assoc_rules(rule):
    """
    Turn one grouped sub-pattern into the confidences of its one-item extensions.

    ``rule`` is ``(antecedent, entries)``, where ``entries`` holds
    ``(None, count_antecedent)`` plus any ``(x, count_antecedent_plus_x)``
    pairs. Yields a single
    ``(antecedent, [(x, count_antecedent_plus_x / count_antecedent), ...])``.

    Raises:
        ValueError: if there are extensions but no ``(None, count)`` entry
        to divide by.
    """
    antecedent = rule[0]
    entries = list(rule[1])

    # The entry tagged with None carries the antecedent's own count.
    support = next((count for item, count in entries if item is None), None)
    extensions = [(item, count) for item, count in entries if item is not None]

    if extensions and support is None:
        raise ValueError(f"missing support count for pattern {antecedent!r}")

    yield (antecedent, [(item, count / support) for item, count in extensions])

    
def MBA(rdd):
    """
    Run the market-basket pipeline over an RDD of rows with a ``Transaction`` field.

    The pipeline:
      1. enumerates and counts itemsets per transaction;
      2. fans each counted itemset out to its sub-patterns;
      3. groups the results by sub-pattern;
      4. turns each group into confidences for its one-item extensions.

    Returns an RDD of ``(antecedent_tuple, [(consequent, confidence), ...])``.
    """
    pattern_counts = (
        rdd.flatMap(lambda row: map_to_patterns(row.Transaction))
        .reduceByKey(reduce_product_by_key)
    )
    grouped = (
        pattern_counts.flatMap(map_to_subpatterns)
        .groupByKey()
        .mapValues(list)
    )
    return grouped.flatMap(map_to_assoc_rules)

                

# In [None]:
spark = SparkSession.builder.getOrCreate()

# Load the prior-orders CSV and register it as the SQL temp view "order_prod_p".
df_order_prior = spark.read.csv('gs://bucket_tp_3_picot/order_products__prior.csv', header=True, sep=',', inferSchema=True)
df_order_prior.createOrReplaceTempView("order_prod_p")  # creates view 'order_prod_p'

# Build one transaction (list of product ids) per order.
# Spark evaluates lazily: spark.sql() only builds a query plan, so the timed
# section includes show(), the action that actually runs the query.
# All elapsed times below are in seconds.
start_p = t.perf_counter()
results = spark.sql('SELECT COLLECT_LIST(opp.product_id) AS Transaction'
                    ' FROM order_prod_p opp GROUP BY order_id')
results.show(5, truncate=80)
end_p = t.perf_counter()
print(round(end_p - start_p, 3))

# Mine association rules; show() is again the action that triggers the work.
# NOTE: `results` is not cached, so this timing also re-executes the SQL query.
start = t.perf_counter()
assoc = MBA(results.rdd)
assoc.map(format_tuples).toDF(['patterns', 'association_rules']).show(5)
end = t.perf_counter()

time = round(end - start, 3)  # elapsed seconds
print(time)

