In [72]:
import requests
import pyodbc
import pandas as pd

# SQL Connection Setup
# Opens a SQL Server session through "ODBC Driver 17 for SQL Server" using Windows
# authentication (Trusted_Connection=yes), then creates the cursor shared by the helpers below.
# NOTE(review): `server_name` and `database_name` are not assigned in this cell; within this
# notebook they are only assigned in the later upload cell. Confirm they are defined before this
# cell runs on a fresh kernel (Restart & Run All).
conn = pyodbc.connect(f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server_name};DATABASE={database_name};Trusted_Connection=yes;")
cursor = conn.cursor()

# Function to create ChangeLogs table if it doesn't exist
def create_change_logs_table(conn, cursor):
    """Create the ``ChangeLogs`` audit table unless it is already present.

    Args:
        conn: Open database connection; used to commit the statement.
        cursor: Cursor on ``conn`` that executes the statement.

    Returns:
        None. A confirmation line is printed once the statement has been committed.
    """
    # The OBJECT_ID guard turns the CREATE into a no-op when the table exists,
    # so this function can be called on every run.
    ddl = """
        IF OBJECT_ID('ChangeLogs', 'U') IS NULL
        CREATE TABLE ChangeLogs (
            id INT PRIMARY KEY IDENTITY(1,1),
            dataset_name NVARCHAR(255),
            description NVARCHAR(MAX),
            timestamp DATETIME
        );
    """
    cursor.execute(ddl)
    conn.commit()
    print("ChangeLogs table created or already exists.")

# Function to log changes
def log_change(conn, cursor, dataset_name, description):
    """Insert one timestamped audit row into ``ChangeLogs``.

    Failures are reported on stdout and rolled back rather than raised, so a logging
    problem never interrupts the caller.

    Args:
        conn: Open database connection; committed on success, rolled back on failure.
        cursor: Cursor on ``conn`` used for the INSERT.
        dataset_name: Name of the dataset (here, the API endpoint) the entry refers to.
        description: Free-text description of what happened.
    """
    logged_at = pd.Timestamp.now()
    insert_sql = """
            INSERT INTO ChangeLogs (dataset_name, description, timestamp)
            VALUES (?, ?, ?)
        """
    row = (dataset_name, description, logged_at)
    try:
        cursor.execute(insert_sql, row)
        conn.commit()
        print(f"Logged change for {dataset_name}: {description}")
    except Exception as exc:
        print(f"Error logging change for {dataset_name}: {exc}")
        conn.rollback()

# Step 1: Ensure ChangeLogs table exists
# Must run before any log_change() call in this cell, because log_change inserts into that table.
create_change_logs_table(conn, cursor)

# Base URL for ASA API
# Endpoint paths below are appended to this prefix as f"{BASE_URL}/{endpoint}".
BASE_URL = "https://app.americansocceranalysis.com/api/v1"

# List of endpoints
# Each entry is a path relative to BASE_URL; the loop further down calls
# fetch_key_fields() once per entry to discover which columns the endpoint returns.
endpoints = [
    "mls/players",
    "mls/players/xgoals",
    "mls/players/xpass",
    "mls/players/goals-added",
    "mls/players/salaries",
    "mls/goalkeepers/xgoals",
    "mls/goalkeepers/goals-added",
    "mls/teams",
    "mls/teams/xgoals",
    "mls/teams/xpass",
    "mls/teams/goals-added",
    "mls/teams/salaries",
    "mls/games",
    "mls/games/xgoals",
    "mls/managers",
    "mls/referees",
    "mls/stadia"
]

# Function to fetch and log data
def fetch_key_fields(endpoint, timeout=30):
    """Fetch one API endpoint and return the column names of its flattened JSON payload.

    Every outcome (success, unexpected payload, exception) is recorded through
    ``log_change`` using the module-level ``conn`` and ``cursor``, so this function
    never raises for a failed request.

    Args:
        endpoint: Path relative to ``BASE_URL``, e.g. ``"mls/players"``.
        timeout: Seconds to wait for the server before giving up. Without a timeout
            ``requests`` waits indefinitely, so one stalled endpoint would hang the cell.

    Returns:
        list: Column names produced by ``pd.json_normalize``; an empty list when the
        request fails or the payload is not a non-empty list.
    """
    url = f"{BASE_URL}/{endpoint}"
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raise HTTPError for bad responses
        data = response.json()

        # Only a non-empty JSON array can be flattened into columns.
        if not (isinstance(data, list) and len(data) > 0):
            print(f"Endpoint {endpoint} returned unexpected data format.")
            log_change(conn, cursor, endpoint, f"Error: Endpoint {endpoint} returned unexpected data format.")
            return []

        key_fields = pd.json_normalize(data).columns.tolist()
        print(f"=== Endpoint: {endpoint} ===")
        print(key_fields)

        # Log the successful data fetch
        log_change(conn, cursor, endpoint, f"Successfully fetched data from {endpoint} API endpoint.")
        return key_fields
    except Exception as e:
        # Deliberately broad: network errors, HTTP errors and JSON decoding errors are all
        # reported and logged the same way so the endpoint loop keeps going.
        print(f"Error fetching {endpoint}: {e}")
        log_change(conn, cursor, endpoint, f"Error fetching data from {endpoint}: {e}")
        return []

# Iterate over endpoints and fetch key fields
# Maps endpoint -> list of column names (an empty list when fetch_key_fields reported a failure).
endpoint_key_fields = {}
for endpoint in endpoints:
    key_fields = fetch_key_fields(endpoint)
    endpoint_key_fields[endpoint] = key_fields

# Display collected key fields for all endpoints
# orient="index" followed by .T yields one column per endpoint; endpoints with fewer fields
# are padded with missing values, which are then blanked out with "".
# NOTE(review): the frame is built here but not displayed anywhere in this cell.
endpoint_key_fields_df = pd.DataFrame.from_dict(endpoint_key_fields, orient="index").T
endpoint_key_fields_df.fillna("", inplace=True)

# Close the connection
# After this point `conn`/`cursor` are unusable; the next cell opens a new connection.
conn.close()


ChangeLogs table created or already exists.
=== Endpoint: mls/players ===
['player_id', 'player_name', 'birth_date', 'height_ft', 'height_in', 'weight_lb', 'nationality', 'primary_broad_position', 'primary_general_position', 'season_name', 'secondary_general_position', 'secondary_broad_position']
Logged change for mls/players: Successfully fetched data from mls/players API endpoint.
=== Endpoint: mls/players/xgoals ===
['player_id', 'team_id', 'general_position', 'minutes_played', 'shots', 'shots_on_target', 'goals', 'xgoals', 'xplace', 'goals_minus_xgoals', 'key_passes', 'primary_assists', 'xassists', 'primary_assists_minus_xassists', 'xgoals_plus_xassists', 'points_added', 'xpoints_added']
Logged change for mls/players/xgoals: Successfully fetched data from mls/players/xgoals API endpoint.
=== Endpoint: mls/players/xpass ===
['player_id', 'team_id', 'general_position', 'minutes_played', 'attempted_passes', 'pass_completion_percentage', 'xpass_completion_percentage', 'passes_completed

In [73]:
import requests
import pandas as pd
import pyodbc

# Base URL for ASA API
# Same value as in the previous cell; repeated so this cell does not depend on it.
BASE_URL = "https://app.americansocceranalysis.com/api/v1"

# Initialize connection and cursor before starting the data fetch and log
# fetch_data() and log_change() below read these two module-level names.
# NOTE(review): `server_name` and `database_name` are not assigned in this cell; confirm they
# are defined earlier in the kernel session.
conn = pyodbc.connect(f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server_name};DATABASE={database_name};Trusted_Connection=yes;")
cursor = conn.cursor()

# Function to log changes to ChangeLogs table
def log_change(conn, cursor, dataset_name, description):
    """Record one audit entry in the ``ChangeLogs`` table.

    The entry is stamped with the current local time. A failed insert is printed and
    rolled back instead of being raised, so logging never aborts the calling step.

    Args:
        conn: Open database connection; committed on success, rolled back on failure.
        cursor: Cursor on ``conn`` used for the INSERT.
        dataset_name: Name of the dataset (here, the API endpoint) the entry refers to.
        description: Free-text description of what happened.
    """
    now = pd.Timestamp.now()
    statement = """
            INSERT INTO ChangeLogs (dataset_name, description, timestamp)
            VALUES (?, ?, ?)
        """
    try:
        cursor.execute(statement, (dataset_name, description, now))
        conn.commit()
        print(f"Logged change for {dataset_name}: {description}")
    except Exception as err:
        print(f"Error logging change for {dataset_name}: {err}")
        conn.rollback()

# Function to fetch data from the API and filter columns
def fetch_data(endpoint, required_columns, timeout=30):
    """Fetch one API endpoint and return only the requested columns.

    Every outcome is recorded through ``log_change`` using the module-level ``conn``
    and ``cursor``. The function never raises for a failed request; it returns an
    empty frame with the expected columns instead, so callers can keep iterating.

    NOTE(review): no paging parameters are sent. If the API caps the number of rows
    per response, the result is silently truncated; confirm against the API docs.

    Args:
        endpoint: Path relative to ``BASE_URL``, e.g. ``"mls/players"``.
        required_columns: Column names to keep, in the order they should appear.
        timeout: Seconds to wait for the server before giving up. Without a timeout
            ``requests`` waits indefinitely on a stalled connection.

    Returns:
        pandas.DataFrame: An independent frame holding ``required_columns``; empty
        (but with those columns) when the request fails, the payload is not a
        non-empty list, or any required column is absent from the payload.
    """
    url = f"{BASE_URL}/{endpoint}"
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        data = response.json()

        # Only a non-empty JSON array can be turned into a table.
        if not (isinstance(data, list) and len(data) > 0):
            print(f"Endpoint {endpoint} returned unexpected data format.")

            # Log failure for unexpected format
            log_change(conn, cursor, endpoint, f"Error: Endpoint {endpoint} returned unexpected data format.")
            return pd.DataFrame(columns=required_columns)

        df = pd.json_normalize(data)

        # Report a schema mismatch explicitly instead of letting the KeyError from the
        # column selection be logged as a generic "Error fetching" message.
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            message = f"Error: Endpoint {endpoint} is missing expected columns: {missing_columns}"
            print(message)
            log_change(conn, cursor, endpoint, message)
            return pd.DataFrame(columns=required_columns)

        # .copy() detaches the selection from the full payload frame so later
        # assignments on the result do not trigger SettingWithCopyWarning.
        filtered_df = df[required_columns].copy()

        # Log success
        log_change(conn, cursor, endpoint, f"Successfully fetched data from {endpoint} API endpoint.")
        return filtered_df
    except Exception as e:
        # Deliberately broad: network, HTTP and JSON decoding errors are all handled
        # the same way so one bad endpoint does not stop the whole fetch loop.
        print(f"Error fetching {endpoint}: {e}")

        # Log failure for fetch error
        log_change(conn, cursor, endpoint, f"Error fetching data from {endpoint}: {e}")
        return pd.DataFrame(columns=required_columns)

# Define endpoints and required columns
# Maps each endpoint path (relative to BASE_URL) to the columns kept by fetch_data().
# The dict's insertion order is also the fetch order used by the loop below.
endpoints_and_columns = {
    "mls/players": [
        "player_id", "player_name", "birth_date", "primary_broad_position", "primary_general_position"
    ],
    "mls/players/xgoals": [
        "player_id", "team_id", "minutes_played", "shots", "shots_on_target",
        "goals", "xgoals", "xplace", "goals_minus_xgoals", "key_passes",
        "primary_assists", "xassists", "primary_assists_minus_xassists",
        "xgoals_plus_xassists", "points_added", "xpoints_added"
    ],
    "mls/players/xpass": [
        "player_id", "team_id", "minutes_played", "attempted_passes",
        "pass_completion_percentage", "xpass_completion_percentage",
        "passes_completed_over_expected", "avg_distance_yds"
    ],
    "mls/players/salaries": [
        "player_id", "team_id", "season_name", "position",
        "base_salary", "guaranteed_compensation"
    ],
    "mls/teams": [
        "team_id", "team_name"
    ],
    "mls/games": [
        "game_id", "date_time_utc", "home_score", "away_score",
        "home_team_id", "away_team_id"
    ]
}

# Initialize dictionary to store DataFrames for each endpoint
# Keyed by endpoint path; later cells read and overwrite this same `dataframes` dict.
dataframes = {}

# Fetch data for each endpoint and store in the dictionary
# fetch_data() returns an empty frame (with the requested columns) on failure, so every
# endpoint gets an entry even when its request did not succeed.
for endpoint, columns in endpoints_and_columns.items():
    print(f"Fetching data from {endpoint}...")
    dataframes[endpoint] = fetch_data(endpoint, columns)

# Example: Access specific DataFrame
players_df = dataframes["mls/players"]
xgoals_df = dataframes["mls/players/xgoals"]

# Display or process data further
print("Players DataFrame preview:")
print(players_df.head())
print("\nXGoals DataFrame preview:")
print(xgoals_df.head())

# Close the connection after all data has been fetched and logged
conn.close()


Fetching data from mls/players...
Logged change for mls/players: Successfully fetched data from mls/players API endpoint.
Fetching data from mls/players/xgoals...
Logged change for mls/players/xgoals: Successfully fetched data from mls/players/xgoals API endpoint.
Fetching data from mls/players/xpass...
Logged change for mls/players/xpass: Successfully fetched data from mls/players/xpass API endpoint.
Fetching data from mls/players/salaries...
Logged change for mls/players/salaries: Successfully fetched data from mls/players/salaries API endpoint.
Fetching data from mls/teams...
Logged change for mls/teams: Successfully fetched data from mls/teams API endpoint.
Fetching data from mls/games...
Logged change for mls/games: Successfully fetched data from mls/games API endpoint.
Players DataFrame preview:
    player_id          player_name  birth_date primary_broad_position  \
0  0Oq6006M6D          Ugo Ihemelu  1983-04-03                    NaN   
1  0Oq60APM6D      Jason Hernandez  1983-

In [75]:
import pandas as pd

# In-memory change log: dataset name -> list of "<timestamp> - <description>" strings,
# kept in the order the entries were recorded. Re-running this cell resets it.
change_logs = {}

# Function to log changes
# NOTE: this two-argument version replaces the earlier database-backed
# log_change(conn, cursor, ...) for every cell executed after this one.
def log_change(dataset_name, description):
    """Append a timestamped entry for ``dataset_name`` to the module-level ``change_logs``.

    Args:
        dataset_name: Key under which the entry is stored; created on first use.
        description: Free-text description of the change.
    """
    entries = change_logs.setdefault(dataset_name, [])
    entries.append(f"{pd.Timestamp.now()} - {description}")

# Function to clean and validate a dataset
def _to_sql_text(value):
    """Return ``value`` as text for a string column; missing or non-scalar values become None."""
    if isinstance(value, str):
        return value
    # NaN is a float: without the isna() guard it would be stored as the literal text "nan".
    if isinstance(value, (int, float)) and not pd.isna(value):
        return str(value)
    return None


def _validate_foreign_key(df, dataset_name, dataframes, parent_name, parent_key, child_keys):
    """Keep only the rows of ``df`` whose ``child_keys`` all occur in the parent's key column."""
    parent = dataframes.get(parent_name)
    keys_available = (
        parent is not None
        and parent_key in parent.columns
        and all(key in df.columns for key in child_keys)
    )
    if not keys_available:
        # Nothing to validate against; keep the rows rather than failing with a KeyError.
        log_change(dataset_name, f"Skipped {parent_key} validation: '{parent_name}' or a key column is unavailable.")
        return df

    valid_ids = set(parent[parent_key])
    keep = df[child_keys[0]].isin(valid_ids)
    for key in child_keys[1:]:
        keep = keep & df[key].isin(valid_ids)
    dropped_count = int((~keep).sum())
    log_change(dataset_name, f"Validated and kept only valid {parent_key} entries. {dropped_count} rows dropped.")
    return df[keep]


def clean_and_validate_dataset(df, dataset_name, dataframes):
    """Clean one endpoint's DataFrame and prepare it for loading into SQL Server.

    Steps, in order: fill or drop missing values (dataset specific), drop duplicate
    rows and columns, normalise column dtypes, drop unwanted columns, split
    ``season_name`` into first/last season, and enforce foreign keys against the
    parent dataset. Each change that affects data is recorded through ``log_change``.

    Args:
        df: Frame to clean, or None when the dataset is missing.
        dataset_name: Endpoint path (e.g. ``"mls/players"``); selects the
            dataset-specific rules and is the key used for logging.
        dataframes: Dict of all datasets keyed by endpoint path; used to look up the
            parent dataset for foreign-key validation.

    Returns:
        pandas.DataFrame or None: The cleaned frame, or None when ``df`` is None.
        The input frame is never modified; all work happens on a copy.
    """
    if df is None:
        return None  # Skip if DataFrame is missing

    # Work on a private copy: the caller's frame stays untouched, and plain column
    # assignment below is free to replace a column together with its dtype.
    df = df.copy()

    # 1. Handle Missing Values
    if dataset_name == "mls/players":
        if "birth_date" in df.columns:
            missing_birthdate_count = df["birth_date"].isnull().sum()
            # Missing dates get the 1900-01-01 sentinel; unparseable ones become NaT.
            df["birth_date"] = pd.to_datetime(df["birth_date"].fillna("1900-01-01"), errors="coerce")
            if missing_birthdate_count > 0:
                log_change(dataset_name, f"Filled missing birth_date with '1900-01-01'. {missing_birthdate_count} rows updated.")
        for col in ["primary_broad_position", "primary_general_position"]:
            if col in df.columns:
                missing_position_count = df[col].isnull().sum()
                df[col] = df[col].fillna("UNKNOWN")
                if missing_position_count > 0:
                    log_change(dataset_name, f"Filled missing values in {col} with 'UNKNOWN'. {missing_position_count} rows updated.")
    if dataset_name == "mls/players/salaries":
        if "team_id" in df.columns:
            missing_team_id_count = df["team_id"].isnull().sum()
            df = df.dropna(subset=["team_id"]).copy()
            if missing_team_id_count > 0:
                log_change(dataset_name, f"Dropped {missing_team_id_count} rows with missing 'team_id'.")

    # 2. Handle Duplicates
    try:
        duplicate_mask = df.duplicated()
    except TypeError:
        # list/dict cells are unhashable; compare rows by their text form instead of
        # failing, so such rows are neither lost nor able to crash the pipeline.
        duplicate_mask = df.astype(str).duplicated()
    duplicate_count = int(duplicate_mask.sum())
    if duplicate_count > 0:
        df = df[~duplicate_mask].copy()
        log_change(dataset_name, f"Dropped {duplicate_count} duplicate rows.")
    duplicate_columns = df.columns[df.columns.duplicated()].tolist()
    if duplicate_columns:
        df = df.loc[:, ~df.columns.duplicated()].copy()
        log_change(dataset_name, f"Dropped duplicate columns: {duplicate_columns}.")

    # 3. Validate Data Types and Cast for SQL Compatibility
    for col in df.columns:
        if col in ["birth_date", "date_time_utc"]:
            df[col] = pd.to_datetime(df[col], errors="coerce")
        elif pd.api.types.is_float_dtype(df[col]):
            df[col] = df[col].round(6)
        elif col == "season_name":
            # Left as-is so step 5 still sees list-valued seasons.
            continue
        elif pd.api.types.is_object_dtype(df[col]):
            df[col] = df[col].map(_to_sql_text)
        elif pd.api.types.is_integer_dtype(df[col]):
            df[col] = pd.to_numeric(df[col], downcast="integer", errors="coerce")

    # 4. Drop Unwanted Columns
    unwanted_columns = ["extra_time", "penalties", "home_penalties", "away_penalties"]
    unwanted_columns += [col for col in df.columns if "outlier" in col]
    dropped_columns = [col for col in unwanted_columns if col in df.columns]
    if dropped_columns:
        df = df.drop(columns=dropped_columns)
        log_change(dataset_name, f"Dropped unwanted columns: {dropped_columns}.")

    # 5. Split season_name Column if Applicable
    if "season_name" in df.columns:
        # Normalise every cell to a list: lists pass through, scalars become [int], missing -> [].
        seasons = df["season_name"].apply(
            lambda x: x if isinstance(x, list) else [int(x)] if pd.notnull(x) else []
        )
        df["season_name_initial"] = seasons.apply(lambda x: min(x) if x else None)
        df["season_name_current"] = seasons.apply(lambda x: max(x) if x else None)
        df = df.drop(columns=["season_name"])
        log_change(dataset_name, "Split 'season_name' into 'season_name_initial' and 'season_name_current'.")

    # 6. Foreign Key Validation
    if dataset_name in ("mls/players/xgoals", "mls/players/xpass"):
        df = _validate_foreign_key(df, dataset_name, dataframes, "mls/players", "player_id", ["player_id"])

    if dataset_name == "mls/games":
        df = _validate_foreign_key(df, dataset_name, dataframes, "mls/teams", "team_id", ["home_team_id", "away_team_id"])

    return df

# Function to clean and validate all datasets
def clean_and_validate_all(dataframes):
    """Clean every dataset in ``dataframes`` in place and return the same dict.

    Datasets are processed in the dict's iteration order, and each cleaned frame is
    stored back immediately, so a later dataset's validation sees the already
    cleaned version of an earlier one.

    Args:
        dataframes: Dict mapping dataset name to its DataFrame.

    Returns:
        dict: The same ``dataframes`` object, with each value replaced by its cleaned frame.
    """
    for name in list(dataframes):
        print(f"Cleaning and validating {name}...")
        cleaned = clean_and_validate_dataset(dataframes[name], name, dataframes)
        dataframes[name] = cleaned
    return dataframes

# Run the cleaning and validation process
# `dataframes` comes from the fetch cell and is overwritten with the cleaned frames, so
# re-running this cell cleans already-cleaned data rather than the raw fetch results.
dataframes = clean_and_validate_all(dataframes)

# Display change logs
# change_logs maps dataset name -> list of "<timestamp> - <description>" strings.
print("Change Logs:")
for dataset, logs in change_logs.items():
    print(f"Dataset: {dataset}")
    for log in logs:
        print(f"  - {log}")


Cleaning and validating mls/players...
Cleaning and validating mls/players/xgoals...
Cleaning and validating mls/players/xpass...
Cleaning and validating mls/players/salaries...
Cleaning and validating mls/teams...
Cleaning and validating mls/games...
Change Logs:
Dataset: mls/players/xgoals
  - 2024-12-05 16:16:58.161872 - Validated and kept only valid player_id entries.
Dataset: mls/players/xpass
  - 2024-12-05 16:16:58.178130 - Validated and kept only valid player_id entries.
Dataset: mls/games
  - 2024-12-05 16:16:58.188131 - Validated and kept only valid team_id entries.


In [76]:
import pyodbc
import pandas as pd

# Function to dynamically map Python data types to SQL data types
def get_sql_data_type(dtype, column_name=None):
    """Choose the SQL Server column type for a DataFrame column.

    Rules keyed on the column name take precedence; otherwise the type is derived
    from the pandas/numpy dtype.

    Args:
        dtype: dtype of the column (e.g. ``df[col].dtype``).
        column_name: Optional column name used for the name-based rules.

    Returns:
        str: SQL type declaration such as ``"INT"`` or ``"NVARCHAR(255)"``.
    """
    # Name-based rules, checked in order. The bounded VARCHAR(50) keeps the id and
    # position columns narrow enough to be used as index keys.
    name_rules = (
        (("player_id",), "VARCHAR(50)"),
        (("primary_broad_position", "primary_general_position"), "VARCHAR(50)"),
        (("birth_date",), "DATE"),
        (("date_time_utc",), "DATETIME"),
        (("base_salary", "guaranteed_compensation"), "DECIMAL(12, 2)"),
    )
    for names, sql_type in name_rules:
        if column_name in names:
            return sql_type

    if pd.api.types.is_integer_dtype(dtype):
        return "INT"
    if pd.api.types.is_float_dtype(dtype):
        return "FLOAT"

    # Object (string) columns and every remaining dtype share one bounded text type,
    # which keeps the column indexable.
    return "NVARCHAR(255)"

# Main function to upload DataFrame to SQL Server
def upload_to_sql_server(df, table_name, server, database, batch_size=5000):
    """Replace a SQL Server table with the contents of ``df`` and index it.

    The target table is dropped if it exists, recreated with column types chosen by
    ``get_sql_data_type``, filled in batches, and then indexed. Progress and the
    final result are recorded through ``log_change``.

    Destructive: the existing table is dropped before any new data is inserted.
    If a batch fails, the upload stops; batches committed before the failure remain
    in the table and no indexes are created.

    Args:
        df: Frame to upload. When None, nothing is done.
        table_name: Name of the target table; interpolated into the SQL statements
            as-is, so it must be a trusted identifier.
        server: SQL Server instance name.
        database: Database name.
        batch_size: Number of rows sent per ``executemany`` call.

    Returns:
        None.
    """
    if df is None:
        # The cleaning step returns None for a missing dataset; there is nothing to load.
        print(f"Skipping {table_name}: no DataFrame to upload.")
        return

    # Non-clustered indexes per target table: (index name, indexed column list).
    nonclustered_indexes = {
        "AU_players": [
            ("idx_primary_broad_position", "primary_broad_position"),
            ("idx_primary_general_position", "primary_general_position"),
        ],
        "AU_players_xgoals": [
            ("idx_team_id_xgoals", "team_id"),
            ("idx_goals_xgoals", "goals"),
            ("idx_xgoals_plus_xassists", "xgoals_plus_xassists"),
        ],
        "AU_players_xpass": [
            ("idx_team_id_xpass", "team_id"),
            ("idx_attempted_passes_xpass", "attempted_passes"),
            ("idx_pass_completion_percentage_xpass", "pass_completion_percentage"),
        ],
        "AU_players_salaries": [
            ("idx_team_season_salary", "team_id, season_name_initial"),
            ("idx_base_salary", "base_salary"),
            ("idx_guaranteed_compensation", "guaranteed_compensation"),
        ],
        "AU_teams": [
            ("idx_team_name", "team_name"),
        ],
        "AU_games": [
            ("idx_home_team_id", "home_team_id"),
            ("idx_away_team_id", "away_team_id"),
            ("idx_date_time_utc", "date_time_utc"),
        ],
    }

    conn = pyodbc.connect(f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};Trusted_Connection=yes;")
    try:
        cursor = conn.cursor()

        # Drop table if it exists
        cursor.execute(f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name}")
        conn.commit()

        # Create table
        column_definitions = ", ".join(
            f"[{col}] {get_sql_data_type(df[col].dtype, col)}" for col in df.columns
        )
        cursor.execute(f"CREATE TABLE {table_name} ({column_definitions})")
        conn.commit()

        # Insert data in batches
        placeholders = ", ".join(["?" for _ in df.columns])
        insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"
        for start in range(0, len(df), batch_size):
            batch = df.iloc[start:start + batch_size]
            last = start + len(batch) - 1
            print(f"Uploading batch {start} to {last}...")

            # Send missing values as None so the driver binds them as SQL NULL
            # instead of passing NaN/NaT through as parameter values.
            rows = batch.astype(object).where(batch.notna(), None).values.tolist()
            try:
                cursor.executemany(insert_query, rows)
                conn.commit()
            except Exception as e:
                # Undo the partially sent batch so the table only holds complete batches.
                conn.rollback()
                print(f"Error uploading batch {start} to {last}: {e}")
                print(batch.head())
                log_change(table_name, f"Error uploading batch {start} to {last}: {e}")
                return
            else:
                # Log change after each successful batch
                log_change(table_name, f"Successfully uploaded batch {start} to {last}.")

        # Create Clustered Index on player_id for every table that has that column
        if "player_id" in df.columns:
            cursor.execute(f"CREATE CLUSTERED INDEX idx_player_id ON {table_name} (player_id)")
            conn.commit()

        # Create Non-Clustered Indexes configured for this table (none for unknown tables)
        for index_name, index_columns in nonclustered_indexes.get(table_name, []):
            cursor.execute(f"CREATE NONCLUSTERED INDEX {index_name} ON {table_name} ({index_columns})")
        conn.commit()

        # Log completion of the dataset upload
        log_change(table_name, f"Data successfully uploaded and indexed in {table_name}")
    finally:
        # Close the connection on every path, including errors raised while
        # creating the table or its indexes.
        conn.close()

# Workflow to process and upload each DataFrame
# NOTE(review): the server and database names are hard-coded here, and earlier cells read
# `server_name` / `database_name` when opening their connections — consider moving these two
# assignments to a configuration cell at the top of the notebook.
server_name = "RAMSEY_BOLTON\\SQLEXPRESS"
database_name = "SoccerProjects"

# Process and upload data
# Endpoint paths become table names: "mls/players/xgoals" -> "AU_players_xgoals".
for table_name, df in dataframes.items():
    sql_table_name = table_name.replace("mls/", "AU_").replace("/", "_")  # Customize table naming
    print(f"Uploading {sql_table_name}...")
    upload_to_sql_server(df, sql_table_name, server_name, database_name, batch_size=5000)


Uploading AU_players...
Uploading batch 0 to 999...
Uploading AU_players_xgoals...
Uploading batch 0 to 611...
Uploading AU_players_xpass...
Uploading batch 0 to 611...
Uploading AU_players_salaries...
Uploading batch 0 to 583...
Uploading AU_teams...
Uploading batch 0 to 29...
Uploading AU_games...
Uploading batch 0 to 999...
