In [None]:
from pyspark.sql import SparkSession

# Reuse the active session if one exists; otherwise start one named 'Spark DataFrame'.
session_builder = SparkSession.builder.appName('Spark DataFrame')
spark = session_builder.getOrCreate()

In [None]:
# Load the student CSV: first row is the header, column types are inferred.
student_reader = (
    spark.read
    .option('inferSchema', 'True')
    .option('header', 'True')
    .option('delimiter', ',')
)
df = student_reader.csv('data/StudentData.csv')

In [None]:
# Preview the first five rows.
df.show(n=5)

In [None]:
# Print the column names and the types Spark inferred for them.
df.printSchema()

### Struct Schema

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema for the student data: (column name, Spark type); every field is nullable.
student_field_specs = [
    ("age", IntegerType()),
    ("gender", StringType()),
    ("name", StringType()),
    ("course", StringType()),
    ("roll", StringType()),
    ("marks", IntegerType()),
    ("email", StringType()),
]
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in student_field_specs
])

In [None]:
# Re-read the CSV, applying the explicit schema instead of inferring the types.
reader_with_schema = spark.read.options(header='True').schema(schema)
df = reader_with_schema.csv('data/StudentData.csv')
df.printSchema()

### Create DF from RDD

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('DF RDD').set("spark.ui.port", "8081")
sc = SparkContext.getOrCreate(conf=conf)

# Read the CSV as raw text lines; the first line is the header row.
rdd = sc.textFile('data/StudentData.csv')
header = rdd.first()

# Drop the header line, then split each remaining line into its comma-separated fields.
data_lines = rdd.filter(lambda line: line != header)
rdd1 = data_lines.map(lambda line: line.split(','))

In [None]:
# Name the RDD's fields after the CSV header; the values are the split strings from each line.
columns = header.split(',')
dfRdd = spark.createDataFrame(rdd1, columns)
dfRdd.show(5)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Same all-nullable student schema as before, built from parallel name/type lists.
field_names = ["age", "gender", "name", "course", "roll", "marks", "email"]
field_types = [
    IntegerType(), StringType(), StringType(), StringType(),
    StringType(), IntegerType(), StringType(),
]
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in zip(field_names, field_types)
])

In [None]:
# Cast age (field 0) and marks (field 5) to int so each row matches `schema`;
# the remaining fields, including email (field 6), stay as strings.
rdd2 = rdd1.map(lambda x: [int(x[0]), x[1], x[2], x[3], x[4], int(x[5]), x[6]])
dfRdd1 = spark.createDataFrame(rdd2, schema=schema)
# Inspect the typed DataFrame built here, not the all-string dfRdd from earlier.
dfRdd1.printSchema()
dfRdd1.show(5)

### Select Columns

In [None]:
# Select columns by passing their names as strings.
dfRdd.select('age', 'gender').show(5)

In [None]:
# Select columns through Column objects taken from the DataFrame itself.
age_column = dfRdd['age']
gender_column = dfRdd['gender']
dfRdd.select(age_column, gender_column).show(5)

In [None]:
from pyspark.sql.functions import col, lit

# Select columns through col() expressions.
roll_and_name = [col('roll'), col('name')]
dfRdd.select(*roll_and_name).show(5)

In [None]:
# Select the third through fifth columns by position in the column list.
middle_columns = dfRdd.columns[2:5]
dfRdd.select(*middle_columns).show(5)

### withColumn

In [None]:
# Replace 'marks' with every mark raised by 10 (same column name, so it is overwritten).
raised_marks = col('marks') + 10
df1 = df.withColumn('marks', raised_marks)
df1.show(5)

In [None]:
# Add a new 'aggregated_marks' column holding marks minus 10; 'marks' itself is kept.
reduced_marks = col('marks') - 10
df2 = df.withColumn('aggregated_marks', reduced_marks)
df2.show(5)

In [None]:
# Add a constant 'Country' column holding the literal 'VN' on every row.
country_literal = lit('VN')
df3 = df.withColumn('Country', country_literal)
df3.show(5)

### withColumnRenamed

In [None]:
# Rename the 'gender' column to 'sex'; the result is bound to a new DataFrame, df4.
df4 = df.withColumnRenamed('gender', 'sex')
df4.show(5)

In [None]:
# Rename a column only in the projection: 'name' is shown as 'full name'.
full_name_column = col('name').alias('full name')
df.select(full_name_column).show(5)

### Filter Rows

In [None]:
# Keep students in the DB or Cloud course who scored above 50.
in_db_or_cloud = (df.course == "DB") | (df.course == "Cloud")
scored_over_50 = df.marks > 50
df.filter(in_db_or_cloud & scored_over_50).show(5)

In [None]:
# Keep rows whose course is any of the listed ones.
courses = ["DB", "Cloud", "OOP", "DSA"]
course_is_listed = df.course.isin(courses)
df.filter(course_is_listed).show(5)

In [None]:
# Keep rows whose course name starts with "D".
df.filter(df.course.startswith("D")).show(5)

In [None]:
# SQL LIKE pattern: keep names containing an 's' followed (anywhere later) by an 'e'.
df.filter(df.name.like('%s%e%')).show(5)

### Quiz
- For the quiz you'll be using StudentData.csv
- Read this file in the DF
- Create a new column in the DF for total marks and let the total marks be 120
- Create a new column average to calculate the average marks of the student.
    - (marks / total marks) * 100
- Filter out all those students who have achieved more than 80% marks in OOP course and save it in a new DF.
- Filter out all those students who have achieved more than 60% marks in Cloud course and save it in a new DF.
- Print the names and marks of all the students from the above DFs

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.appName('Spark DataFrame Quiz').getOrCreate()

# Fresh copy of the student data with a header row and inferred column types.
df = (
    spark.read
    .option('inferSchema', 'True')
    .option('header', 'True')
    .option('delimiter', ',')
    .csv('data/StudentData.csv')
)

In [None]:
# Maximum achievable marks, stored as a constant column on every row.
df = df.withColumn('total marks', lit(120))
# Percentage score, computed from the 'total marks' column so the maximum
# is defined in exactly one place instead of a second hard-coded 120.
df = df.withColumn('average', (col('marks') / col('total marks')) * 100)
# OOP students above 80% and Cloud students above 60%.
df1 = df.filter((df.course == 'OOP') & (df.average > 80))
df2 = df.filter((df.course == 'Cloud') & (df.average > 60))

In [None]:
# Show the same four columns for both filtered DataFrames.
report_columns = [col('name'), col('marks'), col('course'), col('average')]
df1.select(*report_columns).show(5)
df2.select(*report_columns).show(5)

### Count, Distinct, Duplicate

In [None]:
# Number of rows whose course is DB.
db_students = df.filter(df.course == "DB")
db_students.count()

In [None]:
# Number of distinct values in the 'gender' column.
df.select("gender").distinct().count()

In [None]:
# Drop duplicates considering only the 'gender' column, then show the remaining rows.
df.dropDuplicates(["gender"]).show ()

### Sort and orderBy

In [None]:
# Sort by marks ascending, breaking ties by age descending.
sort_keys = [df.marks.asc(), df.age.desc()]
df.sort(*sort_keys).show(5)

### Quiz
- Create a DF, sorted on bonus in ascending order and show it.
- Create a DF, sorted on age and salary in descending and ascending order respectively and show it.
- Create a DF sorted on age, bonus and salary in descending, descending and ascending order respectively and show it

In [None]:
# Switch `df` to the office data set for the sorting quiz.
csv_options = dict(inferSchema='True', header='True', delimiter=',')
df = spark.read.options(**csv_options).csv('data/OfficeData.csv')

In [None]:
# Sort by bonus, ascending.
bonus_ascending = df.bonus.asc()
df.sort(bonus_ascending).show()

In [None]:
# Sort by age descending, then salary ascending.
age_then_salary = [df.age.desc(), df.salary.asc()]
df.sort(*age_then_salary).show()

In [None]:
# Sort by age descending, then bonus descending, then salary ascending.
age_bonus_salary = [df.age.desc(), df.bonus.desc(), df.salary.asc()]
df.sort(*age_bonus_salary).show()

### GroupBy

In [None]:
# The sorting quiz rebound `df` to OfficeData.csv, but the aggregations below use
# the student columns (gender, age, marks). Reload the student data so this
# section also works on a fresh top-to-bottom run.
df = spark.read.options(inferSchema='True',header='True',delimiter=',').csv('data/StudentData.csv')

# Built-in grouped aggregations: sum, count, max and average.
df.groupBy('gender').sum('marks').show()
df.groupBy('gender').count().show()
df.groupBy('gender').max('marks').show()
df.groupBy('age').avg('marks').show()

In [None]:
# Row count for every (age, gender) combination.
df.groupBy('age','gender').count().show()


In [None]:
# NOTE: from here on, sum/max/min refer to the Spark column functions and
# shadow Python's built-ins of the same name.
from pyspark.sql.functions import sum, avg, max, min, mean, count

# Several aggregations per (course, gender) group in one pass, each with a readable alias.
df.groupBy("course","gender").agg(count("*").alias("total_enrollments"),
                         sum("marks").alias("total_marks"),
                         min("marks").alias("min_marks"),
                         max("marks").alias("max_marks"),
                         avg("marks").alias("avg_marks")).show()

In [None]:
# Using df.total_enrollments in the filter raises an error, because that column
# does not exist on df; refer to the aggregated column with col('total_enrollments').
male_enrollments = (
    df.filter(df.gender == "Male")
    .groupBy("course", "gender")
    .agg(count("*").alias("total_enrollments"))
)
male_enrollments.filter(col("total_enrollments") > 85).show()

### QUIZ
- For the quiz you'll be using StudentData.csv
- Read this file in the DF
- Display the total numbers of students enrolled in each course
- Display the total number of male and female students enrolled in each course
- Display the total marks achieved by each gender in each course
- Display the minimum, maximum and average marks achieved in each course by each age group.

In [None]:
# Reload the student data for the groupBy quiz.
csv_options = dict(inferSchema='True', header='True', delimiter=',')
df = spark.read.options(**csv_options).csv('data/StudentData.csv')

In [None]:
# Preview the first five rows of the reloaded data.
df.show(5)

In [None]:
# Number of rows (students) per course.
enrollments_per_course = df.groupBy('course').agg(count("*").alias("total_enrollments"))
enrollments_per_course.show()

In [None]:
# Number of rows (students) per course and gender.
enrollments_by_gender = df.groupBy('course', 'gender').agg(count("*").alias("total_enrollments"))
enrollments_by_gender.show()

In [None]:
# Total marks per course and gender.
marks_by_gender = df.groupBy('course', 'gender').agg(sum("marks").alias("total_marks"))
marks_by_gender.show()

In [None]:
# The quiz asks for min/max/average marks in each course by each age group,
# so the second grouping key is 'age' (not 'gender').
df.groupBy('course','age').\
    agg(min("marks"), max('marks'), avg('marks')).show()

### QUIZ
- For the quiz you'll be using WordData.txt
- Read this file in the DF
- Calculate and show the count of each word present in the file

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('DF RDD').set("spark.ui.port", "8081")
sc = SparkContext.getOrCreate(conf=conf)

# Read the file line by line, then break each line into whitespace-separated
# words so that a line holding several words yields one row per word
# (a file with one word per line gives the same rows as before).
rdd = sc.textFile('data/WordData.txt')
words = rdd.flatMap(lambda line: line.split())
# Second field is the word's character length; the column keeps its original name 'count'.
rdd1 = words.map(lambda word: [word, len(word)])
df = rdd1.toDF(['word','count'])

In [None]:
# Occurrences of each distinct value in the 'word' column.
word_counts = df.groupBy('word').agg(count("*"))
word_counts.show()

In [None]:
from pyspark.sql.functions import col, explode, split

# Read the file as plain text: one row per line in a single 'value' column.
# The CSV reader would split lines on commas and, with a one-column schema,
# keep only the part before the first comma.
lines_df = spark.read.text('data/WordData.txt')

# One row per whitespace-separated word; empty tokens (from leading or
# repeated whitespace) are dropped. The column keeps the name 'value'.
df = (
    lines_df
    .select(explode(split(col('value'), r'\s+')).alias('value'))
    .filter(col('value') != '')
)
df.groupBy('value').count().show()

### UDFs - user defined functions
- su dung function udf de convert my function thanh 1 function duoc dung nhu select, filter,...

In [None]:
from pyspark.sql.functions import col, lit, udf


def get_total_salary(salary, bonus):
    """Return the sum of `salary` and `bonus`."""
    total = salary + bonus
    return total


# Wrap the plain Python function as a Spark UDF declared to return an integer.
totalSalaryUDF = udf(lambda x, y: get_total_salary(x, y), IntegerType())

office_options = dict(inferSchema='True', header='True', delimiter=',')
df = spark.read.options(**office_options).csv('data/OfficeData.csv')

# Apply the UDF to the salary and bonus columns and show the result.
total_salary_column = totalSalaryUDF(df.salary, df.bonus)
df.withColumn("total_salary", total_salary_column).show()

In [None]:
# Call the plain Python function directly with two Column arguments: the
# `salary + bonus` inside it is then evaluated on Columns rather than through a UDF.
df.withColumn("total_salary", get_total_salary(df.salary, df.bonus)).show()

- tai sao khong dung function truc tiep?

For example, you can find that if you write:

``spark.sql("select replaceBlanksWithNulls(column_name) from dataframe")``

does not work if you have not registered the function replaceBlanksWithNulls as a UDF. Spark SQL needs to know the return type of the function before execution. Hence, we need to register the custom function as a user-defined function (UDF) before it can be used in Spark SQL.

### QUIZ
- For the quiz you'll be using OfficeData.csv
- Read this file in the DF
- Create a new column increment and provide the increment to the employees on the following criteria
    - If the employee is in NY state, his increment would be 10% of salary plus 5% of bonus
    - If the employee is in CA state, his increment would be 12% of salary plus 3% of bonus

In [None]:
def calc_increment(state, salary, bonus):
    """Return the increment for one employee as a float.

    NY: 10% of salary plus 5% of bonus.
    CA: 12% of salary plus 3% of bonus.
    Any other state: 0.0 (salary and bonus are not read in that case).
    """
    # state -> (percent of salary, percent of bonus)
    rates = {'NY': (10, 5), 'CA': (12, 3)}
    if state not in rates:
        # Float zero, so every branch returns the same type; a UDF with a
        # declared numeric return type then never sees a mismatched int.
        return 0.0

    salary_pct, bonus_pct = rates[state]
    return float(salary * salary_pct / 100 + bonus * bonus_pct / 100)
        

In [None]:

from pyspark.sql.types import DoubleType

# Declare the return type explicitly: without it udf() defaults to a string
# return type, so 'increment' would be a string column. float() guarantees the
# Python value matches DoubleType even when calc_increment returns an int (e.g. 0).
calcIncrement = udf(lambda x, y, z: float(calc_increment(x, y, z)), DoubleType())

df = spark.read.options(inferSchema='True',header='True',delimiter=',').csv('data/OfficeData.csv')
df.withColumn('increment', calcIncrement(df.state, df.salary, df.bonus)).show()

### DF to RDD

In [None]:
# Reload the student data, then get the DataFrame's rows as an RDD via `.rdd`.
df = spark.read.options(inferSchema='True',header='True',delimiter=',').csv('data/StudentData.csv')
rdd = df.rdd
# Example (disabled): collect the rows whose age is 28.
# rdd.filter(lambda x: x['age'] == 28).collect()
# Last expression of the cell, so the notebook displays the object's type.
type(rdd)

### Spark SQL

In [None]:
# Register df as a temporary view named "Student" so it can be queried through spark.sql.
df.createOrReplaceTempView("Student")

In [None]:
# Courses of the male students, queried from the "Student" view with SQL.
male_courses_query = 'select course from Student where gender="Male"'
spark.sql(male_courses_query).show(5)

In [None]:
# DataFrame API equivalent of the SQL query in the previous cell.
male_students = df.filter(df.gender == 'Male')
male_students.select('course').show(5)

### Write DF

In [None]:
# Min, max and average marks for every (course, gender) group.
marks_stats = [min("marks"), max('marks'), avg('marks')]
df1 = df.groupBy('course', 'gender').agg(*marks_stats)

In [None]:
# Write df1 as CSV with a header row under data/output, using 'overwrite' save mode.
df1.write.mode('overwrite').options(header='True').csv('data/output')

In [None]:
# Read the written output back, using the header row and inferring column types.
output_reader = spark.read.option('header', 'True').option('inferSchema', 'True')
df2 = output_reader.csv('data/output')

In [None]:
# Display the DataFrame that was read back from data/output.
df2.show()