In [None]:
# ===========================
#       MAPREDUCE (Python)
# ===========================

import pandas as pd

def map_chunk(df_chunk):
    """Map step: reduce one chunk to a partial playtime sum and row count.

    Only rows whose ``written_during_early_access`` value is truthy are
    taken into account.

    Args:
        df_chunk: Mapping-like object (e.g. a DataFrame chunk) exposing the
            columns ``written_during_early_access`` and
            ``author.playtime_forever`` as equally ordered iterables.

    Returns:
        ``{"sum": <total playtime of selected rows>, "count": <number of
        selected rows>}``.
    """
    flags = df_chunk["written_during_early_access"]
    playtimes = df_chunk["author.playtime_forever"]

    # Keep the playtime of every flagged row, preserving the original order
    # so the summation happens left to right.
    selected = [minutes for is_early, minutes in zip(flags, playtimes) if is_early]

    return {"sum": sum(selected), "count": len(selected)}


def shuffle(results):
    """Combine per-chunk partial results into global totals.

    Args:
        results: Iterable of dicts with the keys ``"sum"`` and ``"count"``
            (the output of ``map_chunk``).

    Returns:
        Tuple ``(total_sum, total_count)``; ``(0, 0)`` for an empty input.
    """
    # Single pass over the input so one-shot iterators work as well.
    pairs = [(partial["sum"], partial["count"]) for partial in results]

    sum_total = sum(part_sum for part_sum, _ in pairs)
    count_total = sum(part_count for _, part_count in pairs)
    return sum_total, count_total


def reduce(total_s, total_c):
    """Reduce step: turn global totals into an average expressed in hours.

    Args:
        total_s: Sum of playtime over all selected rows.
        total_c: Number of selected rows.

    Returns:
        ``total_s / total_c / 60``, or ``0`` when ``total_c`` is zero.
    """
    minutes_per_hour = 60  # minutes -> hours

    if total_c != 0:
        mean_minutes = total_s / total_c
        return mean_minutes / minutes_per_hour
    return 0


def mapReduce(file_path, chunksize=200000):
    """Compute the early-access row count and the mean playtime in hours.

    The CSV is streamed in chunks so the whole file never has to be loaded
    at once. Each chunk is condensed by ``map_chunk`` into a partial
    ``{"sum", "count"}``; the partials are merged by ``shuffle`` and turned
    into an average by ``reduce``.

    Args:
        file_path: Path to the CSV. It must contain the columns
            ``written_during_early_access`` and ``author.playtime_forever``.
        chunksize: Number of rows read per chunk.

    Returns:
        Tuple ``(count, avg_hours)``: the number of rows flagged as early
        access and their average playtime as computed by ``reduce``.
    """
    flag_col = "written_during_early_access"
    playtime_col = "author.playtime_forever"
    mapped = []

    for chunk in pd.read_csv(
        file_path,
        usecols=[flag_col, playtime_col],
        chunksize=chunksize
    ):
        flags = chunk[flag_col]
        if pd.api.types.is_bool_dtype(flags):
            # Already boolean; a missing value (nullable dtype) counts as False.
            flags = flags.fillna(False).astype(bool)
        elif pd.api.types.is_numeric_dtype(flags):
            # A plain astype(bool) would turn NaN into True, so missing
            # values are masked out explicitly.
            flags = flags.notna() & (flags != 0)
        else:
            # Text / mixed column: astype(bool) would make every non-empty
            # string (including "False") and every NaN truthy. Only an
            # explicit "true" (case-insensitive) marks the row.
            flags = flags.astype(str).str.strip().str.lower() == "true"
        chunk[flag_col] = flags

        # Unparseable or missing playtime is counted as 0.
        chunk[playtime_col] = pd.to_numeric(chunk[playtime_col], errors="coerce").fillna(0)

        mapped.append(map_chunk(chunk))

    total_s, total_c = shuffle(mapped)

    avg_hours = reduce(total_s, total_c)

    return total_c, avg_hours


# NOTE(review): looks like a placeholder path - point it at the real reviews
# CSV before running. The Spark cell below reads the same variable.
file_path = "path_to_file.csv"

# Run the chunked pipeline; it returns the early-access row count and the
# average playtime in hours. Note that `avg` is rebound later by
# `from pyspark.sql.functions import col, avg`.
count, avg = mapReduce(file_path)

print("=== MAPREDUCE ===")
# Printed label (Russian): "Number of early-access users:"
print("Количество пользователей раннего доступа:", count)
# Printed label (Russian): "Average playtime (hours):"
print("Среднее время игры (часы):", avg)




In [None]:

# =============================
#          SPARK
# =============================

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

# Local Spark session; the driver host and bind address are pinned to the
# loopback interface.
spark = (
    SparkSession.builder
    .appName("SteamEarlyAccess")
    .master("local[*]")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

# Read the same CSV as the pandas cell (file_path is defined there), using
# the first line as the header and letting Spark infer the column types.
df = spark.read.csv(
    file_path,
    header=True,
    inferSchema=True
)

# === KEY FIX ===

# Replace the flag column with an explicit comparison against "True".
df = df.withColumn(
    "written_during_early_access",
    (col("written_during_early_access") == "True")
)

# Numeric copy of the playtime column. The backticks quote the column name,
# which contains a dot.
df = df.withColumn(
    "playtime",
    col("`author.playtime_forever`").cast("double")
)

# Keep only the rows flagged as early access.
early = df.filter(col("written_during_early_access") == True)

# Divide playtime by 60 (the pandas cell treats this as minutes -> hours)
# and average it over the filtered rows.
result = early.select(
    (col("playtime") / 60).alias("hours")
).agg(
    avg("hours").alias("avg_hours")
)

print("=== SPARK ===")
result.show()

# Shut the Spark session down once the result has been displayed.
spark.stop()

## Часть 3. Оркестрация пайплайна (Prefect как аналог Airflow)

Построим pipeline:

- `check_file` — проверка наличия `steam_reviews.csv`;
- `spark_mart` — Spark-задача, которая:
  - фильтрует пользователей раннего доступа,
  - считает среднее время игры (в часах),
  - сохраняет витрину в формате CSV;
- `export_report` — вывод результата пользователю (может отправляться в BI / отчёт).

Все три шага объединяются в один flow (steam_pipeline), который можно запускать по расписанию.

In [26]:
from prefect import flow, task
import os


# Input and output files (absolute paths on the author's machine - adjust
# them for your environment):
FILE = r"C:\Users\Firo\PycharmProjects\InstituteProject-course--4-\BigData\lab3\data\steam_reviews.csv"
OUT  = r"C:\Users\Firo\PycharmProjects\InstituteProject-course--4-\BigData\lab3\data\steam_early_access_result.csv"


@task
def check_file(path: str):
    """
    Verify that the input exists and report its size.

    Args:
        path: Path of the file to check.

    Returns:
        The same ``path``, so the next task can consume it.

    Raises:
        FileNotFoundError: If nothing exists at ``path``.
    """
    if os.path.exists(path):
        bytes_per_mb = 1024 * 1024
        size_mb = os.path.getsize(path) / bytes_per_mb
        print(f"[check_file] OK ({size_mb:.2f} MB)")
        return path

    raise FileNotFoundError(f"Файл не найден: {path}")


@task
def spark_mart(path: str):
    """
    Spark task that builds the early-access data mart.

    Keeps only the rows flagged as written during early access, computes
    the average playtime in hours and saves the result as a CSV at ``OUT``.

    Args:
        path: Path to the source reviews CSV.

    Returns:
        The path of the written CSV (``OUT``).
    """

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, avg

    spark = (
        SparkSession.builder
        .appName("SteamEarlyAccessPrefect")
        .master("local[*]")
        .config("spark.driver.host", "127.0.0.1")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .getOrCreate()
    )

    # The session is stopped in `finally` so it is released even when
    # reading, aggregating or writing fails.
    try:
        df = spark.read.csv(path, header=True, inferSchema=True)

        # Replace the flag column with an explicit comparison against "True".
        df = df.withColumn(
            "written_during_early_access",
            (col("written_during_early_access") == "True")
        )

        # Numeric copy of the playtime column. The backticks quote the
        # column name, which contains a dot.
        df = df.withColumn(
            "playtime",
            col("`author.playtime_forever`").cast("double")
        )

        early = df.filter(col("written_during_early_access") == True)

        result = early.select(
            (col("playtime") / 60).alias("hours")
        ).agg(
            avg("hours").alias("avg_hours")
        )

        # === Windows-safe CSV data mart ===
        # The aggregate is collected to pandas and written from the driver;
        # presumably this avoids Spark's own file writer on Windows.
        result.toPandas().to_csv(OUT, index=False)
    finally:
        spark.stop()

    print(f"[spark_mart] витрина сохранена в: {OUT}")
    return OUT


@task
def export_report(csv_path: str):
    """
    Final pipeline step: report where the resulting CSV is located.

    At the moment it only prints the path; this is the place where the
    result could be forwarded to BI, e-mail and so on.

    Args:
        csv_path: Path of the CSV produced by the previous step.

    Returns:
        The unchanged ``csv_path``.
    """
    report_line = f"[export_report] итоговый CSV: {csv_path}"
    print(report_line)
    return csv_path


@flow
def steam_pipeline(path: str):
    """Run the pipeline: check the input, build the mart, report the result.

    Args:
        path: Path to the source reviews CSV.
    """
    # Each task feeds its return value straight into the next one.
    export_report(spark_mart(check_file(path)))


# Run the flow on the input file.
steam_pipeline(FILE)


[check_file] OK (7793.22 MB)


[spark_mart] витрина сохранена в: C:\Users\Firo\PycharmProjects\InstituteProject-course--4-\BigData\lab3\data\steam_early_access_result.csv


[export_report] итоговый CSV: C:\Users\Firo\PycharmProjects\InstituteProject-course--4-\BigData\lab3\data\steam_early_access_result.csv
