### DataFrame example

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=48c9d829429f8e3547996a56d9325ff97700cededb1fd48253e3ef0cee416904
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [9]:
# Mount Google Drive at /content/drive. Later cells read their CSV inputs through the
# relative path drive/MyDrive/CS452, which presumes the working directory is /content.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [18]:
!ls drive/MyDrive/CS452

grades.csv


In [6]:
import os
import pyspark
import re
import sys
from operator import add
from pyspark.sql import SparkSession
from pyspark.sql.types import *


In [7]:
# Create (or reuse) the Spark session; its SparkContext is needed later for the RDD example.
spark = (
    SparkSession.builder
    .appName("DataFrameExample")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel('WARN')  # only WARN-and-above log messages reach the cell output

# Displayed as the cell result: the URL of this session's Spark web UI.
sc.uiWebUrl

'http://310b83cb9e84:4040'

In [19]:
# grades.csv is expected to look like:
#   name,hw1,hw2,hw3
#   john,26,29,30
#   mary,27,25,35
#   bill,19,22,25
#   lee,25,25,25

myfile = "drive/MyDrive/CS452/grades.csv"

# Explicit schema: a string `name` column followed by three integer homework scores,
# all nullable. Supplying it avoids the extra pass over the data that inferSchema needs.
myschema = StructType(
    [StructField("name", StringType(), True)]
    + [StructField(hw, IntegerType(), True) for hw in ("hw1", "hw2", "hw3")]
)

# header=True: the first line holds column names and is not read as a data row.
gradeDF = spark.read.csv(myfile, schema=myschema, header=True)
gradeDF.show()

# Alternative: let Spark infer the column types instead of passing a schema, e.g.
#   spark.read.options(inferSchema='True', delimiter=',', header='True').csv(myfile)

+----+---+---+---+
|name|hw1|hw2|hw3|
+----+---+---+---+
|john| 26| 29| 30|
|mary| 27| 25| 35|
|bill| 19| 22| 25|
| lee| 25| 25| 25|
+----+---+---+---+



In [20]:
# Preview the grades as a pandas DataFrame. limit() is applied by Spark first, so only
# the first 5 rows are collected to the driver rather than the entire DataFrame.
gradeDF.limit(5).toPandas()

Unnamed: 0,name,hw1,hw2,hw3
0,john,26,29,30
1,mary,27,25,35
2,bill,19,22,25
3,lee,25,25,25


In [21]:
# Print gradeDF's column names, types and nullability as a tree;
# it should mirror myschema from the loading cell.
gradeDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- hw1: integer (nullable = true)
 |-- hw2: integer (nullable = true)
 |-- hw3: integer (nullable = true)



In [23]:
# majors.csv is expected to look like:
#   name,dept
#   john,CSE
#   mary,BIOLOGY
#   bill,ECE
#   lee,CIVIL

myfile = "drive/MyDrive/CS452/majors.csv"

# Both columns are nullable strings.
myschema = StructType([StructField(col, StringType(), True) for col in ("name", "dept")])

# header=True: the first line holds column names and is not read as a data row.
majorDF = spark.read.csv(myfile, schema=myschema, header=True)
majorDF.show()

# Alternative: let Spark infer the column types instead of passing a schema, e.g.
#   spark.read.options(inferSchema='True', delimiter=',', header='True').csv(myfile)

+----+-------+
|name|   dept|
+----+-------+
|john|    CSE|
|mary|BIOLOGY|
|bill|    ECE|
| lee|  CIVIL|
+----+-------+



### Join the grades and majors tables

In [24]:
# Inner join on the shared `name` column; joining by column name keeps a single `name` column.
result = gradeDF.join(majorDF, on="name", how="inner")
result.show()

+----+---+---+---+-------+
|name|hw1|hw2|hw3|   dept|
+----+---+---+---+-------+
|john| 26| 29| 30|    CSE|
|mary| 27| 25| 35|BIOLOGY|
|bill| 19| 22| 25|    ECE|
| lee| 25| 25| 25|  CIVIL|
+----+---+---+---+-------+



### Calculate the total grade

In [25]:
# Sum the three homework scores into `hwgrade`, keeping name and dept alongside it.
hwtotalDF = result.selectExpr(
    "name",
    "hw1+hw2+hw3 as hwgrade",
    "dept",
)
hwtotalDF.show()

+----+-------+-------+
|name|hwgrade|   dept|
+----+-------+-------+
|john|     85|    CSE|
|mary|     87|BIOLOGY|
|bill|     66|    ECE|
| lee|     75|  CIVIL|
+----+-------+-------+



### Find students with a total grade greater than 80

In [26]:
# Keep only students whose homework total exceeds 80 (`where` is an alias of `filter`).
hwtotalDF.where(hwtotalDF['hwgrade'] > 80).show()

+----+-------+-------+
|name|hwgrade|   dept|
+----+-------+-------+
|john|     85|    CSE|
|mary|     87|BIOLOGY|
+----+-------+-------+



### Create a DataFrame of college information

In [27]:
# (department, college) pairs, turned into a DataFrame via an RDD.
c = (
    ('CSE', 'EGR'),
    ('ECE', 'EGR'),
    ('CIVIL', 'EGR'),
    ('BIOLOGY', 'NATSCI'),
)
crdd = sc.parallelize(c)
collegeDF = crdd.toDF(schema=['deptname', 'collegename'])
collegeDF.show()

+--------+-----------+
|deptname|collegename|
+--------+-----------+
|     CSE|        EGR|
|     ECE|        EGR|
|   CIVIL|        EGR|
| BIOLOGY|     NATSCI|
+--------+-----------+



### Count the number of departments in each college

In [28]:
# Departments per college: count of non-null deptname values within each collegename group.
res = collegeDF.groupBy('collegename').agg({'deptname': 'count'})
res.show()

+-----------+---------------+
|collegename|count(deptname)|
+-----------+---------------+
|        EGR|              3|
|     NATSCI|              1|
+-----------+---------------+



### Compute the average grade of students in each college

In [29]:
# Attach each student's college by matching their dept to the college table's deptname.
# Because this joins on an expression, both `dept` and `deptname` survive in the result.
studentsDF = hwtotalDF.join(
    collegeDF,
    on=hwtotalDF.dept == collegeDF.deptname,
    how='inner',
)
studentsDF.show()

+----+-------+-------+--------+-----------+
|name|hwgrade|   dept|deptname|collegename|
+----+-------+-------+--------+-----------+
|john|     85|    CSE|     CSE|        EGR|
|bill|     66|    ECE|     ECE|        EGR|
| lee|     75|  CIVIL|   CIVIL|        EGR|
|mary|     87|BIOLOGY| BIOLOGY|     NATSCI|
+----+-------+-------+--------+-----------+



In [30]:
# Average homework total per college. No ordering is applied, so row order is not guaranteed.
res = studentsDF.groupBy('collegename').agg({'hwgrade': 'avg'})
res.show()

+-----------+-----------------+
|collegename|     avg(hwgrade)|
+-----------+-----------------+
|        EGR|75.33333333333333|
|     NATSCI|             87.0|
+-----------+-----------------+



### We can do all of this with SQL as well

In [31]:
# First, expose studentsDF as a temporary view so SQL statements can refer to it by name.
# createOrReplaceTempView is the replacement for registerTempTable (deprecated since Spark 2.0).
studentsDF.createOrReplaceTempView("studentsDF")



In [32]:
# A triple-quoted string may span several lines, which keeps longer SQL readable.
# SQL counterpart of the groupBy/avg cell above, additionally ordered by the average.
res = spark.sql(
    """select collegename, AVG(hwgrade) as avgHomework 
                   from studentsDF 
                   group by studentsDF.collegename 
                   order by avgHomework
                """
)
res.show()

+-----------+-----------------+
|collegename|      avgHomework|
+-----------+-----------------+
|        EGR|75.33333333333333|
|     NATSCI|             87.0|
+-----------+-----------------+



In [33]:
# A more complex query: the join and the aggregation are both done in SQL.
# createOrReplaceTempView replaces the deprecated registerTempTable.
hwtotalDF.createOrReplaceTempView('hwtotalDF')
collegeDF.createOrReplaceTempView('collegeDF')
# `order by 2` is positional: it sorts by the second select column, AVG(hwgrade).
res = spark.sql("""
            select collegename, AVG(hwgrade) from hwtotalDF join collegeDF
            on hwtotalDF.dept == collegeDF.deptname 
            group by collegename
            order by 2
            """)
res.show()

+-----------+-----------------+
|collegename|     avg(hwgrade)|
+-----------+-----------------+
|        EGR|75.33333333333333|
|     NATSCI|             87.0|
+-----------+-----------------+



In [34]:
# Expose the loaded grades as a SQL view (createOrReplaceTempView supersedes the
# deprecated registerTempTable) and query it back.
gradeDF.createOrReplaceTempView('gradeDF')
spark.sql("select * from gradeDF").show()

+----+---+---+---+
|name|hw1|hw2|hw3|
+----+---+---+---+
|john| 26| 29| 30|
|mary| 27| 25| 35|
|bill| 19| 22| 25|
| lee| 25| 25| 25|
+----+---+---+---+



In [35]:
# Add the homework total as a new `hwtotal` column (previously misspelled `hwtotoal`).
# This rebinds the Python name gradeDF; the `gradeDF` SQL view still refers to whatever
# DataFrame was last registered under that name until it is re-registered.
# NOTE(review): once the view is re-registered with the total column, re-running this
# cell would add a second `hwtotal` column.
gradeDF = spark.sql("select *, hw1+hw2+hw3 as hwtotal from gradeDF")
gradeDF.show()

+----+---+---+---+--------+
|name|hw1|hw2|hw3|hwtotoal|
+----+---+---+---+--------+
|john| 26| 29| 30|      85|
|mary| 27| 25| 35|      87|
|bill| 19| 22| 25|      66|
| lee| 25| 25| 25|      75|
+----+---+---+---+--------+



In [36]:
# Re-register the view so SQL's `gradeDF` now includes the total column
# (createOrReplaceTempView supersedes the deprecated registerTempTable).
gradeDF.createOrReplaceTempView('gradeDF')
spark.sql("select * from gradeDF").show()

+----+---+---+---+--------+
|name|hw1|hw2|hw3|hwtotoal|
+----+---+---+---+--------+
|john| 26| 29| 30|      85|
|mary| 27| 25| 35|      87|
|bill| 19| 22| 25|      66|
| lee| 25| 25| 25|      75|
+----+---+---+---+--------+

