In [1]:
# Spark DataFrame Operations - groupby and aggregate
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ops1').getOrCreate()
print('Spark is Connected')

24/01/09 17:15:12 WARN Utils: Your hostname, MaheshPC resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface enp5s0)
24/01/09 17:15:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/09 17:15:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/09 17:15:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/01/09 17:15:14 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/01/09 17:15:14 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
24/01/09 17:15:14 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


Spark is Connected


In [3]:
df = spark.read.csv('Datasets/sales_info.csv',inferSchema=True,header=True)
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [4]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [6]:
df.describe().show()

+-------+-------+-------+------------------+
|summary|Company| Person|             Sales|
+-------+-------+-------+------------------+
|  count|     12|     12|                12|
|   mean|   NULL|   NULL| 360.5833333333333|
| stddev|   NULL|   NULL|250.08742410799007|
|    min|   APPL|  Chris|             120.0|
|    max|   MSFT|Vanessa|             870.0|
+-------+-------+-------+------------------+



In [8]:
# groupby keyword

df.groupBy('sales')

GroupedData[grouping expressions: [sales], value: [Company: string, Person: string ... 1 more field], type: GroupBy]

In [12]:
df.groupBy('company').mean().show()

+-------+-----------------+
|company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [13]:
# minimum and maximum sales
print('Minimum: ',df.groupBy('company').min().show())
print('Maximum: ',df.groupBy('company').max().show())

+-------+----------+
|company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+

Minimum:  None
+-------+----------+
|company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+

Maximum:  None


In [15]:
# aggregate functions
print('Total Sales: ',df.agg({'sales':'sum'}).show())
print('Minimum: ',df.agg({'sales':'min'}).show())
print('Maximum: ',df.agg({'sales':'sum'}).show())


+----------+
|sum(sales)|
+----------+
|    4327.0|
+----------+

Total Sales:  None
+----------+
|min(sales)|
+----------+
|     120.0|
+----------+

Minimum:  None
+----------+
|sum(sales)|
+----------+
|    4327.0|
+----------+

Maximum:  None


In [16]:
grp_data = df.groupBy('company')

print('Total Sales: ',grp_data.agg({'sales':'sum'}).show())
print('Minimum: ',grp_data.agg({'sales':'min'}).show())
print('Maximum: ',grp_data.agg({'sales':'sum'}).show())

+-------+----------+
|company|sum(sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+

Total Sales:  None
+-------+----------+
|company|min(sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+

Minimum:  None
+-------+----------+
|company|sum(sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+

Maximum:  None


In [23]:
from pyspark.sql.functions import countDistinct,avg,stddev,format_number

print('Count Distinct: ',df.select(countDistinct('sales').alias('Distinct Company')).show())
print('Average: ',df.select(avg('sales').alias('Average Sales')).show())
print('Standard Deviation: ',df.select(stddev('sales').alias('Standard Deviation')).show())


+----------------+
|Distinct Company|
+----------------+
|              11|
+----------------+

Count Distinct:  None
+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+

Average:  None
+------------------+
|Standard Deviation|
+------------------+
|250.08742410799007|
+------------------+

Standard Deviation:  None


In [25]:
# order by 
df.select(['company','person']).orderBy('sales').show()

+-------+-------+
|company| person|
+-------+-------+
|   GOOG|Charlie|
|   MSFT|    Amy|
|   APPL|  Linda|
|   GOOG|    Sam|
|   MSFT|Vanessa|
|   APPL|   John|
|   GOOG|  Frank|
|     FB|  Sarah|
|   APPL|  Chris|
|   MSFT|   Tina|
|   APPL|   Mike|
|     FB|   Carl|
+-------+-------+



In [26]:
df.orderBy(df [ 'Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+

