# Run this cell just one time

In [None]:
import os
import requests
from bs4 import BeautifulSoup
import urllib.parse
import re


!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark==3.3.2
!pip install beautifulsoup4 requests

os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, regexp_replace, format_number
from pyspark.sql.types import FloatType

# URL for the Dota 2 heroes list on Gamepedia
url = "https://dota2.gamepedia.com/Heroes"

# Fetch the HTML content from the URL
response = requests.get(url)
html_content = response.content

# Parse the HTML using BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")

# Find all hero names within <span> elements with the specified style
hero_spans = soup.find_all("span", style="font-size:17px; color:white; text-shadow:-1px 0 0.2em black, 0 1px 0.2em black, 1px 0 0.2em black, 0 -1px 0.2em black;")

# Extract hero names and store them in a list
hero_names = [span.text.strip() for span in hero_spans]

# Add prefix and replace spaces with "%20"
all_hero_names = [
    "https://dota2protracker.com/hero/" + urllib.parse.quote(name) + "#" for name in hero_names
]

# Create a Spark session
spark = SparkSession.builder.appName("HeroData").getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 10000)

Collecting pyspark==3.3.2
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark==3.3.2)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m31.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824003 sha256=0b787455719e7dc115a1523cdebb0336980b8fe39b0d44e70fd991c939503edb
  Stored in directory: /root/.cache/pip/wheels/89/d6/52/1178e354ba2207673484f0ccd7b2ded0ab6671ae5c1fc5b49a
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py

## Pre loading win rate data from all heroes. This cell might take an average of 5 minutes to run. Execute it just one time.

In [None]:
all_heroes_data = []

for index, url in enumerate(all_hero_names):
    # Fetch the HTML content from the URL
    response = requests.get(url)
    html_content = response.content

    # Parse the HTML using BeautifulSoup
    soup = BeautifulSoup(html_content, "html.parser")

    heroes_data = []

    # Find all <tr> elements
    hero_rows = soup.find_all("tr", style="")

    for row in hero_rows:
        hero_data = {}

        # Extract hero name from <td class="td-hero-pic">
        hero_pic = row.find("td", class_="td-hero-pic")
        if hero_pic:
            hero_name = hero_pic.a.get("title")
            hero_data["hero_name"] = hero_name

        # Extract win rates from <td class="td-record">
        win_rate_elems = row.find_all("td", class_="td-record")
        win_rates = []
        for elem in win_rate_elems:
            win_rate_span = elem.find("span", class_=["green", "red"])
            if win_rate_span:
                win_rates.append(win_rate_span.text)

        if len(win_rates) >= 2:
            #hero_data["win_rate_1"] = win_rates[0]
            hero_data["win_rate_2"] = win_rates[1]

        heroes_data.append(hero_data)

    # Removing empty items from the list
    heroes_data = [item for item in heroes_data if item]

    # Append the heroes_data for this URL to the all_heroes_data list
    all_heroes_data.append(heroes_data)


In [None]:
def best_heroes_to_draft(input_heroes, analysis_mode):

  # List of hero URLs
  hero_list = []
  hero_list_index = []

  # Iterate through the input hero names
  for index, input_hero in enumerate(input_heroes):
      # Convert the input hero name to a standardized format for comparison
      standardized_input_hero = input_hero.lower().replace(" ", "%20")

      # Find matches in all_hero_names and store the index of the match
      matches = [(i, link) for i, link in enumerate(all_hero_names) if standardized_input_hero in link.lower()]

      # Append matches to the hero_list list and store the indices in hero_list_index
      for match_index, match_link in matches:
          hero_list.append(match_link)
          hero_list_index.append(match_index)

  # Check if the lengths of input_heroes and all_hero_names are equal
  if len(input_heroes) != len(hero_list):
    print(hero_list)
    assert len(input_heroes) == len(hero_list), "Lengths of input_heroes and all_hero_names are not equal."

  display_cleaned_hero_list = [re.sub(r'.*/', '', hero).replace('%20', '_').replace('%27', '').replace('#', '').upper() for hero in hero_list]

  #display hero names that will be written as columns on the final table
  # display_cleaned_hero_list

  for i, index in enumerate(hero_list_index):
    heroes_data = all_heroes_data[index]
    print(heroes_data)
    df = spark.createDataFrame(heroes_data)
    #df = df.withColumn("win_rate_1", (regexp_replace(col("win_rate_1"), "%", "").cast(FloatType()) / 100))
    df = df.withColumn("win_rate_2", (regexp_replace(col("win_rate_2"), "%", "").cast(FloatType()) / 100))
    #df = df.withColumn("win_rate_1", format_number(col("win_rate_1"), 3))
    df = df.withColumn("win_rate_2", format_number(col("win_rate_2"), 3))

    # Create a temporary view for each hero data array
    df.createOrReplaceTempView(f"hero_{i}")

  # Generate the SQL query for creating the selected_heroes view
  union_queries = "\n    UNION\n    ".join([
      f"(SELECT hero_name FROM hero_{i})" # Add WHERE win_rate_2 < .5 ORDER BY win_rate_2 ASC if you want the old version
      for i in range(len(display_cleaned_hero_list))
  ])

  # Generate the main SQL query
  select_queries = ",\n    ".join([
      f"b{i}.win_rate_2 AS {hero_name}"
      for i, hero_name in enumerate(display_cleaned_hero_list)
  ])

  aggregate_queries = " + ".join([
      f"b{i}.win_rate_2"
      for i in range(len(display_cleaned_hero_list))
  ])

  left_join_queries = "\n  ".join([
      f"LEFT JOIN hero_{i} b{i} ON a.hero_name = b{i}.hero_name"
      for i in range(len(display_cleaned_hero_list))
  ])

  where_conditions = "\n    AND ".join([
      f"b{i}.win_rate_2 IS NOT NULL"
      for i in range(len(display_cleaned_hero_list))
  ])

  # Initialize a dictionary to map analysis modes to their respective values
  analysis_modes = {
      'pos1': ('', '', '--', '--'),
      'pos2': ('', '--', '', '--'),
      'draft': ('--', '--', '--', ''),
  }

  # Check if the analysis_mode is valid
  if analysis_mode in analysis_modes:
      score_restriction, pos1_analysis, pos2_analysis, draft_analysis = analysis_modes[analysis_mode]
  else:
    raise ValueError("Invalid analysis_mode. Choose from 'pos1', 'pos2', or 'draft'.")

  sql_query = f'''
    SELECT *
    FROM (
      SELECT
        UPPER(a.hero_name)                                                 AS POTENTIAL_HERO,
        ROUND(({aggregate_queries}) / {len(display_cleaned_hero_list)}, 3) AS LOSS_PROBABILITY_SCORE,
        {select_queries}
      FROM (
        {union_queries}
      ) a
      {left_join_queries}
      WHERE {where_conditions}
      ORDER BY LOSS_PROBABILITY_SCORE ASC
    )
    WHERE 1=1
      {score_restriction}AND LOSS_PROBABILITY_SCORE < 0.5
      {pos1_analysis}AND POTENTIAL_HERO IN ("RAZOR", "BRISTLEBACK", "LONE DRUID", "NAGA SIREN", "GYROCOPTER", "SLARK", "CHAOS KNIGHT", "MONKEY KING", "URSA", "RIKI", "WEAVER", "ANTI-MAGE", "JUGGERNAUT", "MAGNUS", "LUNA", "SNIPER", "ARC WARDEN", "DROW RANGER", "TROLL WARLORD", "MORPHLING", "PHANTOM ASSASSIN", "PHANTOM LANCER", "TEMPLAR ASSASSIN", "SVEN", "TERRORBLADE", "WRAITH KING", "SPECTRE", "MUERTA", "MEDUSA", "LIFESTEALER", "SHADOW FIEND")
      {pos2_analysis}AND POTENTIAL_HERO IN ("MAGNUS", "PRIMAL BEAST", "MUERTA", "PANGOLIER", "INVOKER", "EMBER SPIRIT", "QUEEN OF PAIN", "PUCK", "TEMPLAR ASSASSIN", "LINA", "MONKEY KING", "ZEUS", "VOID SPIRIT", "OUTWORLD DESTROYER", "TINKER", "SHADOW FIEND", "NECROPHOS", "TIMBERSAW", "WINDRANGER", "HUSKAR", "LESHRAC", "BATRIDER", "DEATH PROPHET", "KUNKKA", "SNIPER", "BROODMOTHER", "ARC WARDEN", "STORM SPIRIT")
      {draft_analysis}AND POTENTIAL_HERO IN ("SILENCER", "ALCHEMIST", "PRIMAL BEAST", "GRIMSTROKE", "NECROPHOS")
  '''

  # Execute the SQL query
  spark.sql(sql_query).show()

  if(analysis_mode == "draft"):
    score_loss = spark.sql(f'''SELECT AVG(LOSS_PROBABILITY_SCORE) FROM ({sql_query})''').collect()[0][0]
    # input_heroes is being considered as Radiant
    if (score_loss > .5):
      print(f"RADIANT team will WIN: {score_loss}")
    else:
      print(f"DIRE team will WIN: {score_loss}")

In [None]:
# Input hero names
input_heroes = [
                "LUNA",
                "SPIRIT B",
                "CENTAUR",
                "SHADOW D",
                "PHOENIX"
                ]

# Available Analysys mode options: "pos1", "pos2", "draft"
best_heroes_to_draft(input_heroes, analysis_mode = 'draft')

[]


ValueError: ignored