In [0]:
import logging
import sys

from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Make the workspace-level config module importable before importing it.
sys.path.append('/Workspace/Users/pmanoj@depaul.edu')
from config import config

# Configure the root logger to emit INFO and above. Note that basicConfig()
# does nothing when the root logger already has handlers attached.
logging.basicConfig(level=logging.INFO)
# Logger shared by the function and pipeline cells below.
logger = logging.getLogger(__name__)

In [0]:
%run "./03_medal_efficiency_transformation"

In [0]:
%run "./04_sport_dominance_transformation"

In [0]:
%run "./05_gender_analysis_transformation"

In [0]:
# Columns carried from the medal-efficiency output into the country dataset.
_COUNTRY_BASE_COLUMNS = (
    "Country",
    "TotalAthletes",
    "Gold", "Silver", "Bronze", "Total",
    "MedalsPerAthlete",
    "MedalEfficiencyScore",
)

# Weights of the four OverallPerformanceScore components.
_WEIGHT_TOTAL_MEDALS = 0.4
_WEIGHT_MEDALS_PER_ATHLETE = 0.3
_WEIGHT_SPORT_COUNT = 0.2
_WEIGHT_DIVERSIFICATION = 0.1

# Inclusive lower bounds of the PerformanceCategory tiers.
_ELITE_MIN_SCORE = 50
_STRONG_MIN_SCORE = 25
_DEVELOPING_MIN_SCORE = 10

# MaxConcentrationPct at or above this value flags a sport as highly contested.
_HIGH_CONCENTRATION_PCT = 50


def _build_country_master(medal_efficiency, sport_dominance):
    """Combine medal-efficiency metrics with per-country sport diversity and score each country."""
    sport_diversity = sport_dominance.groupBy("Country").agg(
        F.countDistinct("Discipline").alias("Sports"),
        F.max("SportDominanceEfficiencyPercent").alias("MaxSportConcentration"),
        F.avg("SportDominanceEfficiencyPercent").alias("AvgSportConcentration"),
    )

    # Inner join: countries absent from sport_dominance are dropped here.
    # NOTE(review): confirm that dropping them is intended, since the
    # executive summary totals are computed from this joined frame.
    country_master = (
        medal_efficiency.select(*_COUNTRY_BASE_COLUMNS)
        .join(sport_diversity, "Country", "inner")
        .select(
            *_COUNTRY_BASE_COLUMNS,
            "Sports",
            "MaxSportConcentration",
            "AvgSportConcentration",
        )
    )

    # NOTE(review): a null in any component makes the score null, which then
    # falls through to the "Emerging" category -- confirm inputs are non-null.
    overall_score = F.round(
        (F.col("Total") * _WEIGHT_TOTAL_MEDALS)
        + (F.col("MedalsPerAthlete") * 100 * _WEIGHT_MEDALS_PER_ATHLETE)
        + (F.col("Sports") * _WEIGHT_SPORT_COUNT)
        + ((100 - F.col("MaxSportConcentration")) * _WEIGHT_DIVERSIFICATION),
        2,
    )
    performance_category = (
        F.when(F.col("OverallPerformanceScore") >= _ELITE_MIN_SCORE, "Elite")
        .when(F.col("OverallPerformanceScore") >= _STRONG_MIN_SCORE, "Strong")
        .when(F.col("OverallPerformanceScore") >= _DEVELOPING_MIN_SCORE, "Developing")
        .otherwise("Emerging")
    )

    return (
        country_master
        .withColumn("OverallPerformanceScore", overall_score)
        .withColumn("PerformanceCategory", performance_category)
    )


def _build_sport_master(gender_analysis, sport_dominance):
    """Attach per-discipline dominance statistics and a recommendation label to the gender analysis."""
    sport_master = gender_analysis.select(
        "Discipline",
        "Female", "Male", "Total",
        "FemalePercentage", "MalePercentage", "GenderGap",
        "genderCategory", "SportSizeRank", "SportSizeCategory", "OpportunityType",
    )

    # NOTE(review): count("Country") counts rows, so CountriesParticipating is
    # only a country count if sport_dominance has one row per
    # (Country, Discipline) -- TODO confirm against the upstream notebook.
    dominance_summary = sport_dominance.groupBy("Discipline").agg(
        F.count("Country").alias("CountriesParticipating"),
        F.avg("SportDominanceEfficiencyPercent").alias("AvgConcentrationPct"),
        F.max("SportDominanceEfficiencyPercent").alias("MaxConcentrationPct"),
        F.countDistinct(
            F.when(F.col("IsSpecialized") == True, F.col("Country"))
        ).alias("SpecializedCountries"),
    )

    # Left join keeps every discipline from the gender analysis; disciplines
    # without dominance rows get nulls and skip the concentration branch below.
    sport_master = sport_master.join(dominance_summary, "Discipline", "left")

    is_large_sport = F.col("SportSizeCategory") == "Large Sport"
    recommendation = (
        F.when((F.col("genderCategory") == "Female Dominant") & is_large_sport,
               "High Female Investment Priority")
        .when((F.col("genderCategory") == "Male Dominant") & is_large_sport,
              "High Male Investment Priority")
        .when(F.col("MaxConcentrationPct") >= _HIGH_CONCENTRATION_PCT,
              "High Competition - Avoid or Excel")
        .when(F.col("genderCategory") == "Balanced",
              "Equal Opportunity - Requires Excellence")
        .otherwise("Standard Strategic Approach")
    )
    return sport_master.withColumn("StrategicRecommendation", recommendation)


def _build_executive_summary(country_master):
    """Collapse the country dataset into a single-row summary stamped with the current date."""
    return country_master.agg(
        F.count("Country").alias("TotalCountries"),
        F.sum("Total").alias("TotalMedalsAwarded"),
        F.avg("MedalsPerAthlete").alias("GlobalAvgEfficiency"),
        F.countDistinct(
            F.when(F.col("MedalEfficiencyScore") == "High", F.col("Country"))
        ).alias("HighEfficiencyCountries"),
        F.countDistinct(
            F.when(F.col("PerformanceCategory") == "Elite", F.col("Country"))
        ).alias("EliteCountries"),
    ).withColumn("AnalysisDate", F.current_date())


def create_master_analytics(athletes_df, medals_df, gender_df, config):
    """
    Create comprehensive analytics dataset combining all transformations.

    Runs the medal-efficiency, sport-dominance and gender-participation
    transformations, then derives country-level, sport-level and summary
    datasets from their outputs. The function itself performs no Spark
    actions and prints nothing; callers decide when to show or write the
    results.

    Args:
        athletes_df: Athletes DataFrame
        medals_df: Medals DataFrame
        gender_df: Gender DataFrame
        config: Configuration object; ``config.CACHE_DATAFRAMES`` controls
            whether the three master datasets are marked for caching.

    Returns:
        Dictionary with master datasets for Power BI, keyed by
        "country_analytics", "sport_analytics", "executive_summary",
        "medal_efficiency", "sport_dominance" and "gender_analysis".

    Raises:
        Exception: any error raised while building the datasets is logged
            and re-raised unchanged.
    """
    try:
        logger.info("Starting create_master_analytics")

        medal_efficiency = calculate_medal_efficiency(athletes_df, medals_df, config)
        sport_dominance = calculate_sport_dominance(athletes_df, config)
        gender_analysis = analyze_gender_participation(gender_df, config)

        # Schema inspection only reads metadata, so it does not launch a job.
        logger.debug("sport_dominance columns: %s", sport_dominance.columns)

        logger.info("Creating country-level master dataset...")
        country_master = _build_country_master(medal_efficiency, sport_dominance)

        logger.info("Creating sport-level master dataset...")
        sport_master = _build_sport_master(gender_analysis, sport_dominance)

        logger.info("Creating executive summary...")
        executive_summary = _build_executive_summary(country_master)

        if config.CACHE_DATAFRAMES:
            # cache() marks the DataFrame in place and is lazy; data is only
            # materialised by the first action the caller runs.
            country_master.cache()
            sport_master.cache()
            executive_summary.cache()
            logger.info("All master datasets cached")

        master_datasets = {
            "country_analytics": country_master,
            "sport_analytics": sport_master,
            "executive_summary": executive_summary,
            "medal_efficiency": medal_efficiency,
            "sport_dominance": sport_dominance,
            "gender_analysis": gender_analysis
        }

        logger.info("Master analytics creation completed successfully")
        logger.info(f"Created {len(master_datasets)} comprehensive datasets")

        return master_datasets

    except Exception as e:
        logger.error(f"Master analytics creation failed: {str(e)}")
        raise

In [0]:
# Run the master analytics pipeline and print a sample of each headline
# dataset. The input DataFrames and `config` must already exist in the
# notebook namespace when this cell runs.
try:
    logger.info("EXECUTING MASTER ANALYTICS PIPELINE")

    master_data = create_master_analytics(athletes_df, medals_df, gender_df, config)

    print(" MASTER ANALYTICS RESULTS")
    print(f"Generated {len(master_data)} comprehensive datasets")

    # Top countries by composite score, highest first.
    print("\n COUNTRY ANALYTICS SAMPLE")
    country_analytics_df = master_data["country_analytics"]
    country_analytics_df.orderBy("OverallPerformanceScore", ascending=False).show(n=10)

    # Sports ordered by their size rank.
    print("\nSPORT ANALYTICS SAMPLE")
    sport_analytics_df = master_data["sport_analytics"]
    sport_analytics_df.orderBy("SportSizeRank").show(n=10)

    print("\n EXECUTIVE SUMMARY")
    master_data["executive_summary"].show()

    # Number of countries falling into each performance tier.
    print("\n PERFORMANCE CATEGORY BREAKDOWN")
    category_counts = country_analytics_df.groupBy("PerformanceCategory").count()
    category_counts.show()

    logger.info("Master analytics pipeline completed successfully")

except Exception as exc:
    # Log for the run record, then let the failure surface to the notebook.
    logger.error(f"Master pipeline failed: {exc}")
    raise