## Montar Entorno

In [25]:
#Bibliotecas para poder trabajar con Spark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
!tar xf spark-3.5.7-bin-hadoop3.tgz
#Configuración de Spark con Python
!pip install -q findspark
!pip install pyspark

#Estableciendo variable de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.7-bin-hadoop3"

#Buscando e inicializando la instalación de Spark
import findspark
findspark.init()
findspark.find()

Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 https://cli.github.com/packages stable InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
46 packages can be upgraded. Run 'apt list --upgradable' to see them.
[1;33mW: [0mSkipping acquire of configured file 'main/source/Sou

'/content/spark-3.5.7-bin-hadoop3'

## Conectar Drive

In [26]:
# Mount Google Drive at /content/drive (Colab will ask for authorization)
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Generar Sesión de Spark

In [27]:
from pyspark.sql import SparkSession

# Funciones de PySpark SQL
from pyspark.sql.functions import (
    col, when, sum, avg, max, min, count, countDistinct,
    stddev, lit, desc
)

# Tipos de datos de PySpark (probablemente no los necesites en este proyecto)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    DoubleType, TimestampType
)

In [28]:
# Create (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("PumpFun_Token_Classification")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "10")
    .getOrCreate()
)

# Read every CSV under the archive folder into one DataFrame; the first line of
# each file is treated as the header and column types are inferred from the data.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/content/drive/MyDrive/ConcentracionIA2/Big Data/Reto/archive/*.csv")
)

In [29]:
# Headline figures for the raw data: row count, column count, distinct tokens.
banner = "=" * 80
print(banner, "DATASET INFORMATION", banner, sep="\n")

total_rows = df.count()
print(f"Registry Total: {total_rows:,}")

print(f"Column Total: {len(df.columns)}")

unique_tokens = df.select('mint_token_id').distinct().count()
print(f"Unique Tokens: {unique_tokens:,}")

DATASET INFORMATION
Registry Total: 9,478,706
Column Total: 34
Unique Tokens: 485,525


In [30]:
# Show the schema (column names, inferred types, nullability)
print("DATASET SCHEMA")
df.printSchema()

DATASET SCHEMA
root
 |-- index: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- mint_token_id: string (nullable = true)
 |-- holder: string (nullable = true)
 |-- trade_mode: string (nullable = true)
 |-- token_quantity: double (nullable = true)
 |-- creator: string (nullable = true)
 |-- creator_fee: double (nullable = true)
 |-- creator_fee_pump: double (nullable = true)
 |-- market_cap_usd: double (nullable = true)
 |-- token_delta: double (nullable = true)
 |-- sol_delta: double (nullable = true)
 |-- buy_count: integer (nullable = true)
 |-- sell_count: integer (nullable = true)
 |-- total_count: integer (nullable = true)
 |-- token_volume: double (nullable = true)
 |-- sol_volume: double (nullable = true)
 |-- liquidity_ratio: double (nullable = true)
 |-- virtual_sol_reserves: double (nullable = true)
 |-- virtual_token_reserves: double (nullable = true)
 |-- consumed_gas: integer (nullable = true)
 |-- fee: integer (nullable = true)
 |-- relative_strengt

In [31]:
# Basic summary statistics for the DataFrame's columns
print("DATASET STATISTICS")
df.describe().show()

DATASET STATISTICS
+-------+------------------+---------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+------------------+-----------------+--------------------+--------------------+--------------------+--------------------+----------------------+------------------+------------------+-----------------------+---------------------------+------------------+-------------------+-----------------+------------------+------------------+--------------------+-------------------+------------------+------------------+
|summary|             index|timestamp|       mint_token_id|              holder|trade_mode|      token_quantity|             creator|         creator_fee|    creator_fee_pump|    market_cap_usd|         token_delta|           sol_delta|        buy_count|        sell_count|      total_count|        token_volume|      

In [32]:
# Count the null entries of every column in a single pass over the data.
print("NULL VALUES PER COLUMN:")
null_counters = []
for column_name in df.columns:
    # when(...) without otherwise() is null for non-matching rows, and count()
    # ignores nulls, so each counter tallies only the rows where the column is null.
    is_missing = col(column_name).isNull()
    null_counters.append(count(when(is_missing, column_name)).alias(column_name))
df.select(null_counters).show()


NULL VALUES PER COLUMN:
+-----+---------+-------------+------+----------+--------------+-------+-----------+----------------+--------------+-----------+---------+---------+----------+-----------+------------+----------+---------------+--------------------+----------------------+------------+---+-----------------------+---------------------------+-----------------+--------------+----------------+-------------+---------------+-------------------+---------------+------------+------------+--------------+
|index|timestamp|mint_token_id|holder|trade_mode|token_quantity|creator|creator_fee|creator_fee_pump|market_cap_usd|token_delta|sol_delta|buy_count|sell_count|total_count|token_volume|sol_volume|liquidity_ratio|virtual_sol_reserves|virtual_token_reserves|consumed_gas|fee|relative_strength_index|bollinger_relative_position|volume_oscillator|rate_of_change|money_flow_index|total_holders|current_holders|top10_percent_total|creator_balance|creator_sold|holder_ratio|buy_sell_ratio|
+-----+-----

In [46]:
print("GENERATING FEATURES PER TOKEN")

# Collapse the trade-level rows into one feature row per token (mint_token_id).

# 1/0 indicators per trade row; summing them counts buys / sells per token.
is_buy = when(col('trade_mode') == 'buy', 1).otherwise(0)
is_sell = when(col('trade_mode') == 'sell', 1).otherwise(0)

trade_aggs = [
    sum(is_buy).alias('total_buys'),
    sum(is_sell).alias('total_sells'),
    # +1 in the denominator keeps the ratio defined for tokens with no sells.
    (sum(is_buy) / (sum(is_sell) + 1)).alias('buy_sell_ratio_calc'),
]

# (aggregate function, source column, output column) - the order here is the
# column order of the resulting DataFrame.
simple_specs = [
    # Volume metrics
    (sum, 'sol_volume', 'total_sol_volume'),
    (sum, 'token_volume', 'total_token_volume'),
    (avg, 'sol_volume', 'avg_sol_volume'),
    # Market metrics
    (avg, 'market_cap_usd', 'avg_market_cap'),
    (max, 'market_cap_usd', 'max_market_cap'),
    (min, 'market_cap_usd', 'min_market_cap'),
    (stddev, 'market_cap_usd', 'std_market_cap'),
    # Holder metrics
    # NOTE(review): 'final_holders' is the per-token maximum of current_holders,
    # not the last observed value - confirm this is intended.
    (max, 'total_holders', 'max_total_holders'),
    (max, 'current_holders', 'final_holders'),
    (avg, 'top10_percent_total', 'avg_concentration'),
    (max, 'top10_percent_total', 'max_concentration'),
    # Creator metrics
    # NOTE(review): 'creator_final_balance' is likewise a maximum, not a final value.
    (max, 'creator_sold', 'creator_tokens_sold'),
    (max, 'creator_balance', 'creator_final_balance'),
    (sum, 'creator_fee', 'total_creator_fees'),
    (avg, 'holder_ratio', 'avg_holder_ratio'),
    # Technical indicators
    (avg, 'relative_strength_index', 'avg_rsi'),
    (max, 'relative_strength_index', 'max_rsi'),
    (min, 'relative_strength_index', 'min_rsi'),
    (avg, 'bollinger_relative_position', 'avg_bollinger'),
    (avg, 'volume_oscillator', 'avg_volume_osc'),
    (avg, 'money_flow_index', 'avg_mfi'),
    (avg, 'liquidity_ratio', 'avg_liquidity_ratio'),
    # Gas and fee metrics
    (sum, 'consumed_gas', 'total_gas'),
    (sum, 'fee', 'total_fees'),
    (avg, 'fee', 'avg_fee'),
    # Virtual reserve metrics
    (avg, 'virtual_sol_reserves', 'avg_sol_reserves'),
    (avg, 'virtual_token_reserves', 'avg_token_reserves'),
    # Counters
    (count, '*', 'total_transactions'),
    (countDistinct, 'holder', 'unique_holders_interacted'),
]
column_aggs = [agg_fn(source).alias(output) for agg_fn, source, output in simple_specs]

token_features = df.groupBy('mint_token_id').agg(*trade_aggs, *column_aggs)

GENERATING FEATURES PER TOKEN


In [34]:
# Replace missing aggregate values with 0. na.fill(0) only touches numeric
# columns and only null/NaN entries; infinite values are NOT changed by it.
# The result is cached because the per-token aggregation scans the whole raw
# dataset and every later action (count, show, split, training, ...) would
# otherwise recompute it from the CSV files.
token_features = token_features.na.fill(0).cache()

print(f"Features generadas: {token_features.count():,} tokens")
token_features.show(5)

Features generadas: 485,525 tokens
+--------------------+----------+-----------+-------------------+--------------------+--------------------+--------------------+------------------+--------------+--------------+------------------+-----------------+-------------+--------------------+-----------------+-------------------+---------------------+------------------+------------------+-----------------+-----------+-------+-----------------+--------------+------------------+--------------------+---------+----------+-----------------+--------------------+--------------------+------------------+-------------------------+
|       mint_token_id|total_buys|total_sells|buy_sell_ratio_calc|    total_sol_volume|  total_token_volume|      avg_sol_volume|    avg_market_cap|max_market_cap|min_market_cap|    std_market_cap|max_total_holders|final_holders|   avg_concentration|max_concentration|creator_tokens_sold|creator_final_balance|total_creator_fees|  avg_holder_ratio|          avg_rsi|    max_rsi|min

In [35]:
# Rule-based binary target: a token is labelled 1 only when it has more than 10
# buys, an average market cap above 50000 and an average RSI above 50.
# NOTE(review): the label is a deterministic function of total_buys,
# avg_market_cap and avg_rsi, and the notebook's feature_cols list contains all
# three - a model trained on them can simply reconstruct this rule (target leakage).
alpha_rule = (
    (col('total_buys') > 10)
    & (col('avg_market_cap') > 50000)
    & (col('avg_rsi') > 50)
)
ml_data = token_features.withColumn(
    'bought_by_alpha',
    when(alpha_rule, 1).otherwise(0),
)

# Check the class balance of the target
print("DISTRIBUCIÓN DEL TARGET:")
target_counts = ml_data.groupBy('bought_by_alpha').count()
target_counts.show()

DISTRIBUCIÓN DEL TARGET:
+---------------+------+
|bought_by_alpha| count|
+---------------+------+
|              0|485501|
|              1|    24|
+---------------+------+



In [36]:
# Seleccionar columnas numéricas para el modelo
feature_cols = [
    'total_buys', 'total_sells', 'buy_sell_ratio_calc',
    'total_sol_volume', 'total_token_volume', 'avg_sol_volume',
    'avg_market_cap', 'max_market_cap', 'std_market_cap',
    'max_total_holders', 'final_holders', 'avg_concentration',
    'creator_tokens_sold', 'total_creator_fees', 'avg_holder_ratio',
    'avg_rsi', 'max_rsi', 'avg_bollinger', 'avg_volume_osc',
    'avg_mfi', 'avg_liquidity_ratio', 'total_gas', 'total_transactions',
    'unique_holders_interacted'
]

In [37]:
# Keep only the token id, the label and the model inputs, then drop every row
# that still has a null in any of those columns.
model_columns = ml_data.select('mint_token_id', 'bought_by_alpha', *feature_cols)
ml_data_clean = model_columns.na.drop()

In [38]:
# Random 80/20 split; the fixed seed keeps the partition reproducible.
splits = ml_data_clean.randomSplit([0.8, 0.2], seed=42)
train_data, test_data = splits

print("DATA SPLITTING")
for split_label, split_df in (("Training", train_data), ("Testing", test_data)):
    print(f"{split_label}: {split_df.count():,} registros")

DATA SPLITTING
Training: 388,705 registros
Testing: 96,820 registros


In [40]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [41]:
# VectorAssembler: combina todas las features en un vector
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw"
)

# StandardScaler: normaliza las features
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

# Modelo: Random Forest Classifier
rf = RandomForestClassifier(
    labelCol="bought_by_alpha",
    featuresCol="features",
    numTrees=100,
    maxDepth=10,
    seed=42
)

# Crear pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

In [42]:
# Fit the pipeline on the training split.
print("TRAINING RANDOM FOREST")
model = pipeline.fit(train_data)

TRAINING RANDOM FOREST


In [43]:
# Score the test split. The result is cached because every show()/evaluate()
# call below is a separate Spark action that would otherwise re-run the whole
# transform (and everything upstream of it).
predictions = model.transform(test_data).cache()

# Show a few predictions
print("PREDICTIONS:")
predictions.select('mint_token_id', 'bought_by_alpha', 'prediction', 'probability').show(10, truncate=False)

print("MODEL EVALUATION")

# Binary-classification evaluator (AUC-ROC)
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="bought_by_alpha",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = binary_evaluator.evaluate(predictions)
print(f"AUC-ROC: {auc:.4f}")

# Area under the precision-recall curve: more informative than ROC when the
# positive class is as rare as it is here.
auc_pr = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderPR"})
print(f"AUC-PR: {auc_pr:.4f}")

# Multiclass evaluator (accuracy and class-weighted precision / recall / F1).
# These are weighted by class frequency, so with a heavily imbalanced target
# they mostly reflect the majority class.
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="bought_by_alpha",
    predictionCol="prediction"
)

accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")

# Confusion matrix
print("CONFUSION MATRIX:")
confusion_df = predictions.groupBy('bought_by_alpha', 'prediction').count()
confusion_df.show()

# Positive-class (label 1) precision / recall / F1 derived from the confusion
# counts. The matrix has at most four cells, so collecting it is cheap; missing
# cells count as 0 and every division is guarded so an empty positive class
# yields 0.0 instead of an error.
confusion = {
    (int(row['bought_by_alpha']), int(row['prediction'])): row['count']
    for row in confusion_df.collect()
}
true_pos = confusion.get((1, 1), 0)
false_pos = confusion.get((0, 1), 0)
false_neg = confusion.get((1, 0), 0)

pos_precision = true_pos / (true_pos + false_pos) if (true_pos + false_pos) else 0.0
pos_recall = true_pos / (true_pos + false_neg) if (true_pos + false_neg) else 0.0
pos_f1 = (
    2 * pos_precision * pos_recall / (pos_precision + pos_recall)
    if (pos_precision + pos_recall) else 0.0
)

print(f"Positive-class support (test): {true_pos + false_neg}")
print(f"Positive-class Precision: {pos_precision:.4f}")
print(f"Positive-class Recall: {pos_recall:.4f}")
print(f"Positive-class F1-Score: {pos_f1:.4f}")

# Feature importance: the last pipeline stage is the fitted Random Forest; its
# importance vector follows the order of feature_cols used by the assembler.
rf_model = model.stages[-1]
feature_importance = pd.DataFrame({
    'feature': feature_cols,
    'importance': rf_model.featureImportances.toArray()
}).sort_values('importance', ascending=False)

print("TOP 10 MORE IMPORTANT FEATURES:")
print(feature_importance.head(10).to_string(index=False))

PREDICTIONS:
+--------------------------------------------+---------------+----------+------------------------------------------+
|mint_token_id                               |bought_by_alpha|prediction|probability                               |
+--------------------------------------------+---------------+----------+------------------------------------------+
|128zZUE3JoA1hiE9isFztGhZu2uyMdGQQ8hP25kcpump|0              |0.0       |[1.0,0.0]                                 |
|12KZrfp6Q9vnvgkZYnRi9YSgd99aq1avXrFoHtCwpump|0              |0.0       |[1.0,0.0]                                 |
|12P2zQQMu1eHJkvA5apNZiNf5hqntHnEW3KiNSetpump|0              |0.0       |[1.0,0.0]                                 |
|12i2vPMzsHhfrskyqZPTmBeePkWwc7jzvYR7dixSpump|0              |0.0       |[1.0,0.0]                                 |
|12qBCkqfYsYBBQ6dcbUJW81UxDTiud5YRRezzi1wpump|0              |0.0       |[1.0,0.0]                                 |
|12tkiNx2Xy478Ejw2LhVcsbTX9n6dKiBKL8RkooCpump|0    