In [0]:
#DataFrames are wrapper on rdd.
#we create SparkSession for spark to read data in dataframe format. where as for SparkContext, spark read the data in rdd format.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark DataFrames").getOrCreate()
df = spark.read.option("header",True).csv("/FileStore/tables/studentDatat.csv")
df.show()

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|             name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|
| 24|     F|         Lee Dong|         UI/UX| 109|   87|   LeeDong@gmail.com|
| 22|     M|    Maa'iz al-Dia|           SRE| 110|   69|Maa'izal

In [0]:
#If we could not include inferSchema in options then spark will consider all data types as string. 
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: string (nullable = true)
 |-- email: string (nullable = true)



In [0]:
#instead of writing multiple option() functions we can write options(key='value') format
df = spark.read.options(header='True', inferSchema='True').csv("/FileStore/tables/studentDatat.csv")
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: integer (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)



In [0]:
#create our own schema using StructType and StructField
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
schema = StructType([
    StructField('age',IntegerType(),True),
    StructField('gender',StringType(),True),
    StructField('name',StringType(),True),
    StructField('course',StringType(),True),
    StructField('roll',StringType(),True),
    StructField('marks',IntegerType(),True),
    StructField('email',StringType(),True)
])
df = spark.read.options(header="True").schema(schema).csv("/FileStore/tables/studentDatat.csv")
df.show()
df.printSchema()

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|             name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|
| 24|     F|         Lee Dong|         UI/UX| 109|   87|   LeeDong@gmail.com|
| 22|     M|    Maa'iz al-Dia|           SRE| 110|   69|Maa'izal

In [0]:
#creating plane DataFrame from rdd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DftoRdd").getOrCreate()

from pyspark import SparkConf,SparkContext
# conf = SparkConf().setAppName("dataframeTordd")
# sc = SparkContext().getOrCreate(conf = conf)
rdd = sc.textFile("/FileStore/tables/studentDatat.csv")
headers = rdd.first()
rdd = rdd.filter(lambda x:x != headers).map(lambda x:x.split(","))

In [0]:
dfrdd = rdd.toDF()
dfrdd.show()
dfrdd.printSchema()

+---+-----+-----------------+--------------+---+---+--------------------+
| _1|   _2|               _3|            _4| _5| _6|                  _7|
+---+-----+-----------------+--------------+---+---+--------------------+
| 22|    F|        Kiana Lor|    Data Enggg|101|100|  KianaLor@gmail.com|
| 22|    M|   Joshua Lonaker|   Cloud Admin|102| 99|JoshuaLonaker@gma...|
| 22|    F|    Dakota Blanco|        DevOps|103| 65|DakotaBlanco@gmai...|
| 20|    F|  Natasha Yarusso|     FullStack|104| 78|NatashaYarusso@gm...|
| 21|    F|   Brooke Cazares|         MLOps|105| 98|BrookeCazares@gma...|
| 21|    F| Rochelle Johnson|data Scientist|106| 54|RochelleJohnson@g...|
| 22|    M|       Joey Abreu|      FrontEnd|107| 63| JoeyAbreu@gmail.com|
| 22|    M|   Preston Suarez|       Backend|108| 48|PrestonSuarez@gma...|
| 24|    F|         Lee Dong|         UI/UX|109| 87|   LeeDong@gmail.com|
| 22|    M|    Maa'iz al-Dia|           SRE|110| 69|Maa'izal-Dia@gmai...|
| 23|    F|   Maja Nicholson|  Network

In [0]:
#creating DataFrame with column names from rdd
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
schema = StructType([
    StructField('age',IntegerType(),True),
    StructField('gender',StringType(),True),
    StructField('name',StringType(),True),
    StructField('course',StringType(),True),
    StructField('roll',StringType(),True),
    StructField('marks',IntegerType(),True),
    StructField('email',StringType(),True)
])

In [0]:
columns = headers.split(',')
dfrdd = rdd.toDF(columns)
dfrdd.show()

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|             name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|
| 24|     F|         Lee Dong|         UI/UX| 109|   87|   LeeDong@gmail.com|
| 22|     M|    Maa'iz al-Dia|           SRE| 110|   69|Maa'izal

In [0]:
#create dataframe with schema from rdd
df = spark.createDataFrame(rdd, schema = schema)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)



In [0]:
#diff ways of fetching columns from dataframe
df = spark.read.options(header='True',inferSchema='True').csv("/FileStore/tables/studentDatat.csv")
df.show()

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|             name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|
| 24|     F|         Lee Dong|         UI/UX| 109|   87|   LeeDong@gmail.com|
| 22|     M|    Maa'iz al-Dia|           SRE| 110|   69|Maa'izal

In [0]:
#access all coumns
df.select('*').show()

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|             name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|
| 24|     F|         Lee Dong|         UI/UX| 109|   87|   LeeDong@gmail.com|
| 22|     M|    Maa'iz al-Dia|           SRE| 110|   69|Maa'izal

In [0]:
#access few columns
df.select('age','gender','name').show()

+---+------+-----------------+
|age|gender|             name|
+---+------+-----------------+
| 22|     F|        Kiana Lor|
| 22|     M|   Joshua Lonaker|
| 22|     F|    Dakota Blanco|
| 20|     F|  Natasha Yarusso|
| 21|     F|   Brooke Cazares|
| 21|     F| Rochelle Johnson|
| 22|     M|       Joey Abreu|
| 22|     M|   Preston Suarez|
| 24|     F|         Lee Dong|
| 22|     M|    Maa'iz al-Dia|
| 23|     F|   Maja Nicholson|
| 21|     F|     Sasha Jansen|
| 20|     M|Alexander Sherman|
| 23|     M|    Edgar Sanchez|
| 21|     M|     Kolbi Strunk|
| 21|     F|    Brittany Sath|
| 21|     F|     Meggan Smith|
| 23| other|   Ericka Arreola|
| 24|     M|       David Pulc|
| 23|     M|      Kyle Luckey|
+---+------+-----------------+
only showing top 20 rows



In [0]:
df.select(df.name,df.age).show()

+-----------------+---+
|             name|age|
+-----------------+---+
|        Kiana Lor| 22|
|   Joshua Lonaker| 22|
|    Dakota Blanco| 22|
|  Natasha Yarusso| 20|
|   Brooke Cazares| 21|
| Rochelle Johnson| 21|
|       Joey Abreu| 22|
|   Preston Suarez| 22|
|         Lee Dong| 24|
|    Maa'iz al-Dia| 22|
|   Maja Nicholson| 23|
|     Sasha Jansen| 21|
|Alexander Sherman| 20|
|    Edgar Sanchez| 23|
|     Kolbi Strunk| 21|
|    Brittany Sath| 21|
|     Meggan Smith| 21|
|   Ericka Arreola| 23|
|       David Pulc| 24|
|      Kyle Luckey| 23|
+-----------------+---+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col
df.select(col('age'),col('name')).show()

+---+-----------------+
|age|             name|
+---+-----------------+
| 22|        Kiana Lor|
| 22|   Joshua Lonaker|
| 22|    Dakota Blanco|
| 20|  Natasha Yarusso|
| 21|   Brooke Cazares|
| 21| Rochelle Johnson|
| 22|       Joey Abreu|
| 22|   Preston Suarez|
| 24|         Lee Dong|
| 22|    Maa'iz al-Dia|
| 23|   Maja Nicholson|
| 21|     Sasha Jansen|
| 20|Alexander Sherman|
| 23|    Edgar Sanchez|
| 21|     Kolbi Strunk|
| 21|    Brittany Sath|
| 21|     Meggan Smith|
| 23|   Ericka Arreola|
| 24|       David Pulc|
| 23|      Kyle Luckey|
+---+-----------------+
only showing top 20 rows



In [0]:
df.columns

Out[15]: ['age', 'gender', 'name', 'course', 'roll', 'marks', 'email']

In [0]:
#access all columns in diff way
df.select(df.columns[3:6]).show()

+--------------+----+-----+
|        course|roll|marks|
+--------------+----+-----+
|    Data Enggg| 101|  100|
|   Cloud Admin| 102|   99|
|        DevOps| 103|   65|
|     FullStack| 104|   78|
|         MLOps| 105|   98|
|data Scientist| 106|   54|
|      FrontEnd| 107|   63|
|       Backend| 108|   48|
|         UI/UX| 109|   87|
|           SRE| 110|   69|
|  Network engg| 111|   57|
| Security Engg| 112|   99|
|    Data Enggg| 113|   65|
|   Cloud Admin| 114|   78|
|        DevOps| 115|   98|
|     FullStack| 116|   54|
|         MLOps| 117|   63|
|data Scientist| 118|   48|
|      FrontEnd| 119|   87|
|       Backend| 120|   69|
+--------------+----+-----+
only showing top 20 rows



In [0]:
df.select('name',df.age,col('gender')).show()

+-----------------+---+------+
|             name|age|gender|
+-----------------+---+------+
|        Kiana Lor| 22|     F|
|   Joshua Lonaker| 22|     M|
|    Dakota Blanco| 22|     F|
|  Natasha Yarusso| 20|     F|
|   Brooke Cazares| 21|     F|
| Rochelle Johnson| 21|     F|
|       Joey Abreu| 22|     M|
|   Preston Suarez| 22|     M|
|         Lee Dong| 24|     F|
|    Maa'iz al-Dia| 22|     M|
|   Maja Nicholson| 23|     F|
|     Sasha Jansen| 21|     F|
|Alexander Sherman| 20|     M|
|    Edgar Sanchez| 23|     M|
|     Kolbi Strunk| 21|     M|
|    Brittany Sath| 21|     F|
|     Meggan Smith| 21|     F|
|   Ericka Arreola| 23| other|
|       David Pulc| 24|     M|
|      Kyle Luckey| 23|     M|
+-----------------+---+------+
only showing top 20 rows



In [0]:
#withColumn(): this function used to manipulate column data plus chnage column data type
df = df.withColumn('roll',col('roll').cast('string'))
df.show()
df.printSchema()

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|             name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|
| 24|     F|         Lee Dong|         UI/UX| 109|   87|   LeeDong@gmail.com|
| 22|     M|    Maa'iz al-Dia|           SRE| 110|   69|Maa'izal

In [0]:
              #new col name,col(col to be manipulated)
df.withColumn('marks',col('marks') + 10).show()    

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|             name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  110|  KianaLor@gmail.com|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|  109|JoshuaLonaker@gma...|
| 22|     F|    Dakota Blanco|        DevOps| 103|   75|DakotaBlanco@gmai...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   88|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|  108|BrookeCazares@gma...|
| 21|     F| Rochelle Johnson|data Scientist| 106|   64|RochelleJohnson@g...|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   73| JoeyAbreu@gmail.com|
| 22|     M|   Preston Suarez|       Backend| 108|   58|PrestonSuarez@gma...|
| 24|     F|         Lee Dong|         UI/UX| 109|   97|   LeeDong@gmail.com|
| 22|     M|    Maa'iz al-Dia|           SRE| 110|   79|Maa'izal

In [0]:
from pyspark.sql.functions import lit
df1 = df.withColumn('Country',lit('India'))    #lit() function used to provide string to each row in provided column. If we give existing column then it will update existing column with new string values and if not then it will create new clumn with defauklt values provided in lit()
df1.show()

+---+------+-----------------+--------------+----+-----+--------------------+-------+
|age|gender|             name|        course|roll|marks|               email|Country|
+---+------+-----------------+--------------+----+-----+--------------------+-------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|  India|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|  India|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|  India|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|  India|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|  India|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|  India|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|  India|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|  India|
| 24|     F|         Lee Dong|         UI/UX| 109|   8

In [0]:
# renaming column permanently with withColumnRenamed() or use alias() to rename temp
df.withColumnRenamed('name','Full Name').show()

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|        Full Name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|
| 24|     F|         Lee Dong|         UI/UX| 109|   87|   LeeDong@gmail.com|
| 22|     M|    Maa'iz al-Dia|           SRE| 110|   69|Maa'izal

In [0]:
df.select(col('name').alias('Full Name')).show()

+-----------------+
|        Full Name|
+-----------------+
|        Kiana Lor|
|   Joshua Lonaker|
|    Dakota Blanco|
|  Natasha Yarusso|
|   Brooke Cazares|
| Rochelle Johnson|
|       Joey Abreu|
|   Preston Suarez|
|         Lee Dong|
|    Maa'iz al-Dia|
|   Maja Nicholson|
|     Sasha Jansen|
|Alexander Sherman|
|    Edgar Sanchez|
|     Kolbi Strunk|
|    Brittany Sath|
|     Meggan Smith|
|   Ericka Arreola|
|       David Pulc|
|      Kyle Luckey|
+-----------------+
only showing top 20 rows



In [0]:
#conditional functions: where or filter, isin(),startswith(),endswith(),contains(),like()
# df.filter(col('course') == 'DevOps').show()
df.filter(df.course.isin('DevOps','SRE')).show()
df.filter(df.course.startswith('D')).show()


+---+------+-------------------+------+----+-----+--------------------+
|age|gender|               name|course|roll|marks|               email|
+---+------+-------------------+------+----+-----+--------------------+
| 22|     F|      Dakota Blanco|DevOps| 103|   65|DakotaBlanco@gmai...|
| 22|     M|      Maa'iz al-Dia|   SRE| 110|   69|Maa'izal-Dia@gmai...|
| 21|     M|       Kolbi Strunk|DevOps| 115|   98|KolbiStrunk@gmail...|
| 20|     M|        David Weber|   SRE| 122|   55|DavidWeber@gmail.com|
| 22|     F|     Angela Harding|DevOps| 127|   88|AngelaHarding@gma...|
| 21|     M|         Sean Rozga|   SRE| 134|   48| SeanRozga@gmail.com|
| 24|     M|    Joshua Galloway|DevOps| 139|   44|JoshuaGallowa@gma...|
| 23|     F|    Britany Stevens|   SRE| 146|   45|BritanyStevens@gm...|
| 23|     M|         Issac Mata|DevOps| 151|   63| IssacMata@gmail.com|
| 22|     F|       Amanda Tatum|   SRE| 158|   78|AmandaTatum@gmail...|
| 23|     M|      Donald Nevins|DevOps| 163|   87|DonaldNevins@g

In [0]:
#assignment:
#1. create new column as total marks and letr the total marks be 120
#2. create new column as average to calculate the average marks of student (marks/toal marks)*100
#3. Filter out all those students who got >80% marks in DevOps course and save it as new data frame
#print the names and marks of all students from the above df.

In [0]:
#1. create new column as total marks and letr the total marks be 120
from pyspark.sql.functions import lit, col
tot_marksdf = df.withColumn('Total_Marks',lit(120))
tot_marksdf.show()

+---+------+-----------------+--------------+----+-----+--------------------+-----------+
|age|gender|             name|        course|roll|marks|               email|Total_Marks|
+---+------+-----------------+--------------+----+-----+--------------------+-----------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|        120|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|        120|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|        120|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|        120|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|        120|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|        120|
| 22|     M|       Joey Abreu|      FrontEnd| 107|   63| JoeyAbreu@gmail.com|        120|
| 22|     M|   Preston Suarez|       Backend| 108|   48|PrestonSuarez@gma...|        120|
| 24|     

In [0]:
#2. create new column as average to calculate the average marks of student (marks/toal marks)*100
perdf = tot_marksdf.withColumn('Average',lit(col('marks')/col('Total_Marks'))*100)
perdf.show()

+---+------+-----------------+--------------+----+-----+--------------------+-----------+------------------+
|age|gender|             name|        course|roll|marks|               email|Total_Marks|           Average|
+---+------+-----------------+--------------+----+-----+--------------------+-----------+------------------+
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|        120| 83.33333333333334|
| 22|     M|   Joshua Lonaker|   Cloud Admin| 102|   99|JoshuaLonaker@gma...|        120|              82.5|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|        120|54.166666666666664|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|        120|              65.0|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|        120| 81.66666666666667|
| 21|     F| Rochelle Johnson|data Scientist| 106|   54|RochelleJohnson@g...|        120|              45.0|
| 22|     M|       

In [0]:
#3. Filter out all those students who got >80% marks in DevOps course and save it as new data frame
eightydf = perdf.filter((col('course') == 'DevOps') & (col('Average') > 80))
eightydf.show()

+---+------+----------------+------+----+-----+--------------------+-----------+-----------------+
|age|gender|            name|course|roll|marks|               email|Total_Marks|          Average|
+---+------+----------------+------+----+-----+--------------------+-----------+-----------------+
| 21|     M|    Kolbi Strunk|DevOps| 115|   98|KolbiStrunk@gmail...|        120|81.66666666666667|
| 23|     M|Riley Mcloughlin|DevOps| 209|   99|RileyMcloughlin@g...|        120|             82.5|
+---+------+----------------+------+----+-----+--------------------+-----------+-----------------+



In [0]:
#count,distinct,dropDuplicates() - > it will drop all duplicate records 
df.count()

Out[28]: 143

In [0]:
df.select('gender').distinct().show()

+------+
|gender|
+------+
|     F|
|     M|
| other|
+------+



In [0]:
df.dropDuplicates(['gender','course']).show()

+---+------+-----------------+--------------+----+-----+--------------------+
|age|gender|             name|        course|roll|marks|               email|
+---+------+-----------------+--------------+----+-----+--------------------+
| 22|     F|   Lauren Klocker|       Backend| 132|   54|LaurenKlocker@gma...|
| 22|     F|   Samantha Hicks|   Cloud Admin| 126|   77|SamanthaHicks@gma...|
| 22|     F|        Kiana Lor|    Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     F|    Dakota Blanco|        DevOps| 103|   65|DakotaBlanco@gmai...|
| 22|     F|     Dakota Wirth|      FrontEnd| 131|   37|DakotaWirth@gmail...|
| 20|     F|  Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 21|     F|   Brooke Cazares|         MLOps| 105|   98|BrookeCazares@gma...|
| 23|     F|   Maja Nicholson|  Network engg| 111|   57|MajaNicholson@gma...|
| 23|     F|  Britany Stevens|           SRE| 146|   45|BritanyStevens@gm...|
| 21|     F|     Sasha Jansen| Security Engg| 112|   99|SashaJan

In [0]:
#write a code to show all unique rows of age,gender and course
df.dropDuplicates(['age','gender','course']).show()

+---+------+--------------------+--------------+----+-----+--------------------+
|age|gender|                name|        course|roll|marks|               email|
+---+------+--------------------+--------------+----+-----+--------------------+
| 19|     F|        Faviola Soto|     FullStack| 241|   54|FaviolaSoto@gmail...|
| 19|     F|        Shauna Sneed|         MLOps| 232|   63|ShaunaSneed@gmail...|
| 20|     F|       Katelyn Sharp|    Data Enggg| 161|   63|KatelynSharp@gmai...|
| 20|     F|     Natasha Yarusso|     FullStack| 104|   78|NatashaYarusso@gm...|
| 20|     F|      Lindsey Freund|         MLOps| 189|   48|LindseyFreund@gma...|
| 20|     F|      Savannah Clark|data Scientist| 190|   87|SavannahClark@gma...|
| 20|     M|   Alexander Sherman|    Data Enggg| 113|   65|AlexanderSherman@...|
| 20|     M|   Jason Hundsdorfer|        DevOps| 199|   48|JasonHundsdorfer@...|
| 20|     M|          James Rice|     FullStack| 222|   77| JamesRice@gmail.com|
| 20|     M|          Alan T

In [0]:
#sort() or orderBy(): used to sort the numbers in ascending or descending order.
#sort() function works only on number data type.
df.sort(df.marks.desc()).show()

+---+------+--------------------+-------------+----+-----+--------------------+
|age|gender|                name|       course|roll|marks|               email|
+---+------+--------------------+-------------+----+-----+--------------------+
| 22|     F|           Kiana Lor|   Data Enggg| 101|  100|  KianaLor@gmail.com|
| 22|     M|    Casey Vanden Bos|          SRE| 182|  100|CaseyVandenBos@gm...|
| 22|     M|      Joshua Lonaker|  Cloud Admin| 102|   99|JoshuaLonaker@gma...|
| 21|     F|        Sasha Jansen|Security Engg| 112|   99|SashaJansen@gmail...|
| 21|     M|     Brandon Barbour|    FullStack| 128|   99|BrandonBarbour@gm...|
| 22|     M|     Kenny Fukushima|      Backend| 144|   99|KennyFukushima@gm...|
| 23|     F|    Georgia Williams|      Backend| 156|   99|GeorgiaWilliams@g...|
| 21|     F|Sheyenne Delgado-...| Network engg| 171|   99|SheyenneDelgado-M...|
| 23|     F|          Staci Maes| Network engg| 183|   99| StaciMaes@gmail.com|
| 22|     F|    Airabella Koontz|       

In [0]:
df.orderBy('course').show()

+---+------+--------------------+-----------+----+-----+--------------------+
|age|gender|                name|     course|roll|marks|               email|
+---+------+--------------------+-----------+----+-----+--------------------+
| 22|     M|     Kenny Fukushima|    Backend| 144|   99|KennyFukushima@gm...|
| 22|     M|      Preston Suarez|    Backend| 108|   48|PrestonSuarez@gma...|
| 23|     M|         Kyle Luckey|    Backend| 120|   69|KyleLuckey@gmail.com|
| 22|     F|      Lauren Klocker|    Backend| 132|   54|LaurenKlocker@gma...|
| 23|     F|    Georgia Williams|    Backend| 156|   99|GeorgiaWilliams@g...|
| 23|     F|         Lindsey Job|    Backend| 168|   77|LindseyJob@gmail.com|
| 23|     F|        Macie Nguyen|    Backend| 180|   87|MacieNguyen@gmail...|
| 22|     F|      Daisha Schmidt|    Backend| 192|   57|DaishaSchmidt@gma...|
| 23|     M|Christian Zambran...|    Backend| 213|   54|ChristianZambrano...|
| 22|     F|        Marisa Ramey|    Backend| 218|   57|MarisaRa

In [0]:
#assignments: use OfficeData.csv to read 
#1. create a DF, sorted on bonus in ascending order and show it.
#2. Create a DF, sorted on age and salary in descending and ascending order respectively and show it.
#3. Create a DF, sorted on age,bonus and salary in descending, descending and ascending order respectively and show it.

In [0]:
df = spark.read.options(header='True',inferSchema='True').csv('/FileStore/tables/OfficeData.csv')
df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [0]:
#1. create a DF, sorted on bonus in ascending order and show it.
df1 = df.sort(df.bonus)
df1.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+



In [0]:
#2. Create a DF, sorted on age and salary in descending and ascending order respectively and show it.
df2 = df.orderBy((col('age').desc()),df.salary)
df2.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|      Michael|     Sales|   NY| 86000| 56|20000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        James|     Sales|   NY| 90000| 34|10000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Maria|   Finance|   CA| 90000| 24|23000|
+-------------+----------+-----+------+---+-----+



In [0]:
#3. Create a DF, sorted on age,bonus and salary in descending, descending and ascending order respectively and show it.
df3 = df.sort(df.age.desc(),df.bonus.desc(),df.salary)
df3.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|      Michael|     Sales|   NY| 86000| 56|20000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        James|     Sales|   NY| 90000| 34|10000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Maria|   Finance|   CA| 90000| 24|23000|
+-------------+----------+-----+------+---+-----+



In [0]:
#groupBy(): this function always works with aggregation function. 
df = spark.read.options(header='true',inferSchema='true').csv('/FileStore/tables/StudentData.csv')
df.groupBy('gender').avg('age').show()
df.groupBy('course').max('marks').show()
df.groupBy('course').min('marks').show()
df.groupBy('course').sum('marks').show()
df.groupBy('course').count().show()

+------+------------------+
|gender|          avg(age)|
+------+------------------+
|Female|28.489021956087825|
|  Male| 28.52304609218437|
+------+------------------+

+------+----------+
|course|max(marks)|
+------+----------+
|    PF|        99|
|    DB|        98|
|   MVC|        99|
|   DSA|        99|
| Cloud|        99|
|   OOP|        99|
+------+----------+

+------+----------+
|course|min(marks)|
+------+----------+
|    PF|        20|
|    DB|        20|
|   MVC|        22|
|   DSA|        20|
| Cloud|        20|
|   OOP|        20|
+------+----------+

+------+----------+
|course|sum(marks)|
+------+----------+
|    PF|      9933|
|    DB|      9270|
|   MVC|      9585|
|   DSA|     10950|
| Cloud|     11443|
|   OOP|      8916|
+------+----------+

+------+-----+
|course|count|
+------+-----+
|    PF|  166|
|    DB|  157|
|   MVC|  157|
|   DSA|  176|
| Cloud|  192|
|   OOP|  152|
+------+-----+



In [0]:
#groupBy with multiple columns and multiple aggregated functions
from pyspark.sql.functions import min,max,sum,count,avg,mean
df.groupBy('course','gender').agg(count('*').alias('Total_enrollment'),sum('marks').alias('Total_Marks'),min('marks').alias('Min Marks'),max('marks').alias('Max marks')).orderBy('course').show()

+------+------+----------------+-----------+---------+---------+
|course|gender|Total_enrollment|Total_Marks|Min Marks|Max marks|
+------+------+----------------+-----------+---------+---------+
| Cloud|Female|             106|       6316|       20|       99|
| Cloud|  Male|              86|       5127|       21|       97|
|    DB|  Male|              82|       5073|       20|       98|
|    DB|Female|              75|       4197|       20|       96|
|   DSA|Female|              98|       6124|       20|       99|
|   DSA|  Male|              78|       4826|       20|       99|
|   MVC|  Male|              86|       5241|       22|       99|
|   MVC|Female|              71|       4344|       22|       99|
|   OOP|  Male|              70|       4234|       20|       99|
|   OOP|Female|              82|       4682|       21|       99|
|    PF|  Male|              97|       5960|       20|       99|
|    PF|Female|              69|       3973|       20|       99|
+------+------+----------

In [0]:
#filter can be applied in 2 ways in groupBy(). 1st is before and 2nd is after groupBy(). But bth works the same way.
from pyspark.sql.functions import min,max,sum,count,avg,mean
df.groupBy('course','gender').agg(count('*').alias('Total_enrollment'),sum('marks').alias('Total_Marks'),min('marks').alias('Min Marks'),max('marks').alias('Max marks')).orderBy('course').where(col('gender') != 'Male').show()

+------+------+----------------+-----------+---------+---------+
|course|gender|Total_enrollment|Total_Marks|Min Marks|Max marks|
+------+------+----------------+-----------+---------+---------+
| Cloud|Female|             106|       6316|       20|       99|
|    DB|Female|              75|       4197|       20|       96|
|   DSA|Female|              98|       6124|       20|       99|
|   MVC|Female|              71|       4344|       22|       99|
|   OOP|Female|              82|       4682|       21|       99|
|    PF|Female|              69|       3973|       20|       99|
+------+------+----------------+-----------+---------+---------+



In [0]:
#assignment: use Studentdata.csv
#1. Display the total number of students enrolled in each course
#2. Display the total number of male and females enrolled in each course
#3. Display the total marks achived by each gender in each course
#4. Display the min,max,avg marks achived in each course by same age group

In [0]:
#1. Display the total number of students enrolled in each course
df.groupBy('course').count().show()

+------+-----+
|course|count|
+------+-----+
|    PF|  166|
|    DB|  157|
|   MVC|  157|
|   DSA|  176|
| Cloud|  192|
|   OOP|  152|
+------+-----+



In [0]:
#2. Display the total number of male and females enrolled in each course
df.groupBy('course','gender').count().show()

+------+------+-----+
|course|gender|count|
+------+------+-----+
|   OOP|  Male|   70|
|    DB|  Male|   82|
| Cloud|Female|  106|
|   MVC|  Male|   86|
|   DSA|Female|   98|
|    PF|  Male|   97|
|   MVC|Female|   71|
| Cloud|  Male|   86|
|    PF|Female|   69|
|   DSA|  Male|   78|
|    DB|Female|   75|
|   OOP|Female|   82|
+------+------+-----+



In [0]:
#3. Display the total marks achived by each gender in each course
df.groupBy('course','gender').sum('marks').show()

+------+------+----------+
|course|gender|sum(marks)|
+------+------+----------+
|   OOP|  Male|      4234|
|    DB|  Male|      5073|
| Cloud|Female|      6316|
|   MVC|  Male|      5241|
|   DSA|Female|      6124|
|    PF|  Male|      5960|
|   MVC|Female|      4344|
| Cloud|  Male|      5127|
|    PF|Female|      3973|
|   DSA|  Male|      4826|
|    DB|Female|      4197|
|   OOP|Female|      4682|
+------+------+----------+



In [0]:
#4. Display the min,max,avg marks achived in each course by same age group
df.groupBy('course','age').agg(min('marks').alias('Min_Marks'),max('marks').alias('Max Marks'),avg('marks').alias('Average Marks')).show()

+------+---+---------+---------+------------------+
|course|age|Min_Marks|Max Marks|     Average Marks|
+------+---+---------+---------+------------------+
|   MVC| 28|       23|       99| 60.44444444444444|
|   MVC| 29|       22|       99| 61.56470588235294|
| Cloud| 28|       20|       99|             58.08|
|    PF| 29|       20|       99|56.275862068965516|
|    PF| 28|       20|       98| 63.75949367088607|
|   OOP| 29|       20|       99|59.729729729729726|
|   DSA| 28|       20|       99|  64.6867469879518|
| Cloud| 29|       21|       98|             61.25|
|    DB| 28|       21|       98| 58.76829268292683|
|   DSA| 29|       20|       99| 60.01075268817204|
|   OOP| 28|       23|       99| 57.64102564102564|
|    DB| 29|       20|       98|59.346666666666664|
+------+---+---------+---------+------------------+



In [0]:
#assignment2: read worddata.text
#1. calculate and show the count of each word in file
from pyspark.sql.types import StructField,StructType,StringType
schema = StructType([
    StructField('words',StringType(),True)
])
df = spark.read.options(delimiter='\n').schema(schema=schema).csv('/FileStore/tables/WordData.txt')
df.groupBy('words').count().show()

+------+-----+
| words|count|
+------+-----+
|   Mic|   10|
| Chair|   15|
|  Book|    5|
|Laptop|    5|
|   Bag|    5|
|Mobile|    5|
| Apple|   10|
+------+-----+



In [0]:
#UDF: user defined functions: user programmable routine act on row.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
df = spark.read.options(header='True',inferSchema='True').csv('/FileStore/tables/OfficeData.csv')
df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [0]:
#add new column as Total_salary using UDF
def total_sal(salary,bonus):
    return salary + bonus

total_salaryUDF = udf(lambda x,y:total_sal(x,y),IntegerType())
df.withColumn('Total_Salary',total_salaryUDF(df.salary,df.bonus)).show()
# df1.printSchema()
# df2 = df.withColumn('Total_Salary',total_sal(df.salary,df.bonus))


+-------------+----------+-----+------+---+-----+------------+
|employee_name|department|state|salary|age|bonus|Total_Salary|
+-------------+----------+-----+------+---+-----+------------+
|        James|     Sales|   NY| 90000| 34|10000|      100000|
|      Michael|     Sales|   NY| 86000| 56|20000|      106000|
|       Robert|     Sales|   CA| 81000| 30|23000|      104000|
|        Maria|   Finance|   CA| 90000| 24|23000|      113000|
|        Raman|   Finance|   CA| 99000| 40|24000|      123000|
|        Scott|   Finance|   NY| 83000| 36|19000|      102000|
|          Jen|   Finance|   NY| 79000| 53|15000|       94000|
|         Jeff| Marketing|   CA| 80000| 25|18000|       98000|
|        Kumar| Marketing|   NY| 91000| 50|21000|      112000|
+-------------+----------+-----+------+---+-----+------------+



In [0]:
#create new column Increment and provide increment to the emp on following criteria
#1. If the emp is in NY state, his increment would be 10% of salary and 5% of bonus
#2. If emp is in CA state, his increment would be 12% of salary and 3% of bonus
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf,lit
def incSalary(state,salary,bonus):
    if state == 'NY':
        return ((salary*0.1)+(bonus*0.05))
#     elif state == 'CA':
        return ((salary*0.12) + (bonus*0.03))

incSalUDF = udf(lambda x,y,z:incSalary(x,y,z),DoubleType())
df1 = df.withColumn('Increment',incSalUDF(df.state,df.salary,df.bonus))
df1.show()
# df.withColumn('Increment',lit(incSalary(df.state,df.salary,df.bonus))).show()

+-------------+----------+-----+------+---+-----+---------+
|employee_name|department|state|salary|age|bonus|Increment|
+-------------+----------+-----+------+---+-----+---------+
|        James|     Sales|   NY| 90000| 34|10000|   9500.0|
|      Michael|     Sales|   NY| 86000| 56|20000|   9600.0|
|       Robert|     Sales|   CA| 81000| 30|23000|     null|
|        Maria|   Finance|   CA| 90000| 24|23000|     null|
|        Raman|   Finance|   CA| 99000| 40|24000|     null|
|        Scott|   Finance|   NY| 83000| 36|19000|   9250.0|
|          Jen|   Finance|   NY| 79000| 53|15000|   8650.0|
|         Jeff| Marketing|   CA| 80000| 25|18000|     null|
|        Kumar| Marketing|   NY| 91000| 50|21000|  10150.0|
+-------------+----------+-----+------+---+-----+---------+



In [0]:
#cache():
#usually when multiple actions happen (say one after the other) all actions execute the transformations from initial which will eat time.
# so to optimize it we can execute the cache() after transformations so that when multiple actions execute, action will fetch data from cache #instead of executing all transformations from intial.
df.cache()

Out[72]: DataFrame[employee_name: string, department: string, state: string, salary: int, age: int, bonus: int]

In [0]:
df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [0]:
type(df)

Out[74]: pyspark.sql.dataframe.DataFrame

In [0]:
#converting DF to RDD
rdd = df.rdd
type(rdd)

Out[75]: pyspark.rdd.RDD

In [0]:
rdd.collect()

Out[76]: [Row(employee_name='James', department='Sales', state='NY', salary=90000, age=34, bonus=10000),
 Row(employee_name='Michael', department='Sales', state='NY', salary=86000, age=56, bonus=20000),
 Row(employee_name='Robert', department='Sales', state='CA', salary=81000, age=30, bonus=23000),
 Row(employee_name='Maria', department='Finance', state='CA', salary=90000, age=24, bonus=23000),
 Row(employee_name='Raman', department='Finance', state='CA', salary=99000, age=40, bonus=24000),
 Row(employee_name='Scott', department='Finance', state='NY', salary=83000, age=36, bonus=19000),
 Row(employee_name='Jen', department='Finance', state='NY', salary=79000, age=53, bonus=15000),
 Row(employee_name='Jeff', department='Marketing', state='CA', salary=80000, age=25, bonus=18000),
 Row(employee_name='Kumar', department='Marketing', state='NY', salary=91000, age=50, bonus=21000)]

In [0]:
rdd.filter(lambda x: x[1] == 'Sales').collect()

Out[77]: [Row(employee_name='James', department='Sales', state='NY', salary=90000, age=34, bonus=10000),
 Row(employee_name='Michael', department='Sales', state='NY', salary=86000, age=56, bonus=20000),
 Row(employee_name='Robert', department='Sales', state='CA', salary=81000, age=30, bonus=23000)]

In [0]:
rdd.filter(lambda x: x['age'] == 34).collect()

Out[78]: [Row(employee_name='James', department='Sales', state='NY', salary=90000, age=34, bonus=10000)]

In [0]:
#Spak SQL
df.createOrReplaceTempView('Student')  # create table with student name

In [0]:
spark.sql("select * from student").show()   #--> spark sql query will return as data frame 
# this is equivalent to 
df.select('*').show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|

In [0]:
#Writing a data frame (can use same read command) + while writing df, we need to provide only folder location because spark will write df in partitions  
df1.write.options(header='true').csv('/FileStore/tables/officedata/')

In [0]:
%fs ls /FileStore/tables/officedata/

path,name,size,modificationTime
dbfs:/FileStore/tables/officedata/_SUCCESS,_SUCCESS,0,1686302566000
dbfs:/FileStore/tables/officedata/_committed_8238028158737769323,_committed_8238028158737769323,112,1686302566000
dbfs:/FileStore/tables/officedata/_started_8238028158737769323,_started_8238028158737769323,0,1686302565000
dbfs:/FileStore/tables/officedata/part-00000-tid-8238028158737769323-e9d1d1b6-20f8-4893-bea3-cba05df8ecd1-91-1-c000.csv,part-00000-tid-8238028158737769323-e9d1d1b6-20f8-4893-bea3-cba05df8ecd1-91-1-c000.csv,384,1686302565000


In [0]:
df2 = spark.read.options(header='true').csv('/FileStore/tables/officedata/')
df2.show()

+-------------+----------+-----+------+---+-----+---------+
|employee_name|department|state|salary|age|bonus|Increment|
+-------------+----------+-----+------+---+-----+---------+
|        James|     Sales|   NY| 90000| 34|10000|   9500.0|
|      Michael|     Sales|   NY| 86000| 56|20000|   9600.0|
|       Robert|     Sales|   CA| 81000| 30|23000|     null|
|        Maria|   Finance|   CA| 90000| 24|23000|     null|
|        Raman|   Finance|   CA| 99000| 40|24000|     null|
|        Scott|   Finance|   NY| 83000| 36|19000|   9250.0|
|          Jen|   Finance|   NY| 79000| 53|15000|   8650.0|
|         Jeff| Marketing|   CA| 80000| 25|18000|     null|
|        Kumar| Marketing|   NY| 91000| 50|21000|  10150.0|
+-------------+----------+-----+------+---+-----+---------+

