# PySpark Demo – MapReduce & Agregacje
Ten notebook pokazuje alternatywę Spark dla lokalnych zadań MapReduce: word count oraz agregację transakcji.

In [None]:
from pyspark.sql import SparkSession, functions as F
spark = (SparkSession.builder.appName('PySparkDemo').getOrCreate())
spark

In [None]:
import os
DATA_PATH = os.path.join('..', 'data', 'transactions_sample.csv')
df = (spark.read.option('header', True).option('inferSchema', True).csv(DATA_PATH))
df.printSchema()
df.show(5, truncate=False)

## Word Count (kolumna category)

In [None]:
category_rdd = df.select('category').na.fill('').rdd.map(lambda r: r[0])
tokens = (category_rdd
  .flatMap(lambda v: v.replace(',', ' ').replace('.', ' ').split())
  .filter(lambda w: w)
  .map(lambda w: (w.lower(), 1))
  .reduceByKey(lambda a,b: a+b))
top_wc = tokens.takeOrdered(10, key=lambda kv: -kv[1])
top_wc

## Agregacja transakcji (DataFrame API)

In [None]:
agg = (df.groupBy('category')
          .agg(F.count('*').alias('count'),
               F.sum('amount').alias('total'),
               F.avg('amount').alias('avg'))
          .orderBy(F.col('total').desc()))
agg.show(5, truncate=False)

## Zapis do Parquet (opcjonalnie)

In [None]:
PARQUET_OUT = os.path.join('..', 'data', 'processed', 'transactions.parquet')
os.makedirs(os.path.dirname(PARQUET_OUT), exist_ok=True)
df.write.mode('overwrite').parquet(PARQUET_OUT)
print('Zapisano:', PARQUET_OUT)

## Porównanie: lokalny silnik vs Spark (opis)
Lokalny silnik MapReduce (multiprocessing) działa szybko na małych danych. Spark daje skalowalność, API DataFrame i integrację z MLlib.

In [None]:
spark.stop()