In [11]:
import os
import json
import os.path
# When the kernel's working directory is named "notebooks", move one level up
# so that relative paths used below (e.g. ".env") resolve against the parent
# directory instead. NOTE(review): presumably the parent is the project root.
if os.path.basename(os.getcwd()) == "notebooks":
    os.chdir('..')
from dotenv import load_dotenv

load_dotenv(".env")  # take environment variables from .env.

# The project imports are placed after the chdir/load_dotenv calls above;
# keep this order when editing the cell.
from steam_trade_bot.containers import Container
from steam_trade_bot.settings import BotSettings
# Create the container, configure it from a BotSettings instance and wire it
# into this notebook's module (__name__). Later cells read
# `container.repositories.unit_of_work` from it.
container = Container()
container.config.from_pydantic(BotSettings())
container.wire(modules=[__name__])

In [27]:
import pandas as pd
import json
import fastparquet as fp
from functools import lru_cache

from steam_trade_bot.domain.services.sell_history_analyzer import steam_date_str_to_datetime
from steam_trade_bot.domain.fee_calculator import compute_fee_from_total

@lru_cache(maxsize=None)
def compute_fee(price, market_fee):
    """Return the fee breakdown for ``price`` as a plain tuple.

    Delegates to ``compute_fee_from_total(price, market_fee)`` and flattens
    the returned object into ``(payload, game, steam)``.

    Results are memoised without a size limit, so both arguments must be
    hashable and repeated (price, market_fee) pairs are computed only once.
    """
    breakdown = compute_fee_from_total(price, market_fee)
    payload_part = breakdown.payload
    game_part = breakdown.game
    steam_part = breakdown.steam
    return (payload_part, game_part, steam_part)

@lru_cache(maxsize=None)
def parse_date(s):
    """Memoised wrapper around ``steam_date_str_to_datetime``.

    ``s`` is forwarded unchanged; the result for each distinct ``s`` is cached
    without a size limit, so ``s`` must be hashable.
    """
    parsed = steam_date_str_to_datetime(s)
    return parsed

# Fallback value used to fill missing "market_fee" entries of market items
# before fees are computed.
GAME_DEFAULT_FEE = 0.1

def method_a(x):
    """Convert one sell-history entry into a ``(timestamp, price, amount)`` tuple.

    ``x`` must be indexable with at least three items:
    ``x[0]`` is passed to ``parse_date``, ``x[1]`` is rounded to two decimals
    and ``x[2]`` is converted with ``int``.
    """
    timestamp = parse_date(x[0])
    price = round(x[1], 2)
    amount = int(x[2])
    return (timestamp, price, amount)

def method_b(row):
    """Compute the fee tuple for one row.

    ``row`` must support lookup by the keys ``"price"`` and ``"market_fee"``;
    both values are forwarded to the cached ``compute_fee``.
    """
    price = row["price"]
    market_fee = row["market_fee"]
    return compute_fee(price, market_fee)

uow_ = container.repositories.unit_of_work
append = False
async with uow_() as uow:
    for game in (await uow.game.get_all()):
        print(f"Processing '{game.name}'")
        market_items = await uow.market_item.get_all(app_id=game.app_id)
        items_df = pd.DataFrame(market_items)
        if items_df.empty:
            continue
        items_df = items_df.fillna(value={"market_fee": GAME_DEFAULT_FEE})
        items_df["market_fee"] = items_df["market_fee"].astype(float)
        items_df["market_fee"] = items_df["market_fee"].apply(lambda x: round(x, 2))
        async for batch in uow.sell_history.yield_all(app_id=game.app_id, currency=1, count=1000):
            df = pd.DataFrame(batch)
            df["history"] = df["history"].apply(json.loads)
            df = df.explode("history")
            df = df.dropna(subset=["history"])
            df2 = pd.DataFrame(df.history.apply(method_a).tolist(), columns=["timestamp", "price", "amount"])
            df.index = df2.index
            # df = pd.merge(df.drop(["timestamp", "history"], axis=1), df2, left_index=True, right_index=True)
            df = df.drop(["timestamp", "history"], axis=1).join(df2)
            df["timestamp"] = pd.to_datetime(df["timestamp"])
            df = df.merge(items_df[["market_hash_name", "market_fee"]], on="market_hash_name")
            # if the dataframe is empty, we should skip it
            if df.empty:
                continue
            df["partition"] = df.apply(lambda x: x["timestamp"].strftime("%Y-%m"), axis=1)
            # df[["price_no_fee", "game_fee", "steam_fee"]] = df.apply(lambda row: compute_fee(row["price"], row["market_fee"]), axis=1)
            df3 = pd.DataFrame(df[["price", "market_fee"]].apply(method_b, axis=1).tolist(), columns=["price_no_fee", "game_fee", "steam_fee"])
            df = pd.merge(df, df3, left_index=True, right_index=True)
            fp.write("tmp.parquet", df, append=append, write_index=False, compression="SNAPPY", times='int96')
            append = True
            print(f"Processed {len(batch)} rows of '{game.name}'")


Processing 'Banana Shooter'
Processed 604 rows of 'Banana Shooter'
Processing 'Flying Pengy'
Processed 37 rows of 'Flying Pengy'
Processing 'Killing Floor 2'
Processed 1000 rows of 'Killing Floor 2'
Processed 1000 rows of 'Killing Floor 2'
Processed 456 rows of 'Killing Floor 2'
Processing 'Gem Forge'
Processed 29 rows of 'Gem Forge'
Processing 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows of 'Team Fortress 2'
Processed 1000 rows 

In [2]:
from pyspark.sql import SparkSession
# Currently disabled tuning options, kept for reference:
#   .config("spark.memory.fraction", 0.8)
#   .config("spark.sql.shuffle.partitions", "800")
#   .config("spark.memory.offHeap.enabled", 'true')
#   .config("spark.memory.offHeap.size", "16g")
#
# Create (or reuse) a local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('myAppName')
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "10g")
    .config("spark.sql.shuffle.partitions", "100")
    # 250 MiB expressed in bytes, passed as a string.
    .config('spark.sql.files.maxPartitionBytes', str(250 * 1024 * 1024))
    .getOrCreate()
)

In [3]:
import pyspark.sql.functions as func
from pyspark.sql.types import DecimalType, Row
from pyspark.sql import Window as W

In [4]:
# Load the flat parquet file written by the pandas/fastparquet export cell.
df = spark.read.parquet("tmp.parquet")

In [5]:
# Print the column names and types Spark inferred from tmp.parquet.
df.printSchema()

root
 |-- app_id: long (nullable = true)
 |-- market_hash_name: string (nullable = true)
 |-- currency: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- amount: long (nullable = true)
 |-- market_fee: double (nullable = true)
 |-- partition: string (nullable = true)
 |-- price_no_fee: double (nullable = true)
 |-- game_fee: double (nullable = true)
 |-- steam_fee: double (nullable = true)



In [6]:
# Preview a few rows to sanity-check the exported columns.
df.show()

+-------+----------------+--------+-------------------+-----+------+----------+---------+------------+--------+---------+
| app_id|market_hash_name|currency|          timestamp|price|amount|market_fee|partition|price_no_fee|game_fee|steam_fee|
+-------+----------------+--------+-------------------+-----+------+----------+---------+------------+--------+---------+
|1949740|      AL48 Metal|       1|2022-07-01 04:00:00| 0.03|    44|       0.1|  2022-07|        0.01|    0.01|     0.01|
|1949740|      AL48 Metal|       1|2022-07-02 04:00:00| 0.03|    92|       0.1|  2022-07|        0.01|    0.01|     0.01|
|1949740|      AL48 Metal|       1|2022-07-03 04:00:00| 0.03|    27|       0.1|  2022-07|        0.01|    0.01|     0.01|
|1949740|      AL48 Metal|       1|2022-07-04 04:00:00| 0.03|   187|       0.1|  2022-07|        0.01|    0.01|     0.01|
|1949740|      AL48 Metal|       1|2022-07-05 04:00:00| 0.03|    44|       0.1|  2022-07|        0.01|    0.01|     0.01|
|1949740|      AL48 Meta

In [7]:
# Total number of rows loaded from tmp.parquet.
df.count()

215930656

In [9]:
# Rewrite the data set into the 'parquetv2' directory, split into one
# sub-directory per value of the "partition" column.
(
    df.write
    .partitionBy('partition')
    .parquet('parquetv2')
)

In [10]:
# Shut the Spark session down now that the partitioned copy has been written.
spark.stop()

In [21]:
import pandas as pd
uow_ = container.repositories.unit_of_work
VALVE_GAME_IDS = {440, 570, 730, 753, 250820, 583950}
rows = []
async with uow_() as uow:
    for game in (await uow.game.get_all()):
        rows.append((game.app_id, game.name, game.app_id in VALVE_GAME_IDS))
        
games_df = pd.DataFrame(rows, columns=["app_id", "app_name", "is_valve_publisher"])
games_df.to_parquet('apps.parquet')