In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import random
from datetime import datetime, timedelta

In [None]:
spark = SparkSession.builder \
    .appName("SalesAggregator") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

In [None]:
# from pyspark.sql import functions as F
# from pyspark.sql.types import *

# # 1. Konfigurasi
# num_rows = 1_000_000
# categories = ['Electronics', 'Clothing', 'Food', 'Books']
# regions = ['North', 'South', 'East', 'West']

# # 2. Buat "Container" (Range)
# # Spark akan membagi 1 juta ini ke dalam beberapa partisi secara otomatis
# df = spark.range(0, num_rows)

# # 3. Transformasi menjadi Data Sales (Native Spark)
# df_sales = df.withColumn(
#         "transaction_id", 
#         F.concat(F.lit("TXN"), F.lpad(F.col("id").cast("string"), 6, "0"))
#     ).withColumn(
#         "category", 
#         F.element_at(F.array([F.lit(c) for c in categories]), 
#                     (F.rand() * len(categories) + 1).cast("int"))
#     ).withColumn(
#         "region", 
#         F.element_at(F.array([F.lit(r) for r in regions]), 
#                     (F.rand() * len(regions) + 1).cast("int"))
#     ).withColumn(
#         "amount", 
#         F.round(F.rand() * (1000 - 10) + 10, 2)
#     ).withColumn(
#         "quantity", 
#         (F.rand() * 9 + 1).cast("int")
#     ).withColumn(
#         "date", 
#         F.date_add(F.to_date(F.lit("2024-01-01")), (F.rand() * 365).cast("int"))
#     ).drop("id") # Buang kolom id bawaan spark.range

# # 4. Write ke CSV menggunakan Engine Spark
# # Ini akan jauh lebih cepat karena setiap executor menulis filenya sendiri
# df_sales.write.mode("overwrite").csv("/tmp/sales_data", header=True)

# print("Data 1 juta baris berhasil di-generate secara paralel!")

In [None]:
sales_schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("region", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("date", StringType(), True),
])

try:
    df = spark.read.option("header", "true") \
        .option("mode", "FAILFAST") \
        .schema(sales_schema) \
        .csv("/tmp/sales_data", header = True)
    
    print(f"Loaded {df.count()} records successfully.")
except Exception as e:
    print(f"Error loading data: {e}")
    spark.stop()
    raise

In [None]:
df = df.withColumn("date",to_date(col("date"), "yyyy-MM-dd"))

df = df.withColumn("year", year(col("date"))) \
       .withColumn("month", month(col("date")))

nulls_count = df.select([ \
    count(when(col(c).isNull(), c)).alias(c) \
    for c in df.columns])

print("Nulls count per column:")
nulls_count.show()

print("\nData Ranges:")
df.select(
    min("amount").alias("min_amount"),
    max("amount").alias("max_amount"),
    min("quantity").alias("min_quantity"),
    max("quantity").alias("max_quantity"),
    min("date").alias("min_date"),
    max("date").alias("max_date")
).show()

In [None]:
# print 10 contoh data
print("Sample Data:")
df.show(10, truncate=False)


In [None]:
monthly_sales = df.groupBy("category", "region" ,"year", "month") \
    .agg(
        sum("amount").alias("total_revenue"),
        sum("quantity").alias("total_quantity"),
        count("transaction_id").alias("transactions_count"),
        avg("amount").alias("avg_transaction_value")
    )\
    .orderBy("year", "month", "category", "region")

print("Monthly Sales Aggregation:")
monthly_sales.show(20, truncate=False)

In [32]:
from pyspark.sql import Window

# Window untuk ranking per category
window_category = Window.partitionBy("category","year","month")\
    .orderBy(col("total_revenue").desc())

# Window untuk running total per region
window_running = Window.partitionBy("region") \
    .orderBy("year", "month") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

enriched_sales = monthly_sales \
    .withColumn("region_rank", rank().over(window_category)) \
    .withColumn("cumulative_revenue", sum("total_revenue").over(window_running))

print("\nEnriched sales with ranking:")
enriched_sales.filter(col("region_rank") <= 2).show(20)

# buatkan penjelasan singkat tentang apa yang dilakukan window_function di atas
# Penjelasan:
# 1. window_category: Membuat jendela (window) yang mengelompokan data berdasarkan kategori, tahun, dan bulan. Data di dalam jendela ini diurutkan berdasarkan total pendapatan (total_revenue) secara menurun. Fungsi ini digunakan untuk memberikan peringkat (rank) pada setiap kategori dalam setiap bulan berdasarkan pendapatan tertinggi.
# 2. window_running: Membuat jendela yang mengelompokan data berdasarkan wilayah (region). Data di dalam jendela ini diurutkan berdasarkan tahun dan bulan. Fungsi ini digunakan untuk menghitung total kumulatif (cumulative sum) dari pendapatan (total_revenue) dari awal hingga baris saat ini dalam setiap wilayah. Dengan kata lain, ini memberikan gambaran tentang bagaimana pendapatan bertambah seiring waktu di setiap wilayah.
# 3. enriched_sales: DataFrame ini menambahkan dua kolom baru ke dalam hasil agregasi bulanan. Kolom "region_rank" memberikan peringkat untuk setiap kategori dalam setiap bulan berdasarkan total pendapatan, sedangkan kolom "cumulative_revenue" menghitung total pendapatan kumulatif untuk setiap wilayah dari waktu ke waktu. Hasil akhirnya adalah DataFrame yang lebih informatif yang memungkinkan analisis lebih lanjut berdasarkan peringkat dan tren pendapatan kumulatif.

# apa bedanya jendela dan groupby di spark?
# Perbedaan utama antara jendela (window) dan groupBy di Spark adalah cara mereka mengelompokkan dan memproses data:
# 1. groupBy: Mengelompokkan data berdasarkan satu atau lebih kolom dan kemudian menerapkan agregasi (seperti sum, avg, count) pada setiap kelompok. Hasilnya adalah satu baris per kelompok.
# 2. Jendela (Window): Membuat jendela yang memungkinkan operasi agregasi dilakukan pada subset data yang terkait dengan setiap baris, tanpa mengurangi jumlah baris dalam DataFrame. Ini memungkinkan perhitungan seperti peringkat, total kumulatif, dan rata-rata berjalan, di mana setiap baris tetap ada dalam hasil akhir.


Enriched sales with ranking:


                                                                                

+-----------+------+----+-----+------------------+--------------+------------------+---------------------+-----------+--------------------+
|   category|region|year|month|     total_revenue|total_quantity|transactions_count|avg_transaction_value|region_rank|  cumulative_revenue|
+-----------+------+----+-----+------------------+--------------+------------------+---------------------+-----------+--------------------+
|       Food|  East|2024|    1|2663134.3899999987|         26304|              5282|     504.190531995456|          2|         1.0636074E7|
|      Books|  East|2024|    2| 2487168.069999999|         24841|              4986|   498.83033894905714|          2|1.3123242069999998E7|
|      Books|  East|2024|    3|2730219.0999999987|         26858|              5352|   510.13062406576955|          1|2.3176740029999994E7|
|   Clothing|  East|2024|    3|2764524.6899999995|         27150|              5421|   509.96581627006077|          1| 2.594126471999999E7|
|Electronics|  East|

In [34]:
# Write sebagai Parquet dengan partitioning
output_path = "/tmp/sales_aggregated_parquet"
enriched_sales.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(output_path)

print(f"Aggregated data written to {output_path} with partitioning by year and month.")

# Untuk dataset kecil, lebih baik NO partitioning
enriched_sales.coalesce(1).write \
    .mode("overwrite") \
    .parquet(output_path)

                                                                                

Aggregated data written to /tmp/sales_aggregated_parquet with partitioning by year and month.


                                                                                

In [35]:
# Read back dan verify
result_df = spark.read.parquet(output_path)

print("\nVerification:")
print(f"Output record count: {result_df.count()}")
print(f"Output schema:")
result_df.printSchema()

# Spot check beberapa records
print("\nSample output:")
result_df.orderBy("year", "month", "total_revenue", ascending=[True, True, False]) \
    .show(10, truncate=False)

# Check file layout
import os
for root, dirs, files in os.walk(output_path):
    level = root.replace(output_path, '').count(os.sep)
    indent = ' ' * 2 * level
    print(f"{indent}{os.path.basename(root)}/")
    subindent = ' ' * 2 * (level + 1)
    for file in files:
        if not file.startswith('.'):
            print(f"{subindent}{file}")


Verification:
Output record count: 192
Output schema:
root
 |-- category: string (nullable = true)
 |-- region: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- total_revenue: double (nullable = true)
 |-- total_quantity: long (nullable = true)
 |-- transactions_count: long (nullable = true)
 |-- avg_transaction_value: double (nullable = true)
 |-- region_rank: integer (nullable = true)
 |-- cumulative_revenue: double (nullable = true)


Sample output:
+-----------+------+----+-----+------------------+--------------+------------------+---------------------+-----------+--------------------+
|category   |region|year|month|total_revenue     |total_quantity|transactions_count|avg_transaction_value|region_rank|cumulative_revenue  |
+-----------+------+----+-----+------------------+--------------+------------------+---------------------+-----------+--------------------+
|Food       |South |2024|1    |2718987.1499999976|26679         |5

In [36]:
# Lihat execution plan
print("\nPhysical plan:")
enriched_sales.explain(mode="formatted")

# Cleanup
spark.stop()


Physical plan:
== Physical Plan ==
AdaptiveSparkPlan (15)
+- Window (14)
   +- Sort (13)
      +- Exchange (12)
         +- Window (11)
            +- Sort (10)
               +- Exchange (9)
                  +- Sort (8)
                     +- Exchange (7)
                        +- HashAggregate (6)
                           +- Exchange (5)
                              +- HashAggregate (4)
                                 +- Project (3)
                                    +- Project (2)
                                       +- Scan csv  (1)


(1) Scan csv 
Output [6]: [transaction_id#118, category#119, region#120, amount#121, quantity#122, date#123]
Batched: false
Location: InMemoryFileIndex [file:/tmp/sales_data]
ReadSchema: struct<transaction_id:string,category:string,region:string,amount:double,quantity:int,date:string>

(2) Project
Output [6]: [transaction_id#118, category#119, region#120, amount#121, quantity#122, cast(gettimestamp(date#123, yyyy-MM-dd, TimestampType, try_t