# BigDataSpark - ETL Analysis

Этот ноутбук демонстрирует процесс ETL и анализа данных о продажах с использованием Apache Spark.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Obtain the Spark session for this notebook: getOrCreate() reuses an
# already-active session if there is one, otherwise it starts a new one
# registered under the application name "BigDataAnalysis".
spark = (
    SparkSession.builder
    .appName("BigDataAnalysis")
    .getOrCreate()
)

In [None]:

# Load every CSV file matching the glob into a single DataFrame.
# The first row of each file is treated as the header, and column types are
# inferred from the data instead of defaulting everything to string.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/home/jovyan/work/data/*.csv")
    # Without caching, every action on `df` (the count() below and each
    # later aggregation/show) re-reads and re-parses the CSV files.
    # cache() is lazy; the count() below is the first action and
    # materializes the cached data once.
    .cache()
)

# Report the total row count and the inferred schema as a sanity check.
print(f"Общее количество записей: {df.count()}")
df.printSchema()

In [None]:

# Per product_category: summed sale_total_price, number of rows, and the
# mean sale_total_price, listed from the highest total revenue downwards.
sales_by_category = (
    df
    .groupBy("product_category")
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_order_value"),
    )
    .sort(col("total_revenue").desc())
)

sales_by_category.show()

In [None]:

# Ten (product_name, product_category) pairs with the largest summed
# sale_quantity; the summed sale_total_price is reported alongside.
top_products = (
    df
    .groupBy("product_name", "product_category")
    .agg(
        sum("sale_quantity").alias("total_quantity"),
        sum("sale_total_price").alias("total_revenue"),
    )
    .sort(col("total_quantity").desc())
    .limit(10)
)

# truncate=False keeps long cell values from being cut off in the output.
top_products.show(truncate=False)

In [None]:

# Per customer_country: number of distinct sale_customer_id values and the
# summed sale_total_price, listed from the highest total revenue downwards.
customers_by_country = (
    df
    .groupBy("customer_country")
    .agg(
        countDistinct("sale_customer_id").alias("unique_customers"),
        sum("sale_total_price").alias("total_revenue"),
    )
    .sort(col("total_revenue").desc())
)

customers_by_country.show()

In [None]:

# Parse sale_date with the pattern "M/d/yyyy" into a date column, then
# derive the calendar year and month from the parsed value.
df_with_date = (
    df
    .withColumn("sale_date_parsed", to_date(col("sale_date"), "M/d/yyyy"))
    .withColumn("year", year(col("sale_date_parsed")))
    .withColumn("month", month(col("sale_date_parsed")))
)

# Summed sale_total_price per (year, month), in chronological order.
monthly_sales = (
    df_with_date
    .groupBy("year", "month")
    .agg(sum("sale_total_price").alias("monthly_revenue"))
    .sort("year", "month")
)

monthly_sales.show()

In [None]:

# Distinct (product_name, product_rating, product_reviews) combinations,
# ordered from the highest product_rating downwards.
quality_analysis = (
    df
    .select("product_name", "product_rating", "product_reviews")
    .distinct()
    .sort(col("product_rating").desc())
)

# Show the ten highest-rated rows without truncating long values.
print("Продукты с лучшими рейтингами:")
quality_analysis.limit(10).show(truncate=False)

# Summary statistics for product_rating, computed over all rows of df
# (not over the de-duplicated selection above).
print("\nСтатистика по рейтингам:")
df.select("product_rating").describe().show()

In [None]:
# Shut down the Spark session once the analysis is finished.
spark.stop()