In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from pyspark.mllib.evaluation import MulticlassMetrics


In [2]:
spark = (
 SparkSession.builder.appName("project 1 RF")
 .config("spark.sql.repl.eagerEval.enabled", True)
 .config("spark.executor.memory","6G")
 .config("spark.driver.memory","4G")
 .config("spark.sql.parquet.cacheMetadata", "true")
 .config("spark.sql.session.timeZone", "Etc/UTC")
 .config('spark.driver.maxResultSize', '2048m')
 .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/17 22:32:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/17 22:32:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/08/17 22:32:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/08/17 22:32:28 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/08/17 22:32:28 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/08/17 22:32:28 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.


In [3]:
data = spark.read.parquet('data/merged_sdf.parquet')

                                                                                

In [4]:
data.where(F.col('tip_amount') >= 3).count() / data.count()

                                                                                

0.5631643629905457

## Adjusting the columns to train the model ##

In [5]:
column = ['PULocationID','DOLocationID', 'Airport', 'Congestion', 'Weekend', 'tip']

In [6]:
def discretization(sdf):
    sdf = sdf.withColumn('tip', (F.col('tip_amount') >= 3).cast('BOOLEAN'))
    sdf = sdf.select(column)
    return sdf.withColumn('tip_numeric', F.col('tip').cast('int')).select(['PULocationID','DOLocationID', 'Airport', 'Congestion', 'Weekend', 'tip_numeric'])

In [7]:
data = discretization(data)

In [8]:
data.where(F.col('tip_numeric') != 0).count() / data.count()

0.5631643629905457

## Model training ##

In [9]:
# Assuming 'label' is the target column and 'features' are the feature columns
assembler = VectorAssembler(inputCols=['PULocationID','DOLocationID', 'Airport', 'Congestion', 'Weekend'], outputCol="features")

In [10]:
rf = RandomForestClassifier(featuresCol="features", labelCol="tip_numeric", numTrees=100, maxDepth=5)

pipeline = Pipeline(stages=[assembler, rf])

train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_data)


23/08/17 22:33:24 WARN MemoryStore: Not enough space to cache rdd_58_5 in memory! (computed 108.7 MiB so far)
23/08/17 22:33:24 WARN BlockManager: Persisting block rdd_58_5 to disk instead.
23/08/17 22:33:24 WARN MemoryStore: Not enough space to cache rdd_58_4 in memory! (computed 108.7 MiB so far)
23/08/17 22:33:24 WARN BlockManager: Persisting block rdd_58_4 to disk instead.
23/08/17 22:33:24 WARN MemoryStore: Not enough space to cache rdd_58_0 in memory! (computed 108.7 MiB so far)
23/08/17 22:33:24 WARN BlockManager: Persisting block rdd_58_0 to disk instead.
23/08/17 22:33:24 WARN MemoryStore: Not enough space to cache rdd_58_1 in memory! (computed 108.7 MiB so far)
23/08/17 22:33:24 WARN BlockManager: Persisting block rdd_58_1 to disk instead.
23/08/17 22:33:25 WARN MemoryStore: Not enough space to cache rdd_58_3 in memory! (computed 108.7 MiB so far)
23/08/17 22:33:25 WARN BlockManager: Persisting block rdd_58_3 to disk instead.
23/08/17 22:33:43 WARN MemoryStore: Not enough spa

## Evaluating the result ##

In [11]:
predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="tip_numeric", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)




Accuracy: 0.5923478027519965


                                                                                

In [12]:
prediction_and_label = predictions.select("prediction", "tip_numeric").rdd.map(lambda row: (float(row.prediction), float(row.tip_numeric)))

# Instantiate the MulticlassMetrics class
metrics = MulticlassMetrics(prediction_and_label)

# Compute the confusion matrix
confusion_matrix = metrics.confusionMatrix()


                                                                                

In [13]:
print(confusion_matrix)

DenseMatrix([[ 200643., 1121587.],
             [ 111918., 1591728.]])


23/08/18 04:04:27 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 4469178 ms exceeds timeout 120000 ms
23/08/18 04:04:28 WARN SparkContext: Killing executors is not supported by current scheduler.
23/08/18 04:04:28 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.