### Module and Function Instance

In [1]:
import csv
import os
import re
import time
import urllib
from io import StringIO

import html5lib
import pandas as pd
import pyodbc
import pypyodbc
import requests
from bs4 import BeautifulSoup
from sqlalchemy import create_engine

# # Parameter

# Label written to the "Round" column of every table exported by this notebook.
Last_Round = 'MW_38'
# NOTE(review): Next_Week and Upper_Week are not referenced by any cell visible
# here; presumably they are consumed further down the notebook — confirm.
Next_Week = 38
Upper_Week = 38

# select sink database option ["Az SQL", "Bif_Database"]
sink = "Bif_Database"


# #### Connect to database
# Both branches define the two module-level handles used by the rest of the
# notebook: `conn` (raw DBAPI connection, used by sourcedestcompare) and
# `engine` (SQLAlchemy engine, used by DataFrame.to_sql).

# ### Az Sql
if sink == "Az SQL":
    SERVER_NAME = 'socceranalyticsgroup.database.windows.net'
    DATABASE_NAME = 'eplanalytics'
    USERNAME = 'socceranalyticsgroup'
    # The password must never live in the notebook: it is read from the
    # AZ_SQL_PASSWORD environment variable instead. The value that used to be
    # hardcoded here has been exposed in the file history and should be rotated.
    PASSWORD = os.environ.get("AZ_SQL_PASSWORD")
    if not PASSWORD:
        raise RuntimeError(
            "Set the AZ_SQL_PASSWORD environment variable before selecting the 'Az SQL' sink."
        )

    driver= '{ODBC Driver 17 for SQL Server}'
    conn_str = (
        f"Driver={driver};"
        f"Server=tcp:{SERVER_NAME};"
        f"Database={DATABASE_NAME};"
        f"Uid={USERNAME};"
        f"Pwd={PASSWORD};"
        f"Encrypt=yes;"
        f"TrustServerCertificate=no;"
        f"Connection Timeout=30;"
    )

    # Create a pyodbc connection object
    conn = pyodbc.connect(conn_str)

    # Create a SQLAlchemy engine object
    # NOTE(review): the creator hands the same `conn` object to the engine every
    # time it asks for a connection — confirm this sharing is intended.
    engine = create_engine("mssql+pyodbc://", creator=lambda: conn)


# ### Bif On Premise
elif sink == "Bif_Database":
    SERVER_NAME = 'WS1\\WSBIF'
    DATABASE_NAME = 'SoccerAnalysis'
    USERNAME = 'WS1\\User'
    # Windows-authenticated (trusted) connection; USERNAME is not part of the
    # connection string.
    conn= pypyodbc.connect("""
        Driver={{SQL Server}};
        Server={0};
        Database={1};
        Trusted_Connection=yes;""".format(SERVER_NAME, DATABASE_NAME))

    engine = create_engine("mssql+pyodbc://@"+SERVER_NAME+"/"+DATABASE_NAME+"?trusted_connection=yes&driver=SQL+Server")

else:
    # Fail here with a clear message instead of a NameError on `conn`/`engine`
    # in a later cell.
    raise ValueError(
        f"Unknown sink {sink!r}; expected 'Az SQL' or 'Bif_Database'."
    )



# # Renaming some clubs
# Dict subclass that returns the key itself on a missing lookup, so club names without a mapping pass through unchanged
class MissingDict(dict):
    """A dict whose failed lookups echo the requested key instead of raising KeyError."""

    def __missing__(self, key):
        # Invoked by dict.__getitem__ when `key` is absent; the key is returned
        # as the value and is not inserted into the dict.
        return key
    
# Long club names -> the short names stored in the "Club" column.
# Names that are not listed here are left untouched by `mapping`.
map_values = {
    "Wolverhampton Wanderers": "Wolves",
    "West Ham United": "West Ham",
    "Tottenham Hotspur": "Tottenham",
    "Nottingham Forest": "Nott'ham Forest",
    "Newcastle United": "Newcastle Utd",
    "Manchester United": "Manchester Utd",
    "Brighton and Hove Albion": "Brighton",
    "Sheffield United": "Sheffield Utd",
}
mapping = MissingDict(map_values)


# # Functions
# print the column-name differences between a SQL table and a DataFrame
def sourcedestcompare(sqltablename:str, pythondf):
    """Print the column-name differences between a SQL table and a DataFrame.

    Two lists are printed, in this order:
      1. columns the SQL table has that ``pythondf`` lacks;
      2. columns ``pythondf`` has that the SQL table lacks.

    Parameters
    ----------
    sqltablename : str
        Table name looked up in ``INFORMATION_SCHEMA.COLUMNS``.
    pythondf : pandas.DataFrame
        Frame whose column labels are compared against the table.

    Returns
    -------
    None
        The result is only printed.

    Notes
    -----
    Uses the module-level DBAPI connection ``conn``.
    NOTE(review): the lookup is not filtered by TABLE_SCHEMA, so same-named
    tables in different schemas would be merged — confirm this is acceptable.
    """
    # The table name is bound as a query parameter instead of being formatted
    # into the SQL text. ORDER BY makes the row order (and therefore the
    # "skip the first row" step below) deterministic.
    sql_columnlist = """
    Select COLUMN_NAME from INFORMATION_SCHEMA.COLUMNS
    where TABLE_NAME = ?
    order by ORDINAL_POSITION
    """

    df_sqlcolumnlist = pd.read_sql(sql_columnlist, conn, params=[sqltablename])

    # Skip the table's first column — presumably the index column that
    # DataFrame.to_sql writes by default (TODO confirm). The column is selected
    # by position because the result-set header casing depends on the driver.
    sqllist = df_sqlcolumnlist.iloc[1:, 0].tolist()

    dflist = pythondf.columns.values.tolist()

    # what sql has but df doesnt contain
    missing_in_df = [name for name in sqllist if name not in dflist]
    print(missing_in_df)
    # what df contains but sql doesnt have
    missing_in_sql = [name for name in dflist if name not in sqllist]
    print(missing_in_sql)

# rename duplicate field from fbref.com
def renameduplicatecolumns(dlist, df: pd.DataFrame):
    """Suffix every occurrence of the given column labels with a running number.

    For each label in ``dlist``, the columns of ``df`` equal to that label are
    renamed ``<label>_1``, ``<label>_2``, ... in left-to-right order. A label
    that occurs only once still becomes ``<label>_1``. Labels are processed one
    after another, each pass seeing the result of the previous one.

    ``df`` is modified in place; nothing is returned.
    """
    for target in dlist:
        occurrence = 0
        relabelled = []
        for label in df.columns:
            if label == target:
                occurrence += 1
                relabelled.append(f'{target}_{occurrence}')
            else:
                relabelled.append(label)
        df.columns = relabelled

def add(x,y):
    """Return ``x + y`` using whatever ``+`` means for the two operands."""
    return x + y

In [2]:
def read_csv_to_array_of_tuples(csv_file):
    """Read a CSV file into a list of row tuples, skipping the first row.

    Parameters
    ----------
    csv_file : str or path-like
        Path of the CSV file to read.

    Returns
    -------
    list[tuple[str, ...]]
        One tuple per data row, fields kept as the strings the csv module
        produced (no stripping). The first row is treated as a header and
        dropped. An empty file yields an empty list.
    """
    # newline='' is what the csv module expects, so that newlines embedded in
    # quoted fields are handled by the reader rather than by the file object.
    with open(csv_file, 'r', newline='') as file:
        reader = csv.reader(file)
        # Skip the header; the default keeps an empty file from raising StopIteration.
        next(reader, None)
        return [tuple(row) for row in reader]


## Function: player_record_to_database

In [3]:
def player_record_to_database(html_identifier, database_table):
    """Scrape one per-player stats table for each league club from fbref.com and write it to SQL.

    The club list is taken from the first ``table.stats_table`` on the Premier
    League page. For every club page the first table matching
    ``html_identifier`` is read, tagged with the club name, stripped of its
    last two rows, and all clubs are concatenated into one frame. Known
    duplicated column labels are numbered, "Season" and "Round" columns are
    added, and the frame replaces ``refactor.<database_table>``.

    Parameters
    ----------
    html_identifier : str
        Text passed to ``pandas.read_html(match=...)`` to pick the table. It is
        also compared verbatim (trailing space included) against the known
        identifiers below to decide which columns to renumber.
    database_table : str
        Name of the destination table in the ``refactor`` schema.

    Returns
    -------
    None

    Raises
    ------
    requests.HTTPError
        If fbref answers with an error status.
    requests.Timeout
        If fbref does not answer within 30 seconds.

    Notes
    -----
    Uses the module-level ``mapping``, ``Last_Round`` and ``engine``.
    """
    standing_url = "https://fbref.com/en/comps/9/Premier-League-Stats"
    # A timeout keeps the cell from hanging forever; raise_for_status surfaces
    # an HTTP error page immediately instead of a confusing parse failure below.
    data = requests.get(standing_url, timeout=30)
    data.raise_for_status()
    soup = BeautifulSoup(data.text, 'lxml')
    standings_table = soup.select('table.stats_table')[0]
    hrefs = [anchor.get("href") for anchor in standings_table.find_all('a')]
    # Anchors without an href give None; skip them rather than crash on `in`.
    team_urls = [f"https://fbref.com{href}" for href in hrefs if href and 'squads' in href]
    # NOTE(review): Arsenal's page is excluded without explanation — confirm
    # this is still wanted.
    url_to_remove = 'https://fbref.com/en/squads/18bb7c10/Arsenal-Stats'

    team_urls = [url for url in team_urls if url != url_to_remove]
    all_squad_record = []

    for team_url in team_urls:
        # ".../Manchester-United-Stats" -> "Manchester United"
        team_name = team_url.split("/")[-1].replace("-Stats", "").replace("-"," ")

        data = requests.get(team_url, timeout=30)
        data.raise_for_status()
        # read_html is given a file-like object; passing literal HTML text is
        # deprecated in recent pandas releases.
        squad_record = pd.read_html(StringIO(data.text), match = html_identifier)[0]
        squad_record.columns = squad_record.columns.droplevel()
        squad_record["Club"] = mapping[team_name]
        # Drop the last two rows of each club table (presumably the total rows
        # — TODO confirm).
        squad_record = squad_record.drop(squad_record.tail(2).index)
        all_squad_record.append(squad_record)
        time.sleep(2)

    all_squad_record_df = pd.concat(all_squad_record)
    all_squad_record_df["Season"] = "2023/2024"

    # NOTE(review): sourcedestcompare runs before "Round" is added (and, for
    # Standard Stats, before the Prg* columns are dropped), so those columns
    # always show up in its printed differences.
    if html_identifier == "Goalkeeping ":
        RepeatedColumn = ['Save%']
        renameduplicatecolumns(RepeatedColumn, all_squad_record_df)
    elif html_identifier == "Advanced Goalkeeping ":
        RepeatedColumn = ['Att', 'Launch%', 'AvgLen']
        renameduplicatecolumns(RepeatedColumn, all_squad_record_df)
    elif html_identifier == "Passing ":
        RepeatedColumn = ['Cmp','Att','Cmp%']
        renameduplicatecolumns(RepeatedColumn, all_squad_record_df)
        sourcedestcompare(database_table, all_squad_record_df)
    elif html_identifier == "Standard Stats ":
        RepeatedColumn = ['Gls','Ast','xG','xAG']
        renameduplicatecolumns(RepeatedColumn, all_squad_record_df)
        sourcedestcompare(database_table, all_squad_record_df)
        all_squad_record_df = all_squad_record_df.drop(['PrgC','PrgP','PrgR'],axis=1)
    elif html_identifier == "Goal and Shot Creation ":
        RepeatedColumn = ['PassLive','PassDead','TO','Sh','Fld','Def']
        renameduplicatecolumns(RepeatedColumn, all_squad_record_df)
        # NOTE(review): compares against a fixed table name, not database_table.
        sourcedestcompare('PlayerChancesStat', all_squad_record_df)
    elif html_identifier == "Defensive Actions ":
        RepeatedColumn = ['Tkl']
        renameduplicatecolumns(RepeatedColumn, all_squad_record_df)
        # NOTE(review): compares against a fixed table name, not database_table.
        sourcedestcompare('PlayerDefensiveactionStat', all_squad_record_df)

    # Send to database (if_exists='replace' overwrites the table; use
    # if_exists='append' to add rows instead)
    all_squad_record_df["Round"] = Last_Round
    # if you want to export to dbo then remove the schema parameter
    all_squad_record_df.to_sql(database_table, engine, if_exists='replace', schema='refactor')

In [4]:
# Load the (html_identifier, database_table) pairs that drive the player export.
# read_csv_to_array_of_tuples skips the first row of the file as a header.
csv_file = 'player_record_database_name.csv'
player_array_of_tuples = read_csv_to_array_of_tuples(csv_file)

### Executable: player_record_to_database

In [5]:
# Export every configured player table in turn. Each CSV row must hold exactly
# two fields, otherwise the tuple unpacking below fails.
for html_identifier, database_table in player_array_of_tuples:
    player_record_to_database(html_identifier, database_table)
    # Pause for 1 minute between tables (the pause also runs after the last one).
    time.sleep(60)

  df_sqlcolumnlist = pd.read_sql(sql_columnlist, conn)


['Round']
[]


  df_sqlcolumnlist = pd.read_sql(sql_columnlist, conn)


['Round']
['PrgC', 'PrgP', 'PrgR', 'Matches']


  df_sqlcolumnlist = pd.read_sql(sql_columnlist, conn)


['Round']
[]


  df_sqlcolumnlist = pd.read_sql(sql_columnlist, conn)


['Past', 'Press', 'Succ', '%', 'ShSv', 'Round']
[]


## Squad Function: squad_record_to_database


In [6]:
def squad_record_to_database(html_identifier, database_table):
    """Scrape one squad-level stats table from fbref.com and write it to SQL.

    The first table on the Premier League page matching ``html_identifier`` is
    read, known duplicated column labels are numbered, "Season" and "Round"
    columns are added, and the frame replaces ``refactor.<database_table>``.

    Previously every recognised identifier returned early, before the rename
    and before the database write, so those tables were never exported. The
    write is now always reached.

    Parameters
    ----------
    html_identifier : str
        Text passed to ``pandas.read_html(match=...)`` to pick the table. It is
        also compared verbatim (trailing space included) against the known
        identifiers below.
    database_table : str
        Name of the destination table in the ``refactor`` schema.

    Returns
    -------
    list[str] or None
        The duplicated column labels that were numbered for this identifier
        (kept for backward compatibility with the earlier return value), or
        ``None`` when the identifier is not one of the known ones.

    Raises
    ------
    requests.HTTPError
        If fbref answers with an error status.
    requests.Timeout
        If fbref does not answer within 30 seconds.

    Notes
    -----
    Uses the module-level ``Last_Round`` and ``engine``.
    """
    # Column labels that appear more than once in each known table.
    # NOTE(review): the last two keys lack the "Squad " prefix the others
    # carry — confirm they match the identifiers used in the CSV.
    repeated_by_identifier = {
        "Squad Goalkeeping ": ['Save%'],
        "Squad Advanced Goalkeeping ": ['Att', 'Launch%', 'AvgLen'],
        "Squad Passing ": ['Cmp','Att','Cmp%'],
        "Squad Standard Stats ": ['Gls','Ast','xG','xAG'],
        "Goal and Shot Creation ": ['PassLive','PassDead','TO','Sh','Fld','Def'],
        "Defensive Actions ": ['Tkl'],
    }

    # A timeout keeps the cell from hanging forever; raise_for_status surfaces
    # an HTTP error page immediately instead of a confusing parse failure below.
    data = requests.get("https://fbref.com/en/comps/9/Premier-League-Stats", timeout=30)
    data.raise_for_status()
    # read_html is given a file-like object; passing literal HTML text is
    # deprecated in recent pandas releases.
    squad_record = pd.read_html(StringIO(data.text), match = html_identifier)[0]
    squad_record.columns = squad_record.columns.droplevel()

    RepeatedColumn = repeated_by_identifier.get(html_identifier)
    if RepeatedColumn is not None:
        renameduplicatecolumns(RepeatedColumn, squad_record)
    if html_identifier == "Squad Standard Stats ":
        squad_record = squad_record.drop(['PrgC','PrgP'],axis=1)
    squad_record["Season"] = "2023/2024"
    squad_record["Round"] = Last_Round
    squad_record.to_sql(database_table, engine, if_exists='replace', schema='refactor')
    return RepeatedColumn

In [7]:
# Load the (html_identifier, database_table) pairs that drive the squad export.
# read_csv_to_array_of_tuples skips the first row of the file as a header.
csv_file_squad = 'squad_record_database_name.csv'
squad_array_of_tuples = read_csv_to_array_of_tuples(csv_file_squad)

### Executable: squad_record_to_database

In [None]:
# Export every configured squad table in turn. Each CSV row must hold exactly
# two fields, otherwise the tuple unpacking below fails.
for html_identifier, database_table in squad_array_of_tuples:
    squad_record_to_database(html_identifier, database_table)
    # Pause for 1 minute between tables (the pause also runs after the last one).
    time.sleep(60)