In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark DataFrames").getOrCreate()

In [0]:
std1 = spark.read.csv("/FileStore/tables/StudentData.csv")
std1.show()

In [0]:
# Reveal the headers in the data frame
spark.read.option("header", True).csv("/FileStore/tables/StudentData.csv").show()


In [0]:

df = spark.read.option("inferSchema", True).option("header", True).csv("/FileStore/tables/StudentData.csv")
#or df = spark.read.options(inferSchema = 'True',header= 'True').csv("/FileStore/tables/StudentData.csv")
df.printSchema()

In [0]:
# You can assign a new schema to and existing dataframe, i.e. change the schema type.
# df = spark.read.options(inferSchema = 'True',header= 'True').csv("/FileStore/tables/StudentData.csv")

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
  StructField("age", IntegerType(), True),
  StructField("gender", StringType(), True),
  StructField("name", StringType(), True),
  StructField("course", StringType(), True),
  StructField("roll", StringType(), True),
  StructField("marks", IntegerType(), True),
  StructField("email", StringType(), True)
])


In [0]:
df1 = spark.read.options(header = 'True').schema(schema).csv('/FileStore/tables/StudentData.csv')
df1.show()
df1.printSchema()

In [0]:
from pyspark import SparkConf, SparkContext 
conf = SparkConf().setAppName("RDD") 
sc = SparkContext.getOrCreate(conf=conf)

rdd= sc.textFile('/FileStore/tables/StudentData.csv') 
headers = rdd. first() # Get header/ first row of the data frame 
rdd = rdd. filter (lambda x: x != headers).map(lambda x: x.split (','))

In [0]:
rdd.collect()

In [0]:
 # Converting the RDD to a dataframe
columns = headers.split(',') #Get column names
rdd_to_df = rdd.toDF(columns)
rdd_to_df.show()

In [0]:
rdd_to_df.printSchema()

In [0]:
# Explicit schema used to build a typed DataFrame from the RDD: 'age' and 'marks' are
# integers, every other column is a string, and all columns are nullable.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

_integer_columns = {"age", "marks"}
schema = StructType([
  StructField(column_name, IntegerType() if column_name in _integer_columns else StringType(), True)
  for column_name in ("age", "gender", "name", "course", "roll", "marks", "email")
])


In [0]:
# Create a typed DataFrame from the RDD built above.
# Each RDD element is a list of strings (one per CSV field), but the schema declares
# 'age' and 'marks' as IntegerType. createDataFrame checks rows against the schema only
# when the data is evaluated, so printSchema() works while show() fails on the raw
# strings. Convert each field to the schema's type first, in schema order:
# age, gender, name, course, roll, marks, email.

def _to_int_or_none(value):
  """Convert one CSV text field to int; an empty field becomes None (null)."""
  value = value.strip()
  return int(value) if value else None

typed_rdd = rdd.map(
  lambda x: [_to_int_or_none(x[0]), x[1], x[2], x[3], x[4], _to_int_or_none(x[5]), x[6]]
)

rdd_to_df = spark.createDataFrame(typed_rdd, schema = schema)
rdd_to_df.printSchema()
rdd_to_df.show()


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Select Spark DataFrames").getOrCreate()

In [0]:
df = spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/StudentData.csv')
df.show()

In [0]:
#select and display only certain columns
#df.select('email','course').show()
df.select(df.name, df.roll).show()

In [0]:
# USING 'columns' to show columns by array position

df.select(df.columns[:3]).show()

In [0]:
# Use a variable to filter columns.
df2= df.select(df.name, df.email)

In [0]:
df2.show()

In [0]:
# withColumns allow us to to only manipulate or change a single columns schema type
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Select Spark DataFrames").getOrCreate()

df1 = spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/StudentData.csv')
df1.printSchema()

In [0]:
# import col
from pyspark.sql.functions import col
#use the .cast to change the column schematype
df2 = df1.withColumn('marks',col('marks').cast('String'))
df2.printSchema()

In [0]:
# Other manipulations like additions on a column:

dfa= df1.withColumn('marks',col('marks') + 5)

dfa.show()



In [0]:
# Add another column based on aggregation
dfb= df1.withColumn('subtructed marks',col('marks') - 5) #this will create a new column 'aggregated marks'
dfb.show()

In [0]:
#Import lit to do below
from pyspark.sql.functions import col, lit
# Adding hard coded word/values to a column and creat a new column
dfa= df1.withColumn('Country',lit('GM Office'))

In [0]:
dfa.show()

In [0]:
dfa= df1.withColumn('Country',lit('GM Office'))
dfa.show()

In [0]:
#Manipulate one cell and create another cell from another cell
df1.withColumn("marks", col("marks") -10).withColumn("updated marks", col("marks") + 20).show()

In [0]:
df1.withColumn("marks", col("marks") -10).withColumn("updated marks", col("marks") + 20).withColumn('Country',lit('Jamaica')).show()

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Select Spark DataFrames").getOrCreate()

df1 = spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/StudentData.csv')
df1.printSchema()

#Use withColumnRenamed() to change the columns name.
df1.show()

In [0]:
#This renames the columns that is pecified, if the specified columns name is not available, it will ignore it.
df1 = df1.withColumnRenamed('gender', 'sex').withColumnRenamed('marks', 'score')
df1.show()

In [0]:
df1.select(col('email').alias('email address')).show()

In [0]:
# filter allows you to filter a dataset column with a specified word
df1.filter(df1.course =='DB').show()
#or
# df1.filter(col('course')=='DB').show()

In [0]:
# Selecting the dataset by defining criteria's 
df1.filter((df1.course == 'DB') & (df1.marks > 50) ).show()

In [0]:
#USING 'isin()' to get data that is in a list

courses = ['DB', 'Cloud', 'OOP']
df1.filter(df1.course.isin(courses)).show() # this gets all rows that have a course name == 'DB', 'Cloud' or 'OOP'

In [0]:
#USING 'startswith()' and to get data that Starts with at Letter
df1. filter(df1.course.startswith('D')).show()

#USING 'endswith()' and to get data that Ends with at Letter
df1. filter(df1.course.endswith('e')).show()


In [0]:
#USING 'contains()' and to get data with a Letter
df1. filter(df1.name.contains('e')).show()

In [0]:
#USING 'like()' and to get data that Ends with at Letter
df1. filter(df1.name.like('%g_e%')).show()
df1. filter(df1.name.like('%e%')).show()
df1. filter(df1.name.like('%se%')).show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.appName("Quiz").getOrCreate()

df = spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/StudentData.csv')
df.printSchema()

# For the quiz you'll be using StudentData.csv 
# Read this file in the DF 
# Create a new column in the DF for total marks and let the total marks be 120 
# Create a new column average to calculate the average marks of the student. (marks / total marks) * 100 
# Filter out all those students who have achieved more than 80% marks in OOP course and save it in a new DF. 
# Filter out all those students who have achieved more than 60% marks in Cloud course and save it in a new DF. 
# Print the names and marks of all the students from the above DFs

In [0]:
# Create a new column in the DF for total marks and let the total marks be 120 
total_marks = df.withColumn('Total Mark', lit(120))
total_marks.show()

In [0]:
# Create a new column average to calculate the average marks of the student. (marks / total marks) * 100
Average_marks = total_marks.withColumn('Average Mark', col('marks')/col('Total Mark') * 100)
Average_marks.show()

In [0]:
# Filter out all those students who have achieved more than 80% marks in OOP course and save it in a new DF. 
OPP_above_80 = Average_marks.filter((col('course') == 'OOP') & (col('Average Mark') > 80))
OPP_above_80.show()


In [0]:
# Filter out all those students who have achieved more than 60% marks in Cloud course and save it in a new DF. 
Cloud_above_60 = Average_marks.filter((col('course') == 'Cloud') & (col('Average Mark') > 60))
Cloud_above_60.show()



In [0]:
# Print the names and marks of all the students from the above DFs
Cloud_above_60.select(col('name'),col('marks'))

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.appName("Count, Distinct, Duplicate").getOrCreate()

df = spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/StudentData.csv')

In [0]:
#Using Count
# df.count()
df.filter(df.course == 'Cloud').count()

In [0]:
#Using Distinct to get the first distinct row based one the specified critirea (on selected column)
df.select('gender').distinct().show()

In [0]:
df.select('age','gender', 'course').distinct().show()

In [0]:
#Use dropDuplucates() to drop duplicates from a dataset
df.dropDuplicates(['gender','course']).show()

In [0]:
# SORTING ONLY WORKS WITH integers, to use it on strings, the column will have to be conveterd (google how to)
# Using 'sort()' to sort a dataset
df.sort('marks', 'age').show()

In [0]:
df.sort(df.marks.asc(), df.age.desc()).show()

In [0]:
# For the quiz you'll be using OfficeData.csv 
# Read this file in the DF 
# Create a DF, sorted on bonus in ascending order and show it. 
# Create a DF, sorted on age and salary in descending and ascending order respectively and show it. 
# Create a DF sorted on age, bonus and salary in descending, descending and ascending order respectively and show it

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.appName("Count, Distinct, Duplicate").getOrCreate()

df = spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/OfficeData.csv')


In [0]:
asc_office = df.sort('bonus')
asc_office.show()

In [0]:
df.sort(df.age.desc(), df.salary.asc()).show()

In [0]:
df.sort(df.bonus.desc(), df.age.desc(), df.salary.asc()).show()

In [0]:
# groupBy allows an aggreagted(this must be an integer that can be caluculated) value linked to criterial
df.groupBy('department').sum('salary').show()
# below, we can groupby() and get the total salary for each department

In [0]:
# Get the count of states
df.groupBy('state').count().show()

In [0]:
# Get the highest salary by department
df.groupBy('department').max('salary').show()
df.groupBy('department').min('salary').show()

In [0]:
#df.groupBy('department').avg('bonus').show()

df.groupBy('department').mean('bonus').show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import sum,avg,max,min,mean,count
spark = SparkSession.builder.appName("sum,avg,max,min,mean,count").getOrCreate()

df = spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/OfficeData.csv')

In [0]:
df.show()

In [0]:
df.groupBy('department').agg(sum('salary'), min('salary')).show()

In [0]:
df.groupBy('department','state').agg(count('*'),sum('salary')).show()

In [0]:
# This is to filter for a particular value and groupby based on it.
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum,avg,max,min,mean,count
spark = SparkSession.builder.appName("sum,avg,max,min,mean,count").getOrCreate()

dd= spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/StudentData.csv')


In [0]:
#Get Male value in column and groupby it. i.e. filter out not neede values 
dd1 = dd.filter(dd.gender =='Male').groupBy('gender').agg(count('*'))
dd1.show()

In [0]:
# Use 'filter' or the 'where' clause to further filter a dataset
dd2 = dd.filter(dd.gender =='Male').groupBy('gender','course').agg(count('*').alias('total_enrollment'))
dd2.show()

In [0]:
dd2.filter(dd2.total_enrollment > 80).show()
dd2.where(dd2.total_enrollment > 80).show()

In [0]:
# For the quiz you'll be using StudentData.csv 
# Read this file in the DF 
# Display the total numbers of students enrolled in each course 
# Display the total number of male and female students enrolled in each course 
# Display the total marks achieved by each gender in each course 
# Display the minimum, maximum and average marks achieved in each *course by each age group.

# This is to filter for a particular value and groupby based on it.
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum,avg,max,min,mean,count
spark = SparkSession.builder.appName("sum,avg,max,min,mean,count").getOrCreate()

ds= spark.read.options(header = 'True', inferSchema= 'True').csv('/FileStore/tables/StudentData.csv')


In [0]:
ds.show()

In [0]:
total_count = ds.groupBy('course').agg(count('*'))
total_count.show()

In [0]:
male_female = ds.groupBy('course', 'gender').agg(count('*'))
male_female.show()

In [0]:
total_marks = ds.groupBy('gender','course').agg(sum('marks'))
total_marks.show()

In [0]:
min_max_avg = ds.groupBy(ds.gender, 'course').agg(min('marks'), max('marks'), avg('marks'))
min_max_avg.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum,avg,max,min,mean,count
spark = SparkSession.builder.appName("sum,avg,max,min,mean,count").getOrCreate()

dc= spark.read.text('/FileStore/tables/WordData.txt')


In [0]:
#dc.show()
dc.groupBy(dc.value).count().show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType

sp = SparkSession.builder.appName('Spark UDF').getOrCreate()

In [0]:
df = sp.read.options(header='True', inferSchema='True').csv('/FileStore/tables/OfficeData.csv')
df.show()

In [0]:
# Using User defined functions:

def get_total_salary(salary, Bonus):
  """Return salary + Bonus for one row; used as the body of a Spark UDF.

  If either value is None (a null cell in the DataFrame) the result is None, so the
  UDF yields null for that row instead of raising TypeError and failing the task.
  """
  if salary is None or Bonus is None:
    return None
  return salary + Bonus

# Register get_total_salary as a Spark UDF whose result column is IntegerType.
totalPayUDF = udf(get_total_salary, IntegerType())

# Add the per-row result as a new column 'total_salary'.
df.withColumn('total_salary', totalPayUDF(df.salary, df.bonus)).show()

In [0]:
'''
1. For the quiz you'll be using OfficeData.csv 
2. Read this file in the DF 
3. Create a new column increment and provide the increment to the employees on the following criteria 
  - If the employee is in NY state, his increment would be 10% of salary plus 5% of bonus 
  - If the employee is in CA state, his increment would be 12% of salary plus 3% of bonus
'''

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# The increment (salary * rate + bonus * rate) is a Python float, so the UDF's return
# type is declared as DoubleType().
from pyspark.sql.types import DoubleType

sp = SparkSession.builder.appName('UDF Quiz').getOrCreate()

office_reader = sp.read.options(header = 'True', inferSchema = 'True')
df = office_reader.csv('/FileStore/tables/OfficeData.csv')


In [0]:
def increment(state,salary, bonus):
  """Return one employee's pay increment; used as the body of a DoubleType Spark UDF.

  - NY: 10% of salary plus 5% of bonus
  - CA: 12% of salary plus 3% of bonus
  - any other state: 0.0 (no increment rule applies)

  Returns None (null in the DataFrame) when salary or bonus is None for an NY/CA row.
  """
  if state == 'NY':
    salary_rate, bonus_rate = 0.1, 0.05
  elif state == 'CA':
    salary_rate, bonus_rate = 0.12, 0.03
  else:
    # Every path must return a value (otherwise the UDF raises for other states), and it
    # must be a float: the UDF is declared DoubleType and PySpark does not convert a
    # Python int to a double (the cell would come back null).
    return 0.0
  if salary is None or bonus is None:
    return None
  return salary * salary_rate + bonus * bonus_rate


# Wrap increment() as a Spark UDF returning DoubleType.
incre = udf(increment, DoubleType())

# New column 'incre_salary' = increment(state, salary, bonus) for each row.
df.withColumn('incre_salary', incre(df.state, df.salary, df.bonus)).show()

In [0]:
def increment(state,salary, bonus):
  """Return one employee's pay increment; used as the body of a DoubleType Spark UDF.

  - NY: 10% of salary plus 5% of bonus
  - CA: 12% of salary plus 3% of bonus
  - any other state: 0.0 (no increment rule applies)

  Returns None (null in the DataFrame) when salary or bonus is None for an NY/CA row.
  """
  if state == 'NY':
    salary_rate, bonus_rate = 0.10, 0.05
  elif state == 'CA':
    salary_rate, bonus_rate = 0.12, 0.03
  else:
    # A float, not the int 0: the UDF is declared DoubleType and PySpark does not convert
    # a Python int to a double (the cell would come back null).
    return 0.0
  if salary is None or bonus is None:
    return None
  return salary * salary_rate + bonus * bonus_rate


# Re-wrap the redefined increment() as a DoubleType Spark UDF.
incre = udf(increment, DoubleType())

# New column 'incre_salary' = increment(state, salary, bonus) for each row.
df.withColumn('incre_salary', incre(df.state, df.salary, df.bonus)).show()

In [0]:
# Anytime an action is run on a dataframe, it starts from the top down before it executed.
# Cacht is used so the code does run from the top but from the last cached.

df.cache()

In [0]:
#chech datatype
type(df)

In [0]:
rdd = df.rdd

In [0]:
type(rdd)

In [0]:
rdd.collect()

In [0]:
rdd.filter(lambda x:x[2] == 'NY').collect()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

sp = SparkSession.builder.appName('SparkSQL').getOrCreate()

ds = sp.read.options(header = 'True', inferSchema = 'True').csv('/FileStore/tables/StudentData.csv')

In [0]:
 #Register the dataFrame as a sql table called 'Student'
ds.createOrReplaceTempView('Student')

In [0]:
#Query the Student Table
#sp.sql('select course from Student where age > 20').show()

sp.sql('select course, count(*) from Student group by course').show()

In [0]:
sp.sql('select course,gender, sum(marks) as New from Student group by course, gender').show()

In [0]:
# By writing to the output, this dataframe will be safed in the file path below.
ds.write.options(header = 'True').csv('/FileStore/tables/StudentData/output')

#this saves it to output as one partition

In [0]:
#get number of partition
ds.rdd.getNumPartitions()

In [0]:
dfi = ds.groupBy('gender', 'course').count()

In [0]:
#Use the overwrite mode apply overwirte changes
dfi.write.mode('overwrite').options(header='True').csv('/FileStore/tables/StudentData/output')

In [0]:
dfi.collect()