In [None]:
# Lectura del archivo .csv de training
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
plaintext_rdd = sc.textFile('train_train.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd, parseDate=False)


data = dataframe.rdd
data.count()

In [None]:
from bs4 import BeautifulSoup # Para eliminar tags html
import re # Expresiones regulares para eliminar puntuacion
from nltk.corpus import stopwords # Stopwords para eliminar palabras comunes
from nltk.stem.lancaster import LancasterStemmer

def aplicarStemming(x):
    words = x.split()
    st = LancasterStemmer()
    new_words = []
    for w in words:
        new_words.append(st.stem(w))
    return " ".join(new_words)

def borrarPalabrasComunes(x):
    words = x.split()
    stop_words = set(stopwords.words("english"))
    stop_words.remove("not")
    stop_words.remove("hadn")
    stop_words.remove("hasn")
    stop_words.remove("didn")
    new_words = []
    for w in words:
        if(not w in stop_words):
            new_words.append(w)
    return " ".join(new_words)

def eliminarMenosUsadas(x):
    global menosUsadas
    print menosUsadas
    words = x.split()    
    new_words = []
    for w in words:
        if(not w in menosUsadas):
            new_words.append(w)
    return " ".join(new_words)
    
def borrarSimbolos(x):
    aBorrar = ",@#$-.():[]!?'"
    for c in aBorrar:
        x = x.replace(c, "")
    return x

def explicitarNegacion(x):
    negadores = ["not", "no", "dont", "doesnt", "havent", "hasnt", "isnt", "arent"
                "wont", "aint", "didnt", "hadnt"]
    words = x.split()
    new_words = []
    i = 0
    while(i < len(words)):
        if(words[i] in negadores):
            new_words.append("NOT_"+words[i+1].upper())
            i += 1
        else:
            new_words.append(words[i])
        i += 1
    return " ".join(new_words)

# El formato de n-gramas es el siguiente: los n-gramas se devuelven en un
# string, separados cada uno por un espacio. Para considerar los espacios
# "verdaderos" del texto original, se los reemplaza primero por @.
# Por ejemplo, el string "Un buen dia" se traduce con esta función, y tam.
# de n-grama=3 al string "Un@ n@b @bu bue uen en@ n@d @di dia".
def conseguirNgramas(x):
    ngramSize = 3
    old_string = x.replace(" ", "@")
    if(len(old_string) < ngramSize):
        return old_string
    new_string = old_string[0:ngramSize]
    for i in range(1, len(old_string)-ngramSize+1):
        new_string += " "
        new_string += old_string[i:(i+ngramSize)]
    return new_string

def considerarEmoticonesPuntuacion(x):
    # Lista de caritas felices
    caras_felices = [":)", "(:", "[:", ":]", "c:", "=)", "=]", "(=", "[=", "c=",
                    "=D", ":D", ";)", "(;", ";D"]
    for emoji in caras_felices:
        x = x.replace(emoji , "SMILING_FACE")
    # Lista de caritas tristes
    caras_tristes = [":(", ":[", "):", "]:", ":c", "=(", "=[", "]=", "=c", "D=", 
                    "D:", ";(", ");", "D;", ]
    for emoji in caras_tristes:
        x = x.replace(emoji, "SAD_FACE")
    # Lista de caritas sorprendidas
    caras_sorpr = [":0", ":o", "0:", "o:", "=o", "0="]
    for emoji in caras_sorpr:
        x = x.replace(emoji, "SURPRISED_FACE") 
    # Puntuación (signos ! y ?)
    x = x.replace("!!!", " ADMIRx3")
    x = x.replace("!!", " ADMIRx2")
    x = x.replace("???", " QUESx3")
    x = x.replace("??", " QUESx2")
    x = x.replace("?!", " ADM_QUES")
    x = x.replace("!?", " ADM_QUES")
    x = x.replace("!", " ADMIRx1")
    x = x.replace("?", " QUESx1")
    return x

# Función encargada de realizar un pre-procesamiento de los textos de las reviews
# según lo considerado por nuestro diseño del TP. Para ello, se recibe el set de
# entrenamiento como un RDD de reviews, que son tuplas (texto, puntaje).
# Las distintas acciones que la función realiza sobre el texto de las reviews
# dependen de los flags de procesamiento recibidos en flagsP (como lista).
# A continuación la lista de acciones controlada por cada flag de flagsP:
# flagsP[0] controla la eliminación de palabras comunes ("a", "the", "of", etc.)
# fragsP[1] elimina las palabras de frecuencia menor a *elMinimo*
# flagsP[2] activa el uso de stemming sobre las palabras de la review
# flagsP[3] activa el reconocimiento de emoticones y puntuaciones ?,!
# flagsP[4] activa la explicitación de la negación
# flagsP[5] convierte el texto a n-gramas
# Notar que las acciones se hacen en el orden explicitado por los flags (primero se eliminan
# las palabras comunes, después las de frecuencia menor, después stemming, etc.)
# Acciones que el pre-procesador de reviews hace siempre:
# - Eliminar tags html
# - Convertir todo a minúsculas
# - Eliminar los siguientes símbolos: "," "@" "#" "$" "-" "." "(" ")" ":" "]" "["
# (En el caso de considerar emoticones o puntuación no lo hace hasta después de
# detectar todos los emoticones o símbolos deseados correspondientes)
def preprocesar_reviews(elSet, flagsP):
    nuevoSet = elSet.map(lambda x: (BeautifulSoup(x[0], "lxml").getText(), x[1]) )
    nuevoSet = nuevoSet = nuevoSet.map(lambda x: (x[0].lower(), x[1]))
    
    if(flagsP[0]):
        nuevoSet = nuevoSet.map(lambda x: (borrarPalabrasComunes(x[0]), x[1]))
        
    if(flagsP[1]):
        global menosUsadas
        elMinimo = 10
        setFrec = nuevoSet.flatMap(lambda x: x[0].split()).map(lambda x: (x, 1))
        setFrec = setFrec.reduceByKey(lambda x,y: x+y)
        menosUsadas = setFrec.filter(lambda x: x[1] < elMinimo).map(lambda x: x[0]).collect()
        nuevoSet = nuevoSet.map(lambda x: (eliminarMenosUsadas(x[0]), x[1]))
    
    if(flagsP[2]):
        nuevoSet = nuevoSet.map(lambda x: (aplicarStemming(x[0]), x[1]))
    
    if(flagsP[3]):
        nuevoSet = nuevoSet.map(lambda x: (considerarEmoticonesPuntuacion(x[0]), x[1]))
    nuevoSet = nuevoSet = nuevoSet.map(lambda x: (borrarSimbolos(x[0]), x[1]))
    
    if(flagsP[4]):
        nuevoSet = nuevoSet.map(lambda x: (explicitarNegacion(x[0]), x[1]))
    
    if(flagsP[5]):
        nuevoSet = nuevoSet.map(lambda x: (conseguirNgramas(x[0]), x[1]))
    
    return nuevoSet

menosUsadas = []

In [None]:
import numpy as np

def parsearReview(x):
    texto = x[0]
    tabla = x[1]
    palabras = texto.split()
    for w in palabras:
        tabla.aumentarFrecuencia(w)
    return tabla

# Método auxiliar usado en el map-reduce: Fusiona dos diccionarios
def unirDiccionarios(dicA, dicB):
    # Copio A en C
    dicC = {}
    for clave in dicA.keys():
        valor = dicA[clave]
        dicC[clave] = valor
    # Veo ahora los simbolos en B
    for clave in dicB.keys():
        valor = dicB[clave]
        if dicC.has_key(clave):
            dicC[clave] += valor
        else:
            dicC[clave] = valor
    return dicC

# Método auxiliar de vectorización: Genera un numpy.vector que representa
# la cant. de palabras de un determinado review.
def vectorizar(x):
    global tabla_vec
    words = tabla_vec.keys()
    p = []
    for w in words:
        if(x.has_key(w)):
            p.append(float(x[w]))
        else:
            p.append(0.0)
    return np.array(p)

# vectorizar_reviews es la función encargada de convertir en numpy.vector
# todos los textos de todas las reviews. Para ello, recibe como parámetro
# el set de entrenamiento, que debe tener el sge formato: debe ser un RDD
# formado por tuplas (texto, puntaje) de cada review. El método devuelve
# otro RDD con el formato (vector, puntaje) donde vector es un numpy.vector
# que representa el texto de cada review recibida. Cada vector se consigue
# de la sgte forma: Primero se listan todas las palabras de todas las
# reviews del set, y se les asigna a cada una de ellas un índice del vector; 
# luego, para cada review del set, se cuentan cuántas palabras hay y cuáles,
# y se ponen esos valores de contadores en las posiciones correspondientes
# del vector. Por ejemplo, si todas las palabras son ["Casa", "Pez", "Arbol"]
# una review de la forma "Casa Pez Casa" se traducirá como el vector (2, 1, 0)
# NOTA: Si se quiere que en vez de contar las palabras se cargue sólamente un
# 1 o un 0 según si la palabra está presente o no, se debe cambiar en la
# función vectorizar de arriba el p.append(float(x[w])) por un p.append(1.0)
# ADVERTENCIA: El vectorizador no hace ningún procesamiento del texto, y
# separa sólo las palabras por espacios. Cualquier pre-procesamiento (por
# ejemplo, eliminar las ",") debe hacerse antes de llamarlo.
def vectorizar_reviews(elSet):
    # Primero consigo en tabla_vec un diccionario con todas las palabras
    # de todas las reviews del set
    global tabla_vec
    tabla_vec = elSet.map(lambda x: (x[0], SymbolTable())).map(parsearReview)
    tabla_vec = tabla_vec.map(lambda x: dict(x.verItems()))
    tabla_vec = tabla_vec.reduce(unirDiccionarios)
    # Vectorizo ahora todas las reviews del set usando esa tabla
    reviews_vec = reviews.map(lambda x: (x[0], SymbolTable(), x[1]))
    reviews_vec = reviews_vec.map(lambda x: (parsearReview(x), x[2]))
    reviews_vec = reviews_vec.map(lambda x: (dict(x[0].verItems()), x[1]))
    reviews_vec = reviews_vec.map(lambda x: (vectorizar(x[0]), x[1]))
    return reviews_vec


tabla_vec = {}

In [None]:
# Todo este bloque define la realización del k-fold crossed validation.
# El método funciona así: recibe un set de entrenamiento y hace sobre el
# mismo la técnica de k-fold crossed validation. El formato del set debe
# ser un RDD de TUPLAS de la forma: (features, categoria) donde la clave
# features puede ser cualquier basura, y categoria es el valor numérico
# que se desea predecir (aka el puntaje de cada review). En cada pasada
# del k-fold crossed validation, se invocan a las funciones de entrenar
# func_entrenar y a las de predicción func_predecir. Éstas dos funciones
# deben trabajar de manera global con el/los compresor/es o el SVM. Sus
# firmas deben ser las siguientes:
# func_entrenar recibe un set de entrenamiento (en el mismo formato que
# el set original, como tuplas feature,cat.) y prepara al compresor o SVM
# para las predicciones usando ese set.
# func_predecir recibe un set a predecir (en el mismo formato de tuplas
# feature, cat) y debe devolver OTRO set (también en el mismo formato!)
# que correspondan a las predicciones hechas por el SVM o compresores.
# Observación importante: como el k-fold crossed validation en sí no
# tiene ni idea qué usamos para predecir, todo lo demás ajeno a eso,
# incluyendo la selección de hiperparametros, debe hacerse "por fuera",
# ya sea con un pre-procesamiento de las reviews o en la función de entrenar.
def fooCount(x):
    global contadora
    contadora += 1
    return (x, contadora)

def calculo_ECM(predSet, valSet):
    cant = predSet.count()
    setAux = predSet.union(valSet)
    setAux = setAux.map(lambda x: (np.array_str(x[0]), x[1]) )
    setAux = setAux.reduceByKey(lambda x,y: float(x)-float(y)).map(lambda x: x[1]*x[1])
    ecm = setAux.reduce(lambda x,y: x+y)
    return (ecm/float(cant))

def k_fold_crossed_validation(elSet, func_entrenar, func_predecir):
    cantParticiones = 8
    ecm_acum = 0.0
    largoSet = elSet.count()
    largoParticion = largoSet / cantParticiones
    setauxi = elSet.map(fooCount)
    for j in range (1, cantParticiones+1):
        # Obtengo el testSet como la particion j-ésima y el trainSet como
        # todo el resto del set recibido menos el testSet
        #print "El set:", elSet.take(2)
        testSet = setauxi.filter(lambda x: (x[1] % cantParticiones) == (j-1)).map(lambda x: x[0])
        trainSet = setauxi.filter(lambda x: (x[1] % cantParticiones) == (j-1)).map(lambda x: x[0])
        #print "Antes de entrenar:", trainSet.take(2)
        # Entreno contra trainSet
        func_entrenar(trainSet)
        # Testeo contra testSet
        setResultados = func_predecir(testSet)
        ecm_acum += calculo_ECM(setResultados, testSet)
        print "ECM acumulado iteracion", j, "es:", ecm_acum
    # Obtengo el ECM promedio de la validación
    print "ECM promedio:", (ecm_acum/float(cantParticiones))
    
    
contadora = 0 # Se usa, no tocar

In [None]:
from symbol_table import *
from MulticatSVM import *
import numpy as np
np.set_printoptions(threshold='nan')

def funcion_hash(numero):
    dim_k = 25
    return (numero % dim_k)

def hashing_trick(x):
    dim_k = 25
    vec_k = []
    for i in range(0, dim_k):
        vec_k.append(0.0)
    vec_k = np.array(vec_k)
    for i in range(0, x.size):
        vec_k[funcion_hash(i)] += x[i]
    return vec_k

def entrenar_SVM(trainSet):
    global our_svm
    our_svm = MulticatSVM(dim = dim_datos, cte_soft_margin = C_inicial, cantCategorias = 5)
    # Primero necesito separar las reviews de acuerdo a su puntaje
    rev_por_puntaje = []
    for i in range(1, 6): # 5 puntajes posibles
        rev_act = trainSet.filter(lambda x: x[1] == i).map(lambda x: x[0]).collect()
        rev_por_puntaje.append(rev_act)
    # Tengo en rev_por_puntaje los datos en el formato que el MulticatSVM
    # los necesita => Lo entreno!
    our_svm.entrenar(rev_por_puntaje)
    return

def aplicar_prediccion(x):
    global our_svm
    return our_svm.predecir(x)

def predecir_SVM(testSet):
    global our_svm
    setResultados = testSet.map(lambda x: (x[0], aplicar_prediccion(x[0])) )
    return setResultados

# Primero guardo las reviews en formato (texto, puntaje) y pre-proceso
reviews = data.map(lambda x: (x.Text, x.Prediction))
reviews = preprocesar_reviews(reviews, [1, 1, 1, 0, 0, 0])
# Vectorizo el set:
tabla_vec = {} # Importante: Esta tabla vec se va a usar
reviews = vectorizar_reviews(reviews)
# Aplico hashing trick!
reviews = reviews.map(lambda x: (hashing_trick(x[0]), x[1]) )
# Hagamos un SVM multicategoría
dim_datos = reviews.take(5)[0][0].size
C_inicial = 9000.0 #cte inicial soft-margin
our_svm = MulticatSVM(dim = dim_datos, cte_soft_margin = C_inicial, cantCategorias = 5)
# Hago k-fold crossed validation contra las reviews
k_fold_crossed_validation(reviews, entrenar_SVM, predecir_SVM)
#entrenar_SVM(reviews)

#testing = sc.parallelize(reviews.take(10), 4)
#print "Antes:", testing.collect()
#testing = predecir_SVM(testing)
#print "Después", testing.collect()

In [None]:
def fooCount(x):
    global contadora
    contadora += 1
    return (x, contadora)

print reviews.take(5)
print reviews.map(lambda x: (np.array_str(x[0]), x[1]) ).take(5)

np.array_str(reviews.take(1)[0][0])