<a href="https://colab.research.google.com/github/RyzhovDmt/PySpark_projects/blob/main/Simple_ETL_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 53.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=ea7520d752e43ab8f1899f5a9ccce0ead180ed1be489a912bbb590bc16714e89
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
from google.colab import files
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as t

In [None]:
# Colab upload helper: provide cars.csv here, since later cells read the
# file by that exact name from the working directory.
files.upload()

In [None]:
# Get the active Spark session, or create one, under the application name "ETL".
spark = SparkSession.builder.appName("ETL").getOrCreate()

In [None]:
# Bare expression: show the session object as this cell's output.
spark

In [None]:
# Exploratory load of cars.csv with the first row used as column names.
# No schema is supplied or inferred, so every column is read as a string.
df = spark.read.format("csv").option("header", "true").load("cars.csv")

In [None]:
# Inspect column names and types of the raw frame.
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: string (nullable = true)
 |-- year_produced: string (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: string (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: string (nullable = true)
 |-- up_counter: string (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)


In [None]:
# Preview the first rows of the raw frame.
df.show()

+-----------------+----------+------------+------+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+-----+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------------+
|manufacturer_name|model_name|transmission| color|odometer_value|year_produced|engine_fuel|engine_has_gas|engine_type|engine_capacity|body_type|has_warranty|state|drivetrain|price_usd|is_exchangeable|location_region|number_of_photos|up_counter|feature_0|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|feature_9|duration_listed|
+-----------------+----------+------------+------+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+-----+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+----

In [None]:
# Distinct (manufacturer, model) pairs for Audi; show() prints only the first 20 rows.
df.select("manufacturer_name", "model_name").filter(F.col("manufacturer_name") == 'Audi').distinct().show()

+-----------------+----------+
|manufacturer_name|model_name|
+-----------------+----------+
|             Audi|        S4|
|             Audi|       100|
|             Audi|        Q5|
|             Audi|        A8|
|             Audi|        Q7|
|             Audi|        Q3|
|             Audi|        S5|
|             Audi|        A2|
|             Audi|       RS6|
|             Audi|        A1|
|             Audi|        A7|
|             Audi|        V8|
|             Audi|        TT|
|             Audi|     Coupe|
|             Audi|        A5|
|             Audi|        A3|
|             Audi|        S8|
|             Audi|        A6|
|             Audi|A6 Allroad|
|             Audi|        80|
+-----------------+----------+
only showing top 20 rows



In [None]:
def extract(spark: SparkSession, path: str) -> DataFrame:
    """Load a CSV file whose first row holds the column names.

    Args:
        spark: Session used to build the reader.
        path: Location of the CSV file.

    Returns:
        The file contents as a DataFrame. No schema is supplied or inferred
        here, so casting to numeric types is left to the caller.
    """
    csv_reader = spark.read.format("csv").option("header", "true")
    return csv_reader.load(path)

In [None]:
# Per-manufacturer summary:
# number of ads
# average production year
# min/max price
def transform(df: DataFrame) -> DataFrame:
    """Aggregate the car listings into one summary row per manufacturer.

    Args:
        df: Listings with at least the columns ``manufacturer_name``,
            ``year_produced`` and ``price_usd``. The numeric columns may be
            strings (as produced by a CSV read without a schema); they are
            cast explicitly before aggregation.

    Returns:
        A DataFrame with the columns ``manufacturer_name``, ``count_ads``
        (number of listings), ``avg_year_produced`` (rounded mean, integer),
        ``min_price`` and ``max_price`` (floats), ordered by ``count_ads``
        from largest to smallest.
    """
    # Same conversion Spark applies implicitly when averaging a string
    # column; spelled out so both numeric inputs are handled the same way.
    year_produced = F.col("year_produced").cast(t.DoubleType())
    price_usd = F.col("price_usd").cast(t.FloatType())

    output = (
        df
        .groupBy("manufacturer_name")
        .agg(
            # count("*") counts rows. count("manufacturer_name") skips NULLs,
            # so listings with a missing manufacturer would be reported as 0 ads.
            F.count("*").alias("count_ads"),
            F.round(F.avg(year_produced)).cast(t.IntegerType()).alias("avg_year_produced"),
            F.min(price_usd).alias("min_price"),
            F.max(price_usd).alias("max_price"),
        )
        .orderBy(F.col("count_ads").desc())
    )
    return output

In [None]:
def save(df: DataFrame, path: str = "result", num_files: int = 2) -> None:
    """Write ``df`` in JSON format, overwriting any previous output at ``path``.

    Args:
        df: DataFrame to persist.
        path: Output location passed to the Spark writer. Defaults to
            ``"result"``, the location this notebook originally hard-coded.
        num_files: Number of partitions the frame is coalesced to before
            writing (an upper bound; coalesce never adds partitions).
            Defaults to 2, the original hard-coded value.

    Raises:
        ValueError: If ``num_files`` is smaller than 1.
    """
    if num_files < 1:
        raise ValueError(f"num_files must be >= 1, got {num_files}")
    df.coalesce(num_files).write.mode("overwrite").format("json").save(path)

In [None]:
def pipeline(input_path: str = "cars.csv") -> None:
    """Run the ETL job: read the CSV, aggregate per manufacturer, save and show.

    Args:
        input_path: CSV file to process. Defaults to ``"cars.csv"``, the
            file name this notebook originally hard-coded.
    """
    spark = SparkSession.builder.appName("ETL").getOrCreate()
    df = extract(spark, input_path)
    # save() and show() are two separate Spark actions. Caching the aggregate
    # lets the second one reuse the result instead of re-reading and
    # re-aggregating the CSV.
    output = transform(df).cache()
    try:
        save(output)
        output.show()
    finally:
        # Release the cached data once both actions are done (or one failed).
        output.unpersist()

# Run the ETL job end to end with its default settings.
pipeline()

+-----------------+---------+-----------------+---------+---------+
|manufacturer_name|count_ads|avg_year_produced|min_price|max_price|
+-----------------+---------+-----------------+---------+---------+
|       Volkswagen|     4243|             2002|      1.0|  43999.0|
|             Opel|     2759|             2002|    150.0|  22900.0|
|              BMW|     2610|             2003|     9.49|  50000.0|
|             Ford|     2566|             2002|    110.0|  41000.0|
|          Renault|     2493|             2003|     10.0| 30304.47|
|             Audi|     2468|             2000|    130.0|  46750.0|
|    Mercedes-Benz|     2237|             2002|     1.42|  49999.0|
|          Peugeot|     1909|             2003|      1.0|  20450.0|
|          Citroen|     1562|             2003|    100.0|  19500.0|
|           Nissan|     1361|             2004|    150.0|  39000.0|
|            Mazda|     1328|             2002|      1.0|  39500.0|
|           Toyota|     1246|             2006| 