In [0]:
print('hello')

In [0]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Spark DataFrame').getOrCreate()


# limitation in databrick community edition: can create only one spark sesion in a notebook

In [0]:

df=spark.read.option('inferSchema',True).option('header',True).csv('/FileStore/tables/student.csv')
df.show()

In [0]:
df.printSchema()

In [0]:
df=spark.read.options(inferSchema='True',header='True').csv('/FileStore/tables/student.csv')
df.show()

In [0]:
df.printSchema()

In [0]:
# Although the roll number is numeric, it is nominal data (an identifier), so arithmetic on it makes no sense. We can therefore define our own schema that tells Spark to treat roll as a string.

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

In [0]:
# creating our own schema

schema = StructType([
  StructField('age',IntegerType(),True),
  StructField('gender',StringType(),True),
  StructField('name',StringType(),True),
  StructField('course',StringType(),True),
  StructField('roll',StringType(),True),
  StructField('marks',IntegerType(),True),
  StructField('email',StringType(),True)
  
])

In [0]:
df=spark.read.options(header='True').schema(schema).csv('/FileStore/tables/student.csv')
df.show()


In [0]:
df.printSchema()

In [0]:
# creating a dataframe from spark RDD

In [0]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('rdd')
sc = SparkContext.getOrCreate(conf=conf)


rdd = sc.textFile('/FileStore/tables/student.csv')
header = rdd.first()

rdd=rdd.filter(lambda x: x!=header).map(lambda x: x.split(',')) # gives the expected format for dataframe i.e list of values
rdd.collect()

In [0]:
header 

In [0]:
columns = header.split(',') # gives list of column names

dfRdd = rdd.toDF(columns)
dfRdd.printSchema()

In [0]:
dfRdd = spark.createDataFrame(rdd,schema=schema)
dfRdd.printSchema()

In [0]:
# select dataframe columns

In [0]:
from pyspark.sql.functions import col,lit

df.select(col('name'),col('roll')).show() # approach 1 of selecting columns

In [0]:
df.select('name','age').show() # approach2 of selecting columns

In [0]:
df.select(df.age,df.name,df.gender).show() # approach3 of selecting columns

In [0]:
df.select('*').show()

In [0]:
df.columns

In [0]:
df.columns[2]

In [0]:
df.select(df.columns[1]).show()

In [0]:
df.select(df.columns[:2]).show()

In [0]:
df.select(df.columns[:]).show()

In [0]:
# combination approach

df2=df.select(df.columns[0],df.email).show() # creating a new df from existing one by taking subset of columns

In [0]:
#withColumn in DataFrame

In [0]:
df.printSchema()

In [0]:
# converting roll form string back to integer
df_new=df.withColumn('roll',col('roll').cast('Integer'))
# if same name is given as the existing column, it will update that column else a new column will be written
df_new.printSchema()


In [0]:
# adding 10 to column value

df2 = df.withColumn('marks',col('marks')+10)
df2.show()

In [0]:
df2=df.withColumn('aggregated_marks',col('marks')-10)
df2.show()

In [0]:
df2=df.withColumn('Country',lit('USA')) # for each row use the constant string 'USA'
df2.show()

In [0]:
df.withColumn('Country',lit('USA')).withColumn('age',col('age')+3).withColumn('updated_marks',col('marks')+10).show()

In [0]:
# with column renamed

In [0]:
df.withColumnRenamed('name','fullname').show()

In [0]:
 df.select(col('name').alias('fullname')).show() # if we dont want to maniplulate the column value directly on the dataframe 

In [0]:
df.filter(df.course == 'DB').show()

In [0]:
df.filter(col('course') == 'DB').show()

In [0]:
df.filter((df.course == 'DB') & (df.marks >50)).show()

In [0]:
course = ['DB','cloud','OOP','DSA']
df.filter(df.course.isin(course)).show()

In [0]:
df.filter(df.course.startswith('D')).show()

In [0]:
df.filter(df.course.endswith('A')).show()

In [0]:
df.filter(df.email.contains('ld')).show()

In [0]:
df.filter(df.name.like('%ld%')).show()

In [0]:
df=df.withColumn('total_marks',lit(120).cast('Integer'))
df.show()

In [0]:
df=df.withColumn('average',col('marks')/col('total_marks')*100)
df.show()

In [0]:
df_oop=df.filter((df.course =='OOP')&(df.average>80))
df_oop.show()

In [0]:
df_cloud=df.filter((df.course =='Cloud')&(df.average>60))
df_cloud.show()

In [0]:
df_oop.select(df.name,df.marks).show()

In [0]:
df_cloud.select(df.name,df.marks).show()

In [0]:
df.count()

In [0]:
df.filter(df.course=='DB').count()

In [0]:
# to get the uniquerows use distinct

In [0]:
df.distinct().count() # all rows are unique

In [0]:
df.select('age','gender').distinct().count()

In [0]:
df.select('gender').distinct().count() # only 2 unique gender

In [0]:
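# select(...).distinct() is the way to go if you only need the specified columns: it removes duplicate combinations of those columns, and the result contains only those columns. dropDuplicates(subset), shown below, removes the same duplicates but keeps all the columns of the original structure.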

In [0]:
df.show()

In [0]:
df.dropDuplicates(['gender']).show() # considers the first unique row for the column gender

In [0]:
df.dropDuplicates(['gender','course']).show() # considers the first unique row for the column gender and course

In [0]:
# dropDuplicates() is the way to go if you want to drop duplicates over a subset of columns, but at the same time you want to keep all the columns of the original structure.

In [0]:
df.drop_duplicates(['age','gender','course']).show()

In [0]:
df.select(['age','gender','course']).distinct().show()

In [0]:
df.sort(['marks']).show()

In [0]:
df.sort(['marks','age']).show() # first sort the data based on marks and then sort the data based on age

In [0]:
 df.orderBy(df.age.asc()).show() # sort by ascending order

In [0]:
 df.orderBy(df.age.desc()).show() # sort by descending order

In [0]:
 df.orderBy(df.age.desc(),df.marks.asc()).show() # sort age on the basis of desc and then sort the marks on the basis of ascending
  
  
  # note: mandatory for data to be in integer format if you use default sorting method

In [0]:
df_office=spark.read.option('inferSchema',True).option('header',True).csv('/FileStore/tables/office.csv')
df_office.show()

In [0]:
df_office.printSchema()

In [0]:
df_bonus=df.orderBy(['bonus'])
df_bonus.show()

In [0]:
df.sort(df.age.asc(),df.salary.desc()).show()

In [0]:
df.orderBy(df.age.desc(),df.bonus.desc(),df.salary.asc()).show()

In [0]:
# groupBy

In [0]:
# groupBy() must be followed by an aggregate function (count(), max(), avg(), agg(...), ...)

In [0]:
df.groupBy('gender').max('marks').show() # shows maximum marks obtained by male and female student

In [0]:
df.groupBy('gender').count().show()
df.groupBy('course').count().show()

In [0]:
df.groupBy('course').max('marks').show()  # maximum marks of each subject
df.groupBy('course').min('marks').show()  # minimum marks of each subject

In [0]:
df.groupBy('gender').avg('marks').show()

In [0]:
df.groupBy('course').avg('marks').show()

In [0]:
df.groupBy('age').avg('marks').show() # average marks of each group

In [0]:
# grouping by multiple columns

In [0]:
df.groupBy(df.gender,df.course).count().show() # for each gender, number of students count in each subject

df.groupBy(df.course,df.gender).count().show() # for each course, number of students in each gender ,, they are basically the same thing

In [0]:
df.groupBy(df.course,df.gender).max('marks').show()

In [0]:
# Spark aggregate functions used with agg() below.
# NOTE: min, max and sum now shadow Python's built-ins of the same name for the rest of the
# notebook; use builtins.min / builtins.max / builtins.sum if the Python versions are needed.

from pyspark.sql.functions import (
    avg,
    count,
    max,
    mean,
    min,
    sum,
)

In [0]:
df.groupBy('gender').agg(count('*'),sum('marks'), min('marks'),avg('marks')).show()

In [0]:
df.groupBy('course').agg(count('*').alias('total_num_students'),sum('marks').alias('total_marks'), min('marks').alias('min_marks'),avg('marks').alias('average_marks')).show()

In [0]:
df.groupBy('course','gender').agg(count('*').alias('total_num_students'),sum('marks').alias('total_marks'), min('marks').alias('min_marks'),avg('marks').alias('average_marks')).show()

In [0]:
df_office.groupBy('department').count().show()

In [0]:
df_office.groupBy('department').sum('salary').show()

In [0]:
df_office.groupBy('department').agg(sum('salary').alias('sum_salary'), avg('salary').alias('average_salary')).show()

In [0]:
df_office.groupBy('department','state').agg(sum('salary').alias('sum_salary'), avg('salary').alias('average_salary')).show()

In [0]:
df_office.groupBy('department','state').agg(count('*').alias('total_employee')).show()

In [0]:
df.groupBy('gender').agg(count('*')).show()

In [0]:
# filtering before applying the filter

df_pre_filter=df.filter(df.gender=='Male').groupBy('gender','course').agg(count('*').alias('total_enrollment'))
df_pre_filter.show()

In [0]:
# filtering after applying the filter

df_pre_filter.where(df_pre_filter.total_enrollment>75).show()
# same as
df_pre_filter.filter(df_pre_filter.total_enrollment>75).show()

In [0]:
# col is dependent on the context and not the dataframe


# df.filter(df.gender=='Male').groupBy('gender','course').agg(count('*').alias('total_enrollment')).where(df_pre_filter.total_enrollment>75).show()  ---> raises an exception as there is no column named total_enrollments

df.filter(df.gender=='Male').groupBy('gender','course').agg(count('*').alias('total_enrollment')).where(col('total_enrollment')>75).show()
# instead we can use the col('') notation 

In [0]:
df.show()

In [0]:
 df.groupBy('course').count().show()

In [0]:
df.groupBy('course','gender').count().show()

In [0]:
df.groupBy('course','gender').agg(sum('marks').alias('total_marks')).show()

In [0]:
df.groupBy('course','age').agg(min('marks').alias('minimum_marks'),max('marks').alias('maximum_marks'),avg('marks').alias('average_marks')).show()

In [0]:

df_word_count=spark.read.text('/FileStore/tables/wordcount.txt')
df_word_count.show()


In [0]:
df_word_count.groupBy('value').count().show()

In [0]:
# USER DEFINED FUNCTION

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,FloatType

In [0]:
df_office.show()




In [0]:
def total_salary(sal,bon):
  """Return salary plus bonus, or None when either value is missing.

  Spark passes SQL NULL into a Python UDF as None. Without this guard, `None + x`
  raises TypeError and fails the whole job; returning None yields NULL instead.
  """
  if sal is None or bon is None:
    return None
  return sal+bon

tot_sal_udf = udf(total_salary, IntegerType())  # returnType must be given: udf() defaults to StringType


In [0]:
df_office.withColumn('total_salary', tot_sal_udf(df_office['salary'], df_office['bonus'])).show()

In [0]:
def increment(sal,bon,state,rates=None):
  """Return the incremented total pay (salary + bonus) for one employee.

  Args:
    sal, bon: salary and bonus values from the row (None for SQL NULL).
    state: state code used to look up the increment rates.
    rates: optional mapping state -> (salary_multiplier, bonus_multiplier);
      defaults to the NY/CA rates used by this notebook.

  Returns:
    A float, or None when salary or bonus is missing. A state without an
    entry in `rates` gets no increment (plain salary + bonus). Previously such
    states raised UnboundLocalError and failed the job.
  """
  if rates is None:
    rates = {'NY': (1.1, 1.05), 'CA': (1.12, 1.02)}
  if sal is None or bon is None:
    return None
  sal_rate, bon_rate = rates.get(state, (1, 1))
  # The UDF is declared FloatType; PySpark can turn a non-float result (e.g. an int) into NULL,
  # so always hand back a Python float.
  return float(sal_rate*sal+bon_rate*bon)


increment_udf = udf(increment, FloatType())
    

In [0]:
df_office.withColumn('total_sal', increment_udf(df_office['salary'], df_office['bonus'], df_office['state'])).show()

In [0]:
## CACHE AND PERSIST

In [0]:
df_gb=df.groupBy('course','gender','age').count()
df_gb = df_gb.withColumn('dummy',col('age')*100)

In [0]:
df_gb.show()

In [0]:
df_gb.cache() # this will persist the data using the persist function

# cache under the hood uses the persist function
# helps in optimization of time
# reduces the time of data analysis

In [0]:
df_gb.show() # will refer to cache/memoryt rather than start reading from the beginning

In [0]:
# to get the underlying rdd of the dataframe

rdd = df.rdd

In [0]:
type(rdd)

In [0]:
type(df)

In [0]:
rdd.collect()

In [0]:
rdd.filter(lambda x: x[1]=='Male' and x[0]>28).collect() # to get only the gender that are male

In [0]:
rdd.filter(lambda x: x['gender']=='Female').collect() # using the rdd we can refer the data using the key unlike using the index 

In [0]:
## Spark SQL

In [0]:
# to create a table out of dataframe, we have to register it  as a table
df.createOrReplaceTempView('Student') # now the table is registered as a temporay view named as Student and we can run sql query on it


In [0]:
spark.sql('select * from Student').show()

In [0]:
spark.sql('select name from Student where age>28').show()

# is equivalent to writing

df.select('name').filter(df.age>28).show()

In [0]:
spark.sql('select gender,max(marks) as max_marks from Student group by gender').show()

# equivalent to

df.groupBy('gender').agg(max(df.marks).alias('max_marks')).show()

In [0]:
spark.sql('select gender,count(*) from Student group by gender').show()

# eqivalent to

df.groupBy('gender').count().show()

In [0]:
# first register the df to temporary view and then write a sql query on it

In [0]:
# writing a DataFrame out to files in storage

In [0]:
df.write.options(header='True').csv('/FileStore/tables/StudentData/output') # no need to provide the name of the file because the underlying structure is the RDD

In [0]:
df.rdd.getNumPartitions() # since we have just 1 partition, there is only a singlw file while writing to the memory

In [0]:
# to read the data back

df = spark.read.options(header='True',inferSchema='True').csv('/FileStore/tables/StudentData/output')
df.show()

In [0]:
 # modes in which we can write the data:
  
  # overwrite (replaces with the new file ), append, ignore (wont do anything if the data is already present) and error (deafult, raises the exception if file already present) mode

In [0]:
df=df.groupBy('gender').count()

In [0]:
df.write.mode('overwrite').options(header='True').csv('/FileStore/tables/StudentData/output')

In [0]:
df = spark.read.options(header='True',inferSchema='True').csv('/FileStore/tables/StudentData/output')
df.show()
