In [0]:
df = spark.read.format("json").load("/Volumes/dev/haridb/myvolume/emps.json")
df.show()

+---+---+------+-----+---+
|dno|eid| ename|  sal|sex|
+---+---+------+-----+---+
| 11|101|Miller|10000|  m|
| 12|102| Blake|20000|  m|
| 11|103|  Sony|30000|  f|
| 12|104| Sonia|40000|  f|
| 13|105| James|50000|  m|
+---+---+------+-----+---+



# 1. SELECT()  -  selecting Particular Columns

In [0]:
 
df.select("ename","sal").show()

+------+-----+
| ename|  sal|
+------+-----+
|Miller|10000|
| Blake|20000|
|  Sony|30000|
| Sonia|40000|
| James|50000|
+------+-----+



In [0]:
df.printSchema()

root
 |-- dno: long (nullable = true)
 |-- eid: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- sex: string (nullable = true)



# 2. Transformation

In [0]:
df1 = df.withColumn('salary+Bouns', df.sal + 5000)
df1.show()


+---+---+------+-----+---+------------+
|dno|eid| ename|  sal|sex|salary+Bouns|
+---+---+------+-----+---+------------+
| 11|101|Miller|10000|  m|       15000|
| 12|102| Blake|20000|  m|       25000|
| 11|103|  Sony|30000|  f|       35000|
| 12|104| Sonia|40000|  f|       45000|
| 13|105| James|50000|  m|       55000|
+---+---+------+-----+---+------------+



In [0]:
df.select("sal",df.sal+5000).show()

+-----+------------+
|  sal|(sal + 5000)|
+-----+------------+
|10000|       15000|
|20000|       25000|
|30000|       35000|
|40000|       45000|
|50000|       55000|
+-----+------------+



In [0]:
df.select("sal",df.sal+5000).alias("sal").show()

+-----+------------+
|  sal|(sal + 5000)|
+-----+------------+
|10000|       15000|
|20000|       25000|
|30000|       35000|
|40000|       45000|
|50000|       55000|
+-----+------------+



In [0]:
df3=df.select(df.ename,(df.sal+5000).alias("sal"))
df3.show()


+------+-----+
| ename|  sal|
+------+-----+
|Miller|15000|
| Blake|25000|
|  Sony|35000|
| Sonia|45000|
| James|55000|
+------+-----+



# 3. SelectExpression()

In [0]:
df.selectExpr('sal + 5000').show()

+------------+
|(sal + 5000)|
+------------+
|       15000|
|       25000|
|       35000|
|       45000|
|       55000|
+------------+



# 4.filter():For filtering data
 I want those emps whose salary is greater than 20000

In [0]:
df.select("ename","sal").where(df.sal > 20000).show()

+-----+-----+
|ename|  sal|
+-----+-----+
| Sony|30000|
|Sonia|40000|
|James|50000|
+-----+-----+



In [0]:
df4=df.filter(df.sal>20000)
df4.show()

+---+---+-----+-----+---+
|dno|eid|ename|  sal|sex|
+---+---+-----+-----+---+
| 11|103| Sony|30000|  f|
| 12|104|Sonia|40000|  f|
| 13|105|James|50000|  m|
+---+---+-----+-----+---+



# #5.collect() :It displays list of row objects


In [0]:
df.collect()

[Row(dno=11, eid=101, ename='Miller', sal=10000, sex='m'),
 Row(dno=12, eid=102, ename='Blake', sal=20000, sex='m'),
 Row(dno=11, eid=103, ename='Sony', sal=30000, sex='f'),
 Row(dno=12, eid=104, ename='Sonia', sal=40000, sex='f'),
 Row(dno=13, eid=105, ename='James', sal=50000, sex='m')]

In [0]:
#6.count() :returns the no of rows in a dataframe
df.count()

5

In [0]:
#7.columns :Returns the  column names as a  
df.columns

['dno', 'eid', 'ename', 'sal', 'sex']

# 10.describe(): performs aggregations on all numeric cols


In [0]:
df.describe().show()

+-------+------------------+------------------+-----+------------------+----+
|summary|               dno|               eid|ename|               sal| sex|
+-------+------------------+------------------+-----+------------------+----+
|  count|                 5|                 5|    5|                 5|   5|
|   mean|              11.8|             103.0| NULL|           30000.0|NULL|
| stddev|0.8366600265340755|1.5811388300841898| NULL|15811.388300841896|NULL|
|    min|                11|               101|Blake|             10000|   f|
|    max|                13|               105| Sony|             50000|   m|
+-------+------------------+------------------+-----+------------------+----+



# distinct

In [0]:
#I want distinct dno's
df.select(df.dno).distinct().show()


+---+
|dno|
+---+
| 12|
| 11|
| 13|
+---+



# DROP

In [0]:
#dropping multiple columns
df4=df.drop("dno","sex")
df4.show()

+---+------+-----+
|eid| ename|  sal|
+---+------+-----+
|101|Miller|10000|
|102| Blake|20000|
|103|  Sony|30000|
|104| Sonia|40000|
|105| James|50000|
+---+------+-----+



In [0]:
#13)dropDuplicates(): drops duplicate rows
df.dropDuplicates().show()

+---+---+------+-----+---+
|dno|eid| ename|  sal|sex|
+---+---+------+-----+---+
| 12|102| Blake|20000|  m|
| 11|103|  Sony|30000|  f|
| 12|104| Sonia|40000|  f|
| 13|105| James|50000|  m|
| 11|101|Miller|10000|  m|
+---+---+------+-----+---+



# orderBY

In [0]:
df.orderBy(df.sal.desc()).show()

+---+---+------+-----+---+
|dno|eid| ename|  sal|sex|
+---+---+------+-----+---+
| 13|105| James|50000|  m|
| 12|104| Sonia|40000|  f|
| 11|103|  Sony|30000|  f|
| 12|102| Blake|20000|  m|
| 11|101|Miller|10000|  m|
+---+---+------+-----+---+



In [0]:
df.orderBy("ename").show()

+---+---+------+-----+---+
|dno|eid| ename|  sal|sex|
+---+---+------+-----+---+
| 12|102| Blake|20000|  m|
| 13|105| James|50000|  m|
| 11|101|Miller|10000|  m|
| 12|104| Sonia|40000|  f|
| 11|103|  Sony|30000|  f|
+---+---+------+-----+---+



In [0]:
df.rdd.getNumPartitions()

1

# 19.repartition()

In [0]:

df4=df.repartition(4)
df4.rdd.getNumPartitions()

4

In [0]:
df2=df.replace(['m','f'],['male','female'],'sex')
df2.show()

+---+---+------+-----+------+
|dno|eid| ename|  sal|   sex|
+---+---+------+-----+------+
| 11|101|Miller|10000|  male|
| 12|102| Blake|20000|  male|
| 11|103|  Sony|30000|female|
| 12|104| Sonia|40000|female|
| 13|105| James|50000|  male|
+---+---+------+-----+------+



# GroupBy()

In [0]:
df.groupBy("sex").count().show()

+---+-----+
|sex|count|
+---+-----+
|  m|    3|
|  f|    2|
+---+-----+



# 29) Aggregated functions:
  i)agg()
 ii)sum()
iii)max()
 iv)min()
  v)avg()
  vi)count()

In [0]:
from pyspark.sql import functions as F
df.groupBy("dno").agg(F.sum("sal")).show()


+---+--------+
|dno|sum(sal)|
+---+--------+
| 12|   60000|
| 11|   40000|
| 13|   50000|
+---+--------+



In [0]:
df.groupBy("dno").sum('sal').display()
df.groupBy("dno").avg('sal').show()
df.groupBy("dno").max('sal').show()
df.groupBy("dno").min('sal').show()
df.groupBy("dno").count().show()

dno,sum(sal)
12,60000
11,40000
13,50000


+---+--------+
|dno|avg(sal)|
+---+--------+
| 12| 30000.0|
| 11| 20000.0|
| 13| 50000.0|
+---+--------+

+---+--------+
|dno|max(sal)|
+---+--------+
| 12|   40000|
| 11|   30000|
| 13|   50000|
+---+--------+

+---+--------+
|dno|min(sal)|
+---+--------+
| 12|   20000|
| 11|   10000|
| 13|   50000|
+---+--------+

+---+-----+
|dno|count|
+---+-----+
| 12|    2|
| 11|    2|
| 13|    1|
+---+-----+



# 30) Multi Grouping

In [0]:
from pyspark.sql.functions import *
df.groupBy("dno").agg(sum("sal"),
                      avg("sal"),
                      max("sal"),
                      min("sal"),
                      count("sal")).show()

+---+--------+--------+--------+--------+----------+
|dno|sum(sal)|avg(sal)|max(sal)|min(sal)|count(sal)|
+---+--------+--------+--------+--------+----------+
| 12|   60000| 30000.0|   40000|   20000|         2|
| 11|   40000| 20000.0|   30000|   10000|         2|
| 13|   50000| 50000.0|   50000|   50000|         1|
+---+--------+--------+--------+--------+----------+



# 32.multiple Aggregations:


In [0]:
from pyspark.sql.functions import *
df.filter(col("sal")>=50000).groupBy("dno").agg(sum("sal")).show()


+---+--------+
|dno|sum(sal)|
+---+--------+
| 13|   50000|
+---+--------+



In [0]:
from pyspark.sql.functions import *
d1 = df.groupBy("dno").agg(sum('sal').alias("total_sal"))
d2 = d1.filter(col("total_sal")>=50000)
d2.show()

+---+---------+
|dno|total_sal|
+---+---------+
| 12|    60000|
| 13|    50000|
+---+---------+



In [0]:
# iii\)where clause
# Task: I want those dnos whose totsal>=50000
df.groupBy("dno").agg(sum("sal").alias("totsal")).where(col("totsal")>=50000).show()

+---+------+
|dno|totsal|
+---+------+
| 12| 60000|
| 13| 50000|
+---+------+



# 33) union() :merging the rows of 2DF's
The merging DF should have the same schema


In [0]:
df3 = df.union(df2)
df3.show()

+---+---+------+-----+------+
|dno|eid| ename|  sal|   sex|
+---+---+------+-----+------+
| 11|101|Miller|10000|     m|
| 12|102| Blake|20000|     m|
| 11|103|  Sony|30000|     f|
| 12|104| Sonia|40000|     f|
| 13|105| James|50000|     m|
| 11|101|Miller|10000|  male|
| 12|102| Blake|20000|  male|
| 11|103|  Sony|30000|female|
| 12|104| Sonia|40000|female|
| 13|105| James|50000|  male|
+---+---+------+-----+------+

