In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.sonic.net/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xzf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"


import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [2]:
# Comprobación de la sesión de Spark
spark

In [3]:
#Cargamos los archicos de entrenamiento y prueba.
df_train = spark.read.format('csv').options(inferSchema=True, header=True).load('train.csv')
df_test = spark.read.format('csv').options(inferSchema=True, header=True).load('test.csv')

In [4]:
# Tipos de las columnas de set de entrenamiento
df_train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- cat_0: string (nullable = true)
 |-- cat_1: string (nullable = true)
 |-- cat_2: string (nullable = true)
 |-- cat_3: string (nullable = true)
 |-- cat_4: string (nullable = true)
 |-- cat_5: string (nullable = true)
 |-- cat_6: string (nullable = true)
 |-- cat_7: string (nullable = true)
 |-- cat_8: string (nullable = true)
 |-- cat_9: string (nullable = true)
 |-- cat_10: string (nullable = true)
 |-- cat_11: string (nullable = true)
 |-- cat_12: string (nullable = true)
 |-- cat_13: string (nullable = true)
 |-- cat_14: string (nullable = true)
 |-- cat_15: string (nullable = true)
 |-- cat_16: string (nullable = true)
 |-- cat_17: string (nullable = true)
 |-- cat_18: string (nullable = true)
 |-- cont_0: double (nullable = true)
 |-- cont_1: double (nullable = true)
 |-- cont_2: double (nullable = true)
 |-- cont_3: double (nullable = true)
 |-- cont_4: double (nullable = true)
 |-- cont_5: double (nullable = true)
 |-- cont_6: double 

In [5]:
# Características del set de entrenamiento
df_train.describe().show()

+-------+------------------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+-------------------+
|summary|                id| cat_0| cat_1| cat_2| cat_3| cat_4| cat_5| cat_6| cat_7| cat_8| cat_9|cat_10|cat_11|cat_12|cat_13|cat_14|cat_15|cat_16|cat_17|cat_18|              cont_0|             cont_1|             cont_2|              cont_3|             cont_4|              cont_5|              cont_6|             cont_7|              cont_8|             cont_9|           cont_10|             target|
+-------+------------------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+--------------------+------

In [6]:
#Columnas del set de entrenamiento
df_train.columns

['id',
 'cat_0',
 'cat_1',
 'cat_2',
 'cat_3',
 'cat_4',
 'cat_5',
 'cat_6',
 'cat_7',
 'cat_8',
 'cat_9',
 'cat_10',
 'cat_11',
 'cat_12',
 'cat_13',
 'cat_14',
 'cat_15',
 'cat_16',
 'cat_17',
 'cat_18',
 'cont_0',
 'cont_1',
 'cont_2',
 'cont_3',
 'cont_4',
 'cont_5',
 'cont_6',
 'cont_7',
 'cont_8',
 'cont_9',
 'cont_10',
 'target']

In [7]:
#Convertimos las columnas que tienen valores string a integers ponderados para que puedan ser utilizados en la clasificación
from pyspark.ml.feature import (VectorAssembler, OneHotEncoder, VectorIndexer, StringIndexer)

cat_0_indexer = StringIndexer(inputCol='cat_0', outputCol='cat_0Index')
cat_1_indexer = StringIndexer(inputCol='cat_1', outputCol='cat_1Index')
cat_2_indexer = StringIndexer(inputCol='cat_2', outputCol='cat_2Index')
cat_3_indexer = StringIndexer(inputCol='cat_3', outputCol='cat_3Index')
cat_4_indexer = StringIndexer(inputCol='cat_4', outputCol='cat_4Index')
cat_5_indexer = StringIndexer(inputCol='cat_5', outputCol='cat_5Index')
cat_6_indexer = StringIndexer(inputCol='cat_6', outputCol='cat_6Index')
cat_7_indexer = StringIndexer(inputCol='cat_7', outputCol='cat_7Index')
cat_8_indexer = StringIndexer(inputCol='cat_8', outputCol='cat_8Index')
cat_9_indexer = StringIndexer(inputCol='cat_9', outputCol='cat_9Index')
cat_10_indexer = StringIndexer(inputCol='cat_10', outputCol='cat_10Index')
cat_11_indexer = StringIndexer(inputCol='cat_11', outputCol='cat_11Index')
cat_12_indexer = StringIndexer(inputCol='cat_12', outputCol='cat_12Index')
cat_13_indexer = StringIndexer(inputCol='cat_13', outputCol='cat_13Index')
cat_14_indexer = StringIndexer(inputCol='cat_14', outputCol='cat_14Index')
cat_15_indexer = StringIndexer(inputCol='cat_15', outputCol='cat_15Index')
cat_16_indexer = StringIndexer(inputCol='cat_16', outputCol='cat_16Index')
cat_17_indexer = StringIndexer(inputCol='cat_17', outputCol='cat_17Index')
cat_18_indexer = StringIndexer(inputCol='cat_18', outputCol='cat_18Index')

In [8]:
# Incluimos los nuevos integers en el set de entrenamiento.
train = cat_0_indexer.fit(df_train).transform(df_train)
train = cat_1_indexer.fit(train).transform(train)
train = cat_2_indexer.fit(train).transform(train)
train = cat_3_indexer.fit(train).transform(train)
train = cat_4_indexer.fit(train).transform(train)
train = cat_5_indexer.fit(train).transform(train)
train = cat_6_indexer.fit(train).transform(train)
train = cat_7_indexer.fit(train).transform(train)
train = cat_8_indexer.fit(train).transform(train)
train = cat_9_indexer.fit(train).transform(train)
train = cat_10_indexer.fit(train).transform(train)
train = cat_11_indexer.fit(train).transform(train)
train = cat_12_indexer.fit(train).transform(train)
train = cat_13_indexer.fit(train).transform(train)
train = cat_14_indexer.fit(train).transform(train)
train = cat_15_indexer.fit(train).transform(train)
train = cat_16_indexer.fit(train).transform(train)
train = cat_17_indexer.fit(train).transform(train)
train = cat_18_indexer.fit(train).transform(train)

In [9]:
# Juntamos todas las columnas en una única llamada features
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[
      'cat_0Index',
      'cat_1Index',
      'cat_2Index',
      'cat_3Index',
      'cat_4Index',
      'cat_5Index',
      'cat_6Index',
      'cat_7Index',
      'cat_8Index',
      'cat_9Index',
      'cat_10Index',
      'cat_11Index',
      'cat_12Index',
      'cat_13Index',
      'cat_14Index',
      'cat_15Index',
      'cat_16Index',
      'cat_17Index',
      'cat_18Index',
      'cont_0',
      'cont_1',
      'cont_2',
      'cont_3',
      'cont_4',
      'cont_5',
      'cont_6',
      'cont_7',
      'cont_8',
      'cont_9',
      'cont_10'],
outputCol='features')
output = assembler.transform(train)
output.show()

+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+--------------------+
| id|cat_0|cat_1|cat_2|cat_3|cat_4|cat_5|cat_6|cat_7|cat_8|cat_9|cat_10|cat_11|cat_12|cat_13|cat_14|cat_15|cat_16|cat_17|cat_18|             cont_0|             cont_1|             cont_2|             cont_3|             cont_4|             cont_5|             cont_6|             cont_7|             cont_8|             cont_9|            cont_10|target|cat_0Index|cat_1Index|cat_2Index|cat_3Index|cat_4I

In [10]:
# Finalmente nos quedamos solo con la variable a predecir y el resto de columnas unidas en una unica, features
final_data = output.selectExpr("target as label", "features as features")

In [11]:
# Set de entrenamiento final
final_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|(30,[3,4,7,8,10,1...|
|    0|[0.0,0.0,0.0,0.0,...|
|    1|(30,[1,7,9,12,19,...|
|    0|[0.0,0.0,3.0,1.0,...|
|    0|(30,[1,4,8,10,19,...|
|    0|[0.0,4.0,1.0,0.0,...|
|    0|(30,[2,6,7,8,9,10...|
|    1|(30,[1,2,4,5,8,10...|
|    0|[0.0,8.0,3.0,0.0,...|
|    0|[0.0,1.0,0.0,0.0,...|
|    1|[0.0,6.0,4.0,1.0,...|
|    0|[0.0,3.0,10.0,1.0...|
|    0|(30,[1,4,7,8,9,10...|
|    0|[0.0,5.0,0.0,2.0,...|
|    0|(30,[6,7,8,9,10,1...|
|    0|[1.0,1.0,0.0,0.0,...|
|    0|(30,[2,5,6,7,8,10...|
|    0|(30,[6,8,14,19,20...|
|    0|[0.0,0.0,4.0,1.0,...|
|    0|(30,[1,2,7,8,9,10...|
+-----+--------------------+
only showing top 20 rows



In [12]:
from pyspark.ml.classification import (RandomForestClassifier, GBTClassifier,
                                       DecisionTreeClassifier)

In [13]:
#Instanciamos RandomForestClassifier y le pasamos como parametro maxBins 300 porque tenemos demasiados datos.
rfc = RandomForestClassifier(maxBins=300)

In [14]:
#Creamos el modelo con los datos de entrenamiento.
rfc_model = rfc.fit(final_data)

In [15]:
#Set de prueba
df_test.show()

+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
| id|cat_0|cat_1|cat_2|cat_3|cat_4|cat_5|cat_6|cat_7|cat_8|cat_9|cat_10|cat_11|cat_12|cat_13|cat_14|cat_15|cat_16|cat_17|cat_18|             cont_0|             cont_1|             cont_2|             cont_3|             cont_4|              cont_5|             cont_6|             cont_7|             cont_8|             cont_9|            cont_10|
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-----------

In [16]:
#Como en el set de entrenamiento, transformamos las columnas String en integers ponderados
from pyspark.ml.feature import (VectorAssembler, OneHotEncoder, VectorIndexer, StringIndexer)

cat_0_indexer = StringIndexer(inputCol='cat_0', outputCol='cat_0Index')
cat_1_indexer = StringIndexer(inputCol='cat_1', outputCol='cat_1Index')
cat_2_indexer = StringIndexer(inputCol='cat_2', outputCol='cat_2Index')
cat_3_indexer = StringIndexer(inputCol='cat_3', outputCol='cat_3Index')
cat_4_indexer = StringIndexer(inputCol='cat_4', outputCol='cat_4Index')
cat_5_indexer = StringIndexer(inputCol='cat_5', outputCol='cat_5Index')
cat_6_indexer = StringIndexer(inputCol='cat_6', outputCol='cat_6Index')
cat_7_indexer = StringIndexer(inputCol='cat_7', outputCol='cat_7Index')
cat_8_indexer = StringIndexer(inputCol='cat_8', outputCol='cat_8Index')
cat_9_indexer = StringIndexer(inputCol='cat_9', outputCol='cat_9Index')
cat_10_indexer = StringIndexer(inputCol='cat_10', outputCol='cat_10Index')
cat_11_indexer = StringIndexer(inputCol='cat_11', outputCol='cat_11Index')
cat_12_indexer = StringIndexer(inputCol='cat_12', outputCol='cat_12Index')
cat_13_indexer = StringIndexer(inputCol='cat_13', outputCol='cat_13Index')
cat_14_indexer = StringIndexer(inputCol='cat_14', outputCol='cat_14Index')
cat_15_indexer = StringIndexer(inputCol='cat_15', outputCol='cat_15Index')
cat_16_indexer = StringIndexer(inputCol='cat_16', outputCol='cat_16Index')
cat_17_indexer = StringIndexer(inputCol='cat_17', outputCol='cat_17Index')
cat_18_indexer = StringIndexer(inputCol='cat_18', outputCol='cat_18Index')

In [17]:
# Incluimos los datos formateados en el set de prueba.
test = cat_0_indexer.fit(df_test).transform(df_test)
test = cat_1_indexer.fit(test).transform(test)
test = cat_2_indexer.fit(test).transform(test)
test = cat_3_indexer.fit(test).transform(test)
test = cat_4_indexer.fit(test).transform(test)
test = cat_5_indexer.fit(test).transform(test)
test = cat_6_indexer.fit(test).transform(test)
test = cat_7_indexer.fit(test).transform(test)
test = cat_8_indexer.fit(test).transform(test)
test = cat_9_indexer.fit(test).transform(test)
test = cat_10_indexer.fit(test).transform(test)
test = cat_11_indexer.fit(test).transform(test)
test = cat_12_indexer.fit(test).transform(test)
test = cat_13_indexer.fit(test).transform(test)
test = cat_14_indexer.fit(test).transform(test)
test = cat_15_indexer.fit(test).transform(test)
test = cat_16_indexer.fit(test).transform(test)
test = cat_17_indexer.fit(test).transform(test)
test = cat_18_indexer.fit(test).transform(test)

In [18]:
# Juntamos todas las columnas en una única llamada features
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[
      'cat_0Index',
      'cat_1Index',
      'cat_2Index',
      'cat_3Index',
      'cat_4Index',
      'cat_5Index',
      'cat_6Index',
      'cat_7Index',
      'cat_8Index',
      'cat_9Index',
      'cat_10Index',
      'cat_11Index',
      'cat_12Index',
      'cat_13Index',
      'cat_14Index',
      'cat_15Index',
      'cat_16Index',
      'cat_17Index',
      'cat_18Index',
      'cont_0',
      'cont_1',
      'cont_2',
      'cont_3',
      'cont_4',
      'cont_5',
      'cont_6',
      'cont_7',
      'cont_8',
      'cont_9',
      'cont_10'],
outputCol='features')
output = assembler.transform(test)
output.show()

+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+--------------------+
| id|cat_0|cat_1|cat_2|cat_3|cat_4|cat_5|cat_6|cat_7|cat_8|cat_9|cat_10|cat_11|cat_12|cat_13|cat_14|cat_15|cat_16|cat_17|cat_18|             cont_0|             cont_1|             cont_2|             cont_3|             cont_4|              cont_5|             cont_6|             cont_7|             cont_8|             cont_9|            cont_10|cat_0Index|cat_1Index|cat_2Index|cat_3Index|cat_4Index|cat_5In

In [19]:
#Para el set de prueba seleccionamos solo la columana de features porque el target o label es lo que queremos predecir.
final_data_test = output.selectExpr("features as features")
final_data_test.show()

+--------------------+
|            features|
+--------------------+
|(30,[1,3,7,8,9,10...|
|[1.0,3.0,1.0,0.0,...|
|(30,[1,3,4,8,10,1...|
|(30,[0,6,7,8,9,10...|
|[0.0,3.0,0.0,1.0,...|
|[0.0,4.0,0.0,4.0,...|
|(30,[2,5,7,8,10,1...|
|(30,[2,4,5,8,10,1...|
|(30,[7,8,19,20,21...|
|[0.0,5.0,2.0,2.0,...|
|(30,[3,6,7,8,9,10...|
|[0.0,3.0,3.0,0.0,...|
|(30,[0,2,7,8,10,1...|
|(30,[0,1,3,7,9,10...|
|[0.0,0.0,0.0,0.0,...|
|(30,[1,4,6,7,8,10...|
|[0.0,9.0,4.0,0.0,...|
|(30,[1,3,4,7,8,10...|
|(30,[1,3,7,8,9,19...|
|[1.0,6.0,0.0,0.0,...|
+--------------------+
only showing top 20 rows



In [20]:
# Creamos y mostramos las predicciones
rfc_preds = rfc_model.transform(final_data_test)

In [32]:
rfc_preds.show()
#La columna predicction es la que nos mostrará el valor del target.

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(30,[1,3,7,8,9,10...|[16.8614929773676...|[0.84307464886838...|       0.0|
|[1.0,3.0,1.0,0.0,...|[16.6783714597456...|[0.83391857298728...|       0.0|
|(30,[1,3,4,8,10,1...|[16.8121821035792...|[0.84060910517896...|       0.0|
|(30,[0,6,7,8,9,10...|[16.6888061599997...|[0.83444030799998...|       0.0|
|[0.0,3.0,0.0,1.0,...|[18.1250723177696...|[0.90625361588848...|       0.0|
|[0.0,4.0,0.0,4.0,...|[17.8424080759303...|[0.89212040379651...|       0.0|
|(30,[2,5,7,8,10,1...|[15.1384332274561...|[0.75692166137280...|       0.0|
|(30,[2,4,5,8,10,1...|[13.9378510654722...|[0.69689255327361...|       0.0|
|(30,[7,8,19,20,21...|[13.3280779790120...|[0.66640389895060...|       0.0|
|[0.0,5.0,2.0,2.0,...|[15.5163935727626...|[0.77581967863813...|       0.0|
|(30,[3,6,7,

In [39]:
#La solución es la columna prediction de rfc_preds y la columna id de train. No he sabido como meterlos en un csv.