# Orizon: Raio-X

Vamos entender como os serviços se relacionam entre si, computando a frequência relativa a cada um deles por ID de conta e a frequência de agrupamentos de serviços por ID de conta.

Serão utilizadas Regras de Associação, e o agrupamento será feito por um algoritmo baseado no Eclat (Equivalence Class Transformation).

In [39]:
import findspark
# Initialise findspark with the local Spark installation directory
# (machine-specific path; adjust it when running on another machine).
findspark.init("/home/raven/.spark")

In [40]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, IntegerType
import numpy as np

Inicializando sessão do Spark:

In [41]:
# Build (or reuse) a local Spark session with master "local[4]" and
# 1 GB of executor memory.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("Orizon - Raio X")
    .config("spark.executor.memory", "1gb")
    .getOrCreate()
)
sc = spark.sparkContext
# SQLContext bound to the same session; used below to read the CSV file.
sqlContext = SQLContext(sc, sparkSession=spark)

In [42]:
# Load the tab-separated file, using its header row for column names and
# letting Spark infer the column types.
df = (
    sqlContext.read
    .format('csv')
    .options(header='true', inferSchema='true', delimiter='\t')
    .load('data_30735068.csv')
)

In [43]:
# NOTE(review): `master` matches the number in the data file name and is later
# passed to support() as a service code -- presumably the reference service of
# this extract; confirm.
master = 30735068
# Print the inferred schema of the loaded data.
df.printSchema()

root
 |-- categoria: string (nullable = true)
 |-- id_cliente: integer (nullable = true)
 |-- cliente: string (nullable = true)
 |-- id_lote: integer (nullable = true)
 |-- id_conta: integer (nullable = true)
 |-- guia_prestador: string (nullable = true)
 |-- id_item: integer (nullable = true)
 |-- id_prestador: integer (nullable = true)
 |-- prestador: string (nullable = true)
 |-- cnpj: long (nullable = true)
 |-- uf_prestador: string (nullable = true)
 |-- cidade_prestador: string (nullable = true)
 |-- id_beneficiario: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- status: string (nullable = true)
 |-- cid: string (nullable = true)
 |-- cid2: string (nullable = true)
 |-- cid3: string (nullable = true)
 |-- cid4: string (nullable = true)
 |-- senha: string (nullable = true)
 |-- crm_solicitante: string (nullable = true)
 |-- uf_crm_solicitante: string (nullable = true)
 |-- cbos_solicitante: string (nullable = true)


Selecionando apenas os campos que vamos utilizar:

In [44]:
# Keep only the columns used by the analysis, then release the reference to
# the full DataFrame.
selected_df = df.select(
    "id_conta",
    "servico",
    "descricao_despesa",
    "tipo_item",
    "qtde",
    "valor",
)
del df
selected_df.printSchema()

root
 |-- id_conta: integer (nullable = true)
 |-- servico: string (nullable = true)
 |-- descricao_despesa: string (nullable = true)
 |-- tipo_item: string (nullable = true)
 |-- qtde: double (nullable = true)
 |-- valor: double (nullable = true)



Preparando o dataset:

In [45]:
def numeric_convert(item):
    """Convert ``item`` to ``int``, falling back to NaN when that is impossible.

    Parameters
    ----------
    item : object
        Value to convert (for example a string read from the CSV file).

    Returns
    -------
    int or float
        ``int(item)`` on success, ``np.nan`` otherwise.
    """
    try:
        return int(item)
    except (ValueError, TypeError):
        # ValueError covers strings that are not integers; TypeError covers
        # None (null cells) and other objects int() cannot accept. Without
        # the TypeError branch a single null value makes the caller fail.
        return np.nan

In [46]:
# Wrap the converter as a Spark UDF declared to return integers.
numeric_udf = udf(numeric_convert, IntegerType())
# Pass `servico` through the UDF, then remove duplicated rows and rows that
# contain nulls.
selected_df = (
    selected_df
    .withColumn("servico", numeric_udf("servico"))
    .dropDuplicates()
    .na.drop()
)

In [47]:
# Check the schema again after converting `servico`.
selected_df.printSchema()

root
 |-- id_conta: integer (nullable = true)
 |-- servico: integer (nullable = true)
 |-- descricao_despesa: string (nullable = true)
 |-- tipo_item: string (nullable = true)
 |-- qtde: double (nullable = true)
 |-- valor: double (nullable = true)



In [48]:
# Peek at the first row of the prepared data.
selected_df.first()

Row(id_conta=1133618327, servico=70799172, descricao_despesa='ESPARADRAPO 10 X 4,5 M - B', tipo_item='MATERIAIS', qtde=120.0, valor=1.2)

Pronto, os dados parecem estar em ordem. Neste momento, vamos trabalhar apenas com os 'id_conta' atrelados a mais de um serviço:

In [49]:
# Number of rows per account (id_conta).
count = selected_df.groupBy("id_conta").count()

In [50]:
# Show the row counts of ten accounts.
count.head(10)

[Row(id_conta=1184282323, count=100),
 Row(id_conta=1184299571, count=74),
 Row(id_conta=1190390629, count=117),
 Row(id_conta=1196940285, count=100),
 Row(id_conta=1199011816, count=120),
 Row(id_conta=1208434177, count=81),
 Row(id_conta=1217489232, count=6),
 Row(id_conta=1221638720, count=62),
 Row(id_conta=1230273877, count=90),
 Row(id_conta=1245792650, count=65)]

In [51]:
# Attach each account's row count, keep the accounts with more than one row,
# and discard the helper column afterwards.
# NOTE(review): the count is over rows, not over distinct services -- confirm
# this matches the intended "more than one service" filter.
selected_df = selected_df.join(
    selected_df.groupBy('id_conta').count(),
    on='id_conta',
)
selected_df = selected_df.where(selected_df["count"] > 1).drop("count")

### Início da implementação do algoritmo Eclat de Regra de Associação:

In [52]:
def support(service, dataframe):
    """Count the accounts that contain every service in ``service``.

    Parameters
    ----------
    service : sequence
        Service codes that must all be present in an account. It is indexed
        from position 0, so it must be non-empty.
    dataframe : Spark DataFrame
        Data providing the ``servico`` and ``id_conta`` columns.

    Returns
    -------
    int
        Number of distinct ``id_conta`` values that have all the services.
    """
    first_match = dataframe.filter(dataframe["servico"] == service[0])
    acc_ids = [row["id_conta"] for row in first_match.select("id_conta").collect()]

    # Narrow the candidate accounts one service at a time: only accounts that
    # had every previous service and also have the current one survive.
    for position in range(1, len(service)):
        with_service = dataframe.filter(dataframe["servico"] == service[position])
        survivors = with_service.filter(with_service.id_conta.isin(acc_ids))
        acc_ids = survivors.select("id_conta").rdd.flatMap(lambda row: row).collect()

    # An account may show up on several rows; count each one once.
    return len(set(acc_ids))

In [53]:
##################### Not tested yet ############################

def drop_comb(elements,group):
    """Tell whether ``group`` is free of the unwanted combination ``elements``.

    Parameters
    ----------
    elements : iterable
        Elements that must not all appear together.
    group : iterable
        Candidate combination to check.

    Returns
    -------
    bool
        ``False`` when every element of ``elements`` occurs in ``group``
        (the combination should be dropped), ``True`` otherwise.
    """
    members = tuple(group)
    # Test membership of each unwanted element instead of comparing lengths:
    # counting matches gives wrong answers when either side has repeated values.
    return not all(element in members for element in elements)

In [54]:
################### Not tested yet ################################

def combinations_test(iterable, r,drops):
    """Yield the r-sized combinations that contain none of the unwanted groups.

    Works like ``itertools.combinations`` but skips every combination that
    includes all the elements of at least one entry of ``drops``. The work is
    done on the driver with plain tuples: the previous version wrapped the
    pool and the indices in RDDs, which cannot be indexed, so it failed as
    soon as the generator was consumed.

    Parameters
    ----------
    iterable : iterable
        Elements to combine (must be hashable).
    r : int
        Size of each combination.
    drops : iterable of iterables
        Groups of elements that must not appear together. When empty, every
        combination is yielded.

    Yields
    ------
    tuple
        Each allowed combination, in ``itertools.combinations`` order.
        Nothing is yielded when ``r`` exceeds the number of elements.
    """
    from itertools import combinations

    # Build the sets once so that each candidate only needs subset tests.
    unwanted = [frozenset(drop) for drop in drops]
    for comb in combinations(tuple(iterable), r):
        members = frozenset(comb)
        if not any(drop <= members for drop in unwanted):
            yield comb

In [55]:
def eclat(servicos,dataframe,r=1,ids_drop=None,min_sup=10):
    """Combine services r at a time and keep those whose support exceeds ``min_sup``.

    Parameters
    ----------
    servicos : list
        Services to combine.
    dataframe : DataFrame
        Data handed to ``support`` to count the accounts.
    r : int
        Size of the combinations; ``1`` evaluates each service on its own.
    ids_drop : list, optional
        Combinations that must not appear together. Defaults to an empty
        list created on each call.
    min_sup : int
        Minimum support; a candidate is kept only when its support is
        strictly greater than this value.

    Returns
    -------
    ids : list
        Services (``r == 1``) or tuples of services that passed the threshold.
    serv_list : list
        Support of each entry of ``ids``, in the same order.
    drops : list
        For ``r == 1`` the ``ids_drop`` received; otherwise the combinations
        that did not pass the threshold.
    """
    # Imported here because the module-level imports do not provide it.
    from itertools import combinations

    # A fresh list per call: a shared default list would be returned to the
    # caller and could be mutated between calls.
    if ids_drop is None:
        ids_drop = []

    serv_list = []
    ids = []

    # First level: evaluate every service individually.
    if r == 1:
        for service in servicos:
            supp = support([service], dataframe)
            if supp > min_sup:
                serv_list.append(supp)
                ids.append(service)
        return ids, serv_list, ids_drop

    # Higher levels: build the candidates, skipping known-bad combinations
    # when there are any.
    if len(ids_drop) > 0:
        candidates = combinations_test(servicos, r, ids_drop)
    else:
        candidates = combinations(servicos, r)

    new_ids_drop = []
    for service in candidates:
        supp = support(service, dataframe)
        if supp > min_sup:
            serv_list.append(supp)
            ids.append(service)
        else:
            new_ids_drop.append(service)
    return ids, serv_list, new_ids_drop
    

In [56]:
def eclat_iter(servicos,dataframe,min_sup):
    """Run ``eclat`` for growing combination sizes and gather all the results.

    Parameters
    ----------
    servicos : list
        All the services to analyse.
    dataframe : DataFrame
        Data forwarded to ``eclat``.
    min_sup : int
        Minimum support forwarded to ``eclat``.

    Returns
    -------
    final_id : list
        Every service and combination of services kept by ``eclat``.
    final_serv : list
        Support of each entry of ``final_id``, in the same order.
    """
    # Level 1: single services.
    candidates, supports, dropped = eclat(servicos, dataframe, min_sup=min_sup)
    final_id = candidates
    final_serv = supports

    size = 2
    while True:
        level_ids, level_supports, dropped = eclat(
            servicos=candidates,
            dataframe=dataframe,
            r=size,
            ids_drop=dropped,
            min_sup=min_sup,
        )
        final_id = final_id + level_ids
        final_serv = final_serv + level_supports
        size += 1

        # Flatten the surviving tuples into the distinct services that feed
        # the next level, keeping first-seen order.
        candidates = []
        for combo in level_ids:
            for item in combo:
                if item not in candidates:
                    candidates.append(item)

        # Stop once a level yields fewer than two combinations.
        if len(level_supports) < 2:
            break
    return final_id, final_serv


    

Calculando o número de contas equivalente a 10% do total de contas, para utilizar como suporte mínimo:

In [57]:
# Minimum support: 10% of the number of accounts that contain the `master`
# service, truncated to an integer.
min_sup = int(0.1*support([master],selected_df))
min_sup

1304

## Eclat por grupo de Itens

In [58]:
# todos os tipos de item diferentes
tipos_item = [row["tipo_item"] for row in selected_df.select("tipo_item").distinct().collect()]
tipos_size = len(tipos_item)

In [None]:
group_ids = list(np.zeros([tipos_size,1]))
group_basket = list(np.zeros([tipos_size,1]))
for i in range(tipos_size):
    print(tipos_item[i])
    tipos_df = selected_df.filter(selected_df["tipo_item"] == tipos_item[i])
    servicos_rows = tipos_df.select("servico").distinct().collect()
    servicos = [row["servico"] for row in servicos_rows]
    %time group_ids[i],group_basket[i] = eclat_iter(servicos, tipos_df, min_sup)
    if len(group_ids[i])==0:
        print('Sem agrupamento')
    else:
        print(group_ids[i][len(group_ids[i])-1])
        print(group_basket[i][len(group_ids[i])-1]*(100/support([master],selected_df)))