In [0]:
spark.conf.set("fs.azure.account.auth.type.gpelt.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.gpelt.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")


In [0]:
import pandas as pd

def prepare_games_dataset_cloud(json_path):
    print(f"Reading full file from: {json_path}")
    
    # 1. Read from Azure using Spark
    raw_spark_df = spark.read.option("multiline", "true").json(json_path)
    
    # 2. Convert the entire dataset into a list of dictionaries
    games_list = [row.asDict(recursive=True) for row in raw_spark_df.collect()]
    
    # 3. USE JSON NORMALIZE (This replaces the loop)
    # This automatically turns nested dicts into 'rating_alltime.RatingCount', etc.
    df = pd.json_normalize(games_list)
    
    # 4. Cleanup column names to match your Master CSV/ML logic
    # We rename the 'dot' names to 'underscore' names
    column_mapping = {
        'rating_7_days.RatingCount': 'rating_7_days_count',
        'rating_7_days.AverageRating': 'rating_7_days_avg',
        'rating_30_days.RatingCount': 'rating_30_days_count',
        'rating_30_days.AverageRating': 'rating_30_days_avg',
        'rating_alltime.RatingCount': 'rating_alltime_count',
        'rating_alltime.AverageRating': 'rating_alltime_avg',
        'esrb': 'ESRB',
        'category': 'Genre'
    }
    df = df.rename(columns=column_mapping)

    # 5. Extract Price (since 'prices' is a list, normalize won't flatten it automatically)
    # We create a simple helper to get the first list_price > 0
    def get_price(price_list):
        if not isinstance(price_list, list): return 0
        return next((p.get('list_price', 0) for p in price_list if p.get('list_price', 0) > 0), 0)
    
    df['current_price'] = df['prices'].apply(get_price)
    
    # 6. Filter for active games
    df_active = df[(df['rating_alltime_count'] > 0) | 
                  (df['rating_7_days_count'] > 0) | 
                  (df['rating_30_days_count'] > 0)].copy()
    
    return df_active

In [0]:
publisher_map = {
    # 2K / Take-Two
    '2k': '2k',
    '2k games': '2k',
    '2k publishing': '2k',
    'visual concepts': '2k',
    'private division': 'take-two',
    'rockstar games': 'take-two',
    'gearbox software': 'take-two',
    'hangar 13': 'take-two',
    
    # Activision Blizzard
    'activision': 'activision',
    'activision publishing inc.': 'activision',
    'blizzard entertainment': 'blizzard',
    
    # Bandai Namco / D3
    'bandai namco entertainment america': 'bandai namco',
    'bandai namco entertainment': 'bandai namco',
    'bandai namco entertainment america inc.': 'bandai namco',
    'bandai namco entertainment inc.': 'bandai namco',
    'bandai namco studios inc.': 'bandai namco',
    'bandai namco studios': 'bandai namco',
    'namco bandai games america': 'bandai namco',
    'namco bandai games america inc.': 'bandai namco',
    'd3 publisher': 'd3 publisher',
    'd3publisher of america, inc.': 'd3 publisher',
    'd3publisher of america, inc': 'd3 publisher',
    'd3publisher, inc.': 'd3 publisher',
    'd3 publisher of america, inc.': 'd3 publisher',
    'd3 publisher of america inc.': 'd3 publisher',
    
    # Capcom
    'capcom': 'capcom',
    'capcom u.s.a., inc': 'capcom',
    'capcom entertainment, inc.': 'capcom',
    '株式会社 カプコン': 'capcom',
    'capcom co., ltd.': 'capcom',
    'capcom co,. ltd.': 'capcom',
    
    # Daedalic / Nacon
    'daedalic entertainment': 'daedalic',
    'daedalic entertainment gmbh': 'daedalic',
    'big ant studios': 'nacon',
    'big ant studios pty': 'nacon',
    'passtech games': 'nacon',
    
    # Deep Silver / Embracer
    'deep silver': 'deep silver',
    'deep silver dambuster studios': 'deep silver',
    'deep silver, inxile entertainment': 'deep silver',
    'deep silver, koch media': 'deep silver',
    'koch media': 'deep silver',
    'prime matter': 'deep silver',
    'thq nordic': 'thq nordic',
    'thq nordic gmbh': 'thq nordic',
    'nordic games': 'thq nordic',
    'nordic games gmbh': 'thq nordic',
    'coffee stain publishing': 'coffee stain',
    'coffee stain publishing ab': 'coffee stain',
    'milestone s.r.l.': 'milestone',
    'ravenscourt': 'ravenscourt',
    'ravenscourt games': 'ravenscourt',
    
    # Disney
    'disney': 'disney',
    'disney interactive studios': 'disney',
    'disney interactive': 'disney',
    'disney bolt': 'disney',
    'lucasfilm': 'disney',
    'lucasarts': 'disney',
    
    # Electronic Arts
    'electronic arts': 'electronic arts',
    'electronic arts ': 'electronic arts',
    'electronic arts inc.': 'electronic arts',
    'ea': 'electronic arts',
    'ea sports': 'electronic arts',
    'ea sports™': 'electronic arts',
    'ea vancouver': 'electronic arts',
    'ea digital illusions ce ab': 'electronic arts',
    'codemasters': 'electronic arts',
    'codemasters software company limited': 'electronic arts',
    'popcap': 'electronic arts',
    'popcap games': 'electronic arts',
    'popcap games, inc.': 'electronic arts',
    
    # Focus Entertainment
    'focus entertainment': 'focus entertainment',
    'focus home interactive': 'focus entertainment',
    'dotemu': 'focus entertainment',
    'the arcade crew': 'focus entertainment',
    
    # Microsoft / Xbox Game Studios
    'xbox game studios': 'xbox game studios',
    'xbox game studios ': 'xbox game studios',
    'xbox games studios, rare ltd': 'xbox game studios',
    'microsoft': 'xbox game studios',
    'microsoft studios': 'xbox game studios',
    'microsoft game studios': 'xbox game studios',
    'microsoft corporation': 'xbox game studios',
    'xbox live arcade': 'xbox game studios',
    '343 industries': 'xbox game studios',
    'the coalition': 'xbox game studios',
    'turn 10 studios': 'xbox game studios',
    'rare': 'xbox game studios',
    'mojang studios': 'xbox game studios',
    'double fine': 'xbox game studios',
    'double fine productions': 'xbox game studios',
    'double fine productions, inc.': 'xbox game studios',
    
    # SEGA
    'sega': 'sega',
    'sega europe ltd': 'sega',
    'sega of america': 'sega',
    'sega of america, inc.': 'sega',
    'ryu ga gotoku studio': 'sega',
    
    # Square Enix
    'square enix': 'square enix',
    'square enix ltd': 'square enix',
    'square enix co., ltd.': 'square enix',
    'square enix ltdio-interactive a/s': 'square enix',
    'square enix ltdi-interactive a/s': 'square enix',
    
    # Team17
    'team17': 'team17',
    'team17 ': 'team17',
    'team 17': 'team17',
    'team17 digital ltd': 'team17',
    'team17 digital ltd.': 'team17',
    
    # tinyBuild
    'tinybuild': 'tinybuild',
    'tinybuild games': 'tinybuild',
    
    # Others
    'another indie': 'another indie',
    'another indie studio': 'another indie',
    'aspyr': 'aspyr',
    'aspyr media': 'aspyr',
    'astragon entertainment': 'astragon entertainment',
    'astragon entertainment gmbh': 'astragon entertainment',
    'fatshark': 'fatshark',
    'fatshark ab': 'fatshark',
    'fatshark studios ab': 'fatshark',
    'flashbulb': 'flashbulb',
    'flashbulb games': 'flashbulb',
    'headup': 'headup',
    'headup games': 'headup',
    'headup gmbh': 'headup',
    'hypetrain digital': 'hypetrain digital',
    'image & form': 'image & form',
    'image & form games': 'image & form',
    'image & form international ab': 'image & form',
    'jackbox games': 'jackbox games',
    'jackbox games, inc.': 'jackbox games',
    'kalypso media': 'kalypso media',
    'kalypso media group gmbh': 'kalypso media',
    'kalypso media group': 'kalypso media',
    'konami': 'konami',
    'konami digital entertainment': 'konami',
    'konami digital entertainment, inc.': 'konami',
    'playway': 'playway',
    'playway sa': 'playway',
    'playway s.a.': 'playway',
    'pm studios, inc.': 'pm studios',
    'pm-studios, inc.': 'pm studios',
    'spike': 'spike chunsoft',
    'spike chunsoft': 'spike chunsoft',
    'spike chunsoft co., ltd.': 'spike chunsoft',
    'telltale': 'telltale',
    'telltale games': 'telltale',
    'valve': 'valve',
    'versus evil': 'versus evil',
    'versus evil, llc.': 'versus evil',
    'arc system works': 'arc systems',
    'ubisoft entertainment': 'ubisoft',
    'wired productions ltd': 'wired productions',
    'snk playmore corporation':'snk playmore',
    'unknown worlds entertainment': 'unknown worlds',
    'thq nordic': 'thq',
    'tecmo koei america': 'tecmo'
    
}


In [0]:
def calculate_game_metrics(df):
    """Add calculated metric columns directly to DataFrame."""
    # Get rating counts
    r7 = pd.to_numeric(df["rating_7_days_count"], errors='coerce').fillna(0)
    r30 = pd.to_numeric(df["rating_30_days_count"], errors='coerce').fillna(0)
    r_all = pd.to_numeric(df["rating_alltime_count"], errors='coerce').fillna(0)
    
    # Get ratings
    rating_7d = pd.to_numeric(df["rating_7_days_avg"], errors='coerce').fillna(0)
    rating_30d = pd.to_numeric(df["rating_30_days_avg"], errors='coerce').fillna(0)
    rating_all = pd.to_numeric(df["rating_alltime_avg"], errors='coerce').fillna(0)

    
    # Parse dates
    release_date = pd.to_datetime(df["Release"], errors='coerce')
    gamepass_date = pd.to_datetime(df["Added"], errors='coerce')

    # Normalize timezones: make sure datetimes are tz-naive so subtraction works
    # If series are timezone-aware, convert to naive by removing tz info.
    try:
        if getattr(release_date.dt, 'tz', None) is not None:
            release_date = release_date.dt.tz_convert(None)
    except Exception:
        # fallback: attempt elementwise removal
        release_date = release_date.apply(lambda x: x.tz_convert(None) if getattr(x, 'tzinfo', None) is not None else x)

    try:
        if getattr(gamepass_date.dt, 'tz', None) is not None:
            gamepass_date = gamepass_date.dt.tz_convert(None)
    except Exception:
        gamepass_date = gamepass_date.apply(lambda x: x.tz_convert(None) if getattr(x, 'tzinfo', None) is not None else x)
    
    # Calculate time deltas using a tz-naive 'now'
    now = pd.Timestamp.now()
    df['days_since_release'] = (now - release_date).dt.days
    df['days_since_gp_add'] = (now - gamepass_date).dt.days
    
    # Calculate metrics
    df['momentum'] = ((r7 / r30 * 100).fillna(0)).round(2)
    df['discovery_capture'] = ((r7 / r_all * 100).fillna(0)).round(2)
    df['quality_retention'] = (rating_30d - rating_all).round(3)
    df['rating_trend_7d_vs_alltime'] = (rating_7d - rating_all).round(3)
    df['is_day_one_gp'] = (df['days_since_gp_add'] <= 1) & (gamepass_date.notna())
    
    return df

In [0]:
# 1. Load your Master CSV (the one with the schema you provided)
# If stored in DBFS or Azure, adjust the path
spark_master_df = spark.table("hive_metastore.default.xbox_final_cleaned_results")

# 2. Convert it to Pandas so the rest of your script works
master_df = spark_master_df.toPandas()

# 2. Get the latest JSON telemetry from the Azure scrape
folder_path = "abfss://xbox-data@gpelt.dfs.core.windows.net/scrapes/"
files = dbutils.fs.ls(folder_path)
latest_json_path = max(files, key=lambda f: f.modificationTime).path

# 3. Pull telemetry using your existing function
# This gets the rating_counts and current_price
df_telemetry = prepare_games_dataset_cloud(latest_json_path)

# 4. THE MASTER MERGE (On ProductID)
# We left-join telemetry onto the Master CSV
final_df = pd.merge(
    master_df,
    df_telemetry,
    left_on='ProductID',
    right_on='product_id',
    how='inner' # Use 'inner' to only keep games that were actually scraped successfully
)

# 5. RUN FINAL ENRICHMENT
final_df = calculate_game_metrics(final_df)
final_df['publisher'] = final_df['publisher'].str.strip().str.lower().replace(publisher_map)

# 6. DROP DUPLICATE/CLEANUP COLUMNS
# Since we have both 'product_id' and 'ProductID', drop one
final_df = final_df.drop_duplicates(subset=['ProductID'])

display(final_df)

In [0]:
import numpy as np 
final_df['is_day_one_gp'] = np.where(final_df['days_since_gp_add'] == final_df['days_since_release'], True, False)

In [0]:
def backfill_columns(DF):
    # 1. Fix the mixed-type columns FIRST
    # Convert lists to strings, but keep existing strings as they are
    mixed_cols = ['ESRB Content Descriptors', 'esrb_descriptors']
    
    for col in mixed_cols:
        if col in DF.columns:
            DF[col] = DF[col].apply(lambda x: ", ".join(x) if isinstance(x, list) else x)

    # 2. Proceed with your backfill logic
    DF['Genre'] = DF['Genre_x'].fillna(DF['Genre_y'])
    DF['ESRB_x'] = DF['ESRB_x'].fillna(DF['ESRB_y'])
    DF['ESRB Content Descriptors'] = DF['ESRB Content Descriptors'].fillna(DF['esrb_descriptors'])
    
    
    
    # Note: Ensure 'Series X|S' is correctly mapped to 'is_xpa' if that was your intent
    DF['Series X|S'] = DF['Series X|S'].fillna("No relation")
    DF['xCloud'] = DF ['xCloud'].fillna("None")
    return DF

# Apply the fix to your Pandas DataFrame
DF = backfill_columns(final_df)

# Now the conversion to Spark will work without the merge error


In [0]:
def drop_columns(df): 
    columns_to_drop = ['Months', 'Completion', 'Age', 'Status.1', 'Added', 'Owner Notes', 'Genre_y', 'title', 'bundle_count', 'short_description', 'esrb_descriptors', 'product_id', 'Added.1', 'Delay', 'ESRB_y', 'prices', 'Community Notes', 'is_xpa']
    df = df.drop(columns=columns_to_drop)
    return df

In [0]:
final_df = drop_columns(final_df)
def rename_columns(df):
    df = df.rename(columns={'Game': 'title', 'MS_Store_Link': 'url'})

In [0]:
def add_comprehensive_baselines(df):
    # Metrics we want to analyze
    metrics = ['momentum', 'discovery_capture', 'quality_retention']
    
    for metric in metrics:
        # 1. Calculate the 'Genre' Median (Broad Baseline)
        df[f'genre_median_{metric}'] = df.groupby('Genre')[metric].transform('median')
        df[f'lift_vs_genre_{metric}'] = df[metric] - df[f'genre_median_{metric}']

        
        # 2. Calculate the 'Genre + ESRB' Median (Conditioned Baseline)
        # This gives you the context: "How do M-rated Action games usually perform?"
        df[f'conditioned_median_{metric}'] = df.groupby(['Genre', 'ESRB_x'])[metric].transform('median')
        
        # 3. Calculate the 'Relative Lift' (for Streamlit charts)
        # Positive = Overperforming, Negative = Underperforming
        df[f'lift_vs_rating_{metric}'] = df[metric] - df[f'conditioned_median_{metric}']

        
        # 4. Standardized ML Feature (Robust Z-Score)
        # For ML, we use Median Absolute Deviation (MAD) if data is skewed, 
        # but a standard STD works for basic normalization
        std_dev = df.groupby(['Genre', 'ESRB_x'])[metric].transform('std')
        df[f'zscore_{metric}'] = (df[metric] - df[f'conditioned_median_{metric}']) / std_dev
        
    return df

# Apply to your main DataFrame
DF = add_comprehensive_baselines(final_df)


In [0]:
final_df = final_df.drop_duplicates(subset=['ProductID'])
final_df = final_df.rename(columns={'Series X_S': 'Series X|S', 'MS_Store_Link': 'url'})

In [0]:
display(final_df)

In [0]:
# 1. Convert Pandas DF to Spark DF
# This is where the 'CANNOT_MERGE_TYPE' error usually happens
final_df.columns = [col.replace('|', '_').replace(' ', '_').replace(';', '').replace('{', '').replace('}', '').replace('(', '').replace(')', '').replace('\n', '').replace('\t', '').replace('=', '') for col in final_df.columns]; spark_df = spark.createDataFrame(final_df) 

# 2. Save as a Delta Table
# This makes it queryable via SQL: SELECT * FROM xbox_analysis_data
spark_df.write.format("delta").mode("overwrite").saveAsTable("xbox_analysis_data")

In [0]:
%sql
DESCRIBE DETAIL hive_metastore.default.xbox_analysis_data

In [0]:
def Genre_performance_analysis(df):
    """Analyze performance metrics by Genre."""
    Genre_stats = df.groupby('Genre').agg({
        'momentum': ['median', 'mean', 'std'],
        'discovery_capture': ['median', 'mean'],
        'quality_retention': ['median', 'mean'],
        'rating_7_days_count': ['mean', 'std', 'median'],
        'rating_30_days_count': ['mean', 'std', 'median'],
        'rating_alltime_count': ['mean', 'std', 'median'],
        'rating_alltime_avg': ['mean', 'std', 'median'],
        'rating_30_days_avg': ['mean', 'std', 'median'],
        'rating_7_days_avg': ['mean', 'std', 'median'],
        'rating_trend_7d_vs_alltime': ['mean', 'std', 'median'],
        'Game': 'count'  # Number of games per Genre
    }).round(2)
    
    
    Genre_stats.columns = ['_'.join(col).strip() for col in Genre_stats.columns.values]
    Genre_stats = Genre_stats.reset_index()
    
    return Genre_stats

def Genre_gamepass_comparison(df):
    """Compare Game Pass vs Non-Game Pass games by Genre."""
    comparison = df.groupby(['Genre', 'has_gamepass_remediation']).agg({ #using the agg fucntion to peform a series of operations on the grouped data to get summary statistics for each Genre and Game Pass status
        'momentum': ['mean', 'std', 'median'],
        'discovery_capture': ['mean', 'std', 'median'],
        'quality_retention': ['mean', 'std', 'median'],
        'rating_7_days_avg': ['mean', 'std', 'median'],
        'rating_30_days_avg': ['mean', 'std', 'median'],
        'rating_alltime_avg': ['mean', 'std', 'median'],
        'rating_7_days_count': ['mean', 'std', 'median'],
        'rating_30_days_count': ['mean', 'std', 'median'],
        'rating_alltime_count': ['mean', 'std', 'median'],
        'has_gamepass_remediation': 'sum',  # Number of games on GP
        'Game': 'count'  # Total games
    }).round(2)
    # 2. Use the same spelling consistently
    comparison = comparison.rename(columns={'title': 'game_count'}) 
    
    # Fixed the typo from 'comparsion' to 'comparison' below
    comparison.columns = ['_'.join(col).strip() for col in comparison.columns.values]
    comparison = comparison.reset_index()
    
    return comparison
    

In [0]:
Genre_perf = Genre_performance_analysis(final_df)
display(Genre_perf)
spark.createDataFrame(Genre_perf).write.format("delta").mode("overwrite").saveAsTable("gold_xbox_genre_performance")

# 2. Generate the Game Pass Comparison
Genre_gp = Genre_gamepass_comparison(final_df)
display(Genre_gp)
spark.createDataFrame(Genre_gp).write.format("delta").mode("overwrite").saveAsTable("gold_xbox_gamepass_impact")


In [0]:

# rename base column and merge
genre_all_baseline = Genre_perf[['Genre','momentum_mean', 'discovery_capture_median', 'quality_retention_median']].rename(columns={'momentum_mean':'momentum_genre_baseline', 'discovery_capture_median': 'discovery_capture_baseline', 'quality_retention_median' : 'quality_retention_baseline'})
merged = Genre_gp.merge(genre_all_baseline, on='Genre', how='left')

# differences (row-level: GP True/False rows will get baseline)
merged['momentum_diff_vs_baseline'] = merged['momentum_mean'] - merged['momentum_genre_baseline']
merged['momentum_pct_vs_baseline'] = merged['momentum_diff_vs_baseline'] / merged['momentum_genre_baseline'].replace(0,np.nan) * 100
merged['discovery_capture_diff_vs_baseline'] = merged['discovery_capture_median'] - merged['discovery_capture_baseline']
merged['quality_retention_vs_baseline'] = merged['quality_retention_median'] - merged['quality_retention_baseline']
merged['quality_pct_vs_baseline'] = merged['quality_retention_vs_baseline'] / merged['quality_retention_baseline'].replace(0,np.nan) * 100



merged.head(10)
