## DNSC 6290 Large Datasets Group Project

### Group 6: PUBG Match Deaths and Statistics

Create SparkContext and SparkSession:

In [33]:
import findspark
findspark.init()

In [34]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
     .appName("Test SparkSession") \
     .getOrCreate()

# Remember to stop the SparkSession at the end with spark.stop()

In [35]:
spark

#### 1. Load and Prepare Data

We stack the five aggregate files into one DataFrame and the five deaths files into another.

In [36]:
# This call reads every agg_match_stats_*.csv file in the "aggregate" folder
aggr_all = spark.read.option("header", "true") \
    .option("delimiter", ",") \
    .option("inferSchema", "true") \
    .csv("s3://bigdata2020group6/aggregate/agg_match_stats_*.csv")

In [5]:
aggr_all.count()  # 67369231 rows in total

67369231
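
When several CSV files are read through one wildcard path with the header option enabled, each file contributes its own header line, so the stacked row count is the total line count minus one header per file. The sketch below illustrates that arithmetic with Python's standard library only; the file contents and the `stack_csv_texts` helper are hypothetical stand-ins, not the S3 data.

```python
import csv
import io

# Hypothetical miniature stand-ins for the per-file CSVs (not the real S3 data).
FILES = [
    "match_id,player_name,player_kills\nm1,alice,2\nm1,bob,0\n",
    "match_id,player_name,player_kills\nm2,carol,5\n",
]

def stack_csv_texts(texts):
    """Parse each CSV text with its own header and return all rows as dicts."""
    rows = []
    for text in texts:
        # DictReader consumes one header line per file.
        reader = csv.DictReader(io.StringIO(text))
        rows.extend(reader)
    return rows

stacked = stack_csv_texts(FILES)
total_lines = sum(len(text.splitlines()) for text in FILES)

# Data rows = all lines minus one header line per file.
print(len(stacked), total_lines - len(FILES))
```

Comparing a raw line count of the source files with the DataFrame count is a quick sanity check that no data rows were dropped or duplicated while stacking.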

In [37]:
# This call reads every kill_match_stats_final_*.csv file in the "deaths" folder
death_all = spark.read.option("header", "true") \
    .option("delimiter", ",") \
    .option("inferSchema", "true") \
    .csv("s3://bigdata2020group6/deaths/kill_match_stats_final_*.csv")

In [7]:
death_all.count()  # 65370475 rows in total

65370475

Data cleaning: are there null values in killer_placement?
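
One way to answer this question is to compare `count(*)` with `count(killer_placement)`, since `count(column)` skips nulls. The sketch below runs that query with the standard-library `sqlite3` module on a few made-up rows so it can be tried without a cluster. The table contents are hypothetical, and running the same SQL text through `spark.sql` against a registered `death` view is an assumption that was not executed here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table death (killed_by text, killer_placement real, victim_placement real)"
)
# Made-up rows for illustration only; None becomes SQL NULL.
conn.executemany(
    "insert into death values (?, ?, ?)",
    [
        ("SCAR-L", 31.0, 33.0),
        ("Bluezone", None, 40.0),
        ("M416", 1.0, 2.0),
        ("Drown", None, 57.0),
    ],
)

# count(*) counts every row; count(column) counts only non-null values.
total_rows, null_killer_placement = conn.execute(
    """
    select count(*) as total_rows,
           count(*) - count(killer_placement) as null_killer_placement
    from death
    """
).fetchone()

print(total_rows, null_killer_placement)
conn.close()
```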

#### 3. Explore Data Structure

In [60]:
aggr_all.show(10)

+--------------------+---------+--------------------+----------+----------+--------------+-----------+------------------+------------------+----------+------------+-----------+-------------------+-------+--------------+
|                date|game_size|            match_id|match_mode|party_size|player_assists|player_dbno|  player_dist_ride|  player_dist_walk|player_dmg|player_kills|player_name|player_survive_time|team_id|team_placement|
+--------------------+---------+--------------------+----------+----------+--------------+-----------+------------------+------------------+----------+------------+-----------+-------------------+-------+--------------+
|2017-11-26T20:59:...|       37|2U4GBNA0YmnNZYkzj...|       tpp|         2|             0|          1|          2870.724|        1784.84778|       117|           1|   SnuffIes|            1106.32|      4|            18|
|2017-11-26T20:59:...|       37|2U4GBNA0YmnNZYkzj...|       tpp|         2|             0|          1|2938.4072300000003

In [76]:
death_all.show(10)

+------------+----------------+----------------+-----------------+-----------------+-------+--------------------+----+---------------+----------------+-----------------+-----------------+
|   killed_by|     killer_name|killer_placement|killer_position_x|killer_position_y|    map|            match_id|time|    victim_name|victim_placement|victim_position_x|victim_position_y|
+------------+----------------+----------------+-----------------+-----------------+-------+--------------------+----+---------------+----------------+-----------------+-----------------+
|     Grenade| KrazyPortuguese|             5.0|         657725.1|         146275.2|MIRAMAR|2U4GBNA0YmnLSqvEy...| 823|KrazyPortuguese|             5.0|         657725.1|         146275.2|
|      SCAR-L|nide2Bxiaojiejie|            31.0|         93091.37|         722236.4|MIRAMAR|2U4GBNA0YmnLSqvEy...| 194|    X3evolution|            33.0|         92238.68|         723375.1|
|        S686|        Ascholes|            43.0|         366

In [67]:
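aggr_all.printSchema()

root
 |-- date: string (nullable = true)
 |-- game_size: integer (nullable = true)
 |-- match_id: string (nullable = true)
 |-- match_mode: string (nullable = true)
 |-- party_size: integer (nullable = true)
 |-- player_assists: integer (nullable = true)
 |-- player_dbno: integer (nullable = true)
 |-- player_dist_ride: double (nullable = true)
 |-- player_dist_walk: double (nullable = true)
 |-- player_dmg: integer (nullable = true)
 |-- player_kills: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_survive_time: double (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_placement: integer (nullable = true)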

In [69]:
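death_all.printSchema()

root
 |-- killed_by: string (nullable = true)
 |-- killer_name: string (nullable = true)
 |-- killer_placement: double (nullable = true)
 |-- killer_position_x: double (nullable = true)
 |-- killer_position_y: double (nullable = true)
 |-- map: string (nullable = true)
 |-- match_id: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- victim_name: string (nullable = true)
 |-- victim_placement: double (nullable = true)
 |-- victim_position_x: double (nullable = true)
 |-- victim_position_y: double (nullable = true)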

In [38]:
aggr_all.createOrReplaceTempView("aggr")
death_all.createOrReplaceTempView("death")  

In [11]:
# Split the aggregate DataFrame into three DataFrames based on party_size
from pyspark.sql.functions import col
single = aggr_all.filter(col("party_size") == 1)
double = aggr_all.filter(col("party_size") == 2)
quadruple = aggr_all.filter(col("party_size") == 4)

#### 4. Analyze Data

##### (1) SQL:

a. Which locations are "dangerous" for parachuting?

In [18]:
location = spark.sql("""
             select victim_position_x as X, victim_position_y as Y,time as Time from death
             where time<120
             order by time asc
          """).cache()

In [24]:
location.show(10)

+--------+--------+----+
|       X|       Y|time|
+--------+--------+----+
|793642.1|20216.34|  23|
|326611.7|588506.9|  28|
|463199.8| 6090.61|  32|
|180589.4| 7373.03|  33|
|528533.3| 8006.34|  33|
|132976.3| 8953.52|  33|
|170921.9| 7790.76|  34|
|239040.5| 6733.71|  34|
|148092.8| 8481.53|  34|
|521694.0| 7921.38|  34|
+--------+--------+----+
only showing top 10 rows



In [20]:
location.count()

3560172

Plot:

In [22]:
import pandas as pd
locationdf=pd.DataFrame(location.collect())

In [23]:
locationdf.head()

          0         1   2
0  793642.1  20216.34  23
1  326611.7  588506.9  28
2  463199.8   6090.61  32
3  180589.4   7373.03  33
4  528533.3   8006.34  33
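
Collecting all 3,560,172 rows to the driver is heavy for a plot, so one alternative is to bin the positions into grid cells and plot the count per cell. A minimal sketch in plain Python, using the first five positions from the `location.show(10)` output; the 100,000-unit cell size and the `grid_cell` helper are arbitrary illustrative choices, not part of the notebook.

```python
from collections import Counter

# First five (X, Y) positions from the location.show(10) output.
positions = [
    (793642.1, 20216.34),
    (326611.7, 588506.9),
    (463199.8, 6090.61),
    (180589.4, 7373.03),
    (528533.3, 8006.34),
]

CELL_SIZE = 100_000  # hypothetical cell width in map units

def grid_cell(x, y, cell_size=CELL_SIZE):
    """Return the integer (column, row) index of the cell containing (x, y)."""
    return int(x // cell_size), int(y // cell_size)

cell_counts = Counter(grid_cell(x, y) for x, y in positions)

# Cells with the most early deaths are listed first.
for cell, n in cell_counts.most_common():
    print(cell, n)
```

On the full data, the same binning could probably be pushed into the SQL query (for example with `floor(victim_position_x / 100000)` and a `group by`) so that only the per-cell counts are collected.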


b. Player's placement vs Number of enemies killed

In [75]:
single.createOrReplaceTempView("single")
double.createOrReplaceTempView("double")  
quadruple.createOrReplaceTempView("quadruple")

In [95]:
#Party Size = 1
party1_kill = spark.sql("""
             select team_placement as rank , avg(player_kills) as avg_kills from single
             group by team_placement
             order by team_placement asc
          """).cache()

In [96]:
party1_kill.show(10)

+----+------------------+
|rank|         avg_kills|
+----+------------------+
|   1| 6.970846857480082|
|   2| 3.599979000144374|
|   3|3.0568580976829995|
|   4| 2.708515111695138|
|   5|  2.47160518182416|
|   6| 2.285916429187261|
|   7| 2.144181446561614|
|   8|  2.01534766485873|
|   9|1.9034656409849717|
|  10|1.8151479399756085|
+----+------------------+
only showing top 10 rows



In [97]:
#Party Size = 2
party2_kill = spark.sql("""
             select team_placement as rank , avg(player_kills) as avg_kills from double
             group by team_placement
             order by team_placement asc
          """).cache()

In [98]:
party2_kill.show(10)  # Why is there a rank 0?

+----+------------------+
|rank|         avg_kills|
+----+------------------+
|   0|               1.5|
|   1| 4.416512205489827|
|   2|2.5763982214079224|
|   3|2.2311533877941216|
|   4|1.9527973618868921|
|   5|1.7622350599079941|
|   6|1.6203774044897594|
|   7|1.5054382262074872|
|   8|1.4006394631320518|
|   9|1.3206494950410355|
+----+------------------+
only showing top 10 rows



In [99]:
#Party Size = 4
party4_kill = spark.sql("""
             select team_placement as rank , avg(player_kills) as avg_kills from quadruple
             group by team_placement
             order by team_placement asc
          """).cache()

In [100]:
party4_kill.show(10)  # Rank 0 appears again

+----+------------------+
|rank|         avg_kills|
+----+------------------+
|   0|1.1176470588235294|
|   1| 2.937261017288994|
|   2|1.7871176501136514|
|   3|1.5751676654505462|
|   4|1.3730355960439127|
|   5|1.2366525816118643|
|   6|1.1366548365049696|
|   7| 1.051095615368824|
|   8|0.9775038751257716|
|   9|0.9118165067424356|
+----+------------------+
only showing top 10 rows
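
The unexpected rank 0 can be investigated by counting how many rows have `team_placement = 0` for each party size. The sketch below uses `sqlite3` and made-up rows as a stand-in for the `aggr` view; the data is hypothetical and only the column names come from the notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table aggr (party_size integer, team_placement integer, player_kills integer)"
)
# Made-up rows for illustration only.
conn.executemany(
    "insert into aggr values (?, ?, ?)",
    [(1, 1, 7), (2, 0, 1), (2, 0, 2), (2, 3, 2), (4, 0, 1), (4, 2, 0)],
)

# Count the rank-0 rows per party size.
rows = conn.execute(
    """
    select party_size, count(*) as n_rows
    from aggr
    where team_placement = 0
    group by party_size
    order by party_size
    """
).fetchall()

print(rows)
conn.close()
```

If the real counts turn out to be tiny relative to the total, the rank-0 rows are likely a handful of malformed records that could be filtered out before averaging; that interpretation is a guess until the query is run on the full data.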



c. Kill Distance vs Kill By

In [63]:
distance_kill_by = spark.sql("""
             select killed_by as weapons,
                    avg(sqrt(power(killer_position_x - victim_position_x, 2) + power(killer_position_y - victim_position_y, 2))) as distance
             from death
             group by killed_by
             order by distance desc
          """).cache()

In [64]:
distance_kill_by.show(10)

+-------------------+------------------+
|            weapons|          distance|
+-------------------+------------------+
|              Punch|157538.95516226656|
|           Bluezone|151482.50792483127|
|            RedZone|105620.78502231884|
|                Pan|101837.47949163972|
|            Machete| 85485.09132695777|
|             Sickle| 77937.55447560512|
|            Crowbar| 77936.05738769755|
|              Drown| 62207.56573677812|
|         Hit by Car| 51321.28609688651|
|Motorbike (SideCar)| 44445.05348103158|
+-------------------+------------------+
only showing top 10 rows
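
The distance in this query is the Euclidean distance between the killer and victim positions. A minimal sketch in plain Python, applied to the SCAR-L row from the `death_all.show(10)` output; the `kill_distance` helper is hypothetical, and returning `None` for missing coordinates is one assumed way to keep such rows out of an average. Whether missing or placeholder killer positions explain the large averages for entries such as Punch or Bluezone is not confirmed by the notebook.

```python
import math

def kill_distance(killer_x, killer_y, victim_x, victim_y):
    """Euclidean distance between killer and victim, or None if a coordinate is missing."""
    coords = (killer_x, killer_y, victim_x, victim_y)
    if any(c is None for c in coords):
        return None
    return math.hypot(killer_x - victim_x, killer_y - victim_y)

# SCAR-L row from the death_all output: killer at (93091.37, 722236.4),
# victim at (92238.68, 723375.1).
d = kill_distance(93091.37, 722236.4, 92238.68, 723375.1)
print(round(d, 2))

# A row without a killer position yields None instead of a misleading number.
print(kill_distance(None, None, 657725.1, 146275.2))
```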



d. Kills by weapon

Weapon counts for kills where the killer placed first and the victim placed second

In [51]:
weapons_fir = spark.sql("""
             select killed_by as weapons, count(killed_by) as count from death
             where victim_placement = 2 and killer_placement = 1
             group by killed_by
             order by count desc
          """).cache()

In [52]:
weapons_fir.show(10)

+------------+------+
|     weapons| count|
+------------+------+
|Down and Out|261364|
|        M416|230938|
|      SCAR-L|165638|
|       M16A4| 84685|
|         AKM| 70058|
|      Kar98k| 51019|
|     Grenade| 44625|
|     Mini 14| 42468|
|         SKS| 29374|
|    Bluezone| 25389|
+------------+------+
only showing top 10 rows



Weapon counts for kills where the victim placed 3rd to 10th

In [53]:
weapons_sec_ten = spark.sql("""
             select killed_by as weapons, count(killed_by) as count from death
             where victim_placement > 2 and victim_placement < 11
             group by killed_by
             order by count desc
          """).cache()

In [54]:
weapons_sec_ten.show(10)

+------------+-------+
|     weapons|  count|
+------------+-------+
|Down and Out|3255373|
|        M416|2031640|
|      SCAR-L|1531037|
|    Bluezone|1148048|
|       M16A4|1071318|
|         AKM| 930885|
|      Kar98k| 615815|
|     Mini 14| 547421|
|         SKS| 415367|
|        UMP9| 410707|
+------------+-------+
only showing top 10 rows
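
The two tables differ in total size by roughly an order of magnitude, so raw counts are hard to compare directly. A small sketch that converts the counts to shares, restricted to the nine entries that appear in both top-10 outputs; the shares are therefore relative to this subset only, not to all kills, and the `shares` helper and dictionary names are hypothetical.

```python
# Counts copied from the two top-10 outputs, limited to entries present in both.
first_vs_second = {
    "Down and Out": 261364, "M416": 230938, "SCAR-L": 165638,
    "M16A4": 84685, "AKM": 70058, "Kar98k": 51019,
    "Mini 14": 42468, "SKS": 29374, "Bluezone": 25389,
}
victims_3_to_10 = {
    "Down and Out": 3255373, "M416": 2031640, "SCAR-L": 1531037,
    "M16A4": 1071318, "AKM": 930885, "Kar98k": 615815,
    "Mini 14": 547421, "SKS": 415367, "Bluezone": 1148048,
}

def shares(counts):
    """Convert raw counts to fractions of the subset total."""
    total = sum(counts.values())
    return {name: n / total for name, n in counts.items()}

share_first = shares(first_vs_second)
share_rest = shares(victims_3_to_10)

# Print both shares side by side, one entry per line.
for name in first_vs_second:
    print(f"{name:<12} {share_first[name]:.3f} {share_rest[name]:.3f}")
```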



##### (2) Machine Learning:

#### 5. 

In [46]:
spark.stop()