In [0]:
# Install the pybaseball package into the Python environment running this notebook
# (%pip, unlike !pip, installs into the environment the notebook's kernel uses).
# NOTE(review): consider pinning a version (pybaseball==<x.y.z>) so re-runs get the
# same package behavior and column layout -- confirm which version was used.
%pip install pybaseball

In [0]:
from pybaseball import pitching_stats
import pandas as pd

# Download pitching statistics with pybaseball.  The two positional arguments are the
# first and last season requested; the result is a pandas DataFrame.
FIRST_SEASON, LAST_SEASON = 2022, 2024
pitching_pdf = pitching_stats(FIRST_SEASON, LAST_SEASON)

display(pitching_pdf)

In [0]:
import re

# Apply column name conversions to comply with catalog limitations
def clean_column_name(col_name):
    """Return a catalog-safe version of a single column name.

    Conversions, applied in this order:
      - ``%`` becomes ``_pct``                      (``K%`` -> ``K_pct``)
      - characters the catalog rejects in column names, ``( ) { } , ; =``,
        become ``_``
      - whitespace (spaces, tabs, newlines) is removed  (``Age Rng`` -> ``AgeRng``)
      - runs of underscores collapse to a single ``_`` and leading/trailing
        underscores are stripped                  (``FA% (pi)`` -> ``FA_pct_pi``)

    Args:
        col_name: Original column label.  Non-string labels are converted with
            ``str()`` first so integer labels do not raise.

    Returns:
        The cleaned column name as a string.
    """
    cleaned = str(col_name).replace('%', '_pct')
    # Parentheses were the original cases; the remaining catalog-invalid
    # punctuation is treated the same way so other sources cannot break the write.
    cleaned = re.sub(r'[(){},;=]', '_', cleaned)
    cleaned = re.sub(r'\s+', '', cleaned)  # tabs/newlines are invalid too, not just spaces
    cleaned = re.sub('_+', '_', cleaned)   # Replace multiple underscores with single
    return cleaned.strip('_')              # Remove leading/trailing underscores

# Rename every column to its catalog-safe form.  Two different source names can
# clean to the same result (e.g. "A B" and "AB" both become "AB"); duplicate column
# names would otherwise only surface later (e.g. as an ambiguous column reference in
# Spark or a failed table write), so fail fast here with the offending names.
new_columns = [clean_column_name(name) for name in pitching_pdf.columns]
cleaned_index = pd.Index(new_columns)
duplicate_columns = cleaned_index[cleaned_index.duplicated()].unique().tolist()
if duplicate_columns:
    raise ValueError(f"Column-name cleaning produced duplicate names: {duplicate_columns}")
pitching_pdf.columns = new_columns

display(pitching_pdf)

In [0]:
# pitching_stats returns a pandas DataFrame; it must be converted to a PySpark
# DataFrame before it can be saved as a catalog table.  Show the type, and assert it,
# so an unexpected return type fails here rather than inside createDataFrame.
print(type(pitching_pdf))
assert isinstance(pitching_pdf, pd.DataFrame), f"expected a pandas DataFrame, got {type(pitching_pdf)}"

pitching_df = spark.createDataFrame(pitching_pdf)
display(pitching_df)

In [0]:
# Delete columns that are all null -- the SLOW way.  Every column gets its own
# filter + count, so Spark scans the entire DataFrame once per column.  Looping over
# columns and firing a separate Spark action per iteration is usually a bad idea in
# PySpark; the next cell computes the same thing with a single aggregation.
null_columns = []
for column_name in pitching_df.columns:
    non_null_rows = pitching_df.filter(pitching_df[column_name].isNotNull()).count()
    if non_null_rows == 0:
        null_columns.append(column_name)
display(pd.DataFrame({"all_null_columns": null_columns}))

# Keep the original pitching_df intact; the trimmed frame gets its own name.
pitching_sub_df = pitching_df.drop(*null_columns)
display(pitching_sub_df)


In [0]:
# Delete columns that are all null -- the FAST way.  One aggregation computes the
# null count of every column in a single pass instead of a filter + count per column.
# `sum` is imported under an alias so it does not shadow Python's built-in sum()
# for every later cell in the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# Null count per column from one aggregation: {column_name: number_of_null_rows}
null_counts = pitching_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in pitching_df.columns
]).first().asDict()

# A column is all-null when its null count equals the row count.  On an empty
# DataFrame the sums are null (None), so no column is dropped.
row_count = pitching_df.count()
null_columns = [c for c, n in null_counts.items() if n == row_count]
print(null_columns)

if null_columns:
    display(pd.DataFrame({"all_null_columns": null_columns}))
    # Reassign pitching_df so the save step writes the trimmed DataFrame.
    pitching_df = pitching_df.drop(*null_columns)

# Display the DataFrame this cell produced (not pitching_sub_df from the slow
# variant, which is a different result and may not exist if that cell was skipped).
display(pitching_df)

In [0]:
# Create the destination schema if needed.  The 'bronze' schema aligns to the raw
# ingestion tier of the medallion architecture.
BRONZE_SCHEMA = "bronze_examples"
PITCHING_TABLE = f"{BRONZE_SCHEMA}.pitching"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {BRONZE_SCHEMA}")

# Column names were already made catalog-safe by clean_column_name() before the
# pandas -> Spark conversion, so pitching_df can be written as-is.
#
# Store pitching_df in a table in the bronze schema.  All-null columns are dropped
# based on the data, so the set of columns can differ between runs (e.g. a different
# season range).  The Delta Lake `overwriteSchema` option lets the overwrite replace
# the existing table's schema instead of failing on the mismatch.
(
    pitching_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(PITCHING_TABLE)
)