In [43]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

lastException: Throwable = null


In [44]:
val spark=SparkSession.builder.appName("Capstone Project").enableHiveSupport().getOrCreate()

spark = org.apache.spark.sql.SparkSession@4855abaf


org.apache.spark.sql.SparkSession@4855abaf

In [45]:
spark

org.apache.spark.sql.SparkSession@4855abaf

In [46]:
spark.sql("use capstoneproject")

[]

In [47]:
spark.sql("show tables").show()

+---------------+--------------------+-----------+
|       database|           tableName|isTemporary|
+---------------+--------------------+-----------+
|capstoneproject|studentcoursecomp...|      false|
|capstoneproject|studentcoursedetails|      false|
|capstoneproject|studentcoursedeta...|      false|
+---------------+--------------------+-----------+



## Using DataFrames

In [48]:
val df1_coursecompletion = spark.read.table("capstoneproject.studentcoursecompletionstatus")

df1_coursecompletion = [studentsid: string, courseid: string ... 4 more fields]


[studentsid: string, courseid: string ... 4 more fields]

In [49]:
df1_coursecompletion.printSchema()

root
 |-- studentsid: string (nullable = true)
 |-- courseid: string (nullable = true)
 |-- examdate: string (nullable = true)
 |-- attendedstatus: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- result: string (nullable = true)



In [50]:
df1_coursecompletion.show()

+----------+--------+---------+--------------+-----+-------------+
|studentsid|courseid| examdate|attendedstatus|marks|       result|
+----------+--------+---------+--------------+-----+-------------+
|     S0001|   C0001|17 Feb 19|      Attended|   70|    Qualified|
|     S0298|   C0008|24 Feb 19|      Attended|   70|    Qualified|
|     S0297|   C0007|23 Feb 19|        Absent|    0|Not Qualified|
|     S0296|   C0030|20 Feb 19|      Attended|   70|    Qualified|
|     S0291|   C0013| 1 Mar 19|      Attended|   85|    Qualified|
|     S0290|   C0017| 5 Mar 19|        Absent|    0|Not Qualified|
|     S0289|   C0016| 4 Mar 19|        Absent|    0|Not Qualified|
|     S0288|   C0015| 3 Mar 19|        Absent|    0|Not Qualified|
|     S0287|   C0014| 2 Mar 19|        Absent|    0|Not Qualified|
|     S0286|   C0013| 1 Mar 19|      Attended|   85|    Qualified|
|     S0285|   C0012|28 Feb 19|      Attended|   70|    Qualified|
|     S0284|   C0011|27 Feb 19|        Absent|    0|Not Qualif

In [51]:
val df2_coursedetails = spark.read.table("capstoneproject.studentcoursedetails")

df2_coursedetails = [courseid: string, title: string ... 3 more fields]


[courseid: string, title: string ... 3 more fields]

In [52]:
df2_coursedetails.show()

+--------+--------------------+--------------------+----------+--------------------+
|courseid|               title|          competency|complexity|          coursetype|
+--------+--------------------+--------------------+----------+--------------------+
|   C0001|Certificate in Cl...|           Technical|     Basic|               Cloud|
|   C0002|Certificate in Vi...|           Technical|     Basic|               Cloud|
|   C0003|"Diploma in Infor...| Networking and C...|  Security|               Cloud|
|   C0004|BE (Hons) in CSE ...|              Domain|  Advanced|               Cloud|
|   C0005|BTech in Computer...|              Domain|  Advanced|               Cloud|
|   C0006|BTech in Computer...|              Domain|  Advanced|               Cloud|
|   C0007|BCA with Microsof...|            Security|  Advanced|               Cloud|
|   C0008|BTech in Informat...|           Technical|  Advanced|               Cloud|
|   C0009|MCA with speciali...|           Technical|  Advanced|  

### Total number of students Absent

In [53]:
df1_coursecompletion.where(df1_coursecompletion("attendedstatus") === "Absent").count()

149

In [54]:
df1_coursecompletion.where(df1_coursecompletion("attendedstatus") === "Absent").groupBy("courseid").count().orderBy("courseid").show()

+--------+-----+
|courseid|count|
+--------+-----+
|   C0002|   10|
|   C0005|   11|
|   C0006|   11|
|   C0007|   18|
|   C0010|   19|
|   C0011|   14|
|   C0014|   15|
|   C0015|   15|
|   C0016|   16|
|   C0017|   14|
|   C0022|    2|
|   C0023|    2|
|   C0024|    1|
|   C0025|    1|
+--------+-----+



### Maximum, minimum and average marks scored by students

In [55]:
df1_coursecompletion.agg(max("marks").as("max_mark"),min("marks").as("min_mark"),avg("marks").as("avg_mark")).show()

+--------+--------+------------------+
|max_mark|min_mark|          avg_mark|
+--------+--------+------------------+
|      92|       0|40.013333333333335|
+--------+--------+------------------+



## Uisng Spark SQL

In [56]:
df1_coursecompletion.createOrReplaceTempView("df1_table")

In [57]:
spark.sql("SELECT MAX(marks) as maxval, MIN(marks) as minval, AVG(marks) as avgval FROM df1_table").show()

+------+------+------------------+
|maxval|minval|            avgval|
+------+------+------------------+
|    92|     0|40.013333333333335|
+------+------+------------------+



In [58]:
spark.sql("select courseid, count(*) as cnt from df1_table where attendedstatus = 'Absent' group by courseid order by courseid").show()

+--------+---+
|courseid|cnt|
+--------+---+
|   C0002| 10|
|   C0005| 11|
|   C0006| 11|
|   C0007| 18|
|   C0010| 19|
|   C0011| 14|
|   C0014| 15|
|   C0015| 15|
|   C0016| 16|
|   C0017| 14|
|   C0022|  2|
|   C0023|  2|
|   C0024|  1|
|   C0025|  1|
+--------+---+



### Using RDD

In [61]:
val input = df1_coursecompletion.rdd

input = MapPartitionsRDD[75] at rdd at <console>:55


MapPartitionsRDD[75] at rdd at <console>:55

In [62]:
input.collect()

Array([S0001,C0001,17 Feb 19,Attended,70,Qualified], [S0298,C0008,24 Feb 19,Attended,70,Qualified], [S0297,C0007,23 Feb 19,Absent,0,Not Qualified], [S0296,C0030,20 Feb 19,Attended,70,Qualified], [S0291,C0013,1 Mar 19,Attended,85,Qualified], [S0290,C0017,5 Mar 19,Absent,0,Not Qualified], [S0289,C0016,4 Mar 19,Absent,0,Not Qualified], [S0288,C0015,3 Mar 19,Absent,0,Not Qualified], [S0287,C0014,2 Mar 19,Absent,0,Not Qualified], [S0286,C0013,1 Mar 19,Attended,85,Qualified], [S0285,C0012,28 Feb 19,Attended,70,Qualified], [S0284,C0011,27 Feb 19,Absent,0,Not Qualified], [S0283,C0010,26 Feb 19,Absent,0,Not Qualified], [S0281,C0008,24 Feb 19,Attended,70,Qualified], [S0280,C0007,23 Feb 19,Absent,0,Not Qualified], [S0279,C0006,22 Feb 19,Absent,0,Not Qualifi...

In [70]:
val rdd1 = input.map(x => (x(0),x(1),x(5)))

rdd1 = MapPartitionsRDD[78] at map at <console>:54


MapPartitionsRDD[78] at map at <console>:54

In [71]:
rdd1.collect()

Array((S0001,C0001,Qualified), (S0298,C0008,Qualified), (S0297,C0007,Not Qualified), (S0296,C0030,Qualified), (S0291,C0013,Qualified), (S0290,C0017,Not Qualified), (S0289,C0016,Not Qualified), (S0288,C0015,Not Qualified), (S0287,C0014,Not Qualified), (S0286,C0013,Qualified), (S0285,C0012,Qualified), (S0284,C0011,Not Qualified), (S0283,C0010,Not Qualified), (S0281,C0008,Qualified), (S0280,C0007,Not Qualified), (S0279,C0006,Not Qualified), (S0278,C0005,Not Qualified), (S0276,C0003,Qualified), (S0275,C0017,Not Qualified), (S0274,C0016,Not Qualified), (S0273,C0015,Not Qualified), (S0272,C0014,Not Qualified), (S0271,C0013,Qualified), (S0270,C0012,Qualified), (S0269,C0011,Not Qualified), (S0268,C0010,Not Qualified), (S0266,C0008,Qualified), (S0265,C0007,Not Qua...

In [73]:
val rdd2 = rdd1.map(x => (x._3,x._1))

rdd2 = MapPartitionsRDD[79] at map at <console>:54


MapPartitionsRDD[79] at map at <console>:54

In [74]:
rdd2.collect()

Array((Qualified,S0001), (Qualified,S0298), (Not Qualified,S0297), (Qualified,S0296), (Qualified,S0291), (Not Qualified,S0290), (Not Qualified,S0289), (Not Qualified,S0288), (Not Qualified,S0287), (Qualified,S0286), (Qualified,S0285), (Not Qualified,S0284), (Not Qualified,S0283), (Qualified,S0281), (Not Qualified,S0280), (Not Qualified,S0279), (Not Qualified,S0278), (Qualified,S0276), (Not Qualified,S0275), (Not Qualified,S0274), (Not Qualified,S0273), (Not Qualified,S0272), (Qualified,S0271), (Qualified,S0270), (Not Qualified,S0269), (Not Qualified,S0268), (Qualified,S0266), (Not Qualified,S0265), (Not Qualified,S0264), (Not Qualified,S0263), (Qualified,S0261), (Not Qualified,S0260), (Qualified,S0259), (Not Qualified,S0258), (Not Qualified,S0257), (Not Qualif...

In [76]:
val rdd3 = rdd2.groupByKey

rdd3 = ShuffledRDD[81] at groupByKey at <console>:54


ShuffledRDD[81] at groupByKey at <console>:54

In [78]:
rdd3.map(x => (x._1,x._2.size)).collect()

Array((Qualified,151), (Not Qualified,149))

### Writing Resultset to Hive

In [80]:
//writing the dataset to Hive

df1_coursecompletion.write.mode("overwrite").saveAsTable("capstoneproject.sample_table")

In [83]:
//verifying the result in Hive table

spark.sql("select * from sample_table").show()

+----------+--------+---------+--------------+-----+-------------+
|studentsid|courseid| examdate|attendedstatus|marks|       result|
+----------+--------+---------+--------------+-----+-------------+
|     S0001|   C0001|17 Feb 19|      Attended|   70|    Qualified|
|     S0298|   C0008|24 Feb 19|      Attended|   70|    Qualified|
|     S0297|   C0007|23 Feb 19|        Absent|    0|Not Qualified|
|     S0296|   C0030|20 Feb 19|      Attended|   70|    Qualified|
|     S0291|   C0013| 1 Mar 19|      Attended|   85|    Qualified|
|     S0290|   C0017| 5 Mar 19|        Absent|    0|Not Qualified|
|     S0289|   C0016| 4 Mar 19|        Absent|    0|Not Qualified|
|     S0288|   C0015| 3 Mar 19|        Absent|    0|Not Qualified|
|     S0287|   C0014| 2 Mar 19|        Absent|    0|Not Qualified|
|     S0286|   C0013| 1 Mar 19|      Attended|   85|    Qualified|
|     S0285|   C0012|28 Feb 19|      Attended|   70|    Qualified|
|     S0284|   C0011|27 Feb 19|        Absent|    0|Not Qualif