MULTINOMIAL NAIVE BAYES

In [2]:
import pyspark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("mi programa")

#sc.stop()
sc = SparkContext(conf = conf)
from pyspark.sql import SparkSession
from pyspark.ml.feature import StandardScaler
spark = SparkSession.builder.getOrCreate()

In [3]:
#con spark, leer archivo
df = spark.read.option('header', 'true').csv('../flights.csv').limit(1000)
#quitar datos que en arrival delay estén vacíos
df = df.dropna(subset = ["ARRIVAL_DELAY"])

In [None]:
#crear df1 con sólo unas columnas de df. Borrar datos vacíos
df1 = df.select(["ARRIVAL_DELAY", "MONTH", "DAY", "DAY_OF_WEEK", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT"]).dropna()
#withColumn crea nueva columna
df1 = df1.withColumn('label_1', df1["ARRIVAL_DELAY"] > 0)

df1 = (df1.withColumn('label_1', df1['label_1'].cast('string')))

from pyspark.ml.feature import StringIndexer

Y_indexer = StringIndexer(inputCol = 'label_1', outputCol = 'label')
df1 = Y_indexer.fit(df1).transform(df1)

df1.show(5)

In [3]:
df1_indexer = StringIndexer(inputCol = "ORIGIN_AIRPORT", outputCol = "ORIGIN_AIRPORT_INDEX")
df1 = df1_indexer.fit(df1).transform(df1)
df1_indexer = StringIndexer(inputCol = "DESTINATION_AIRPORT", outputCol = "DESTINATION_AIRPORT_INDEX")
df1 = df1_indexer.fit(df1).transform(df1)
df1.select(["ORIGIN_AIRPORT", "DESTINATION_AIRPORT", "ORIGIN_AIRPORT_INDEX", "DESTINATION_AIRPORT_INDEX"]).show(5)

+--------------+-------------------+--------------------+-------------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|ORIGIN_AIRPORT_INDEX|DESTINATION_AIRPORT_INDEX|
+--------------+-------------------+--------------------+-------------------------+
|           ANC|                SEA|                21.0|                     13.0|
|           LAX|                PBI|                 1.0|                     27.0|
|           SFO|                CLT|                 3.0|                     12.0|
|           LAX|                MIA|                 1.0|                      5.0|
|           SEA|                ANC|                 2.0|                     39.0|
+--------------+-------------------+--------------------+-------------------------+
only showing top 5 rows



In [4]:
from pyspark.ml.feature import VectorAssembler
df1 = (df1
      .withColumn("MONTH", df1["MONTH"].cast('int'))
      .withColumn("DAY", df1["DAY"].cast('int'))
      .withColumn("DAY_OF_WEEK", df1["DAY_OF_WEEK"].cast('int')))

vecAssembler = VectorAssembler(inputCols = ["MONTH", "DAY", "DAY_OF_WEEK", "ORIGIN_AIRPORT_INDEX", "DESTINATION_AIRPORT_INDEX"], outputCol = 'features')
df1 = vecAssembler.transform(df1)
df2 = df1.select(['label', 'features'])
df2.show(3)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,1.0,4.0,21.0...|
|  0.0|[1.0,1.0,4.0,1.0,...|
|  1.0|[1.0,1.0,4.0,3.0,...|
+-----+--------------------+
only showing top 3 rows



In [5]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

(train, test) = df2.randomSplit([0.6, 0.4], seed = 100)
nb = NaiveBayes(smoothing = 1.0, modelType = 'multinomial')

In [6]:
model = nb.fit(train)

In [7]:
predictions = model.transform(test)
predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|[1.0,1.0,4.0,0.0,...|[-18.976895057640...|[0.66591111432701...|       0.0|
|  0.0|[1.0,1.0,4.0,0.0,...|[-26.682341193584...|[0.8000280443972,...|       0.0|
|  0.0|[1.0,1.0,4.0,0.0,...|[-31.305608875151...|[0.85869607334943...|       0.0|
|  0.0|[1.0,1.0,4.0,0.0,...|[-32.846698102340...|[0.87477406679746...|       0.0|
|  0.0|[1.0,1.0,4.0,0.0,...|[-32.846698102340...|[0.87477406679746...|       0.0|
|  0.0|[1.0,1.0,4.0,0.0,...|[-34.387787329528...|[0.88925858522893...|       0.0|
|  0.0|[1.0,1.0,4.0,0.0,...|[-43.634322692661...|[0.94879010455218...|       0.0|
|  0.0|[1.0,1.0,4.0,0.0,...|[-51.339768828605...|[0.97381348850792...|       0.0|
|  0.0|[1.0,1.0,4.0,0.0,...|[-60.586304191738...|[0.98847950153981...|       0.0|
|  0.0|[1.0,1.0,

In [8]:
evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
accuracy = evaluator.evaluate(predictions)

print("Precisión del modelo: " + str(accuracy))

Precisión del modelo: 0.5807291666666666


BERNOULLI NAIVE BAYES

In [18]:
import pyspark.sql.functions as F
df3 = df.select(["ARRIVAL_DELAY", "MONTH", "DAY", "DAY_OF_WEEK", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT"]).dropna()
df3 = df3.withColumn('label_1', df3["ARRIVAL_DELAY"] > 0)

df3 = (df3.withColumn('label_1', df3['label_1'].cast('string')))

X_indexer = StringIndexer(inputCol = 'label_1', outputCol = 'label')
df3 = X_indexer.fit(df3).transform(df3)

month = df3.select("MONTH").distinct().rdd.flatMap(lambda x: x).collect()
day = df3.select("DAY").distinct().rdd.flatMap(lambda x: x).collect()
day_of = df3.select("DAY_OF_WEEK").distinct().rdd.flatMap(lambda x: x).collect()
origin = df3.select("ORIGIN_AIRPORT").distinct().rdd.flatMap(lambda x: x).collect()
destination = df3.select("DESTINATION_AIRPORT").distinct().rdd.flatMap(lambda x: x).collect()

month_expr = [F.when(F.col("MONTH") == mo, 1).otherwise(0).alias("MONTH_" + mo) for mo in month]
day_expr = [F.when(F.col("DAY") == da, 1).otherwise(0).alias("DAY_" + da) for da in day]
day_of_expr = [F.when(F.col("DAY_OF_WEEK") == daf, 1).otherwise(0).alias("DAY_OF_WEEK_" + daf) for daf in day_of]
origin_expr = [F.when(F.col("ORIGIN_AIRPORT") == oa, 1).otherwise(0).alias("ORIGIN_AIRPORT_" + oa) for oa in origin]
destination_expr = [F.when(F.col("DESTINATION_AIRPORT") == da, 1).otherwise(0).alias("DESTINATION_AIRPORT_" + da) for da in destination]

df3.show()

+-------------+-----+---+-----------+--------------+-------------------+-------+-----+
|ARRIVAL_DELAY|MONTH|DAY|DAY_OF_WEEK|ORIGIN_AIRPORT|DESTINATION_AIRPORT|label_1|label|
+-------------+-----+---+-----------+--------------+-------------------+-------+-----+
|          -22|    1|  1|          4|           ANC|                SEA|  false|  0.0|
|           -9|    1|  1|          4|           LAX|                PBI|  false|  0.0|
|            5|    1|  1|          4|           SFO|                CLT|   true|  1.0|
|           -9|    1|  1|          4|           LAX|                MIA|  false|  0.0|
|          -21|    1|  1|          4|           SEA|                ANC|  false|  0.0|
|            8|    1|  1|          4|           SFO|                MSP|   true|  1.0|
|          -17|    1|  1|          4|           LAS|                MSP|  false|  0.0|
|          -10|    1|  1|          4|           LAX|                CLT|  false|  0.0|
|          -13|    1|  1|          4|      

In [19]:
df3 = df3.select("label", "ARRIVAL_DELAY", "MONTH", "DAY", "DAY_OF_WEEK", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT", *month_expr+day_expr+day_of_expr+origin_expr+destination_expr)
df3.show()

+-----+-------------+-----+---+-----------+--------------+-------------------+-------+-----+-------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-

In [20]:
assembler = VectorAssembler(inputCols = [x for x in df3.columns if x not in {"label", "ARRIVAL_DELAY", "MONTH", "DAY", "DAY_OF_WEEK", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT"}], outputCol = "features")
df3 = assembler.transform(df3)

In [21]:
sc.setCheckpointDir('checkpoint/')
df3.select("features")

DataFrame[features: vector]

In [22]:
df4 = df3.select("label", "features")

In [23]:
df4.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262,[0,1,2,3,191...|
|  0.0|(262,[0,1,2,4,192...|
+-----+--------------------+
only showing top 2 rows



In [26]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

sc.setCheckpointDir("checkpoint/")
(train2, test2) = df4.randomSplit([0.6, 0.4], seed = 100)

nb2 = NaiveBayes(smoothing = 1.0, modelType = "bernoulli")
nb2.checkpointInterval = 2
model_b = nb2.fit(train2)

In [27]:
predictions_b = model_b.transform(test2)
predictions_b.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(262,[0,1,2,3,191...|[-10.441770942784...|[0.95817070244878...|       0.0|
|  0.0|(262,[0,1,2,3,191...|[-10.441770942784...|[0.95817070244878...|       0.0|
|  0.0|(262,[0,1,2,3,191...|[-10.441770942784...|[0.95817070244878...|       0.0|
|  0.0|(262,[0,1,2,3,196...|[-10.629774333046...|[0.90257604601309...|       0.0|
|  0.0|(262,[0,1,2,3,200...|[-11.562919396475...|[0.93792124363139...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [28]:
evaluator_b = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
accuracy_b = evaluator_b.evaluate(predictions_b)

print("el modelo tiene una precisión de " + str(accuracy_b))

el modelo tiene una precisión de 0.6588541666666666
