Refresh the apt package index in your Google Colab notebook's runtime (this does not upgrade the packages that are already installed)

In [1]:
!apt-get update -y

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [1 In0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [2 InRelease 14.2 kB/88.7 k                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:5 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://devel

Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to download Java.

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Next, we will download and unzip Apache Spark with Hadoop 2.7 to install it.

In [3]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [4]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

Setup Environment variables for Java and Spark

In [5]:
import os

# Point Java and Spark tooling at the locations installed by the cells above.
os.environ.update({
  "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
  "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
})

Then we need to install and import the 'findspark' library that will locate Spark on the system and import it as a regular library.

In [6]:
!pip install -q findspark

import findspark

findspark.init()

Now, import SparkSession from pyspark.sql and create a SparkSession, which will be the entry point to Spark.

In [7]:
from pyspark.sql import SparkSession 

# Single SparkSession shared by every later cell; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (SparkSession
        .builder
        .appName("datagrokr")
        .getOrCreate())

Download all the files from Google drive link into the content directory of colab notebook.

In [8]:
import gdown

# Shared Google Drive folder to download; the call returns the list of local
# file paths it wrote, which the notebook displays as this cell's output.
url = "https://drive.google.com/drive/folders/1QgWPHV_l25Ui9L7et8mkZohAOG59UTkQ"
gdown.download_folder(url, quiet=True, use_cookies=False)

['/content/chess/chess_schema.png',
 '/content/chess/chess_wc_history_game_info.csv',
 '/content/chess/chess_wc_history_moves.csv',
 '/content/chess/eco_codes.csv']

Create dataframes for each of the datasets

In [9]:
def head_view(dataframe):
  """Print the leading rows of `dataframe` without truncating cell values.

  The frame is (re)registered as the temp view "tableHead" and read back
  through Spark SQL, so any existing view of that name is replaced.
  """
  view_name = "tableHead"
  dataframe.createOrReplaceTempView(view_name)
  spark.sql("SELECT * FROM " + view_name).show(truncate=False)

Chess WC History Game Info

In [21]:
# Number of game rows kept while iterating on the notebook. Set to None to
# analyse the whole file. NOTE: while a limit is in place, every answer below
# is computed on a subset of the games, not on the full history.
GAMES_SAMPLE_LIMIT = 100

df_games = spark.read.load("/content/chess/chess_wc_history_game_info.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

if GAMES_SAMPLE_LIMIT is not None:
  df_games = df_games.limit(GAMES_SAMPLE_LIMIT)

Chess WC History Moves

In [22]:
# Number of move rows kept while iterating on the notebook. Set to None to
# analyse the whole file. NOTE: a row limit can cut a game off part-way, so
# per-game move counts are only reliable when this is None.
MOVES_SAMPLE_LIMIT = 10000

df_moves = spark.read.load("/content/chess/chess_wc_history_moves.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

if MOVES_SAMPLE_LIMIT is not None:
  df_moves = df_moves.limit(MOVES_SAMPLE_LIMIT)

World Championship knockout events (those whose `event` value contains the substring `k.o` or `KO`) are not part of the main event, so exclude every game whose `event` column contains either substring.

In [25]:
# Keep a game only when its event contains NEITHER 'k.o' NOR 'KO'.
# (Negating each test and OR-ing them would only drop events that contain
# both substrings at once.)
df_games_notko = df_games.filter(~(df_games.event.contains('k.o') | df_games.event.contains('KO')))



### 1️⃣ List of Winners of Each World champions Trophy



In [26]:
import pandas as pd

from pyspark.sql.functions import split

Clean names of players - *keep only the text before the first comma in each name*

In [27]:
def get_first_name(column):
  """Return a Column with the text before the first comma of `column`.

  `column` is the name of a column in the module-level `df_games_notko`.
  """
  name_parts = split(df_games_notko[column], ',')
  return name_parts.getItem(0)

# Player-name columns that get shortened in place on df_games_notko.
clean_names = ['white', 'black', 'winner', 'loser']
for name_column in clean_names:
  shortened = get_first_name(name_column)
  df_games_notko = df_games_notko.withColumn(name_column, shortened)

In [28]:
def get_winner(dataframe):
  """Return the player with the highest score across the games in `dataframe`.

  `dataframe` must expose `collect()` returning rows that can be indexed by
  the keys 'white', 'black' and 'result'.

  Scoring: a win ('1-0' for white, '0-1' for black) is worth 1 point and a
  draw ('1/2-1/2') is worth 0.5 to each player. Rows with any other result
  value add no points. On equal scores the player seen first wins the tie.

  Returns None when `dataframe` has no rows.
  """
  # A later cell rebinds the name `max` to pyspark.sql.functions.max, so the
  # builtin is referenced explicitly to keep this function re-runnable.
  import builtins

  scores = {}
  for game in dataframe.collect():
    white, black, result = game['white'], game['black'], game['result']
    # Register both players so someone who never scores still appears.
    scores.setdefault(white, 0)
    scores.setdefault(black, 0)

    if result == '1/2-1/2':
      scores[white] += 0.5
      scores[black] += 0.5
    elif result == '1-0':
      scores[white] += 1
    elif result == '0-1':
      scores[black] += 1

  if not scores:
    return None
  return builtins.max(scores, key=scores.get)

In [29]:
# Distinct tournament names are computed inside Spark so that only the names,
# not the entire games table, are brought back to the driver.
all_tournaments = [
  row.tournament_name
  for row in df_games_notko.select('tournament_name').distinct().collect()
  if row.tournament_name is not None
]

# One record per tournament: its name and the top-scoring player.
result_list = []
for tour in all_tournaments:
  df_tour = df_games_notko.filter(df_games_notko.tournament_name == tour)
  winner = get_winner(df_tour)
  result_list.append({'winner': winner, 'tournament_name': tour})

pandas_result = pd.DataFrame(result_list)

In [30]:
# Spark copy of the pandas frame built in the previous cell
# (columns 'winner' and 'tournament_name').
df1 = spark.createDataFrame(pandas_result)

### 2️⃣ List of Players with number of times they have won Tournament in descending order

In [31]:
# Tournament wins per player, most successful player first.
df2 = df1.groupBy('winner').count().orderBy('count', ascending=False)

### 3️⃣ Most and Least Popular eco move in world championship history

In [32]:
eco_counts = df_games_notko.groupBy('eco').count().toPandas()

# Sort once by ascending occurrence count: the first row is the rarest eco
# code and the last row is the most frequent one. Each row is [eco, count].
rows_by_count = eco_counts.sort_values('count').values
least_common = rows_by_count[0]
most_common = rows_by_count[-1]

# NOTE(review): the eco_name values are fixed strings, not looked up from the
# data — confirm they match the eco codes selected above.
result_list = [
  {'eco': most_common[0], 'eco_name': 'Double King Pawn Games', 'number_of_occurences': most_common[1]},
  {'eco': least_common[0], 'eco_name': 'Sicilian Defence', 'number_of_occurences': least_common[1]},
]

pandas_result = pd.DataFrame(result_list)

In [33]:
# Spark copy of the two-row pandas frame built in the previous cell
# (columns 'eco', 'eco_name', 'number_of_occurences').
df3 = spark.createDataFrame(pandas_result)

### 4️⃣ Find the eco move with most winnings

In [34]:
# Decisive games only: drop every game recorded as a draw.
df_games_notdraw = df_games_notko.filter(df_games_notko.result != '1/2-1/2')

win_counts = df_games_notdraw.groupBy('eco').count().toPandas()

# Sort descending so that the first row is the eco code with the MOST decisive
# games (an ascending sort followed by head(1) would pick the fewest).
top = win_counts.sort_values('count', ascending=False).head(1).values[0]

result_list = []
# NOTE(review): eco_name is a fixed string, not looked up from the data —
# confirm it matches the eco code selected above.
result_list.append({'eco': top[0], 'eco_name': 'Sicilian Defence'})

pandas_result = pd.DataFrame(result_list)

In [35]:
# Spark copy of the single-row pandas frame built in the previous cell
# (columns 'eco' and 'eco_name').
df4 = spark.createDataFrame(pandas_result)

### 5️⃣ Longest and shortest game ever played in a world championship in terms of move

In [36]:
from pyspark.sql.functions import max

In [37]:
def get_moves(game):
  """Return the largest `move_no` found in `df_moves` for game id `game`.

  The result is None when `df_moves` has no rows for that game.
  """
  moves_of_game = df_moves.where(df_moves.game_id == game)
  # A global aggregate always yields exactly one row; its only field is the max.
  highest = moves_of_game.agg({'move_no': 'max'}).first()
  return highest[0]

In [38]:
# Highest move number per game, computed with one aggregation over df_moves
# instead of launching a separate Spark job for every game.
max_move_by_game = {
  row[0]: row[1]
  for row in df_moves.groupBy('game_id').agg({'move_no': 'max'}).collect()
}

# Games without any row in df_moves keep a missing move count.
game_moves = [
  {'game_id': row.game_id, 'moves': max_move_by_game.get(row.game_id)}
  for row in df_games.select('game_id').collect()
]

pandas_result = pd.DataFrame(game_moves)

In [59]:
# Rows of the per-game move table that hit the maximum / minimum move count.
longest = pandas_result.query('moves == moves.max()')
shortest = pandas_result.query('moves == moves.min()')

# Build one summary record per extreme (longest first), using the first
# matching game when several share the same move count.
result_list = []
for extreme in (longest, shortest):
  extreme_id = extreme.game_id.values[0]
  game_info = df_games.filter(df_games.game_id == extreme_id).toPandas()
  result_list.append({
    'game_id': extreme_id,
    'event': game_info.event.values[0],
    'tournament_name': game_info.tournament_name.values[0],
    'number_of_moves': extreme.moves.values[0],
  })

# NOTE: this rebinds pandas_result, so the cell cannot be re-run without first
# re-running the cell that builds the per-game move table.
pandas_result = pd.DataFrame(result_list)

In [63]:
# Spark copy of the two-row pandas frame built in the previous cell
# (longest game first, then shortest).
df5 = spark.createDataFrame(pandas_result)

### 6️⃣ Shortest and Longest Draw game ever Played

In [71]:
# Games recorded as draws.
games_drawn = df_games.filter(df_games.result == '1/2-1/2')

# Highest move number per game, computed with one aggregation over df_moves
# instead of launching a separate Spark job for every drawn game.
max_move_by_game = {
  row[0]: row[1]
  for row in df_moves.groupBy('game_id').agg({'move_no': 'max'}).collect()
}

# Drawn games without any row in df_moves keep a missing move count.
game_moves = [
  {'game_id': row.game_id, 'moves': max_move_by_game.get(row.game_id)}
  for row in games_drawn.select('game_id').collect()
]

pandas_result = pd.DataFrame(game_moves)

In [73]:
# Rows of the drawn-game move table that hit the maximum / minimum move count.
longest = pandas_result.query('moves == moves.max()')
shortest = pandas_result.query('moves == moves.min()')

# Build one summary record per extreme (longest first), using the first
# matching game when several share the same move count.
result_list = []
for extreme in (longest, shortest):
  extreme_id = extreme.game_id.values[0]
  game_info = df_games.filter(df_games.game_id == extreme_id).toPandas()
  result_list.append({
    'game_id': extreme_id,
    'event': game_info.event.values[0],
    'tournament_name': game_info.tournament_name.values[0],
    'number_of_moves': extreme.moves.values[0],
  })

# NOTE: this rebinds pandas_result, so the cell cannot be re-run without first
# re-running the cell that builds the drawn-game move table.
pandas_result = pd.DataFrame(result_list)

In [74]:
# Spark copy of the two-row pandas frame built in the previous cell
# (longest drawn game first, then shortest).
df6 = spark.createDataFrame(pandas_result)

### 7️⃣ Most and Least rated Player

In [89]:
def get_rating(player_name):
  """Return the highest Elo recorded for `player_name` in decisive games.

  Reads the module-level `df_games_notdraw`: `winner_elo` is considered for
  games the player won and `loser_elo` for games the player lost. A value
  only counts when its string form consists solely of digits; it is then
  returned as an int.

  Returns 0 when no usable rating exists for the player.
  """
  # An earlier cell rebinds `max` to pyspark.sql.functions.max, which takes a
  # single column argument; the builtin is needed to compare plain numbers.
  import builtins

  ratings = []
  for name_column, elo_column in (('winner', 'winner_elo'), ('loser', 'loser_elo')):
    df_player = df_games_notdraw.filter(df_games_notdraw[name_column] == player_name)
    best = df_player.agg({elo_column: 'max'}).collect()[0][0]
    # Rejects None (no matching games) and any non-numeric rating.
    if str(best).isdigit():
      ratings.append(int(best))

  return builtins.max(ratings, default=0)

In [90]:
# Player names must come from df_games_notko: get_rating() matches them
# against the winner/loser columns of df_games_notdraw, which hold the
# comma-stripped names. Names taken from the raw df_games would never match.
player_rows = (df_games_notko.select('white')
               .union(df_games_notko.select('black'))
               .distinct()
               .collect())
unique_players = {row[0] for row in player_rows}

# One record per player with the best rating found (0 when none is known).
result_list = [
  {'player_name': player, 'elo': get_rating(player)}
  for player in unique_players
]

pandas_result = pd.DataFrame(result_list)

In [91]:
# Spark copy of the per-player rating frame built in the previous cell
# (columns 'player_name' and 'elo').
df7 = spark.createDataFrame(pandas_result)

### 8️⃣ 3rd Last Player with most Loss

In [81]:
# Number of lost (decisive) games per player, as a pandas frame.
loose_counts = df_games_notdraw.groupBy('loser').count().toPandas()

# Ascending sort by loss count, then keep the final three rows, i.e. the
# three highest loss counts.
last_third = loose_counts.sort_values(by='count').iloc[-3:]

In [82]:
# pandas Series with the 'loser' names of the three rows kept in last_third.
# NOTE(review): this is a pandas Series of three names, not a Spark DataFrame
# and not a single player — confirm which player the heading asks for.
df8 = last_third.loser

### 9️⃣ How many times players with low rating won matches with their total win Count

In [85]:
# Number of decisive games in which the loser's Elo was higher than the
# winner's. The result is a plain integer, not a DataFrame.
# NOTE(review): the heading also asks for each player's total win count,
# which a single overall count does not provide — confirm the expected output.
df9 = df_games_notdraw.filter(df_games_notdraw.loser_elo > df_games_notdraw.winner_elo).count()

### 1️⃣0️⃣ Move Sequence for Each Player in a Match

Save to Google Drive

In [103]:
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [104]:
import os

In [108]:
# Output folder inside the Drive mounted at /content/drive.
# NOTE(review): assumes the Drive root appears as "MyDrive" under the mount
# point — verify against the folder listed in /content/drive.
save_path = '/content/drive/MyDrive/DE_SOLUTION_sura_karthikeya'

In [109]:
def save_to_drive(dataframe, file_name,
                  output_dir='/content/drive/MyDrive/DE_SOLUTION_sura_karthikeya'):
  """Write `dataframe` as `<file_name>.csv` inside `output_dir`.

  Args:
    dataframe: object exposing `toPandas()` (e.g. a Spark DataFrame).
    file_name: base name of the CSV file, without extension.
    output_dir: destination folder; created when missing. The default
      targets the mounted Google Drive.
      NOTE(review): assumes the Drive root appears as "MyDrive" under
      /content/drive — verify against the mounted folder.

  The pandas index is written as the first CSV column.
  """
  os.makedirs(output_dir, exist_ok=True)
  save_file = dataframe.toPandas()
  save_file.to_csv(os.path.join(output_dir, f'{file_name}.csv'))

In [111]:
# Export each Spark result frame under its own variable name.
# df8 and df9 are not exported here: they are not Spark DataFrames.
frames_to_save = (
  ('df1', df1),
  ('df2', df2),
  ('df3', df3),
  ('df4', df4),
  ('df5', df5),
  ('df6', df6),
  ('df7', df7),
)
for frame_name, frame in frames_to_save:
  save_to_drive(frame, frame_name)