# ROS Multiclass
### (Random Over-Sampling) - Duplica aleatoriamente muestras de la clase minoritaria, hasta compensar el desequilibrio de las clases.

### Importar las librerías a utilizar

In [1]:
import findspark
findspark.init()
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = (SparkConf()
        .setAppName("ROS") \
        .set('spark.driver.cores', '6') \
        .set('spark.executor.cores', '6') \
        .set('spark.driver.memory', '6G') \
        .set('spark.sql.autoBroadcastJoinThreshold', '-1') \
        .set('spark.executor.memory', '6G'))
sc = SparkContext(conf=conf)

In [2]:
spark = SparkSession.builder.getOrCreate()

## Cargar los datos

El formato de datos debe ser el siguiente:


&nbsp; 
class column1 column2 column3 column4 ...... column_n 


&nbsp;
class column1 column2 column3 column4 ...... column_n 


&nbsp;
..       ..         ..        ..         ..          ..          .. 
 

&nbsp;
..        ..          ..          ..         ..         ..        .. 


&nbsp;

In [34]:
#Se lee el archivo y se almacenan los datos en un RDD
rdd_data = sc.textFile('/home/jsarabia/Documents/IA/Data-exploration-url_svmlight/data/url_svmlight/Day75.svm')
#rdd_data = sc.textFile('/home/jsarabia/Documents/IA/DayGeneral.svm')
#rdd_data = sc.textFile('ros1/part-00000')

In [35]:
rdd_data.take(1)

['+1 4:0.0373444 5:0.062069 6:0.0588235 11:0.142857 16:0.1 17:0.916974 19:0.209008 21:0.000416667 22:0.00041832 23:0.00041832 62:1 64:1 66:1 68:1 72:1 82:1 84:1 86:1 88:1 92:1 102:1 104:1 106:1 108:1 112:1 139:1 141:1 143:1 145:1 149:1 263:1 266:1 267:1 270:0.00041832 726:1 731:1 736:1 905:1 906:1 908:1 909:1 910:1 912:1 913:1 914:1 915:1 917:1 1629:1 2401:1 3521:1 3522:1 8197:1 8198:1 8199:1 8200:1 10728:1 155153:1 155154:1 155155:1 155156:1 155157:1 155158:1 155160:1 155161:1 155163:1 155164:1 155165:1 155174:1 155175:1 155176:1 155177:1 155178:1 155179:1 155180:1 155181:1 155182:1 155183:1 155184:1 155185:1 155186:1 155187:1 155188:1 155189:1 155190:1 155191:1 155192:1 155193:1 155194:1 155195:1 155196:1 155197:1 155198:1 155199:1 155200:1 155201:1 155202:1 155203:1 155204:1 155205:1 155206:1 155207:1 155208:1 155209:1 155210:1 155211:1 155212:1 155213:1 155214:1 503652:1 1597418:1 2573731:1']

In [36]:
# Número total de instancias
print('Instances:', rdd_data.count())

Instances: 20000


## Se separan las instancias y se toma la primera posición que corresponde a la clase


In [37]:
# se asigna a un nuevo RDD
rdd_classes = rdd_data.map(lambda instance: instance.split(' ')[0]) # [0] Posición de la clase

In [38]:
# Contar cuántas instancias hay de cada clase
class_repetition = (rdd_classes
    .flatMap(lambda row: row.split(" ")) # Se separa cada fila del RDD
    .map(lambda classes: (classes, 1))   # Se agrega un 1 para contar cada elemento
    .reduceByKey(lambda a, b: a + b)     # Se suma el total de clases 
    .sortBy(lambda s:s[1], False)        # Se ordena de acuerdo al no. de clases totales
    .collect())

In [39]:
# Número total de clases
print('Classes:', len(class_repetition))

Classes: 2


In [40]:
# Se crean dos listas: la lista "xclass" almacenará la clase, mientras que "xtotals" el número de 
# veces que esta clase se repita
xclass = []
xtotals = []

## Muestras por clase

In [41]:
# Ciclo for que recorre los elementos de la lista "class_repetition", se llenan las listas 
# "xclass" y "xtotals" y, posteriormente, se almacenan en dos arreglos de numpy
print('Clase \t Total')
for x in range(len(class_repetition)):
    a = class_repetition[x]
    xclass.append(str(a[0]))
    xtotals.append(a[1])
    print(a[0], '\t', a[1])
np_class = np.array(xclass)
np_total = np.array(xtotals)
    

Clase 	 Total
-1 	 14651
+1 	 5349


In [42]:
# Se crean dos RDD auxiliares
rdd = sc.parallelize([])
aux = sc.parallelize([])

In [43]:
# Se crean dos RDD auxiliares
rdd = sc.parallelize([])
aux = sc.parallelize([])

for i in range(len(class_repetition)):
    # Buscar la clase en el RDD inicial respecto al índice de la iteración
    clas = rdd_data.filter(lambda s:s.split(' ')[0]==np_class[i])
    # Calcular el número de muestras faltantes de la clase en cada iteración; "np_total[0]"
    # tiene el valor más alto de instancias
    missing_samples = np_total[0] - np_total[i]
    # Bloque para generar las muestras aleatorias
    blocks =  535
    # Número de iteraciones por clase
    iteration=missing_samples/blocks
    print("Clase:",np_class[i],"\tTotal:",np_total[i],"\tFaltantes:",missing_samples,"\tIteraciones:", int(iteration))
    if(iteration>0) and (i >0):
        for j in range(int(iteration)):
            #print(j+1)
            #Extract the sample
            extract=clas.takeSample(True,blocks,1)
            #Convert to rdd
            aux=sc.parallelize(extract)
            #join the samples of each class
            rdd=rdd.union(aux)
    #Join the classes
    rdd=rdd.union(clas)

Clase: -1 	Total: 14651 	Faltantes: 0 	Iteraciones: 0
Clase: +1 	Total: 5349 	Faltantes: 9302 	Iteraciones: 17


In [44]:
rdd.count()

29095

In [143]:
rdd_data.count()

16000

## Guardar el archivo con las clases balanceadas

In [144]:
rdd.coalesce(1).saveAsTextFile('ros1')

In [45]:
sc.stop()