# Section - 1
#### Setting up PySpark in Colab and loading some data sets.

In [1]:
!apt-get update -y

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [1 InRelease gpgv 3,626 B] [3 InRelease 14.2 kB/88.7 kB 16%] [Waiting for he0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Conn                                                                               Get:4 http://archive.ubuntu.com/ub

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [4]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [6]:
!pip install -q findspark
import findspark
findspark.init()

In [7]:
from pyspark.sql import SparkSession
spark = (SparkSession
.builder
.appName("ChessTournament")
.getOrCreate())

In [8]:
pip install gdown

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [9]:
import gdown

In [10]:
url = "https://drive.google.com/file/d/1UBoiIGhfg-Yxk9wXbc48Lx9Z9ZzWg_4Z/view?usp=sharing"
output = "chess_wc_history_game_info.csv"
gdown.download(url, output, quiet=False, fuzzy = True)

Downloading...
From: https://drive.google.com/uc?id=1UBoiIGhfg-Yxk9wXbc48Lx9Z9ZzWg_4Z
To: /content/chess_wc_history_game_info.csv
100%|██████████| 596k/596k [00:00<00:00, 112MB/s]


'chess_wc_history_game_info.csv'

In [11]:
url = "https://drive.google.com/file/d/1Th6rP8rVchIvXV25bzO5Er_SvyRoNaic/view?usp=sharing"
output = "chess_wc_history_moves.csv"
gdown.download(url, output, quiet=False, fuzzy = True)

Downloading...
From: https://drive.google.com/uc?id=1Th6rP8rVchIvXV25bzO5Er_SvyRoNaic
To: /content/chess_wc_history_moves.csv
100%|██████████| 112M/112M [00:01<00:00, 91.1MB/s]


'chess_wc_history_moves.csv'

In [12]:
url = "https://drive.google.com/file/d/1eysHHc8I905r8YVeALYydQBl0E9caNNI/view?usp=sharing"
output = "eco_codes.csv"
gdown.download(url, output, quiet=False, fuzzy = True)

Downloading...
From: https://drive.google.com/uc?id=1eysHHc8I905r8YVeALYydQBl0E9caNNI
To: /content/eco_codes.csv
100%|██████████| 45.6k/45.6k [00:00<00:00, 30.3MB/s]


'eco_codes.csv'

# Section - 2
#### Using PySpark for data analysis

In [55]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from google.colab import drive

df_wc = spark.read.csv("/content/chess_wc_history_game_info.csv", header=True, inferSchema=True)
df_eco = spark.read.csv("/content/eco_codes.csv", header=True, inferSchema=True)
df_moves = spark.read.csv("/content/chess_wc_history_moves.csv", header=True, inferSchema=True)

1. List of Winners of Each World champions Trophy Hint: Total Result of all rounds of Tournament for that player is considered as that player's
Score/Result.
Result attributes: winner, tournament_name

In [14]:
result_split = df_wc.withColumn("white_scr", F.split("result", "-").getItem(0)) \
                    .withColumn("black_scr", F.split("result", "-").getItem(1))

In [15]:
tournament = result_split.withColumn('white_scr', F.regexp_replace('white_scr', '1/2', '0.5').cast('float')) \
                         .withColumn('black_scr', F.regexp_replace('black_scr', '1/2', '0.5').cast('float'))

In [16]:
wch_event = tournament.filter(~(tournament.event.like("%k.o%") | tournament.event.like("%KO%"))).filter(tournament.event.like('%WCh%'))

In [17]:
wch_wht_scr = wch_event.groupBy('tournament_name', 'white').sum('white_scr')
wch_blk_scr = wch_event.groupBy('tournament_name', 'black').sum('black_scr')

In [18]:
wch_final_scr = wch_wht_scr.unionAll(wch_blk_scr).withColumnRenamed('white', 'winner')

In [19]:
wch_winners = wch_final_scr.groupBy('tournament_name', 'winner').agg(F.sum("sum(white_scr)").alias('total_score')) \
                           .withColumn('total_scores', F.max('total_score').over(Window.partitionBy('tournament_name'))) \
                           .where(F.col('total_scores') == F.col('total_score')) \
                           .select('winner', 'tournament_name')                       

2. List of Players with number of times they have won Tournament in descending order(Max to min).
Result attributes: player_name, number_of_wins

In [20]:
T_wht_scr = tournament.groupBy('tournament_name', 'white').sum('white_scr')
T_blk_scr = tournament.groupBy('tournament_name', 'black').sum('black_scr')

In [21]:
T_final_scr = T_wht_scr.unionAll(T_blk_scr).withColumnRenamed('white', 'winner')

In [22]:
T_winners = T_final_scr.groupBy('tournament_name', 'winner').agg(F.sum("sum(white_scr)").alias('total_score')) \
                       .withColumn('max_score', F.max('total_score').over(Window.partitionBy('tournament_name')))

In [23]:
T_wins = T_winners.withColumn('win_flag', F.when(T_winners.total_score == T_winners.max_score, 1).otherwise(0)) \
                  .groupBy('winner').agg(F.sum(F.col('win_flag')).alias('number_of_wins')) \
                  .sort(F.col('number_of_wins').desc())

3. Most and Least Popular eco move in world championship history.
Result attributes: eco, eco_name, number_of_occurences
Final result will have only two rows

In [24]:
wch_move_count = wch_event.groupBy('eco').count().join(df_eco, wch_event.eco == df_eco.eco, 'inner') \
                          .withColumnRenamed('count', 'number_of_occurences') \
                          .select(df_eco['eco'], 'eco_name', 'number_of_occurences').filter(F.col('number_of_occurences').isNotNull())

In [25]:
max_eco = wch_move_count.sort(F.col('number_of_occurences').desc()).limit(1)

In [26]:
min_eco = wch_move_count.sort(F.col('number_of_occurences').asc()).limit(1)

In [27]:
eco_moves = max_eco.union(min_eco)

4. Find the eco move with most winnings.
Ps. Use this opening move in your next chess game🙂
Result attributes: eco, eco_name

In [28]:
eco_wins = tournament.filter(tournament.winner != 'draw') \
                     .groupBy('eco').count() \
                     .join(df_eco, tournament.eco == df_eco.eco, 'inner') \
                     .select(df_eco.eco, 'eco_name') \
                     .sort(F.col('count').desc()) \
                     .limit(1)

5. Longest and shortest game ever played in a world championship in terms of move.
Chess Funda: "move" is completed once both White and Black have played one turn. e.g If a game lasts 10 moves, both White and Black have
played 10 moves)
Result attributes: game_id, event, tournament_name, number_of_moves
Final result will have only two rows

In [29]:
max_move = df_moves.join(wch_event, wch_event.game_id == df_moves.game_id,'right') \
        .sort(F.col('move_no_pair').desc()) \
        .select(wch_event.game_id, 'event', 'tournament_name', df_moves.move_no_pair.alias('number_of_moves')).limit(1)

In [30]:
min_move = df_moves.groupBy('game_id').agg(F.count('game_id').alias('number_of_moves')) \
        .join(wch_event, wch_event.game_id == df_moves.game_id,'inner') \
        .select(wch_event.game_id, 'event', 'tournament_name', F.ceil(F.col('number_of_moves')/2).alias('number_of_moves')) \
        .sort("number_of_moves").limit(1)

In [31]:
wch_moves = max_move.union(min_move)

6. Shortest and Longest Draw game ever Played.
Result attributes: game_id, event, tournament_name, number_of_moves
Final result will have only two rows

In [32]:
longest = tournament.filter(tournament.winner == 'draw') \
                    .join(df_moves, df_moves.game_id == tournament.game_id, 'inner') \
                    .sort(F.col('move_no_pair').desc()) \
                    .select(tournament.game_id, 'event', 'tournament_name', df_moves.move_no_pair.alias('number_of_moves')).limit(1)

In [33]:
shortest = df_moves.groupBy('game_id').agg(F.count('game_id').alias('number_of_moves')) \
                   .join(tournament, tournament.game_id == df_moves.game_id,'inner') \
                   .filter(tournament.winner == 'draw') \
                   .select(tournament.game_id, 'event', 'tournament_name', F.ceil(F.col('number_of_moves')/2).alias('number_of_moves')) \
                   .sort("number_of_moves").limit(1)

In [34]:
draw_game = longest.union(shortest)

7. Most and Least rated Player.
Result attributes: player_name, elo
Chess Funda: elo is the rating of the player in chess tournament.
Final result will have only two rows

In [35]:
high_elo = df_wc.sort(F.col('winner_elo').desc()) \
                .select(df_wc.winner.alias('player_name'), df_wc.winner_elo.alias('elo')).limit(1)

In [36]:
low_elo = df_wc.filter(df_wc.loser_elo.isNotNull()) \
               .select(df_wc.loser.alias('player_name'), df_wc.loser_elo.alias('elo')) \
               .sort(F.col('elo').asc()).limit(1)

In [37]:
player_ratings = high_elo.union(low_elo)

8. 3rd Last Player with most Loss.
Result attributes: player_name
Final result will have only one row

In [38]:
most_loss = df_wc.filter(df_wc.result != '1/2-1/2').groupBy('loser').count() \
                 .select('loser').sort(F.col('count').desc()).limit(3).sort('count').limit(1)

9. How many times players with low rating won matches with their total win Count.
Result attributes: player_name, win_count

In [39]:
low_rate_wins = \
tournament.filter(((tournament.white_scr > tournament.black_scr) & (tournament.white_elo < tournament.black_elo)) | \
          ((tournament.black_scr > tournament.white_scr) & (tournament.white_elo > tournament.black_elo))) \
          .groupBy('winner').count().withColumnRenamed('winner','player_name') \
          .withColumnRenamed('count','win_count')

10. Move Sequence for Each Player in a Match.
Result attributes: game_id, player_name, move_sequence, move_count

In [40]:
move_seq = \
df_moves.groupBy('game_id', 'player') \
        .agg(F.concat_ws('|', F.collect_list(df_moves.notation)).alias('move_sequence'), F.size(F.collect_list(df_moves.notation)).alias('move_count')) \
        .withColumnRenamed('player','player_name') \
        .orderBy('game_id')

11. Total Number of games where losing player has more Captured score than Winning player.
Hint: Captured score is cumulative, i.e., for 3rd capture it will have score for 1, 2, and 3rd.
Result attributes: total_number_of_games Final result will have only one row

In [41]:
df_moves = df_moves.withColumn('captured_score_for_white', df_moves.captured_score_for_white.cast('int')) \
                   .withColumn('captured_score_for_black', df_moves.captured_score_for_black.cast('int'))

In [42]:
capture_score = \
df_moves.groupBy('game_id', 'player').agg(F.max('captured_score_for_white').alias('white_scr'), F.max('captured_score_for_black').alias('black_scr')) \
        .join(df_wc, 'game_id', 'inner') \
        .filter((df_wc.winner != 'draw') & (((df_wc.white == df_wc.winner) & (F.col('white_scr') < F.col('black_scr'))) | \
                                            ((df_wc.black == df_wc.winner) & (F.col('white_scr') > F.col('black_scr'))))) \
        .select(F.countDistinct('game_id').alias('total_number_of_games'))

12. List All Perfect Tournament with Winner Name.
Chess Funda: Perfect Tournament means a player has won all the matches excluding draw matches. e.g Player A has won 5 matches out of 7
Matches in tournament where 2 matches are draw and player B has won 0 matches)
Result attributes: winner_name, tournament_name

In [43]:
perf_wins = tournament.filter(tournament.winner != 'draw') \
                      .groupBy('tournament_name', 'winner').count() \
                      .withColumn('cnt', F.count('tournament_name').over(Window.partitionBy('tournament_name'))) \
                      .filter(F.col('cnt') == 1).select(F.col('winner').alias('winner_name'), 'tournament_name') \
                      .sort("tournament_name")

13. Player with highest winning ratio.
Hint: Winning ratio: (Number of rounds won)/(Number of rounds played)
Result attributes: player_name
Final result will have only one row

In [44]:
wht_data = tournament.withColumn('wht_match', F.count('*').over(Window.partitionBy('white'))) \
                     .withColumn('win_flag', F.when(tournament.white == tournament.winner, 1).otherwise(0)) \
                     .groupBy('white', 'wht_match').agg(F.sum('win_flag').alias('wins'))\

In [45]:
blk_data = tournament.withColumn('blk_match', F.count('*').over(Window.partitionBy('black'))) \
                     .withColumn('win_flag', F.when(tournament.black == tournament.winner, 1).otherwise(0)) \
                     .groupBy('black', 'blk_match').agg(F.sum('win_flag').alias('wins'))

In [46]:
win_ratio = wht_data.unionAll(blk_data).groupBy('white') \
                    .agg(F.sum('wht_match').alias('total_matches'), F.sum('wins').alias('total_wins')) \
                    .withColumn('winning_ratio', F.col('total_wins')/F.col('total_matches')) \
                    .sort(F.col('winning_ratio').desc(), F.col('total_wins').desc()) \
                    .limit(1)

14. Player who had given checkmate with Pawn.
Note: Consider all events for this query
Result attributes: player_name
Final result will have only one row

In [47]:
pawn = df_moves.filter((df_moves.is_check_mate == 1) & (df_moves.piece == 'P')) \
               .select(df_moves.player.alias('player_name')).limit(1)

15. List games where player has won game without queen.
Result attributes: game_id, event, player_name

In [48]:
wo_queen = \
tournament.filter(tournament.winner != 'draw') \
          .join(df_moves, 'game_id', 'inner') \
          .withColumn('max_moves', F.max('move_no').over(Window.partitionBy('game_id'))).filter(F.col('move_no') == F.col('max_moves')) \
          .filter(((F.col('black') == F.col('winner')) & (F.col('black_queen_count') == 0)) | ((F.col('white') == F.col('winner')) & (F.col('white_queen_count') == 0))) \
          .select('game_id', 'event', tournament.winner.alias('player_name')) \
          .sort('game_id')

# Section - 3
#### Save the results from Section 2 into your google drive.

In [49]:
def gdrive_mount():
  try:
    drive.mount('/content/drive')
  except:
    print('Problem occured while Mount')

In [50]:
gdrive_mount()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [51]:
def create_dir():
  try:
    path = "/content/drive/My Drive/DE_SOLUTION_Anul_Kanpuria"
    os.mkdir(path)
    os.mkdir(path + "/results")
  except OSError as error:
    print(error)

In [52]:
create_dir()

[Errno 17] File exists: '/content/drive/My Drive/DE_SOLUTION_Anul_Kanpuria'


In [53]:
def df_upload():
  try:
    file_name = ["df1.csv", "df2.csv", "df3.csv", "df4.csv", "df5.csv", "df6.csv", "df7.csv", "df8.csv", "df9.csv", "df10.csv", "df11.csv", "df12.csv", "df13.csv", "df14.csv", "df15.csv"]
    pandas_df = [wch_winners, T_wins, eco_moves, eco_wins, wch_moves, draw_game, player_ratings, most_loss, low_rate_wins, move_seq, capture_score, perf_wins, win_ratio, pawn, wo_queen]
    for (name, df) in zip(file_name, pandas_df):
      path = "/content/drive/My Drive/DE_SOLUTION_Anul_Kanpuria/results/" + name
      with open(path, 'w', encoding = 'utf-8-sig') as f:
        df.toPandas().to_csv(f)
  except NameError as error:
    print(error)
  except OSError as error:
    print(error)
  except ValueError as error:
    print(error)

In [54]:
df_upload()