# Activity 10: Data Manipulation with Spark DataFrames

#### We will use the open-source mtcars dataset for this activity; however, feel free to use any dataset of your choice.

In [1]:
import pyspark
import os

In [2]:
# Create (or reuse) the Spark context and a SQLContext bound to it.
# `getOrCreate()` returns the already-running context when this cell is
# re-executed, instead of raising "Cannot run multiple SparkContexts at once".
from pyspark.sql import SQLContext

sc = pyspark.SparkContext.getOrCreate()
sqlc = SQLContext(sc)

In [3]:
# Load mtcars.csv into a Spark DataFrame with Spark's built-in CSV reader
# (the external 'com.databricks.spark.csv' package is built into Spark 2.0+).
# header=True takes column names from the first row; inferSchema=True makes an
# extra pass over the data to assign numeric types instead of reading every
# column as a string.
df = sqlc.read.csv('mtcars.csv', header=True, inferSchema=True)
df.show(4)

+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|         model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|     Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
| Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 4 rows



### Rename any 5 columns of the DataFrame

In [4]:
# Rename five columns by name via an explicit old -> new mapping.
# A positional zip over `data.columns[0:5]` would start at the string column
# `model` and shift every new name onto the wrong column.
rename_map = {
    'mpg': 'mpg_new',
    'cyl': 'cyl_new',
    'disp': 'disp_new',
    'hp': 'hp_new',
    'drat': 'drat_new',
}
data = df
for old_name, new_name in rename_map.items():
    data = data.withColumnRenamed(old_name, new_name)

data.columns

['model',
 'mpg_new',
 'cyl_new',
 'disp_new',
 'hp_new',
 'drat_new',
 'wt',
 'qsec',
 'vs',
 'am',
 'gear',
 'carb']

### Select any 2 numerical columns and 1 categorical column from the DataFrame

In [5]:
# `cyl` (cylinder count) serves as the categorical column; `mpg` and `hp`
# are the two numerical columns.
data = df.select('cyl', 'mpg', 'hp')
data.show(n=5)

+---+----+---+
|cyl| mpg| hp|
+---+----+---+
|  6|21.0|110|
|  6|21.0|110|
|  4|22.8| 93|
|  6|21.4|110|
|  8|18.7|175|
+---+----+---+
only showing top 5 rows



### Count the number of distinct categories in the categorical variable

In [6]:
# One row survives per distinct `cyl` value, so the row count equals the number of categories.
data.dropDuplicates(['cyl']).count()

3

### Create 2 new columns in dataframe by summing up and multiplying together the 2 numerical columns

In [7]:
# Add the sum and the product of the two numerical columns.
# Columns are referenced through `data` itself (not the parent `df`) so the
# expressions resolve against the DataFrame being extended.
data = data.withColumn('colsum', data['mpg'] + data['hp'])
data = data.withColumn('colproduct', data['mpg'] * data['hp'])
data.show(5)

+---+----+---+------+----------+
|cyl| mpg| hp|colsum|colproduct|
+---+----+---+------+----------+
|  6|21.0|110| 131.0|    2310.0|
|  6|21.0|110| 131.0|    2310.0|
|  4|22.8| 93| 115.8|    2120.4|
|  6|21.4|110| 131.4|    2354.0|
|  8|18.7|175| 193.7|    3272.5|
+---+----+---+------+----------+
only showing top 5 rows



### Drop both the original numerical columns

In [8]:
# The derived sum/product columns replace their raw inputs, so drop the inputs.
numeric_inputs = ['mpg', 'hp']
data = data.drop(*numeric_inputs)
data.show(5)

+---+------+----------+
|cyl|colsum|colproduct|
+---+------+----------+
|  6| 131.0|    2310.0|
|  6| 131.0|    2310.0|
|  4| 115.8|    2120.4|
|  6| 131.4|    2354.0|
|  8| 193.7|    3272.5|
+---+------+----------+
only showing top 5 rows



### Sort the data by the categorical column

In [9]:
# Sort rows by the categorical `cyl` column in ascending order; rows that share
# a `cyl` value are not ordered among themselves.
data = data.sort('cyl')
data.show(5)

+---+------+----------+
|cyl|colsum|colproduct|
+---+------+----------+
|  4|  98.9|    2203.5|
|  4| 130.4|    2332.6|
|  4| 118.5|    2085.5|
|  4|  82.4|    1580.8|
|  4|  93.3|    1801.8|
+---+------+----------+
only showing top 5 rows



### Calculate the mean of the summation column for each distinct category in the categorical variable

In [10]:
# Mean of `colsum` per `cyl` category. Spark does not guarantee the row order
# of a groupBy result, so sort by `cyl` to make the displayed table reproducible.
data.groupby('cyl').agg({'colsum': 'mean'}).orderBy('cyl').show()

+---+------------------+
|cyl|       avg(colsum)|
+---+------------------+
|  4|             109.3|
|  6|142.02857142857144|
|  8|224.31428571428575|
+---+------------------+



### Filter the rows whose summation-column value is greater than the mean of the per-category means calculated in the previous step

In [11]:
# Number of rows before filtering.
row_count = data.count()
row_count

32

In [12]:
# Threshold = unweighted mean of the per-`cyl` means of `colsum`
# (not the overall mean of `colsum`).
# `.first()[0]` reads the single aggregated value directly from Spark; there is
# no need to convert to pandas via toPandas() just to extract one scalar.
# NOTE: this cell reassigns `data` to the filtered frame, so re-running it
# recomputes the threshold on already-filtered rows and filters again.
cyl_avg = data.groupby('cyl').agg({'colsum': 'mean'})
avg = cyl_avg.agg({'avg(colsum)': 'mean'}).first()[0]
data = data.filter(data.colsum > avg)
data.count()

15

In [13]:
# Preview the first five filtered rows.
data.show(n=5)

+---+------+------------------+
|cyl|colsum|        colproduct|
+---+------+------------------+
|  6| 194.7|            3447.5|
|  8| 193.7|            3272.5|
|  8| 196.4|2951.9999999999995|
|  8| 259.3|            3503.5|
|  8| 197.3|            3114.0|
+---+------+------------------+
only showing top 5 rows



### De-duplicate the resulting DataFrame so that it contains only unique records

In [14]:
# Keep one copy of each fully identical row, then report how many remain.
data = data.distinct()
data.count()

15