In [None]:
import sys
import os

In [None]:
# The project root is taken to be two directory levels above the kernel's
# working directory (assumes the notebook is launched from e.g.
# <root>/notebooks/<subdir> - TODO confirm).
current_dir = os.getcwd()
project_root = os.path.split(os.path.split(current_dir)[0])[0]

In [None]:
# Locations of the raw dataset (JSON and CSV variants) under <project_root>/data/raw.
df_raw_json_path, df_raw_csv_path = (
    os.path.join(project_root, 'data', 'raw', raw_filename)
    for raw_filename in ('steam_games_data.json', 'steam_games_data.csv')
)

In [None]:
# Output locations for the cleaned dataset under <project_root>/data/processed.
df_processed_json_path, df_processed_csv_path = (
    os.path.join(project_root, 'data', 'processed', processed_filename)
    for processed_filename in ('steam_games_data.json', 'steam_games_data.csv')
)

In [None]:
# Put the project root on the import path (once) so that `src.*` modules resolve.
if sys.path.count(project_root) == 0:
    sys.path.append(project_root)

In [None]:
import requests

from dotenv import load_dotenv
from src.kaggle_downloader import SteamGameDataDownloader

In [None]:
# Load environment variables from a .env file (python-dotenv) before they are read below.
load_dotenv()

In [None]:
# Kaggle credential read from the environment; None when the variable is unset.
# NOTE(review): no visible cell reads this name afterwards - confirm it is still needed.
KAGGLE_API_KEY = os.getenv('KAGGLE_API_KEY')

In [None]:
import pandas as pd
import re

In [None]:
# Build the project's Kaggle downloader with no arguments, i.e. relying on its
# defaults for dataset_name, filename, and force_download.
downloader = SteamGameDataDownloader()

In [None]:
# Fetch the dataset and keep its location as a plain string.
path = "{}".format(downloader.download())

In [None]:
# Override the downloaded path with the locally stored copy of the dataset.
# The location is derived from project_root (like the other data paths in this
# notebook) instead of an absolute, user-specific Windows path, so the notebook
# can run on other machines.
path = os.path.join(project_root, 'data', 'raw', 'games.json')
# TODO: fix encoding of the original kagglehub download (unusual line separators)

In [None]:
# Fail loudly when the dataset is missing: merely printing a message would leave
# `df` undefined (or stale from an earlier kernel run) for the cells below.
if not os.path.exists(path):
    raise FileNotFoundError(f"File not found: {path}")

df = pd.read_json(path)

In [None]:
# Swap rows and columns of the loaded frame, then summarise it.
# NOTE: re-running this cell transposes the frame back again.
df = df.transpose()
df.info()

In [None]:
# Columns discarded from the frame in the next step.
columns_to_drop = [
    'price',
    'dlc_count',
    'about_the_game',
    'reviews',
    'website',
    'support_url',
    'support_email',
    'metacritic_score',
    'metacritic_url',
    'achievements',
    'recommendations',
    'notes',
    'full_audio_languages',
    'packages',
    'user_score',
    'score_rank',
    'positive',
    'negative',
    'screenshots',
    'movies',
    'average_playtime_forever',
    'average_playtime_2weeks',
    'median_playtime_forever',
    'median_playtime_2weeks',
    'peak_ccu',
]

In [None]:
# Drop the unused columns by rebinding `df` to the returned frame rather than
# mutating in place (inplace=True is discouraged in pandas code).
df = df.drop(columns=columns_to_drop)

In [None]:
# Display the column labels that remain after the drop.
df.columns

In [None]:
# Flag rows that lack essential content, one named condition per criterion.
missing_short_description = df['short_description'].isna() | df['short_description'].eq('')
missing_detailed_description = df['detailed_description'].isna() | df['detailed_description'].eq('')
is_playtest = df['name'].str.contains('playtest', case=False, na=False)
missing_header_image = df['header_image'].isna() | df['header_image'].eq('')
# Empty collections are detected through their string form '[]'.
no_supported_languages = df['supported_languages'].astype(str).eq('[]')
no_categories = df['categories'].astype(str).eq('[]')
no_tags = df['tags'].astype(str).eq('[]')

mask_to_remove = (
    missing_short_description
    | missing_detailed_description
    | is_playtest
    | missing_header_image
    | no_supported_languages
    | no_categories
    | no_tags
)

# Keep only the rows that were not flagged.
df_filtered = df[~mask_to_remove]

In [None]:
# Display (rows, columns) of the frame after the content filter.
df_filtered.shape

In [None]:
# Continue with an independent copy of the filtered rows.
df = df_filtered.copy()

In [None]:
def contains_only_asian_chars(text):
    """Return True when ``text`` is a non-empty string made only of CJK ideographs.

    The accepted code points are the CJK Unified Ideographs block, its
    extensions A-E, the CJK Compatibility Ideographs, CJK Compatibility and
    CJK Compatibility Forms blocks. Non-string input and the empty string
    yield False.
    """
    if not isinstance(text, str):
        return False
    # Code points above U+FFFF must be written with the 8-digit \U escape.
    # A 4-digit \u escape followed by an extra hex digit (e.g. "\u20000") is read
    # as U+2000 plus a literal "0", which turned the class into a range starting
    # at "0" and made it match ASCII digits and Latin letters.
    cjk_only = (
        r'[\u4E00-\u9FFF\u3400-\u4DBF'
        r'\U00020000-\U0002A6DF\U0002A700-\U0002B73F'
        r'\U0002B740-\U0002B81F\U0002B820-\U0002CEAF'
        r'\uF900-\uFAFF\u3300-\u33FF\uFE30-\uFE4F]+'
    )
    return bool(re.fullmatch(cjk_only, text))

In [None]:
def contains_only_digits(text):
    """Return True when ``text`` is a string for which ``str.isdigit()`` holds.

    Anything that is not a string (and the empty string) gives False.
    """
    return isinstance(text, str) and text.isdigit()

In [None]:
# Flag titles whose name is made solely of CJK ideographs or solely of digits.
name_is_asian_only = df['name'].apply(contains_only_asian_chars)
name_is_digits_only = df['name'].apply(contains_only_digits)
mask_to_remove = name_is_asian_only | name_is_digits_only

# Keep everything that was not flagged.
df_filtered = df[~mask_to_remove]

In [None]:
# Display (rows, columns) after the name-based filter.
df_filtered.shape

In [None]:
# Continue with an independent copy of the name-filtered rows.
df = df_filtered.copy()

In [None]:
# Parse release dates; values that cannot be parsed become NaT (errors='coerce').
df['release_date'] = pd.to_datetime(df['release_date'], errors='coerce')

In [None]:
# Lookup table from lower-cased textual flags to real booleans.
bool_mapping = {
    'true': True,
    'false': False,
}

In [None]:
# Normalise the platform flags to real booleans; anything that is not
# 'true'/'false' (case-insensitive) ends up as False.
for col in ['windows', 'mac', 'linux']:
    if col not in df.columns:
        continue
    lowered_flags = df[col].astype(str).str.lower().replace({'nan': None})
    df[col] = lowered_flags.map(bool_mapping).fillna(False).astype(bool)

In [None]:
def extract_first_number(owner_range):
    """Return the integer before the first space of ``owner_range``.

    Commas inside that leading token are removed before conversion (so
    "20,000 - 50,000" gives 20000). Returns None when the input is not a
    string or the leading token is not a valid integer.
    """
    if not isinstance(owner_range, str):
        return None

    leading_token = owner_range.partition(' ')[0]
    try:
        return int(leading_token.replace(',', ''))
    except ValueError:
        return None

In [None]:
# Replace each owner-range value with its leading integer (None where it cannot be parsed).
df['estimated_owners'] = df['estimated_owners'].apply(extract_first_number)

In [None]:
# Display the column dtypes after the conversions above.
df.dtypes

In [None]:
def combine_tags(row):
    """Merge a row's categories, genres and tag names into one de-duplicated list.

    ``row['categories']`` and ``row['genres']`` contribute their items when
    they are lists; ``row['tags']`` contributes its keys when it is a dict.
    Values of any other type are ignored.

    Duplicates are removed while keeping first-seen order (categories, then
    genres, then tags). Going through ``set`` made the order depend on string
    hash randomisation, so the result could differ from one run to the next.
    """
    collected = []

    if isinstance(row['categories'], list):
        collected.extend(row['categories'])

    if isinstance(row['genres'], list):
        collected.extend(row['genres'])

    if isinstance(row['tags'], dict):
        collected.extend(row['tags'].keys())

    # dict.fromkeys de-duplicates hashable items and preserves insertion order.
    return list(dict.fromkeys(collected))

In [None]:
# Build the merged tag list per row, then discard the three source columns.
df['all_tags'] = df.apply(combine_tags, axis=1)
merged_source_columns = ['categories', 'genres', 'tags']
df = df.drop(columns=merged_source_columns)

In [None]:
def replace_empty_with_none(series):
    """Return a copy of ``series`` where "empty" entries are replaced by None.

    An entry counts as empty when it equals ``[]``, ``['']`` or ``""``; every
    other entry is passed through unchanged.
    """
    empty_markers = ([], [''], "")

    def _none_if_empty(value):
        for marker in empty_markers:
            if value == marker:
                return None
        return value

    return series.apply(_none_if_empty)

In [None]:
# Blank out empty developer / publisher entries.
for credit_column in ('developers', 'publishers'):
    df[credit_column] = replace_empty_with_none(df[credit_column])

In [None]:
from langdetect import detect, LangDetectException

In [None]:
def is_english_or_russian(text):
    """Return True when langdetect classifies ``text`` as English or Russian.

    Non-string input, and text for which detection raises
    ``LangDetectException``, give False.
    """
    # NOTE(review): langdetect's documentation describes detection as
    # non-deterministic unless a seed is fixed - confirm whether that matters here.
    if isinstance(text, str):
        try:
            return detect(text) in ('en', 'ru')
        except LangDetectException:
            return False
    return False

In [None]:
# Add one boolean language flag per description column.
for flag_column, text_column in (
    ('detailed_is_en_ru', 'detailed_description'),
    ('short_is_en_ru', 'short_description'),
):
    df[flag_column] = df[text_column].apply(is_english_or_russian)

In [None]:
# Keep rows where both descriptions passed the language check, then drop the helper flags.
both_descriptions_en_ru = df['detailed_is_en_ru'] & df['short_is_en_ru']
df_filtered = df[both_descriptions_en_ru].drop(
    columns=['detailed_is_en_ru', 'short_is_en_ru']
)

In [None]:
# Continue with an independent copy of the language-filtered rows.
df = df_filtered.copy()

In [None]:
# Snapshot of the frame before text cleaning.
# NOTE(review): no visible cell reads `df_copy` afterwards - confirm it is still needed.
df_copy = df.copy()

In [None]:
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import pymorphy2

In [None]:
# Probe for the NLTK stopword corpus; download it only when the lookup fails.
try:
    stopwords.words('english')
except LookupError:
    nltk.download('stopwords')
# Same probe for the lemmatizer: a trial lemmatization raises LookupError when
# the WordNet data is missing, in which case 'wordnet' and 'omw-1.4' are fetched.
try:
    wordnet_lemmatizer = WordNetLemmatizer()
    wordnet_lemmatizer.lemmatize('cats')
except LookupError:
    nltk.download('wordnet')
    nltk.download('omw-1.4')

In [None]:
# English resources used by clean_and_lemmatize: WordNet lemmatizer and stop-word set.
lemmatizer_en = WordNetLemmatizer()
stop_words_en = set(stopwords.words('english'))

In [None]:
# Russian resources used by clean_and_lemmatize: pymorphy2 analyzer and stop-word set.
morph = pymorphy2.MorphAnalyzer()
stop_words_ru = set(stopwords.words('russian'))

In [None]:
def clean_and_lemmatize(text, lang='en'):
    """Lower-case ``text``, strip punctuation and digits, drop stop words and lemmatize.

    Args:
        text: Input text; anything that is not a string yields "".
        lang: 'ru' selects the Russian stop words and pymorphy2 normal forms;
            any other value is treated as English (NLTK stop words and
            WordNet lemmatizer).

    Returns:
        The remaining lemmas joined by single spaces.
    """
    if not isinstance(text, str):
        return ""

    # Remove punctuation first, then digit runs.
    without_punctuation = re.sub(r'[^\w\s]', '', text.lower())
    without_digits = re.sub(r'\d+', '', without_punctuation)
    tokens = without_digits.split()

    if lang == 'ru':
        kept_tokens = (token for token in tokens if token not in stop_words_ru)
        lemmas = [morph.parse(token)[0].normal_form for token in kept_tokens]
    else:
        # Anything other than Russian is assumed to be English.
        kept_tokens = (token for token in tokens if token not in stop_words_en)
        lemmas = [lemmatizer_en.lemmatize(token) for token in kept_tokens]

    return " ".join(lemmas)

In [None]:
# Clean both description columns with the function's default language ('en').
# NOTE(review): rows detected as Russian were kept by the language filter but are
# processed with the English settings here - confirm this is intended.
df['detailed_description_clean'] = df['detailed_description'].apply(clean_and_lemmatize)
df['short_description_clean'] = df['short_description'].apply(clean_and_lemmatize)

In [None]:
# Display (rows, columns) after adding the cleaned description columns.
df.shape

In [None]:
def remove_specific_words(df, column, words_to_remove):
    """Drop the given whitespace-separated words from every string in ``df[column]``.

    The column is overwritten on ``df`` itself and the same frame is returned.
    Non-string cells are left untouched.
    """
    def _strip_words(text):
        if not isinstance(text, str):
            return text
        return ' '.join(
            token for token in text.split() if token not in words_to_remove
        )

    df[column] = df[column].apply(_strip_words)
    return df

In [None]:
def filter_description_length(df, column, min_length, max_length):
    """Return the rows whose ``column`` string length lies in [min_length, max_length].

    Both bounds are inclusive.
    """
    text_lengths = df[column].str.len()
    return df[text_lengths.between(min_length, max_length)]

In [None]:
def clean_and_lowercase_tags(df, column):
    """Normalise every tag list in ``df[column]``.

    Each tag has all characters other than ASCII letters, digits and
    whitespace removed, is lower-cased and stripped. Cells that are not lists
    are left untouched. The column is overwritten on ``df`` itself and the
    same frame is returned.
    """
    disallowed_chars = re.compile(r'[^a-zA-Z0-9\s]')

    def _normalise(tags):
        if not isinstance(tags, list):
            return tags
        return [disallowed_chars.sub('', tag).lower().strip() for tag in tags]

    df[column] = df[column].apply(_normalise)
    return df

In [None]:
def filter_tags_count(df, column, min_tags):
    """Return the rows whose ``column`` holds a list with at least ``min_tags`` items.

    Cells that are not lists are dropped.
    """
    def _has_enough_tags(value):
        if not isinstance(value, list):
            return False
        return len(value) >= min_tags

    enough_tags = df[column].apply(_has_enough_tags)
    return df[enough_tags]

In [None]:
# Normalise the merged tag lists (strip non-alphanumerics, lower-case, trim).
df = clean_and_lowercase_tags(df, 'all_tags')

In [None]:
# Remove the listed words from both cleaned description columns.
words_to_remove = ['game', 'world']
for cleaned_column in ('short_description_clean', 'detailed_description_clean'):
    df = remove_specific_words(df, cleaned_column, words_to_remove)

In [None]:
# Keep rows whose cleaned short description is 30-240 characters long (inclusive).
min_length, max_length = 30, 240
df = filter_description_length(
    df,
    'short_description_clean',
    min_length,
    max_length,
)

In [None]:
# Keep only rows that carry at least this many merged tags.
min_tags = 3
df = filter_tags_count(df, 'all_tags', min_tags)

In [None]:
# Normalise the tag lists again.
# NOTE(review): the same call already ran before the word/length/tag filters -
# confirm whether this second pass is needed.
df = clean_and_lowercase_tags(df, 'all_tags')

In [None]:
# Display (rows, columns) of the final frame.
df.shape

In [None]:
from ydata_profiling import ProfileReport

In [None]:
# Build a ydata-profiling report object for the processed frame.
profile = ProfileReport(df, title="Profile Report")

In [None]:
# Destination of the HTML report: <project_root>/src/visualization/Report.html.
profile_path = os.path.join(project_root, 'src', 'visualization', 'Report.html')

In [None]:
# Write the report to profile_path.
profile.to_file(profile_path)

In [None]:
# Persist the processed frame as JSON and CSV.
# Create the target directories first so the export does not fail on a fresh
# checkout where they do not exist yet.
for output_path in (df_processed_json_path, df_processed_csv_path):
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

df.to_json(df_processed_json_path)
# NOTE(review): list-valued columns such as 'all_tags' are written to CSV as
# their string form - confirm downstream readers parse them back.
df.to_csv(df_processed_csv_path)