### Import libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

### Data Information

In [2]:
spark=SparkSession.builder.appName('pubg').getOrCreate()
df=spark.read.csv('pubg.csv',header=True,inferSchema=True,samplingRatio=0.01)

In [3]:
df.show(2)

+--------------+--------------+--------------+-------+------+-----------+-----+-------------+-----+---------+----------+-----+-----------+-----------+-------------+---------+--------+---------+----------+-------+------------+---------+------------+---------+---------------+------------+---------------+---------+------------+
|            Id|       groupId|       matchId|assists|boosts|damageDealt|DBNOs|headshotKills|heals|killPlace|killPoints|kills|killStreaks|longestKill|matchDuration|matchType|maxPlace|numGroups|rankPoints|revives|rideDistance|roadKills|swimDistance|teamKills|vehicleDestroys|walkDistance|weaponsAcquired|winPoints|winPlacePerc|
+--------------+--------------+--------------+-------+------+-----------+-----+-------------+-----+---------+----------+-----+-----------+-----------+-------------+---------+--------+---------+----------+-------+------------+---------+------------+---------+---------------+------------+---------------+---------+------------+
|7f96b2f878858a|4d4

In [4]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Id: string, groupId: string, matchId: string, assists: int, boosts: int, damageDealt: double, DBNOs: int, headshotKills: int, heals: int, killPlace: int, killPoints: int, kills: int, killStreaks: int, longestKill: double, matchDuration: int, matchType: string, maxPlace: int, numGroups: int, rankPoints: int, revives: int, rideDistance: double, roadKills: int, swimDistance: double, teamKills: int, vehicleDestroys: int, walkDistance: double, weaponsAcquired: int, winPoints: int, winPlacePerc: double]>

In [5]:
df.describe().show()

+-------+--------------+--------------+--------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+---------+------------------+------------------+-----------------+-------------------+------------------+--------------------+-----------------+-------------------+-------------------+------------------+------------------+-----------------+-------------------+
|summary|            Id|       groupId|       matchId|            assists|            boosts|       damageDealt|             DBNOs|      headshotKills|             heals|         killPlace|        killPoints|             kills|       killStreaks|      longestKill|     matchDuration|matchType|          maxPlace|         numGroups|       rankPoints|            revives|      rideDistance|           roadKills|     swimDistance|          teamKills|    vehic

### Handle missing values

In [6]:
df=df.dropna()

### Handle categorial values

In [7]:
strindexer=StringIndexer(inputCol='matchType',outputCol='matchType_index')
ohencoder=OneHotEncoderEstimator(inputCols=['matchType_index'],outputCols=['matchType_onehot'])

### Transform features to vector

In [8]:
assembler=VectorAssembler(inputCols=['assists', 'boosts', 'damageDealt', 'DBNOs', 'headshotKills', 'heals', 'killPlace', 'killPoints', 'kills', 'killStreaks', 'longestKill', 'matchDuration', 'matchType_onehot', 'maxPlace', 'numGroups', 'rankPoints', 'revives', 'rideDistance', 'roadKills', 'swimDistance', 'teamKills', 'vehicleDestroys', 'walkDistance', 'weaponsAcquired', 'winPoints'],outputCol='features')

### Split data into train and test sets

In [9]:
train,test=df.randomSplit([0.8,0.2]) 

### Build the model

In [10]:
rfr=RandomForestRegressor(featuresCol='features',labelCol='winPlacePerc')

### Hyperparameter tuning and cross validation

In [11]:
pipeline=Pipeline(stages=[strindexer,ohencoder,assembler,rfr])
evaluator=RegressionEvaluator(predictionCol='prediction',labelCol='winPlacePerc',metricName='mae')
#limited hyperparameter tunning due to memory limitation
paramgrid=ParamGridBuilder().addGrid(rfr.numTrees,[10,40,70,100]).build()
crossval=CrossValidator(estimator=pipeline,estimatorParamMaps=paramgrid,evaluator=evaluator,numFolds=3)
crossval=crossval.fit(train)

In [12]:
#optimized hyperparameter values
crossval.bestModel.stages[3]

RandomForestRegressionModel (uid=RandomForestRegressor_ab354fd1021d) with 40 trees

In [22]:
#Avg MAE for 2nd step (40 trees)
crossval.avgMetrics[1]

0.08879741086208787

### Make predictions and evaluate

In [13]:
preds=crossval.transform(test)
preds.select('winPlacePerc','prediction').show(10)

+------------+-------------------+
|winPlacePerc|         prediction|
+------------+-------------------+
|      0.4375|0.33078417334832455|
|      0.2963| 0.3695380226805693|
|      0.9615| 0.8573348164995522|
|       0.383|0.34722443232188616|
|      0.3214|  0.282898933458408|
|      0.1379| 0.2531806721402804|
|      0.3704| 0.4412877308317519|
|      0.0204|0.06731109022445624|
|      0.9231| 0.8132749884356283|
|      0.3077| 0.2395780748402175|
+------------+-------------------+
only showing top 10 rows



In [24]:
mae=evaluator.evaluate(preds)

In [26]:
#MAE on the test set
mae

0.08873352769007113