<a href="https://colab.research.google.com/github/drummond8scott/tokyo_dataset/blob/main/tokyo_dataset_de.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# Import required packages for loading, transforming and exporting tokyo data
import openpyxl
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, upper, trim, regexp_replace, when, count,
                                 initcap, concat_ws, split, to_timestamp, coalesce, isnan, when, length, trim,
                                 sum as spark_sum, countDistinct, desc, expr, round)
from pyspark.sql.window import Window
from pyspark.sql.types import *

In [54]:
# Base URL for raw GitHub content
base_url = "https://raw.githubusercontent.com/drummond8scott/tokyo_dataset/main/"

# Load datasets
athletes_df = pd.read_excel(base_url + "Athletes.xlsx")
coaches_df = pd.read_excel(base_url + "Coaches.xlsx")
medals_df = pd.read_excel(base_url + "Medals.xlsx")
teams_df = pd.read_excel(base_url + "Teams.xlsx")

# Print counts for loaded datasets
print("Dataset Sizes:")
print(f"Athletes: {len(athletes_df)} records")
print(f"Coaches: {len(coaches_df)} records")
print(f"Medals: {len(medals_df)} records")
print(f"Teams: {len(teams_df)} records")

# Print first 3 rows of each dataset
print("\nFirst few rows of each dataset:")
for name, df in [("Athletes", athletes_df), ("Coaches", coaches_df),
                 ("Medals", medals_df), ("Teams", teams_df)]:
    print(f"\n{name} dataset:")
    print(df.head(3))

  warn("Workbook contains no default style, apply openpyxl's default")
  warn("Workbook contains no default style, apply openpyxl's default")


Dataset Sizes:
Athletes: 11085 records
Coaches: 394 records
Medals: 93 records
Teams: 743 records

First few rows of each dataset:

Athletes dataset:
                Name     NOC           Discipline
0    AALERUD Katrine  Norway         Cycling Road
1        ABAD Nestor   Spain  Artistic Gymnastics
2  ABAGNALE Giovanni   Italy               Rowing

Coaches dataset:
              Name    NOC  Discipline Event
0  ABDELMAGID Wael  Egypt    Football   NaN
1        ABE Junya  Japan  Volleyball   NaN
2    ABE Katsuhiko  Japan  Basketball   NaN

Medals dataset:
   Rank                    Team/NOC  Gold  Silver  Bronze  Total  \
0     1    United States of America    39      41      33    113   
1     2  People's Republic of China    38      32      18     88   
2     3                       Japan    27      14      17     58   

   Rank by Total  
0              1  
1              2  
2              5  

Teams dataset:
      Name      Discipline                         NOC  Event
0  Belgium  

In [30]:
# Convert pandas dataframes to PySpark dataframes
athletes_df = spark.createDataFrame(athletes_df)
coaches_df = spark.createDataFrame(coaches_df)
medals_df = spark.createDataFrame(medals_df)
teams_df = spark.createDataFrame(teams_df)

In [39]:
from pyspark.sql.functions import *

# Athletes DataFrame Analysis
print("=== ATHLETES DATASET ANALYSIS ===")
total_athletes = athletes_df.count()
print(f"Total athletes: {total_athletes}")

# Check missing values in athletes
for column in athletes_df.columns:
    null_count = athletes_df.filter(col(column).isNull()).count()
    if null_count > 0:
        print(f"Missing values in {column}: {null_count} ({(null_count/total_athletes*100):.2f}%)")

# Check for duplicate athletes
duplicate_athletes = athletes_df.groupBy(athletes_df.columns).count().filter("count > 1")
if duplicate_athletes.count() > 0:
    print(f"\nFound {duplicate_athletes.count()} duplicate athlete entries")


# Check NOC distribution
noc_counts = athletes_df.groupBy("NOC").count().orderBy("count", ascending=False)
print("\nTop 5 NOCs by athlete count:")
noc_counts.show(5)

print("\n=== MEDALS DATASET ANALYSIS ===")
total_medal_records = medals_df.count()
print(f"Total medal records: {total_medal_records}")

# Check missing values in medals
for column in medals_df.columns:
    null_count = medals_df.filter(col(column).isNull()).count()
    if null_count > 0:
        print(f"Missing values in {column}: {null_count} ({(null_count/total_medal_records*100):.2f}%)")

# Check medal counts by type - Modified to handle separate medal columns
medal_distribution = medals_df.select(
    sum(col("Gold")).alias("Gold"),
    sum(col("Silver")).alias("Silver"),
    sum(col("Bronze")).alias("Bronze")
).withColumn("Total", col("Gold") + col("Silver") + col("Bronze"))

print("\nMedal distribution:")
medal_distribution.show()

# Cross-reference check between athletes and medals
athlete_countries = athletes_df.select("NOC").distinct().count()
medal_countries = medals_df.select("Team/NOC").distinct().count()
print(f"\nUnique countries in athletes: {athlete_countries}")
print(f"Unique countries in medals: {medal_countries}")

print("\n=== DATA CONSISTENCY CHECKS ===")
# Check for any NOCs in medals that don't appear in athletes
medals_nocs = set(medals_df.select("Team/NOC").distinct().collect())
athletes_nocs = set(athletes_df.select("NOC").distinct().collect())
mismatched_nocs = medals_nocs - athletes_nocs
if mismatched_nocs:
    print(f"NOCs in medals but not in athletes: {mismatched_nocs}")

# Check for unusual values in key fields
print("\nUnique disciplines:")
athletes_df.select("Discipline").distinct().show(truncate=False)

print("\nSample of unusual names (if any):")
athletes_df.filter(~col("Name").rlike("^[A-Za-z\\s\\-\\']+$")).show(5)

=== ATHLETES DATASET ANALYSIS ===
Total athletes: 11085

Found 1 duplicate athlete entries

Top 5 NOCs by athlete count:
+--------------------+-----+
|                 NOC|count|
+--------------------+-----+
|United States of ...|  615|
|               Japan|  586|
|           Australia|  470|
|People's Republic...|  401|
|             Germany|  400|
+--------------------+-----+
only showing top 5 rows


=== MEDALS DATASET ANALYSIS ===
Total medal records: 93

Medal distribution:
+----+------+------+-----+
|Gold|Silver|Bronze|Total|
+----+------+------+-----+
| 340|   338|   402| 1080|
+----+------+------+-----+


Unique countries in athletes: 206
Unique countries in medals: 93

=== DATA CONSISTENCY CHECKS ===

Unique disciplines:
+---------------------+
|Discipline           |
+---------------------+
|Tennis               |
|Boxing               |
|Marathon Swimming    |
|Golf                 |
|Rowing               |
|Baseball/Softball    |
|Judo                 |
|Sailing           

In [55]:
from pyspark.sql.functions import *

# Get unique NOC and Team combinations from each dataset
print("=== NOC/Team Analysis ===\n")

# Athletes Dataset Analysis
athlete_teams = athletes_df.select("NOC").distinct()
print(f"Number of unique NOC-Team combinations in Athletes: {athlete_teams.count()}")

# Medals Dataset Analysis
medal_teams = medals_df.select("Team/NOC").distinct()
print(f"Number of unique NOC-Team combinations in Medals: {medal_teams.count()}")

# Coaches Dataset Analysis
coach_teams = coaches_df.select("NOC").distinct()
print(f"Number of unique NOC-Team combinations in Coaches: {coach_teams.count()}")

# Show discrepancies in Athletes dataset
print("\n=== Athletes Dataset: Cases where NOC doesn't match expected Team pattern ===")
athletes_df.select("NOC") \
    .distinct() \
    .orderBy("NOC") \
    .show(truncate=False)

# Compare NOCs across datasets
athlete_nocs = set([row.NOC for row in athletes_df.select("NOC").distinct().collect()])
medal_nocs = set([row["Team/NOC"] for row in medals_df.select("Team/NOC").distinct().collect()])
coach_nocs = set([row.NOC for row in coaches_df.select("NOC").distinct().collect()])

print("\n=== NOC Comparison Across Datasets ===")
print(f"Total unique NOCs in Athletes: {len(athlete_nocs)}")
print(f"Total unique NOCs in Medals: {len(medal_nocs)}")
print(f"Total unique NOCs in Coaches: {len(coach_nocs)}")

# Find NOCs that appear in one dataset but not others
print("\n=== NOC Discrepancies ===")
print("NOCs in Medals but not in Athletes:", medal_nocs - athlete_nocs if medal_nocs - athlete_nocs else "None")
print("NOCs in Athletes but not in Medals:", athlete_nocs - medal_nocs if athlete_nocs - medal_nocs else "None")
print("NOCs in Coaches but not in Athletes:", coach_nocs - athlete_nocs if coach_nocs - athlete_nocs else "None")
print("NOCs in Athletes but not in Coaches:", athlete_nocs - coach_nocs if athlete_nocs - coach_nocs else "None")

# Analyze Team naming patterns
print("\n=== Team Name Pattern Analysis ===")
print("Sample of Team names from Athletes dataset:")
athletes_df.select("NOC") \
    .distinct() \
    .orderBy(rand()) \
    .show(10, truncate=False)

print("\nSample of Team names from Medals dataset:")
medals_df.select("Team/NOC") \
    .distinct() \
    .orderBy(rand()) \
    .show(10, truncate=False)

print("\nSample of Team names from Coaches dataset:")
coaches_df.select("NOC") \
    .distinct() \
    .orderBy(rand()) \
    .show(10, truncate=False)

# Check for any unusual characters or patterns in NOC codes
print("\n=== Unusual NOC Patterns ===")
print("Athletes Dataset - NOCs not following standard 3-letter pattern:")
athletes_df.filter(~col("NOC").rlike("^[A-Z]{3}$")) \
    .select("NOC") \
    .distinct() \
    .show()

print("\nMedals Dataset - NOCs not following standard 3-letter pattern:")
medals_df.filter(~col("Team/NOC").rlike("^[A-Z]{3}$")) \
    .select("Team/NOC") \
    .distinct() \
    .show()

print("\nCoaches Dataset - NOCs not following standard 3-letter pattern:")
coaches_df.filter(~col("NOC").rlike("^[A-Z]{3}$")) \
    .select("NOC") \
    .distinct() \
    .show()

=== NOC/Team Analysis ===



AttributeError: 'DataFrame' object has no attribute 'select'

In [56]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, upper, trim, regexp_replace, when, count,
                                 initcap, concat_ws, split, to_timestamp, coalesce)
from pyspark.sql.window import Window

def clean_olympics_data(spark, athletes_df, coaches_df, medals_df, teams_df):
    """
    Main function to clean Tokyo Olympics dataset
    Parameters:
        spark: SparkSession object
        athletes_df, coaches_df, medals_df, teams_df: Input dataframes
    Returns:
        Tuple of cleaned dataframes
    """

    ###########################################
    # 1. CLEAN ATHLETES DATA
    ###########################################

    cleaned_athletes = (athletes_df
        # Standardize name format
        .withColumn("Name",
            # Remove any leading/trailing spaces
            trim(col("Name"))
            # Convert to Title Case (First letter capital)
            .pipe(lambda c: initcap(c))
            # Remove any non-ASCII characters while preserving diacritical marks
            .pipe(lambda c: regexp_replace(c, "[^\p{L}\p{M}\s]", ""))
        )

        # Clean Team column
        .withColumn("Team",
            # Convert to uppercase for consistency
            upper(trim(col("Team")))
            # Replace common variations
            .pipe(lambda c: regexp_replace(c, "USA", "UNITED STATES"))
            .pipe(lambda c: regexp_replace(c, "UK", "UNITED KINGDOM"))
        )

        # Clean Discipline column
        .withColumn("Discipline",
            # Standardize discipline names
            trim(initcap(col("Discipline")))
            # Remove any numerical suffixes
            .pipe(lambda c: regexp_replace(c, "\d+", ""))
        )

        # Remove duplicate entries
        .dropDuplicates(["Name", "Team", "Discipline"])

        # Handle missing values
        .na.fill({
            "Discipline": "Unknown",
            "Team": "UNAFFILIATED"
        })
    )

    ###########################################
    # 2. CLEAN COACHES DATA
    ###########################################

    cleaned_coaches = (coaches_df
        # Clean coach names similar to athletes
        .withColumn("Name",
            trim(initcap(col("Name")))
            .pipe(lambda c: regexp_replace(c, "[^\p{L}\p{M}\s]", ""))
        )

        # Standardize Event column
        .withColumn("Event",
            # Remove special characters and standardize format
            regexp_replace(trim(col("Event")), "[^a-zA-Z0-9\\s]", "")
        )

        # Clean Discipline column (same as athletes)
        .withColumn("Discipline",
            trim(initcap(col("Discipline")))
            .pipe(lambda c: regexp_replace(c, "\d+", ""))
        )

        # Remove duplicate coach entries
        .dropDuplicates(["Name", "Team", "Discipline"])
    )

    ###########################################
    # 3. CLEAN MEDALS DATA
    ###########################################

    cleaned_medals = (medals_df
        # Standardize athlete names to match athletes table
        .withColumn("Athlete",
            trim(initcap(col("Athlete")))
            .pipe(lambda c: regexp_replace(c, "[^\p{L}\p{M}\s]", ""))
        )

        # Clean medal values
        .withColumn("Medal",
            # Convert to uppercase and trim
            upper(trim(col("Medal")))
            # Standardize medal names
            .pipe(lambda c: when(c == "GOLD MEDAL", "GOLD")
                           .when(c == "SILVER MEDAL", "SILVER")
                           .when(c == "BRONZE MEDAL", "BRONZE")
                           .otherwise(c))
        )

        # Handle team medals (where multiple athletes get same medal)
        .dropDuplicates(["Athlete", "Team", "Medal", "Event"])
    )

    ###########################################
    # 4. CLEAN TEAMS DATA
    ###########################################

    cleaned_teams = (teams_df
        # Standardize team names
        .withColumn("Team",
            upper(trim(col("Team")))
            .pipe(lambda c: regexp_replace(c, "USA", "UNITED STATES"))
            .pipe(lambda c: regexp_replace(c, "UK", "UNITED KINGDOM"))
        )

        # Clean country names
        .withColumn("Country",
            # Convert to title case for readability
            initcap(trim(col("Country")))
            # Handle special cases
            .pipe(lambda c: when(col("Team") == "ROC", "Russian Olympic Committee")
                           .otherwise(c))
        )

        # Remove duplicates
        .dropDuplicates(["Team", "Country"])
    )

    ###########################################
    # 5. DATA VALIDATION
    ###########################################

    def validate_cleaned_data(df, table_name):
        """
        Validate the cleaned data and print statistics
        """
        print(f"\nValidation for {table_name}:")

        # Check for nulls
        null_counts = df.select([count(when(col(c).isNull(), c)).alias(c)
                               for c in df.columns])
        print("Null counts:", null_counts.collect())

        # Check for duplicates
        duplicate_count = df.count() - df.dropDuplicates().count()
        print(f"Duplicate rows: {duplicate_count}")

        return df

    # Validate all cleaned dataframes
    cleaned_athletes = validate_cleaned_data(cleaned_athletes, "Athletes")
    cleaned_coaches = validate_cleaned_data(cleaned_coaches, "Coaches")
    cleaned_medals = validate_cleaned_data(cleaned_medals, "Medals")
    cleaned_teams = validate_cleaned_data(cleaned_teams, "Teams")

    ###########################################
    # 6. CREATE QUALITY METRICS
    ###########################################

    def calculate_quality_metrics(df, table_name):
        """
        Calculate and return data quality metrics
        """
        total_rows = df.count()
        null_rows = df.where(reduce(lambda x, y: x | y,
                                  [col(c).isNull() for c in df.columns])).count()

        metrics = {
            "table_name": table_name,
            "total_rows": total_rows,
            "null_percentage": (null_rows / total_rows) * 100,
            "duplicate_percentage": (duplicate_count / total_rows) * 100
        }

        return metrics

    # Store quality metrics for each table
    quality_metrics = {
        "athletes": calculate_quality_metrics(cleaned_athletes, "Athletes"),
        "coaches": calculate_quality_metrics(cleaned_coaches, "Coaches"),
        "medals": calculate_quality_metrics(cleaned_medals, "Medals"),
        "teams": calculate_quality_metrics(cleaned_teams, "Teams")
    }

    return (cleaned_athletes, cleaned_coaches, cleaned_medals, cleaned_teams,
            quality_metrics)

###########################################
# USAGE EXAMPLE
###########################################

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Tokyo Olympics Data Cleaning") \
    .getOrCreate()

# Read your data
# athletes_df = spark.read...
# coaches_df = spark.read...
# medals_df = spark.read...
# teams_df = spark.read...

# Clean the data
cleaned_data = clean_olympics_data(spark, athletes_df, coaches_df,
                                 medals_df, teams_df)

# Access cleaned dataframes
cleaned_athletes, cleaned_coaches, cleaned_medals, cleaned_teams, metrics = cleaned_data

# Optional: Save cleaned data
# cleaned_athletes.write.parquet("cleaned_athletes.parquet")
# cleaned_coaches.write.parquet("cleaned_coaches.parquet")
# cleaned_medals.write.parquet("cleaned_medals.parquet")
# cleaned_teams.write.parquet("cleaned_teams.parquet")

AttributeError: 'DataFrame' object has no attribute 'withColumn'