# Практика PySpark и SparkSQL (pyspark.sql.functions)

**Задача**. Сделать пайплайн обработки файла `cars.csv`. 

Необходимо посчитать по каждому производителю (поле `manufacturer_name`):
- кол-во объявлений
- средний год выпуска автомобилей
- минимальную цену
- максимальную цену

Выгрузить результат в `output.csv`.

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as t

def main():
    spark = SparkSession.builder.getOrCreate()

    df = spark.read.format("csv").option("header", "true").load("data/cars.csv")

    output = (
        df
        .groupBy("manufacturer_name")
        .agg(
            F.count("manufacturer_name").alias("count"),
            F.round(F.avg("year_produced")).cast(t.IntegerType()).alias("avg_year"),
            F.min(F.col("price_usd").cast(t.FloatType())).alias("min_price"),
            F.max("price_usd").cast(t.FloatType()).alias("max_price")
        )
    )

    output.coalesce(4).write.mode("overwrite").format("json").save("result.json")

main()

In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as t

def extract_data(spark: SparkSession, path: str):
    """Загрузка данных из CSV."""
    return spark.read.format("csv").option("header", "true").load(path)

def transform_data(df):
    """Группировка и агрегация данных по производителю."""
    return (
        df
        .groupBy("manufacturer_name")
        .agg(
            F.count("manufacturer_name").alias("count"),
            F.round(F.avg("year_produced")).cast(t.IntegerType()).alias("avg_year"),
            F.min("price_usd").alias("min_price"),
            F.max("price_usd").alias("max_price")
        )
    )

def save_data(df, output_path: str):
    """Сохранение результата в json."""
    df.write.mode("overwrite").format("json").save(output_path)

def main():
    spark = SparkSession.builder.appName("Cars ETL").getOrCreate()
    df = extract_data(spark, "data/cars.csv")
    result = transform_data(df)
    save_data(result, "result.json")
    spark.stop()

if __name__ == "__main__":
    main()
