In [1]:
import pyspark

In [2]:
import pandas as  pd

In [3]:
# Load the same CSV with pandas first, for comparison with the Spark reader below.
df = pd.read_csv("test1.csv")
df.head(n=5)

Unnamed: 0,name,age
0,sai,19
1,kumar,18
2,krish,29


##### Read the CSV into a PySpark DataFrame

In [4]:
from pyspark.sql import SparkSession

# Reuse the running SparkSession if there is one, otherwise start a new one named 'practise'.
spark = (
    SparkSession.builder
    .appName('practise')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/25 22:07:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/25 22:07:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/12/25 22:07:22 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
# Display the active SparkSession object created above.
spark

In [6]:
# Without a header option Spark treats the first line as data and
# names the columns _c0, _c1, ... (see the output below).
df_spark = spark.read.format("csv").load("test1.csv")
df_spark.show(n=20, truncate=True)

                                                                                

+-----+---+
|  _c0|_c1|
+-----+---+
| name|age|
|  sai| 19|
|kumar| 18|
|krish| 29|
+-----+---+



In [7]:
# Use the first line of the file as the column names (name, age).
df_spark = spark.read.csv('test1.csv', header=True)

In [8]:
# Displaying a Spark DataFrame shows its column names and types, not its rows.
df_spark

DataFrame[name: string, age: string]

In [9]:
# Confirm this is a PySpark DataFrame rather than a pandas one.
type(df_spark)

pyspark.sql.dataframe.DataFrame

#### Check the datatypes of columns

In [10]:
df_spark.printSchema() 
# Without inferSchema, Spark reads every column as a string,
# so `age` is reported as string even though it holds numbers.

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)



In [11]:
# Read with a header row and let Spark infer column types from the data.
df_spark = spark.read.options(header='true', inferSchema='true').csv('test1.csv')

In [12]:
# With inferSchema enabled, `age` is now read as an integer column.
df_spark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [13]:
# Another way: pass the format and both reader options in a single call.
df_spark = spark.read.load('test1.csv', format='csv', header=True, inferSchema=True)
df_spark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [14]:
# (column name, type) pairs as a plain Python list.
df_spark.dtypes

[('name', 'string'), ('age', 'int')]

In [15]:
# Fetch the first two rows to the driver as a list of Row objects.
df_spark.take(2)

[Row(name='sai', age=19), Row(name='kumar', age=18)]

##### Select specific columns

In [16]:
# Select a single column by Column reference.
df_spark.select(df_spark['name']).show()

+-----+
| name|
+-----+
|  sai|
|kumar|
|krish|
+-----+



In [17]:
# select() also accepts several column names as separate arguments.
df_spark.select('name', 'age').show()

+-----+---+
| name|age|
+-----+---+
|  sai| 19|
|kumar| 18|
|krish| 29|
+-----+---+



In [18]:
df_spark.describe().show()
# Summary statistics per column. mean and stddev are null for `name` because it is a
# string column; they are only computed for numeric columns such as `age`.
# For string columns min/max are lexicographic (krish < sai).

+-------+-----+-----------------+
|summary| name|              age|
+-------+-----+-----------------+
|  count|    3|                3|
|   mean| null|             22.0|
| stddev| null|6.082762530298219|
|    min|krish|               18|
|    max|  sai|               29|
+-------+-----+-----------------+



#### Adding a column to a DataFrame

In [19]:
# Derive a new column from an existing one; withColumn returns a new DataFrame.
df_spark = df_spark.withColumn('age after 2 years', df_spark.age + 2)

In [20]:
# Show up to 20 rows with long values truncated (the show() defaults).
df_spark.show(n=20, truncate=True)

+-----+---+-----------------+
| name|age|age after 2 years|
+-----+---+-----------------+
|  sai| 19|               21|
|kumar| 18|               20|
|krish| 29|               31|
+-----+---+-----------------+



#### Drop the columns

In [21]:
# Remove the derived column again, referring to it as a Column object.
df_spark = df_spark.drop(df_spark['age after 2 years'])

In [22]:
# The frame is back to just name and age.
df_spark.show(n=20, truncate=True)

+-----+---+
| name|age|
+-----+---+
|  sai| 19|
|kumar| 18|
|krish| 29|
+-----+---+



##### Rename the column

In [23]:
# Rename a single column; the other columns are left as they are.
df_spark = df_spark.withColumnRenamed(existing='name', new='New name')

In [24]:
# Confirm the rename.
df_spark.show(n=20, truncate=True)

+--------+---+
|New name|age|
+--------+---+
|     sai| 19|
|   kumar| 18|
|   krish| 29|
+--------+---+



#### Handling missing values

In [25]:
# Second dataset, which contains nulls, for the missing-value examples.
df_spark2 = spark.read.options(header=True, inferSchema=True).csv('test2.csv')

In [26]:
# Inspect the raw data, including its null values.
df_spark2.show(n=20, truncate=True)

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
| sunny|  31|         4| 30000|
| krish|  30|         3| 40000|
|  paul|  24|         2| 50000|
|harsha|null|         4| 45000|
|mahesh|  21|      null|  null|
|  null|  23|         2| 30000|
+------+----+----------+------+



In [27]:
# Drop the `name` column. drop() returns a new DataFrame; df_spark2 itself is unchanged
# (the next cell shows it still has `name`).
df_spark2.drop('name').show()

+----+----------+------+
| age|experience|salary|
+----+----------+------+
|  31|         4| 30000|
|  30|         3| 40000|
|  24|         2| 50000|
|null|         4| 45000|
|  21|      null|  null|
|  23|         2| 30000|
+----+----------+------+



In [28]:
# df_spark2 still contains the `name` column: the previous drop did not modify it.
df_spark2.show(n=20, truncate=True)

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
| sunny|  31|         4| 30000|
| krish|  30|         3| 40000|
|  paul|  24|         2| 50000|
|harsha|null|         4| 45000|
|mahesh|  21|      null|  null|
|  null|  23|         2| 30000|
+------+----+----------+------+



In [29]:
# Remove every row that has at least one null value (how='any' is the default).
df_spark2.dropna().show()

+-----+---+----------+------+
| name|age|experience|salary|
+-----+---+----------+------+
|sunny| 31|         4| 30000|
|krish| 30|         3| 40000|
| paul| 24|         2| 50000|
+-----+---+----------+------+



In [30]:
# how='any' (default) -> drop a row if any of its values is null
# how='all'           -> drop a row only if all of its values are null
# thresh=k            -> keep only rows with at least k non-null values;
#                        when thresh is given it overrides `how`
# Here thresh=3 drops mahesh (only 2 non-null values) and keeps every other row.
df_spark2.na.drop(how='any', thresh=3).show()

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
| sunny|  31|         4| 30000|
| krish|  30|         3| 40000|
|  paul|  24|         2| 50000|
|harsha|null|         4| 45000|
|  null|  23|         2| 30000|
+------+----+----------+------+



In [31]:
# subset: only the listed columns are checked for nulls, so only the row
# with a null `age` is removed; nulls in other columns are kept.
df_spark2.na.drop(subset=['age']).show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
| sunny| 31|         4| 30000|
| krish| 30|         3| 40000|
|  paul| 24|         2| 50000|
|mahesh| 21|      null|  null|
|  null| 23|         2| 30000|
+------+---+----------+------+



#### Filling missing values

In [32]:
# A string fill value only replaces nulls in string columns; numeric nulls stay null.
df_spark2.fillna("missing value").show()

+-------------+----+----------+------+
|         name| age|experience|salary|
+-------------+----+----------+------+
|        sunny|  31|         4| 30000|
|        krish|  30|         3| 40000|
|         paul|  24|         2| 50000|
|       harsha|null|         4| 45000|
|       mahesh|  21|      null|  null|
|missing value|  23|         2| 30000|
+-------------+----+----------+------+



In [33]:
from pyspark.ml.feature import Imputer

# Numeric columns whose nulls are replaced by the column mean; each one gets a
# parallel "<col>_imputed" output column. Defined once so the input and output
# lists cannot drift apart.
impute_cols = ['age', 'experience', 'salary']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=["{}_imputed".format(c) for c in impute_cols],
    strategy='mean',
)
# NOTE: the *_imputed columns come out as integers here. The mean of the non-null
# ages is 25.8, but it appears as 25 in the output below. Cast the inputs to double
# before fitting if the fractional mean matters.

In [34]:
# Learn the column means from df_spark2, then append the *_imputed columns to it.
df_imputed = (
    imputer
    .fit(df_spark2)
    .transform(df_spark2)
)

In [35]:
# Original columns next to their imputed counterparts.
df_imputed.show(n=20, truncate=True)

+------+----+----------+------+-----------+------------------+--------------+
|  name| age|experience|salary|age_imputed|experience_imputed|salary_imputed|
+------+----+----------+------+-----------+------------------+--------------+
| sunny|  31|         4| 30000|         31|                 4|         30000|
| krish|  30|         3| 40000|         30|                 3|         40000|
|  paul|  24|         2| 50000|         24|                 2|         50000|
|harsha|null|         4| 45000|         25|                 4|         45000|
|mahesh|  21|      null|  null|         21|                 3|         39000|
|  null|  23|         2| 30000|         23|                 2|         30000|
+------+----+----------+------+-----------+------------------+--------------+



In [36]:
# Keep only the imputed versions of the numeric columns.
df_nonNull = df_imputed.drop('age', 'experience', 'salary')


In [37]:
# Drop any row that still has a null; here that is the row whose `name` is null.
df_nonNull = df_nonNull.dropna()

In [38]:
# Fully populated data: no nulls remain.
df_nonNull.show(n=20, truncate=True)

+------+-----------+------------------+--------------+
|  name|age_imputed|experience_imputed|salary_imputed|
+------+-----------+------------------+--------------+
| sunny|         31|                 4|         30000|
| krish|         30|                 3|         40000|
|  paul|         24|                 2|         50000|
|harsha|         25|                 4|         45000|
|mahesh|         21|                 3|         39000|
+------+-----------+------------------+--------------+



In [39]:

# Rename the *_imputed columns back to their original names. Renaming by name
# (instead of positionally with toDF) cannot mislabel data if the column order changes.
# Columns keep their positions, so the result is name, age, experience, salary.
for col_name in ('age', 'experience', 'salary'):
    df_nonNull = df_nonNull.withColumnRenamed("{}_imputed".format(col_name), col_name)


In [40]:
# Cleaned data with the original column names.
df_nonNull.show(n=20, truncate=True)

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
| sunny| 31|         4| 30000|
| krish| 30|         3| 40000|
|  paul| 24|         2| 50000|
|harsha| 25|         4| 45000|
|mahesh| 21|         3| 39000|
+------+---+----------+------+



#### Filter operations

In [41]:
# People whose salary is at most 40000; where() is an alias of filter()
# and also accepts a SQL expression string.
df_nonNull.where("salary<=40000").show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
| sunny| 31|         4| 30000|
| krish| 30|         3| 40000|
|mahesh| 21|         3| 39000|
+------+---+----------+------+



In [42]:
# Filter rows first, then keep only the name and age columns.
(
    df_nonNull
    .filter("salary<=40000")
    .select('name', 'age')
    .show()
)

+------+---+
|  name|age|
+------+---+
| sunny| 31|
| krish| 30|
|mahesh| 21|
+------+---+



In [43]:
# Combine Column conditions with & (each condition must be parenthesized):
# salary strictly between 30000 and 50000.
df_nonNull.where((df_nonNull.salary > 30000) & (df_nonNull.salary < 50000)).show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
| krish| 30|         3| 40000|
|harsha| 25|         4| 45000|
|mahesh| 21|         3| 39000|
+------+---+----------+------+



In [44]:
# ~ negates a Column condition: keep rows whose salary is NOT above 30000.
df_nonNull.filter(~(df_nonNull.salary > 30000)).show()

+-----+---+----------+------+
| name|age|experience|salary|
+-----+---+----------+------+
|sunny| 31|         4| 30000|
+-----+---+----------+------+



#### Group by and Aggregation

In [45]:
# groupBy and an aggregation function work together: first group the rows,
# then apply an aggregation to each group.
# Total salary per name (each name occurs once here, so the total equals that person's salary).
df_nonNull.groupBy('name').sum('salary').show()

+------+-----------+
|  name|sum(salary)|
+------+-----------+
|  paul|      50000|
| sunny|      30000|
| krish|      40000|
|mahesh|      39000|
|harsha|      45000|
+------+-----------+



In [46]:
# Number of rows per name (groupby is an alias of groupBy).
df_nonNull.groupby('name').count().show()

+------+-----+
|  name|count|
+------+-----+
|  paul|    1|
| sunny|    1|
| krish|    1|
|mahesh|    1|
|harsha|    1|
+------+-----+



In [47]:
# Aggregate over the whole frame as a single group: total salary of everyone.
df_nonNull.groupBy().agg({'salary': 'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|     204000|
+-----------+

