# Implementación completa en Apache Spark del algortimo propuesto usando PySpark

In [1]:
import numpy as np
import pandas as pd
import time
import itertools
#import os
#import shutil
#import sys
#import argparse

from scipy.sparse import csr_matrix as csr
#from scipy.sparse import random

# Dependiendo de como se configure Spark las siguientes líneas pueden ser necesarias.
#import findspark
#findspark.init('/home/antoniojavier/spark-2.4.4-bin-hadoop2.7')
import pyspark as spark
from pyspark.sql import *

## Funciones necesarias para el cálculo del coeficiente de correlación de rango difuso.

In [2]:
# T-norma de Lukasiewicz.
def T_norm(x,y):
    return max(0,x+y-1)

# TL-E-Ordenacion fuertemente completa en R.  
def Fuzzy_ordering(x,y,r):
    return min(1,max(0,1-((x-y)/r)))

# Relación difusa estricta (Rx o Ry)
def Fuzzy_relation(x1,x2,r):
    return 1-Fuzzy_ordering(x2,x1,r)

# El grado de concordancia entre dos pares dado un r se calculará entonces de la siguiente forma.
def Concordance_degree(pair1,pair2,r):
    return T_norm(Fuzzy_relation(pair1[0], pair1[1],r), Fuzzy_relation(pair2[0], pair2[1],r))


## Generación de la matriz de grados de concordancia
Esta función devuelve la matriz de grados de concordancia del itemset de tamaño k=2 que se le pase como argumento, junto con un r y el conjunto de datos.

In [3]:
# Utiliza float de 16 bits en lugar del float estándar en python que utiliza 64 bits.
def generate_matrix(candidate, df, r):
    
    
    at1 = pd.to_numeric(df.value[candidate[0][0]])
    op1 = candidate[0][1]
    
    at2 = pd.to_numeric(df.value[candidate[1][0]])
    op2 = candidate[1][1]
    
    N = len(at1)     # N es el número de transacciones
    matrix = np.zeros((N, N), dtype = np.float16)
    
    if op1 == '>' and op2 == '>': 
        for i in range(N):
            for j in range(N):
                if i != j:
                    matrix[i,j] = Concordance_degree([at1[i], at1[j]],[at2[i], at2[j]],r)
                    
    if op1 == '>' and op2 == '<': 
        for i in range(N):
            for j in range(N):
                if i != j:
                    matrix[i,j] = Concordance_degree([at1[i], at1[j]],[at2[j], at2[i]],r)
                    
    return (tuple(candidate), csr(matrix))
                
"""
    Estas otras combinaciones nos las evaluaremos ya que solo habrá itemsets de tamaño 2 del 
    tipo (('atributo1', '>'), ('atributo2', '>')) y (('atributo1', '>'), ('atributo2', '<'))

    if op1 == '<' and op2 == '>': 
        for i in range(N):
            for j in range(N):
                if i != j:
                    matrix[i,j] = Concordance_degree([at1[j], at1[i]],[at2[i], at2[j]],r)          
    
    

    if op1 == '<' and op2 == '<': 
        for i in range(N):
            for j in range(N):
                if i != j:
                    matrix[i,j] = Concordance_degree([at1[j], at1[i]],[at2[j], at2[i]],r)
"""                 
    


                    


"\n    Estas otras combinaciones nos las evaluaremos ya que solo habrá itemsets de tamaño 2 del \n    tipo (('atributo1', '>'), ('atributo2', '>')) y (('atributo1', '>'), ('atributo2', '<'))\n\n    if op1 == '<' and op2 == '>': \n        for i in range(N):\n            for j in range(N):\n                if i != j:\n                    matrix[i,j] = Concordance_degree([at1[j], at1[i]],[at2[i], at2[j]],r)          \n    \n    \n\n    if op1 == '<' and op2 == '<': \n        for i in range(N):\n            for j in range(N):\n                if i != j:\n                    matrix[i,j] = Concordance_degree([at1[j], at1[i]],[at2[j], at2[i]],r)\n"

## Generación de la matriz de grados de concordancia. 8 bits
Variante de la función anterior para el caso en el que se elija una precisión de 8bits, se utilizan enteros de 8 bits.

In [4]:
# Utiliza enteros sin signo de 8 bits en lugar del float estándar en python que utiliza 64 bits.
def generate_matrix_int(candidate, df, r):
    
    
    at1 = pd.to_numeric(df.value[candidate[0][0]])
    op1 = candidate[0][1]
    
    at2 = pd.to_numeric(df.value[candidate[1][0]])
    op2 = candidate[1][1]
    
    N = len(at1)     # N es el número de transacciones
    matrix = np.zeros((N, N), dtype = np.uint)
    
    if op1 == '>' and op2 == '>': 
        for i in range(N):
            for j in range(N):
                if i != j:
                    matrix[i,j] = (255*Concordance_degree([at1[i], at1[j]],[at2[i], at2[j]],r)) 
                    
    if op1 == '>' and op2 == '<': 
        for i in range(N):
            for j in range(N):
                if i != j:
                    matrix[i,j] = (255*Concordance_degree([at1[i], at1[j]],[at2[j], at2[i]],r))
                    
    return (tuple(candidate), csr(matrix))

## Cálculo del soporte
Devuelve el soporte de la matriz que se le pasa como argumento como la suma de todos sus elementos dividido entre N(N-1)/2

In [5]:
# Devuelve el soporte de la matriz
def compute_support(m):
    N = m.shape[1]
    return csr.sum(m)/(N*((N-1)/2))   # También es posible usar np.sum directamente.

## Cálculo del soporte para 8 bits.
Variante de la función anterior para el caso en el que se elija una precisión de 8bits.

In [6]:
# Devuelve el soporte de la matriz
def compute_support_int(m):
    N = m.shape[1]
    return csr.sum(m/255)/(N*((N-1)/2))

## Validación del candidato combinado

Función que devuelve la combinación de los itemsets pasados como argumento si esta combinación es viable teniendo en cuenta los itemsets frecuentes de la iteración anterior, evaluando si:

* Tienen al menos k-2 elementos en común
* Todos sus subconjuntos de itemset graduales de tamaño k-1 fueron frecuentes en la iteración o fase anterior.

La condición de si ya han sido generados se evalúa después de esta función con un distinct() de la lista obtenida.

In [7]:
def combinedCandidates(itemset_pair, previous_frequent_itemsets):
    #previous_frequent_itemsets = [set(i) for i in previous_frequent_itemsetsk]
    result = ()
    difference = set(itemset_pair[1])-set(itemset_pair[0])
    if(len(difference) == 1):
        result =  itemset_pair[0] + tuple(difference)
        combinations = [x for x in itertools.combinations(result, len(result)-1)]
        if not all(i in previous_frequent_itemsets for i in combinations):
            result = ()

    return result

## Combinación de matrices
Función que recibe como argumento a un itemset combinado y la lista de itemsets frecuentes de la iteración anterior, con sus respectivas matrices. Devuelve el itemset y la matriz resultante de la combinación de las matrices de dos de los itemsets de tamaño k-1 que lo conforman, quedándonos con el mínimo de cada uno de sus elementos (T-norma de Gödel de las matrices).

In [8]:
def ComputeCombinedMatrix(itemset, previous_frequent):
    
    itemset1 = itemset[:-1]
    itemset2 = itemset[1:]
    #itemset2 = itemset[:-2]+itemset[-1]
    
    
    matriz_resultante = csr.minimum(previous_frequent.value[itemset1], previous_frequent.value[itemset2])
    itemset_resultante = itemset1 + tuple(set(itemset2)-set(itemset1))
    
    return (itemset_resultante, matriz_resultante)

## Función principal del algoritmo.
Devuelve la lista de todos los itemsets frecuentes en el conjunto de datos dado, que superen un cierto soporte mínimo. Recibe como entrada (argumentos) el SparkContext, el conjunto de datos(distribuido en el cluster), el soporte mínimo, un r para realizar las llamadas al cálculo de grado de correlación entre atributos. (Usa una precisión de 16 bits para almacenar los grados de concordancia)

In [9]:
def extractFrequentItemsets(sc, distDataset, min_supp, r):
    
    # Primera fase del algoritmo
    candidates = []

    n_att = len(distDataset.value.columns)

    # Añadimos a candidates todas los posibles itemsets de tamaño 2 (exceptuando equivalentes)
    for i in range(n_att):
        for j in range(i+1,n_att):
            candidates += [[(distDataset.value.columns[i],'>'),(distDataset.value.columns[j],'>')]]
            candidates += [[(distDataset.value.columns[i],'>'),(distDataset.value.columns[j],'<')]]

    
    # Evaluamos cada uno de los posibles candidatos de forma distribuida en el cluster, quedándonos únicamente
    # con los itemsets frecuentes de tamaño 2 junto con su matriz asociada.
    frequent_itemsets_k = sc.parallelize(candidates).map(lambda x: generate_matrix(x, distDataset, r))\
                                                    .filter(lambda x: compute_support(x[1]) >= min_supp)\
                                                    .collect()
    

    # Segunda fase del algoritmo

    frequent_itemsets_list = []
    # Repetimos este proceso hasta que la lista de itemsets frecuentes generados en la anterior iteración esté vacía
    while(frequent_itemsets_k):
        # Creamos una copia de los itemsets frecuentes de la fase o iteración anterior y sus respectivas matrices
        # en cada uno de los nodos del cluster.
        previous_frequent_dict = sc.broadcast(dict(frequent_itemsets_k))
        previous_frequent_itemsets = [item for item in previous_frequent_dict.value] # Lista con los itemsets frecuentes anterior
        frequent_itemsets_list += previous_frequent_itemsets
        candidates_combinations = [x for x in itertools.combinations(previous_frequent_itemsets, 2)]

        # Evaluamos las posibles combinaciones de candidatos de forma distribuida en el cluster, quedándonos únicamente
        # con los viables cuya matriz supere el soporte mínimo.
        frequent_itemsets_k = sc.parallelize(candidates_combinations)\
                             .map(lambda x: combinedCandidates(x,previous_frequent_itemsets))\
                             .filter(lambda x: x)\
                             .distinct()\
                             .map(lambda x: ComputeCombinedMatrix(x, previous_frequent_dict))\
                             .filter(lambda x: compute_support(x[1]) >= min_supp)\
                             .collect()

    
    # Eliminamos el contexto actual.
    sc.stop()
    return frequent_itemsets_list

## Función principal igual que la anterior pero para una precisión de 8 bits.

In [10]:
def extractFrequentItemsets_int(sc, distDataset, min_supp, r):
    
    # Primera fase del algoritmo
    candidates = []

    n_att = len(distDataset.value.columns)

    # Añadimos a candidates todas los posibles itemsets de tamaño 2 (exceptuando equivalentes)
    for i in range(n_att):
        for j in range(i+1,n_att):
            candidates += [[(distDataset.value.columns[i],'>'),(distDataset.value.columns[j],'>')]]
            candidates += [[(distDataset.value.columns[i],'>'),(distDataset.value.columns[j],'<')]]

    # Evaluamos cada uno de los posibles candidatos de forma distribuida en el cluster, quedándonos únicamente
    # con los itemsets frecuentes de tamaño 2 junto con su matriz asociada.
    frequent_itemsets_k = sc.parallelize(candidates).map(lambda x: generate_matrix_int(x, distDataset, r))\
                                                    .filter(lambda x: compute_support_int(x[1]) >= min_supp)\
                                                    .collect()

    

    # Segunda fase del algoritmo

    frequent_itemsets_list = []
    # Repetimos este proceso hasta que la lista de itemsets frecuentes generados en la anterior iteración esté vacía
    while(frequent_itemsets_k):
        # Creamos una copia de los itemsets frecuentes de la fase o iteración anterior y sus respectivas matrices
        # en cada uno de los nodos del cluster.
        previous_frequent_dict = sc.broadcast(dict(frequent_itemsets_k))
        previous_frequent_itemsets = [item for item in previous_frequent_dict.value] # Lista con los itemsets frecuentes anterior
        frequent_itemsets_list += previous_frequent_itemsets
        candidates_combinations = [x for x in itertools.combinations(previous_frequent_itemsets, 2)]

        # Evaluamos las posibles combinaciones de candidatos de forma distribuida en el cluster, quedándonos únicamente
        # con los viables cuya matriz supere el soporte mínimo.
        frequent_itemsets_k = sc.parallelize(candidates_combinations)\
                             .map(lambda x: combinedCandidates(x,previous_frequent_itemsets))\
                             .filter(lambda x: x)\
                             .distinct()\
                             .map(lambda x: ComputeCombinedMatrix(x, previous_frequent_dict))\
                             .filter(lambda x: compute_support_int(x[1]) >= min_supp)\
                             .collect()

    # Eliminamos el contexto actual.
    sc.stop()
    return frequent_itemsets_list

## Inicializamos el SparkContext especificando el nombre de la aplicación y el número de núcleos (en caso de realizar una ejecución en máquina local)

In [11]:
# Inicializacion del contexto
def createSparkContext():
    sc_conf = spark.SparkConf()
    sc_conf.setMaster("local[*]")    # Aqui podemos especificar el número de núcleos del procesador que queremos que se utilicen en local
    sc_conf.setAppName("pattern mining")
    #sc_conf.set('spark.executor.instances', '2')
    #sc_conf.set('spark.executor.memory', '2g')
    #sc_conf.set('spark.executor.cores', '1')
    #sc_conf.set('spark.cores.max', '1')
    #sc_conf.set('spark.logConf', True)

    sc = spark.SparkContext(conf=sc_conf)

    return sc 

sc = createSparkContext()

## Ejecución directa, asignando aqui valores que queremos a las variables
En ImplementacionSpark.py podemos realizar la ejecución y asignación de parámetros desde la línea de comandos

In [12]:
# Ejecución
r = 0.098

min_supp = 0.3

dataset_file = "winequality-red.csv"

precision = '16bits'

n_att = 12

n_trans = 100

sep = ';'

if(dataset_file == 'pd_speech_features.csv'):
    df = pd.read_csv("pd_speech_features.csv",sep = ",",skiprows = 1)   # Lo ponemos aparte porque tiene una fila extra y dos columnas que debemos quitar
    my_data = df.iloc[:n_trans,2:n_att+2]
else:
    # Cargomos los datos en un dataframe de pandas.
    #df = pd.read_csv("seizure.csv",sep = ",")
    df = pd.read_csv(dataset_file,sep = sep)





# Nos quedamos con un subconjunto total de las transacciones y de los atributos para adaptarlo a la capacidad de nuestro hardware
#my_data = df.iloc[:80,1:-149]
my_data = df.iloc[:50,:]
#my_data = df.iloc[:100,1:-743]


# Normalizo los datos (usando min-max scaling)
normalized_data = (my_data - my_data.min())/(my_data.max() - my_data.min())

datadist = sc.broadcast(normalized_data)

if precision == '8bits':
        t1 = time.time()
    
        result = extractFrequentItemsets_int(sc,datadist, min_supp,r)
    
        t2 = time.time()
else:  
        t1 = time.time()
        
        result = extractFrequentItemsets(sc,datadist, min_supp,r)
        
        t2 = time.time()

print("Tiempo de ejecución del algoritmo: ", t2-t1)
print("\n\nItemsets frecuentes obtenidos:\n\n")
for i in result:
        print(i,"\n")
# Es necesario reinicializar el SparkContext si queremos ejecutar esta celda mas de una vez.

Tiempo de ejecución del algoritmo:  14.219066858291626


Itemsets frecuentes obtenidos:


(('fixed acidity', '>'), ('volatile acidity', '<')) 

(('fixed acidity', '>'), ('citric acid', '>')) 

(('fixed acidity', '>'), ('free sulfur dioxide', '>')) 

(('fixed acidity', '>'), ('total sulfur dioxide', '>')) 

(('fixed acidity', '>'), ('density', '>')) 

(('fixed acidity', '>'), ('pH', '<')) 

(('fixed acidity', '>'), ('sulphates', '>')) 

(('fixed acidity', '>'), ('alcohol', '<')) 

(('volatile acidity', '>'), ('citric acid', '<')) 

(('volatile acidity', '>'), ('free sulfur dioxide', '>')) 

(('volatile acidity', '>'), ('free sulfur dioxide', '<')) 

(('volatile acidity', '>'), ('total sulfur dioxide', '>')) 

(('volatile acidity', '>'), ('total sulfur dioxide', '<')) 

(('volatile acidity', '>'), ('density', '>')) 

(('volatile acidity', '>'), ('density', '<')) 

(('volatile acidity', '>'), ('pH', '>')) 

(('volatile acidity', '>'), ('sulphates', '<')) 

(('volatile acidity', '>'), ('al