In [9]:
#!pip install pyspark

In [1]:
#spark sql imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
#spark ML imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StringIndexer, CountVectorizer, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [None]:
import os
os.getcwd()

In [5]:
spark = SparkSession.builder.appName('PUBG').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])
# the 4g is the total mun of the memory 4g * 4 executor = 16g


#print spark configuration settings
spark.sparkContext.getConf().getAll()

[('spark.driver.memory', '4g'),
 ('spark.executor.memory', '4g'),
 ('spark.executor.id', 'driver'),
 ('spark.app.id', 'local-1582732507476'),
 ('spark.executor.cores', '4'),
 ('spark.cores.max', '4'),
 ('spark.driver.port', '50496'),
 ('spark.app.name', 'Spark Updated Conf'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', 'LAPTOP-67Q35R8F.mshome.net'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [6]:
spark.sparkContext

## Load Data

In aggregate, each match's meta information and player statistics are summarized (as provided by pubg). It includes various aggregate statistics such as player kills, damage, distance walked, etc as well as metadata on the match itself such as queue size, fpp/tpp, date, etc.

In [144]:
#match = spark.read.csv("C:/Users/kailf/Python_code/UChicago/Big_Data/Project/match_stats_0.csv", inferSchema=True,header=True)
agg0 = spark.read.csv('agg/agg_match_stats_0.csv', inferSchema=True, header=True)
agg1 = spark.read.csv('agg/agg_match_stats_1.csv', inferSchema=True, header=True)
agg2 = spark.read.csv('agg/agg_match_stats_2.csv', inferSchema=True, header=True)
agg3 = spark.read.csv('agg/agg_match_stats_3.csv', inferSchema=True, header=True)
agg4 = spark.read.csv('agg/agg_match_stats_4.csv', inferSchema=True, header=True)
match = agg0.union(agg1)
match = match.union(agg2)
match = match.union(agg3)
match = match.union(agg4)




In deaths, the files record every death that occurred within the 720k matches. That is, each row documents an event where a player has died in the match.

In [10]:
#kill = spark.read.csv("C:/Users/kailf/Python_code/UChicago/Big_Data/Project/kill_stats_final_0.csv",inferSchema=True, header=True)
kil0 = spark.read.csv('deaths/kill_match_stats_final_0.csv', inferSchema=True, header=True)
kil1 = spark.read.csv('deaths/kill_match_stats_final_1.csv', inferSchema=True, header=True)
kil2 = spark.read.csv('deaths/kill_match_stats_final_2.csv', inferSchema=True, header=True)
kil3 = spark.read.csv('deaths/kill_match_stats_final_3.csv', inferSchema=True, header=True)
kil4 = spark.read.csv('deaths/kill_match_stats_final_4.csv', inferSchema=True, header=True)
kill = kil0.union(kil1)
kill = kill.union(kil2)
kill = kill.union(kil3)
kill = kill.union(kil4)





In [135]:
kill.show(3)

+---------+----------------+----------------+-----------------+-----------------+-------+--------------------+----+---------------+----------------+-----------------+-----------------+
|killed_by|     killer_name|killer_placement|killer_position_x|killer_position_y|    map|            match_id|time|    victim_name|victim_placement|victim_position_x|victim_position_y|
+---------+----------------+----------------+-----------------+-----------------+-------+--------------------+----+---------------+----------------+-----------------+-----------------+
|  Grenade| KrazyPortuguese|             5.0|         657725.1|         146275.2|MIRAMAR|2U4GBNA0YmnLSqvEy...| 823|KrazyPortuguese|             5.0|         657725.1|         146275.2|
|   SCAR-L|nide2Bxiaojiejie|            31.0|         93091.37|         722236.4|MIRAMAR|2U4GBNA0YmnLSqvEy...| 194|    X3evolution|            33.0|         92238.68|         723375.1|
|     S686|        Ascholes|            43.0|         366921.4|         421

In [111]:
match.show(1)

+--------------------+---------+--------------------+----------+----------+--------------+-----------+----------------+----------------+----------+------------+-----------+-------------------+-------+--------------+
|                date|game_size|            match_id|match_mode|party_size|player_assists|player_dbno|player_dist_ride|player_dist_walk|player_dmg|player_kills|player_name|player_survive_time|team_id|team_placement|
+--------------------+---------+--------------------+----------+----------+--------------+-----------+----------------+----------------+----------+------------+-----------+-------------------+-------+--------------+
|2017-11-26T20:59:...|       37|2U4GBNA0YmnNZYkzj...|       tpp|         2|             0|          1|        2870.724|      1784.84778|       117|           1|   SnuffIes|            1106.32|      4|            18|
+--------------------+---------+--------------------+----------+----------+--------------+-----------+----------------+----------------+

In [57]:
#match = match.withColumn('date', F.regexp_replace('date', "+0000", ""))

In [136]:
match.select('date').take(4)

[Row(date='2017-11-26T20:59:40+0000'),
 Row(date='2017-11-26T20:59:40+0000'),
 Row(date='2017-11-26T20:59:40+0000'),
 Row(date='2017-11-26T20:59:40+0000')]

In [147]:
match.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- game_size: string (nullable = true)
 |-- match_id: string (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: string (nullable = true)
 |-- player_assists: string (nullable = true)
 |-- player_dbno: string (nullable = true)
 |-- player_dist_ride: string (nullable = true)
 |-- player_dist_walk: string (nullable = true)
 |-- player_dmg: string (nullable = true)
 |-- player_kills: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- team_placement: string (nullable = true)



In [145]:
match = match.withColumn("date", F.unix_timestamp(match["date"], "yyyy-MM-dd'T'HH:mm:ss'+0000'").cast('timestamp'))

In [150]:
from pyspark.sql.functions import month, year, hour, minute, second

match = match.withColumn('month', month(match['date']))
match = match.withColumn('year', year(match['date']))
match = match.withColumn('time', time(match['date']))

NameError: name 'time' is not defined

In [149]:
match.show(3)

+-------------------+---------+--------------------+----------+----------+--------------+-----------+------------------+------------------+----------+------------+-----------+-------------------+-------+--------------+-----+----+----+
|               date|game_size|            match_id|match_mode|party_size|player_assists|player_dbno|  player_dist_ride|  player_dist_walk|player_dmg|player_kills|player_name|player_survive_time|team_id|team_placement|month|year|hour|
+-------------------+---------+--------------------+----------+----------+--------------+-----------+------------------+------------------+----------+------------+-----------+-------------------+-------+--------------+-----+----+----+
|2017-11-26 20:59:40|       37|2U4GBNA0YmnNZYkzj...|       tpp|         2|             0|          1|          2870.724|        1784.84778|       117|           1|   SnuffIes|            1106.32|      4|            18|   11|2017|  20|
|2017-11-26 20:59:40|       37|2U4GBNA0YmnNZYkzj...|       t

In [None]:
#replace match_id with numeric id
match_id = match.select(match.match_id).distinct()
match_id_kill = kill.select('match_id').distinct()
match_id = match_id.union(match_id_kill)
match_id = match_id.withColumn('id',F.monotonically_increasing_id())
match = match.join(match_id, on=['match_id'],how = 'left').drop('match_id')
kill = kill.join(match_id, on=['match_id'],how = 'left').drop('match_id')

## EDA
#### Add killing distance to kill

In [None]:
kill = kill.withColumn('kill_distance',F.sqrt((kill.killer_position_x-kill.victim_position_x)**2+
                                      (kill.killer_position_y-kill.victim_position_y)**2))

Game time: more players and less waiting time / by match type


Average match time / by match type / party size


How many matches in the dataset  / by match type / party size

In [114]:
match_map=kill.select(['map','id'])

In [115]:
match_map.show(3)

+-------+--------------------+
|    map|            match_id|
+-------+--------------------+
|MIRAMAR|2U4GBNA0YmnLSqvEy...|
|MIRAMAR|2U4GBNA0YmnLSqvEy...|
|MIRAMAR|2U4GBNA0YmnLSqvEy...|
+-------+--------------------+
only showing top 3 rows



In [116]:
match_joined = match.join(match_map, match['id'] == match_map['id'], how = 'left')

In [117]:
match_joined.show(3)

+--------------------+---------+----------+----------+--------------+-----------+----------------+----------------+----------+------------+-------------+-------------------+-------+--------------+-------+
|                date|game_size|match_mode|party_size|player_assists|player_dbno|player_dist_ride|player_dist_walk|player_dmg|player_kills|  player_name|player_survive_time|team_id|team_placement|    map|
+--------------------+---------+----------+----------+--------------+-----------+----------------+----------------+----------+------------+-------------+-------------------+-------+--------------+-------+
|2017-12-30T19:06:...|       94|       tpp|         1|             0|          0|               0|       23.053623|      22.0|           0|JackFFFFFFFFF|            101.117| 100010|            87|ERANGEL|
|2017-12-30T19:06:...|       94|       tpp|         1|             0|          0|               0|       23.053623|      22.0|           0|JackFFFFFFFFF|            101.117| 100010

In [104]:
match_joined.groupby('map','party_size').count().show()

+-------+----------+---------+
|    map|party_size|    count|
+-------+----------+---------+
|MIRAMAR|         4|111358780|
|ERANGEL|         4|463783134|
|ERANGEL|         1|220202880|
|MIRAMAR|         2| 63433144|
|ERANGEL|         2|335063305|
|   null|         1|  1217315|
|MIRAMAR|         1| 39025182|
|   null|         4|  8230127|
|   null|         2|  5616203|
+-------+----------+---------+



Average distance walk/ride 


In [118]:
match_joined.groupby('map','party_size').avg('player_dist_walk').show()

+-------+----------+---------------------+
|    map|party_size|avg(player_dist_walk)|
+-------+----------+---------------------+
|MIRAMAR|         4|    1410.742018224071|
|ERANGEL|         4|   1355.8162957032268|
|ERANGEL|         1|    1150.693641124211|
|MIRAMAR|         2|   1276.2859497094053|
|ERANGEL|         2|   1221.9183791044488|
|   null|         1|   1022.9180469875213|
|MIRAMAR|         1|   1182.2890436595835|
|   null|         4|    1299.164686919216|
|   null|         2|     1187.68092500758|
+-------+----------+---------------------+



In [119]:
match_joined.groupby('map','party_size').avg('player_dist_ride').show()

+-------+----------+---------------------+
|    map|party_size|avg(player_dist_ride)|
+-------+----------+---------------------+
|MIRAMAR|         4|   1052.4923974292822|
|ERANGEL|         4|   1346.2549098260222|
|ERANGEL|         1|     909.324541313901|
|MIRAMAR|         2|    972.9549795766075|
|ERANGEL|         2|    1176.657920711431|
|   null|         1|    842.3804298805157|
|MIRAMAR|         1|    744.0219518258749|
|   null|         4|   1260.7400371829012|
|   null|         2|   1100.2779096482088|
+-------+----------+---------------------+



#### Team Stats
team average survive time / team ranking by party size

In [None]:
match.filter(match.party_size == 4).groupby(['match_id','team_id']).\
    agg(F.avg('party_size'),F.avg('player_survive_time'),F.sum('player_kills'),F.avg('team_placement'),\
        F.avg('player_dist_ride'), F.avg('player_dist_walk')).\
    orderBy(['match_id','avg(player_survive_time)','avg(team_placement)'],ascending=[1,0,0]).show()

#### Survival time distribution by group-size

In [None]:
df1 = match.sample(withReplacement = False, fraction = 0.005, seed=0).\
        join(kill.select('match_id','map'), match.match_id == kill.match_id, 'left').\
        select('player_survive_time','party_size','map').toPandas()

In [None]:
df1.shape

In [None]:
plt.figure(figsize=(10,8))
plt.xlim(-0.1, 42)
sns.distplot(df1[df1['party_size'] == 4]['player_survive_time']/60,hist=False, color = 'r')
sns.distplot(df1[df1['party_size'] == 2]['player_survive_time']/60,hist=False, color = 'b')
sns.distplot(df1[df1['party_size'] == 1]['player_survive_time']/60,hist=False)

#### death position with in 4 minutes

In [None]:
import matplotlib.image as mpimg

miramar_map = mpimg.imread('./miramar.jpg')
erangel_map = mpimg.imread('./erangel.jpg')

In [None]:
erangel_map.shape

In [None]:
miramar_map.shape

In [None]:
#seperate two map
miramar = kill.filter(kill.map == 'MIRAMAR')
erangel = kill.filter(kill.map == 'ERANGEL')

In [None]:
db4_m = miramar.filter(miramar.time <= 240).select('victim_position_x','victim_position_y').toPandas()

In [None]:
from scipy.ndimage.filters import gaussian_filter
import matplotlib.cm as cm
from matplotlib.colors import Normalize
import numpy as np

def heatmap(x, y, s, bins=100):
    heatmap, xedges, yedges = np.histogram2d(x, y, bins=bins)
    heatmap = gaussian_filter(heatmap, sigma=s)

    extent = [xedges[0], xedges[-1], yedges[0], yedges[-1]]
    return heatmap.T, extent

#bg = imread('../input/erangel.jpg')
hmap, extent = heatmap(db4_m['victim_position_x']/800, db4_m['victim_position_y']/800, 3.5) #parameter 4.5??
alphas = np.clip(Normalize(0, hmap.max(), clip=True)(hmap)*3.5, 0.0, 1.)
colors = Normalize(0, hmap.max(), clip=True)(hmap)
colors = cm.Reds(colors)
colors[..., -1] = alphas

fig, ax = plt.subplots(figsize=(12,12))
ax.set_xlim(0, 1000); ax.set_ylim(0, 1000)
ax.imshow(miramar_map)
ax.imshow(colors, extent=extent, origin='lower', cmap=cm.Reds, alpha=0.9)
#plt.scatter(plot_data_er[:,0], plot_data_er[:,1])
plt.gca().invert_yaxis()

#### death position in a single match

In [None]:
miramar.select('match_id').distinct().show(5,truncate=False)

In [None]:
match1 = miramar.filter(miramar.match_id == '2U4GBNA0Yml0XDizIVK4IxDWB75tIG-FBMzmyEP_fBUj1UAIBJN9VCG7X49NaJ0W').\
        orderBy('time',ascending=[1]).toPandas()

In [None]:
import matplotlib.animation as animation
%matplotlib notebook

#fig, ax = plt.subplots(figsize=(6,6))
#ax.set_xlim(0, 1000) 
#ax.set_ylim(1000,0)


fig = plt.figure(figsize=(6,6))
plt.xlim(0, 1000)
plt.ylim(1000, 0)

def animate(i):
    data = match1.iloc[:int(i+1),10:12]/800
    #sns.scatterplot(x='victim_position_x', y='victim_position_y', data=data, palette = "RdBu")
    plt.scatter(data['victim_position_x'],data['victim_position_y'],c='r',marker='x')
    
            
ani = animation.FuncAnimation(fig,animate,frames = match1.shape[0],repeat=False)
plt.imshow(miramar_map)
plt.show()