## Overview

This notebook shows how to create and query a table or DataFrame from a file stored in DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. This notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python**, so the default cell type is Python. You can switch a cell to another language with the `%LANGUAGE` magic command (for example `%sql`, `%scala` or `%r`); Python, Scala, SQL, and R are all supported.

In [0]:
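import pyspark                                    # used later for pyspark.StorageLevel
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
# Note: Databricks notebooks already provide `spark` (SparkSession) and `sc` (SparkContext)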

In [0]:
# File location and type
file_location = "dbfs:/databricks-datasets/flights/departuredelays.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
flights = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(flights)

date,delay,distance,origin,destination
1011245,6,602,ABE,ATL
1020600,-8,369,ABE,DTW
1021245,-2,602,ABE,ATL
1020605,-4,602,ABE,ATL
1031245,-4,602,ABE,ATL
1030605,0,602,ABE,ATL
1041243,10,602,ABE,ATL
1040605,28,602,ABE,ATL
1051245,88,602,ABE,ATL
1050605,9,602,ABE,ATL


In [0]:
type(flights)

Out[4]: pyspark.sql.dataframe.DataFrame

In [0]:
#Create an RDD by loading departuredelays.csv from DBFS into a DataFrame and taking its underlying .rdd
rdd_flight = (spark.read.format('csv')
              .options(header='true', inferSchema='true')
              .load("dbfs:/databricks-datasets/flights/departuredelays.csv")
              .rdd)  # an RDD of Row objects


print("\n",type(rdd_flight))


 <class 'pyspark.rdd.RDD'>


In [0]:
# Create an RDD from a local Python list: parallelize() the list, convert it to a DataFrame with toDF(), then take its .rdd
# In Databricks, getOrCreate() simply returns the notebook's existing SparkSession
spark = SparkSession.builder.appName("Python Spark create RDD example").getOrCreate()

rdd_par = (spark.sparkContext
           .parallelize([(1,2,3,'a b c'),(4,5,6,'d e f'),(7,8,9,'g h i')])
           .toDF(["col_1","col_2","col_3","col_4"])
           .rdd)

In [0]:
rdd_par.collect()

Out[7]: [Row(col_1=1, col_2=2, col_3=3, col_4='a b c'),
 Row(col_1=4, col_2=5, col_3=6, col_4='d e f'),
 Row(col_1=7, col_2=8, col_3=9, col_4='g h i')]

In [0]:
type(rdd_par)

Out[8]: pyspark.rdd.RDD

In [0]:
# Display the number of partitions the RDD is split into
rdd_par.getNumPartitions()

Out[9]: 8

In [0]:
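#Repartition the DataFrame into 3 partitions and save it as CSV; each partition is written as a separate part file
#Without .format("csv"), save() would use the default data source (spark.sql.sources.default, Parquet in open-source Spark)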

flights.repartition(3).write.format("csv").mode("overwrite").save("/FileStore/tables/flight_df_partition/")

In [0]:
#Create Single Column Dataframe with List, Tuple and mention your observations.


In [0]:
data = [("Virat Kohli", "India", 5,246,123.00),
        ("Max oDowd", "Netherlands",8,242,34.57),
        ("Suryakumar Yadav", "India", 5,225,75.00),
        ("kusal Mendis", "Srilanka", 8,223,31.86),
        ("Sikandar Raza", "Zimbabwe", 8,219,27.38)]
  
# giving column names of dataframe
columns = ["Name", "Country", "Match","Run","AVG"]
  
# creating a dataframe
most_run = spark.createDataFrame(data, columns)

In [0]:
display(most_run)

Name,Country,Match,Run,AVG
Virat Kohli,India,5,246,123.0
Max oDowd,Netherlands,8,242,34.57
Suryakumar Yadav,India,5,225,75.0
kusal Mendis,Srilanka,8,223,31.86
Sikandar Raza,Zimbabwe,8,219,27.38


In [0]:
#Create Single Column Dataframe with List
name=spark.createDataFrame(["Virat Kohli","Max oDowd","Suryakumar Yadav","kusal Mendis","Sikandar Raza"], "string").toDF("Name")
display(name)

Name
Virat Kohli
Max oDowd
Suryakumar Yadav
kusal Mendis
Sikandar Raza


In [0]:
#Create Single Column Dataframe with tuple
name_tupple=spark.createDataFrame(("Virat Kohli","Max oDowd","Suryakumar Yadav","kusal Mendis","Sikandar Raza","kusal Mendis","Suryakumar Yadav"), "string").toDF("Name")
display(name_tupple)

Name
Virat Kohli
Max oDowd
Suryakumar Yadav
kusal Mendis
Sikandar Raza
kusal Mendis
Suryakumar Yadav


In [0]:
type(name_tupple)#checking the type

Out[34]: pyspark.sql.dataframe.DataFrame

In [0]:
#Create a Multi Column Dataframe with List of List
data = [["Virat Kohli", "India", 5,246,123.00],
        ["Max oDowd", "Netherlands",8,242,34.57],
        ["Suryakumar Yadav", "India", 5,225,75.00],
        ["kusal Mendis", "Srilanka", 8,223,31.86],
        ["Sikandar Raza", "Zimbabwe", 8,219,27.38]]
  
# giving column names of dataframe
columns = ["Name", "Country", "Match","Run","AVG"]
  
# creating a dataframe
most_run = spark.createDataFrame(data, columns)
display(most_run)

Name,Country,Match,Run,AVG
Virat Kohli,India,5,246,123.0
Max oDowd,Netherlands,8,242,34.57
Suryakumar Yadav,India,5,225,75.0
kusal Mendis,Srilanka,8,223,31.86
Sikandar Raza,Zimbabwe,8,219,27.38


In [0]:
#Create a Multi Column Dataframe with a List of Tuples and Mention Your Observations.
data = [("Virat Kohli", "India", 5,246,123.00),
        ("Max oDowd", "Netherlands",8,242,34.57),
        ("Suryakumar Yadav", "India", 5,225,75.00),
        ("kusal Mendis", "Srilanka", 8,223,31.86),
        ("Sikandar Raza", "Zimbabwe", 8,219,27.38)]

for i in data:
    print(type(i))
# giving column names of dataframe
columns = ["Name", "Country", "Match","Run","AVG"]
  
# creating a dataframe
most_run = spark.createDataFrame(data, columns)
display(most_run)


<class 'tuple'>
<class 'tuple'>
<class 'tuple'>
<class 'tuple'>
<class 'tuple'>


Name,Country,Match,Run,AVG
Virat Kohli,India,5,246,123.0
Max oDowd,Netherlands,8,242,34.57
Suryakumar Yadav,India,5,225,75.0
kusal Mendis,Srilanka,8,223,31.86
Sikandar Raza,Zimbabwe,8,219,27.38


In [0]:
#Create a Multi Column Dataframe with a List of Dictionaries and Mention Your Observations.
students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
               {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
               {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},
               {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
               {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

for i in students:
    print(type(i))
students_df=spark.createDataFrame(students)
display(students_df)

<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>


address,age,height,name,rollno,weight
guntur,23,5.79,sravan,1,67
hyd,16,3.79,ojaswi,2,34
patna,7,2.79,gnanesh chowdary,3,17
hyd,9,3.69,rohith,4,28
hyd,37,5.59,sridevi,5,54


In [0]:
#Create Dataframe from RDD
data = [
    {'Property_code':"P01",'Property':"Uptown",'Tenant Units':50,'Salary':50000},
    {'Property_code':"P02",'Property':"Alabang",'Tenant Units':40,'Salary':40000},
    {'Property_code':"P03",'Property':"Chinatown",'Tenant Units':70,'Salary':70000},
    {'Property_code':"P04",'Property':"Riversdale",'Tenant Units':35,'Salary':20000},
    {'Property_code':"P05",'Property':"Bonificio",'Tenant Units':60,'Salary':80000} 
        ]


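# getOrCreate() returns the already-running SparkContext if there is one (always the case in Databricks),
# and then this conf is ignored; setMaster("local[*]") only matters when running outside a cluster
conf = SparkConf().setAppName("appName").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)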

rdd = sc.parallelize(data)
print(type(rdd))#Resilient Distributed Dataset (RDD)

df_rdd = rdd.toDF()
print(type(df_rdd))


<class 'pyspark.rdd.RDD'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:

display(df_rdd)

Property,Property_code,Salary,Tenant Units
Uptown,P01,50000,50
Alabang,P02,40000,40
Chinatown,P03,70000,70
Riversdale,P04,20000,35
Bonificio,P05,80000,60


####Create a Data Frame using Range

In [0]:
df_range = spark.range(0,11)
df_range.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
+---+



###Pandas DataFrame VS Spark DataFrame

In [0]:
import pandas as pd
#Reading the same CSV as a pandas DataFrame and as a Spark DataFrame
file_location='dbfs:/databricks-datasets/flights/departuredelays.csv'#file path
pandasDF = spark.read.format('csv').options(header='true').load(file_location).toPandas() #read with Spark, then toPandas() collects every row to the driver as a pandas DataFrame
sparkDF = spark.read.option("header",True).csv(file_location)#Spark DataFrame; without inferSchema every column is read as a string

In [0]:
display(pandasDF)


date,delay,distance,origin,destination
1011245,6,602,ABE,ATL
1020600,-8,369,ABE,DTW
1021245,-2,602,ABE,ATL
1020605,-4,602,ABE,ATL
1031245,-4,602,ABE,ATL
1030605,0,602,ABE,ATL
1041243,10,602,ABE,ATL
1040605,28,602,ABE,ATL
1051245,88,602,ABE,ATL
1050605,9,602,ABE,ATL


In [0]:
display(sparkDF)

date,delay,distance,origin,destination
1011245,6,602,ABE,ATL
1020600,-8,369,ABE,DTW
1021245,-2,602,ABE,ATL
1020605,-4,602,ABE,ATL
1031245,-4,602,ABE,ATL
1030605,0,602,ABE,ATL
1041243,10,602,ABE,ATL
1040605,28,602,ABE,ATL
1051245,88,602,ABE,ATL
1050605,9,602,ABE,ATL


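In [0]:
#show row counts: pandas count() returns the number of non-null values per column,
#while Spark count() returns the total number of rows

print("Pandas Count:","\n",pandasDF.count())
print("\n","Spark Count:",sparkDF.count())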

Pandas Count: 
 date           1391578
delay          1391578
distance       1391578
origin         1391578
destination    1391578
dtype: int64

 Spark Count: 1391578


In [0]:
#pandas exposes .shape, but a Spark DataFrame has no shape attribute, so combine count() with the number of columns

print("Pandas Shape: ",pandasDF.shape)
print("\n","Spark Shape: ",sparkDF.count(),len(sparkDF.columns))

Pandas Shape:  (1391578, 5)

 Spark Shape:  1391578 5


In [0]:
pandasDF.head(5)

Unnamed: 0,date,delay,distance,origin,destination
0,1011245,6,602,ABE,ATL
1,1020600,-8,369,ABE,DTW
2,1021245,-2,602,ABE,ATL
3,1020605,-4,602,ABE,ATL
4,1031245,-4,602,ABE,ATL


In [0]:
sparkDF.head()

Out[47]: Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL')


In [0]:
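# Create a temporary view so the DataFrame can be queried with SQL
# The view name is kept in a Python variable for reuse in Python code; a %sql cell does not see
# Python variables, so the SQL cell below uses the literal name departuredelays_csv
temp_table_name = "departuredelays_csv"
flights.createOrReplaceTempView(temp_table_name)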

In [0]:
%sql

/* Query the created temp view in a SQL cell */

select * from departuredelays_csv

date,delay,distance,origin,destination
1011245,6,602,ABE,ATL
1020600,-8,369,ABE,DTW
1021245,-2,602,ABE,ATL
1020605,-4,602,ABE,ATL
1031245,-4,602,ABE,ATL
1030605,0,602,ABE,ATL
1041243,10,602,ABE,ATL
1040605,28,602,ABE,ATL
1051245,88,602,ABE,ATL
1050605,9,602,ABE,ATL


###Narrow Transformation

In [0]:
marks=spark.read.csv('/FileStore/tables/student_marks.csv',header=True)

In [0]:
display(marks)

_c0,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
John,M,05/04/1988,55,45,56.0,87,21,52,89,65
Suresh,M,4/5/1987,75,55,,64,90,61,58,2
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53


In [0]:
marks=marks.withColumnRenamed('_c0', 'Name')#the CSV header has no name for the first column, so Spark called it _c0; rename it
display(marks)

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
John,M,05/04/1988,55,45,56.0,87,21,52,89,65
Suresh,M,4/5/1987,75,55,,64,90,61,58,2
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53


###Select()

In [0]:
display(marks.select('name'))#selecting the Name column (column names are case-insensitive by default)

name
John
Suresh
Ramesh
Jessica
Jennifer


In [0]:
display(marks.select('name','gender')) #selecting Name and Gender columns

name,gender
John,M
Suresh,M
Ramesh,M
Jessica,F
Jennifer,F


In [0]:
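from pyspark.sql.functions import *  # note: this shadows Python built-ins such as max, min, sum, abs and round
display(marks.select(avg('Maths')))#selecting the average score of Maths (the string column is cast to a number)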

avg(Maths)
58.2


In [0]:
display(marks.select(max('Maths')))#selecting the maximum score of Maths (Maths is a string column, so max() compares text)

max(Maths)
78


In [0]:
display(marks.select(min('Maths')))#selecting the minimum score of Maths

min(Maths)
25


##Filter()

In [0]:
#data filter
display(marks.filter(marks.Maths<=55))#filtering data where the Maths score is less than or equal to 55

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
John,M,05/04/1988,55,45,56,87,21,52,89,65
Ramesh,M,25/5/1989,25,54,89,76,95,87,56,74


In [0]:
display(marks.filter((marks.Maths>=55) | (marks.English<=60)))
#keep rows where the Maths score is greater than or equal to 55
#OR the English score is less than or equal to 60

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
John,M,05/04/1988,55,45,56.0,87,21,52,89,65
Suresh,M,4/5/1987,75,55,,64,90,61,58,2
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53


In [0]:
#filters data where chemistry score is null
display(marks.filter( (marks.Chemistry.isNull())))

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
Suresh,M,4/5/1987,75,55,,64,90,61,58,2


#withColumn()

In [0]:
marks=marks.withColumn('New_data', marks.Maths + marks.History)#new column with the sum of two columns (Maths and History); both are strings, so Spark casts them and the result is a double

In [0]:
display(marks)

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics,New_data
John,M,05/04/1988,55,45,56.0,87,21,52,89,65,144.0
Suresh,M,4/5/1987,75,55,,64,90,61,58,2,133.0
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74,81.0
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45,153.0
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53,141.0


In [0]:
marks=marks.withColumnRenamed('New_data','Sum_of_math_and_History')
display(marks)
# renaming the column

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics,Sum_of_math_and_History
John,M,05/04/1988,55,45,56.0,87,21,52,89,65,144.0
Suresh,M,4/5/1987,75,55,,64,90,61,58,2,133.0
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74,81.0
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45,153.0
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53,141.0


In [0]:
marks=marks.drop('Sum_of_math_and_History')#dropping the column
print(marks.columns)
display(marks)

['Name', 'Gender', 'DOB', 'Maths', 'Physics', 'Chemistry', 'English', 'Biology', 'Economics', 'History', 'Civics']


Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
John,M,05/04/1988,55,45,56.0,87,21,52,89,65
Suresh,M,4/5/1987,75,55,,64,90,61,58,2
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53


#Set Operations such as union()

In [0]:
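#use marks.columns so the new rows get the same column names as marks
duplicate = spark.createDataFrame([('Ram','M','12/8/1990',45,54,55,34,45,34,45,67),('Jennifer','F','2/9/1989',58,96,78,46,96,77,83,53)], marks.columns)
display(duplicate)

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics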
Ram,M,12/8/1990,45,54,55,34,45,34,45,67
Jennifer,F,2/9/1989,58,96,78,46,96,77,83,53


In [0]:
new_df = marks.union(duplicate)#union() appends the rows by column position and keeps duplicates (like SQL UNION ALL)
display(new_df)

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
John,M,05/04/1988,55,45,56.0,87,21,52,89,65
Suresh,M,4/5/1987,75,55,,64,90,61,58,2
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53
Ram,M,12/8/1990,45,54,55.0,34,45,34,45,67
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53


##Distinct()

In [0]:
#Distinct()
new_df.count()

Out[82]: 7

In [0]:
dist_new_df=new_df.distinct()#removing duplicate records (rows identical in every column)
dist_new_df.count()

Out[84]: 6

#join

In [0]:

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)



dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
display(deptDF)


dept_name,dept_id
Finance,10
Marketing,20
Sales,30
IT,40


#inner join

In [0]:

emp_inner=empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner")#performing inner join based on dept_id
display(emp_inner)


emp_id,name,superior_emp_id,year_joined,emp_dept_id,gender,salary,dept_name,dept_id
1,Smith,-1,2018,10,M,3000,Finance,10
3,Williams,1,2010,10,M,1000,Finance,10
4,Jones,2,2005,10,F,2000,Finance,10
2,Rose,1,2010,20,M,4000,Marketing,20
5,Brown,2,2010,40,,-1,IT,40


#right join

In [0]:
emp_right=empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right")
#perform a right join based on emp_dept_id and dept_id (keeps every department, including Sales, which has no employees)
display(emp_right)

emp_id,name,superior_emp_id,year_joined,emp_dept_id,gender,salary,dept_name,dept_id
1.0,Smith,-1.0,2018.0,10.0,M,3000.0,Finance,10
3.0,Williams,1.0,2010.0,10.0,M,1000.0,Finance,10
4.0,Jones,2.0,2005.0,10.0,F,2000.0,Finance,10
2.0,Rose,1.0,2010.0,20.0,M,4000.0,Marketing,20
,,,,,,,Sales,30
5.0,Brown,2.0,2010.0,40.0,,-1.0,IT,40


#left join

In [0]:
emp_left=empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left")
display(emp_left)

emp_id,name,superior_emp_id,year_joined,emp_dept_id,gender,salary,dept_name,dept_id
1,Smith,-1,2018,10,M,3000,Finance,10.0
3,Williams,1,2010,10,M,1000,Finance,10.0
4,Jones,2,2005,10,F,2000,Finance,10.0
2,Rose,1,2010,20,M,4000,Marketing,20.0
5,Brown,2,2010,40,,-1,IT,40.0
6,Brown,2,2010,50,,-1,,


#groupBy()

In [0]:
display(marks)

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
John,M,05/04/1988,55,45,56.0,87,21,52,89,65
Suresh,M,4/5/1987,75,55,,64,90,61,58,2
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53


In [0]:
marks=marks.withColumn("Maths",marks.Maths.cast('integer'))#cast the Maths column from string to integer
marks_grp=marks.groupBy("Gender").sum("Maths")
display(marks_grp)

Gender,sum(Maths)
F,136
M,155


#sort() /orderBy()

In [0]:
display(marks.sort(marks.Maths))#sorting data based on maths score

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74
John,M,05/04/1988,55,45,56.0,87,21,52,89,65
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53
Suresh,M,4/5/1987,75,55,,64,90,61,58,2
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45


In [0]:
display(marks.orderBy(marks.History))

Name,Gender,DOB,Maths,Physics,Chemistry,English,Biology,Economics,History,Civics
Ramesh,M,25/5/1989,25,54,89.0,76,95,87,56,74
Suresh,M,4/5/1987,75,55,,64,90,61,58,2
Jessica,F,12/8/1990,78,55,86.0,63,54,89,75,45
Jennifer,F,2/9/1989,58,96,78.0,46,96,77,83,53
John,M,05/04/1988,55,45,56.0,87,21,52,89,65


#UDF with PySpark

In [0]:
#Create a data frame
data = [(1,"steph martinez",25),(2,"nico saurez",20),(3,"leo messi",34),(4,"erwin halland",30),(5,"cesc fabregas",32)]
columns = ["Id","Name_","Age"]

df_udf = spark.createDataFrame(data=data,schema=columns)
display(df_udf)

Id,Name_,Age
1,steph martinez,25
2,nico saurez,20
3,leo messi,34
4,erwin halland,30
5,cesc fabregas,32


In [0]:
#Create a python function
#Convert a string to title case (first letter of each word in upper case)
#Note: the loop appends a space after every word, so the result ends with a trailing space
def convertcase(str):
    c=""
    arr=str.split(" ")
    for i in arr:
        c = c + i[0:1].upper() + i[1:len(i)] + " "
    return c
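Because `convertcase` appends `" "` after every word, each result carries a trailing space, which is why the names in the outputs below are padded (for example `Steph Martinez `). A minimal pure-Python sketch of a variant that joins the words instead; `convert_case` is a hypothetical name not used elsewhere in the notebook:

```python
def convert_case(text):
    """Upper-case the first letter of each space-separated word, without a trailing space."""
    return " ".join(word[:1].upper() + word[1:] for word in text.split(" "))


print(repr(convert_case("steph martinez")))  # 'Steph Martinez'
```

It can be wrapped with `udf(convert_case)` exactly like `convertcase`.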

In [0]:
#wrap the Python function as a user defined function (the return type defaults to StringType) and apply it to the data frame

df_convertcase=udf(convertcase)
df_udf.select(col("Id"),df_convertcase(col("Name_")).alias("New_Name"),col("Age")).show()

+---+---------------+---+
| Id|       New_Name|Age|
+---+---------------+---+
|  1|Steph Martinez | 25|
|  2|   Nico Saurez | 20|
|  3|     Leo Messi | 34|
|  4| Erwin Halland | 30|
|  5| Cesc Fabregas | 32|
+---+---------------+---+



#UDF with Lambda

In [0]:
#define UDF using lambda

convertUDF = udf(lambda z: convertcase(z)) 
df_udf.select(col("Id"),convertUDF(col("Name_")).alias("New_Name"),col("Age")).show(truncate=False)

+---+---------------+---+
|Id |New_Name       |Age|
+---+---------------+---+
|1  |Steph Martinez |25 |
|2  |Nico Saurez    |20 |
|3  |Leo Messi      |34 |
|4  |Erwin Halland  |30 |
|5  |Cesc Fabregas  |32 |
+---+---------------+---+



In [0]:
#create a function for upper case
#convert string to UPPER CASE

def uppercase(str):
    return str.upper()

In [0]:
#Defined UDF
uppercaseUDF = udf(lambda z: uppercase(z))
df_udf.withColumn("Upper_Name",uppercaseUDF(col("Name_"))).show(truncate=False)

+---+--------------+---+--------------+
|Id |Name_         |Age|Upper_Name    |
+---+--------------+---+--------------+
|1  |steph martinez|25 |STEPH MARTINEZ|
|2  |nico saurez   |20 |NICO SAUREZ   |
|3  |leo messi     |34 |LEO MESSI     |
|4  |erwin halland |30 |ERWIN HALLAND |
|5  |cesc fabregas |32 |CESC FABREGAS |
+---+--------------+---+--------------+



#UDF with SQL

In [0]:
#Register the function as a UDF that can be called from SQL

spark.udf.register("convertUDF",convertcase)
df_udf.createOrReplaceTempView("udf_table")
spark.sql(""" select *,convertUDF(Name_) AS New_Name from udf_table """).show()

+---+--------------+---+---------------+
| Id|         Name_|Age|       New_Name|
+---+--------------+---+---------------+
|  1|steph martinez| 25|Steph Martinez |
|  2|   nico saurez| 20|   Nico Saurez |
|  3|     leo messi| 34|     Leo Messi |
|  4| erwin halland| 30| Erwin Halland |
|  5| cesc fabregas| 32| Cesc Fabregas |
+---+--------------+---+---------------+



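#UDF using Annotation (decorator)

In [0]:
#Define a UDF with the @udf decorator
#The decorator wraps the function where it is defined, so uppercase now refers to the UDF (replacing the plain Python uppercase above) and can be applied to a column directly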

@udf(returnType=StringType())
def uppercase(str):
    return str.upper()
df_udf.withColumn("New_Name", uppercase(col("Name_"))).show()

+---+--------------+---+--------------+
| Id|         Name_|Age|      New_Name|
+---+--------------+---+--------------+
|  1|steph martinez| 25|STEPH MARTINEZ|
|  2|   nico saurez| 20|   NICO SAUREZ|
|  3|     leo messi| 34|     LEO MESSI|
|  4| erwin halland| 30| ERWIN HALLAND|
|  5| cesc fabregas| 32| CESC FABREGAS|
+---+--------------+---+--------------+



#Null values with a SQL UDF

In [0]:
#Create a data frame including null values
data = [(1,"steph martinez"),(2,"nico saurez"),(3,"leo messi"),(4,"erwin halland"),(5,None)]
columns = ["Id","Name_"]
df_udf_null = spark.createDataFrame(data=data,schema=columns)
display(df_udf_null)

Id,Name_
1,steph martinez
2,nico saurez
3,leo messi
4,erwin halland
5,


In [0]:
#Define a SQL UDF with a lambda that returns an empty string for null values (convertcase would fail on None)
spark.udf.register("nullUDF", lambda s: convertcase(s) if s is not None else "")
df_udf_null.createOrReplaceTempView("udf_null")
spark.sql(""" select *, nullUDF(Name_) AS new_name from udf_null """).show()

+---+--------------+---------------+
| Id|         Name_|       new_name|
+---+--------------+---------------+
|  1|steph martinez|Steph Martinez |
|  2|   nico saurez|   Nico Saurez |
|  3|     leo messi|     Leo Messi |
|  4| erwin halland| Erwin Halland |
|  5|          null|               |
+---+--------------+---------------+



#Optimization Techniques

##Cache vs Persist
 - Using the `cache()` and `persist()` methods, Spark provides an optimization mechanism to store the intermediate computation of an RDD, DataFrame, or Dataset so it can be reused in subsequent actions instead of being recomputed.
 - Both caching and persisting save an RDD, DataFrame, or Dataset. The difference is that `RDD.cache()` always uses the default storage level (`MEMORY_ONLY`), whereas `persist()` stores the data at a user-defined storage level.
 - When you persist a dataset, each node stores its partitions at the chosen storage level and reuses them in other actions on that dataset. Persisted data is fault-tolerant: if any partition is lost, Spark automatically recomputes it using the transformations that originally created it.
 
 
***Advantages of Caching and Persistence***
 - Below are the advantages of using Spark's cache and persist methods.
   - ***Cost efficient***: Spark computations can be expensive, so reusing them saves cost.
   - ***Time efficient***: reusing repeated computations saves a lot of time.
   - ***Execution time***: shorter job execution time frees the cluster to run more jobs.

##Spark Cache
- Spark DataFrame or Dataset caching by default saves it to storage level `MEMORY_AND_DISK` because recomputing the in-memory columnar representation of the underlying table is expensive. Note that this differs from the default level of `RDD.cache()`, which is `MEMORY_ONLY`.

In [0]:

columns = ["Seqno","Quote"]
data = [("1", "Be the change that you wish to see in the world"),
("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
("3", "The purpose of our lives is to be happy.")]

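#persist(DISK_ONLY) stores the DataFrame's partitions on disk only
#persist() is called at the end of the DataFrame definition; it is lazy, so the data is stored when the first action (display) runs
#To check it: under the Spark Jobs output of this cell, go to the last job and select View, then open the Storage tab
#The Storage tab shows the stored size in the "Size on Disk" column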
df=spark.createDataFrame(data, columns).persist(pyspark.StorageLevel.DISK_ONLY)
display(df)

Seqno,Quote
1,Be the change that you wish to see in the world
2,"Everyone thinks of changing the world, but no one thinks of changing himself."
3,The purpose of our lives is to be happy.


In [0]:
#cache() uses the DataFrame default storage level MEMORY_AND_DISK: partitions are kept in memory and spill to disk only if they do not fit
#Like persist(), cache() is lazy and takes effect at the first action (display)
#In the Storage tab (opened the same way as above), the cached size appears in the "Size in Memory" column
df=spark.createDataFrame(data, columns).cache()
display(df)

Seqno,Quote
1,Be the change that you wish to see in the world
2,"Everyone thinks of changing the world, but no one thinks of changing himself."
3,The purpose of our lives is to be happy.
