## Environment setup

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
exec(open('/content/drive/MyDrive/Proyectos/Big_Data_ML.py').read())

Active services:
2881 NodeManager
3154 Jps
2713 NameNode
2794 DataNode
3051 JobHistoryServer
2639 ResourceManager




## Viewing the data in MySQL: *Hate Speech and Offensive Language Dataset*


In [4]:
!mysql -u root --password=password testdb

Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 16
Server version: 8.0.39-0ubuntu0.22.04.1 (Ubuntu)

Copyright (c) 2000, 2024, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> SHOW DATABASES;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| sys                |
| testdb             |
+--------------------+
5 rows in set (0.01 sec)

mysql> use testdb;
Database changed
mysql> SHOW TABLES;
+------------------+
| Tables_in_testdb |
+------------------+
| hate_speech      |
+------------------+
1 row in set (0.01 sec)

mysql> SELECT 


Importing the data into Hive with Sqoop. The MySQL password is "password" (without the quotes).

In [5]:
!sqoop import --connect jdbc:mysql://localhost/testdb --username root --password password --table hate_speech --hive-import --create-hive-table --hive-database default

24/11/11 05:03:51 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
24/11/11 05:03:51 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
24/11/11 05:03:51 INFO tool.BaseSqoopTool: Using Hive-specific delimiters for output. You can override
24/11/11 05:03:51 INFO tool.BaseSqoopTool: delimiters with --fields-terminated-by, etc.
24/11/11 05:03:52 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/11/11 05:03:52 INFO tool.CodeGenTool: Beginning code generation
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
24/11/11 05:03:53 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `hate_speech` AS t LIMIT 1
24/11/11 05:03:53 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `hate_speech` AS t LIMIT 1
24/11/11 05:

## Viewing the data with Hive

In [6]:
!hive -e "SELECT * FROM hate_speech LIMIT 10;"

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/content/apache-hive-2.3.9-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/content/hadoop-2.10.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Logging initialized using configuration in jar:file:/content/apache-hive-2.3.9-bin/lib/hive-common-2.3.9.jar!/hive-log4j2.properties Async: true
OK
hate_speech.class	hate_speech.count	hate_speech.hate_speech	hate_speech.neither	hate_speech.offensive_language	hate_speech.tweet	hate_speech.tweet_id
2	3	0	3	0	!!! RT @mayasolovely: As a woman you shouldn't complain about cleaning up your house. &amp; as a man you should always take the trash out...	0
1	3	0	0	3	!!!!! RT @mleew17: boy dats cold...tyga 

## Reading the data with Spark SQL and train/test split

In [7]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import length

spark = SparkSession.builder.enableHiveSupport().appName("MP-3").getOrCreate()

In [8]:
df_hate = spark.sql("SELECT * FROM hate_speech")
df_hate = df_hate.filter(df_hate["tweet"].isNotNull() & (length(df_hate["tweet"]) > 1))

df_hate.show(10)

# 95% / 5% split; the fixed seed makes the split reproducible
training, testing = df_hate.randomSplit([0.95, 0.05], seed=100)

+-----+-----+-----------+-------+------------------+--------------------+--------+
|class|count|hate_speech|neither|offensive_language|               tweet|tweet_id|
+-----+-----+-----------+-------+------------------+--------------------+--------+
|    2|    3|          0|      3|                 0|!!! RT @mayasolov...|       0|
|    1|    3|          0|      0|                 3|!!!!! RT @mleew17...|       1|
|    1|    3|          0|      0|                 3|!!!!!!! RT @UrKin...|       2|
|    1|    3|          0|      1|                 2|!!!!!!!!! RT @C_G...|       3|
|    1|    6|          0|      0|                 6|!!!!!!!!!!!!! RT ...|       4|
|    1|    3|          1|      0|                 2|"!!!!!!!!!!!!!!!!...|       5|
|    1|    3|          0|      0|                 3|"!!!!!!""@__Brigh...|       6|
|    1|    3|          0|      0|                 3|!!!!&#8220;@selfi...|       7|
|    1|    3|          0|      0|                 3|""" &amp; you mig...|       8|
|   
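
As a rough illustration of what `randomSplit([0.95, 0.05], seed=100)` does, the sketch below draws one random number per row and compares it against the cumulative normalized weights. It is a plain-Python approximation under that assumption, not Spark's implementation, and the names `random_split`, `train_rows`, and `test_rows` are hypothetical. The takeaway is that the split sizes are only approximately 95% / 5%, and that a fixed seed makes the split repeatable.

```python
import random
from itertools import accumulate

def random_split(rows, weights, seed):
    """Assign each row to one split using cumulative normalized weights."""
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))
    bounds[-1] = 1.0  # guard against floating-point drift in the last bound
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one draw per row, in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.95, 0.05], seed=100)
print(len(train_rows), len(test_rows))  # close to 950 and 50, but not exact
```

Because the assignment is per row, the two splits always partition the input, but their sizes vary from seed to seed.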

## Preprocessing the training dataset with _tokenization_ and _stopword_ removal

In [9]:
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover

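# Split tweets on non-word characters; RegexTokenizer lowercases tokens by default
tokenizer = RegexTokenizer(inputCol="tweet", outputCol="words", pattern="\\W")

# Drop Spark's default English stop words from the token list
stopWords = StopWordsRemover.loadDefaultStopWords('english')
stopWordsRemover = StopWordsRemover(stopWords=stopWords, inputCol="words", outputCol="clean_words")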

In [10]:
# tokenizer
training_words = tokenizer.transform(training)

In [11]:
# show the "words" and "tweet" columns of training_words
training_words.select("words", "tweet").show(10)

+--------------------+--------------------+
|               words|               tweet|
+--------------------+--------------------+
|[blackman38tide, ...|"""@Blackman38Tid...|
|[nochillpaz, at, ...|"""@NoChillPaz: "...|
|[notoriousbm95, _...|"""@NotoriousBM95...|
|[theomaxximus, ge...|"""@TheoMaxximus:...|
|[ashlingwilde, it...|"""@ashlingwilde:...|
|[bigbootybishopp,...|"""@bigbootybisho...|
|[jayswaggkillah, ...|"""@jayswaggkilla...|
|[jgabsss, stacey,...|"""@jgabsss: Stac...|
|[don, t, worry, a...|"""Don't worry ab...|
|[let, s, kill, cr...|"""Let's kill cra...|
+--------------------+--------------------+
only showing top 10 rows
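
To see why `"""Don't worry ab...` becomes `[don, t, worry, ...]` in the output, here is a minimal plain-Python sketch of what `RegexTokenizer` does with its default settings (`gaps=True`, `toLowercase=True`, `minTokenLength=1`): lowercase the text, split on the pattern, and drop empty pieces. The function name `regex_tokenize` is hypothetical, and `re.ASCII` is used on the assumption that it best approximates Java's default `\W`.

```python
import re

def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    """Lowercase, split on the pattern, and drop tokens that are too short."""
    pieces = re.split(pattern, text.lower(), flags=re.ASCII)
    return [p for p in pieces if len(p) >= min_token_length]

print(regex_tokenize('"""Don\'t worry'))  # ['don', 't', 'worry']
```

Note that the apostrophe is a non-word character, so contractions are split in two, while the underscore counts as a word character and survives as part of a token.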



In [12]:
# stopWordsRemover
training_clean = stopWordsRemover.transform(training_words)

In [13]:
from pyspark.sql.functions import size

# Keep only the tweets that have more than 3 non-stopword tokens
train_words = training_clean.filter(size(training_clean['clean_words']) > 3)
train_words.show(n=5, truncate=False)

+-----+-----+-----------+-------+------------------+-----------------------------------------------------------------------------------------------------------------------------+--------+-------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|class|count|hate_speech|neither|offensive_language|tweet                                                                                                                        |tweet_id|words                                                                                                                                      |clean_words                                                                                        |
+-----+-----+-----------+-------+------------------+--------------------------------------------------------------------------------------------
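
The stop-word removal and the `size(...) > 3` filter can be sketched in plain Python as follows. The stop-word set here is a tiny hypothetical list for illustration only (the notebook uses Spark's built-in English list), and the helper names `remove_stop_words` and `keep_row` are assumptions as well.

```python
# Hypothetical, tiny stop-word list for illustration only; the notebook uses
# Spark's built-in English list via StopWordsRemover.loadDefaultStopWords.
STOP_WORDS = {"a", "about", "don", "s", "t", "the", "you"}

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop tokens found in the stop-word set (case-insensitive match)."""
    return [t for t in tokens if t.lower() not in stop_words]

def keep_row(clean_tokens, min_tokens=3):
    """Mirror of `size(clean_words) > 3`: strictly more than 3 tokens remain."""
    return len(clean_tokens) > min_tokens

tokens = ["don", "t", "worry", "about", "the", "weather", "today", "friend"]
clean = remove_stop_words(tokens)
print(clean, keep_row(clean))  # ['worry', 'weather', 'today', 'friend'] True
```

Rows with three or fewer remaining tokens are discarded, so very short tweets never reach the Word2Vec step.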

## Training a Word2Vec model and transforming to numeric features

In [14]:
from pyspark.ml.feature import Word2Vec

model_w2v = Word2Vec(vectorSize=32, inputCol="clean_words", outputCol="features").fit(train_words)
train_features = model_w2v.transform(train_words)

In [15]:
# show the final DataFrame that will be used for training
train_features.select("features").show(10)

+--------------------+
|            features|
+--------------------+
|[-0.0040413890033...|
|[-0.0152795820363...|
|[-0.0236832043156...|
|[-0.0318282336617...|
|[-0.0504770226776...|
|[-0.0093586333096...|
|[-0.0416573341935...|
|[-0.0781679417599...|
|[-0.1047538125089...|
|[-0.0651746609115...|
+--------------------+
only showing top 10 rows
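
Each `features` value is a single vector per tweet: Spark's `Word2Vec` model turns a document into the average of its word vectors. The sketch below shows that averaging with hypothetical 2-dimensional vectors (the notebook's model uses `vectorSize=32`). Dividing by the full token count, including tokens the model does not know, reflects my reading of Spark's behavior and should be treated as an assumption; `document_vector` and `WORD_VECTORS` are illustrative names.

```python
# Hypothetical 2-dimensional word vectors; the notebook's model uses vectorSize=32.
WORD_VECTORS = {
    "good": [1.0, 0.0],
    "day": [0.0, 1.0],
}

def document_vector(tokens, word_vectors, size):
    """Sum the vectors of known tokens, then divide by the full token count."""
    total = [0.0] * size
    if not tokens:
        return total
    for token in tokens:
        vec = word_vectors.get(token)
        if vec is not None:  # out-of-vocabulary tokens contribute nothing
            total = [a + b for a, b in zip(total, vec)]
    return [x / len(tokens) for x in total]

print(document_vector(["good", "day"], WORD_VECTORS, 2))  # [0.5, 0.5]
```

Since `Word2Vec` defaults to `minCount=5`, words seen fewer than five times in the training data are left out of the vocabulary and would fall into the out-of-vocabulary branch here.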



## Training a RandomForestClassifier model

In [16]:
from pyspark.ml.classification import RandomForestClassifier

algorithm = RandomForestClassifier(numTrees=200, labelCol="class", featuresCol="features", predictionCol="prediction")
model = algorithm.fit(train_features)


In [17]:
# build the testing DataFrames with the same preprocessing steps

testing_words = tokenizer.transform(testing)
testing_clean = stopWordsRemover.transform(testing_words)
test_words = testing_clean.filter(size(testing_clean['clean_words']) > 3)
test_features = model_w2v.transform(test_words)

In [18]:
# get predictions

predictions = model.transform(test_features)

In [19]:
# show the label, the prediction, and the original tweet

predictions.select("class", "prediction", "tweet").show(10)

+-----+----------+--------------------+
|class|prediction|               tweet|
+-----+----------+--------------------+
|    0|       1.0|"@BGALLY17 so ur ...|
|    0|       1.0|"@_SoulSurvivor_ ...|
|    0|       1.0|"@hekterr don't s...|
|    0|       1.0|"Everyone tells m...|
|    0|       1.0|"I feel sorry for...|
|    0|       1.0|"I hate when peop...|
|    0|       2.0|"Pantera - 5 Minu...|
|    0|       1.0|#SomethingIGetAlo...|
|    0|       1.0|&#8220;@kaylenden...|
|    0|       1.0|&#8220;@kitty_hel...|
+-----+----------+--------------------+
only showing top 10 rows



In [20]:
# evaluation with the accuracy metric

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='class', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Test accuracy: ", "%2.1f%%" % (accuracy * 100,))

Test accuracy:  84.5%
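
Overall accuracy can hide how individual classes fare. As a small check, the sketch below computes accuracy and per-class recall from `(class, prediction)` pairs, using only the ten rows printed by `predictions.show(10)` earlier in this notebook; it says nothing about the rest of the test set. The helper name `accuracy_and_recall` is hypothetical.

```python
from collections import Counter

def accuracy_and_recall(pairs):
    """pairs: iterable of (label, prediction). Returns (accuracy, recall per label)."""
    pairs = [(int(label), int(pred)) for label, pred in pairs]
    totals = Counter(label for label, _ in pairs)
    hits = Counter(label for label, pred in pairs if label == pred)
    accuracy = sum(hits.values()) / len(pairs)
    recall = {label: hits[label] / n for label, n in totals.items()}
    return accuracy, recall

# The ten (class, prediction) rows printed by predictions.show(10).
shown = [(0, 1.0)] * 6 + [(0, 2.0)] + [(0, 1.0)] * 3
print(accuracy_and_recall(shown))  # (0.0, {0: 0.0})
```

None of the ten displayed class-0 tweets is predicted as class 0, so it may be worth inspecting the full confusion table, for example with `predictions.groupBy("class", "prediction").count().show()`.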
