## Aggregating and Summarizing Data

### Example of Aggregate Functions

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, min, max, count

# NOTE: the import above rebinds Python's built-in sum/min/max to the Spark
# column functions for the rest of the notebook.

# One session is shared by all the toy examples in this section.
spark = (
    SparkSession.builder
    .appName("AggregationExample")
    .getOrCreate()
)

# Four (id, group, value) rows: two in group A and two in group B.
data = [
    (1, "A", 100),
    (2, "A", 200),
    (3, "B", 300),
    (4, "B", 400),
]
df = spark.createDataFrame(data, schema=["id", "group", "value"])
df.show()

+---+-----+-----+
| id|group|value|
+---+-----+-----+
|  1|    A|  100|
|  2|    A|  200|
|  3|    B|  300|
|  4|    B|  400|
+---+-----+-----+



#### groupBy

In [2]:
df.groupBy("group") # returns a GroupedData object (see output), not a DataFrame

GroupedData[grouping expressions: [group], value: [id: bigint, group: string ... 1 more field], type: GroupBy]

Members in each group

In [3]:
from pyspark.sql.functions import collect_list

# Gather the ids belonging to each group into a single array column.
# show() must be *called*: a bare `.show` only references the bound method,
# so no job runs and no table is printed.
df.groupBy("group").agg(collect_list("id").alias("ids_in_group")).show()

#### Alias

In [4]:
# Without an alias Spark names the column itself (count(1) in the output).
grouped = df.groupBy("group")
result = grouped.agg(count("*"))
result.show()

+-----+--------+
|group|count(1)|
+-----+--------+
|    A|       2|
|    B|       2|
+-----+--------+



In [5]:
# alias() replaces the auto-generated column name with a readable label.
member_count = count("*").alias("count members in each group")
result = df.groupBy("group").agg(member_count)
result.show()

+-----+---------------------------+
|group|count members in each group|
+-----+---------------------------+
|    A|                          2|
|    B|                          2|
+-----+---------------------------+



#### groupBy and agg

In [6]:
# Several aggregates can be computed in a single agg() call; each alias
# becomes a column of the result.
summary_columns = [
    count("*").alias("count"),
    sum("value").alias("sum"),
    avg("value").alias("avg"),
    min("value").alias("min"),
    max("value").alias("max"),
]
result = df.groupBy("group").agg(*summary_columns)
result.show()

+-----+-----+---+-----+---+---+
|group|count|sum|  avg|min|max|
+-----+-----+---+-----+---+---+
|    A|    2|300|150.0|100|200|
|    B|    2|700|350.0|300|400|
+-----+-----+---+-----+---+---+



#### col

In [None]:
# col() builds a Column reference that select() accepts.
id_and_value = df.select(col("id"), col("value"))
id_and_value.show()

+---+-----+
| id|value|
+---+-----+
|  1|  100|
|  2|  200|
|  3|  300|
|  4|  400|
+---+-----+



#### Column transformation

In [None]:
# Add a derived column holding twice the content of "value".
doubled = df.withColumn("double_value", col("value") * 2)
doubled.show()

+---+-----+-----+------------+
| id|group|value|double_value|
+---+-----+-----+------------+
|  1|    A|  100|         200|
|  2|    A|  200|         400|
|  3|    B|  300|         600|
|  4|    B|  400|         800|
+---+-----+-----+------------+



#### Filtering Rows

In [None]:
# Keep only the rows whose value is strictly greater than 100.
above_100 = col("value") > 100
df.filter(above_100).show()

+---+-----+-----+
| id|group|value|
+---+-----+-----+
|  2|    A|  200|
|  3|    B|  300|
|  4|    B|  400|
+---+-----+-----+



In [None]:
# Shut down the Spark session used by the examples above.
spark.stop()

### sales_info.csv

**sales_info.csv**

- Company: Represents the name of the company.
Example values: GOOG, MSFT, FB, APPL.
- Person:
Indicates the individual associated with the sales transaction.
Example values: Sam, Charlie, Linda.
- Sales:
A numeric value representing the amount of sales made by the respective person.
Example values: 200.0, 600.0, 750.0.

      +-------+-------+-----+
      |Company| Person|Sales|
      +-------+-------+-----+
      |   GOOG|    Sam|200.0|
      |   GOOG|Charlie|120.0|
      |   GOOG|  Frank|340.0|
      |   MSFT|   Tina|600.0|
      |   MSFT|    Amy|124.0|
      |   MSFT|Vanessa|243.0|
      |     FB|   Carl|870.0|
      |     FB|  Sarah|350.0|
      |   APPL|   John|250.0|
      |   APPL|  Linda|130.0|
      |   APPL|   Mike|750.0|
      |   APPL|  Chris|350.0|
      +-------+-------+-----+


#### Load the Data from the URL

In [7]:
import os
import requests
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session for the CSV examples.
spark = (
    SparkSession.builder
    .appName("LoadDataFromURL")
    .getOrCreate()
)

# Where the CSV is fetched from and the name it is saved under locally.
url = "https://raw.githubusercontent.com/oakabc/DEA/main/6%20-%20Aggregating%20and%20Summarizing%20Data/sales_info.csv"
local_file = "sales_info.csv"

#### Download the file

In [8]:
# Download the CSV and save it next to the notebook.
# The timeout (in seconds) keeps the cell from hanging indefinitely when the
# server does not respond; requests applies no timeout by default.
response = requests.get(url, timeout=30)
if response.status_code == 200:
    # Write the raw bytes so the file is stored exactly as served.
    with open(local_file, "wb") as f:
        f.write(response.content)
    print(f"File downloaded as {local_file}")
else:
    print("Failed to download the file")

File downloaded as sales_info.csv


#### Load and Show DataFrame

In [9]:
# Read the downloaded CSV: the first line is the header and the column
# types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(local_file)
)

# Display the loaded rows.
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



printSchema

In [10]:
# Print each column's name, data type and nullability.
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



Using groupBy

In [11]:
df.groupBy("Company") # returns a GroupedData object (see output); the next cells apply an aggregation to it

GroupedData[grouping expressions: [Company], value: [Company: string, Person: string ... 1 more field], type: GroupBy]

#### Using groupBy and then aggregate function

mean

In [12]:
# With no column arguments, mean() averages every numeric column
# (Sales is the only one in this frame).
mean_by_company = df.groupBy("Company").mean()
mean_by_company.show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



sum

In [13]:
# Total of the numeric column (Sales) for each company.
sum_by_company = df.groupBy("Company").sum()
sum_by_company.show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



max

In [14]:
# Largest Sales value within each company.
max_by_company = df.groupBy("Company").max()
max_by_company.show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



min

In [15]:
# Smallest Sales value within each company.
min_by_company = df.groupBy("Company").min()
min_by_company.show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



count

In [16]:
# Number of rows recorded for each company.
rows_by_company = df.groupBy("Company").count()
rows_by_company.show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



#### Using agg

sum

In [17]:
# Without a groupBy, agg() aggregates over the whole DataFrame.
total_sales = df.agg({"Sales": "sum"})
total_sales.show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



max

In [18]:
# Largest single sale across all companies.
largest_sale = df.agg({"Sales": "max"})
largest_sale.show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



groupBy then agg

In [19]:
# The GroupedData object can be stored first and aggregated afterwards.
group_data = df.groupBy("Company")
max_per_company = group_data.agg({"Sales": "max"})
max_per_company.show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



#### Import more functions from PySpark

countDistinct

In [20]:
from pyspark.sql.functions import avg, stddev, countDistinct
# How many different companies appear in the data?
distinct_companies = countDistinct("Company")
df.select(distinct_companies).show()

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                      4|
+-----------------------+



In [21]:
# Give the distinct count a readable column name.
num_company = countDistinct("Company").alias("num_company")
df.select(num_company).show()

+-----------+
|num_company|
+-----------+
|          4|
+-----------+



avg (the same aggregate as mean)

In [22]:
# The lowercase 'sales' still resolves to the Sales column (see the output).
overall_avg = df.select(avg("sales"))
overall_avg.show()

+-----------------+
|       avg(sales)|
+-----------------+
|360.5833333333333|
+-----------------+



Using Alias

In [23]:
# Same average as before, shown under a descriptive column name.
labelled_avg = avg("sales").alias("sales on average")
df.select(labelled_avg).show()

+-----------------+
| sales on average|
+-----------------+
|360.5833333333333|
+-----------------+



**stddev**

The `stddev` function calculates the sample standard deviation of a column, which is a measure of the dispersion or spread of data values around the mean.

In [24]:
# Standard deviation of Sales over every row.
overall_std = df.select(stddev("Sales"))
overall_std.show()

+------------------+
|     stddev(Sales)|
+------------------+
|250.08742410799007|
+------------------+



groupBy then stddev

In [25]:
# Standard deviation of Sales computed separately for each company.
group_data = df.groupBy("Company")
std_per_company = group_data.agg({"Sales": "stddev"})
std_per_company.show()

+-------+------------------+
|Company|     stddev(Sales)|
+-------+------------------+
|   APPL|268.82460204874604|
|   GOOG|111.35528725660043|
|     FB| 367.6955262170047|
|   MSFT| 247.7182539364698|
+-------+------------------+



Formatting numbers

In [26]:
from pyspark.sql.functions import format_number
# Compute the standard deviation once, then render it with 2 decimals.
sales_std = df.select(stddev("Sales").alias("std"))
rounded_std = format_number("std", 2).alias("std")
sales_std.select(rounded_std).show()

+------+
|   std|
+------+
|250.09|
+------+



Sorting the data using orderBy

In [27]:
# orderBy() with only a column name sorts from smallest to largest.
sales_ascending = df.orderBy("Sales")
sales_ascending.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



Sorting in Descending Order

In [28]:
# ascending=False reverses the sort so the largest sale comes first.
sales_descending = df.orderBy("Sales", ascending=False)
sales_descending.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



limit(n)

In [29]:
# Sort descending, then keep only the first three rows.
top_three = df.orderBy("Sales", ascending=False).limit(3)
top_three.show()

+-------+------+-----+
|Company|Person|Sales|
+-------+------+-----+
|     FB|  Carl|870.0|
|   APPL|  Mike|750.0|
|   MSFT|  Tina|600.0|
+-------+------+-----+



Sorting by Multiple Columns

In [30]:
# Use dedicated names so the sales DataFrame held in `df` is not overwritten
# by this small, unrelated example.
name_value_rows = [("A", 3), ("B", 1), ("C", 3), ("D", 1)]
name_value_df = spark.createDataFrame(name_value_rows, ["name", "value"])

# Sort by "value" (ascending) and, within equal values, by "name" (descending)
name_value_df.orderBy("value", name_value_df["name"].desc()).show()

+----+-----+
|name|value|
+----+-----+
|   D|    1|
|   B|    1|
|   C|    3|
|   A|    3|
+----+-----+



##### Transform data

In [31]:
from pyspark.sql.functions import col, expr
# Each row pairs an ID with the character codes of a short message.
data = [
    (1, [77, 101, 114, 114, 121, 32, 67, 104, 114, 105, 115, 116, 109, 97, 115]),
    (2, [72, 97, 112, 112, 121, 32, 78, 101, 119, 32, 89, 101, 97, 114]),
]
columns = ["ID", "ASCII"]

# Build the DataFrame and return its rows as a list for inspection.
df = spark.createDataFrame(data, schema=columns)
df.collect()

[Row(ID=1, ASCII=[77, 101, 114, 114, 121, 32, 67, 104, 114, 105, 115, 116, 109, 97, 115]),
 Row(ID=2, ASCII=[72, 97, 112, 112, 121, 32, 78, 101, 119, 32, 89, 101, 97, 114])]

Transform data: num -> char

In [32]:
# Spark SQL expression: map every code in ASCII to its character with a
# lambda, then join the characters using an empty separator.
codes_to_text = "concat_ws('', transform(ASCII, x -> char(x)))"
df_with_message = df.withColumn("Message", expr(codes_to_text))

In [33]:
# Show each ID next to its decoded message.
decoded = df_with_message.select("ID", "Message")
decoded.show()

+---+---------------+
| ID|        Message|
+---+---------------+
|  1|Merry Christmas|
|  2| Happy New Year|
+---+---------------+

