# Практика PySpark и SparkSQL (pyspark.sql.functions)

**Задача**. Сделать пайплайн обработки файла `cars.csv`. 

Необходимо посчитать по каждому производителю (поле `manufacturer_name`):
- кол-во объявлений
- средний год выпуска автомобилей
- минимальную цену
- максимальную цену

Выгрузить результат в `output.csv`.

## Мое решение с вертухи

In [6]:
# cоздаём SparkSession

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("Practice 2").getOrCreate()

df = spark.read.format("csv").option("header", "true").load("data/cars.csv")


In [17]:
# gdf = df.groupBy('manufacturer_name')
# gdf.agg({"*": 'count', "year_produced": 'mean', "price_usd": 'min', "price_usd": 'max' }).show()

# так почему две разные аггрегации для одной колонки не раьботают "price_usd": 'min', "price_usd": 'max'


+-----------------+--------------+------------------+--------+
|manufacturer_name|max(price_usd)|avg(year_produced)|count(1)|
+-----------------+--------------+------------------+--------+
|            Acura|        9700.0| 2007.121212121212|      66|
|       Alfa Romeo|         980.0|1999.2946859903382|     207|
|             Audi|        9999.0|2000.0996758508913|    2468|
|              BMW|        9999.0|2002.5620689655173|    2610|
|            Buick|        9999.0|2014.4468085106382|      47|
|         Cadillac|        9100.0|            2006.0|      43|
|            Chery|        9150.0|2010.6206896551723|      58|
|        Chevrolet|        9999.0|2010.6857798165138|     436|
|         Chrysler|        9900.0|2001.7585365853658|     410|
|          Citroen|        9999.0|2003.1325224071702|    1562|
|            Dacia|        9960.0|2009.2203389830509|      59|
|           Daewoo|         999.0| 2001.289592760181|     221|
|            Dodge|        9990.0|2003.3535353535353|  

In [34]:
df.groupBy('manufacturer_name').agg(F.count('manufacturer_name').alias("count names"),
                                    F.round(F.avg("year_produced")).cast('Integer').alias("year_produced"),
                                    F.min(F.col('price_usd').cast('float')).alias("min_price"),
                                    F.max(F.col('price_usd').cast('float')).alias("min_price"))\
                                    .orderBy("manufacturer_name").show(30)       

+-----------------+-----------+-------------+---------+---------+
|manufacturer_name|count names|year_produced|min_price|min_price|
+-----------------+-----------+-------------+---------+---------+
|            Acura|         66|         2007|   2350.0|  36500.0|
|       Alfa Romeo|        207|         1999|   104.33|  22000.0|
|             Audi|       2468|         2000|    130.0|  46750.0|
|              BMW|       2610|         2003|     9.49|  50000.0|
|            Buick|         47|         2014|   5300.0|  24000.0|
|         Cadillac|         43|         2006|   1700.0|  25750.0|
|            Chery|         58|         2011|   1250.0| 16077.02|
|        Chevrolet|        436|         2011|    800.0|  49900.0|
|         Chrysler|        410|         2002|    550.0|  48000.0|
|          Citroen|       1562|         2003|    100.0|  19500.0|
|            Dacia|         59|         2009|   1350.0|  11950.0|
|           Daewoo|        221|         2001|    100.0|   6700.0|
|         

## Решение

In [20]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as t


def extract_data(spark: SparkSession) -> DataFrame:
    path = "data/cars.csv"
    return spark.read.option("header", "true").csv(path)


def transform_data(df: DataFrame) -> DataFrame:
    output = (
        df
        .groupBy("manufacturer_name")
        .agg(
            F.count("manufacturer_name").alias("count_ads"),
            F.round(F.avg("year_produced")).cast(t.IntegerType()).alias("avg_year_produced"),
            F.min("price_usd").alias("min_price"),
            F.max("price_usd").alias("max_price"),
        )
        .orderBy(F.col("count_ads").desc())
    )
    return output


def save_data(df: DataFrame) -> None:
    df.coalesce(4).write.mode("overwrite").format("json").save("output.json")


def main():
    spark = SparkSession.builder.appName("Practice 2").getOrCreate()
    df = extract_data(spark)
    output = transform_data(df)
    save_data(output)
    #spark.stop()

main()


## такое решение является полноценным пайплайном

In [35]:
spark.stop()