In [1]:
# import pyspark
from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .master('local[*]')
        .appName('pubg')
        .getOrCreate())
spark

In [2]:
spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions", 5)
spark.conf.get("spark.sql.shuffle.partitions")

'5'

In [2]:
path_agg = 'C:\\Users\\FTS_DEMO\\Desktop\Power BI\\archive\\aggregate\\'
path_deaths = 'C:\\Users\\FTS_DEMO\\Desktop\\Power BI\\archive\\deaths\\'

In [3]:
import os
for file in os.listdir(path_agg),os.listdir(path_deaths):
    print(file)

['agg_match_stats_0.csv', 'agg_match_stats_1.csv', 'agg_match_stats_2.csv', 'agg_match_stats_3.csv', 'agg_match_stats_4.csv']
['kill_match_stats_final_0.csv', 'kill_match_stats_final_1.csv', 'kill_match_stats_final_2.csv', 'kill_match_stats_final_3.csv', 'kill_match_stats_final_4.csv']


In [4]:
# build spark schema based on first files
# theoreticaly could have inferschema on each, it would not be efficient

agg_match_stats_0 = spark.read.csv(path_agg+'agg_match_stats_0.csv',inferSchema = True, header=True)
kill_match_stats_final_0 = spark.read.csv(path_deaths+'kill_match_stats_final_0.csv',inferSchema = True, header=True)

In [25]:
# build spark dfs

df_list = []
for file in os.listdir(path_agg):

    if file.endswith('.csv') and file.find('0.csv') < 0:
        df_name = file[:file.find('.csv')]
        df = spark.read.csv(path_agg+file,schema=agg_match_stats_0.schema,header=True) 
        df.name = df_name
        print(df.name)
        df_list.append(df_name)
        exec(df_name + '= df')
        

for file in os.listdir(path_deaths):

    if file.endswith('.csv') and file.find('0.csv') < 0:
        df_name = file[:file.find('.csv')]
        df = spark.read.csv(path_deaths+file,schema=kill_match_stats_final_0.schema,header=True) 
        df.name = df_name
        print(df.name)
        df_list.append(df_name)
        exec(df_name + '= df')

print(df_list)

agg_match_stats_1
agg_match_stats_2
agg_match_stats_3
agg_match_stats_4
kill_match_stats_final_1
kill_match_stats_final_2
kill_match_stats_final_3
kill_match_stats_final_4
['agg_match_stats_1', 'agg_match_stats_2', 'agg_match_stats_3', 'agg_match_stats_4', 'kill_match_stats_final_1', 'kill_match_stats_final_2', 'kill_match_stats_final_3', 'kill_match_stats_final_4']


In [6]:
df_list.append('agg_match_stats_0')
df_list.append('kill_match_stats_final_0')

In [7]:
# to be looped
agg_match_stats = agg_match_stats_0.union(agg_match_stats_1).union(agg_match_stats_2).union(agg_match_stats_3).union(agg_match_stats_4)
kill_match_stats_final = kill_match_stats_final_0.union(kill_match_stats_final_1).union(kill_match_stats_final_2).union(kill_match_stats_final_3).union(kill_match_stats_final_4)

In [8]:
kill_match_stats_final.show(5)
agg_match_stats.show(5)

+------------+----------------+----------------+-----------------+-----------------+-------+--------------------+----+---------------+----------------+-----------------+-----------------+
|   killed_by|     killer_name|killer_placement|killer_position_x|killer_position_y|    map|            match_id|time|    victim_name|victim_placement|victim_position_x|victim_position_y|
+------------+----------------+----------------+-----------------+-----------------+-------+--------------------+----+---------------+----------------+-----------------+-----------------+
|     Grenade| KrazyPortuguese|             5.0|         657725.1|         146275.2|MIRAMAR|2U4GBNA0YmnLSqvEy...| 823|KrazyPortuguese|             5.0|         657725.1|         146275.2|
|      SCAR-L|nide2Bxiaojiejie|            31.0|         93091.37|         722236.4|MIRAMAR|2U4GBNA0YmnLSqvEy...| 194|    X3evolution|            33.0|         92238.68|         723375.1|
|        S686|        Ascholes|            43.0|         366

In [9]:
kill_match_stats_final.printSchema()
agg_match_stats.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_name: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: double (nullable = true)
 |-- killer_position_y: double (nullable = true)
 |-- map: string (nullable = true)
 |-- match_id: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_name: string (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: double (nullable = true)
 |-- victim_position_y: double (nullable = true)

root
 |-- date: timestamp (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_id: string (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: int

In [10]:
kill_match_stats_final.createOrReplaceTempView('kill_match_stats_final_t')

spark.sql(
    'select count(*) as killer_is_victim from kill_match_stats_final_t where killer_name = victim_name'
).show()

+----------------+
|killer_is_victim|
+----------------+
|          966906|
+----------------+



In [11]:
kill_match_stats_final.groupBy('map').count().show()

+-------+--------+
|    map|   count|
+-------+--------+
|   null|  783392|
|MIRAMAR|11622838|
|ERANGEL|52964245|
+-------+--------+



In [12]:
spark.sql(
    "select \
    min(killer_position_x)  \
    ,max(killer_position_x) \
    ,min(killer_position_y) \
    ,max(killer_position_y) \
    ,min(victim_position_x) \
    ,max(victim_position_x) \
    ,min(victim_position_y) \
    ,max(victim_position_y) \
    from kill_match_stats_final_t where map = 'ERANGEL'"
).show()

+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|min(killer_position_x)|max(killer_position_x)|min(killer_position_y)|max(killer_position_y)|min(victim_position_x)|max(victim_position_x)|min(victim_position_y)|max(victim_position_y)|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|             -688028.0|              810724.7|             -70411.01|              817629.8|             -688028.0|              930233.3|             -101266.9|              875298.1|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+



In [13]:
spark.sql(
    "select \
    min(killer_position_x)  \
    ,max(killer_position_x) \
    ,min(killer_position_y) \
    ,max(killer_position_y) \
    ,min(victim_position_x) \
    ,max(victim_position_x) \
    ,min(victim_position_y) \
    ,max(victim_position_y) \
    from kill_match_stats_final_t where map = 'MIRAMAR'"
).show()

+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|min(killer_position_x)|max(killer_position_x)|min(killer_position_y)|max(killer_position_y)|min(victim_position_x)|max(victim_position_x)|min(victim_position_y)|max(victim_position_y)|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|                   0.0|              814274.4|                   0.0|              815183.8|             -63506.48|              894545.8|             -219036.2|              924352.4|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+



In [14]:
spark.sql(
    "select \
    count(*)  \
    from kill_match_stats_final_t where time <= 0 "
).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [26]:
spark.sql(
    "select \
    count(*)  \
    from kill_match_stats_final_t where 'min(victim_position_x)' <= 0 "
).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [15]:
# f for nulls count
from pyspark.sql.functions import *

def nullratio(df):
    col_list = []
    allvals = df.count()
    
    for k in df.columns:
        nullvals = df.where(col(k).isNull()).count()
        perc = (nullvals/allvals)*100
        coltype = str(df.schema[k].dataType)
        summary = k,coltype,nullvals,perc,allvals
        col_list.append(summary)
    endtable = spark.createDataFrame(col_list,['col_name','col_type','nullvals','perc','allvals'])
    return(endtable)

In [16]:
kms_n = nullratio(kill_match_stats_final)
kms_n.show()

+-----------------+-------------+--------+-----------------+--------+
|         col_name|     col_type|nullvals|             perc| allvals|
+-----------------+-------------+--------+-----------------+--------+
|        killed_by| StringType()|       0|              0.0|65370475|
|      killer_name| StringType()| 4517469|6.910564746546511|65370475|
| killer_placement| DoubleType()| 4517469|6.910564746546511|65370475|
|killer_position_x| DoubleType()| 4517469|6.910564746546511|65370475|
|killer_position_y| DoubleType()| 4517469|6.910564746546511|65370475|
|              map| StringType()|  783392|1.198388110228662|65370475|
|         match_id| StringType()|       0|              0.0|65370475|
|             time|IntegerType()|       0|              0.0|65370475|
|      victim_name| StringType()|       0|              0.0|65370475|
| victim_placement| DoubleType()| 1227694|1.878055804244959|65370475|
|victim_position_x| DoubleType()|       0|              0.0|65370475|
|victim_position_y| 

In [18]:
kill_match_stats_final.createOrReplaceTempView('kill_match_stats_final_t')

spark.sql(
        "select * from kill_match_stats_final_t where ((killer_name is null) and (map is null) and (victim_placement is null))"
    ).show()

+------------+-----------+----------------+-----------------+-----------------+----+--------------------+----+--------------+----------------+-----------------+-----------------+
|   killed_by|killer_name|killer_placement|killer_position_x|killer_position_y| map|            match_id|time|   victim_name|victim_placement|victim_position_x|victim_position_y|
+------------+-----------+----------------+-----------------+-----------------+----+--------------------+----+--------------+----------------+-----------------+-----------------+
|Down and Out|       null|            null|             null|             null|null|2U4GBNA0YmmJReBAX...|1182|       woyaopi|            null|         211354.2|         400596.2|
|Down and Out|       null|            null|             null|             null|null|2U4GBNA0YmnZgse-j...|1672|   Buddha_Bear|            null|         572501.1|         345663.2|
|Down and Out|       null|            null|             null|             null|null|2U4GBNA0YmnzIiFsc...|

In [17]:
ams_n = nullratio(agg_match_stats)
ams_n.show()

+-------------------+---------------+--------+-------------------+--------+
|           col_name|       col_type|nullvals|               perc| allvals|
+-------------------+---------------+--------+-------------------+--------+
|               date|TimestampType()|       0|                0.0|67369231|
|          game_size|  IntegerType()|       0|                0.0|67369231|
|           match_id|   StringType()|       0|                0.0|67369231|
|         match_mode|   StringType()|       0|                0.0|67369231|
|         party_size|  IntegerType()|       0|                0.0|67369231|
|     player_assists|  IntegerType()|       0|                0.0|67369231|
|        player_dbno|  IntegerType()|       0|                0.0|67369231|
|   player_dist_ride|   DoubleType()|       0|                0.0|67369231|
|   player_dist_walk|   DoubleType()|       0|                0.0|67369231|
|         player_dmg|  IntegerType()|       0|                0.0|67369231|
|       play

In [19]:
agg_match_stats.createOrReplaceTempView('agg_match_stats_t')

spark.sql(
        "select * from agg_match_stats_t where player_name is null"
    ).show()

+-------------------+---------+--------------------+----------+----------+--------------+-----------+------------------+------------------+----------+------------+-----------+-------------------+-------+--------------+
|               date|game_size|            match_id|match_mode|party_size|player_assists|player_dbno|  player_dist_ride|  player_dist_walk|player_dmg|player_kills|player_name|player_survive_time|team_id|team_placement|
+-------------------+---------+--------------------+----------+----------+--------------+-----------+------------------+------------------+----------+------------+-----------+-------------------+-------+--------------+
|2017-12-28 10:30:48|       48|2U4GBNA0YmnZDcOwR...|       tpp|         2|             0|          0|        2853.27246|        3584.87085|        13|           0|       null|           1494.819|     32|             3|
|2017-12-31 14:11:48|       28|2U4GBNA0Ymkoxr2BY...|       tpp|         4|             0|          0|               0.0|18.8

Remove nulls and nagative coordinates

In [20]:
kill_match_stats_final = kill_match_stats_final.na.drop(how='any', thresh=None, subset=None)

In [21]:
agg_match_stats = agg_match_stats.na.drop(how='any', thresh=None, subset=None)

In [22]:
kill_match_stats_final = kill_match_stats_final.filter("NOT(killer_position_x < 0 or killer_position_y < 0 or victim_position_x < 0 or victim_position_y < 0)")

# Most dangerous positions MIRMAR

In [36]:
kill_match_stats_final.createOrReplaceTempView('kill_match_stats_final_t_no_null')

In [37]:
# scale

spark.sql(
    "select \
    min(killer_position_x)  \
    ,max(killer_position_x) \
    ,min(killer_position_y) \
    ,max(killer_position_y) \
    ,min(victim_position_x) \
    ,max(victim_position_x) \
    ,min(victim_position_y) \
    ,max(victim_position_y) \
    from kill_match_stats_final_t_no_null where map = 'MIRAMAR'"
).show()

+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|min(killer_position_x)|max(killer_position_x)|min(killer_position_y)|max(killer_position_y)|min(victim_position_x)|max(victim_position_x)|min(victim_position_y)|max(victim_position_y)|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|                   0.0|              814274.4|                   0.0|              815183.8|                   0.0|              814274.4|                   0.0|              815209.6|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+



In [55]:
# according to the info about df coordinates should be in the range 0,800000
#scaling
miramar_xy = kill_match_stats_final\
            .select('victim_position_x','victim_position_y') \
            .filter(col('map') == 'MIRAMAR')\
            .filter(col('victim_position_x') <= 800000)\
            .filter(col('victim_position_y') <= 800000)\


miramar_xy = miramar_xy\
        .withColumn('victim_position_x',col('victim_position_x')*643/800000)\
        .withColumn('victim_position_y',col('victim_position_y')*643/800000)



miramar_xy.show(5)

+-----------------+------------------+
|victim_position_x| victim_position_y|
+-----------------+------------------+
|    528.646549125|117.56869200000001|
|74.13683904999999|     581.412736625|
|    295.220991875|     338.552440375|
|    383.104142125|254.59456400000002|
|    380.646756875|       255.9291105|
+-----------------+------------------+
only showing top 5 rows



In [None]:
# remove 0,0

In [57]:
miramar_xy.write.option('header',True)\
   .csv('miramar_xy.csv')

# Time of survival ml

In [87]:
time_pred_reg = kill_match_stats_final.filter("map = 'ERANGEL'")

time_pred_reg = time_pred_reg.select(['killer_position_x','killer_position_y','victim_position_x','victim_position_y','time'])

In [88]:
dep_var = 'time'
ind_var = []
for c in time_pred_reg.columns:
    if c != 'time':
        ind_var.append(c)
    
ind_var

['killer_position_x',
 'killer_position_y',
 'victim_position_x',
 'victim_position_y']

In [89]:
# For data prep
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer

# To check for multicolinearity
from pyspark.ml.stat import Correlation

# For training and evaluation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [90]:
renamed = time_pred_reg.withColumnRenamed(dep_var,'label')

if str(renamed.schema['label'].dataType) != 'IntegerType':
    renamed = renamed.withColumn("label", renamed["label"].cast(FloatType()))

In [117]:
# Create empty lists set up to divide you input list into numeric and string data types
numeric_inputs = []
string_inputs = []
for column in ind_var:
    if str(renamed.schema[column].dataType) == 'StringType':
        new_col_name = column+"_num"
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)
        indexed = renamed


        
# # If the dataframe contains string types
# if len(string_inputs) != 0: 
#     # Then use the string indexer to convert them to numeric
#     # Be careful not to convert a continuous variable that was read in incorrectly here
#     # This is meant for categorical columns only
#     for column in input_columns:
#         if str(renamed.schema[column].dataType) == 'StringType':
#             indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
#             indexed = indexer.fit(renamed).transform(renamed)
# else:
#     indexed = renamed

In [118]:
features_list = numeric_inputs
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
final_data = assembler.transform(indexed).select('features','label')
final_data.show(5)

+--------------------+------+
|            features| label|
+--------------------+------+
|[496989.8,312569....|1035.0|
|[496989.8,312569....|1035.0|
|[460416.7,414748....|1422.0|
|[488034.1,347220....|1210.0|
|[501062.9,425078....|1818.0|
+--------------------+------+
only showing top 5 rows



In [120]:
pearsonCorr = Correlation.corr(final_data, 'features', 'pearson').collect()[0][0]
array = pearsonCorr.toArray()

In [108]:
array

array([[1.        , 0.12796134, 0.87909412, 0.08553448],
       [0.12796134, 1.        , 0.08743973, 0.88424543],
       [0.87909412, 0.08743973, 1.        , 0.24319704],
       [0.08553448, 0.88424543, 0.24319704, 1.        ]])

Predicting time you will be able to survive in the game depending where you are and with whom do you pleay with. 