In [1]:
import os
import re
from typing import List
import joblib
import logging
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pandas as pd


from object_types import CAREER_FRAME, DRAFT, WIKI_PLAYER

MERGED_DIR = './players_merged'

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Parse data") \
    .config("spark.driver.host", "0.0.0.0") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.shuffle.service.enabled", "false") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.execution.pythonUTF8StringEncoding", "true") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/23 16:33:03 WARN Utils: Your hostname, MacBook-Air-uzivatela-Jaroslava.local, resolves to a loopback address: 127.0.0.1; using 192.168.100.56 instead (on interface en0)
25/11/23 16:33:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/23 16:33:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
def load_df(file_path = './processed_pages.joblib') -> DataFrame:
    pd_df: pd.DataFrame = joblib.load(file_path)
    return spark.createDataFrame(pd_df)

wiki_df = load_df('./wiki_df.joblib')
html_df = load_df('./html_df.joblib')
merged_df = load_df('./merged_df.joblib')

In [28]:
html_df.describe()

DataFrame[summary: string, file_path: string, download_url: string, player_name: string, dob: string, draft_team: string, position: string, hand: string, height: string, weight: string, games_played: string, wins: string, losses: string, ties_ot_losses: string, minutes: string, shootouts: string, gaa: string, save_percentage: string, goals: string, assists: string, points: string, plus_minus: string, point_shares: string, penalty_minutes: string, shots_on_goal: string, game_winning_goals: string]

In [30]:
wiki_df.describe()

DataFrame[summary: string, full_name: string, birthplace: string, career_start: string, career_end: string, draft: string, draft_year: string, draft_team: string, current_league: string, national_team: string, current_team: string, nationality: string]

In [31]:
html_df.count()

                                                                                

12165

In [32]:
wiki_df.count()

19722

In [15]:
merged_df = html_df.join(wiki_df, wiki_df['full_name'] == html_df['player_name'], how="left")

In [8]:
merged_df.show(n=5)

                                                                                

+--------------------+--------------------+------------------+-----------------+----------+----------+----+------+------+------------+----+------+--------------+-------+---------+---+---------------+-----+-------+------+----------+-------------------+---------------+-------------+------------------+--------------+----------------+------------+----------+-------------+----------+--------------------+--------------------+-------------+------------------+-----------+
|           file_path|        download_url|       player_name|              dob|draft_team|  position|hand|height|weight|games_played|wins|losses|ties_ot_losses|minutes|shootouts|gaa|save_percentage|goals|assists|points|plus_minus|       point_shares|penalty_minutes|shots_on_goal|game_winning_goals|     full_name|      birthplace|career_start|career_end|        draft|draft_year|          draft_team|      current_league|national_team|      current_team|nationality|
+--------------------+--------------------+------------------+

In [14]:
wiki_df = wiki_df.drop('draft_team')

In [4]:
merged_df.count()

                                                                                

12197

In [17]:
def dump_df(df: DataFrame, file_path = './processed_pages.joblib'):
    pd_df = df.toPandas()
    joblib.dump(pd_df, file_path)

dump_df(merged_df, './left_merged.joblib')


                                                                                

In [14]:
merged_df.describe()

DataFrame[summary: string, file_path: string, download_url: string, player_name: string, dob: string, position: string, hand: string, height: string, weight: string, games_played: string, wins: string, losses: string, ties_ot_losses: string, minutes: string, shootouts: string, gaa: string, save_percentage: string, goals: string, assists: string, points: string, plus_minus: string, point_shares: string, penalty_minutes: string, shots_on_goal: string, game_winning_goals: string, full_name: string, birthplace: string, career_start: string, career_end: string, draft: string, draft_year: string, draft_team: string, current_league: string, national_team: string, current_team: string, nationality: string]

In [3]:
html_df.show(n=2)

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+--------------------+------------------+-------------+--------+----------+----+------+------+------------+----+------+--------------+-------+---------+---+---------------+-----+-------+------+----------+------------------+---------------+-------------+------------------+
|           file_path|        download_url|       player_name|          dob|position|draft_team|hand|height|weight|games_played|wins|losses|ties_ot_losses|minutes|shootouts|gaa|save_percentage|goals|assists|points|plus_minus|      point_shares|penalty_minutes|shots_on_goal|game_winning_goals|
+--------------------+--------------------+------------------+-------------+--------+----------+----+------+------+------------+----+------+--------------+-------+---------+---+---------------+-----+-------+------+----------+------------------+---------------+-------------+------------------+
|.\scraped\players...|https://www.hocke...|       Antti Aalto|March 4, 1975|       C|   Anaheim|Left| 185.0|  95.0|   

                                                                                

In [3]:
direct_df = load_df('./direct_df.joblib')

In [4]:
html_df = direct_df

In [5]:
temp_view_name = "players"
merged_df.createOrReplaceTempView(temp_view_name)

spark.sql(f"select * from {temp_view_name} where player_name is not null and full_name is not null limit 5").show()

25/11/13 12:13:58 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+--------------------+--------------------+------------------+----------+---------+-----+------+------+------------+-----+------+--------------+-------+---------+---+---------------+-----+-------+------+----------+-------------------+---------------+-------------+------------------+--------------------+--------------------+------------+----------+--------------------+----------+--------------------+--------------------+-------------+-------------------+-----------+
|           file_path|        download_url|         player_name|               dob|draft_team| position| hand|height|weight|games_played| wins|losses|ties_ot_losses|minutes|shootouts|gaa|save_percentage|goals|assists|points|plus_minus|       point_shares|penalty_minutes|shots_on_goal|game_winning_goals|           full_name|          birthplace|career_start|career_end|               draft|draft_year|          draft_team|      current_league|national_team|       current_team|nationality|
+-----------------

In [15]:
spark.sql(f"select count(distinct player_name, full_name) from {temp_view_name}").show()



+--------------------------------------+
|count(DISTINCT player_name, full_name)|
+--------------------------------------+
|                                  4854|
+--------------------------------------+



                                                                                

In [5]:
html_view = "html"
html_df.createOrReplaceTempView(html_view)

spark.sql(f"""
    SELECT COUNT(player_name) as count_name, player_name 
    FROM {html_view} 
    WHERE player_name IS NOT NULL 
    GROUP BY player_name
    HAVING COUNT(player_name) > 1
""").show()

[Stage 0:>                                                          (0 + 8) / 8]

+----------+--------------------+
|count_name|         player_name|
+----------+--------------------+
|         7|                 JiÅ|
|         2|               ZdenÄ|
|         3|               AndrÃ|
|         4|                  SÃ|
|         2|   Sylvanus Marshall|
|         2|       Josh Anderson|
|         2|       Sebastian Aho|
|         4|                 FrÃ|
|         4|          Jean-FranÃ|
|         2|  John Matthew Adams|
|         4|          Marc-AndrÃ|
|         2| Michael John Boland|
|         2|           Harvey A.|
|         4|                   Ã|
|         2|Ronald Henry Ande...|
|         3|             Jean-SÃ|
|         2| Gregory Daren Adams|
|         3|     Peter Andersson|
|         2|Garnet Edward Bailey|
|         2|             Roman Ä|
+----------+--------------------+
only showing top 20 rows


                                                                                

In [39]:
merged_view = "merged"
merged_df.createOrReplaceTempView(merged_view)

In [43]:
spark.sql(f"""
    select download_url, player_name, dob
    from {merged_view}
    where player_name is not null and full_name is null
""").show(truncate=False, n=200)

[Stage 112:>                (0 + 8) / 8][Stage 114:>                (0 + 0) / 1]

+---------------------------------------------------------+-------------------------------+------------------+
|download_url                                             |player_name                    |dob               |
+---------------------------------------------------------+-------------------------------+------------------+
|https://www.hockey-reference.com/players/g/greeraj01.html|A                              |December 14, 1996 |
|https://www.hockey-reference.com/players/r/rasanaa01.html|Aapeli Rasanen                 |June 1, 1998      |
|https://www.hockey-reference.com/players/t/talviaa01.html|Aarne Talvitie                 |February 11, 1999 |
|https://www.hockey-reference.com/players/b/broteaa01.html|Aaron Broten                   |November 14, 1960 |
|https://www.hockey-reference.com/players/d/dawsoaa01.html|Aaron Dawson                   |March 11, 1985    |
|https://www.hockey-reference.com/players/h/harstaa01.html|Aaron Harstad                  |April 27, 1992    |
|

                                                                                

In [8]:
data = joblib.load('./processed_data.joblib')

In [12]:
def load_from_tsv() -> DataFrame:
    df = spark.read \
    .option("header", "true") \
    .option("delimiter", "\t") \
    .option("nullValue", None) \
    .option("emptyValue", None) \
    .schema(WIKI_PLAYER_SCHEMA) \
    .csv(os.path.join('.', 'wiki_df', 'data.tsv'))
    return df 

wiki_df = load_from_tsv()

In [13]:
wiki_df.count()

19731