In [89]:
import numpy  as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler

from pyspark.ml.classification import LogisticRegression

In [30]:
spark = (SparkSession.builder.appName('bigquery').getOrCreate())

In [31]:
# Read the data from BigQuery as a Spark Dataframe.
df = spark.read.csv('C:/Users/faisa/OneDrive - Letterkenny Institute of Technology/2nd Semester/Big Data Analytics - Shagufta/Technical Project/PUBG/pubg_prediction/train_mini.csv', header=True)

In [32]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- groupId: string (nullable = true)
 |-- matchId: string (nullable = true)
 |-- assists: string (nullable = true)
 |-- boosts: string (nullable = true)
 |-- damageDealt: string (nullable = true)
 |-- DBNOs: string (nullable = true)
 |-- headshotKills: string (nullable = true)
 |-- heals: string (nullable = true)
 |-- killPlace: string (nullable = true)
 |-- killPoints: string (nullable = true)
 |-- kills: string (nullable = true)
 |-- killStreaks: string (nullable = true)
 |-- longestKill: string (nullable = true)
 |-- matchDuration: string (nullable = true)
 |-- matchType: string (nullable = true)
 |-- maxPlace: string (nullable = true)
 |-- numGroups: string (nullable = true)
 |-- rankPoints: string (nullable = true)
 |-- revives: string (nullable = true)
 |-- rideDistance: string (nullable = true)
 |-- roadKills: string (nullable = true)
 |-- swimDistance: string (nullable = true)
 |-- teamKills: string (nullable = true)
 |-- vehicleDestroys

In [33]:
df.collect()

, weaponsAcquired='0', winPoints='1462', winPlacePerc='0.02'),
 Row(Id='9c873418229eb3', groupId='c24ef200159ecd', matchId='5067f79e33e108', assists='0', boosts='0', damageDealt='0.0', DBNOs='0', headshotKills='0', heals='0', killPlace='98', killPoints='1072', kills='0', killStreaks='0', longestKill='0.0', matchDuration='1405', matchType='duo-fpp', maxPlace='51', numGroups='49', rankPoints='-1', revives='0', rideDistance='0.0', roadKills='0', swimDistance='0.0', teamKills='0', vehicleDestroys='0', walkDistance='0.0', weaponsAcquired='0', winPoints='1505', winPlacePerc='0.0'),
 Row(Id='6783a9195f4c4b', groupId='00b4079fd64d53', matchId='1b888a1dbe06a5', assists='0', boosts='0', damageDealt='0.0', DBNOs='0', headshotKills='0', heals='0', killPlace='100', killPoints='1449', kills='0', killStreaks='0', longestKill='0.0', matchDuration='1324', matchType='solo-fpp', maxPlace='100', numGroups='95', rankPoints='-1', revives='0', rideDistance='0.0', roadKills='0', swimDistance='0.0', teamKills=

In [34]:
columns_to_drop = ['Id', 'groupId', 'matchId']

In [35]:
df = df.drop(*columns_to_drop)

In [34]:
df.printSchema()

root
 |-- assists: string (nullable = true)
 |-- boosts: string (nullable = true)
 |-- damageDealt: string (nullable = true)
 |-- DBNOs: string (nullable = true)
 |-- headshotKills: string (nullable = true)
 |-- heals: string (nullable = true)
 |-- killPlace: string (nullable = true)
 |-- killPoints: string (nullable = true)
 |-- kills: string (nullable = true)
 |-- killStreaks: string (nullable = true)
 |-- longestKill: string (nullable = true)
 |-- matchDuration: string (nullable = true)
 |-- matchType: string (nullable = true)
 |-- maxPlace: string (nullable = true)
 |-- numGroups: string (nullable = true)
 |-- rankPoints: string (nullable = true)
 |-- revives: string (nullable = true)
 |-- rideDistance: string (nullable = true)
 |-- roadKills: string (nullable = true)
 |-- swimDistance: string (nullable = true)
 |-- teamKills: string (nullable = true)
 |-- vehicleDestroys: string (nullable = true)
 |-- walkDistance: string (nullable = true)
 |-- weaponsAcquired: string (nullable = 

In [36]:
df = df.drop('matchType')

In [37]:
df = df.dropna()

In [38]:
df.collect()

 numGroups='33', rankPoints='-1', revives='0', rideDistance='0.0', roadKills='0', swimDistance='0.0', teamKills='0', vehicleDestroys='0', walkDistance='23.01', weaponsAcquired='2', winPoints='1500', winPlacePerc='0.0'),
 Row(assists='0', boosts='0', damageDealt='0.0', DBNOs='0', headshotKills='0', heals='0', killPlace='96', killPoints='1264', kills='0', killStreaks='0', longestKill='0.0', matchDuration='1877', maxPlace='33', numGroups='33', rankPoints='-1', revives='0', rideDistance='0.0', roadKills='0', swimDistance='0.0', teamKills='0', vehicleDestroys='0', walkDistance='0.0', weaponsAcquired='0', winPoints='1466', winPlacePerc='0.0'),
 Row(assists='0', boosts='0', damageDealt='0.0', DBNOs='0', headshotKills='0', heals='0', killPlace='97', killPoints='1291', kills='0', killStreaks='0', longestKill='0.0', matchDuration='1293', maxPlace='34', numGroups='34', rankPoints='-1', revives='0', rideDistance='0.0', roadKills='0', swimDistance='0.0', teamKills='0', vehicleDestroys='0', walkDist

In [39]:
df.describe()

DataFrame[summary: string, assists: string, boosts: string, damageDealt: string, DBNOs: string, headshotKills: string, heals: string, killPlace: string, killPoints: string, kills: string, killStreaks: string, longestKill: string, matchDuration: string, maxPlace: string, numGroups: string, rankPoints: string, revives: string, rideDistance: string, roadKills: string, swimDistance: string, teamKills: string, vehicleDestroys: string, walkDistance: string, weaponsAcquired: string, winPoints: string, winPlacePerc: string]

In [41]:
df = convertColumn(df, df.columns, FloatType())

In [42]:
df.describe().show()

+-------+-------+------+------------------+-----+-------------+-----+------------------+-----------------+-----+-----------+-----------+------------------+------------------+------------------+-----------------+-------+------------+---------+------------+---------+---------------+------------------+------------------+-----------------+--------------------+
|summary|assists|boosts|       damageDealt|DBNOs|headshotKills|heals|         killPlace|       killPoints|kills|killStreaks|longestKill|     matchDuration|          maxPlace|         numGroups|       rankPoints|revives|rideDistance|roadKills|swimDistance|teamKills|vehicleDestroys|      walkDistance|   weaponsAcquired|        winPoints|        winPlacePerc|
+-------+-------+------+------------------+-----+-------------+-----+------------------+-----------------+-----+-----------+-----------+------------------+------------------+------------------+-----------------+-------+------------+---------+------------+---------+---------------+-

In [52]:
# detect categorical columns:
categorical_cols = [item[0] for item in df.dtypes if item[1].startswith('string')][:1]
# detect numerical columns:
numerical_cols = [item[0] for item in df.dtypes if item[1].startswith('int') | item[1].startswith('double') | item[1].startswith('float')]

In [50]:
categorical_cols

[]

In [53]:
numerical_cols

['assists',
 'boosts',
 'damageDealt',
 'DBNOs',
 'headshotKills',
 'heals',
 'killPlace',
 'killPoints',
 'kills',
 'killStreaks',
 'longestKill',
 'matchDuration',
 'maxPlace',
 'numGroups',
 'rankPoints',
 'revives',
 'rideDistance',
 'roadKills',
 'swimDistance',
 'teamKills',
 'vehicleDestroys',
 'walkDistance',
 'weaponsAcquired',
 'winPoints',
 'winPlacePerc']

In [54]:
def count_null_values(c):
    """Input pyspark dataframe and return list of columns with missing value and it's total value"""
    null_counts = []          #start with an empty list to add missing value counter
    for col in c.dtypes:     
        cname = col[0]            
        ctype = col[1]        
        null_values = c.where( c[cname].isNull()).count() #check count of null in column name
        result = tuple([cname, null_values])  #new tuple, (column name, null count)
        null_counts.append(result)      #put the new tuple in our result list
    null_counts=[(x,y) for (x,y) in null_counts if y!=0]  #view just columns that have missing values
    return null_counts

miss_counts = count_null_values(df)

In [55]:
miss_counts

[]

In [56]:
miss_val_list=[x[0] for x in miss_counts]  ## list of clomuns having missing values
df_missing= train_data.select(*miss_val_list)

# categorical columns
categorical_colums_miss=[item[0] for item in df_missing.dtypes if item[1].startswith('string')] 

# numerical columns
numerical_columns_miss = [item[0] for item in df_missing.dtypes if item[1].startswith('int') | item[1].startswith('double') | item[1].startswith('float')] 


In [57]:
categorical_colums_miss

[]

In [58]:
numerical_columns_miss

[]

In [79]:
stages = [] # stages in our Pipeline
for categoricalCol in categorical_cols:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]


In [80]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="winPlacePerc", outputCol="label")
stages += [label_stringIdx]

In [81]:
# Transform all features into a vector using VectorAssembler
assemblerInputs = [c + "classVec" for c in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [82]:
stages

[StringIndexer_00ec5834edee, VectorAssembler_c76d116d8355]

In [122]:
cols = df.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label','features']+cols
df = df.select(selectedCols)
pd.DataFrame(df.take(5), columns=df.columns).transpose()

IllegalArgumentException: 'requirement failed: Output column label already exists.'

In [117]:
(train, test) = train_data.randomSplit([0.7, 0.3])

In [118]:
pd.DataFrame(train.take(5), columns=train.columns).transpose()

Unnamed: 0,0,1,2,3,4
label,0,0,0,0,0
features,"(1.0, 0.0, 85.80000305175781, 0.0, 0.0, 0.0, 9...","(0.0, 0.0, 20.399999618530273, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 55.650001525878906, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 97.54000091552734, 0.0, 0.0, 0.0, 9...","(0.0, 0.0, 10.800000190734863, 0.0, 0.0, 0.0, ..."
assists,1,0,0,0,0
boosts,0,0,0,0,0
damageDealt,85.8,20.4,55.65,97.54,10.8
DBNOs,0,0,0,0,0
headshotKills,0,0,0,0,0
heals,0,0,0,0,0
killPlace,97,97,96,96,97
killPoints,1230,1723,1443,1289,0


In [87]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=100)
lrModel = lr.fit(train)

In [95]:
lrModel.coefficientMatrix

SparseMatrix(10, 25, [0, 12, 24, 36, 48, 60, 72, 84, 96, 108, 120], [0, 2, 6, 7, 11, 12, 13, 14, ..., 11, 12, 13, 14, 21, 22, 23, 24], [98.1818, -0.6842, 7.802, -0.0059, 0.1307, -1.0589, -1.1284, 0.0139, ..., -0.0103, -0.2787, -0.2813, -0.0031, -0.102, 6.0943, 0.0035, 401.6929], 1)

In [97]:
predictions = lrModel.transform(test)

In [98]:
predictions.show()

+-----+--------------------+-------+------+-----------+-----+-------------+-----+---------+----------+-----+-----------+-----------+-------------+--------+---------+----------+-------+------------+---------+------------+---------+---------------+------------+---------------+---------+------------+--------------------+--------------------+----------+
|label|            features|assists|boosts|damageDealt|DBNOs|headshotKills|heals|killPlace|killPoints|kills|killStreaks|longestKill|matchDuration|maxPlace|numGroups|rankPoints|revives|rideDistance|roadKills|swimDistance|teamKills|vehicleDestroys|walkDistance|weaponsAcquired|winPoints|winPlacePerc|       rawPrediction|         probability|prediction|
+-----+--------------------+-------+------+-----------+-----+-------------+-----+---------+----------+-----+-----------+-----------+-------------+--------+---------+----------+-------+------------+---------+------------+---------+---------------+------------+---------------+---------+-----------

In [105]:
pd.DataFrame(predictions.take(5), columns=predictions.columns).transpose()

Unnamed: 0,0,1,2,3,4
label,0,0,0,0,0
features,"(0.0, 0.0, 19.440000534057617, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 96.0, 1058.0, 0...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 97.0, 1514.0, 0...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 99.0, 1190.0, 0...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0, 1335.0, ..."
assists,0,0,0,0,0
boosts,0,0,0,0,0
damageDealt,19.44,0,0,0,0
DBNOs,0,0,0,0,0
headshotKills,0,0,0,0,0
heals,0,0,0,0,0
killPlace,100,96,97,99,100
killPoints,1265,1058,1514,1190,1335
