In [1]:
import os
import json
import os.path
# When the kernel starts inside a directory named "notebooks", step up one
# level so the relative paths used below (".env", "vendors/...") are resolved
# from the parent directory.
if os.path.basename(os.getcwd()) == "notebooks":
    os.chdir('..')
from dotenv import load_dotenv

load_dotenv(".env")  # take environment variables from .env.

# Project imports come after load_dotenv().
# NOTE(review): presumably so BotSettings can read the freshly loaded
# environment variables — confirm before reordering these imports.
from steam_trade_bot.containers import Container
from steam_trade_bot.settings import BotSettings
# Create the container, feed it configuration from a BotSettings instance and
# wire it into this module.
container = Container()
container.config.from_pydantic(BotSettings())
container.wire(modules=[__name__])

In [2]:
from pyspark.sql import SparkSession

# Local Spark session for the ETL run. The PostgreSQL JDBC driver jar is
# registered through "spark.jars" so the JDBC reads below can load it.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Steam Trade Bot ETL')
    .config("spark.jars", "vendors/postgresql-42.6.0.jar")
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "5g")
    .config("spark.sql.shuffle.partitions", "30")
    .config('spark.sql.files.maxPartitionBytes', str(20 * 1024 * 1024))  # 20 MiB
    .getOrCreate()
)

In [3]:
# Lazily bind the raw sell-history table over JDBC; nothing is read until an
# action (count / foreachPartition) is executed.
# NOTE(review): the user name and password are hard-coded in the notebook;
# consider sourcing them from the environment loaded by load_dotenv().
df = (
    spark.read.format("jdbc")
    .options(
        url="jdbc:postgresql://localhost/trade_bot",
        dbtable="raw.market_item_sell_history",
        user="gaben",
        password="qwerty",
        driver="org.postgresql.Driver",
    )
    .load()
)

In [4]:
# Size the key DataFrame at roughly 1000 rows per partition, never fewer than
# one partition. df.count() is an action and scans the source table.
raw_row_count = df.count()
app_id_market_name_df_partitions = max(round(raw_row_count / 1000), 1)

# (app_id, market_hash_name) keys that drive the per-item pipelines.
app_id_market_name_df = (
    df.select("app_id", "market_hash_name")
    .repartition(app_id_market_name_df_partitions)
    .cache()
)

# Distinct app ids, collapsed into a single partition, for the games pipeline.
app_id_df = (
    df.select("app_id")
    .distinct()
    .repartition(1)
    .cache()
)

In [5]:
# df = df.repartition(100).persist()

In [6]:
# Sanity check: the key DataFrame should expose only app_id and market_hash_name.
app_id_market_name_df.printSchema()

root
 |-- app_id: integer (nullable = true)
 |-- market_hash_name: string (nullable = true)



In [7]:
# Sanity check: the games key DataFrame should expose only app_id.
app_id_df.printSchema()

root
 |-- app_id: integer (nullable = true)



In [8]:
# (row count, partition count) of the item-key DataFrame.
(
    app_id_market_name_df.count(),
    app_id_market_name_df.rdd.getNumPartitions(),
)

(113, 1)

In [9]:
# (row count, partition count) of the games-key DataFrame.
# NOTE(review): app_id_df is built with repartition(1), yet the saved output
# below reports 10 partitions — the recorded output looks stale; re-run the
# notebook from a fresh kernel to confirm.
(
    app_id_df.count(),
    app_id_df.rdd.getNumPartitions(),
)

(1, 10)

In [10]:
from steam_trade_bot.etl.processors.market_item import process_market_item, \
    process_market_item_sell_history, process_market_item_orders
from steam_trade_bot.etl.processors.game import process_game
from steam_trade_bot.etl.models import MarketItemRaw, MarketItemSellHistoryRaw, MarketItemOrdersRaw, \
    GameRaw
from steam_trade_bot.infrastructure.models.raw_market import market_item_table as raw_market_item_table, \
    market_item_sell_history_table as raw_market_sell_history_table, \
    game_table as raw_game_table, \
    market_item_orders_table as raw_market_item_orders_table

from steam_trade_bot.infrastructure.models.stg_market import market_item_table as stg_market_item_table, \
    game_table as stg_game_table, market_item_stats_table as stg_market_item_stats_table, \
    market_item_sell_history_table as stg_market_item_sell_history_table, \
    market_item_orders_table as stg_market_item_orders_table

from steam_trade_bot.infrastructure.models.dwh_market import market_item_table as dwh_market_item_table, \
    market_item_sell_history_table as dwh_market_item_sell_history_table, \
    market_item_stats_table as dwh_market_item_stats_table, \
    market_item_orders_table as dwh_market_item_orders_table, \
    game_table as dwh_game_table

from steam_trade_bot.etl.models import MarketItemSellHistoryRaw, GameRaw, MarketItemRaw
from steam_trade_bot.infrastructure.repositories import AppMarketNameBasedRepository, GameRepository, MarketItemSellHistoryRepository

import asyncio
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from operator import attrgetter
import traceback
from dataclasses import asdict

import platform
from functools import wraps

from asyncio.proactor_events import _ProactorBasePipeTransport


async def _upsert_many(session, table, values, index_elements: list[str], set_: list[str]):
    if values:
        insert_stmt = insert(table).values()
        set_ = {
            column: attrgetter(column)(insert_stmt.excluded)
            for column in set_
        }
        await session.execute(
            insert_stmt.on_conflict_do_update(
                index_elements=index_elements,
                set_=set_
            ),
            values,
        )

async def process_batch_and_write2(batch_iter):
    """Transform the raw game rows of one partition and upsert stage / DWH games.

    Collects ``app_id`` from every element of ``batch_iter``, loads the
    matching rows from the raw game table through ``GameRepository``, runs
    ``process_game`` on each row and upserts the results into the stage and
    DWH game tables inside a single transaction.

    Args:
        batch_iter: Iterable of records exposing an ``app_id`` attribute
            (one partition when driven by ``foreachPartition``).

    Returns:
        None. An empty ``batch_iter`` returns immediately without opening a
        database connection.
    """
    # Materialise the keys first: an empty partition needs no engine at all.
    app_ids = [record.app_id for record in batch_iter]
    if not app_ids:
        return

    stage_list = []
    dwh_list = []
    updated_columns = ["name", "icon_url", "is_publisher_valve"]

    # NOTE(review): credentials are hard-coded; consider loading the URL from configuration.
    database_url = 'postgresql+asyncpg://gaben:qwerty@localhost/trade_bot'
    engine = create_async_engine(database_url)
    try:
        async_session = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
        async with async_session() as session:
            async with session.begin():
                repository = GameRepository(session, table=raw_game_table, type_=GameRaw)
                # 1000 is forwarded as the second argument (presumably the
                # fetch batch size — confirm against the repository).
                async for rows in repository.yield_all_by_app_ids(app_ids, 1000):
                    for row in rows:
                        stage, dwh = process_game(row)
                        stage_list.append(stage)
                        dwh_list.append(dwh)

                await _upsert_many(session, stg_game_table, stage_list, ["app_id"],
                                   updated_columns)
                await _upsert_many(session, dwh_game_table, dwh_list, ["app_id"],
                                   updated_columns)
    finally:
        # Release pooled connections while the event loop is still running;
        # otherwise they are finalised only after asyncio.run() closed the loop.
        await engine.dispose()

def process_batch_and_write(batch_iter):
    """Synchronous ``foreachPartition`` entry point: run the async pipeline for one partition.

    Before running, ``_ProactorBasePipeTransport.__del__`` (the finaliser of
    asyncio's proactor pipe transports) is patched so that a
    ``RuntimeError('Event loop is closed')`` raised during finalisation is
    swallowed; any other ``RuntimeError`` still propagates. The patch is
    installed at most once per process.

    Args:
        batch_iter: Iterator over the records of one partition; forwarded
            unchanged to ``process_batch_and_write2``.
    """
    def silence_event_loop_closed(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except RuntimeError as e:
                if str(e) != 'Event loop is closed':
                    raise
        # Marker that lets later calls recognise an already patched finaliser.
        wrapper._silences_event_loop_closed = True
        return wrapper

    # This function runs once per partition, possibly many times in the same
    # process; without the guard each call would wrap the previous wrapper again.
    current_del = _ProactorBasePipeTransport.__del__
    if not getattr(current_del, "_silences_event_loop_closed", False):
        _ProactorBasePipeTransport.__del__ = silence_event_loop_closed(current_del)

    asyncio.run(process_batch_and_write2(batch_iter))

# Run the games pipeline defined in this cell once per partition of app_id_df.
# The call is executed purely for its side effects (database upserts).
app_id_df.foreachPartition(process_batch_and_write)

In [11]:
async def process_batch_and_write2(batch_iter):
    """Transform the raw market items of one partition and upsert stage / DWH items.

    Collects ``(app_id, market_hash_name)`` from every element of
    ``batch_iter``, loads the matching rows from the raw market-item table
    through ``AppMarketNameBasedRepository``, runs ``process_market_item`` on
    each row and upserts the results into the stage and DWH market-item
    tables inside a single transaction.

    Args:
        batch_iter: Iterable of records exposing ``app_id`` and
            ``market_hash_name`` attributes (one partition when driven by
            ``foreachPartition``).

    Returns:
        None. An empty ``batch_iter`` returns immediately without opening a
        database connection.
    """
    # Materialise the keys first: an empty partition needs no engine at all.
    pairs = [(record.app_id, record.market_hash_name) for record in batch_iter]
    if not pairs:
        return

    stage_list = []
    dwh_list = []
    key_columns = ["app_id", "market_hash_name"]
    updated_columns = ["market_fee", "market_marketable_restriction",
                       "market_tradable_restriction", "commodity"]

    # NOTE(review): credentials are hard-coded; consider loading the URL from configuration.
    database_url = 'postgresql+asyncpg://gaben:qwerty@localhost/trade_bot'
    engine = create_async_engine(database_url)
    try:
        async_session = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
        async with async_session() as session:
            async with session.begin():
                repository = AppMarketNameBasedRepository(session, table=raw_market_item_table, type_=MarketItemRaw)
                # 1000 is forwarded as the second argument (presumably the
                # fetch batch size — confirm against the repository).
                async for rows in repository.yield_all_by_pairs(pairs, 1000):
                    for row in rows:
                        stage, dwh = process_market_item(row)
                        stage_list.append(stage)
                        dwh_list.append(dwh)

                await _upsert_many(session, stg_market_item_table, stage_list,
                                   key_columns, updated_columns)
                await _upsert_many(session, dwh_market_item_table, dwh_list,
                                   key_columns, updated_columns)
    finally:
        # Release pooled connections while the event loop is still running;
        # otherwise they are finalised only after asyncio.run() closed the loop.
        await engine.dispose()

def process_batch_and_write(batch_iter):
    """Synchronous ``foreachPartition`` entry point: run the async pipeline for one partition.

    Before running, ``_ProactorBasePipeTransport.__del__`` (the finaliser of
    asyncio's proactor pipe transports) is patched so that a
    ``RuntimeError('Event loop is closed')`` raised during finalisation is
    swallowed; any other ``RuntimeError`` still propagates. The patch is
    installed at most once per process.

    Args:
        batch_iter: Iterator over the records of one partition; forwarded
            unchanged to ``process_batch_and_write2``.
    """
    def silence_event_loop_closed(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except RuntimeError as e:
                if str(e) != 'Event loop is closed':
                    raise
        # Marker that lets later calls recognise an already patched finaliser.
        wrapper._silences_event_loop_closed = True
        return wrapper

    # This function runs once per partition, possibly many times in the same
    # process; without the guard each call would wrap the previous wrapper again.
    current_del = _ProactorBasePipeTransport.__del__
    if not getattr(current_del, "_silences_event_loop_closed", False):
        _ProactorBasePipeTransport.__del__ = silence_event_loop_closed(current_del)

    asyncio.run(process_batch_and_write2(batch_iter))

# Run the market-item pipeline once per partition of app_id_market_name_df.
# process_batch_and_write here is this cell's definition, which shadows the
# same-named function from the previous cell — run the cells in order.
app_id_market_name_df.foreachPartition(process_batch_and_write)

In [12]:
async def process_batch_and_write2(batch_iter):
    """Transform the raw sell history of one partition into history and stats tables.

    Collects ``(app_id, market_hash_name)`` from every element of
    ``batch_iter``, loads the matching rows from the raw sell-history table
    through ``AppMarketNameBasedRepository`` and runs
    ``process_market_item_sell_history`` on each row. Every row yields four
    records (stage history, DWH history, stage stats, DWH stats) which are
    upserted into their respective tables inside a single transaction.

    Args:
        batch_iter: Iterable of records exposing ``app_id`` and
            ``market_hash_name`` attributes (one partition when driven by
            ``foreachPartition``).

    Returns:
        None. An empty ``batch_iter`` returns immediately without opening a
        database connection.
    """
    # Materialise the keys first: an empty partition needs no engine at all.
    pairs = [(record.app_id, record.market_hash_name) for record in batch_iter]
    if not pairs:
        return

    sell_history_stage_list = []
    sell_history_dwh_list = []
    stats_stage_list = []
    stats_dwh_list = []

    key_columns = ["app_id", "market_hash_name"]
    history_columns = ["timestamp", "history"]
    stats_columns = ["total_sold", "total_volume", "total_volume_steam_fee",
                     "total_volume_publisher_fee", "min_price", "max_price",
                     "first_sale_timestamp", "last_sale_timestamp"]

    # NOTE(review): credentials are hard-coded; consider loading the URL from configuration.
    database_url = 'postgresql+asyncpg://gaben:qwerty@localhost/trade_bot'
    engine = create_async_engine(database_url)
    try:
        async_session = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
        async with async_session() as session:
            async with session.begin():
                repository = AppMarketNameBasedRepository(session, raw_market_sell_history_table, MarketItemSellHistoryRaw)
                # 1000 is forwarded as the second argument (presumably the
                # fetch batch size — confirm against the repository).
                async for rows in repository.yield_all_by_pairs(pairs, 1000):
                    for row in rows:
                        sell_history_stage, \
                            sell_history_dwh, \
                            stats_stage, \
                            stats_dwh = process_market_item_sell_history(row)
                        sell_history_stage_list.append(sell_history_stage)
                        sell_history_dwh_list.append(sell_history_dwh)
                        stats_stage_list.append(stats_stage)
                        stats_dwh_list.append(stats_dwh)

                await _upsert_many(session, stg_market_item_sell_history_table,
                                   sell_history_stage_list, key_columns, history_columns)
                await _upsert_many(session, dwh_market_item_sell_history_table,
                                   sell_history_dwh_list, key_columns, history_columns)
                await _upsert_many(session, stg_market_item_stats_table,
                                   stats_stage_list, key_columns, stats_columns)
                await _upsert_many(session, dwh_market_item_stats_table,
                                   stats_dwh_list, key_columns, stats_columns)
    finally:
        # Release pooled connections while the event loop is still running;
        # otherwise they are finalised only after asyncio.run() closed the loop.
        await engine.dispose()

    
def process_batch_and_write(batch_iter):
    """Synchronous ``foreachPartition`` entry point: run the async pipeline for one partition.

    Before running, ``_ProactorBasePipeTransport.__del__`` (the finaliser of
    asyncio's proactor pipe transports) is patched so that a
    ``RuntimeError('Event loop is closed')`` raised during finalisation is
    swallowed; any other ``RuntimeError`` still propagates. The patch is
    installed at most once per process.

    Args:
        batch_iter: Iterator over the records of one partition; forwarded
            unchanged to ``process_batch_and_write2``.
    """
    def silence_event_loop_closed(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except RuntimeError as e:
                if str(e) != 'Event loop is closed':
                    raise
        # Marker that lets later calls recognise an already patched finaliser.
        wrapper._silences_event_loop_closed = True
        return wrapper

    # This function runs once per partition, possibly many times in the same
    # process; without the guard each call would wrap the previous wrapper again.
    current_del = _ProactorBasePipeTransport.__del__
    if not getattr(current_del, "_silences_event_loop_closed", False):
        _ProactorBasePipeTransport.__del__ = silence_event_loop_closed(current_del)

    asyncio.run(process_batch_and_write2(batch_iter))

    
# Run the sell-history pipeline once per partition of app_id_market_name_df.
# process_batch_and_write here is this cell's definition, which shadows the
# same-named functions from earlier cells — run the cells in order.
app_id_market_name_df.foreachPartition(process_batch_and_write)


In [13]:
from steam_trade_bot.etl.models import MarketItemOrdersRaw
from steam_trade_bot.etl.processors.market_item import process_market_item_orders

async def process_batch_and_write2(batch_iter):
    """Transform the raw market-item orders of one partition and upsert stage / DWH orders.

    Collects ``(app_id, market_hash_name)`` from every element of
    ``batch_iter``, loads the matching rows from the raw orders table through
    ``AppMarketNameBasedRepository``, runs ``process_market_item_orders`` on
    each row and upserts the results into the stage and DWH orders tables
    inside a single transaction.

    Args:
        batch_iter: Iterable of records exposing ``app_id`` and
            ``market_hash_name`` attributes (one partition when driven by
            ``foreachPartition``).

    Returns:
        None. An empty ``batch_iter`` returns immediately without opening a
        database connection.
    """
    # Materialise the keys first: an empty partition needs no engine at all.
    pairs = [(record.app_id, record.market_hash_name) for record in batch_iter]
    if not pairs:
        return

    stage_list = []
    dwh_list = []
    key_columns = ["app_id", "market_hash_name"]
    updated_columns = ["timestamp", "buy_orders", "sell_orders"]

    # NOTE(review): credentials are hard-coded; consider loading the URL from configuration.
    database_url = 'postgresql+asyncpg://gaben:qwerty@localhost/trade_bot'
    engine = create_async_engine(database_url)
    try:
        async_session = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
        async with async_session() as session:
            async with session.begin():
                # The orders table is paired with MarketItemOrdersRaw; the
                # sell-history model used previously was a copy-paste slip.
                repository = AppMarketNameBasedRepository(session, raw_market_item_orders_table, MarketItemOrdersRaw)
                # 1000 is forwarded as the second argument (presumably the
                # fetch batch size — confirm against the repository).
                async for rows in repository.yield_all_by_pairs(pairs, 1000):
                    for row in rows:
                        stage, dwh = process_market_item_orders(row)
                        stage_list.append(stage)
                        dwh_list.append(dwh)

                await _upsert_many(session, stg_market_item_orders_table, stage_list,
                                   key_columns, updated_columns)
                await _upsert_many(session, dwh_market_item_orders_table, dwh_list,
                                   key_columns, updated_columns)
    finally:
        # Release pooled connections while the event loop is still running;
        # otherwise they are finalised only after asyncio.run() closed the loop.
        await engine.dispose()
    
def process_batch_and_write(batch_iter):
    """Synchronous ``foreachPartition`` entry point: run the async pipeline for one partition.

    Before running, ``_ProactorBasePipeTransport.__del__`` (the finaliser of
    asyncio's proactor pipe transports) is patched so that a
    ``RuntimeError('Event loop is closed')`` raised during finalisation is
    swallowed; any other ``RuntimeError`` still propagates. The patch is
    installed at most once per process.

    Args:
        batch_iter: Iterator over the records of one partition; forwarded
            unchanged to ``process_batch_and_write2``.
    """
    def silence_event_loop_closed(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except RuntimeError as e:
                if str(e) != 'Event loop is closed':
                    raise
        # Marker that lets later calls recognise an already patched finaliser.
        wrapper._silences_event_loop_closed = True
        return wrapper

    # This function runs once per partition, possibly many times in the same
    # process; without the guard each call would wrap the previous wrapper again.
    current_del = _ProactorBasePipeTransport.__del__
    if not getattr(current_del, "_silences_event_loop_closed", False):
        _ProactorBasePipeTransport.__del__ = silence_event_loop_closed(current_del)

    asyncio.run(process_batch_and_write2(batch_iter))


# Run the orders pipeline once per partition of app_id_market_name_df.
# process_batch_and_write here is this cell's definition, which shadows the
# same-named functions from earlier cells — run the cells in order.
app_id_market_name_df.foreachPartition(process_batch_and_write)
