In [1]:
### Group By and Aggregation

In [2]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to security.u0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to security.u0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Get:4 https://developer.download.nvidia.com/comp

In [5]:
from pyspark.sql.types import *
myschema = StructType([
                       StructField('id', IntegerType()),
                       StructField('first_name', StringType()),
                       StructField('last_name', StringType()),
                       StructField('gender', StringType()),
                       StructField('city', StringType()),
                       StructField('job_title', StringType()),
                       StructField('Salary', StringType()),
                       StructField('latitude', FloatType()),
                       StructField('longitude', FloatType())
])

df = spark.read.csv("original.csv", header=True, schema=myschema)
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|           job_title|   Salary|  latitude| longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [6]:
from pyspark.sql.functions import *
df = df.withColumn('clean_salary', df.Salary.substr(2,100).cast('float'))
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+
| id|first_name| last_name|gender|           city|           job_title|   Salary|  latitude| longitude|clean_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|    57576.52|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil En

In [7]:
import pyspark.sql.functions as sqlfunc
df1 = df.groupBy('gender').agg(sqlfunc.sum('clean_salary'))
df1.show()

+------+--------------------+
|gender|   sum(clean_salary)|
+------+--------------------+
|Female|2.7364519950195312E7|
|  Male|2.8123435678710938E7|
+------+--------------------+



In [9]:
df1 = df.groupBy('gender').agg(sqlfunc.sum('clean_salary').alias('total'),
                               sqlfunc.avg('clean_salary').alias('average'),
                               sqlfunc.min('clean_salary').alias('min'),
                               sqlfunc.max('clean_salary').alias('max')                         
                               )
df1.show()

+------+--------------------+-----------------+--------+--------+
|gender|               total|          average|     min|     max|
+------+--------------------+-----------------+--------+--------+
|Female|2.7364519950195312E7|55618.94298820185|10616.44|99948.28|
|  Male|2.8123435678710938E7|55361.09385573019|10101.92|99942.92|
+------+--------------------+-----------------+--------+--------+



In [10]:
df1 = df.groupBy('gender','city').agg(sqlfunc.sum('clean_salary').alias('total'),
                               sqlfunc.avg('clean_salary').alias('average'),
                               sqlfunc.min('clean_salary').alias('min'),
                               sqlfunc.max('clean_salary').alias('max')                         
                               )
df1.show()

+------+-----------------+----------------+----------------+--------+--------+
|gender|             city|           total|         average|     min|     max|
+------+-----------------+----------------+----------------+--------+--------+
|Female|           Dachun| 25090.869140625| 25090.869140625|25090.87|25090.87|
|Female|      Trollhättan|106623.369140625|53311.6845703125|26830.47| 79792.9|
|  Male|          Wenshao| 18941.509765625| 18941.509765625|18941.51|18941.51|
|Female|            Lanas| 13765.900390625| 13765.900390625| 13765.9| 13765.9|
|  Male|            Mörön|    77940.078125|    77940.078125|77940.08|77940.08|
|Female|             Same|   73369.7265625|   73369.7265625|73369.73|73369.73|
|Female|          Sawahan|  24608.83984375|  24608.83984375|24608.84|24608.84|
|  Male|Monte da Boavista|     98586.71875|     98586.71875|98586.72|98586.72|
|Female|         Nusajaya|    71637.921875|    71637.921875|71637.92|71637.92|
|Female|            Kista|   96192.3984375|   96192.

In [None]:
### Writing Dataframes to Files

df1.write.csv('df1.csv')
df1.write.json('df1.json')
df1.write.parquet('df1.parquet')

# can add partitions and compression as options