In [1]:
import requests
import json
from collections import defaultdict
from datetime import datetime
import plotly.graph_objects as go
import csv
from urllib.request import urlopen
from io import StringIO

Zebranie danych dla omawianych gier

In [3]:
# Lista gier
games = {
    "Dota2": {"players": 570},
    "War Thunder": {"players": "chart.csv"},
    "Lost Ark": {"players": "lostark.csv"},
    "PUBG": {"players": 578080, "prices": "pubg_p.csv"},
    "Palworld": {"players": 1623730, "prices": "palworld_p.csv"},
    "New World": {"players": 'newworld.csv', "prices": "newworld_p.csv"},
}

# Funkcja do zebrania danych o cenie z pliku csv
def fetch_data_csv(game, csv_file):
    data = []
    with open(csv_file, 'r') as file:
        reader = csv.reader(file, delimiter=';')
        next(reader)
        for row in reader:
            date = row[0].split(' ')[0]  # Usuwanie czasu z csv
            price_str = row[1].replace(',', '.')  
            if price_str:  
                price = float(price_str)  
                data.append({'date': date, 'price': price})

    return data

# Funkcja do zebrania danych o liczbie graczy z pliku csv
def fetch_data_csv_players(game, csv_file):
    data = defaultdict(int) 
    with open(csv_file, 'r') as file:
        reader = csv.reader(file, delimiter=';')
        next(reader)
        for row in reader:
            datetime_str = row[0] 
            date = datetime_str.split()[0]  
            player_count_str = row[1]  
            if player_count_str:  
                player_count = int(player_count_str)  
                data[date] = max(data[date], player_count) #Wybranie największej ilości graczy w ciągu dnia

    return [{'date': date, 'players': player_count} for date, player_count in data.items()]

# Funkcja do zebrania danych o liczbię graczy z steambase API
def fetch_data_api(game, appid):
    url = f"https://api.steambase.io/apps/{appid}/stats/chart?type=1&period=all"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        # Zamiana 'x' i 'y' na 'data' i 'players'
        for item in data:
            # Convert Unix timestamp to a date string
            timestamp = int(item.pop('x')) / 1000  # zamiana z milisekund na sekundy
            date_str = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d')
            item['date'] = date_str
            item['players'] = item.pop('y')
        return data
    else:
        print(f"Failed to fetch data for {game}. Status code: {response.status_code}")
        return None

In [2]:
# Lista gier
games = {
    "Dota2": {"players": 570},
    "War Thunder": {"players": "https://file.io/ioWCpkQslxwo"},
    "Lost Ark": {"players": "lostark.csv"},
    "PUBG": {"players": 578080, "prices": "pubg_p.csv"},
    "Palworld": {"players": 1623730, "prices": "palworld_p.csv"},
    "New World": {"players": 'newworld.csv', "prices": "newworld_p.csv"},
}

# Funkcja do zebrania danych o cenie z pliku csv
def fetch_data_csv(game, csv_url):
    data = []
    response = requests.get(csv_url)
    if response.status_code == 200:
        file = StringIO(response.text)
        reader = csv.reader(file, delimiter=';')
        next(reader)
        for row in reader:
            date = row[0].split(' ')[0]  # Usuwanie czasu z csv
            price_str = row[1].replace(',', '.')  
            if price_str:  
                price = float(price_str)  
                data.append({'date': date, 'price': price})
    else:
        print(f"Failed to fetch data for {game}. Status code: {response.status_code}")

    return data

# Funkcja do zebrania danych o liczbie graczy z pliku csv
def fetch_data_csv_players(game, csv_url):
    data = defaultdict(int) 
    response = requests.get(csv_url)
    if response.status_code == 200:
        file = StringIO(response.text)
        reader = csv.reader(file, delimiter=';')
        next(reader)
        for row in reader:
            datetime_str = row[0]
            date = datetime_str.split()[0]
            player_count_str = row[1]
            if player_count_str:
                player_count = int(player_count_str)
                data[date] = max(data[date], player_count) # Wybranie największej ilości graczy w ciągu dnia
    else:
        print(f"Failed to fetch data for {game}. Status code: {response.status_code}")

    return [{'date': date, 'players': player_count} for date, player_count in data.items()]

# Funkcja do zebrania danych o liczbię graczy z steambase API
def fetch_data_api(game, appid):
    url = f"https://api.steambase.io/apps/{appid}/stats/chart?type=1&period=all"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        # Zamiana 'x' i 'y' na 'data' i 'players'
        for item in data:
            # Convert Unix timestamp to a date string
            timestamp = int(item.pop('x')) / 1000  # zamiana z milisekund na sekundy
            date_str = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d')
            item['date'] = date_str
            item['players'] = item.pop('y')
        return data
    else:
        print(f"Failed to fetch data for {game}. Status code: {response.status_code}")
        return None

Użycie zebranych danych do wygenerowania interaktywnych wykesów liczby graczy/cena lub liczby graczy/wydarzenia w grze

In [3]:
for game, sources in games.items():
    if 'players' in sources:
        if isinstance(sources['players'], int):  # Sprawdzenie czy dane są pobierane z api czy z csv
            player_data = fetch_data_api(game, sources['players'])
        else:  
            player_data = fetch_data_csv_players(game, sources['players'])
    if 'prices' in sources:
        price_data = fetch_data_csv(game, sources['prices'])
        price_dates = [item['date'] for item in price_data]
        prices = [item['price'] for item in price_data] 
    else:
        price_data = None
        price_dates = []
        prices = []

    player_dates = [item['date'] for item in player_data]
    players = [item.get('players', 0) for item in player_data]

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=player_dates, y=players, mode='lines', name='Players'))
    if game == 'Dota2':
        season_start_date = '2022-10-27'  # Przykładowe wydarzenie
        player_count_at_date = 0
        for item in player_data:
            if item['date'] == season_start_date:
                player_count_at_date = item['players']
                break
        fig.add_annotation(x=season_start_date, y=player_count_at_date + 20000, text="Start nowego sezonu", showarrow=True, arrowhead=1)

    if price_data is not None:
        fig.add_trace(go.Scatter(x=price_dates, y=prices, mode='lines', line_shape='hv', name='Price', yaxis='y2'))  # Dodanie ceny jeżeli mamy dane
    fig.update_layout(title=game, xaxis_title='Date', yaxis_title='Players',
                  yaxis2=dict(title='Price[PLN]', overlaying='y', side='right'))  # Dodanie drugiej osi
    fig.show()

MissingSchema: Invalid URL 'lostark.csv': No scheme supplied. Perhaps you meant https://lostark.csv?