# Módulo 1 Utilización, procesamiento y visualización de grandes volúmenes de datos

### BITCOIN Y RELACION CON RANSOMWARE

Este conjunto de datos contiene información sobre transacciones de Bitcoin y está específicamente diseñado para identificar patrones de transacciones que podrían estar relacionados con ransomware (software malicioso que cifra los archivos de un usuario y pide un rescate para recuperarlos).

Las características (o datos) proporcionados en el conjunto de datos ofrecen detalles sobre cómo las transacciones de Bitcoin están estructuradas y cómo las monedas se mueven en la red. Aquí hay algunos conceptos clave:

Direcciones de Bitcoin (address): Cada transacción en Bitcoin involucra direcciones de origen y destino. Estas direcciones son como las cuentas bancarias en el mundo del dinero digital.

Patrones de Transacción (length, weight, count, looped, neighbors): Estas características proporcionan información sobre cómo las transacciones se dividen, se mezclan y se fusionan. Por ejemplo, length indica cuántas veces las monedas se mezclan en una transacción para ocultar su origen. weight y count proporcionan detalles sobre cómo se combinan las monedas en múltiples direcciones. looped indica si las monedas fueron divididas y fusionadas nuevamente en una dirección final.

Cantidad de Monedas (income) y Etiquetas (label): income representa la cantidad de monedas (en satoshis) involucradas en la transacción. label indica si la transacción está asociada con un tipo específico de ransomware o si se clasifica como "white" si no se sabe si está relacionada con ransomware.

#### Primero iniciamos la sesion de spark

La explicacion de lo que se hace en el script viene en comentarios

In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder.appName("Nombre de tu Aplicación").getOrCreate()




23/10/30 16:09:05 WARN Utils: Your hostname, carlosoc resolves to a loopback address: 127.0.1.1; using 192.168.100.100 instead (on interface wlp3s0)
23/10/30 16:09:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/30 16:09:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Cargar datos en un DataFrame de Spark
data = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("bitcoin.csv")

In [4]:
# Mostrar las primeras 5 filas del DataFrame
data.show(5)


+--------------------+----+---+------+-------------------+-----+------+---------+---------+---------------+
|             address|year|day|length|             weight|count|looped|neighbors|   income|          label|
+--------------------+----+---+------+-------------------+-----+------+---------+---------+---------------+
|111K8kZAEnJg245r2...|2017| 11|    18|0.00833333333333333|    1|     0|        2|100050000|princetonCerber|
|1123pJv8jzeFQaCV4...|2016|132|    44|     0.000244140625|    1|     0|        1|    1e+08| princetonLocky|
|112536im7hy6wtKbp...|2016|246|     0|                  1|    1|     0|        2|    2e+08|princetonCerber|
|1126eDRw2wqSkWosj...|2016|322|    72|         0.00390625|    1|     0|        2| 71200000|princetonCerber|
|1129TSjKtx65E35Gi...|2016|238|   144| 0.0728484071989931|  456|     0|        1|    2e+08| princetonLocky|
+--------------------+----+---+------+-------------------+-----+------+---------+---------+---------------+
only showing top 5 rows



In [5]:
# Eliminar filas con valores nulos en cualquier columna
data_cleaned = data.dropna()


In [6]:
# Mostrar los tipos de datos de cada columna para analizar que tipo de variables tenemos
data_cleaned.dtypes


[('address', 'string'),
 ('year', 'string'),
 ('day', 'string'),
 ('length', 'string'),
 ('weight', 'string'),
 ('count', 'string'),
 ('looped', 'string'),
 ('neighbors', 'string'),
 ('income', 'string'),
 ('label', 'string')]

In [7]:
# Verificar valores únicos en cada columna
for col in data_cleaned.columns:
    unique_values = data_cleaned.select(col).distinct().count()
    print(f"Columna '{col}' tiene {unique_values} valores únicos.")


                                                                                

Columna 'address' tiene 2631095 valores únicos.


                                                                                

Columna 'year' tiene 8 valores únicos.


                                                                                

Columna 'day' tiene 365 valores únicos.


                                                                                

Columna 'length' tiene 73 valores únicos.


                                                                                

Columna 'weight' tiene 785669 valores únicos.


                                                                                

Columna 'count' tiene 11572 valores únicos.


                                                                                

Columna 'looped' tiene 10168 valores únicos.


                                                                                

Columna 'neighbors' tiene 814 valores únicos.


                                                                                

Columna 'income' tiene 1866365 valores únicos.


[Stage 56:===>                                                    (1 + 15) / 16]

Columna 'label' tiene 29 valores únicos.


                                                                                

In [8]:
# Contar el número total de filas en el DataFrame
total_datos = data_cleaned.count()

# Mostrar el resultado
print("Número total de datos en el DataFrame:", total_datos)


Número total de datos en el DataFrame: 2916697


                                                                                

 A PARTIR DE AQUI, PUEDO DESCARTAR CIERTAS COLUMNAS POR LA CANTIDAD DE VALORES UNICOS QUE SE PRESENTAN, PARA LA PARTE DE ADDRESS CASI TODOS TIENEN UN VALOR UNICO POR LO QUE NO NOS SIRVE EN EL MODELO. PARA LA PARTE DE INCOME TAMBIEN TIENE MUCHOS VALORES UNICOS, SIN EMBARGO NO LOS SUFICIENTES PARA DESCARTAR TODA LA COLUMNA. PROBARE EL MODELO UTILIZANDO ESA COLUMNA.

In [9]:
#Aqui se elimina la columna address del dataframe
data_drop = data_cleaned.drop('address')

### Ahora, voy a convertir todas las variables string a numericas para el modelo, en el caso de los labels igual asignar un valor numerico unico para cada categoria ya que siendo strings no puedo entrenar un modelo

In [10]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

# Lista de nombres de columnas numéricas (excluyendo la columna 'label')
columnas_numericas = [col_name for col_name in data_drop.columns if col_name != 'label' and col_name!='weight']


data_drop = data_drop.withColumn("weight", (col("weight") * 1.0).cast("float"))

# Convertir columnas numéricas de tipo string a tipo numerico
for col_name in columnas_numericas:
   data_drop = data_drop.withColumn(col_name, col(col_name).cast('double').cast('int'))



In [11]:
data_drop.show()

+----+---+------+------------+-----+------+---------+---------+--------------------+
|year|day|length|      weight|count|looped|neighbors|   income|               label|
+----+---+------+------------+-----+------+---------+---------+--------------------+
|2017| 11|    18| 0.008333334|    1|     0|        2|100050000|     princetonCerber|
|2016|132|    44|2.4414062E-4|    1|     0|        1|100000000|      princetonLocky|
|2016|246|     0|         1.0|    1|     0|        2|200000000|     princetonCerber|
|2016|322|    72|  0.00390625|    1|     0|        2| 71200000|     princetonCerber|
|2016|238|   144|  0.07284841|  456|     0|        1|200000000|      princetonLocky|
|2016| 96|   144|    0.084614| 2821|     0|        1| 50000000|      princetonLocky|
|2016|225|   142|0.0020885186|  881|     0|        2|100000000|     princetonCerber|
|2016|324|    78|  0.00390625|    1|     0|        2|100990000|     princetonCerber|
|2016|298|   144|   2.3028283| 4220|     0|        2| 80000000|  

In [12]:
#aqui lo que hago es llenar los na values con ceros
data_drop = data_drop.na.fill(0.0, subset=["weight"])

###  Ahora que ya tengo el dataframe con los datos transformados, falta convertir los labels para tener el dataframe final

In [13]:
label_indexer = StringIndexer(inputCol='label', outputCol='label_index')

# Ajustar el modelo y transformar los datos
indexer_model = label_indexer.fit(data_drop)
data_indexed = indexer_model.transform(data_drop)

# Obtener la lista de etiquetas originales
etiquetas_originales = indexer_model.labels

# Mostrar la lista de etiquetas originales y sus índices asignados
print("Etiquetas Originales:", etiquetas_originales)


[Stage 66:>                                                       (0 + 16) / 16]

Etiquetas Originales: ['white', 'paduaCryptoWall', 'montrealCryptoLocker', 'princetonCerber', 'princetonLocky', 'montrealCryptXXX', 'montrealNoobCrypt', 'montrealDMALockerv3', 'montrealDMALocker', 'montrealSamSam', 'montrealCryptoTorLocker2015', 'montrealGlobeImposter', 'montrealGlobev3', 'montrealGlobe', 'montrealWannaCry', 'montrealRazy', 'montrealAPT', 'paduaKeRanger', 'montrealFlyper', 'montrealXTPLocker', 'montrealCryptConsole', 'montrealVenusLocker', 'montrealXLockerv5.0', 'montrealEDA2', 'montrealJigSaw', 'paduaJigsaw', 'montrealComradeCircle', 'montrealSam', 'montrealXLocker']


                                                                                

In [14]:

# Crear un diccionario que mapea etiquetas originales a sus índices asignados
categorias_con_indices = dict(zip(etiquetas_originales, range(len(etiquetas_originales))))

# Mostrar la lista de etiquetas originales y sus índices asignados
#print("Etiquetas Originales:", etiquetas_originales)
print("Categorías con sus respectivos índices asignados:")
print(categorias_con_indices)

Categorías con sus respectivos índices asignados:
{'white': 0, 'paduaCryptoWall': 1, 'montrealCryptoLocker': 2, 'princetonCerber': 3, 'princetonLocky': 4, 'montrealCryptXXX': 5, 'montrealNoobCrypt': 6, 'montrealDMALockerv3': 7, 'montrealDMALocker': 8, 'montrealSamSam': 9, 'montrealCryptoTorLocker2015': 10, 'montrealGlobeImposter': 11, 'montrealGlobev3': 12, 'montrealGlobe': 13, 'montrealWannaCry': 14, 'montrealRazy': 15, 'montrealAPT': 16, 'paduaKeRanger': 17, 'montrealFlyper': 18, 'montrealXTPLocker': 19, 'montrealCryptConsole': 20, 'montrealVenusLocker': 21, 'montrealXLockerv5.0': 22, 'montrealEDA2': 23, 'montrealJigSaw': 24, 'paduaJigsaw': 25, 'montrealComradeCircle': 26, 'montrealSam': 27, 'montrealXLocker': 28}


#### Primero hacemos un show para ver los diferentes valores

In [16]:
data_indexed = data_indexed.drop('label')
data_indexed.show(100)

# Reducir el número de particiones a un valor adecuado
data_indexed = data_indexed.coalesce(4)  # Ajusta el número de particiones según sea necesario


+----+---+------+------------+-----+------+---------+---------+-----------+
|year|day|length|      weight|count|looped|neighbors|   income|label_index|
+----+---+------+------------+-----+------+---------+---------+-----------+
|2017| 11|    18| 0.008333334|    1|     0|        2|100050000|        3.0|
|2016|132|    44|2.4414062E-4|    1|     0|        1|100000000|        4.0|
|2016|246|     0|         1.0|    1|     0|        2|200000000|        3.0|
|2016|322|    72|  0.00390625|    1|     0|        2| 71200000|        3.0|
|2016|238|   144|  0.07284841|  456|     0|        1|200000000|        4.0|
|2016| 96|   144|    0.084614| 2821|     0|        1| 50000000|        4.0|
|2016|225|   142|0.0020885186|  881|     0|        2|100000000|        3.0|
|2016|324|    78|  0.00390625|    1|     0|        2|100990000|        3.0|
|2016|298|   144|   2.3028283| 4220|     0|        2| 80000000|        3.0|
|2016| 62|   112|3.7252903E-9|    1|     0|        1| 50000000|        4.0|
|2013|317|  

### Ahora que tenemos el dataframe completo, vamos a transformarlo para que el modelo pueda recibir los features en un vector y los labels en otro

In [23]:
from pyspark.ml.feature import VectorAssembler

# Definir las características de entrada
features = ['year', 'day', 'length', 'weight', 'count', 'looped', 'neighbors', 'income']

assembler = VectorAssembler(inputCols=features, outputCol="features")
assembled_df = assembler.transform(data_indexed)

# Selecciona las columnas "features" y "label" para el modelo
selected_df = assembled_df.select("features", "label_index")

selected_df.show()

+--------------------+-----------+
|            features|label_index|
+--------------------+-----------+
|[2017.0,11.0,18.0...|        3.0|
|[2016.0,132.0,44....|        4.0|
|[2016.0,246.0,0.0...|        3.0|
|[2016.0,322.0,72....|        3.0|
|[2016.0,238.0,144...|        4.0|
|[2016.0,96.0,144....|        4.0|
|[2016.0,225.0,142...|        3.0|
|[2016.0,324.0,78....|        3.0|
|[2016.0,298.0,144...|        3.0|
|[2016.0,62.0,112....|        4.0|
|[2013.0,317.0,4.0...|        2.0|
|[2016.0,247.0,0.0...|        3.0|
|[2016.0,146.0,144...|        5.0|
|[2017.0,3.0,4.0,0...|        3.0|
|[2016.0,158.0,56....|        5.0|
|[2016.0,156.0,8.0...|        5.0|
|[2016.0,273.0,144...|        4.0|
|[2016.0,56.0,4.0,...|        4.0|
|[2016.0,165.0,10....|        3.0|
|[2016.0,109.0,0.0...|        4.0|
+--------------------+-----------+
only showing top 20 rows



In [24]:
#Asignamos los datasets basados en el dataframe selected_df

train_data, test_data = selected_df.randomSplit([0.8, 0.2], seed=123)

### AHORA PROBE DOS MODELOS DISTINTOS Y VALIDE LAS PRUEBAS CON LAS PREDICCIONES DEL TEST_DATA

In [28]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Crea el modelo Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="label_index", numTrees=200)

# Entrena el modelo
rf_model = rf.fit(train_data)

# Suponiendo que tu modelo se llama "modelo" y que tienes un DataFrame llamado "data_test" para evaluación
predictions = rf_model.transform(test_data)

# Crear un evaluador para problemas de clasificación multiclase
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", metricName="accuracy")
# Calcular la métrica de evaluación (precisión en este caso)
accuracy = evaluator.evaluate(predictions)
print("Accuracy en el conjunto de prueba: ", accuracy)



23/10/30 17:23:29 WARN MemoryStore: Not enough space to cache rdd_452_1 in memory! (computed 73.4 MiB so far)
23/10/30 17:23:29 WARN BlockManager: Persisting block rdd_452_1 to disk instead.
23/10/30 17:23:29 WARN MemoryStore: Not enough space to cache rdd_452_0 in memory! (computed 73.4 MiB so far)
23/10/30 17:23:29 WARN BlockManager: Persisting block rdd_452_0 to disk instead.
23/10/30 17:23:30 WARN MemoryStore: Not enough space to cache rdd_452_3 in memory! (computed 165.7 MiB so far)
23/10/30 17:23:30 WARN BlockManager: Persisting block rdd_452_3 to disk instead.
23/10/30 17:23:30 WARN MemoryStore: Not enough space to cache rdd_452_2 in memory! (computed 165.7 MiB so far)
23/10/30 17:23:30 WARN BlockManager: Persisting block rdd_452_2 to disk instead.
23/10/30 17:23:36 WARN MemoryStore: Not enough space to cache rdd_452_3 in memory! (computed 257.2 MiB so far)
23/10/30 17:23:36 WARN MemoryStore: Not enough space to cache rdd_452_2 in memory! (computed 110.2 MiB so far)
23/10/30 17:

Accuracy en el conjunto de prueba:  0.9857002968476627


                                                                                

## Segundo modelo el cual es una red neuronal

In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Entrenar el modelo
model = pipeline.fit(training_data)

# Realizar predicciones en el conjunto de entrenamiento
predictions_train = model.transform(training_data)

# Calcular la precisión en el conjunto de entrenamiento
evaluator_train = MulticlassClassificationEvaluator(labelCol='label_index', predictionCol='prediction', metricName='accuracy')
accuracy_train = evaluator_train.evaluate(predictions_train)
print('Accuracy en el conjunto de entrenamiento:', accuracy_train)

# Realizar predicciones en el conjunto de prueba
predictions_test = model.transform(testing_data)

# Calcular la precisión en el conjunto de prueba
accuracy_test = evaluator_train.evaluate(predictions_test)
print('Accuracy en el conjunto de prueba:', accuracy_test)


23/10/30 16:09:30 WARN MemoryStore: Not enough space to cache rdd_163_2 in memory! (computed 15.2 MiB so far)
23/10/30 16:09:30 WARN BlockManager: Persisting block rdd_163_2 to disk instead.
23/10/30 16:09:30 WARN MemoryStore: Not enough space to cache rdd_163_3 in memory! (computed 15.2 MiB so far)
23/10/30 16:09:30 WARN BlockManager: Persisting block rdd_163_3 to disk instead.
23/10/30 16:09:30 WARN MemoryStore: Not enough space to cache rdd_163_0 in memory! (computed 15.2 MiB so far)
23/10/30 16:09:30 WARN BlockManager: Persisting block rdd_163_0 to disk instead.
23/10/30 16:09:30 WARN MemoryStore: Not enough space to cache rdd_163_1 in memory! (computed 52.0 MiB so far)
23/10/30 16:09:30 WARN BlockManager: Persisting block rdd_163_1 to disk instead.
23/10/30 16:09:31 WARN MemoryStore: Not enough space to cache rdd_163_0 in memory! (computed 79.7 MiB so far)
23/10/30 16:09:32 WARN MemoryStore: Not enough space to cache rdd_163_2 in memory! (computed 122.5 MiB so far)
23/10/30 16:09:

Accuracy en el conjunto de entrenamiento: 0.985826725068905




Accuracy en el conjunto de prueba: 0.9857002968476627


                                                                                