In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import \
    StructType, StructField, \
    TimestampType, StringType, IntegerType, IntegerType

table_schema = StructType(
    [
        StructField("timestamp", TimestampType(), False),
        StructField("map_name", StringType(), False),
        StructField("attacker_name", StringType(), False),
        StructField("attacker_x", IntegerType(), False),
        StructField("attacker_y", IntegerType(), False),
        StructField("victim_x", IntegerType(), False),
        StructField("victim_y", IntegerType(), False),
        StructField("game_stage", IntegerType(), False),
        StructField("weapon_name", StringType(), False),
        StructField("zone_x", IntegerType(), False),
        StructField("zone_y", IntegerType(), False),
        StructField("zone_radius", IntegerType(), False),
        StructField("scope", StringType(), True),
        StructField("muzzle", StringType(), True)
    ]
)

session = SparkSession.builder \
    .master("spark://master:7077") \
    .appName("Weapon Ranking") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

print("Connected")

Connected


In [82]:
from pyspark.sql import SQLContext

df = session.read.csv("hdfs://master:9000/data/Baltic_Main_data.txt", header=True, schema=table_schema, nullValue="None")

# drop non-existing scope and muzzles
df = df.filter(df.scope.isNotNull() & df.muzzle.isNotNull())

In [83]:
filtered_table = df.createOrReplaceTempView("Filtered_Table")

In [84]:
sqlContext = SQLContext(session)
import pyspark.sql.functions as F


In [85]:
best_weapons1 = []
best_scopes1 = []
best_muzzles1 = []
kill_count1 = []

best_weapons2 = []
best_scopes2 = []
best_muzzles2 = []
kill_count2 = []

best_weapons3 = []
best_scopes3 = []
best_muzzles3 = []
kill_count3 = []

In [86]:
for i in range(1, 10):
    print("On {}...".format(i))
    results = sqlContext.sql(
        "SELECT weapon_name, count(*) as count " + \
       " FROM Filtered_Table WHERE game_stage={} group by weapon_name".format(i)
    )
    row = results.orderBy(F.desc("count")).take(3)
    print(row)
    best_weapons1.append(row[0].__getitem__("weapon_name"))
    kill_count1.append(row[0].__getitem__("count"))
    best_weapons2.append(row[1].__getitem__("weapon_name"))
    kill_count2.append(row[1].__getitem__("count"))
    best_weapons3.append(row[2].__getitem__("weapon_name"))
    kill_count3.append(row[2].__getitem__("count"))

On 1...
[Row(weapon_name='WeapHK416_C', count=109089), Row(weapon_name='WeapAK47_C', count=85555), Row(weapon_name='WeapSCAR-L_C', count=70050)]
On 2...
[Row(weapon_name='WeapHK416_C', count=206166), Row(weapon_name='WeapFNFal_C', count=73838), Row(weapon_name='WeapAK47_C', count=66491)]
On 3...
[Row(weapon_name='WeapHK416_C', count=157559), Row(weapon_name='WeapFNFal_C', count=65940), Row(weapon_name='WeapAK47_C', count=40073)]
On 4...
[Row(weapon_name='WeapHK416_C', count=138519), Row(weapon_name='WeapFNFal_C', count=71067), Row(weapon_name='WeapSKS_C', count=35605)]
On 5...
[Row(weapon_name='WeapHK416_C', count=117681), Row(weapon_name='WeapFNFal_C', count=71522), Row(weapon_name='WeapSKS_C', count=33780)]
On 6...
[Row(weapon_name='WeapHK416_C', count=86735), Row(weapon_name='WeapFNFal_C', count=53029), Row(weapon_name='WeapSKS_C', count=23978)]
On 7...
[Row(weapon_name='WeapHK416_C', count=58970), Row(weapon_name='WeapFNFal_C', count=23252), Row(weapon_name='WeapAK47_C', count=1134

In [88]:
print(best_weapons1)
print(kill_count1)
print(best_weapons2)
print(kill_count2)
print(best_weapons3)
print(kill_count3)

['WeapHK416_C', 'WeapHK416_C', 'WeapHK416_C', 'WeapHK416_C', 'WeapHK416_C', 'WeapHK416_C', 'WeapHK416_C', 'WeapHK416_C', 'WeapHK416_C']
[109089, 206166, 157559, 138519, 117681, 86735, 58970, 25746, 840]
['WeapAK47_C', 'WeapFNFal_C', 'WeapFNFal_C', 'WeapFNFal_C', 'WeapFNFal_C', 'WeapFNFal_C', 'WeapFNFal_C', 'WeapAK47_C', 'WeapAK47_C']
[85555, 73838, 65940, 71067, 71522, 53029, 23252, 5018, 209]
['WeapSCAR-L_C', 'WeapAK47_C', 'WeapAK47_C', 'WeapSKS_C', 'WeapSKS_C', 'WeapSKS_C', 'WeapAK47_C', 'WeapSCAR-L_C', 'WeapSCAR-L_C']
[70050, 66491, 40073, 35605, 33780, 23978, 11345, 3079, 124]


In [89]:
# max_row = results.groupBy("weapon_name").max('count').collect()[0].asDict()
# best_weap = max_row["weapon_name"]
# count = max_row["max(count)"]
row = results.orderBy(F.desc("count")).take(1)[0]
print(row.__getitem__("weapon_name"))
print(row.__getitem__("count"))


# print(best_weap, count)


WeapHK416_C
840


In [69]:
results = sqlContext.sql(
"""
SELECT * FROM (
(SELECT game_stage, weapon_name, count(*) as ct from
Filtered_Table group by game_stage, weapon_name) A
RIGHT JOIN (
SELECT game_stage, max(ct) ct FROM (
SELECT game_stage, weapon_name, count(*) as ct from
Filtered_Table group by game_stage, weapon_name) group by game_stage) B on A.game_stage = B.game_stage AND A.ct = B.ct)
"""
)
results.show()

+----------+-----------+------+----------+------+
|game_stage|weapon_name|    ct|game_stage|    ct|
+----------+-----------+------+----------+------+
|         6|WeapHK416_C| 93113|         6| 93113|
|         7|WeapHK416_C| 62620|         7| 62620|
|         8|WeapHK416_C| 27165|         8| 27165|
|         5|WeapHK416_C|128891|         5|128891|
|         2|WeapHK416_C|265458|         2|265458|
|         9|WeapHK416_C|   912|         9|   912|
|         4|WeapHK416_C|155621|         4|155621|
|         3|WeapHK416_C|183911|         3|183911|
|         1|WeapHK416_C|293085|         1|293085|
+----------+-----------+------+----------+------+



In [90]:
BASE_SCOPE = "SELECT scope, count(*) as count FROM Filtered_Table WHERE game_stage={} and weapon_name='{}' group by scope"
for i, weapon_name in enumerate(best_weapons1):
    print("On {} with {}...".format(i + 1, weapon_name))
    completed = BASE_SCOPE.format(i + 1, weapon_name)
    
    results = sqlContext.sql(
       completed
    )
    row = results.orderBy(F.desc("count")).take(1)
    print(row)
    best_scopes1.append(row[0].__getitem__("scope"))
    
print(best_scopes1)

for i, weapon_name in enumerate(best_weapons2):
    print("On {} with {}...".format(i + 1, weapon_name))
    completed = BASE_SCOPE.format(i + 1, weapon_name)
    
    results = sqlContext.sql(
       completed
    )
    row = results.orderBy(F.desc("count")).take(1)
    print(row)
    best_scopes2.append(row[0].__getitem__("scope"))
    
print(best_scopes2)

for i, weapon_name in enumerate(best_weapons3):
    print("On {} with {}...".format(i + 1, weapon_name))
    completed = BASE_SCOPE.format(i + 1, weapon_name)
    
    results = sqlContext.sql(
       completed
    )
    row = results.orderBy(F.desc("count")).take(1)
    print(row)
    best_scopes3.append(row[0].__getitem__("scope"))
    
print(best_scopes3)

On 1 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=66904)]
On 2 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=159075)]
On 3 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=126210)]
On 4 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=110783)]
On 5 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=94240)]
On 6 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=69578)]
On 7 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=47671)]
On 8 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=21226)]
On 9 with WeapHK416_C...
[Row(scope='Item_Attach_Weapon_Upper_DotSight_01_C', count=726)]
['Item_Attach_Weapon_Upper_DotSight_01_C', 'Item_Attach_Weapon_Upper_DotSight_01_C', 'Item_Attach_Weapon_Upper_DotSight_01_C', 'Item_Attach_Weapon_Upper_DotSight_01_C', 'I

In [None]:
# BASE_MUZZLE = "SELECT muzzle, count(*) as count FROM Filtered_Table WHERE game_stage={} and weapon_name='{}' group by muzzle"
# for i, weapon_name in enumerate(best_weapons1):
#     print("On {} with {}...".format(i + 1, weapon_name))
#     completed = BASE_MUZZLE.format(i + 1, weapon_name)
    
#     results = sqlContext.sql(
#        completed
#     )
#     row = results.orderBy(F.desc("count")).take(1)
#     print(row)
#     best_muzzles1.append(row[0].__getitem__("muzzle"))
    
# print(best_muzzles1)

best_muzzles2 = []
for i, weapon_name in enumerate(best_weapons2):
    print("On {} with {}...".format(i + 1, weapon_name))
    completed = BASE_MUZZLE.format(i + 1, weapon_name)
    
    results = sqlContext.sql(
       completed
    )
    row = results.orderBy(F.desc("count")).take(1)
    print(row)
    best_muzzles2.append(row[0].__getitem__("muzzle"))
    
print(best_muzzles2)

for i, weapon_name in enumerate(best_weapons3):
    print("On {} with {}...".format(i + 1, weapon_name))
    completed = BASE_MUZZLE.format(i + 1, weapon_name)
    
    results = sqlContext.sql(
       completed
    )
    row = results.orderBy(F.desc("count")).take(1)
    print(row)
    best_muzzles3.append(row[0].__getitem__("muzzle"))
    
print(best_muzzles3)

On 1 with WeapAK47_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Compensator_Large_C', count=41857)]
On 2 with WeapFNFal_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Suppressor_Large_C', count=14329)]
On 3 with WeapFNFal_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Suppressor_Large_C', count=13544)]
On 4 with WeapFNFal_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Suppressor_Large_C', count=15325)]
On 5 with WeapFNFal_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Suppressor_Large_C', count=15707)]
On 6 with WeapFNFal_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Suppressor_SniperRifle_C', count=11603)]
On 7 with WeapFNFal_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Compensator_Large_C', count=5137)]
On 8 with WeapAK47_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Compensator_Large_C', count=3936)]
On 9 with WeapAK47_C...
[Row(muzzle='Item_Attach_Weapon_Muzzle_Compensator_Large_C', count=164)]
['Item_Attach_Weapon_Muzzle_Compensator_Large_C', 'Item_Attach_Weapon_Muzzle_Suppressor_Large_C', 'Item_At

In [22]:
from pyspark.sql import Window
from pyspark.sql.functions import rank
window = Window.partitionBy("weapon_name").orderBy("numberOfTimesOccurred")
results.withColumn("game_stage", rank().over(window)).show()

+----------+-------------+---------------------+
|game_stage|  weapon_name|numberOfTimesOccurred|
+----------+-------------+---------------------+
|         1|  WeapGroza_C|                   90|
|         2|  WeapGroza_C|                  301|
|         3|  WeapGroza_C|                 2157|
|         4|  WeapGroza_C|                 4486|
|         4|  WeapGroza_C|                 4486|
|         6|  WeapGroza_C|                 5493|
|         7|  WeapGroza_C|                 5798|
|         8|  WeapGroza_C|                 6261|
|         9|  WeapGroza_C|                 6486|
|         1|WeapSaiga12_C|                    3|
|         1|WeapSaiga12_C|                    3|
|         3|WeapSaiga12_C|                   16|
|         4|WeapSaiga12_C|                   23|
|         5|WeapSaiga12_C|                   45|
|         6|WeapSaiga12_C|                   57|
|         7|WeapSaiga12_C|                  178|
|         8|WeapSaiga12_C|                 3599|
|         1|    Weap

In [18]:
from pyspark.sql.functions import desc

results.groupby("game_stage").sort(desc("count")).limit(3).show()

AttributeError: 'GroupedData' object has no attribute 'sort'

In [16]:
results2 = sqlContext.sql(
    "SELECT game_stage, weapon_name, scope, max(numberOfTimesOccurred) as countt FROM" + \
    "(SELECT game_stage, weapon_name, scope, max(ct) as numberOfTimesOccurred FROM " + \
    "(SELECT game_stage, weapon_name, scope, count(*) as ct FROM Filtered_Table " + \
    "group by game_stage, weapon_name, scope) group by game_stage, weapon_name, scope) group by game_stage, weapon_name, scope"
)

In [18]:
results2.count()

1354