# Grupowanie i agregacja

GroupBy pozwala grupować wiersze w oparciu o pewną wartość kolumny, na przykład, możesz pogrupować dane o sprzedaży do dnia, w którym nastąpiła sprzedaż, lub grupować dane klientów ponownie na podstawie nazwy klienta. Po wykonaniu operacji GroupBy możesz użyć funkcji agregującej z tych danych. Funkcja agregująca agreguje wiele wierszy danych w jedno wyjście, na przykład pobierając sumę danych wejściowych lub liczbę wejść.

Zobaczmy przykłady na przykładowym zestawie danych!

In [1]:
import sys
sys.path.append("../")

import findspark
from settings import SPARK_PATH
findspark.init(SPARK_PATH) # Ścieżka do Sparka 

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("groupbyagg").getOrCreate()

In [4]:
df = spark.read.csv('sales_info.csv',inferSchema=True,header=True)

In [5]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [6]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



Grupuj po firmach 

In [7]:
df.groupBy("Company")

<pyspark.sql.group.GroupedData at 0x10eedf100>

In [8]:
# Mean
df.groupBy("Company").mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   GOOG|            220.0|
|   MSFT|322.3333333333333|
|     FB|            610.0|
|   APPL|            370.0|
+-------+-----------------+



In [9]:
# Count
df.groupBy("Company").count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   GOOG|    3|
|   MSFT|    3|
|     FB|    2|
|   APPL|    4|
+-------+-----+



In [10]:
# Max
df.groupBy("Company").max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   GOOG|     340.0|
|   MSFT|     600.0|
|     FB|     870.0|
|   APPL|     750.0|
+-------+----------+



In [11]:
# Min
df.groupBy("Company").min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   GOOG|     120.0|
|   MSFT|     124.0|
|     FB|     350.0|
|   APPL|     130.0|
+-------+----------+



In [12]:
# Sum
df.groupBy("Company").sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   GOOG|     660.0|
|   MSFT|     967.0|
|     FB|    1220.0|
|   APPL|    1480.0|
+-------+----------+



Link do większej ilości metod
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark-sql-module

Nie wszystkie metody wymagają wywołania groupBy, zamiast tego można po prostu wywołać uogólnioną metodę .agg (), która wywoła agregację we wszystkich wierszach określonej kolumny danych. Może przyjmować argumenty jako pojedyncza kolumna lub tworzyć wiele połączeń zagregowanych jednocześnie za pomocą notacji słownikowej.

Przykład:

In [13]:
# Ogolna maksymalna sprzedaz 
df.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [14]:
grouped = df.groupBy("Company")

In [15]:
grouped.agg({"Sales":'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   GOOG|     340.0|
|   MSFT|     600.0|
|     FB|     870.0|
|   APPL|     750.0|
+-------+----------+



## Funkcje
Istnieje wiele funkcji, które można importować z pyspark.sql.functions. Link do dokumentacji listy:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [16]:
from pyspark.sql.functions import countDistinct, avg,stddev

In [17]:
df.select(countDistinct("Sales")).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [18]:
df.select(avg('Sales')).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [19]:
df.select(stddev("Sales")).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



## Formatowanie

In [20]:
from pyspark.sql.functions import format_number

In [21]:
sales_std = df.select(stddev("Sales").alias('std'))

In [22]:
sales_std.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [23]:
sales_std.select(format_number('std',2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|               250.09|
+---------------------+



## Order By

You can easily sort with the orderBy method:

In [24]:
# OrderBy
# Ascending
df.orderBy("Sales").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [25]:
# Grupowanie descending
df.orderBy(df["Sales"].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+

