# Data processing and cleaning
## Dataset: [*Cyberbullying Classification*](https://www.kaggle.com/datasets/andrewmvd/cyberbullying-classification?resource=download)
### Luna Duran, Dayana Gonzalez, and Emanuel Naval
---
## Cleaning

The dataset contains approximately 48,000 rows, each consisting of a *tweet_text* and a *cyberbullying_type*. There are 6 classes:

* Age
* Ethnicity
* Gender
* Religion
* Other type of cyberbullying
* Not cyberbullying

Each class has approximately 8,000 rows, so the dataset is balanced. It contains no null or invalid values, so no data cleaning is needed.

## Processing

Processing is applied to the *tweet_text* column, where each tweet goes through a natural language processing pipeline. Because the inputs are tweets, the text is tokenized on non-alphanumeric characters, which strips punctuation and symbols such as # and @ and breaks links into fragments. Stopwords and words of three characters or fewer are then discarded.
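The cleaning steps described above can be sketched in plain Python, without Spark. This is only an illustration: the `STOPWORDS` set, the `preprocess` helper, and the sample tweet are hypothetical, and the explicit URL-stripping step is an assumption added here (splitting on `\W` alone breaks a link into fragments rather than removing it as a unit).

```python
import re

# Hypothetical, deliberately tiny stopword list; Spark's StopWordsRemover
# ships a much longer default English list.
STOPWORDS = {"in", "other", "your", "was", "is", "the", "a", "this", "so"}

# Assumed URL pattern for this sketch; not taken from the notebook.
URL_PATTERN = re.compile(r"https?://\S+")


def preprocess(tweet: str, min_length: int = 4) -> list[str]:
    """Lowercase, strip URLs, split on non-word characters, filter tokens."""
    text = URL_PATTERN.sub(" ", tweet.lower())
    tokens = [t for t in re.split(r"\W+", text) if t]
    # Drop stopwords and tokens shorter than min_length characters.
    return [t for t in tokens if t not in STOPWORDS and len(t) >= min_length]


sample = "This #Food was SO bad!! @chef_bob see https://example.com/x #mkr"
print(preprocess(sample))
```

For the sample string this prints `['food', 'chef_bob']`: the hashtag and mention symbols are removed, the link disappears, and the short tokens `bad`, `see`, and `mkr` are filtered out by the length rule.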

In [60]:
# Library imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, NaiveBayes, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.appName("cyberbullying_classification").getOrCreate()

In [10]:
# Load the CSV, handling quoted fields that contain commas or line breaks
data = spark.read.option("quote", "\"") \
                     .option("escape", "\"") \
                     .option("multiLine", True) \
                     .csv('cyberbullying_tweets.csv', sep=",", header=True)
# Preview the loaded data
data.show(5)

+--------------------+------------------+
|          tweet_text|cyberbullying_type|
+--------------------+------------------+
|In other words #k...| not_cyberbullying|
|Why is #aussietv ...| not_cyberbullying|
|@XochitlSuckkks a...| not_cyberbullying|
|@Jason_Gio meh. :...| not_cyberbullying|
|@RudhoeEnglish Th...| not_cyberbullying|
+--------------------+------------------+
only showing top 5 rows



In [11]:
# Count rows per class to check for null or invalid values
data.groupBy("cyberbullying_type") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+-------------------+-----+
| cyberbullying_type|count|
+-------------------+-----+
|           religion| 7998|
|                age| 7992|
|             gender| 7973|
|          ethnicity| 7961|
|  not_cyberbullying| 7945|
|other_cyberbullying| 7823|
+-------------------+-----+
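As a quick check of the balance claim, the class shares can be computed directly from the counts printed above. This is a plain-Python sketch; the `counts` dictionary is copied by hand from that output, and the variable names are illustrative.

```python
# Class counts copied from the groupBy output above.
counts = {
    "religion": 7998,
    "age": 7992,
    "gender": 7973,
    "ethnicity": 7961,
    "not_cyberbullying": 7945,
    "other_cyberbullying": 7823,
}

total = sum(counts.values())
shares = {label: n / total for label, n in counts.items()}
# Ratio between the largest and the smallest class; 1.0 means perfectly balanced.
imbalance_ratio = max(counts.values()) / min(counts.values())

print(f"total rows: {total}")
for label, share in shares.items():
    print(f"{label:>20}: {share:.2%}")
print(f"largest / smallest class: {imbalance_ratio:.3f}")
```

The total comes to 47692 rows, and the largest class is only about 2% bigger than the smallest, which supports treating the dataset as balanced.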



In [12]:
## Pipeline stage definitions
# Regular-expression tokenizer (splits on non-word characters)
regexTokenizer = RegexTokenizer(inputCol="tweet_text", outputCol="words", pattern="\\W")
# Stopword remover
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")
# Term counter for the bag of words
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
# Encoding of the response variable
label_stringIdx = StringIndexer(inputCol = "cyberbullying_type", outputCol = "label")
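To make the roles of `vocabSize` and `minDF` concrete, here is a minimal plain-Python sketch of a bag-of-words vocabulary with a document-frequency filter and a size cap. It is not Spark's implementation: `build_vocabulary` and `vectorize` are hypothetical helpers, the toy documents are invented, and the alphabetical tie-break is an assumption made only to keep the output deterministic.

```python
from collections import Counter


def build_vocabulary(docs, vocab_size, min_df):
    """Keep terms present in at least `min_df` documents, most frequent first."""
    doc_freq = Counter()
    term_freq = Counter()
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))  # each document counts once per term
    eligible = [t for t in term_freq if doc_freq[t] >= min_df]
    # Sort by corpus frequency (descending), then alphabetically (assumed tie-break).
    eligible.sort(key=lambda t: (-term_freq[t], t))
    return {term: idx for idx, term in enumerate(eligible[:vocab_size])}


def vectorize(tokens, vocabulary):
    """Return a sparse {index: count} mapping for one document."""
    counts = Counter(t for t in tokens if t in vocabulary)
    return {vocabulary[t]: float(c) for t, c in sorted(counts.items(), key=lambda kv: vocabulary[kv[0]])}


docs = [
    ["school", "bully", "school"],
    ["bully", "teacher"],
    ["school", "bully", "friend"],
]
vocab = build_vocabulary(docs, vocab_size=2, min_df=2)
print(vocab)
print(vectorize(docs[0], vocab))
```

In the notebook the resulting vectors have size 9511, below the 10000 cap, which is consistent with the `minDF=5` filter being the tighter of the two constraints.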

In [13]:
# Initial pipeline: tokenize and remove stopwords
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# Keep only tokens longer than 3 characters (drops words of length <= 3)
expr_sql = "filter(filtered, x -> length(x) > 3)"
dataset = dataset.withColumn("filtered", expr(expr_sql))

# Final pipeline: build the bag of words and encode the response variable
pipeline = Pipeline(stages=[countVectors, label_stringIdx])
pipelineFit = pipeline.fit(dataset)
dataset = pipelineFit.transform(dataset)

# Show the result
dataset.show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------+------------------+-----------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------+-----+
|tweet_text                                                                                                         |cyberbullying_type|words                                                                                                                  |filtered                                                                                        |features                                                                    |label|
+-------------------------------------------------------------------------------------------------------------------+---------

In [14]:
# Number of tweets per category after processing
dataset.groupBy(col("label"), col("cyberbullying_type")) \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+-----+-------------------+-----+
|label| cyberbullying_type|count|
+-----+-------------------+-----+
|  0.0|           religion| 7998|
|  1.0|                age| 7992|
|  2.0|             gender| 7973|
|  3.0|          ethnicity| 7961|
|  4.0|  not_cyberbullying| 7945|
|  5.0|other_cyberbullying| 7823|
+-----+-------------------+-----+
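The label values above follow the default ordering of `StringIndexer`, which assigns index 0 to the most frequent value. The mapping can be reproduced in plain Python from the counts in the table; this is a sketch, and the dictionary names are illustrative rather than part of the notebook.

```python
# Class counts copied from the table above.
counts = {
    "age": 7992,
    "ethnicity": 7961,
    "gender": 7973,
    "not_cyberbullying": 7945,
    "other_cyberbullying": 7823,
    "religion": 7998,
}

# Most frequent class gets index 0.0, the next one 1.0, and so on.
ordered = sorted(counts, key=lambda label: -counts[label])
label_index = {label: float(i) for i, label in enumerate(ordered)}

# Inverse mapping, useful for turning a numeric prediction back into a class name.
index_label = {idx: label for label, idx in label_index.items()}

print(label_index)
print(index_label[3.0])
```

The inverse mapping is what makes a `prediction` value such as `3.0` readable as `ethnicity` when inspecting model output.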



In [15]:
# Split the dataset into training and test sets
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=23)
print("Training set row count: " + str(trainingData.count()))
print("Test set row count: " + str(testData.count()))

Training set row count: 33386
Test set row count: 14306
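The split sizes are close to, but not exactly, 70% and 30% (33386 of 47692 rows is about 70.0%). A weighted random split that assigns each row independently behaves this way. The sketch below illustrates the idea in plain Python; `random_split` is a hypothetical helper, and it will not reproduce Spark's exact row assignment for the same seed.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    cumulative = []
    acc = 0.0
    for w in weights:
        acc += w / total
        cumulative.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(cumulative):
            # The last split catches any draw not claimed earlier (guards against rounding).
            if draw < bound or i == len(cumulative) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(47692), [0.7, 0.3], seed=23)
print(len(train), len(test), len(train) / 47692)
```

Because each row is drawn independently, the realized fraction fluctuates slightly around the requested weight, and fixing the seed only makes the result repeatable.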


In [16]:
# Quick look at the tokens, the bag of words, and the corresponding label
data_final = dataset.select(col('filtered'), col('features'), col('label'))
data_final.show(5, truncate=False)

+------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------+-----+
|filtered                                                                                        |features                                                                    |label|
+------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------+-----+
|[words, katandandre, food, crapilicious]                                                        |(9511,[252,503,2045],[1.0,1.0,1.0])                                         |4.0  |
|[aussietv, white, theblock, imacelebrityau, today, sunrise, studio10, neighbours, wonderlandten]|(9511,[18,127,8383],[1.0,1.0,1.0])                                          |4.0  |
|[xochitlsuckkks, classy, whore, velvet, cupcakes]                                        
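The `features` column uses sparse-vector notation: `(size, [indices], [values])`. As a small illustration, the first row above can be unpacked in plain Python. The helper names `sparse_to_dict` and `to_dense` are hypothetical and exist only for this sketch.

```python
def sparse_to_dict(size, indices, values):
    """Validate a (size, indices, values) triple and return {index: value}."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    if any(i < 0 or i >= size for i in indices):
        raise ValueError("index out of range for the declared size")
    return dict(zip(indices, values))


def to_dense(size, indices, values):
    """Expand the sparse triple into a full list of length `size`."""
    dense = [0.0] * size
    for i, v in sparse_to_dict(size, indices, values).items():
        dense[i] = v
    return dense


# First row of the output above: vocabulary of 9511 terms, three of them present once.
row = (9511, [252, 503, 2045], [1.0, 1.0, 1.0])
as_dict = sparse_to_dict(*row)
dense = to_dense(*row)
print(as_dict)
print(len(dense), sum(dense))
```

That row has four filtered tokens but only three non-zero entries, which would be consistent with one token falling outside the vocabulary kept by the count vectorizer.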

In [29]:
# Multinomial Naive Bayes with additive (Laplace) smoothing
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(trainingData)

# Predict on the test set and inspect a few rows
predictions = model.transform(testData)
select = predictions.select("tweet_text","cyberbullying_type","label","prediction")
select.show(5, truncate=False)
    

gender 2.0 [0.0017865471516936968,0.07726678690933524,0.3238320663249651,0.5065683059643187,0.03382322483262598,0.05672306881706124] 3.0
other_cyberbullying 5.0 [0.032972525715829,0.07723576977119458,0.42248643194547647,0.4367145273092455,0.017446415698680044,0.013144329559574298] 3.0
gender 2.0 [0.0053303002373438985,0.056556751343460235,0.42712034502991053,0.4288050589437457,0.018125621943907447,0.06406192250163212] 3.0
ethnicity 3.0 [4.246519851534244e-05,0.0032318914418686963,0.11889039068340802,0.867810198021253,0.003638880590297658,0.006386174064657477] 3.0
gender 2.0 [0.008434511689726537,0.1886927517412078,0.6435058928033842,0.0625281959005312,0.03960574963713058,0.05723289822801969] 2.0
gender 2.0 [0.0019816172615590407,0.03595967209458885,0.5402003631884628,0.39809463158768316,0.004851139374255316,0.018912576493450812] 2.0
gender 2.0 [0.004879532890680509,0.005297550760603373,0.025675230630188294,0.8944710768980637,0.01738495157381129,0.052291657246652634] 3.0
gender 2.0 [0.0
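For readers unfamiliar with the model, the following is a minimal plain-Python sketch of multinomial Naive Bayes with additive smoothing, including how per-class scores become a probability vector like the ones printed above. It is a teaching sketch under simplifying assumptions (class priors are not smoothed, unknown tokens are skipped), not Spark's implementation, and the toy documents are invented.

```python
import math
from collections import Counter, defaultdict


def train_multinomial_nb(docs, labels, smoothing=1.0):
    """Estimate log priors and smoothed log likelihoods from token lists."""
    vocab = sorted({t for tokens in docs for t in tokens})
    class_docs = Counter(labels)
    term_counts = defaultdict(Counter)
    for tokens, label in zip(docs, labels):
        term_counts[label].update(tokens)
    log_prior = {c: math.log(n / len(docs)) for c, n in class_docs.items()}
    log_likelihood = {}
    for c in class_docs:
        # Additive smoothing: every vocabulary term gets `smoothing` pseudo-counts.
        denom = sum(term_counts[c].values()) + smoothing * len(vocab)
        log_likelihood[c] = {
            t: math.log((term_counts[c][t] + smoothing) / denom) for t in vocab
        }
    return log_prior, log_likelihood


def score(tokens, log_prior, log_likelihood):
    """Log prior plus summed log likelihoods per class; unknown tokens are skipped."""
    return {
        c: log_prior[c] + sum(log_likelihood[c][t] for t in tokens if t in log_likelihood[c])
        for c in log_prior
    }


def posterior(scores):
    """Normalize log scores into probabilities (softmax, shifted by the max for stability)."""
    top = max(scores.values())
    exp = {c: math.exp(s - top) for c, s in scores.items()}
    norm = sum(exp.values())
    return {c: v / norm for c, v in exp.items()}


docs = [
    ["school", "bully", "kids"],
    ["school", "kids", "young"],
    ["women", "kitchen", "joke"],
    ["women", "joke", "sexist"],
]
labels = ["age", "age", "gender", "gender"]

log_prior, log_likelihood = train_multinomial_nb(docs, labels, smoothing=1.0)
probs = posterior(score(["school", "kids"], log_prior, log_likelihood))
prediction = max(probs, key=probs.get)
print(prediction, probs)
```

The predicted class is simply the one with the largest posterior, which is how the last number on each output row relates to the probability vector before it.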

In [32]:
# With no metricName set, the evaluator reports the weighted F1 score ("f1"), not accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.770072035993158

In [38]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Logistic regression; regParam and elasticNetParam are tuned by the grid below
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
# Parameter grid for cross-validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # elastic net mixing parameter (0 = ridge)
 #            .addGrid(lr.maxIter, [10, 20, 50]) # number of iterations
 #            .addGrid(idf.numFeatures, [10, 100, 1000]) # number of features
             .build())
# 5-fold CrossValidator; reuses the evaluator defined earlier (default metric: weighted F1)
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate the best model on the test set
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.8184553081244266
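To give a sense of the cost of this search and of what the two tuned parameters control, the sketch below enumerates the same grid in plain Python and evaluates the elastic-net penalty for a toy weight vector. The helper `elastic_net_penalty` and the example weights are illustrative assumptions; the penalty formula follows the usual definition, with `regParam` as the overall strength and `elasticNetParam` as the L1/L2 mix.

```python
from itertools import product

reg_params = [0.1, 0.3, 0.5]
elastic_net_params = [0.0, 0.1, 0.2]
num_folds = 5

# Every combination of the two parameter lists, as in the grid above.
grid = [
    {"regParam": r, "elasticNetParam": a}
    for r, a in product(reg_params, elastic_net_params)
]
fits_during_search = len(grid) * num_folds
# Plus one final fit of the best combination on the full training set.
total_fits = fits_during_search + 1


def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2), with alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared)


print(len(grid), fits_during_search, total_fits)
for params in grid[:3]:
    print(params, elastic_net_penalty([1.0, -2.0], params["regParam"], params["elasticNetParam"]))
```

With 9 combinations and 5 folds the search trains 45 models before the final refit, so adding a third parameter list to the grid multiplies that cost accordingly.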

In [46]:
# Compare true and predicted labels, ordered by class
select = predictions.select("tweet_text","cyberbullying_type","label","prediction")\
            .orderBy("cyberbullying_type", ascending=True)

select.show(50, truncate=30)

+------------------------------+------------------+-----+----------+
|                    tweet_text|cyberbullying_type|label|prediction|
+------------------------------+------------------+-----+----------+
|//Arya x Teacher????? Arya ...|               age|  1.0|       1.0|
|@RepTedYoho I have to tell ...|               age|  1.0|       1.0|
|//Patches when she was a te...|               age|  1.0|       1.0|
|#BullyBarr is obviously an ...|               age|  1.0|       2.0|
|       //Quite the eager slut.|               age|  1.0|       5.0|
|#MorningJoe #DisbarBarr Bil...|               age|  1.0|       1.0|
|//my school bully keeps sen...|               age|  1.0|       1.0|
|#TSFakeAnime a fallen star ...|               age|  1.0|       1.0|
|    0, would bully u at school|               age|  1.0|       1.0|
|#WhyIDontLikeTrump ... ever...|               age|  1.0|       1.0|
|1. I don't. 2. I don't thin...|               age|  1.0|       1.0|
|#YAMonday excerpt from my N...|  

In [53]:
# Accuracy on the test set
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
Accuracy = evaluator.evaluate(predictions,{evaluator.metricName: "accuracy"})

In [54]:
# Weighted F1 score
evaluator.setMetricName("f1")
f1 = evaluator.evaluate(predictions)

In [55]:
# Hamming loss (fraction of misclassified rows)
evaluator.setMetricName("hammingLoss")
Loss = evaluator.evaluate(predictions)
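The three metrics requested above can be written out in a few lines of plain Python, which makes their relationship easier to see. This is a sketch on invented toy labels; the function names are illustrative and the weighted F1 follows the usual support-weighted definition.

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    return sum(l == p for l, p in zip(labels, predictions)) / len(labels)


def hamming_loss(labels, predictions):
    """Fraction of rows where the prediction differs from the label."""
    return sum(l != p for l, p in zip(labels, predictions)) / len(labels)


def weighted_f1(labels, predictions):
    """Per-class F1 averaged with weights equal to each class's share of the labels."""
    support = Counter(labels)
    total = 0.0
    for c, n in support.items():
        tp = sum(1 for l, p in zip(labels, predictions) if l == c and p == c)
        predicted = sum(1 for p in predictions if p == c)
        precision = tp / predicted if predicted else 0.0
        recall = tp / n
        f1_c = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1_c * n / len(labels)
    return total


labels = [0, 0, 1, 1, 2, 2]
predictions = [0, 1, 1, 1, 2, 0]

print(accuracy(labels, predictions))
print(weighted_f1(labels, predictions))
print(hamming_loss(labels, predictions))
```

For single-label multiclass problems like this one, the Hamming loss is just one minus the accuracy, so the two carry the same information.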

In [57]:
Metrics = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0)
    ], ["id", "text", "label"])

Traceback (most recent call last):
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 565, in _fun

PicklingError: Could not serialize object: IndexError: tuple index out of range
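This error is commonly reported when an older PySpark release runs on Python 3.11, and the paths in the traceback show a Python 3.11 installation. Whether that is the cause here is an assumption, since the notebook does not show the PySpark version. The sketch below checks the installed versions; the cutoff of PySpark 3.4 for Python 3.11 support and the helper names are assumptions to verify against the Spark release notes.

```python
import sys
from importlib import metadata


def parse_version(text):
    """Return the leading (major, minor) integers of a version string."""
    major, minor = text.split(".")[:2]
    return int(major), int(minor)


def is_suspect_combo(python_version, pyspark_version):
    """True for Python 3.11+ with a PySpark release older than 3.4 (assumed cutoff)."""
    return tuple(python_version[:2]) >= (3, 11) and parse_version(pyspark_version) < (3, 4)


try:
    installed = metadata.version("pyspark")
except metadata.PackageNotFoundError:
    installed = None

python_text = ".".join(str(part) for part in sys.version_info[:3])
if installed is None:
    print("pyspark is not installed in this environment")
elif is_suspect_combo(sys.version_info, installed):
    print(f"Python {python_text} with pyspark {installed}: consider upgrading pyspark or using an older Python")
else:
    print(f"Python {python_text} with pyspark {installed}: this combination is not flagged by the check")
```

If the check flags the environment, the two usual options are upgrading PySpark or running the notebook on an earlier Python version.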

In [59]:
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

df = spark.createDataFrame(data2, ["id", "text", "label","1","2","3"])
df.printSchema()
df.show(truncate=False)

Traceback (most recent call last):
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 565, in _fun

PicklingError: Could not serialize object: IndexError: tuple index out of range

In [61]:
# Define the DataFrame schema
schema = StructType([
    StructField("Metrica", StringType(), nullable=True),
    StructField("Valor", FloatType(), nullable=True)
])

# Build the rows from the metrics computed in the previous cells
# (a separate name avoids overwriting the `data` DataFrame loaded earlier)
metric_rows = [("accuracy", float(Accuracy)), ("f1-score", float(f1)), ("HammingLoss", float(Loss))]
df = spark.createDataFrame(metric_rows, schema)

# Show the DataFrame
df.show()

Traceback (most recent call last):
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\dayan\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 565, in _fun

PicklingError: Could not serialize object: IndexError: tuple index out of range