# Dota match prediction based on the lineup
The idea is to create a model able to predict the outcome of a dota match based only on the lineup of both teams.
It is clear that the presition can never be very high: there are too many variables at play, expecially assuming the games are mostly balanced in terms of picks.

### Imports
Let's start by importing all the needed modules

In [1]:
from json import loads
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import StructType, StructField, DataFrame
from pyspark.sql.types import ArrayType, IntegerType, LongType, BooleanType
from pyspark.sql.functions import udf, col
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, VectorSizeHint

### Create the Spark Session
Creates a Spark sessio for the app named _"dotingestion"_

In [2]:
sc = SparkSession.builder.appName("dotingestion").getOrCreate()
sc

## Pre-Processing 🔧

### Define a schema
This is the schema that contains all the data we need:
```yaml
dire_lineup: [2, 14, 61, 96, 112]    # list of the hero_id the dire team picked
radiant_lineup: [4, 12, 42, 45, 65]  # list of the hero_id the radiant team picked
radiant_win: true                    # whether the radiant team won
match_id: 10                         # sequential id of the match, to make sure each match is accounted for once
```

In [3]:
schema = StructType([StructField("dire_lineup", ArrayType(IntegerType(), False), False),
                    StructField("radiant_lineup", ArrayType(IntegerType(), False), False),
                    StructField("radiant_win", BooleanType(), False),
                    StructField("match_id", LongType(), False)])
schema

StructType(List(StructField(dire_lineup,ArrayType(IntegerType,false),false),StructField(radiant_lineup,ArrayType(IntegerType,false),false),StructField(radiant_win,BooleanType,false),StructField(match_id,LongType,false)))

### Read the data
Reads the matches data already collected from the _"./data.json"_ file.  
Drops immedialy any null or repeated row.

In [4]:
path = "data.json"
df = sc.read.json(path, schema=schema).na.drop("all").distinct()
df.printSchema()

root
 |-- dire_lineup: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- radiant_lineup: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- radiant_win: boolean (nullable = true)
 |-- match_id: long (nullable = true)



In [5]:
df.count()

73922

In [6]:
df.show(5)

+--------------------+--------------------+-----------+----------+
|         dire_lineup|      radiant_lineup|radiant_win|  match_id|
+--------------------+--------------------+-----------+----------+
| [75, 88, 7, 44, 25]|[86, 41, 34, 43, ...|      false|5026549394|
|[96, 94, 25, 103,...|[71, 67, 23, 84, 11]|      false|5026549581|
| [2, 34, 68, 32, 76]| [53, 26, 90, 8, 75]|      false|5026549637|
|[100, 28, 26, 12,...| [2, 52, 86, 94, 30]|      false|5026549793|
|[129, 93, 75, 5, 35]| [73, 16, 44, 50, 2]|      false|5026549835|
+--------------------+--------------------+-----------+----------+
only showing top 5 rows



### Process data 1
The hero_id values go from 1 to 135, but not all indeces are used.  
For this reason, first we compact them in a dictionary based on the data in the _hero.json_ file, to map each possible value to an index going from **0** to **n**, where n is the number of possible hero_id values.

In [7]:
with open("heroes.json", 'r', encoding="utf-8") as f:
    heroes_dict = {hero['id']: i for i, hero in enumerate(loads(f.read()))}
heroes_dict

{1: 0,
 2: 1,
 3: 2,
 4: 3,
 5: 4,
 6: 5,
 7: 6,
 8: 7,
 9: 8,
 11: 9,
 10: 10,
 12: 11,
 13: 12,
 14: 13,
 15: 14,
 16: 15,
 17: 16,
 18: 17,
 19: 18,
 20: 19,
 21: 20,
 22: 21,
 23: 22,
 25: 23,
 31: 24,
 26: 25,
 27: 26,
 28: 27,
 29: 28,
 30: 29,
 32: 30,
 33: 31,
 34: 32,
 35: 33,
 36: 34,
 37: 35,
 38: 36,
 39: 37,
 40: 38,
 41: 39,
 42: 40,
 43: 41,
 44: 42,
 45: 43,
 46: 44,
 47: 45,
 48: 46,
 49: 47,
 50: 48,
 51: 49,
 52: 50,
 53: 51,
 54: 52,
 55: 53,
 56: 54,
 57: 55,
 58: 56,
 59: 57,
 60: 58,
 61: 59,
 62: 60,
 63: 61,
 64: 62,
 65: 63,
 66: 64,
 67: 65,
 69: 66,
 68: 67,
 70: 68,
 71: 69,
 72: 70,
 73: 71,
 74: 72,
 75: 73,
 76: 74,
 77: 75,
 78: 76,
 79: 77,
 80: 78,
 81: 79,
 82: 80,
 83: 81,
 84: 82,
 85: 83,
 86: 84,
 87: 85,
 88: 86,
 89: 87,
 90: 88,
 91: 89,
 92: 90,
 93: 91,
 94: 92,
 95: 93,
 96: 94,
 97: 95,
 98: 96,
 99: 97,
 100: 98,
 101: 99,
 102: 100,
 103: 101,
 104: 102,
 106: 103,
 107: 104,
 109: 105,
 110: 106,
 111: 107,
 105: 108,
 112: 109,
 113: 1

### Process data 2
Tranform the _"dire_lineup"_ and _"radiant_lineup"_ columns, replacing the list of hero_id with an array filled with 0, except for the indeces corrisponding at the hero_id present in the original array mapped using the *heroes_dict* variable, which will have a value of 1.  
Additionally, the type of those two columns changes from **ArrayType** to **VectorUDT**. This is required for the **VectorAssembler** to work correctly.

#### Example:
**Assuming there are a total of 10 heroes and the indeces go from 1 to 10**

| dire_lineup | radiant_lineup | radiant_win | match_id |
| - | - | - | - |
| [2, 4, 6, 7, 8] | [1, 3, 5, 9, 10] | true | 4976549005 | 

Becomes

| dire_lineup | radiant_lineup | radiant_win | match_id | dire_lineup_vec | radiant_lineup_vec |
| - | - | - | - | - | - |
| [2, 4, 6, 7, 8] | [1, 3, 5, 9, 10] | true | 4976549005 | [0, 1, 0, 1, 0, 1, 1, 1, 0, 0] | [1, 0, 1, 0, 1, 0, 0, 0, 1, 1] |

In [8]:
def convert_heroes_to_lineup(df: DataFrame) -> DataFrame:

    def onehot(heroes: ArrayType):
        lineup = tuple(heroes_dict[hero] for hero in heroes)
        return Vectors.dense([1 if hero_slot in lineup else 0 for hero_slot in range(len(heroes_dict))])

    heros_to_lineup_udf = udf(onehot, VectorUDT())
    return df.withColumn("dire_lineup_vec", heros_to_lineup_udf(df.dire_lineup))\
             .withColumn("radiant_lineup_vec", heros_to_lineup_udf(df.radiant_lineup))

df = convert_heroes_to_lineup(df)
df.show(5)

+--------------------+--------------------+-----------+----------+--------------------+--------------------+
|         dire_lineup|      radiant_lineup|radiant_win|  match_id|     dire_lineup_vec|  radiant_lineup_vec|
+--------------------+--------------------+-----------+----------+--------------------+--------------------+
| [75, 88, 7, 44, 25]|[86, 41, 34, 43, ...|      false|5026549394|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|[96, 94, 25, 103,...|[71, 67, 23, 84, 11]|      false|5026549581|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
| [2, 34, 68, 32, 76]| [53, 26, 90, 8, 75]|      false|5026549637|[0.0,1.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|[100, 28, 26, 12,...| [2, 52, 86, 94, 30]|      false|5026549793|[0.0,0.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|
|[129, 93, 75, 5, 35]| [73, 16, 44, 50, 2]|      false|5026549835|[0.0,0.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|
+--------------------+--------------------+-----------+----------+--------------------+--------------------+
only showing top 5 

### Process data 3
Cast the _"radiant_win"_ column type from **BooleanType** to **IntegerType**.

#### Example:
**Assuming there are a total of 10 heroes and the indeces go from 1 to 10**

| dire_lineup_vec | radiant_lineup_vec | dire_lineup | radiant_lineup | radiant_win | match_id |
| - | - | - | - | - | - |
| [0, 1, 0, 1, 0, 1, 1, 1, 0, 0] | [1, 0, 1, 0, 1, 0, 0, 0, 1, 1] | [2, 4, 6, 7, 8] | [1, 3, 5, 9, 10] | true | 4976549005 |

Becomes

| dire_lineup | radiant_lineup | radiant_win | match_id | dire_lineup_vec | radiant_lineup_vec | radiant_win_int |
| - | - | - | - | - | - | - |
| [2, 4, 6, 7, 8] | [1, 3, 5, 9, 10] | true | 4976549005 | [0, 1, 0, 1, 0, 1, 1, 1, 0, 0] | [1, 0, 1, 0, 1, 0, 0, 0, 1, 1] | 1 |

In [9]:
def convert_types(df: DataFrame) -> DataFrame:
    return df.withColumn("radiant_win_int", df.radiant_win.cast(IntegerType()))

df = convert_types(df)
df.show(5)

+--------------------+--------------------+-----------+----------+--------------------+--------------------+---------------+
|         dire_lineup|      radiant_lineup|radiant_win|  match_id|     dire_lineup_vec|  radiant_lineup_vec|radiant_win_int|
+--------------------+--------------------+-----------+----------+--------------------+--------------------+---------------+
| [75, 88, 7, 44, 25]|[86, 41, 34, 43, ...|      false|5026549394|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|              0|
|[96, 94, 25, 103,...|[71, 67, 23, 84, 11]|      false|5026549581|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|              0|
| [2, 34, 68, 32, 76]| [53, 26, 90, 8, 75]|      false|5026549637|[0.0,1.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|              0|
|[100, 28, 26, 12,...| [2, 52, 86, 94, 30]|      false|5026549793|[0.0,0.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|              0|
|[129, 93, 75, 5, 35]| [73, 16, 44, 50, 2]|      false|5026549835|[0.0,0.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|              0|


## ML 🤖

### ML pipeline
The machine learning pipeline has 4 stages, but the first 2 are the same:
```yaml
VectorSizeHint x2: # needed for the Vector assembler, since when applyng this pipeline during a structured stream it doesn't know in advance size of the vectors
    inputCol: dire_lineup_vec [or] radiant_lineup_vec
    size: len(heroes_dict)
    handleInvalid: skip
VectorAssembler: # assembles the "radiant_lineup_vec" and "dire_lineup_vec" vectors in a single "features" vector
    inputCols: ['radiant_lineup_vec', 'dire_lineup_vec']
    outputCol: ['features']
LogisticRegression: # uses the "features" vector to predict the "radiant_win_int" column value
    featuresCol: ['features']
    labelCol: ['radiant_win_int']
```

In [11]:
size_hint_dire = VectorSizeHint(inputCol="dire_lineup_vec", size=len(heroes_dict), handleInvalid="skip")
size_hint_radiant = VectorSizeHint(inputCol="radiant_lineup_vec", size=len(heroes_dict), handleInvalid="skip")
vec_assembler = VectorAssembler(inputCols=['dire_lineup_vec', 'radiant_lineup_vec'], outputCol="features")
regression = LogisticRegression(featuresCol="features", labelCol="radiant_win_int")
pipeline = Pipeline(stages=[size_hint_dire, size_hint_radiant, vec_assembler, regression])

### ML model
Split the original dataset in a training and test set.  
Then use the training set to fit the model, and see the accuracy reached.

In [12]:
traint_df, test_df = df.randomSplit([0.8, 0.2])
model = pipeline.fit(df)
model.stages[-1].summary.accuracy

0.5724006385108628

## ML results
Check the results by using the newly created model with the test set.

In [13]:
result_df = model.transform(test_df)
result_df.show(5)

+--------------------+--------------------+-----------+----------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+----------+
|         dire_lineup|      radiant_lineup|radiant_win|  match_id|     dire_lineup_vec|  radiant_lineup_vec|radiant_win_int|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+-----------+----------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+----------+
| [6, 26, 98, 91, 93]|[74, 68, 99, 50, 34]|       true|5026562916|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|              1|(242,[5,25,89,91,...|[-0.7954816237100...|[0.31099287208052...|       1.0|
|  [8, 76, 74, 11, 9]|[71, 35, 46, 45, 18]|      false|5026607908|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|              0|(242,[7,8,9,72,74...|[0.33582728285365...|[0.58317656531681...|       0.0|
|[10,

In [14]:
test_accuracy = result_df.filter(col("radiant_win").eqNullSafe(col("prediction"))).count()/result_df.count()
test_accuracy

0.5740614334470989

In [15]:
model.save("model")