# Steam Profile Opera - Creating Stories from Steam Usage

First step is to retrieve user Profile, games owned, played time, etc...

In [None]:
# Import Block
import datetime as dt
import time
import json
from typing import List, Dict, Optional
import os
import math

import requests
import pandas as pd
import numpy as np
from tinydb import TinyDB, Query
from tinydb.storages import JSONStorage
from tinydb_serialization import SerializationMiddleware
from tinydb_serialization.serializers import DateTimeSerializer
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from dotenv import load_dotenv

# sets time values
current_time = dt.datetime.now()
two_weeks_ago = current_time - dt.timedelta(weeks=2)
year_month_str = f"{current_time.year}-{current_time.month}"
print(f"Year-Month = {year_month_str}")

# Starts DB
serialization = SerializationMiddleware(JSONStorage)
serialization.register_serializer(DateTimeSerializer(), 'TinyDate')
db = TinyDB('db.json', storage=serialization)
users_tb = db.table('users', cache_size=200)
games_tb = db.table('games', cache_size=400)
gameplay_tb = db.table('gameplay', cache_size=1000)
friend_list_tb = db.table('friends', cache_size=200)


In [None]:
# Retrieving Constants.
load_dotenv()
STEAM_KEY= os.environ.get("STEAM_KEY")
PLAYER_ID= os.environ.get("PLAYER_ID")
RUN_FRIENDS_STATS = os.environ.get("RUN_FRIENDS_STATS") == "True"
CLEAN_USER_DB= os.environ.get("CLEAN_USER_DB") == "True"
CLEAN_GAMEPLAY_DB= os.environ.get("CLEAN_GAMEPLAY_DB") == "True"

In [None]:
def get_player_data(player_ids:str, year_month:str=None):
    """
    Retrives user data from DB and if not available will use the Steam API.
    """
    id_list = player_ids.split(",")
    missing_steam_ids = []
    result = []

    """
    for steam_id in id_list:
        if year_month:
            user_query = Query()
            user = users_tb.search((user_query.steamid == steam_id)&(user_query.year_month == year_month))
            if len(user) > 0:
                # print(f"User Found {user[0]}")
                result.append(user[0])
        if not year_month or not user:
            missing_steam_ids.append(steam_id)
    """

    if year_month:
        user_query = Query()
        user_list = users_tb.search((user_query.steamid.one_of(id_list))&(user_query.year_month == year_month))
        retrieved_id_list = [user["steamid"] for user in user_list]
        missing_steam_ids += [player_id for player_id in id_list if player_id not in retrieved_id_list ]
        result += user_list
    
    print(f"Retrieved {len(result)} users from DB.")
    print(f"{len(missing_steam_ids)} users will be fetched from the API.")
    if missing_steam_ids:
        print(f"Fetching player data for {len(missing_steam_ids)} players.")
        missing_steam_id_str = ",".join(missing_steam_ids)
        player_url = f"http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={STEAM_KEY}&steamids={player_ids}"
        r = requests.get(player_url)
        player_list = r.json()["response"]["players"]
        
        for player in player_list:
            player["year_month"] = year_month_str
        print(player_list)
        users_tb.insert_multiple(player_list)
        result += player_list
    return result


    

In [None]:
# Retrieves current user data
player_data = get_player_data(PLAYER_ID, year_month_str)[0]
player_data

In [None]:
def get_friend_list(player_id, year_month:str=None)->List[str]:
    """
    Returns the list of steam ids for a given player id.
    Caches the friend list for 2 weeks.
    """
    existing_friend_list = []
    if year_month:
        friend_query = Query()
        existing_friend_list_query = friend_list_tb.search((friend_query.steam_id == player_id) & (friend_query.year_month == year_month))
        existing_friend_list = existing_friend_list_query[0] if len(existing_friend_list_query) > 0 else None
    if not existing_friend_list:
        friend_list_url = f"http://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key={STEAM_KEY}&steamid={player_id}&relationship=friend"
        r = requests.get(friend_list_url)
        friend_list_response = r.json()
        friend_list_element = friend_list_response.get("friendslist")
        if not friend_list_element:
            print(friend_list_response)
            friend_ids = None
        else:
            friend_list_from_response = friend_list_element["friends"]
            friend_ids = [ item["steamid"] for item in friend_list_from_response]
    
        # saves to DB
        friend_list_tb.insert({
            "steam_id": player_id,
            "friend_ids":friend_ids,
            "year_month": year_month_str
        })

        return friend_ids
    return existing_friend_list["friend_ids"]



In [None]:
friend_list_ids = get_friend_list(PLAYER_ID, year_month_str)
friends_data = get_player_data(",".join(friend_list_ids), year_month_str)
print(friends_data)

In [None]:
"""
full_level_1_friend_list = []
for friend_id in friend_list_ids:
    level_1_friend_list = get_friend_list(friend_id, year_month_str)
    if level_1_friend_list:
        level_1_friends_data = get_player_data(",".join(level_1_friend_list), year_month_str)
        full_level_1_friend_list += level_1_friends_data
    print(f"Current Level 1 Friend List size = {len(full_level_1_friend_list)}")
"""

In [None]:
friends_df = pd.DataFrame.from_dict(friends_data + [player_data])
friends_df["last_year_online"] = friends_df["lastlogoff"].apply(lambda x: dt.datetime.fromtimestamp(x).year if not math.isnan(x) else None)
friends_df.head(5)

In [None]:
friends_df.columns

In [None]:
# Count Friends by Country
sns.set_theme()
sns.displot(friends_df, x="loccountrycode")

In [None]:
# Last Seen
sns.displot(friends_df, x="last_year_online")

In [None]:
def fetch_gameplay_info(player_ids:str, year_month:str=None):
    """
    Fetches the list of games and played time for a player id.
    """
    id_list = player_ids.split(",")
    missing_steam_ids = []
    result = []
    if year_month:
        gameplay_query = Query()
        gameplay_list = gameplay_tb.search((gameplay_query.player_id.one_of(id_list))&(gameplay_query.year_month == year_month))
        retrieved_id_list = [gameplay["player_id"] for gameplay in gameplay_list]
        missing_steam_ids += [player_id for player_id in id_list if player_id not in retrieved_id_list ]
        result += gameplay_list
    else:
        missing_steam_ids = id_list.copy()
    print(f"Found {len(result)} gameplay entries in DB.")
    print(f"Fetching {len(missing_steam_ids)} entries from API.")
    if missing_steam_ids:
        for idx,player_id in enumerate(id_list):
            if idx%50 == 0 :
                print(f"Current steam_id number - {idx}")
            player_game_list_url = f"http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key={STEAM_KEY}&steamid={player_id}&format=json"
            r = requests.get(player_game_list_url)
            player_game_list_response = r.json()
            response_content = player_game_list_response["response"]
            gameplay = response_content["games"] if response_content else []
            for entry in gameplay:
                entry.update({
                    "player_id":player_id,
                    "year_month":year_month_str
                })
            if year_month:
                gameplay_tb.insert_multiple(gameplay)
            result += gameplay

    return result



In [None]:
def get_game_details(appids:List[int]):
    """
    Retrieves game details from the local DB or from Steam.
    """
    game_query = Query()
    result = []
    missing_game_details = []
    existing_game_details = games_tb.search(game_query.appid.one_of(appids))
    if existing_game_details:
        for game_details in existing_game_details:
            if game_details.get("success") == True:
                result.append(game_details)
            elif game_details.get("success") is None:
                games_tb.update({"success":True},(game_query.appid == game_details["appid"]))
        found_game_detail_ids = [ game_detail["appid"] for game_detail in existing_game_details ]
        missing_game_details += [ game_id for game_id in found_game_detail_ids if game_id not in appids]
    else:
        missing_game_details += appids
        
    if missing_game_details:
        for missing_game_id in missing_game_details:
            print(f"Game Details for {str(appid)} NOT found in DB.")
            time.sleep(0.7)  # had to add this because the request were being blocked.
            game_details_url = f"http://store.steampowered.com/api/appdetails?appids={appid}"
            r = requests.get(game_details_url)
            response_json = r.json()
            # print(response_json)
            if (response_json[str(appid)]['success'] == 'true' or response_json[str(appid)]['success'] == True):
                print(f"Saving to DB.")
                game_data = response_json[str(appid)]['data']
                game_data.update({
                    "appid":appid,
                    "success":True})
                games_tb.insert(game_data)
                result.append(game_data)
            else:
                print(f"Saving failed record to DB - {response_json}")
                games_tb.insert({
                    "success":False,
                    "appid":appid,
                })
    return result

    

In [None]:
def build_df_for_users(player_ids:str, who_is:str=None,year_month:str=None ):

    def extract_release_year(date_dic:dict):
        if type(date_dic)==dict and date_dic["date"]:
            result_date = None
            try:
                result_date = dt.datetime.strptime(date_dic["date"],"%d %b, %Y")
            except ValueError:
                try:
                    result_date = dt.datetime.strptime(date_dic["date"],"%b %d, %Y")
                except ValueError:
                    try:
                        result_date = dt.datetime.strptime(date_dic["date"],"%d %b %Y")
                    except ValueError:
                        return None
            return int(result_date.year)

    
    all_gameplay_list = fetch_gameplay_info(player_ids,year_month)
    print(f"Found {len(all_gameplay_list)} gameplay entries.")
    
    
    if not all_gameplay_list:
        return pd.DataFrame()

    game_id_list = list(set([entry["appid"] for entry in all_gameplay_list]))
    print(f"Total games - {len(game_id_list)}")
    game_details_list = []

    game_data_list = get_game_details(game_id_list)

    for game_data in game_data_list:
  
        game_info = {
            "appid": game_data["appid"],
            "name": game_data["name"],
            "genres": game_data.get("genres"),
            "metacritic":game_data.get("metacritic"),
            "short_description":game_data.get("short_description"),
            "header_image":game_data.get("header_image"),
            "categories":game_data.get("categories"),
            "release_date":game_data.get("release_date"),
            "content_descriptors":game_data.get("content_descriptors"),
            "developers":game_data.get("developers"),
            "publishers":game_data.get("publishers")
        }

        if who_is:
            game_info.update({
            "who_is": who_is
        })
        game_details_list.append(game_info)
            
    print("Done Fetching data.")
    gameplay_df = pd.DataFrame.from_dict(all_gameplay_list)
    game_info_df = pd.DataFrame.from_dict(game_details_list)
    result = pd.merge(gameplay_df,game_info_df, how="left",left_on="appid",right_on="appid")


    result["metacritic_score"] = result["metacritic"].apply(lambda x: x["score"] if type(x)==dict else None)
    result["release_year"] = result["release_date"].apply(extract_release_year)
    result["release_year"] = result["release_year"].fillna(0.0).astype(int)
    result["release_year"].replace(0, np.nan, inplace=True)
    result["pc_developer"] = result["developers"].apply(lambda x: x[0] if type(x)==list else None)

    return result

        

In [None]:
# gameplay_tb.truncate()

In [None]:
gameplay_df = build_df_for_users(PLAYER_ID, year_month=year_month_str, who_is="me") 
# gameplay_df = build_df_for_user(76561198021990176)

In [None]:
# gameplay_df.head(5)

In [None]:
# gameplay_df.sort_values("playtime_forever", ascending=False).head(10)

In [None]:
# Count By Release Year
sns.set_theme()
sns.displot(gameplay_df[gameplay_df["release_year"]>0], x="release_year")

In [None]:
sns.relplot(data=gameplay_df[gameplay_df["playtime_forever"]>30], x="release_year", y="playtime_forever", hue="metacritic_score", palette="magma")

In [None]:
# Count by Metacritic
sns.displot(gameplay_df, x="metacritic_score")

In [None]:
# Top 10 count by developer
count_by_developer_df = gameplay_df.groupby(['pc_developer'])\
    .agg({'pc_developer':'count'})\
    .rename(columns={'pc_developer':'count_pc_developer'})\
    .sort_values('count_pc_developer', ascending=False)\
    .head(10)

sns.barplot(data=count_by_developer_df, x="count_pc_developer", y="pc_developer")

In [None]:
# Top 10 playtime by developer
count_by_developer_df = gameplay_df.groupby(['pc_developer'])\
    .agg({"playtime_forever":"sum"})\
    .rename(columns={'playtime_forever':'sum_playtime_forever'})\
    .sort_values('sum_playtime_forever', ascending=False)\
    .head(10)
count_by_developer_df["sum_playtime_forever"] = count_by_developer_df["sum_playtime_forever"].apply(lambda x: x//60)
sns.barplot(data=count_by_developer_df, x="sum_playtime_forever", y="pc_developer")

In [None]:
# Top 10 best rated in metacritid with less than 30 min
unplayed_games_df = gameplay_df[gameplay_df["playtime_forever"]<30]\
    .sort_values('metacritic_score', ascending=False)\
    .head(10)

sns.barplot(data=unplayed_games_df, x="metacritic_score", y="name", hue="playtime_forever", palette="magma")


In [None]:
if RUN_FRIENDS_STATS:
    all_gameplay_df = gameplay_df.copy()
    friend_gameplay_df = build_df_for_users(",".join(friend_list_ids), who_is="friend")
    all_gameplay_df = pd.concat([all_gameplay_df,friend_gameplay_df])

In [None]:
if RUN_FRIENDS_STATS:
    my_top_5 = gameplay_df.sort_values("playtime_forever", ascending=False).head(5)["name"]
    played_all_gameplay_df = all_gameplay_df[all_gameplay_df["playtime_forever"] > 60].copy()
    played_all_gameplay_df["playtime_forever"] = played_all_gameplay_df["playtime_forever"].apply(lambda x: x//60)
    ax = sns.stripplot(
        data=played_all_gameplay_df[played_all_gameplay_df["name"].isin(my_top_5)],
        y="name", 
        x="playtime_forever", 
        hue="who_is",
        hue_order=["me","friend"],
        dodge=True,
        size=6)
    sns.move_legend(ax, "upper left", bbox_to_anchor=(1, 1))

In [None]:
if RUN_FRIENDS_STATS:
    # Merge DFs
    final_gameplay_players_df = pd.merge(all_gameplay_df,friends_df[["steamid","personaname","realname","loccountrycode","timecreated","last_year_online","avatar" ]], how="left",left_on="player_id",right_on="steamid")
    # len(final_gameplay_players_df.index)
    final_gameplay_players_df.head(5)

In [None]:
if RUN_FRIENDS_STATS:
    top_10_gameplay = final_gameplay_players_df.groupby(['steamid','personaname'], as_index=False)\
        .agg({"playtime_forever":"sum"}, axis="columns")\
        .sort_values(ascending=False, by="playtime_forever")\
        .head(10)
    top_10_gameplay["playtime_forever"] = top_10_gameplay["playtime_forever"].apply(lambda x: x//60)
    
    ax = sns.barplot(data=top_10_gameplay,x="playtime_forever", y="personaname")
    ax.set_xlabel("Playtime in Hours")
    ax.set_ylabel("Steam Name")
    ax.set_title(f"Who played the most until Jan 2024?") 
    plt.show(ax)

In [None]:
if CLEAN_USER_DB:
    print("Removing User Data")
    users_tb.truncate()
    print(f"Users size is {len(users_tb)}")
    friend_list_tb.truncate()
    print(f"Friend List size is {len(friend_list_tb)}")
if CLEAN_GAMEPLAY_DB:
    print("Removing Gameplay Data")
    gameplay_tb.truncate()
    print(f"Gameplay List size is {len(gameplay_tb)}")
    