<a href="https://colab.research.google.com/github/KAYALAIMMANUELRAJU/PySpark1/blob/main/Advanced%20Spark%20with%20Java.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Case Study on Advanced Spark with Java
### Task 1: Environment setup and CSV-to-Parquet ingestion

In [None]:
# Install Java 11
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install Spark 3.3.2
!wget -q https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

# Install findspark to link Python and Spark
!pip install -q findspark

tar: spark-3.3.2-bin-hadoop3.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

In [None]:
%%writefile /content/project/src/DataIngestionSetup.java
import org.apache.spark.sql.*;
/**
 * One-off ingestion job: converts every source CSV table into a Parquet dataset.
 *
 * <p>Each {@code <table>.csv} under {@code /content/data/csv/} is read with its header
 * row and written (overwriting any previous run) to {@code /content/data/parquet/<table>}.
 * Column types are inferred from the data so that numeric columns are stored as numbers
 * rather than strings; downstream jobs join and aggregate on these columns.
 */
public class DataIngestionSetup {
    /** Directory containing one CSV file per table. */
    private static final String CSV_DIR = "/content/data/csv/";
    /** Directory receiving one Parquet dataset per table. */
    private static final String PARQUET_DIR = "/content/data/parquet/";
    /** Tables to convert; each name is both the CSV file stem and the Parquet folder name. */
    private static final String[] TABLES = {
        "customers", "employees", "offices", "orderdetails",
        "orders", "payments", "products", "productlines"
    };

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("DataIngestionSetup")
            .master("local")
            .getOrCreate();
        // try/finally: release the session even if one of the tables fails to convert.
        try {
            for (String table : TABLES) {
                spark.read()
                     .option("header", "true")
                     // Without inferSchema every CSV column is loaded as a string.
                     .option("inferSchema", "true")
                     .csv(CSV_DIR + table + ".csv")
                     .write().mode("overwrite").parquet(PARQUET_DIR + table);
                System.out.println("✅ " + table);
            }
        } finally {
            spark.stop();
        }
    }
}

Overwriting /content/project/src/DataIngestionSetup.java


In [None]:
%cd /content/project/src
!javac -cp "/content/spark-3.3.2-bin-hadoop3/jars/*" DataIngestionSetup.java

/content/project/src


In [None]:
!/content/spark-3.3.2-bin-hadoop3/bin/spark-submit \
  --class DataIngestionSetup --master local \
  --conf "spark.driver.extraClassPath=/content/project/src:/content/spark-3.3.2-bin-hadoop3/jars/*" \
  --jars /content/spark-3.3.2-bin-hadoop3/jars/* \
  DataIngestionSetup

25/08/06 06:35:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 06:35:26 INFO SparkContext: Running Spark version 3.3.2
25/08/06 06:35:27 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 06:35:27 INFO SparkContext: Submitted application: DataIngestionSetup
25/08/06 06:35:27 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 06:35:27 INFO ResourceProfile: Limiting resource is cpu
25/08/06 06:35:27 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 06:35:27 INFO SecurityManager: Changing view acls to: root
25/08/06 06:35:27 INFO SecurityManager: Changing modify acls to: root
25/08/06 06:35:27 INFO Securi

In [None]:
from pyspark.sql import SparkSession

# Attach to the active PySpark session (or start one) and preview the ingested table.
spark = SparkSession.builder.getOrCreate()
customers_parquet = spark.read.parquet("/content/data/parquet/customers")
customers_parquet.show(3)

+--------------+--------------------+---------------+----------------+------------+-----------------+------------+---------+--------+----------+---------+----------------------+-----------+
|customerNumber|        customerName|contactLastName|contactFirstName|       phone|     addressLine1|addressLine2|     city|   state|postalCode|  country|salesRepEmployeeNumber|creditLimit|
+--------------+--------------------+---------------+----------------+------------+-----------------+------------+---------+--------+----------+---------+----------------------+-----------+
|           103|   Atelier graphique|        Schmitt|         Carine |  40.32.2555|   54, rue Royale|        null|   Nantes|    null|     44000|   France|                1370.0|    21000.0|
|           112|  Signal Gift Stores|           King|            Jean|  7025551838|  8489 Strong St.|        null|Las Vegas|      NV|     83030|      USA|                1166.0|    71800.0|
|           114|Australian Collec...|       Fergus

### Task 2: Order revenue analysis (top products, product revenue, average order value)

In [None]:
%%writefile /content/project/src/OrderRevenueAnalysis.java
import org.apache.spark.sql.*;

/**
 * Batch job that derives three reports from the ingested Parquet tables and writes
 * each one (overwriting previous runs) under {@code /output/processed/}:
 * <ul>
 *   <li>{@code top_10_products.parquet} - the ten products with the highest quantity sold;</li>
 *   <li>{@code product_revenue.parquet} - total revenue per product, highest first;</li>
 *   <li>{@code avg_order_value.parquet} - average order total per customer.</li>
 * </ul>
 * Monetary results are rounded to two decimals so the stored values do not carry
 * floating-point noise such as {@code 190017.95999999996}.
 */
public class OrderRevenueAnalysis {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("OrderRevenueAnalysis")
            .master("local")
            .getOrCreate();

        // try/finally: stop the session even if a read, join or write fails.
        try {
            Dataset<Row> orders = spark.read().parquet("/content/data/parquet/orders");
            Dataset<Row> orderDetails = spark.read().parquet("/content/data/parquet/orderdetails");
            Dataset<Row> products = spark.read().parquet("/content/data/parquet/products");

            // Revenue of a single order line; shared by the revenue and order-value reports.
            Dataset<Row> lineItems = orderDetails.withColumn("lineTotal",
                    functions.expr("quantityOrdered * priceEach"));

            // Top 10 products by quantity sold
            Dataset<Row> topProducts = orderDetails.groupBy("productCode")
                .agg(functions.sum("quantityOrdered").alias("totalSold"))
                .join(products, "productCode")
                .select("productName", "totalSold")
                .orderBy(functions.col("totalSold").desc())
                .limit(10);

            topProducts.write().mode("overwrite").parquet("/output/processed/top_10_products.parquet");

            // Product-wise revenue (rounded only after summing, to avoid accumulating rounding error)
            Dataset<Row> productRevenue = lineItems
                .groupBy("productCode")
                .agg(functions.round(functions.sum("lineTotal"), 2).alias("totalRevenue"))
                .join(products, "productCode")
                .select("productName", "totalRevenue")
                .orderBy(functions.col("totalRevenue").desc());

            productRevenue.write().mode("overwrite").parquet("/output/processed/product_revenue.parquet");

            // Average order value per customer: total each order first, then average per customer
            Dataset<Row> ordersWithTotal = lineItems
                .groupBy("orderNumber")
                .agg(functions.sum("lineTotal").alias("orderTotal"));

            Dataset<Row> ordersWithCustomer = orders.join(ordersWithTotal, "orderNumber")
                .groupBy("customerNumber")
                .agg(functions.round(functions.avg("orderTotal"), 2).alias("avgOrderValue"));

            ordersWithCustomer.write().mode("overwrite").parquet("/output/processed/avg_order_value.parquet");
        } finally {
            spark.stop();
        }
    }
}

Overwriting /content/project/src/OrderRevenueAnalysis.java


In [None]:
%cd /content/project/src
!javac -cp "/content/spark-3.3.2-bin-hadoop3/jars/*" OrderRevenueAnalysis.java

/content/project/src


In [None]:
!/content/spark-3.3.2-bin-hadoop3/bin/spark-submit \
  --class OrderRevenueAnalysis \
  --master local \
  --conf "spark.driver.extraClassPath=/content/project/src:/content/spark-3.3.2-bin-hadoop3/jars/*" \
  --jars /content/spark-3.3.2-bin-hadoop3/jars/* \
  OrderRevenueAnalysis

25/08/06 06:44:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 06:44:23 INFO SparkContext: Running Spark version 3.3.2
25/08/06 06:44:23 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 06:44:23 INFO SparkContext: Submitted application: OrderRevenueAnalysis
25/08/06 06:44:23 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 06:44:23 INFO ResourceProfile: Limiting resource is cpu
25/08/06 06:44:23 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 06:44:23 INFO SecurityManager: Changing view acls to: root
25/08/06 06:44:23 INFO SecurityManager: Changing modify acls to: root
25/08/06 06:44:23 INFO Secu

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Display each Task 2 report in the order the Java job wrote them.
task2_reports = (
    "/output/processed/top_10_products.parquet",
    "/output/processed/product_revenue.parquet",
    "/output/processed/avg_order_value.parquet",
)
for report_path in task2_reports:
    spark.read.parquet(report_path).show()

+--------------------+---------+
|         productName|totalSold|
+--------------------+---------+
|1992 Ferrari 360 ...|   1808.0|
|1937 Lincoln Berline|   1111.0|
|American Airlines...|   1085.0|
|1941 Chevrolet Sp...|   1076.0|
|1930 Buick Marque...|   1074.0|
|    1940s Ford truck|   1061.0|
|1969 Harley David...|   1057.0|
|   1957 Chevy Pickup|   1056.0|
|1964 Mercedes Tou...|   1053.0|
|1956 Porsche 356A...|   1052.0|
+--------------------+---------+

+--------------------+------------------+
|         productName|      totalRevenue|
+--------------------+------------------+
|1992 Ferrari 360 ...|         276839.98|
|   2001 Ferrari Enzo|         190755.86|
|1952 Alpine Renau...|190017.95999999996|
|2003 Harley-David...|170685.99999999997|
|   1968 Ford Mustang|161531.47999999992|
|    1969 Ford Falcon|         152543.02|
|1980s Black Hawk ...|144959.90999999997|
|1998 Chrysler Ply...|142530.62999999998|
|1917 Grand Tourin...|140535.60000000003|
|    2002 Suzuki XREO|135767.0300

### Task 3: Regional sales insights (revenue by country, sales by office, top offices)

In [None]:
java_code = """
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import static org.apache.spark.sql.functions.*;

/**
 * Regional sales job. Reads the customers, payments, employees and offices Parquet
 * tables and writes three reports (overwriting previous runs) under
 * {@code /output/processed/}:
 * <ul>
 *   <li>{@code revenue_by_country.parquet} - total payments per customer country;</li>
 *   <li>{@code customer_sales_by_office.parquet} - paying customers and total payments
 *       per sales office, with placeholder values when no office can be resolved;</li>
 *   <li>{@code top_offices.parquet} - the five offices with the highest total sales.</li>
 * </ul>
 */
public class RegionalSalesInsights {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("RegionalSalesInsights")
            .master("local[*]")
            .getOrCreate();

        // try/finally: stop the session even if a read, join or write fails.
        try {
            // Load data from Parquet
            Dataset<Row> customers = spark.read().parquet("/content/data/parquet/customers");
            Dataset<Row> payments = spark.read().parquet("/content/data/parquet/payments");
            Dataset<Row> employees = spark.read().parquet("/content/data/parquet/employees");
            Dataset<Row> offices = spark.read().parquet("/content/data/parquet/offices");

            // === 1. Total Revenue by Country ===
            // Trim before grouping: padded values in the data ("Norway  " next to "Norway")
            // would otherwise be reported as two different countries.
            Dataset<Row> customerPayments = customers.alias("c")
                .join(payments.alias("p"), col("c.customerNumber").equalTo(col("p.customerNumber")))
                .groupBy(trim(col("c.country")).alias("country"))
                .agg(round(sum("p.amount"), 2).alias("totalRevenue"))
                .orderBy(desc("totalRevenue"));

            customerPayments.write().mode("overwrite").parquet("/output/processed/revenue_by_country.parquet");

            // === 2. Sales per Region (Customer Sales by Office) ===
            // Left outer joins keep customers that have no sales rep or office.
            // Both sides of the sales-rep key are cast to double so that ids stored with
            // different formatting (e.g. "1370.0" on the customer, "1370" on the employee)
            // still match instead of sending every customer to the "Unknown" bucket.
            Dataset<Row> customerOffices = customers.alias("c")
                .join(employees.alias("e"),
                      col("c.salesRepEmployeeNumber").cast("double")
                          .equalTo(col("e.employeeNumber").cast("double")),
                      "left_outer")
                .join(offices.alias("o"), col("e.officeCode").equalTo(col("o.officeCode")), "left_outer")
                // Inner join with payments: only customers that actually paid are counted.
                .join(payments.alias("p"), col("c.customerNumber").equalTo(col("p.customerNumber")));

            Dataset<Row> salesByOffice = customerOffices
                .groupBy(
                    coalesce(col("o.officeCode"), lit("Unknown Office")).alias("officeCode"),
                    coalesce(col("o.city"), lit("Unknown City")).alias("city"),
                    coalesce(col("o.country"), lit("Unknown Country")).alias("country")
                )
                .agg(
                    countDistinct("c.customerNumber").alias("customerCount"),
                    round(sum("p.amount"), 2).alias("totalSales")
                )
                .orderBy(desc("totalSales"));

            salesByOffice.write().mode("overwrite").parquet("/output/processed/customer_sales_by_office.parquet");

            // === 3. Top Performing Offices ===
            Dataset<Row> topOffices = salesByOffice
                .select("officeCode", "city", "country", "totalSales")
                .orderBy(desc("totalSales"))
                .limit(5);

            topOffices.write().mode("overwrite").parquet("/output/processed/top_offices.parquet");
        } finally {
            spark.stop();
        }
    }
}
"""

# Persist the Java source held in `java_code` so javac can compile it.
java_source_path = "/content/project/src/RegionalSalesInsights.java"
with open(java_source_path, mode="w") as java_file:
    java_file.write(java_code)

In [None]:
!javac -cp "/content/spark-3.3.2-bin-hadoop3/jars/*" /content/project/src/RegionalSalesInsights.java

In [None]:
!/content/spark-3.3.2-bin-hadoop3/bin/spark-submit \
  --class RegionalSalesInsights \
  --master local \
  --conf "spark.driver.extraClassPath=/content/project/src:/content/spark-3.3.2-bin-hadoop3/jars/*" \
  --conf "spark.executor.extraClassPath=/content/project/src:/content/spark-3.3.2-bin-hadoop3/jars/*" \
  --jars /content/spark-3.3.2-bin-hadoop3/jars/* \
  RegionalSalesInsights

25/08/06 09:09:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 09:09:07 INFO SparkContext: Running Spark version 3.3.2
25/08/06 09:09:07 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 09:09:07 INFO SparkContext: Submitted application: RegionalSalesInsights
25/08/06 09:09:07 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 09:09:07 INFO ResourceProfile: Limiting resource is cpu
25/08/06 09:09:07 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 09:09:07 INFO SecurityManager: Changing view acls to: root
25/08/06 09:09:07 INFO SecurityManager: Changing modify acls to: root
25/08/06 09:09:07 INFO Sec

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Show the three Task 3 reports: sales by office, revenue by country, top offices.
task3_reports = (
    "/output/processed/customer_sales_by_office.parquet",
    "/output/processed/revenue_by_country.parquet",
    "/output/processed/top_offices.parquet",
)
for report_path in task3_reports:
    spark.read.parquet(report_path).show()

+--------------+------------+---------------+-------------+----------+
|    officeCode|        city|        country|customerCount|totalSales|
+--------------+------------+---------------+-------------+----------+
|Unknown Office|Unknown City|Unknown Country|           98|8853839.23|
+--------------+------------+---------------+-------------+----------+

+-----------+------------+
|    country|totalRevenue|
+-----------+------------+
|        USA|  3040029.52|
|      Spain|   994438.53|
|     France|   965750.58|
|  Australia|   509385.82|
|New Zealand|   392486.59|
|         UK|    391503.9|
|      Italy|   325254.55|
|    Finland|   295149.35|
|  Singapore|    261671.6|
|     Canada|   205911.86|
|    Denmark|    197356.3|
|    Germany|   196470.99|
|      Japan|   167909.95|
|   Norway  |   166621.51|
|    Austria|   136119.99|
|     Sweden|   120457.09|
|Switzerland|   108777.92|
|     Norway|   104224.79|
|    Belgium|    91471.03|
|Philippines|     87468.3|
+-----------+----------

### Task 4: Performance optimization (caching, broadcast join, mapPartitions)

In [None]:
%%writefile /content/project/src/PerformanceOptimization.java
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.api.java.function.MapPartitionsFunction;

/**
 * Demonstrates Spark tuning techniques on the ingested Parquet tables: dataset
 * caching, a broadcast join and a {@code mapPartitions} transformation.
 *
 * <p>Prints and then writes (overwriting previous runs) two datasets under
 * {@code /content/output/processed/}: the order count per office
 * ({@code employee_sales_summary.parquet}) and the upper-cased product names
 * ({@code product_names_uppercase.parquet}).
 */
public class PerformanceOptimization {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("PerformanceOptimization")
            .master("local")
            .getOrCreate();

        // try/finally: stop the session (and release cached data) even if a stage fails.
        try {
            // 1. Read & cache datasets
            Dataset<Row> orderDetails = spark.read().parquet("/content/data/parquet/orderdetails").cache();
            Dataset<Row> products     = spark.read().parquet("/content/data/parquet/products").cache();
            Dataset<Row> offices      = spark.read().parquet("/content/data/parquet/offices");
            Dataset<Row> orders       = spark.read().parquet("/content/data/parquet/orders");
            Dataset<Row> customers    = spark.read().parquet("/content/data/parquet/customers");
            Dataset<Row> employees    = spark.read().parquet("/content/data/parquet/employees");

            // 2. Trigger lazy evaluation: cache() only marks the dataset, count() is the
            //    action that actually computes and stores it.
            Dataset<Row> cached = orderDetails
                .join(products, "productCode")
                .withColumn("lineRevenue", col("quantityOrdered").multiply(col("priceEach")))
                .cache();
            cached.count();

            // 3. Broadcast join for office sales (offices is hinted for broadcast)
            Dataset<Row> officeSales = orders
                .join(customers, "customerNumber")
                .join(employees,
                      customers.col("salesRepEmployeeNumber")
                               .equalTo(employees.col("employeeNumber")))
                .join(broadcast(offices),
                      employees.col("officeCode")
                               .equalTo(offices.col("officeCode")))
                .groupBy(offices.col("officeCode"),
                         offices.col("city"),
                         offices.col("country"))
                .agg(count("*").alias("orderCount"));

            officeSales.show();

            // 4. mapPartitions example
            //    Rows are transformed lazily, one at a time, instead of buffering the whole
            //    partition in a list before returning it.
            Dataset<String> productNames = products.mapPartitions(
                new MapPartitionsFunction<Row, String>() {
                    @Override
                    public java.util.Iterator<String> call(final java.util.Iterator<Row> rows) {
                        return new java.util.Iterator<String>() {
                            @Override
                            public boolean hasNext() {
                                return rows.hasNext();
                            }

                            @Override
                            public String next() {
                                Row row = rows.next();
                                // Locale.ROOT keeps the result independent of the JVM's default locale.
                                return row.getAs("productName").toString()
                                          .toUpperCase(java.util.Locale.ROOT);
                            }
                        };
                    }
                }, Encoders.STRING()
            );
            productNames.show();

            // 5. Write outputs
            officeSales.write().mode("overwrite")
                .parquet("/content/output/processed/employee_sales_summary.parquet");
            productNames.write().mode("overwrite")
                .parquet("/content/output/processed/product_names_uppercase.parquet");
        } finally {
            spark.stop();
        }
    }
}

Overwriting /content/project/src/PerformanceOptimization.java


In [None]:
# Compile & package as a JAR
%cd /content/project/src
!javac -cp "/content/spark-3.3.2-bin-hadoop3/jars/*" PerformanceOptimization.java
!jar cf performance-opt.jar PerformanceOptimization.class

/content/project/src


In [None]:
# Run with spark-submit
!/content/spark-3.3.2-bin-hadoop3/bin/spark-submit \
  --class PerformanceOptimization \
  --master local \
  --jars /content/spark-3.3.2-bin-hadoop3/jars/*,/content/project/src \
  performance-opt.jar

25/08/06 09:12:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 09:12:21 INFO SparkContext: Running Spark version 3.3.2
25/08/06 09:12:21 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 09:12:21 INFO SparkContext: Submitted application: PerformanceOptimization
25/08/06 09:12:21 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 09:12:21 INFO ResourceProfile: Limiting resource is cpu
25/08/06 09:12:21 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 09:12:21 INFO SecurityManager: Changing view acls to: root
25/08/06 09:12:21 INFO SecurityManager: Changing modify acls to: root
25/08/06 09:12:21 INFO S

In [None]:
# Verify the Parquet outputs
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("VerifyTask4").getOrCreate()

# (heading, dataset path) pairs, printed in the order the job wrote them.
task4_outputs = (
    ("=== Office Sales Summary ===",
     "/content/output/processed/employee_sales_summary.parquet"),
    ("=== Uppercased Product Names ===",
     "/content/output/processed/product_names_uppercase.parquet"),
)
for heading, parquet_path in task4_outputs:
    print(heading)
    spark.read.parquet(parquet_path).show()

=== Office Sales Summary ===
+----------+-------------+---------+----------+
|officeCode|         city|  country|orderCount|
+----------+-------------+---------+----------+
|         2|       Boston|      USA|        32|
|         6|       Sydney|Australia|        38|
|         1|San Francisco|      USA|        48|
|         3|          NYC|      USA|        39|
|         5|        Tokyo|    Japan|        16|
|         7|       London|       UK|        47|
|         4|        Paris|   France|       106|
+----------+-------------+---------+----------+

=== Uppercased Product Names ===
+--------------------+
|               value|
+--------------------+
|1969 HARLEY DAVID...|
|1952 ALPINE RENAU...|
|1996 MOTO GUZZI 1...|
|2003 HARLEY-DAVID...|
| 1972 ALFA ROMEO GTA|
|1962 LANCIAA DELT...|
|   1968 FORD MUSTANG|
|   2001 FERRARI ENZO|
|      1958 SETRA BUS|
|    2002 SUZUKI XREO|
|  1969 CORVAIR MONZA|
|  1968 DODGE CHARGER|
|    1969 FORD FALCON|
|1970 PLYMOUTH HEM...|
|   1957 CHEVY PIC

### Task 5: Product demand analysis (orders, quantity and average price per product)

In [None]:
%%writefile /content/project/src/ProductDemandAnalysis.java
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.*;

/**
 * Summarises demand per product from the ingested Parquet tables.
 *
 * <p>For every product the job reports the number of distinct orders, the summed
 * ordered quantity and the average unit price rounded to two decimals, sorted by
 * quantity (highest first). The result is printed untruncated and written
 * (overwriting previous runs) to
 * {@code /content/output/processed/product_demand_summary.parquet}.
 */
public class ProductDemandAnalysis {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
            .master("local")
            .appName("ProductDemandAnalysis")
            .getOrCreate();

        // Inputs: order lines, plus only the product columns needed for labelling.
        Dataset<Row> lineItems = session.read().parquet("/content/data/parquet/orderdetails");
        Dataset<Row> productLabels = session.read()
            .parquet("/content/data/parquet/products")
            .select("productCode","productName");

        // Per-product metrics.
        Column distinctOrders = countDistinct("orderNumber").alias("orderCount");
        Column unitsOrdered   = sum("quantityOrdered").alias("totalOrderedQty");
        Column meanUnitPrice  = round(avg("priceEach"), 2).alias("averagePriceEach");

        Dataset<Row> demandSummary = lineItems
            .join(productLabels, "productCode")
            .groupBy("productCode","productName")
            .agg(distinctOrders, unitsOrdered, meanUnitPrice)
            .orderBy(desc("totalOrderedQty"));

        // Print without truncating long product names, then persist.
        demandSummary.show(false);
        demandSummary.write()
            .mode("overwrite")
            .parquet("/content/output/processed/product_demand_summary.parquet");

        session.stop();
    }
}

Overwriting /content/project/src/ProductDemandAnalysis.java


In [None]:
%cd /content/project/src
!javac -cp "/content/spark-3.3.2-bin-hadoop3/jars/*" ProductDemandAnalysis.java

/content/project/src


In [None]:
!/content/spark-3.3.2-bin-hadoop3/bin/spark-submit \
  --class ProductDemandAnalysis \
  --master local \
  --conf "spark.driver.extraClassPath=/content/project/src:/content/spark-3.3.2-bin-hadoop3/jars/*" \
  --jars /content/spark-3.3.2-bin-hadoop3/jars/* \
  ProductDemandAnalysis

25/08/06 09:14:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/06 09:14:57 INFO SparkContext: Running Spark version 3.3.2
25/08/06 09:14:57 INFO ResourceUtils: No custom resources configured for spark.driver.
25/08/06 09:14:57 INFO SparkContext: Submitted application: ProductDemandAnalysis
25/08/06 09:14:57 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/08/06 09:14:57 INFO ResourceProfile: Limiting resource is cpu
25/08/06 09:14:57 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/08/06 09:14:57 INFO SecurityManager: Changing view acls to: root
25/08/06 09:14:57 INFO SecurityManager: Changing modify acls to: root
25/08/06 09:14:57 INFO Sec

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("VerifyDemand").getOrCreate()

# Read the Task 5 summary back and print the first 20 rows without truncation.
demand_columns = ["productCode","productName","orderCount","totalOrderedQty","averagePriceEach"]
demand_summary = spark.read.parquet("/content/output/processed/product_demand_summary.parquet")
demand_summary.select(*demand_columns).show(n=20, truncate=False)

+-----------+---------------------------------------+----------+---------------+----------------+
|productCode|productName                            |orderCount|totalOrderedQty|averagePriceEach|
+-----------+---------------------------------------+----------+---------------+----------------+
|S18_3232   |1992 Ferrari 360 Spider red            |53        |1808.0         |152.34          |
|S18_1342   |1937 Lincoln Berline                   |28        |1111.0         |92.1            |
|S700_4002  |American Airlines: MD-11S              |28        |1085.0         |65.97           |
|S18_3856   |1941 Chevrolet Special Deluxe Cabriolet|28        |1076.0         |95.77           |
|S50_1341   |1930 Buick Marquette Phaeton           |28        |1074.0         |38.79           |
|S18_4600   |1940s Ford truck                       |28        |1061.0         |107.98          |
|S10_1678   |1969 Harley Davidson Ultimate Chopper  |28        |1057.0         |85.17           |
|S12_4473   |1957 Ch