# Creating Spark DF

In [0]:
# Data read through SparkContext comes back as an RDD;
# data read through SparkSession comes back as a DataFrame.

from pyspark.sql import SparkSession

# getOrCreate() hands back the already running session if there is one,
# so repeated calls do not start additional sessions.
spark = (
    SparkSession.builder
    .appName('Spark DataFrame')
    .getOrCreate()
)

# No 'header' option is set here, so the columns get generated names (_c0, _c1, ...)
# and the header line of the file shows up as the first data row.
df = spark.read.csv('/FileStore/tables/StudentData.csv')

# show() is an action: it prints the first rows of the DataFrame.
df.show()

+---+------+----------------+------+------+-----+--------------------+
|_c0|   _c1|             _c2|   _c3|   _c4|  _c5|                 _c6|
+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
| 28|Female| Hubert Oliveras|    DB| 02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|F

In [0]:
# Treat the first line of the file as column names instead of data.

df = spark.read.csv('/FileStore/tables/StudentData.csv', header=True)
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

# Spark infer Schema (Schema of Dataframe)

In [0]:
df = spark.read.options(header=True).csv('/FileStore/tables/StudentData.csv')

# printSchema() lists each column with its data type and nullability
# (comparable to DESCRIBE <table> in MySQL).
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: string (nullable = true)
 |-- email: string (nullable = true)



In [0]:
# With inferSchema enabled the column types are derived from the data
# instead of every column being reported as string.
df = spark.read.csv(
    '/FileStore/tables/StudentData.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: integer (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)



In [0]:
# using options(): several reader options can be passed at once as keyword arguments.
#
# `sep` is the field separator of the delimited text file (',' for CSV, '\t' for TSV, ...).
# The option name has to be spelled exactly: the previous key 'delemeter' is not a
# CSV reader option, so the separator given there was never applied.
# NOTE: this only covers delimited text files; spreadsheet formats such as .xlsx
# are not handled by the CSV reader.
df = spark.read.options(inferSchema='True', header='True', sep=',').csv('/FileStore/tables/StudentData.csv')
df.printSchema()
df.show()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: integer (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude P

# Spark Provide Schema

In [0]:
# Supplying the schema explicitly instead of letting Spark infer it.
#
# inferSchema chooses a type per column from the values it finds. When a different
# type is wanted, describe every column up front and pass that schema to the reader.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

# One add() call per column: name, data type, nullable.
schemaa = (
    StructType()
    .add('age', IntegerType(), True)
    .add('gender', StringType(), True)
    .add('name', StringType(), True)
    .add('course', StringType(), True)
    .add('roll', StringType(), True)  # kept as a string here, whereas inferSchema reported integer
    .add('marks', IntegerType(), True)
    .add('email', StringType(), True)
)

spark = SparkSession.builder.appName('Provide Schema').getOrCreate()
df = (
    spark.read
    .schema(schemaa)
    .option('header', 'True')
    .csv('/FileStore/tables/StudentData.csv')
)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)



# Create DF from RDD

In [0]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# SparkSession: entry point for the DataFrame API (used by the following cells)
spark = SparkSession.builder.appName('DF').getOrCreate()

# SparkContext: entry point for the RDD API
conf = SparkConf().setAppName('RDD')
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/FileStore/tables/StudentData.csv')

# The first line of the file is the header. Drop every line equal to it and split
# the remaining lines into lists of string fields.
# NOTE: a plain split(',') does not understand quoted fields that contain commas.
header = rdd.first()
rdd1 = rdd.filter(lambda line: line != header).map(lambda line: line.split(','))

# Preview a few rows only; collect() would bring the complete data set to the driver.
rdd1.take(10)

Out[12]: [['28',
  'Female',
  'Hubert Oliveras',
  'DB',
  '02984',
  '59',
  'Annika Hoffman_Naoma Fritts@OOP.com'],
 ['29',
  'Female',
  'Toshiko Hillyard',
  'Cloud',
  '12899',
  '62',
  'Margene Moores_Marylee Capasso@DB.com'],
 ['28',
  'Male',
  'Celeste Lollis',
  'PF',
  '21267',
  '45',
  'Jeannetta Golden_Jenna Montague@DSA.com'],
 ['29',
  'Female',
  'Elenore Choy',
  'DB',
  '32877',
  '29',
  'Billi Clore_Mitzi Seldon@DB.com'],
 ['28',
  'Male',
  'Sheryll Towler',
  'DSA',
  '41487',
  '41',
  'Claude Panos_Judie Chipps@OOP.com'],
 ['28',
  'Male',
  'Margene Moores',
  'MVC',
  '52771',
  '32',
  'Toshiko Hillyard_Clementina Menke@MVC.com'],
 ['28',
  'Male',
  'Neda Briski',
  'OOP',
  '61973',
  '69',
  'Alberta Freund_Elenore Choy@DB.com'],
 ['28',
  'Female',
  'Claude Panos',
  'Cloud',
  '72409',
  '85',
  'Sheryll Towler_Alberta Freund@Cloud.com'],
 ['28',
  'Male',
  'Celeste Lollis',
  'MVC',
  '81492',
  '64',
  'Nicole Harwood_Claude Panos@MVC.com'],
 ['29

In [0]:
# Turn the header-less RDD of field lists into a DataFrame.
# No column names are supplied, so they are generated (_1, _2, ...).

dfRDD = spark.createDataFrame(rdd1)
dfRDD.show()

+---+------+----------------+-----+------+---+--------------------+
| _1|    _2|              _3|   _4|    _5| _6|                  _7|
+---+------+----------------+-----+------+---+--------------------+
| 28|Female| Hubert Oliveras|   DB| 02984| 59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard|Cloud| 12899| 62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|   PF| 21267| 45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|   DB| 32877| 29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|  DSA| 41487| 41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|  MVC| 52771| 32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|  OOP| 61973| 69|Alberta Freund_El...|
| 28|Female|    Claude Panos|Cloud| 72409| 85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|  MVC| 81492| 64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|  OOP| 92882| 51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|  DSA|102285| 35|Dustin Feagins_Ma...|
| 29|  Male| Ernest Rossbach|   DB|111449| 53|Ma

In [0]:
# Reuse the header line captured earlier as the list of column names.
columns = header.split(sep=',')
dfRDD = rdd1.toDF(schema=columns)

# Only names were supplied, so every column is still reported as string.
dfRDD.printSchema()

root
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: string (nullable = true)
 |-- email: string (nullable = true)



In [0]:
# for changing the data types
# NOTE: rdd1 still holds every field as a string (it was built with str.split), while the
# schema below declares 'age' and 'marks' as IntegerType.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schemaa = StructType([
    StructField('age', IntegerType(), True),
    StructField('gender', StringType(), True),
    StructField('name', StringType(), True),
    StructField('course', StringType(), True),
    StructField('roll', StringType(), True),
    StructField('marks', IntegerType(), True),
    StructField('email', StringType(), True)
])


dfRDD = spark.createDataFrame(rdd1, schema=schemaa)
dfRDD.printSchema()
dfRDD.show() # expected to fail: presumably the string values in rdd1 do not match the IntegerType fields (not a Databricks quirk); see "Rectifying the Error" below

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)



[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-1563385443433437>[0m in [0;36m<module>[0;34m[0m
[1;32m     15[0m [0mdfRDD[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mcreateDataFrame[0m[0;34m([0m[0mrdd1[0m[0;34m,[0m [0mschema[0m[0;34m=[0m[0mschemaa[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     16[0m [0mdfRDD[0m[0;34m.[0m[0mprintSchema[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 17[0;31m [0mdfRDD[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m [0;31m# some abnormality with databricks ignore the error[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mshow[0;34m(self, n, truncate, vertical)[0m
[1;32m    500[0m [0;34m[0m[0m
[1;32m    501[0m         [0;32mif[0m [0misinstance[0m[0;34m([0m[0mtruncate[0m[0;34m,[0m [0m

# Rectifying the Error

In [0]:
# Convert age (index 0) and marks (index 5) to int; the other five fields stay strings.
rdd2 = rdd1.map(
    lambda fields: [int(fields[0]), *fields[1:5], int(fields[5]), fields[6]]
)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Column name / data type pairs, in file order; every column is nullable.
schemaa = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('age', IntegerType()),
        ('gender', StringType()),
        ('name', StringType()),
        ('course', StringType()),
        ('roll', StringType()),
        ('marks', IntegerType()),
        ('email', StringType()),
    )
])

In [0]:
# rdd2 already carries ints for age and marks, so it fits the declared schema.
dfRDD = rdd2.toDF(schema=schemaa)
dfRDD.printSchema()
dfRDD.show()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Pa

# Select DF Columns

In [0]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Select Colums')
    .getOrCreate()
)

# Header row supplies the column names; column types are inferred from the data.
df = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .csv('/FileStore/tables/StudentData.csv')
)

df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# method 1: column names as plain strings
# select() builds the projection (transformation); show() triggers it and prints (action)
df.select(['name', 'gender']).show()

+----------------+------+
|            name|gender|
+----------------+------+
| Hubert Oliveras|Female|
|Toshiko Hillyard|Female|
|  Celeste Lollis|  Male|
|    Elenore Choy|Female|
|  Sheryll Towler|  Male|
|  Margene Moores|  Male|
|     Neda Briski|  Male|
|    Claude Panos|Female|
|  Celeste Lollis|  Male|
|  Cordie Harnois|  Male|
|       Kena Wild|Female|
| Ernest Rossbach|  Male|
|  Latia Vanhoose|Female|
|  Latia Vanhoose|Female|
|     Neda Briski|  Male|
|  Latia Vanhoose|Female|
|  Loris Crossett|  Male|
|  Annika Hoffman|  Male|
|   Santa Kerfien|  Male|
|Mickey Cortright|Female|
+----------------+------+
only showing top 20 rows



In [0]:
# method 2: columns referenced through the DataFrame itself
# (df['name'] and df.name refer to the same column)
df.select(df['name'], df['email']).show()

+----------------+--------------------+
|            name|               email|
+----------------+--------------------+
| Hubert Oliveras|Annika Hoffman_Na...|
|Toshiko Hillyard|Margene Moores_Ma...|
|  Celeste Lollis|Jeannetta Golden_...|
|    Elenore Choy|Billi Clore_Mitzi...|
|  Sheryll Towler|Claude Panos_Judi...|
|  Margene Moores|Toshiko Hillyard_...|
|     Neda Briski|Alberta Freund_El...|
|    Claude Panos|Sheryll Towler_Al...|
|  Celeste Lollis|Nicole Harwood_Cl...|
|  Cordie Harnois|Judie Chipps_Clem...|
|       Kena Wild|Dustin Feagins_Ma...|
| Ernest Rossbach|Maybell Duguay_Ab...|
|  Latia Vanhoose|Latia Vanhoose_Mi...|
|  Latia Vanhoose|Eda Neathery_Nico...|
|     Neda Briski|Margene Moores_Mi...|
|  Latia Vanhoose|Claude Panos_Sant...|
|  Loris Crossett|Mitzi Seldon_Jenn...|
|  Annika Hoffman|Taryn Brownlee_Mi...|
|   Santa Kerfien|Judie Chipps_Tary...|
|Mickey Cortright|Ernest Rossbach_M...|
+----------------+--------------------+
only showing top 20 rows



In [0]:
# method 3: columns referenced with col(), which only needs the column name
from pyspark.sql.functions import col

df.select(*[col(column_name) for column_name in ('name', 'marks')]).show()

+----------------+-----+
|            name|marks|
+----------------+-----+
| Hubert Oliveras|   59|
|Toshiko Hillyard|   62|
|  Celeste Lollis|   45|
|    Elenore Choy|   29|
|  Sheryll Towler|   41|
|  Margene Moores|   32|
|     Neda Briski|   69|
|    Claude Panos|   85|
|  Celeste Lollis|   64|
|  Cordie Harnois|   51|
|       Kena Wild|   35|
| Ernest Rossbach|   53|
|  Latia Vanhoose|   27|
|  Latia Vanhoose|   55|
|     Neda Briski|   42|
|  Latia Vanhoose|   27|
|  Loris Crossett|   36|
|  Annika Hoffman|   22|
|   Santa Kerfien|   56|
|Mickey Cortright|   62|
+----------------+-----+
only showing top 20 rows



In [0]:
# method 4
# '*' selects every column, so this prints the same columns as df.show()
df.select('*').show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# method 5
# df.columns is a plain Python list of the column names, so names can be picked by position
df.columns

Out[49]: ['age', 'gender', 'name', 'course', 'roll', 'marks', 'email']

In [0]:
# 0-based index into the list of column names
df.columns[2]

Out[50]: 'name'

In [0]:
# pick columns by their position in df.columns (4 -> 'roll', 2 -> 'name')
df.select(*[df.columns[position] for position in (4, 2)]).show()

+------+----------------+
|  roll|            name|
+------+----------------+
|  2984| Hubert Oliveras|
| 12899|Toshiko Hillyard|
| 21267|  Celeste Lollis|
| 32877|    Elenore Choy|
| 41487|  Sheryll Towler|
| 52771|  Margene Moores|
| 61973|     Neda Briski|
| 72409|    Claude Panos|
| 81492|  Celeste Lollis|
| 92882|  Cordie Harnois|
|102285|       Kena Wild|
|111449| Ernest Rossbach|
|122502|  Latia Vanhoose|
|132110|  Latia Vanhoose|
|141770|     Neda Briski|
|152159|  Latia Vanhoose|
|161771|  Loris Crossett|
|171660|  Annika Hoffman|
|182129|   Santa Kerfien|
|192537|Mickey Cortright|
+------+----------------+
only showing top 20 rows



In [0]:
# method 6: the reference styles shown above can be mixed in a single select()
from pyspark.sql.functions import col

# select() keeps only the listed columns, in the listed order
df.select(
    df.columns[4],  # by position in df.columns
    'name',         # plain string
    df.course,      # attribute of the DataFrame
    col('marks'),   # col() function
).show()

+------+----------------+------+-----+
|  roll|            name|course|marks|
+------+----------------+------+-----+
|  2984| Hubert Oliveras|    DB|   59|
| 12899|Toshiko Hillyard| Cloud|   62|
| 21267|  Celeste Lollis|    PF|   45|
| 32877|    Elenore Choy|    DB|   29|
| 41487|  Sheryll Towler|   DSA|   41|
| 52771|  Margene Moores|   MVC|   32|
| 61973|     Neda Briski|   OOP|   69|
| 72409|    Claude Panos| Cloud|   85|
| 81492|  Celeste Lollis|   MVC|   64|
| 92882|  Cordie Harnois|   OOP|   51|
|102285|       Kena Wild|   DSA|   35|
|111449| Ernest Rossbach|    DB|   53|
|122502|  Latia Vanhoose|    DB|   27|
|132110|  Latia Vanhoose|   MVC|   55|
|141770|     Neda Briski|    PF|   42|
|152159|  Latia Vanhoose|    DB|   27|
|161771|  Loris Crossett|   MVC|   36|
|171660|  Annika Hoffman|   OOP|   22|
|182129|   Santa Kerfien|    PF|   56|
|192537|Mickey Cortright|    DB|   62|
+------+----------------+------+-----+
only showing top 20 rows



# Spark DF WithColumn

In [0]:
# withColumn() adds a new column or replaces an existing one (e.g. to change its data type).

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('WithColumn')
    .getOrCreate()
)
df = spark.read.csv(
    '/FileStore/tables/StudentData.csv',
    header='True',
    inferSchema='True',
)

df.printSchema()
df.show()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: integer (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude P

In [0]:
from pyspark.sql.functions import col

# Replace 'roll' with a string-typed version of itself; the result is a new
# DataFrame (conDF), df is not reassigned. astype() is an alias of cast().
conDF = df.withColumn(
    'roll',
    col('roll').astype('String'),
)
conDF.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)



In [0]:
# df is displayed again here; the cast above was assigned to conDF, df itself was not reassigned
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
from pyspark.sql.functions import col

# Using an existing column name replaces that column in the result: 'marks' becomes marks + 10.
# The column could equally be referenced as df.marks or df[5] instead of col('marks').
df.withColumn(
    'marks',
    col('marks') + 10,
).show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   69|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   72|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   55|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   39|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   51|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   42|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   79|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   95|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   74|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   61|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   45|Dustin Feagins_Ma...|
| 29| 

In [0]:
# A name that does not exist yet appends a new column instead of replacing one.
# df[5] is the column at position 5 of df.columns ('marks').
df.withColumn(
    'updated marks',
    df[5] + 10,
).show()

+---+------+----------------+------+------+-----+--------------------+-------------+
|age|gender|            name|course|  roll|marks|               email|updated marks|
+---+------+----------------+------+------+-----+--------------------+-------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|           69|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|           72|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|           55|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|           39|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|           51|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|           42|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|           79|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|           95|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_C

In [0]:
from pyspark.sql.functions import lit

# lit() wraps a constant so that it can be used as a column: every row gets 'USA'.
df.withColumn(
    'country',
    lit('USA'),
).show()

+---+------+----------------+------+------+-----+--------------------+-------+
|age|gender|            name|course|  roll|marks|               email|country|
+---+------+----------------+------+------+-----+--------------------+-------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|    USA|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|    USA|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|    USA|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|    USA|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|    USA|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|    USA|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|    USA|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|    USA|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|    USA|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Jud

In [0]:
# Chain two withColumn() calls and keep the result under a new name.
updatedDF = (
    df
    .withColumn('updated marks', df['marks'] + 10)
    .withColumn('country', lit('USA'))
)

In [0]:
# df was not reassigned by the withColumn() calls above, so it still shows the original seven columns
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# updatedDF carries the two extra columns ('updated marks' and 'country') added by the chained withColumn() calls
updatedDF.show()

+---+------+----------------+------+------+-----+--------------------+-------------+-------+
|age|gender|            name|course|  roll|marks|               email|updated marks|country|
+---+------+----------------+------+------+-----+--------------------+-------------+-------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|           69|    USA|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|           72|    USA|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|           55|    USA|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|           39|    USA|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|           51|    USA|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|           42|    USA|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|           79|    USA|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|

# Spark DF withColumnRenamed and Alias

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = (
    SparkSession.builder
    .appName('withColumnRenamed')
    .getOrCreate()
)

# Header row supplies the column names; column types are inferred from the data.
df = (
    spark.read
    .options(inferSchema='True', header='True')
    .csv('/FileStore/tables/StudentData.csv')
)
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
df.withColumnRenamed('gender', 'sex').show() # withColumnRenamed() returns a new DataFrame and leaves df unchanged; assign the result to a variable to keep the renamed column

+---+------+----------------+------+------+-----+--------------------+
|age|   sex|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
df.select(col('name').alias('Full Name')).show() # alias() renames the column only in the result of this select(); df itself still has the column 'name'

+----------------+
|       Full Name|
+----------------+
| Hubert Oliveras|
|Toshiko Hillyard|
|  Celeste Lollis|
|    Elenore Choy|
|  Sheryll Towler|
|  Margene Moores|
|     Neda Briski|
|    Claude Panos|
|  Celeste Lollis|
|  Cordie Harnois|
|       Kena Wild|
| Ernest Rossbach|
|  Latia Vanhoose|
|  Latia Vanhoose|
|     Neda Briski|
|  Latia Vanhoose|
|  Loris Crossett|
|  Annika Hoffman|
|   Santa Kerfien|
|Mickey Cortright|
+----------------+
only showing top 20 rows



# Spark DF Filter rows

In [0]:
# Build the DB-course filter once, then display its rows and count them.
db_rows = df.filter(df.course == 'DB')
db_rows.show()
db_rows.count()

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 28|Female|  Hubert Oliveras|    DB|   2984|   59|Annika Hoffman_Na...|
| 29|Female|     Elenore Choy|    DB|  32877|   29|Billi Clore_Mitzi...|
| 29|  Male|  Ernest Rossbach|    DB| 111449|   53|Maybell Duguay_Ab...|
| 28|Female|   Latia Vanhoose|    DB| 122502|   27|Latia Vanhoose_Mi...|
| 29|Female|   Latia Vanhoose|    DB| 152159|   27|Claude Panos_Sant...|
| 28|Female| Mickey Cortright|    DB| 192537|   62|Ernest Rossbach_M...|
| 28|Female|      Anna Santos|    DB| 311589|   79|Celeste Lollis_Mi...|
| 28|  Male|    Kizzy Brenner|    DB| 381712|   36|Paris Hutton_Kena...|
| 28|  Male| Toshiko Hillyard|    DB| 392218|   47|Leontine Phillips...|
| 29|  Male|     Paris Hutton|    DB| 481229|   57|Clementina Menke_...|
| 28|Female| Mickey Cortright|    DB| 551389|   43|

In [0]:
# col('course') refers to the column by name; here it is equivalent to df.course.
df.filter(col('course') == 'DB').show()

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 28|Female|  Hubert Oliveras|    DB|   2984|   59|Annika Hoffman_Na...|
| 29|Female|     Elenore Choy|    DB|  32877|   29|Billi Clore_Mitzi...|
| 29|  Male|  Ernest Rossbach|    DB| 111449|   53|Maybell Duguay_Ab...|
| 28|Female|   Latia Vanhoose|    DB| 122502|   27|Latia Vanhoose_Mi...|
| 29|Female|   Latia Vanhoose|    DB| 152159|   27|Claude Panos_Sant...|
| 28|Female| Mickey Cortright|    DB| 192537|   62|Ernest Rossbach_M...|
| 28|Female|      Anna Santos|    DB| 311589|   79|Celeste Lollis_Mi...|
| 28|  Male|    Kizzy Brenner|    DB| 381712|   36|Paris Hutton_Kena...|
| 28|  Male| Toshiko Hillyard|    DB| 392218|   47|Leontine Phillips...|
| 29|  Male|     Paris Hutton|    DB| 481229|   57|Clementina Menke_...|
| 28|Female| Mickey Cortright|    DB| 551389|   43|

In [0]:
# Combine conditions with & (logical AND). Each condition needs its own parentheses
# because & binds more tightly than == and > in Python.
df.filter((df.course == 'DB') & (df.marks > 50)).show()

+---+------+------------------+------+-------+-----+--------------------+
|age|gender|              name|course|   roll|marks|               email|
+---+------+------------------+------+-------+-----+--------------------+
| 28|Female|   Hubert Oliveras|    DB|   2984|   59|Annika Hoffman_Na...|
| 29|  Male|   Ernest Rossbach|    DB| 111449|   53|Maybell Duguay_Ab...|
| 28|Female|  Mickey Cortright|    DB| 192537|   62|Ernest Rossbach_M...|
| 28|Female|       Anna Santos|    DB| 311589|   79|Celeste Lollis_Mi...|
| 29|  Male|      Paris Hutton|    DB| 481229|   57|Clementina Menke_...|
| 28|Female|   Hubert Oliveras|    DB| 771081|   79|Kizzy Brenner_Dus...|
| 29|Female|      Elenore Choy|    DB| 811824|   55|Maybell Duguay_Me...|
| 29|  Male|  Clementina Menke|    DB| 882200|   76|Michelle Ruggiero...|
| 29|Female|   Sebrina Maresca|    DB| 922210|   54|Toshiko Hillyard_...|
| 29|  Male|      Naoma Fritts|    DB| 931295|   79|Hubert Oliveras_S...|
| 29|Female|      Claude Panos|    DB|

In [0]:
# isin() keeps the rows whose course equals any value in the list.
courses = ['DB', 'Cloud', 'OOP', 'DSA']
df.filter(df.course.isin(courses)).show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29|  Male| Ernest Rossbach|    DB|111449|   53|Maybell Duguay_Ab...|
| 28|Female|  Latia Vanhoose|    DB|122502|   27|Latia Vanhoose_Mi...|
| 29|Female|  Latia Vanhoose|    DB|152159|   27|Claude Panos_Sant...|
| 29| 

In [0]:
# Keep the rows whose course name begins with 'D' (DB and DSA in the output).
df.filter(df.course.startswith('D')).show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29|  Male| Ernest Rossbach|    DB|111449|   53|Maybell Duguay_Ab...|
| 28|Female|  Latia Vanhoose|    DB|122502|   27|Latia Vanhoose_Mi...|
| 29|Female|  Latia Vanhoose|    DB|152159|   27|Claude Panos_Sant...|
| 28|Female|Mickey Cortright|    DB|192537|   62|Ernest Rossbach_M...|
| 28|Female|    Jc Andrepont|   DSA|232060|   58|Billi Clore_Abram...|
| 29|Female|    Paris Hutton|   DSA|271472|   99|Sheryll Towler_Al...|
| 28|Female|  Dustin Feagins|   DSA|291984|   82|Abram Nagao_Kena ...|
| 28|F

In [0]:
# Keep the rows whose course name ends with 'A' (only DSA in the output).
df.filter(df.course.endswith('A')).show()

+---+------+----------------+------+-------+-----+--------------------+
|age|gender|            name|course|   roll|marks|               email|
+---+------+----------------+------+-------+-----+--------------------+
| 28|  Male|  Sheryll Towler|   DSA|  41487|   41|Claude Panos_Judi...|
| 29|Female|       Kena Wild|   DSA| 102285|   35|Dustin Feagins_Ma...|
| 28|Female|    Jc Andrepont|   DSA| 232060|   58|Billi Clore_Abram...|
| 29|Female|    Paris Hutton|   DSA| 271472|   99|Sheryll Towler_Al...|
| 28|Female|  Dustin Feagins|   DSA| 291984|   82|Abram Nagao_Kena ...|
| 28|Female|Mickey Cortright|   DSA| 342003|   44|Mitzi Seldon_Jean...|
| 29|Female|     Anna Santos|   DSA| 411479|   42|Kena Wild_Mitzi S...|
| 28|Female|  Maybell Duguay|   DSA| 452141|   29|Leontine Phillips...|
| 29|Female|    Paris Hutton|   DSA| 492159|   60|Nicole Harwood_Ma...|
| 29|  Male|  Celeste Lollis|   DSA| 562065|   85|Jc Andrepont_Mela...|
| 29|  Male|  Maybell Duguay|   DSA| 592061|   83|Eda Neathery_J

In [0]:
# endswith() works on any string column: here it selects names ending in 'Wild'.
df.filter(df.name.endswith('Wild')).show()

+---+------+---------+------+-------+-----+--------------------+
|age|gender|     name|course|   roll|marks|               email|
+---+------+---------+------+-------+-----+--------------------+
| 29|Female|Kena Wild|   DSA| 102285|   35|Dustin Feagins_Ma...|
| 28|Female|Kena Wild| Cloud| 221750|   60|Mitzi Seldon_Jenn...|
| 28|  Male|Kena Wild|    DB|2031530|   93|Jc Andrepont_Jc A...|
| 28|Female|Kena Wild|    PF|2931712|   52|Melani Engberg_Le...|
| 28|Female|Kena Wild|    PF|3291691|   96|Tamera Blakley_Na...|
| 28|Female|Kena Wild|    DB|4431030|   59|Paris Hutton_Cher...|
| 28|Female|Kena Wild|    DB|5001412|   53|Annika Hoffman_Ja...|
| 29|  Male|Kena Wild|   DSA|6092008|   76|Sheryll Towler_An...|
| 28|Female|Kena Wild|   DSA|6201485|   85|Niki Klimek_Cleme...|
| 28|Female|Kena Wild|   DSA|6831241|   94|Loris Crossett_Le...|
| 29|Female|Kena Wild|    DB|6911600|   73|Maybell Duguay_La...|
| 29|Female|Kena Wild|   DSA|7621891|   69|Marylee Capasso_M...|
| 28|Female|Kena Wild|   

In [0]:
df.filter(df.name.contains('se')).show() # returns every row whose name contains the substring 'se' anywhere in it

+---+------+--------------+------+-------+-----+--------------------+
|age|gender|          name|course|   roll|marks|               email|
+---+------+--------------+------+-------+-----+--------------------+
| 28|Female|Latia Vanhoose|    DB| 122502|   27|Latia Vanhoose_Mi...|
| 29|Female|Latia Vanhoose|   MVC| 132110|   55|Eda Neathery_Nico...|
| 29|Female|Latia Vanhoose|    DB| 152159|   27|Claude Panos_Sant...|
| 29|  Male|Loris Crossett|   MVC| 161771|   36|Mitzi Seldon_Jenn...|
| 29|Female|Loris Crossett|    PF| 201487|   96|Elenore Choy_Lati...|
| 28|Female|Loris Crossett|    PF| 332739|   62|Michelle Ruggiero...|
| 29|  Male|Loris Crossett|    PF| 911593|   46|Gonzalo Ferebee_M...|
| 28|Female|Loris Crossett|   DSA|1662549|   86|Paris Hutton_Lati...|
| 29|  Male|Latia Vanhoose| Cloud|1832268|   60|Marylee Capasso_S...|
| 29|  Male|Latia Vanhoose|   OOP|2372748|   94|Latia Vanhoose_La...|
| 28|Female|Loris Crossett|   OOP|2691881|   29|Maybell Duguay_Ni...|
| 28|  Male|Loris Cr

In [0]:
df.filter(df.name.like('%e_')).show() # SQL LIKE pattern: '%' matches any run of characters and '_' exactly one, so '%e_' selects names whose second-to-last character is 'e'

+---+------+------------------+------+------+-----+--------------------+
|age|gender|              name|course|  roll|marks|               email|
+---+------+------------------+------+------+-----+--------------------+
| 28|  Male|    Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|    Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 29|  Male|     Santa Kerfien|    PF|182129|   56|Judie Chipps_Tary...|
| 29|Female|       Niki Klimek|    PF|211508|   23|Cordie Harnois_Ju...|
| 29|Female|      Cheri Kenney| Cloud|281408|   43|Annika Hoffman_Me...|
| 28|Female|      Cheri Kenney|   MVC|321816|   24|Kena Wild_Michell...|
| 28|  Male|     Kizzy Brenner|    DB|381712|   36|Paris Hutton_Kena...|
| 28|Female|     Kizzy Brenner| Cloud|402409|   27|Ernest Rossbach_G...|
| 29|Female|    Somer Stoecker| Cloud|442028|   34|Taryn Brownlee_Ta...|
| 28|  Male|    Margene Moores|    PF|531530|   53|Cheri Kenney_Mela...|
| 28|Female|    Somer Stoecker| Cloud|612490|   82|

# Quiz (select, withColumn, filter)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# getOrCreate() reuses the running session when there is one.
spark = SparkSession.builder.appName('Q1').getOrCreate()
# Load the student dataset (header row, inferred column types). The path must point at the
# CSV file; the previous argument 'gender' was a column name, not a file location.
df = spark.read.options(header='True', inferSchema='True').csv('/FileStore/tables/StudentData.csv')
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# lit(120) turns the constant 120 into a column, so every row gets total_marks = 120.
df1 = df.withColumn('total_marks', lit(120))
df1.show()

+---+------+----------------+------+------+-----+--------------------+-----------+
|age|gender|            name|course|  roll|marks|               email|total_marks|
+---+------+----------------+------+------+-----+--------------------+-----------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|        120|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|        120|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|        120|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|        120|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|        120|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|        120|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|        120|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|        120|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|        120|
| 29

In [0]:
# Marks expressed as a percentage of total_marks, stored in a new 'average' column.
df2 = df1.withColumn('average', (col('marks') / col('total_marks')) * 100)
df2.show()

+---+------+----------------+------+------+-----+--------------------+-----------+------------------+
|age|gender|            name|course|  roll|marks|               email|total_marks|           average|
+---+------+----------------+------+------+-----+--------------------+-----------+------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|        120|49.166666666666664|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|        120| 51.66666666666667|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|        120|              37.5|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|        120|24.166666666666668|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|        120|34.166666666666664|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|        120|26.666666666666668|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|        120|

In [0]:
# OOP students whose 'average' is above 80.
df_OOP = df2.filter((col('course') == 'OOP') & (col('average') > 80))
df_OOP.show()

+---+------+------------------+------+-------+-----+--------------------+-----------+-----------------+
|age|gender|              name|course|   roll|marks|               email|total_marks|          average|
+---+------+------------------+------+-------+-----+--------------------+-----------+-----------------+
| 28|  Male|    Jenna Montague|   OOP|3331161|   98|Leontine Phillips...|        120|81.66666666666667|
| 29|Female|Priscila Tavernier|   OOP|3902993|   99|Celeste Lollis_Bi...|        120|             82.5|
| 28|Female|      Judie Chipps|   OOP|5451977|   99|Tamera Blakley_Mi...|        120|             82.5|
| 29|  Male|    Margene Moores|   OOP|5621072|   97|Sheryll Towler_Ma...|        120|80.83333333333333|
| 29|  Male|      Jc Andrepont|   OOP|8022618|   97|Cordie Harnois_Ja...|        120|80.83333333333333|
| 28|  Male|    Loris Crossett|   OOP|8172914|   98|Paris Hutton_Pari...|        120|81.66666666666667|
| 28|  Male|    Loris Crossett|   OOP|9692316|   99|Judie Chipps

In [0]:
# 'average' exists only on df2, not on df, so df.average cannot be used in this filter;
# col() looks the column names up on the DataFrame being filtered.
df_Cloud = df2.filter((col('course') == 'Cloud') & (col('average') > 60))

df_Cloud.show()

+---+------+-----------------+------+-------+-----+--------------------+-----------+-----------------+
|age|gender|             name|course|   roll|marks|               email|total_marks|          average|
+---+------+-----------------+------+-------+-----+--------------------+-----------+-----------------+
| 28|Female|     Claude Panos| Cloud|  72409|   85|Sheryll Towler_Al...|        120|70.83333333333334|
| 29|  Male|      Billi Clore| Cloud| 512047|   76|Taryn Brownlee_Ju...|        120|63.33333333333333|
| 28|Female|   Somer Stoecker| Cloud| 612490|   82|Sebrina Maresca_G...|        120|68.33333333333333|
| 29|Female|     Judie Chipps| Cloud| 632793|   75|Tijuana Kropf_Ele...|        120|             62.5|
| 29|Female|     Eda Neathery| Cloud|1011971|   91|Margene Moores_El...|        120|75.83333333333333|
| 28|  Male|   Bonita Higuera| Cloud|1312294|   94|Eda Neathery_Pris...|        120|78.33333333333333|
| 29|Female|  Hubert Oliveras| Cloud|1392791|   94|Anna Santos_Alber...| 

In [0]:
# Project only the name and marks columns.
df.select(col('name'), col('marks')).show()

+----------------+-----+
|            name|marks|
+----------------+-----+
| Hubert Oliveras|   59|
|Toshiko Hillyard|   62|
|  Celeste Lollis|   45|
|    Elenore Choy|   29|
|  Sheryll Towler|   41|
|  Margene Moores|   32|
|     Neda Briski|   69|
|    Claude Panos|   85|
|  Celeste Lollis|   64|
|  Cordie Harnois|   51|
|       Kena Wild|   35|
| Ernest Rossbach|   53|
|  Latia Vanhoose|   27|
|  Latia Vanhoose|   55|
|     Neda Briski|   42|
|  Latia Vanhoose|   27|
|  Loris Crossett|   36|
|  Annika Hoffman|   22|
|   Santa Kerfien|   56|
|Mickey Cortright|   62|
+----------------+-----+
only showing top 20 rows



# Spark DF (Count, Distinct, Duplicate)

In [0]:
# Preview the DataFrame before counting and de-duplicating it.
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# Count all rows, then only the rows enrolled in the DB course, and report both.
total_enrolments = df.count()
db_enrolments = df.filter(df.course == 'DB').count()

print('Total no of enrolments: ', total_enrolments)

print('Total no of enrolments for DB: ', db_enrolments)

Total no of enrolments:  1000
Total no of enrolments for DB:  157


In [0]:
df.select(df.gender, df.age).distinct().show() # distinct() keeps one row per unique (gender, age) combination

+------+---+
|gender|age|
+------+---+
|Female| 29|
|Female| 28|
|  Male| 28|
|  Male| 29|
+------+---+



In [0]:
# dropDuplicates() with a column list keeps one row per unique (gender, course) pair,
# and unlike select(...).distinct() the surviving rows retain all columns.
df.dropDuplicates(['gender','course']).show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29|Female|  Latia Vanhoose|   MVC|132110|   55|Eda Neathery_Nico...|
| 28|Female|  Alberta Freund|   OOP|251805|   83|Annika Hoffman_Sh...|
| 29|Female|  Loris Crossett|    PF|201487|   96|Elenore Choy_Lati...|
| 29|  Male|     Billi Clore| Cloud|512047|   76|Taryn Brownlee_Ju...|
| 29|  Male| Ernest Rossbach|    DB|111449|   53|Maybell Duguay_Ab...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28| 

# Quiz (Distinct, Duplicate)

In [0]:
from pyspark.sql import SparkSession

# getOrCreate() reuses the running session when there is one.
spark = SparkSession.builder.appName('Distinct and Duplicate').getOrCreate()
# Option key spelled 'inferSchema', consistent with the other read cells in this notebook.
df = spark.read.options(header='True', inferSchema='True').csv('/FileStore/tables/StudentData.csv')

In [0]:
# One full row per unique (age, gender, course) combination.
df1 = df.dropDuplicates(['age', 'gender', 'course'])

print(df1.count())
df1.show()

24
+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 28|Female|    Jc Andrepont|   DSA|232060|   58|Billi Clore_Abram...|
| 28|Female|    Cheri Kenney|   MVC|321816|   24|Kena Wild_Michell...|
| 28|Female|  Alberta Freund|   OOP|251805|   83|Annika Hoffman_Sh...|
| 28|Female|  Loris Crossett|    PF|332739|   62|Michelle Ruggiero...|
| 28|  Male|  Annika Hoffman| Cloud|722193|   55|Taryn Brownlee_El...|
| 28|  Male|   Kizzy Brenner|    DB|381712|   36|Paris Hutton_Kena...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 2

In [0]:
# distinct() on the three selected columns yields the same combinations,
# but the result contains only those three columns.
df2 = df.select(df.age, df.gender, df.course).distinct()

print(df2.count())
df2.show()

24
+---+------+------+
|age|gender|course|
+---+------+------+
| 28|Female|    DB|
| 28|Female|   MVC|
| 29|  Male|    PF|
| 28|Female|   OOP|
| 29|Female| Cloud|
| 29|  Male| Cloud|
| 29|Female|   DSA|
| 29|  Male|   OOP|
| 29|  Male|    DB|
| 28|Female|    PF|
| 29|Female|   MVC|
| 29|Female|    DB|
| 28|  Male|    PF|
| 28|Female| Cloud|
| 29|  Male|   MVC|
| 28|  Male|   OOP|
| 28|  Male|   DSA|
| 29|Female|    PF|
| 28|  Male|   MVC|
| 28|  Male|    DB|
+---+------+------+
only showing top 20 rows



# Spark DF (sort, orderBy)

In [0]:
# Preview the unsorted DataFrame for comparison with the sorted results.
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# sort() accepts Column objects and column-name strings together; the order is
# ascending by default: by marks first, then by age.
df.sort(df.marks, 'age').show()

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 28|  Male|  Marylee Capasso|   DSA|2081560|   20|Sheryll Towler_Do...|
| 28|Female|   Maybell Duguay| Cloud| 261439|   20|Nicole Harwood_Ju...|
| 28|Female|     Jc Andrepont|    PF| 972733|   20|Eda Neathery_Eda ...|
| 29|Female|   Tamera Blakley|   DSA|3911247|   20|Donna Yerby_Bonit...|
| 29|  Male|   Jalisa Swenson|   OOP|4751515|   20|Annika Hoffman_Hu...|
| 29|Female|  Gonzalo Ferebee|   DSA|5631172|   20|Jeannetta Golden_...|
| 29|  Male|Michelle Ruggiero|    PF|6001585|   20|Paris Hutton_Marg...|
| 29|Female|Michelle Ruggiero|    DB|9232210|   20|Donna Yerby_Latia...|
| 29|  Male|     Elenore Choy|    DB|3652057|   20|Jc Andrepont_Gonz...|
| 28|  Male|      Abram Nagao| Cloud|2652463|   21|Eda Neathery_Anni...|
| 28|  Male|   Bonita Higuera| Cloud|7671835|   21|

In [0]:
# sort() and orderBy() do the same thing; df[0] selects the first column (age) by position.
df.orderBy(col('marks'), df[0]).show()

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 28|  Male|  Marylee Capasso|   DSA|2081560|   20|Sheryll Towler_Do...|
| 28|Female|   Maybell Duguay| Cloud| 261439|   20|Nicole Harwood_Ju...|
| 28|Female|     Jc Andrepont|    PF| 972733|   20|Eda Neathery_Eda ...|
| 29|Female|   Tamera Blakley|   DSA|3911247|   20|Donna Yerby_Bonit...|
| 29|  Male|   Jalisa Swenson|   OOP|4751515|   20|Annika Hoffman_Hu...|
| 29|Female|  Gonzalo Ferebee|   DSA|5631172|   20|Jeannetta Golden_...|
| 29|  Male|Michelle Ruggiero|    PF|6001585|   20|Paris Hutton_Marg...|
| 29|Female|Michelle Ruggiero|    DB|9232210|   20|Donna Yerby_Latia...|
| 29|  Male|     Elenore Choy|    DB|3652057|   20|Jc Andrepont_Gonz...|
| 28|  Male|      Abram Nagao| Cloud|2652463|   21|Eda Neathery_Anni...|
| 28|  Male|   Bonita Higuera| Cloud|7671835|   21|

In [0]:
df.sort(df.marks.asc(), df.age.desc()).show() # marks in ascending order, ties broken by age in descending order

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 29|Female|Michelle Ruggiero|    DB|9232210|   20|Donna Yerby_Latia...|
| 29|  Male|     Elenore Choy|    DB|3652057|   20|Jc Andrepont_Gonz...|
| 29|  Male|   Jalisa Swenson|   OOP|4751515|   20|Annika Hoffman_Hu...|
| 29|Female|  Gonzalo Ferebee|   DSA|5631172|   20|Jeannetta Golden_...|
| 29|Female|   Tamera Blakley|   DSA|3911247|   20|Donna Yerby_Bonit...|
| 29|  Male|Michelle Ruggiero|    PF|6001585|   20|Paris Hutton_Marg...|
| 28|Female|   Maybell Duguay| Cloud| 261439|   20|Nicole Harwood_Ju...|
| 28|  Male|  Marylee Capasso|   DSA|2081560|   20|Sheryll Towler_Do...|
| 28|Female|     Jc Andrepont|    PF| 972733|   20|Eda Neathery_Eda ...|
| 29|  Male|  Sebrina Maresca| Cloud|5042394|   21|Donna Yerby_Miche...|
| 29|Female|   Nicole Harwood|    PF|8121198|   21|

In [0]:
df.orderBy(df.marks.asc(), df.age.desc()).show()
# Numeric columns are ordered by value. String columns are ordered lexicographically and no
# error is raised, so numbers stored as strings (e.g. a CSV read without inferSchema) come
# out in an unexpected order, such as '100' before '20'.

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 29|Female|Michelle Ruggiero|    DB|9232210|   20|Donna Yerby_Latia...|
| 29|  Male|     Elenore Choy|    DB|3652057|   20|Jc Andrepont_Gonz...|
| 29|  Male|   Jalisa Swenson|   OOP|4751515|   20|Annika Hoffman_Hu...|
| 29|Female|  Gonzalo Ferebee|   DSA|5631172|   20|Jeannetta Golden_...|
| 29|Female|   Tamera Blakley|   DSA|3911247|   20|Donna Yerby_Bonit...|
| 29|  Male|Michelle Ruggiero|    PF|6001585|   20|Paris Hutton_Marg...|
| 28|Female|   Maybell Duguay| Cloud| 261439|   20|Nicole Harwood_Ju...|
| 28|  Male|  Marylee Capasso|   DSA|2081560|   20|Sheryll Towler_Do...|
| 28|Female|     Jc Andrepont|    PF| 972733|   20|Eda Neathery_Eda ...|
| 29|  Male|  Sebrina Maresca| Cloud|5042394|   21|Donna Yerby_Miche...|
| 29|Female|   Nicole Harwood|    PF|8121198|   21|

# Quiz (sort, orderBy)

In [0]:
from pyspark.sql import SparkSession

# getOrCreate() reuses the running session when there is one.
spark = SparkSession.builder.appName('sort quiz').getOrCreate()
# Load the office dataset with a header row and inferred column types, so salary,
# age and bonus sort as numbers.
df = spark.read.options(header='True', inferSchema='True').csv('/FileStore/tables/OfficeData.csv')
df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [0]:
# Ascending by bonus (the default sort direction).
df.sort(df.bonus).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+



In [0]:
# Oldest first; employees of the same age are ordered by salary ascending.
df.sort(df.age.desc(), df.salary.asc()).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|      Michael|     Sales|   NY| 86000| 56|20000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        James|     Sales|   NY| 90000| 34|10000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Maria|   Finance|   CA| 90000| 24|23000|
+-------------+----------+-----+------+---+-----+



In [0]:
# Three sort keys applied left to right: age descending, then bonus descending, then salary ascending.
df.sort(df.age.desc(), df.bonus.desc(), df.salary.asc()).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|      Michael|     Sales|   NY| 86000| 56|20000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        James|     Sales|   NY| 90000| 34|10000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Maria|   Finance|   CA| 90000| 24|23000|
+-------------+----------+-----+------+---+-----+



# Spark DF (Group By)

In [0]:
from pyspark.sql import SparkSession

# Get (or create) the session, then load the student CSV using its first row as
# column names and with schema inference switched on.
session_builder = SparkSession.builder.appName('Distinct and Duplicate')
spark = session_builder.getOrCreate()

student_reader = spark.read.options(header='True', inferschema='True')
df = student_reader.csv('/FileStore/tables/StudentData.csv')
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# groupBy() returns a GroupedData object, which has no show() method, so this call raises
# AttributeError. Apply an aggregation first (sum / min / max / count / avg ... of each group)
# to get a DataFrame that can be displayed.
df.groupBy('gender').show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-570313305499405>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# while working with groutBy() we cant directly see data using show(). We must perform some aggrigation(summing the column/product of the column/min value of that group/... ) to see output[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf[0m[0;34m.[0m[0mgroupBy[0m[0;34m([0m[0;34m'gender'[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mAttributeError[0m: 'GroupedData' object has no attribute 'show'

In [0]:
# Total of the marks column for each gender.
gender_groups = df.groupBy('gender')
gender_groups.sum('marks').show()

+------+----------+
|gender|sum(marks)|
+------+----------+
|Female|     29636|
|  Male|     30461|
+------+----------+



In [0]:
# Lowest mark within each gender group.
lowest_marks = df.groupBy('gender').min('marks')
lowest_marks.show()

+------+----------+
|gender|min(marks)|
+------+----------+
|Female|        20|
|  Male|        20|
+------+----------+



In [0]:
# Highest mark within each gender group.
highest_marks = df.groupBy('gender').max('marks')
highest_marks.show()

+------+----------+
|gender|max(marks)|
+------+----------+
|Female|        99|
|  Male|        99|
+------+----------+



In [0]:
# Number of rows per course.
rows_per_course = df.groupBy('course').count()
rows_per_course.show()

+------+-----+
|course|count|
+------+-----+
|    PF|  166|
|    DB|  157|
|   MVC|  157|
|   DSA|  176|
| Cloud|  192|
|   OOP|  152|
+------+-----+



In [0]:
# avg() and mean() are called on the same grouping; the two tables printed below are identical.
age_groups = df.groupBy(df.age)
age_groups.avg('marks').show()
age_groups.mean('marks').show()

+---+------------------+
|age|        avg(marks)|
+---+------------------+
| 28|60.487854251012145|
| 29|59.715415019762844|
+---+------------------+

+---+------------------+
|age|        avg(marks)|
+---+------------------+
| 28|60.487854251012145|
| 29|59.715415019762844|
+---+------------------+



# Spark DF(Group By - Multiple Columns and Aggregations)

In [0]:
# Grouping on two columns: one output row per (course, age) combination.
course_age_counts = df.groupBy(df.course, df.age).count()
course_age_counts.show()

+------+---+-----+
|course|age|count|
+------+---+-----+
|   MVC| 28|   72|
|   MVC| 29|   85|
| Cloud| 28|  100|
|    PF| 29|   87|
|    PF| 28|   79|
|   OOP| 29|   74|
|   DSA| 28|   83|
| Cloud| 29|   92|
|    DB| 28|   82|
|   DSA| 29|   93|
|   OOP| 28|   78|
|    DB| 29|   75|
+------+---+-----+



In [0]:
# Demonstrates a failure: groupBy(...).count() returns a DataFrame, and looking up `sum` on a
# DataFrame raises AttributeError (see the traceback below). Chaining aggregations this way
# does not work; several aggregations on one grouping go through agg() instead.
df.groupBy(df.course, df.age).count().sum().show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-570313305499414>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mdf[0m[0;34m.[0m[0mgroupBy[0m[0;34m([0m[0mdf[0m[0;34m.[0m[0mcourse[0m[0;34m,[0m [0mdf[0m[0;34m.[0m[0mage[0m[0;34m)[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0msum[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36m__getattr__[0;34m(self, name)[0m
[1;32m   1798[0m         """
[1;32m   1799[0m         [0;32mif[0m [0mname[0m [0;32mnot[0m [0;32min[0m [0mself[0m[0;34m.[0m[0mcolumns[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m-> 1800[0;31m             raise AttributeError(
[0m[1;32m   1801[0m                 "'%s' object has no attri

In [0]:
# Several aggregations on one grouping go through agg(), which needs the aggregate
# functions imported first.
# Note: these imports shadow Python's built-in sum / min / max in this notebook.
from pyspark.sql.functions import sum, min, max, mean, count, avg

course_stats = df.groupBy(df.course).agg(
    count(df.marks).alias('count'),
    sum(df.marks).alias('sum'),
    avg(df.marks).alias('avg'),
)
course_stats.show()

+------+-----+-----+------------------+
|course|count|  sum|               avg|
+------+-----+-----+------------------+
|    PF|  166| 9933| 59.83734939759036|
|    DB|  157| 9270|59.044585987261144|
|   MVC|  157| 9585| 61.05095541401274|
|   DSA|  176|10950| 62.21590909090909|
| Cloud|  192|11443|59.598958333333336|
|   OOP|  152| 8916|  58.6578947368421|
+------+-----+-----+------------------+



# Group By - Filtering

In [0]:
from pyspark.sql import SparkSession
# `col` is imported as well: a later cell in this section filters with col('total_count'),
# and without this import it only works if an earlier section already brought `col` into scope.
from pyspark.sql.functions import min, max, avg, sum, count, mean, col
spark = SparkSession.builder.appName('Filtering').getOrCreate()

# Header row supplies the column names; column types are inferred (see printSchema output).
df = spark.read.options(header='True', inferschema='True').csv('/FileStore/tables/StudentData.csv')
df.printSchema()
df.show()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: integer (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude P

In [0]:
# Keep only the male students, then count rows per (course, gender) pair.
male_students = df.filter(df.gender == 'Male')
df1 = male_students.groupBy('course', 'gender').agg(count('*').alias('total_count'))
df1.show()

+------+------+-----------+
|course|gender|total_count|
+------+------+-----------+
|   OOP|  Male|         70|
|    DB|  Male|         82|
|   MVC|  Male|         86|
|    PF|  Male|         97|
| Cloud|  Male|         86|
|   DSA|  Male|         78|
+------+------+-----------+



In [0]:
# method 1
# The aliased column only exists on the aggregated result, so that result was bound to df1
# first and is filtered here. where() and filter() are used with the same condition.
more_than_85 = df1.total_count > 85
df1.where(more_than_85).show()
df1.filter(more_than_85).show()

+------+------+-----------+
|course|gender|total_count|
+------+------+-----------+
|   MVC|  Male|         86|
|    PF|  Male|         97|
| Cloud|  Male|         86|
+------+------+-----------+

+------+------+-----------+
|course|gender|total_count|
+------+------+-----------+
|   MVC|  Male|         86|
|    PF|  Male|         97|
| Cloud|  Male|         86|
+------+------+-----------+



In [0]:
# method 2: refer to the column with col(). col('total_count') is not tied to a DataFrame
# variable; it is resolved against the DataFrame the expression is applied to, so the
# aliased column can be filtered in the same chain that creates it.

(
    df.filter(df.gender == 'Male')
    .groupBy('course', 'gender')
    .agg(count('*').alias('total_count'))
    .where(col('total_count') > 85)
    .show()
)

+------+------+-----------+
|course|gender|total_count|
+------+------+-----------+
|   MVC|  Male|         86|
|    PF|  Male|         97|
| Cloud|  Male|         86|
+------+------+-----------+



# Quiz (Group By)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, avg, min, max, count, mean, sum

# Session plus the student data (header row used for names, schema inferred).
spark = SparkSession.builder.appName('GroupBy').getOrCreate()
student_csv_path = '/FileStore/tables/StudentData.csv'
df = spark.read.options(header='True', inferschema='True').csv(student_csv_path)

In [0]:
# Row count per course, exposed as total_count.
per_course_total = df.groupBy(df.course).agg(count('*').alias('total_count'))
per_course_total.show()

+------+-----------+
|course|total_count|
+------+-----------+
|    PF|        166|
|    DB|        157|
|   MVC|        157|
|   DSA|        176|
| Cloud|        192|
|   OOP|        152|
+------+-----------+



In [0]:
# Row count per (gender, course) combination.
gender_course_groups = df.groupBy(df.gender, df.course)
gender_course_groups.agg(count('*').alias('total_count')).show()

+------+------+-----------+
|gender|course|total_count|
+------+------+-----------+
|Female| Cloud|        106|
|Female|   OOP|         82|
|  Male|    PF|         97|
|  Male|    DB|         82|
|  Male| Cloud|         86|
|  Male|   OOP|         70|
|Female|   DSA|         98|
|Female|    DB|         75|
|  Male|   DSA|         78|
|  Male|   MVC|         86|
|Female|   MVC|         71|
|Female|    PF|         69|
+------+------+-----------+



In [0]:
# Sum of marks per (gender, course) combination.
total_marks_expr = sum(df.marks).alias('total_marks')
df.groupBy(df.gender, df.course).agg(total_marks_expr).show()

+------+------+-----------+
|gender|course|total_marks|
+------+------+-----------+
|Female| Cloud|       6316|
|Female|   OOP|       4682|
|  Male|    PF|       5960|
|  Male|    DB|       5073|
|  Male| Cloud|       5127|
|  Male|   OOP|       4234|
|Female|   DSA|       6124|
|Female|    DB|       4197|
|  Male|   DSA|       4826|
|  Male|   MVC|       5241|
|Female|   MVC|       4344|
|Female|    PF|       3973|
+------+------+-----------+



In [0]:
# Minimum, maximum and average marks for every (age, course) combination.
marks_summary = df.groupBy(df.age, df.course).agg(
    min(df.marks).alias('min_marks'),
    max(df.marks).alias('max_marks'),
    avg(df.marks).alias('avg_marks'),
)
marks_summary.show()

+---+------+---------+---------+------------------+
|age|course|min_marks|max_marks|         avg_marks|
+---+------+---------+---------+------------------+
| 29|   OOP|       20|       99|59.729729729729726|
| 28|    PF|       20|       98| 63.75949367088607|
| 28|    DB|       21|       98| 58.76829268292683|
| 28|   DSA|       20|       99|  64.6867469879518|
| 29|    DB|       20|       98|59.346666666666664|
| 28| Cloud|       20|       99|             58.08|
| 29|   DSA|       20|       99| 60.01075268817204|
| 28|   MVC|       23|       99| 60.44444444444444|
| 28|   OOP|       23|       99| 57.64102564102564|
| 29|    PF|       20|       99|56.275862068965516|
| 29|   MVC|       22|       99| 61.56470588235294|
| 29| Cloud|       21|       98|             61.25|
+---+------+---------+---------+------------------+



# Quiz (Word Count)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, avg, min, max, count, mean, sum

spark = SparkSession.builder.appName('GroupBy').getOrCreate()

# read.text() loads the file into a single column named `value` (see the output below).
word_file_path = '/FileStore/tables/WordData.txt'
df = spark.read.text(word_file_path)
df.show()

+------+
| value|
+------+
| Apple|
|   Mic|
|   Mic|
| Apple|
|Laptop|
| Apple|
|   Mic|
|   Mic|
| Apple|
|Laptop|
| Chair|
| Chair|
| Chair|
|   Bag|
|Mobile|
|  Book|
| Chair|
| Chair|
| Chair|
|   Bag|
+------+
only showing top 20 rows



In [0]:
# Word count: group identical values and count the rows in each group.
word_counts = df.groupBy('value').count()
word_counts.show()

+------+-----+
| value|count|
+------+-----+
|   Mic|   10|
| Chair|   15|
|  Book|    5|
|Laptop|    5|
|   Bag|    5|
|Mobile|    5|
| Apple|   10|
+------+-----+



# Spark DF (UDFs)

In [0]:
# UDF - User Defined Function

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName('UDF').getOrCreate()

# Office data: header row used for column names, column types inferred.
office_reader = spark.read.options(header='True', inferschema='True')
df = office_reader.csv('/FileStore/tables/OfficeData.csv')

df.printSchema()
df.show()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- bonus: integer (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [0]:
def get_total_salary(salary, bonus):
    """Return salary + bonus, or None when either value is missing.

    The salary and bonus columns are nullable, and a NULL reaches a Python UDF as
    None; adding None to a number would raise TypeError and fail the job, so the
    result is None (NULL) in that case.
    """
    if salary is None or bonus is None:
        return None
    return salary + bonus

# Register the Python function as a Spark UDF. The return type is stated explicitly
# because a UDF is treated as returning a string unless told otherwise.
totalSalaryUDF = udf(get_total_salary, IntegerType())

with_total_salary = df.withColumn('total_salary', totalSalaryUDF(df.salary, df.bonus))
with_total_salary.show()

+-------------+----------+-----+------+---+-----+------------+
|employee_name|department|state|salary|age|bonus|total_salary|
+-------------+----------+-----+------+---+-----+------------+
|        James|     Sales|   NY| 90000| 34|10000|      100000|
|      Michael|     Sales|   NY| 86000| 56|20000|      106000|
|       Robert|     Sales|   CA| 81000| 30|23000|      104000|
|        Maria|   Finance|   CA| 90000| 24|23000|      113000|
|        Raman|   Finance|   CA| 99000| 40|24000|      123000|
|        Scott|   Finance|   NY| 83000| 36|19000|      102000|
|          Jen|   Finance|   NY| 79000| 53|15000|       94000|
|         Jeff| Marketing|   CA| 80000| 25|18000|       98000|
|        Kumar| Marketing|   NY| 91000| 50|21000|      112000|
+-------------+----------+-----+------+---+-----+------------+



# Quiz (UDFs)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

In [0]:
spark = SparkSession.builder.appName('UDFs').getOrCreate()

# Same office data as before: header row for names, inferred column types.
office_csv_path = '/FileStore/tables/OfficeData.csv'
df = spark.read.options(header='True', inferschema='True').csv(office_csv_path)

df.printSchema()
df.show()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- bonus: integer (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [0]:
def sal_increment(state, salary, bonus):
    """Return the increment for one employee, or None when it cannot be computed.

    NY: 10% of salary plus 5% of bonus.
    CA: 12% of salary plus 3% of bonus.
    Any other state gives None. A missing (None) salary or bonus also gives None
    instead of raising TypeError, since NULL column values reach a UDF as None.
    """
    if salary is None or bonus is None:
        return None

    if state == 'NY':
        salary_rate, bonus_rate = 0.10, 0.05
    elif state == 'CA':
        salary_rate, bonus_rate = 0.12, 0.03
    else:
        return None

    # The result is not stored in a local called `sum`, so neither the built-in nor
    # the pyspark aggregate of that name is shadowed inside this function.
    return salary * salary_rate + bonus * bonus_rate

# Register sal_increment as a UDF returning a double, then apply it row by row.
incrementUDF = udf(sal_increment, DoubleType())

with_increment = df.withColumn('increment', incrementUDF(df.state, df.salary, df.bonus))
with_increment.show()

+-------------+----------+-----+------+---+-----+---------+
|employee_name|department|state|salary|age|bonus|increment|
+-------------+----------+-----+------+---+-----+---------+
|        James|     Sales|   NY| 90000| 34|10000|   9500.0|
|      Michael|     Sales|   NY| 86000| 56|20000|   9600.0|
|       Robert|     Sales|   CA| 81000| 30|23000|  10410.0|
|        Maria|   Finance|   CA| 90000| 24|23000|  11490.0|
|        Raman|   Finance|   CA| 99000| 40|24000|  12600.0|
|        Scott|   Finance|   NY| 83000| 36|19000|   9250.0|
|          Jen|   Finance|   NY| 79000| 53|15000|   8650.0|
|         Jeff| Marketing|   CA| 80000| 25|18000|  10140.0|
|        Kumar| Marketing|   NY| 91000| 50|21000|  10150.0|
+-------------+----------+-----+------+---+-----+---------+



# Cache and Persist work flow

Under the hood, `cache()` uses the `persist()` function.

Cache and persist are used to keep a DataFrame's data in memory at a given point in the pipeline. Any later action or transformation beyond that point does not go back and re-run the earlier transformations; it starts from the cached data instead.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, avg, min, max, count, mean, sum

spark = SparkSession.builder.appName('Cache').getOrCreate()

# Student data: header row for column names, inferred column types.
student_reader = spark.read.options(header='True', inferschema='True')
df = student_reader.csv('/FileStore/tables/StudentData.csv')
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# Count rows per (course, gender, age).
# NOTE: this rebinds df to the aggregated result, so re-running the cell would group
# the already-grouped frame rather than the student rows.
grouping_keys = ('course', 'gender', 'age')
df = df.groupBy(*grouping_keys).count()

In [0]:
# Add a derived column: age multiplied by 100.
age_times_100 = col('age') * 100
df = df.withColumn('dummy', age_times_100)

In [0]:
# Action: executes the groupBy and withColumn defined in the cells above and prints the result.
df.show()

+------+------+---+-----+-----+
|course|gender|age|count|dummy|
+------+------+---+-----+-----+
| Cloud|Female| 29|   49| 2900|
|   DSA|Female| 28|   47| 2800|
|    PF|Female| 29|   34| 2900|
|   OOP|Female| 29|   39| 2900|
|   DSA|Female| 29|   51| 2900|
|    PF|Female| 28|   35| 2800|
|   MVC|Female| 28|   34| 2800|
|    DB|  Male| 28|   42| 2800|
|   OOP|  Male| 29|   35| 2900|
|   MVC|  Male| 29|   48| 2900|
|   MVC|Female| 29|   37| 2900|
|    DB|Female| 29|   35| 2900|
|   OOP|Female| 28|   43| 2800|
| Cloud|Female| 28|   57| 2800|
| Cloud|  Male| 29|   43| 2900|
|    PF|  Male| 28|   44| 2800|
|   DSA|  Male| 28|   36| 2800|
|    DB|Female| 28|   40| 2800|
|    DB|  Male| 29|   40| 2900|
|    PF|  Male| 29|   53| 2900|
+------+------+---+-----+-----+
only showing top 20 rows



In [0]:
# cache() only marks this DataFrame to be cached and returns the DataFrame (see Out below);
# nothing is computed or stored yet. The data is kept the first time a later action computes it.
df.cache()

Out[35]: DataFrame[course: string, gender: string, age: int, count: bigint, dummy: int]

In [0]:
# The first action after cache() (the count) still runs the groupBy / withColumn
# transformations and fills the cache; the show() that follows reuses the cached data
# instead of recomputing everything from the CSV file.

print(df.count())
df.show()

24
+------+------+---+-----+-----+
|course|gender|age|count|dummy|
+------+------+---+-----+-----+
| Cloud|Female| 29|   49| 2900|
|   DSA|Female| 28|   47| 2800|
|    PF|Female| 29|   34| 2900|
|   OOP|Female| 29|   39| 2900|
|   DSA|Female| 29|   51| 2900|
|    PF|Female| 28|   35| 2800|
|   MVC|Female| 28|   34| 2800|
|    DB|  Male| 28|   42| 2800|
|   OOP|  Male| 29|   35| 2900|
|   MVC|  Male| 29|   48| 2900|
|   MVC|Female| 29|   37| 2900|
|    DB|Female| 29|   35| 2900|
|   OOP|Female| 28|   43| 2800|
| Cloud|Female| 28|   57| 2800|
| Cloud|  Male| 29|   43| 2900|
|    PF|  Male| 28|   44| 2800|
|   DSA|  Male| 28|   36| 2800|
|    DB|Female| 28|   40| 2800|
|    DB|  Male| 29|   40| 2900|
|    PF|  Male| 29|   53| 2900|
+------+------+---+-----+-----+
only showing top 20 rows



# Spark DF (DF to RDD)

A DataFrame is essentially a wrapper around an RDD: under the hood, the transformations and processing performed on a DataFrame are carried out on RDDs.

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('DF to RDD').getOrCreate()

# Student data: header row for column names, inferred column types.
student_csv_path = '/FileStore/tables/StudentData.csv'
df = spark.read.options(header='True',inferschema='True').csv(student_csv_path)
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# Confirm that spark.read produced a DataFrame.
type(df)

Out[39]: pyspark.sql.dataframe.DataFrame

In [0]:
# The .rdd attribute gives the RDD behind the DataFrame.
rdd = df.rdd

# Confirm the converted object is an RDD.
type(rdd)

Out[40]: pyspark.rdd.RDD

In [0]:
# A DataFrame is structured, so the converted RDD holds Row objects (not plain tuples):
# each Row carries the column names, e.g. Row(age=28, gender='Female', ...).
# NOTE: collect() returns all rows to the notebook; acceptable here only because the file is small.
rdd.collect()

Out[43]: [Row(age=28, gender='Female', name='Hubert Oliveras', course='DB', roll=2984, marks=59, email='Annika Hoffman_Naoma Fritts@OOP.com'),
 Row(age=29, gender='Female', name='Toshiko Hillyard', course='Cloud', roll=12899, marks=62, email='Margene Moores_Marylee Capasso@DB.com'),
 Row(age=28, gender='Male', name='Celeste Lollis', course='PF', roll=21267, marks=45, email='Jeannetta Golden_Jenna Montague@DSA.com'),
 Row(age=29, gender='Female', name='Elenore Choy', course='DB', roll=32877, marks=29, email='Billi Clore_Mitzi Seldon@DB.com'),
 Row(age=28, gender='Male', name='Sheryll Towler', course='DSA', roll=41487, marks=41, email='Claude Panos_Judie Chipps@OOP.com'),
 Row(age=28, gender='Male', name='Margene Moores', course='MVC', roll=52771, marks=32, email='Toshiko Hillyard_Clementina Menke@MVC.com'),
 Row(age=28, gender='Male', name='Neda Briski', course='OOP', roll=61973, marks=69, email='Alberta Freund_Elenore Choy@DB.com'),
 Row(age=28, gender='Female', name='Claude Panos', co

In [0]:
# Row fields can be read by position (x[1]) or by column name; the name is used here.
# Positional form: rdd.filter(lambda x: x[1] == 'Male').collect()
def is_male(row):
    """Return True when the row's gender field equals 'Male'."""
    return row['gender'] == 'Male'

rdd.filter(is_male).collect()

Out[44]: [Row(age=28, gender='Male', name='Celeste Lollis', course='PF', roll=21267, marks=45, email='Jeannetta Golden_Jenna Montague@DSA.com'),
 Row(age=28, gender='Male', name='Sheryll Towler', course='DSA', roll=41487, marks=41, email='Claude Panos_Judie Chipps@OOP.com'),
 Row(age=28, gender='Male', name='Margene Moores', course='MVC', roll=52771, marks=32, email='Toshiko Hillyard_Clementina Menke@MVC.com'),
 Row(age=28, gender='Male', name='Neda Briski', course='OOP', roll=61973, marks=69, email='Alberta Freund_Elenore Choy@DB.com'),
 Row(age=28, gender='Male', name='Celeste Lollis', course='MVC', roll=81492, marks=64, email='Nicole Harwood_Claude Panos@MVC.com'),
 Row(age=29, gender='Male', name='Cordie Harnois', course='OOP', roll=92882, marks=51, email='Judie Chipps_Clementina Menke@MVC.com'),
 Row(age=29, gender='Male', name='Ernest Rossbach', course='DB', roll=111449, marks=53, email='Maybell Duguay_Abram Nagao@OOP.com'),
 Row(age=29, gender='Male', name='Neda Briski', course=

# Spark DF (Spark SQL)

In [0]:
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName('DF to RDD')
spark = session_builder.getOrCreate()

# Student data: header row for column names, inferred column types.
student_reader = spark.read.options(header='True',inferschema='True')
df = student_reader.csv('/FileStore/tables/StudentData.csv')
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
df.createOrReplaceTempView('Students') # creates the temp view 'Students' if it does not exist, otherwise replaces the existing one; the SQL queries below read from it

In [0]:
# Query the temp view with plain SQL: all columns for students on the DB course.
db_students_query = "SELECT * FROM Students WHERE course = 'DB'"
spark.sql(db_students_query).show()

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 28|Female|  Hubert Oliveras|    DB|   2984|   59|Annika Hoffman_Na...|
| 29|Female|     Elenore Choy|    DB|  32877|   29|Billi Clore_Mitzi...|
| 29|  Male|  Ernest Rossbach|    DB| 111449|   53|Maybell Duguay_Ab...|
| 28|Female|   Latia Vanhoose|    DB| 122502|   27|Latia Vanhoose_Mi...|
| 29|Female|   Latia Vanhoose|    DB| 152159|   27|Claude Panos_Sant...|
| 28|Female| Mickey Cortright|    DB| 192537|   62|Ernest Rossbach_M...|
| 28|Female|      Anna Santos|    DB| 311589|   79|Celeste Lollis_Mi...|
| 28|  Male|    Kizzy Brenner|    DB| 381712|   36|Paris Hutton_Kena...|
| 28|  Male| Toshiko Hillyard|    DB| 392218|   47|Leontine Phillips...|
| 29|  Male|     Paris Hutton|    DB| 481229|   57|Clementina Menke_...|
| 28|Female| Mickey Cortright|    DB| 551389|   43|

In [0]:
# Per (course, gender): number of rows and sum of marks.
# COUNT(*) is the row-count form; quoting the star would count a string literal instead,
# which gives the same number but is not what is meant.
spark.sql("SELECT course, gender, COUNT(*) AS count, SUM(marks) AS total_marks FROM Students GROUP BY course, gender").show()

+------+------+-----+-----------+
|course|gender|count|total_marks|
+------+------+-----+-----------+
|   OOP|  Male|   70|       4234|
|    DB|  Male|   82|       5073|
| Cloud|Female|  106|       6316|
|   MVC|  Male|   86|       5241|
|   DSA|Female|   98|       6124|
|    PF|  Male|   97|       5960|
|   MVC|Female|   71|       4344|
| Cloud|  Male|   86|       5127|
|    PF|Female|   69|       3973|
|   DSA|  Male|   78|       4826|
|    DB|Female|   75|       4197|
|   OOP|Female|   82|       4682|
+------+------+-----+-----------+



# Spark DF (Write DF)

In [0]:
# Preview the DataFrame that the following cells write to storage.
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
df.rdd.getNumPartitions() # number of partitions of the RDD behind this DataFrame (1 in the recorded run)

Out[61]: 1

In [0]:
# There are several save modes for write: append, overwrite, ignore, error.
# No mode is chosen here, and the recorded run failed with AnalysisException
# (presumably because the output path already existed - confirm). The next cell
# repeats the write with mode('overwrite').
df.write.options(header='True').csv('/FileStore/tables/StudentData/output')

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-3525593887850576>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# ther are several modes to perform write or read or .. actions(write, append, overwrite, ignore, error)[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0moptions[0m[0;34m([0m[0mheader[0m[0;34m=[0m[0;34m'True'[0m[0;34m)[0m[0;34m.[0m[0mcsv[0m[0;34m([0m[0;34m'/FileStore/tables/StudentData/output'[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/readwriter.py[0m in [0;36mcsv[0;34m(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, l

In [0]:
# Write the DataFrame as CSV with a header row, replacing whatever is already at the path.
output_path = '/FileStore/tables/StudentData/output'
overwrite_writer = df.write.mode('overwrite').options(header='True')
overwrite_writer.csv(output_path)

In [0]:
# Read the files just written back in, to check the round trip.
written_output = spark.read.options(header='True', inferschema='True').csv('/FileStore/tables/StudentData/output')
written_output.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [0]:
# Row count per (course, gender); df is rebound to this aggregated result.
course_gender_groups = df.groupBy(df.course, df.gender)
df = course_gender_groups.agg(count('*').alias('total_count'))
df.show()

+------+------+-----------+
|course|gender|total_count|
+------+------+-----------+
|   OOP|  Male|         70|
|    DB|  Male|         82|
| Cloud|Female|        106|
|   MVC|  Male|         86|
|   DSA|Female|         98|
|    PF|  Male|         97|
|   MVC|Female|         71|
| Cloud|  Male|         86|
|    PF|Female|         69|
|   DSA|  Male|         78|
|    DB|Female|         75|
|   OOP|Female|         82|
+------+------+-----------+



In [0]:
# Replace the files previously written at this path with the grouped counts.
grouped_writer = df.write.mode('overwrite').options(header='True')
grouped_writer.csv('/FileStore/tables/StudentData/output')

In [0]:
# Load the overwritten output; it now holds the grouped counts instead of the student rows.
output_reader = spark.read.options(header='True', inferschema='True')
df = output_reader.csv('/FileStore/tables/StudentData/output')
df.show()

+------+------+-----------+
|course|gender|total_count|
+------+------+-----------+
|   OOP|  Male|         70|
|    DB|  Male|         82|
| Cloud|Female|        106|
|   MVC|  Male|         86|
|   DSA|Female|         98|
|    PF|  Male|         97|
|   MVC|Female|         71|
| Cloud|  Male|         86|
|    PF|Female|         69|
|   DSA|  Male|         78|
|    DB|Female|         75|
|   OOP|Female|         82|
+------+------+-----------+



# Mini Project

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max, count, avg, mean, sum, col, lit, udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [0]:
# Obtain the Spark session and load the office data set
# (header row used for column names, column types inferred).
spark = SparkSession.builder.appName('Mini Project').getOrCreate()
df = (
    spark.read
    .option('header', 'True')
    .option('inferschema', 'True')
    .csv('/FileStore/tables/OfficeDataProject.csv')
)
df.show()

+-----------+-------------------+----------+-----+------+---+-----+
|employee_id|      employee_name|department|state|salary|age|bonus|
+-----------+-------------------+----------+-----+------+---+-----+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|        Tamra Amber|  Accounts|   AK|  5717| 47| 1291|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|
|       1007|      Nakano Locust| Marketing|   LA|  3444| 23| 1823|
|       1008|  Recalde Kensinger|  Accounts|   LA|  3704| 48| 1330|
|       1009|        Imai Hallie|  Accounts|   AK|  5061| 38| 1557|
|       1010|    Debroah Gallman|  Accounts|   NY|  9308| 35|  817|
|       1011|   Barringer Escoto|Purchasing|   W

In [0]:
# Row count of the office data set.
countDF = df
print('Total number of employees are: ', df.count())

Total number of employees are:  1000


In [0]:
depCountDF = df

# method 1: build one group per department, then count the groups.
# After this statement depCountDF holds the integer count, not a DataFrame.
depCountDF = (
    depCountDF.groupBy('department')
    .agg(count('*').alias('department_count'))
    .count()
)
print('Total number of departments in the company are: ', depCountDF)

# method 2: count the distinct department values directly
df.select('department').distinct().count()

Total number of departments in the company are:  6
Out[133]: 6

In [0]:
depNameDF = df

# method 1: group by department and keep only the grouping column
(
    depNameDF.groupBy('department')
    .agg(count('*'))
    .select('department')
    .show()
)

# method 2: distinct department values
df.select('department').distinct().show()

+----------+
|department|
+----------+
|     Sales|
|        HR|
|   Finance|
|Purchasing|
| Marketing|
|  Accounts|
+----------+

+----------+
|department|
+----------+
|     Sales|
|        HR|
|   Finance|
|Purchasing|
| Marketing|
|  Accounts|
+----------+



In [0]:
empCountInDepDF = df

# Number of rows (employees) in each department.
(
    empCountInDepDF.groupBy('department')
    .agg(count('*').alias('total_employees'))
    .show()
)

+----------+---------------+
|department|total_employees|
+----------+---------------+
|     Sales|            169|
|        HR|            171|
|   Finance|            162|
|Purchasing|            166|
| Marketing|            170|
|  Accounts|            162|
+----------+---------------+



In [0]:
empCountInStateDF = df

# Number of rows (employees) in each state.
(
    empCountInStateDF.groupBy('state')
    .agg(count('*').alias('total_employees_state'))
    .show()
)

+-----+---------------------+
|state|total_employees_state|
+-----+---------------------+
|   LA|                  205|
|   CA|                  205|
|   WA|                  208|
|   NY|                  173|
|   AK|                  209|
+-----+---------------------+



In [0]:
# Number of rows (employees) for every (state, department) combination.
(
    df.groupBy('state', 'department')
    .agg(count('*').alias('total_emp_state_dep'))
    .show()
)

+-----+----------+-------------------+
|state|department|total_emp_state_dep|
+-----+----------+-------------------+
|   CA|     Sales|                 42|
|   CA| Marketing|                 33|
|   NY|  Accounts|                 34|
|   NY|     Sales|                 27|
|   CA|   Finance|                 35|
|   CA|  Accounts|                 35|
|   CA|Purchasing|                 32|
|   WA|        HR|                 47|
|   AK|Purchasing|                 30|
|   WA|  Accounts|                 27|
|   WA|Purchasing|                 38|
|   AK|     Sales|                 38|
|   AK|  Accounts|                 37|
|   WA| Marketing|                 39|
|   LA|        HR|                 41|
|   LA|     Sales|                 35|
|   AK|        HR|                 25|
|   LA|   Finance|                 29|
|   AK|   Finance|                 37|
|   LA|Purchasing|                 45|
+-----+----------+-------------------+
only showing top 20 rows



In [0]:
minMaxDF = df

# Lowest and highest salary per department, ordered ascending by the minimum
# and then by the maximum. `min`/`max` here are the pyspark.sql.functions
# aggregates imported at the top of this section, not the Python builtins.
minMaxDF = (
    minMaxDF.groupBy('department')
    .agg(
        min('salary').alias('min_sal'),
        max('salary').alias('max_sal'),
    )
    .orderBy('min_sal', 'max_sal')
)
minMaxDF.show()

+----------+-------+-------+
|department|min_sal|max_sal|
+----------+-------+-------+
|   Finance|   1006|   9899|
|  Accounts|   1007|   9890|
|        HR|   1013|   9982|
| Marketing|   1031|   9974|
|     Sales|   1103|   9982|
|Purchasing|   1105|   9985|
+----------+-------+-------+



In [0]:
noEmp = df

# Average bonus over all NY employees, computed once and reused below.
# After filtering to a single state a groupBy on state is redundant; a global
# agg() always yields exactly one row, so indexing [0] cannot fail even when
# there are no NY rows (avg_bonus is then None).
# collect() brings the result to the driver as a plain Python list of Row objects.
noEmp1 = noEmp.filter(noEmp.state == 'NY').agg(avg('bonus').alias('avg_bonus')).collect()
print(noEmp1)
print(noEmp1[0]['avg_bonus'])

# Plain Python value extracted from the collected Row.
avgBonus = noEmp1[0]['avg_bonus']
print(avgBonus, type(avgBonus))

# NY + Finance employees whose bonus exceeds the NY-wide average bonus.
noEmp = noEmp.filter((noEmp.state == 'NY') & (noEmp.department == 'Finance') & (noEmp.bonus > avgBonus))
print('**********************')
print('No of employees workint in NY and in Finance dep whose bonuse is grater than avg bonus of emp in NY are: ', noEmp.count())
noEmp.show()

[Row(avg_bonus=1251.3468208092486)]
1251.3468208092486
1251.3468208092486 <class 'float'>
**********************
No of employees workint in NY and in Finance dep whose bonuse is grater than avg bonus of emp in NY are:  17
+-----------+--------------------+----------+-----+------+---+-----+
|employee_id|       employee_name|department|state|salary|age|bonus|
+-----------+--------------------+----------+-----+------+---+-----+
|       1035|       Vivan Sifford|   Finance|   NY|  1129| 35| 1261|
|       1073|      Herder Gallman|   Finance|   NY|  1988| 31| 1402|
|       1082|          Nena Rocha|   Finance|   NY|  3417| 25| 1647|
|       1087|       Leif Lemaster|   Finance|   NY|  8642| 45| 1782|
|       1100|Ellingsworth Meli...|   Finance|   NY|  7845| 32| 1358|
|       1127|        Escoto Gilma|   Finance|   NY|  3426| 41| 1285|
|       1161|     Georgeanna Laub|   Finance|   NY|  2469| 26| 1679|
|       1175|     Durio Tenenbaum|   Finance|   NY|  2253| 42| 1684|
|       1180|      

In [0]:
# Start from the full employee DataFrame; its salary column is replaced further down in this cell.
salRaisDF = df

def rais(age, salary):
    """Return the salary after applying the age-based raise.

    Employees strictly older than 45 receive a flat raise of 500; everyone
    else keeps their current salary.

    When used as a Spark UDF, SQL NULLs arrive as None. Comparing or adding
    None would raise TypeError and fail the whole job, so a missing age or
    salary returns the salary unchanged (None stays None).
    """
    if age is None or salary is None:
        return salary
    if age > 45:
        return salary + 500
    return salary

# Wrap the Python function as a Spark UDF that returns an integer column,
# then replace the salary column with the result of applying it row by row.
raisUDF = udf(rais, IntegerType())
salRaisDF = salRaisDF.withColumn(
    'salary',
    raisUDF(col('age'), col('salary')),
)
salRaisDF.show()

+-----------+-------------------+----------+-----+------+---+-----+
|employee_id|      employee_name|department|state|salary|age|bonus|
+-----------+-------------------+----------+-----+------+---+-----+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|        Tamra Amber|  Accounts|   AK|  6217| 47| 1291|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|
|       1007|      Nakano Locust| Marketing|   LA|  3444| 23| 1823|
|       1008|  Recalde Kensinger|  Accounts|   LA|  4204| 48| 1330|
|       1009|        Imai Hallie|  Accounts|   AK|  5061| 38| 1557|
|       1010|    Debroah Gallman|  Accounts|   NY|  9308| 35|  817|
|       1011|   Barringer Escoto|Purchasing|   W

In [0]:
# verification: the same age > 45 rows, first with the raised salary (salRaisDF)
# and then with the original salary (df), for a side-by-side comparison
salRaisDF.where(salRaisDF.age > 45).show()
df.where(df.age > 45).show()

+-----------+------------------+----------+-----+------+---+-----+
|employee_id|     employee_name|department|state|salary|age|bonus|
+-----------+------------------+----------+-----+------+---+-----+
|       1003|       Tamra Amber|  Accounts|   AK|  6217| 47| 1291|
|       1008| Recalde Kensinger|  Accounts|   LA|  4204| 48| 1330|
|       1011|  Barringer Escoto|Purchasing|   WA|  2185| 49| 1706|
|       1018|Vankirk Jacquelyne|Purchasing|   NY|  9136| 47| 1192|
|       1025|   Dionne Lemaster|     Sales|   AK|  5634| 48| 1356|
|       1030|        Trena Benz|  Accounts|   NY|  4876| 49| 1624|
|       1039|      Dynes Katlyn|  Accounts|   AK|  3539| 48|  834|
|       1058|      Clune Norene|   Finance|   AK|  2105| 49|  801|
|       1074|      Rocha Dionne|  Accounts|   CA|  3970| 49|  706|
|       1088|       Imai Locust|     Sales|   NY| 10482| 49| 1809|
|       1090| Clemencia Rudolph|   Finance|   NY|  1796| 50| 1209|
|       1099|    Zollner Marvis|Purchasing|   CA|  4730| 50|  

In [0]:
filterEmpDF = df

# method 1: keep employees older than 45 and write them as CSV with a header row,
# replacing whatever is already stored at the output path
filterEmpDF = filterEmpDF.filter(filterEmpDF.age > 45)
print(filterEmpDF.count())

filterEmpDF.write.mode('overwrite').options(header='True').csv('/FileStore/tables/OfficeDataProject/output')

# method 2: same filter written in one chain. mode('ignore') skips the write when the
# path already exists, so directly after method 1 this statement does nothing.
# The header option is set here too so that, if this write does run (path not yet
# present), the file has the same layout as method 1 and can be read back with
# header='True' without the first data row being consumed as column names.
df.filter(df.age > 45).write.mode('ignore').options(header='True').csv('/FileStore/tables/OfficeDataProject/output')

166


In [0]:
# verification: compare the row count of the source file with that of the filtered output.
# The reader option is `inferSchema`; an unrecognised key such as `inferdata` is silently
# ignored, which leaves every column typed as string.

df1 = spark.read.options(header='True', inferSchema='True').csv('/FileStore/tables/OfficeDataProject.csv')
df2 = spark.read.options(header='True', inferSchema='True').csv('/FileStore/tables/OfficeDataProject/output')

print(df1.count())
print(df2.count())

1000
166
