# PySpark Dataframes
1) `filter` operation
2) Combining conditions with `&`, `|`, `==`
3) Negating a condition with `~`

In [59]:
from pyspark.sql import SparkSession

In [60]:
# Start (or reuse) the Spark session for this notebook; the application name is 'dataframe'.
spark = (
    SparkSession.builder
    .appName('dataframe')
    .getOrCreate()
)

In [61]:
# Load test2.csv: the first row supplies the column names and Spark infers the column types.
df_pyspark = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('test2.csv')
)
df_pyspark.show()

+-------+----+----------+-------+
|   Name| Age|Experience| Salary|
+-------+----+----------+-------+
|Sarthak|  21|         0|      0|
|  Lalit|  22|         1| 800000|
|   Amit|  21|         2|1000000|
|Ajinkya|  23|         3| 600000|
| Deepak|  20|         1| 400000|
| Mahesh|NULL|      NULL| 300000|
|   NULL|  19|         0|      0|
|   NULL|  21|      NULL|   NULL|
+-------+----+----------+-------+



In [62]:
from pyspark.ml.feature import Imputer

# Mean imputer that writes its results to new "<column>_imputed" columns,
# leaving the original columns as they are.
numeric_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(
    strategy="mean",
    inputCols=numeric_cols,
    outputCols=["{}_imputed".format(column_name) for column_name in numeric_cols],
)

In [63]:
# Fit the imputer on the data, then preview the transformed frame.
# The result is only displayed, not assigned, so df_pyspark itself is unchanged.
imputer_model = imputer.fit(df_pyspark)
imputer_model.transform(df_pyspark).show()

+-------+----+----------+-------+-----------+------------------+--------------+
|   Name| Age|Experience| Salary|Age_imputed|Experience_imputed|Salary_imputed|
+-------+----+----------+-------+-----------+------------------+--------------+
|Sarthak|  21|         0|      0|         21|                 0|             0|
|  Lalit|  22|         1| 800000|         22|                 1|        800000|
|   Amit|  21|         2|1000000|         21|                 2|       1000000|
|Ajinkya|  23|         3| 600000|         23|                 3|        600000|
| Deepak|  20|         1| 400000|         20|                 1|        400000|
| Mahesh|NULL|      NULL| 300000|         21|                 1|        300000|
|   NULL|  19|         0|      0|         19|                 0|             0|
|   NULL|  21|      NULL|   NULL|         21|                 1|        442857|
+-------+----+----------+-------+-----------+------------------+--------------+



In [64]:
# Re-display df_pyspark: the transform result above was only shown, never assigned back.
df_pyspark.show()


+-------+----+----------+-------+
|   Name| Age|Experience| Salary|
+-------+----+----------+-------+
|Sarthak|  21|         0|      0|
|  Lalit|  22|         1| 800000|
|   Amit|  21|         2|1000000|
|Ajinkya|  23|         3| 600000|
| Deepak|  20|         1| 400000|
| Mahesh|NULL|      NULL| 300000|
|   NULL|  19|         0|      0|
|   NULL|  21|      NULL|   NULL|
+-------+----+----------+-------+



In [65]:
from pyspark.ml.feature import Imputer

# Mean imputer whose output columns reuse the input column names, so the
# imputed values take the place of the original columns.
impute_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(
    strategy="mean",
    inputCols=impute_cols,
    outputCols=list(impute_cols),
)

# Fit on the current frame, then rebind df_pyspark to the imputed result.
imputer_model = imputer.fit(df_pyspark)
df_pyspark = imputer_model.transform(df_pyspark)

# Display the imputed DataFrame.
df_pyspark.show()


+-------+---+----------+-------+
|   Name|Age|Experience| Salary|
+-------+---+----------+-------+
|Sarthak| 21|         0|      0|
|  Lalit| 22|         1| 800000|
|   Amit| 21|         2|1000000|
|Ajinkya| 23|         3| 600000|
| Deepak| 20|         1| 400000|
| Mahesh| 21|         1| 300000|
|   NULL| 19|         0|      0|
|   NULL| 21|         1| 442857|
+-------+---+----------+-------+



In [69]:
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import mean as spark_mean, when, col

# Compute the means of the 'Age', 'Experience' and 'Salary' columns in a single query.
# NOTE(review): rows holding 0 are included in these means, even though 0 is the
# value being replaced below — confirm that is intended.
zero_fill_cols = ['Age', 'Experience', 'Salary']
mean_values = df_pyspark.select(
    *[spark_mean(col(column_name)).alias(column_name) for column_name in zero_fill_cols]
).first()

# Keep each mean available under its own name.
age_mean, exp_mean, salary_mean = (mean_values[column_name] for column_name in zero_fill_cols)

# In each column, swap every 0 for that column's mean; all other values pass through.
for column_name, column_mean in zip(zero_fill_cols, (age_mean, exp_mean, salary_mean)):
    df_pyspark = df_pyspark.withColumn(
        column_name,
        when(col(column_name) == 0, column_mean).otherwise(col(column_name)),
    )

# Display the updated DataFrame.
df_pyspark.show()


+-------+----+----------+----------+
|   Name| Age|Experience|    Salary|
+-------+----+----------+----------+
|Sarthak|21.0|     1.125|442857.125|
|  Lalit|22.0|       1.0|  800000.0|
|   Amit|21.0|       2.0| 1000000.0|
|Ajinkya|23.0|       3.0|  600000.0|
| Deepak|20.0|       1.0|  400000.0|
| Mahesh|21.0|       1.0|  300000.0|
|   NULL|19.0|     1.125|442857.125|
|   NULL|21.0|       1.0|  442857.0|
+-------+----+----------+----------+



In [70]:
# filter() with a SQL-style string predicate: keep rows whose Salary is at most 400000.
salary_at_most_400k = df_pyspark.filter("Salary<=400000")
salary_at_most_400k.show()

+------+----+----------+--------+
|  Name| Age|Experience|  Salary|
+------+----+----------+--------+
|Deepak|20.0|       1.0|400000.0|
|Mahesh|21.0|       1.0|300000.0|
+------+----+----------+--------+



In [71]:
from pyspark.sql.functions import when

# Fill in missing names: a row with a NULL Name gets 'Saurabh' when Age is 19
# and 'Prathamesh' when Age is 21; every other row keeps its current Name.
name_is_missing = df_pyspark['Name'].isNull()
age_col = df_pyspark['Age']
filled_name = (
    when((age_col == 19) & name_is_missing, 'Saurabh')
    .when((age_col == 21) & name_is_missing, 'Prathamesh')
    .otherwise(df_pyspark['Name'])
)
df_pyspark = df_pyspark.withColumn('Name', filled_name)

# Display the updated DataFrame.
df_pyspark.show()


+----------+----+----------+----------+
|      Name| Age|Experience|    Salary|
+----------+----+----------+----------+
|   Sarthak|21.0|     1.125|442857.125|
|     Lalit|22.0|       1.0|  800000.0|
|      Amit|21.0|       2.0| 1000000.0|
|   Ajinkya|23.0|       3.0|  600000.0|
|    Deepak|20.0|       1.0|  400000.0|
|    Mahesh|21.0|       1.0|  300000.0|
|   Saurabh|19.0|     1.125|442857.125|
|Prathamesh|21.0|       1.0|  442857.0|
+----------+----+----------+----------+



In [72]:
# Same string filter as before, then keep only the Name and Age columns.
low_salary_rows = df_pyspark.filter("Salary<=400000")
low_salary_rows.select(['Name','Age']).show()

+------+----+
|  Name| Age|
+------+----+
|Deepak|20.0|
|Mahesh|21.0|
+------+----+



In [73]:
# Combine two column conditions with &: keep rows with 300000 <= Salary <= 500000.
salary_col = df_pyspark['Salary']
in_salary_band = (salary_col <= 500000) & (salary_col >= 300000)
df_pyspark.filter(in_salary_band).show()

+----------+----+----------+----------+
|      Name| Age|Experience|    Salary|
+----------+----+----------+----------+
|   Sarthak|21.0|     1.125|442857.125|
|    Deepak|20.0|       1.0|  400000.0|
|    Mahesh|21.0|       1.0|  300000.0|
|   Saurabh|19.0|     1.125|442857.125|
|Prathamesh|21.0|       1.0|  442857.0|
+----------+----+----------+----------+



In [74]:
# ~ negates a column condition: keep the rows that do NOT satisfy Salary <= 400000.
is_low_salary = df_pyspark["Salary"] <= 400000
df_pyspark.filter(~is_low_salary).show()

+----------+----+----------+----------+
|      Name| Age|Experience|    Salary|
+----------+----+----------+----------+
|   Sarthak|21.0|     1.125|442857.125|
|     Lalit|22.0|       1.0|  800000.0|
|      Amit|21.0|       2.0| 1000000.0|
|   Ajinkya|23.0|       3.0|  600000.0|
|   Saurabh|19.0|     1.125|442857.125|
|Prathamesh|21.0|       1.0|  442857.0|
+----------+----+----------+----------+

