In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [2]:
# Create (or reuse) the session through the builder so that the application
# name is applied when the context is created and re-running this cell in a
# live kernel reuses the existing session instead of trying to start a second
# SparkContext.
spark = SparkSession.builder.appName("TotalAmount").getOrCreate()
# Keep `sc` bound to the session's underlying SparkContext.
sc = spark.sparkContext

In [3]:
# Column names and types for the orders data: two string identifiers and the
# amount spent as a float.
schema = (
    StructType()
    .add("customerID", StringType())
    .add("itemID", StringType())
    .add("AmountSpend", FloatType())
)

In [4]:
# Load the orders CSV, applying the explicit schema instead of inferring types.
df = spark.read.csv("Data/customer-orders.csv", schema=schema)

In [5]:
# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- itemID: string (nullable = true)
 |-- AmountSpend: float (nullable = true)



In [6]:
# Preview the first five rows of the loaded data.
df.show(5)

+----------+------+-----------+
|customerID|itemID|AmountSpend|
+----------+------+-----------+
|        44|  8602|      37.19|
|        35|  5368|      65.89|
|         2|  3391|      40.64|
|        47|  6694|      14.98|
|        29|   680|      13.08|
+----------+------+-----------+
only showing top 5 rows



In [7]:
# Solution 1: total amount spent per customer using a Spark SQL query

In [8]:
# Register the DataFrame as a temporary view named "customer" so it can be
# referenced from SQL.
df.createOrReplaceTempView("customer")

In [9]:
# Sum AmountSpend per customerID from the "customer" view, round each total to
# two decimals, and order the result by that total (ascending).
totalAmount = spark.sql('''
                        Select customerID,
                               ROUND(sum(AmountSpend), 2) As TotalAmount
                        From customer
                        Group By customerID
                        Order By TotalAmount
                        ''')

In [10]:
# Display the SQL result; with no argument only the top 20 rows are printed.
totalAmount.show()

+----------+-----------+
|customerID|TotalAmount|
+----------+-----------+
|        45|    3309.38|
|        79|    3790.57|
|        96|    3924.23|
|        23|    4042.65|
|        99|    4172.29|
|        75|     4178.5|
|        36|    4278.05|
|        98|    4297.26|
|        47|     4316.3|
|        77|    4327.73|
|        13|    4367.62|
|        48|    4384.33|
|        49|     4394.6|
|        94|    4475.57|
|        67|    4505.79|
|        50|    4517.27|
|        78|    4524.51|
|         5|    4561.07|
|        57|     4628.4|
|        83|     4635.8|
+----------+-----------+
only showing top 20 rows



In [11]:
# Solution 2: the same aggregation using the DataFrame API

In [12]:
# Keep only the two columns the aggregation needs.
customer = df.select(func.col("customerID"), func.col("AmountSpend"))

In [13]:
# Sum each customer's spending and round the total to two decimals.
agg = (
    customer
    .groupBy(func.col("customerID"))
    .agg(func.round(func.sum(func.col("AmountSpend")), 2).alias("TotalAmount"))
)

In [14]:
# Order customers from the lowest total spend to the highest.
aggSorted = agg.orderBy(func.col("TotalAmount").asc())

In [15]:
# count() and show() are separate Spark actions; cache the sorted result so the
# CSV read and aggregation are computed once instead of once per action.
aggSorted.cache()
# Pass the row count so every customer is printed rather than only the
# default top 20 rows.
aggSorted.show(aggSorted.count())

+----------+-----------+
|customerID|TotalAmount|
+----------+-----------+
|        45|    3309.38|
|        79|    3790.57|
|        96|    3924.23|
|        23|    4042.65|
|        99|    4172.29|
|        75|     4178.5|
|        36|    4278.05|
|        98|    4297.26|
|        47|     4316.3|
|        77|    4327.73|
|        13|    4367.62|
|        48|    4384.33|
|        49|     4394.6|
|        94|    4475.57|
|        67|    4505.79|
|        50|    4517.27|
|        78|    4524.51|
|         5|    4561.07|
|        57|     4628.4|
|        83|     4635.8|
|        91|    4642.26|
|        74|    4647.13|
|        84|    4652.94|
|         3|    4659.63|
|        12|    4664.59|
|        66|    4681.92|
|        56|    4701.02|
|        21|    4707.41|
|        80|    4727.86|
|        14|    4735.03|
|        37|     4735.2|
|         7|    4755.07|
|        44|    4756.89|
|        31|    4765.05|
|        82|    4812.49|
|         4|    4815.05|
|        10|     4819.7|


In [16]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()