In [1]:
import findspark
# Run findspark's initialisation before any pyspark import (the pyspark
# imports follow in the next cell).
# NOTE(review): findspark.init() is expected to make the local Spark
# installation importable — confirm against the findspark docs.
findspark.init()

In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import Row

In [3]:
# Create the session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Agg")
    .getOrCreate()
)

In [4]:
# Show the session object and its class, one per line.
print(spark, type(spark), sep="\n")

<pyspark.sql.session.SparkSession object at 0x113f162b0>
<class 'pyspark.sql.session.SparkSession'>


In [7]:
# Location of the sales CSV. The original absolute path is kept as the default
# so existing runs behave the same; set the SALES_INFO_CSV environment variable
# to point the notebook at a copy elsewhere instead of editing this cell.
SALES_CSV_PATH = os.environ.get(
    'SALES_INFO_CSV',
    'file:///Users/hdagar3/Documents/Spark_Things/Spark_Course_Files_JosePortilla/Spark_DataFrames/sales_info.csv',
)

# header=True takes the column names from the first row; inferSchema=True lets
# Spark derive the column types from the data (Sales is read as double).
df = spark.read.csv(SALES_CSV_PATH, header=True, inferSchema=True)

In [8]:
# show() prints the table itself and returns None, so it must not be wrapped
# in print() (that only adds a stray "None" line to the output).
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+

None


In [9]:
# printSchema() writes the schema tree itself and returns None, so wrapping it
# in print() only adds a stray "None" line to the output.
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)

None


In [11]:
print(df.take(3)) # first three rows as a list of Row objects; equivalent to df.head(3)

[Row(Company='GOOG', Person='Sam', Sales=200.0), Row(Company='GOOG', Person='Charlie', Sales=120.0), Row(Company='GOOG', Person='Frank', Sales=340.0)]


In [12]:
# Keep only GOOG rows whose Sales exceed 150. Each comparison is parenthesised
# because & binds more tightly than == and >.
filtered_dataframe = df.filter(
    (df['Company'] == 'GOOG')
    & (df['Sales'] > 150)
)

In [14]:
# Printing the DataFrame shows only its schema summary; show() prints the rows.
print(filtered_dataframe)
# show() returns None, so it is called directly rather than wrapped in print().
filtered_dataframe.show()

DataFrame[Company: string, Person: string, Sales: double]
+-------+------+-----+
|Company|Person|Sales|
+-------+------+-----+
|   GOOG|   Sam|200.0|
|   GOOG| Frank|340.0|
+-------+------+-----+

None


In [17]:
# Project the filtered rows down to the Company and Sales columns.
filtered_dataframe.select('Company', 'Sales').show()

# DataFrames are immutable: every operation returns a new DataFrame and the
# one it was called on stays unchanged. Keep this in mind.

+-------+-----+
|Company|Sales|
+-------+-----+
|   GOOG|200.0|
|   GOOG|340.0|
+-------+-----+



In [18]:
# Next: groupBy and aggregate functions on Spark DataFrames.

In [20]:
# groupBy() does not return a DataFrame: it returns a GroupedData object, which
# only becomes a DataFrame once an aggregation is applied to it.
grouped_object = df.groupBy('Company')
print(grouped_object, type(grouped_object), sep="\n")

<pyspark.sql.group.GroupedData object at 0x113f6ddd8>
<class 'pyspark.sql.group.GroupedData'>


In [22]:
# count() turns the GroupedData into a DataFrame with one row per Company and
# the number of rows in each group.
grouped_object.count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [23]:
# Per-company total; the result contains sum(Sales) only.
sumed_df = grouped_object.sum()
# show() prints the table and returns None, so it is not wrapped in print().
sumed_df.show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+

None


In [25]:
# Per-company average; the result column is named avg(Sales).
mean_df = grouped_object.mean()
# show() prints the table and returns None, so it is not wrapped in print().
mean_df.show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+

None


In [26]:
# Per-company minimum; the result contains min(Sales) only (the string column
# Person is not aggregated).
grouped_object.min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [27]:
# Per-company maximum; the result contains max(Sales) only (the string column
# Person is not aggregated).
grouped_object.max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [28]:
# agg() is the general form of these aggregations: it works on a grouped object and also directly on a
# DataFrame, in which case the aggregate is computed over the whole DataFrame instead of per group.

In [36]:
# agg() can be called on a DataFrame as well as on a grouped object. It takes
# a dict whose keys are column names and whose values are the names of the
# aggregate functions to apply.
aggregated_df = df.agg({'Sales': 'sum'})
print(aggregated_df, type(aggregated_df), sep="\n")

DataFrame[sum(Sales): double]
<class 'pyspark.sql.dataframe.DataFrame'>


In [38]:
aggregated_df.show() # single row: the sum of Sales over the whole DataFrame, not per group

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [39]:
# The printed address matches the one shown when grouped_object was created:
# the aggregations above returned new DataFrames and left it in place.
print(grouped_object)

<pyspark.sql.group.GroupedData object at 0x113f6ddd8>


In [40]:
# Same result as grouped_object.max(), written with the more general agg().
new_df = grouped_object.agg(
    {'Sales': 'max'}
)
print(new_df)

DataFrame[Company: string, max(Sales): double]


In [43]:
# Per-company maximum of Sales, produced via agg().
new_df.show()

# For more groupBy and agg() functions, check out the PySpark documentation.

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



http://spark.apache.org/docs/latest/api/python/#

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [44]:
# We can also import functions from pyspark.sql.functions and apply them to the columns of a DataFrame.

In [45]:
from pyspark.sql.functions import countDistinct,stddev,count,avg

In [47]:
# Re-display the source DataFrame (schema summary, then rows) before applying
# the imported column functions to it.
print(df)
df.show()

DataFrame[Company: string, Person: string, Sales: double]
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [64]:
# A column reference and an aggregate over it are both Column objects.
# avg() accepts either the column name or the Column object itself, and the
# last two lines of output are identical.
print(
    df['Sales'],
    avg('Sales'),
    avg(df['Sales']),
    sep="\n",
)

Column<b'Sales'>
Column<b'avg(Sales)'>
Column<b'avg(Sales)'>


In [58]:
# countDistinct() on its own only builds a Column expression.
print(countDistinct(df['Sales']), type(countDistinct(df['Sales'])), sep="\n")

# The expression is evaluated once it is selected from the DataFrame.
df.select(countDistinct('Sales')).show()

# alias() replaces the generated column name with a readable one.
another_df = df.select(
    countDistinct('Sales').alias('Distinct Sales')
)
another_df.show()

Column<b'count(DISTINCT Sales)'>
<class 'pyspark.sql.column.Column'>
+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+

+--------------+
|Distinct Sales|
+--------------+
|            11|
+--------------+



In [63]:
# Mean and sample standard deviation of Sales, written three equivalent ways.
# 1) columns referenced by name
df.select(avg('Sales'), stddev('Sales')).show()
# 2) columns referenced through the DataFrame
df.select(avg(df['Sales']), stddev(df['Sales'])).show()
# 3) as (2), with readable output column names
df.select(
    avg(df['Sales']).alias('Average Sales'),
    stddev(df['Sales']).alias('Standard Deviation'),
).show()

+-----------------+------------------+
|       avg(Sales)|stddev_samp(Sales)|
+-----------------+------------------+
|360.5833333333333|250.08742410799007|
+-----------------+------------------+

+-----------------+------------------+
|       avg(Sales)|stddev_samp(Sales)|
+-----------------+------------------+
|360.5833333333333|250.08742410799007|
+-----------------+------------------+

+-----------------+------------------+
|    Average Sales|Standard Deviation|
+-----------------+------------------+
|360.5833333333333|250.08742410799007|
+-----------------+------------------+



In [65]:
from pyspark.sql.functions import format_number

In [70]:
# Standard deviation of Sales under the short column name 'std'.
# stddev(df['Sales']) with the same alias would work equally well.
sales_std_df = df.select(
    stddev('Sales').alias('std')
)

In [71]:
# Single-row DataFrame holding the unformatted 'std' value.
sales_std_df.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [75]:
# Display 'std' with two decimal places, keeping the column name 'std'.
# NOTE(review): per the PySpark docs format_number yields a string column —
# confirm before doing arithmetic on the result.
sales_std_df.select(
    format_number('std', 2).alias('std')
).show()

+------+
|   std|
+------+
|250.09|
+------+



In [76]:
# The DataFrame in its original row order, for comparison with the sorted
# views that follow.
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [82]:
# orderBy() with a plain column sorts in ascending order (see the output).
df.orderBy('Sales').show() # in ascending
# OR, passing the Column object instead of its name:
# df.orderBy(df['Sales']).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [83]:
# Calling desc() on the Column object switches the sort to descending order.
df.orderBy(df['Sales'].desc()).show() # in descending

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [None]:
# END 