**Printing the Dataset**

In [0]:
from pyspark.sql import SparkSession
import pyspark

# Create the SparkSession (getOrCreate returns the existing one if already running).
spark = (
    SparkSession.builder
    .appName("ExampleApp")
    .getOrCreate()
)

In [0]:
# Build a small people DataFrame. Absent values are empty strings (not nulls),
# and `dob` is held as a string; Spark infers `salary` as long from the Python ints.
columns = ["first_name", "middle_name", "last_name", "dob", "gender", "salary"]
data = [
    ("James",   "",     "Smith",    "36636", "M", 60000),
    ("Michael", "Rose", "",         "40288", "M", 70000),
    ("Robert",  "",     "Williams", "42114", "",  400000),
    ("Maria",   "Anne", "Jones",    "39192", "F", 500000),
    ("Jen",     "Mary", "Brown",    "",      "F", 0),
]
pysparkDF = spark.createDataFrame(data, columns)

# Inferred schema first, then every row with cell contents left untruncated.
pysparkDF.printSchema()
pysparkDF.show(truncate=False)

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|dob  |gender|salary|
+----------+-----------+---------+-----+------+------+
|James     |           |Smith    |36636|M     |60000 |
|Michael   |Rose       |         |40288|M     |70000 |
|Robert    |           |Williams |42114|      |400000|
|Maria     |Anne       |Jones    |39192|F     |500000|
|Jen       |Mary       |Brown    |     |F     |0     |
+----------+-----------+---------+-----+------+------+



# ***SPARK FUNCTIONS***

**when**

In [0]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.functions import when

spark = (
    SparkSession.builder
    .appName("ExampleApp")
    .getOrCreate()
)

# Sample employees for the when() example; note the column names contain spaces.
columns = ['First Name', 'Last Name', 'Gender', 'Salary']
data2 = [
    ('Ravi',      'Dubey',     'Male',   12000),
    ('Gururaj',   'Pal',       'Male',   20000),
    ('Tribuvana', 'Das',       'Female', 10500),
    ('Malathi',   'Ramaswamy', 'Female', 20000),
]
pysparkDF = spark.createDataFrame(data2, columns)
pysparkDF.show(truncate=False)

+----------+---------+------+------+
|First Name|Last Name|Gender|Salary|
+----------+---------+------+------+
|Ravi      |Dubey    |Male  |12000 |
|Gururaj   |Pal      |Male  |20000 |
|Tribuvana |Das      |Female|10500 |
|Malathi   |Ramaswamy|Female|20000 |
+----------+---------+------+------+



In [0]:
# Add a Gender_Abb column: "Male" -> "M", "Female" -> "F"; any other value is copied as-is.
new_df = pysparkDF.withColumn(
    "Gender_Abb",
    when(col("Gender") == 'Male', 'M')
    .when(col("Gender") == 'Female', "F")
    .otherwise(col("Gender")),
)
new_df.show()


+----------+---------+------+------+----------+
|First Name|Last Name|Gender|Salary|Gender_Abb|
+----------+---------+------+------+----------+
|      Ravi|    Dubey|  Male| 12000|         M|
|   Gururaj|      Pal|  Male| 20000|         M|
| Tribuvana|      Das|Female| 10500|         F|
|   Malathi|Ramaswamy|Female| 20000|         F|
+----------+---------+------+------+----------+



**expr**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
import pyspark

spark = (
    SparkSession.builder
    .appName('Expr')
    .getOrCreate()
)

# Employee sample used by the expr() / SQL CASE examples below.
columns = ['First_Name', 'Last_Name', 'Salary', 'Department']
data = [
    ('Sagar',    'More',    12000, 'Sales'),
    ('Abhishek', 'Dubey',   13000, 'Sales'),
    ('Sheela',   'Vyas',    15000, 'HR'),
    ('Namita',   'Godbole', 10000, 'HR'),
    ('Radhika',  'Mathur',  23000, 'IT'),
    ('Antony',   'Louis',   25000, 'IT'),
]
new_data = spark.createDataFrame(data, columns)
new_data.show()

+----------+---------+------+----------+
|First_Name|Last_Name|Salary|Department|
+----------+---------+------+----------+
|     Sagar|     More| 12000|     Sales|
|  Abhishek|    Dubey| 13000|     Sales|
|    Sheela|     Vyas| 15000|        HR|
|    Namita|  Godbole| 10000|        HR|
|   Radhika|   Mathur| 23000|        IT|
|    Antony|    Louis| 25000|        IT|
+----------+---------+------+----------+



In [0]:
# Derive a Gender column from First_Name with a SQL CASE expression inside expr().
# IN (...) replaces the chain of OR'd equality tests; first names not listed map to 'Unknown'.
new_case = new_data.withColumn(
    'Gender',
    expr(
        "CASE "
        "WHEN First_Name IN ('Abhishek', 'Antony', 'Sagar') THEN 'Male' "
        "WHEN First_Name IN ('Sheela', 'Radhika', 'Namita') THEN 'Female' "
        "ELSE 'Unknown' END"
    ),
)
new_case.show()

+----------+---------+------+----------+------+
|First_Name|Last_Name|Salary|Department|Gender|
+----------+---------+------+----------+------+
|     Sagar|     More| 12000|     Sales|  Male|
|  Abhishek|    Dubey| 13000|     Sales|  Male|
|    Sheela|     Vyas| 15000|        HR|Female|
|    Namita|  Godbole| 10000|        HR|Female|
|   Radhika|   Mathur| 23000|        IT|Female|
|    Antony|    Louis| 25000|        IT|  Male|
+----------+---------+------+----------+------+



In [0]:
# Map each Department to a Degree with a SQL CASE expression.
# - The column is spelled `Department`, matching the schema, so this keeps working
#   if spark.sql.caseSensitive is enabled.
# - Every known department has an explicit branch; anything else becomes 'Unknown'
#   instead of being silently labelled 'IT'.
new_case2 = new_data.withColumn(
    'Degree',
    expr(
        "CASE WHEN Department = 'Sales' THEN 'MBA' "
        "WHEN Department = 'HR' THEN 'HR' "
        "WHEN Department = 'IT' THEN 'IT' "
        "ELSE 'Unknown' END"
    ),
)
new_case2.show()

+----------+---------+------+----------+------+
|First_Name|Last_Name|Salary|Department|Degree|
+----------+---------+------+----------+------+
|     Sagar|     More| 12000|     Sales|   MBA|
|  Abhishek|    Dubey| 13000|     Sales|   MBA|
|    Sheela|     Vyas| 15000|        HR|    HR|
|    Namita|  Godbole| 10000|        HR|    HR|
|   Radhika|   Mathur| 23000|        IT|    IT|
|    Antony|    Louis| 25000|        IT|    IT|
+----------+---------+------+----------+------+



**concat_ws**

In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *
import pyspark

spark = (
    SparkSession.builder
    .appName('Concat_WS')
    .getOrCreate()
)

# Each row carries an array of designations, to be flattened with concat_ws next.
columns = ['Name', 'Designations', 'Salary']
data = [
    ('Roy',   ['Data Engineer', 'Data Scientist'], 12000),
    ('Rashi', ['Data Analyst',  'Data Scientist'], 15000),
    ('Ravi',  ['Data Analyst',  'Data Scientist'], 18000),
]
data_conws = spark.createDataFrame(data, columns)
data_conws.show()

+-----+--------------------+------+
| Name|        Designations|Salary|
+-----+--------------------+------+
|  Roy|[Data Engineer, D...| 12000|
|Rashi|[Data Analyst, Da...| 15000|
| Ravi|[Data Analyst, Da...| 18000|
+-----+--------------------+------+



In [0]:
# Collapse the Designations array into a single comma-separated string with concat_ws.
# The result gets its own name rather than reusing `new_data`. Overwriting `new_data`
# would break re-running the expr/CASE cells above, whose expressions reference
# First_Name / Department columns that this DataFrame does not have.
from pyspark.sql.functions import *
designations_joined = data_conws.withColumn(
    'Designations',
    concat_ws(',', col('Designations')),
)
designations_joined.show()

+-----+--------------------+------+
| Name|        Designations|Salary|
+-----+--------------------+------+
|  Roy|Data Engineer,Dat...| 12000|
|Rashi|Data Analyst,Data...| 15000|
| Ravi|Data Analyst,Data...| 18000|
+-----+--------------------+------+



# ***Math Functions***

In [0]:
# Imports and Spark session for the math / aggregate-function examples.
from pyspark.sql import *
from pyspark.sql.functions import *
import pyspark

spark = (
    SparkSession.builder
    .appName('Math_Functions')
    .getOrCreate()
)

In [0]:
# Employee records (name, department, state, salary, age, bonus) for the aggregate examples.
columns = ["employee_name", "department", "state", "salary", "age", "bonus"]
data_math = [
    ("James",   "Sales",     "NY", 90000, 34, 10000),
    ("Michael", "Sales",     "NY", 86000, 56, 20000),
    ("Robert",  "Sales",     "CA", 81000, 30, 23000),
    ("Maria",   "Finance",   "CA", 90000, 24, 23000),
    ("Raman",   "Finance",   "CA", 99000, 40, 24000),
    ("Scott",   "Finance",   "NY", 83000, 36, 19000),
    ("Jen",     "Finance",   "NY", 79000, 53, 15000),
    ("Jeff",    "Marketing", "CA", 80000, 25, 18000),
    ("Kumar",   "Marketing", "NY", 91000, 50, 21000),
]
stat_data = spark.createDataFrame(data_math, columns)
stat_data.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



**sum()**

In [0]:
# Sum of the salary column.
# - F.sum is referenced explicitly. The bare `sum` only worked because the star
#   import shadowed Python's built-in sum.
# - The column is spelled as in the schema ("salary").
from pyspark.sql import functions as F

total_salary = stat_data.select(F.sum("salary")).first()[0]
print(f"Sum: {total_salary}")

Sum: 779000


In [0]:
# Mean of the salary column (column name spelled as in the schema).
from pyspark.sql import functions as F

avg_salary = stat_data.select(F.mean("salary")).first()[0]
print(f"Avg: {avg_salary}")

Avg: 86555.55555555556


In [0]:
# Min and max salary, computed together in a single aggregation instead of two jobs.
# F.min / F.max are used explicitly; the bare names shadowed Python's builtins via the star import.
from pyspark.sql import functions as F

salary_range = stat_data.agg(F.min("salary"), F.max("salary")).first()
print(f"Min: {salary_range[0]}")
print(f"Max: {salary_range[1]}")

Min: 79000
Max: 99000


In [0]:
# Standard deviation of salary. F.stddev is the *sample* standard deviation (alias of stddev_samp).
from pyspark.sql import functions as F

salary_std = stat_data.select(F.stddev("salary")).first()[0]
print(f"STD: {salary_std}")

STD: 6540.472290116195


In [0]:
# Variance of salary. F.variance is the *sample* variance (alias of var_samp).
from pyspark.sql import functions as F

salary_var = stat_data.select(F.variance("salary")).first()[0]
print(f"Var: {salary_var}")

Var: 42777777.77777778


# ***Window Functions***

**Sample data for window functions**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('Window').getOrCreate()

# Sample employees for the window-function demos.
# NOTE(review): every Name and every Dept value below is unique. Any window partitioned
# by either column therefore holds exactly one row: ranks come out as 1 and lag/lead as
# null. Add rows sharing a Dept to see non-trivial results.
data = [('John', 'Sales', 12000), ('Harshal', 'IT', 24000), ('Farsana', 'HR', 34000), ('Rashmi', 'Tech', 32000)]
columns = ['Name', 'Dept', 'Salary']
# createDataFrame always returns a DataFrame, so the former `is not None` check could never fail.
window_data = spark.createDataFrame(data, columns)
window_data.show()

+-------+-----+------+
|   Name| Dept|Salary|
+-------+-----+------+
|   John|Sales| 12000|
|Harshal|   IT| 24000|
|Farsana|   HR| 34000|
| Rashmi| Tech| 32000|
+-------+-----+------+



**Row Number**

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, lag, lead

# row_number(): 1, 2, 3, ... within each Dept, ordered by ascending Salary.
# Partition by Dept, as the rank / dense_rank / lag / lead cells do, rather than by Name.
# Name is unique per row, so partitioning by it guarantees every row number is 1.
windows = Window.partitionBy("Dept").orderBy("Salary")
row_data = window_data.withColumn("Row Number", row_number().over(windows))
row_data.show()

+-------+-----+------+----------+
|   Name| Dept|Salary|Row Number|
+-------+-----+------+----------+
|Farsana|   HR| 34000|         1|
|Harshal|   IT| 24000|         1|
|   John|Sales| 12000|         1|
| Rashmi| Tech| 32000|         1|
+-------+-----+------+----------+



**Rank**

In [0]:

# rank() within each Dept, ordered by ascending Salary. Tied salaries share a rank,
# and the following rank is skipped (1, 1, 3, ...).
# window_data comes from createDataFrame, which never returns None, so no guard is needed.
windows = Window.partitionBy("Dept").orderBy("Salary")
rank_data = window_data.withColumn("Rank", rank().over(windows))
rank_data.show()

+-------+-----+------+----+
|   Name| Dept|Salary|Rank|
+-------+-----+------+----+
|Farsana|   HR| 34000|   1|
|Harshal|   IT| 24000|   1|
|   John|Sales| 12000|   1|
| Rashmi| Tech| 32000|   1|
+-------+-----+------+----+



**Dense Rank**

In [0]:
# dense_rank() within each Dept, ordered by ascending Salary. Like rank(), but no
# ranks are skipped after ties (1, 1, 2, ...).
# window_data comes from createDataFrame, which never returns None, so no guard is needed.
windows = Window.partitionBy("Dept").orderBy("Salary")
drank_data = window_data.withColumn("Dense_Rank", dense_rank().over(windows))
drank_data.show()

+-------+-----+------+----------+
|   Name| Dept|Salary|Dense_Rank|
+-------+-----+------+----------+
|Farsana|   HR| 34000|         1|
|Harshal|   IT| 24000|         1|
|   John|Sales| 12000|         1|
| Rashmi| Tech| 32000|         1|
+-------+-----+------+----------+



**Lag**

In [0]:
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# lag("Salary", 2): the Salary two rows earlier in the same Dept partition (ordered by
# Salary), or null if there is no such row. The window is defined here rather than
# reusing `windows` from a previous cell, so this cell runs on its own.
dept_window = Window.partitionBy("Dept").orderBy("Salary")
(
    window_data
    .withColumn("lag", lag("Salary", 2).over(dept_window))
    .show()
)

+-------+-----+------+----+
|   Name| Dept|Salary| lag|
+-------+-----+------+----+
|Farsana|   HR| 34000|null|
|Harshal|   IT| 24000|null|
|   John|Sales| 12000|null|
| Rashmi| Tech| 32000|null|
+-------+-----+------+----+



**Lead**

In [0]:
from pyspark.sql.functions import lead
from pyspark.sql.window import Window

# lead("Salary", 2): the Salary two rows later in the same Dept partition (ordered by
# Salary), or null if there is no such row. The window is defined here rather than
# reusing `windows` from a previous cell, so this cell runs on its own.
dept_window = Window.partitionBy("Dept").orderBy("Salary")
(
    window_data
    .withColumn("lead", lead("Salary", 2).over(dept_window))
    .show()
)

+-------+-----+------+----+
|   Name| Dept|Salary|lead|
+-------+-----+------+----+
|Farsana|   HR| 34000|null|
|Harshal|   IT| 24000|null|
|   John|Sales| 12000|null|
| Rashmi| Tech| 32000|null|
+-------+-----+------+----+

