In [29]:
!pip install pyspark



In [30]:
import pyspark
from pyspark.sql import SparkSession

In [31]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('taask02')
    .getOrCreate()
)

In [32]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

In [38]:
# Load the CSV, treating the first row as the header and letting Spark infer
# the column types.
# NOTE: several header names carry a trailing space (e.g. 'one ', 'five '),
# so later cells must spell them with that space.
dataframe = spark.read.csv('proper_spark.csv', header=True, inferSchema=True)

In [39]:
# Preview the loaded rows as a text table; missing values appear as NULL.
dataframe.show()

+----+----+-----+----+-----+----+
|one |two |three|four|five |six |
+----+----+-----+----+-----+----+
|   1|  11|   21|  51|   52| 101|
|  22|  22|   22|  22|   53| 102|
|  22|  33|    3|  22|   54|1030|
| 333|  44|  322|  33|   55|  22|
|NULL|  22|   11|  88|   56| 201|
|  88|NULL|   22|  36|    6| 202|
|   3|  22|   54|NULL|   77| 222|
|   3|  33| NULL|  88|   66| 203|
|  32|  22|   33|  61|   88| 244|
|   2|  11|   46|  33|   99| 255|
+----+----+-----+----+-----+----+



In [40]:
# Print each column's inferred type and nullability.
dataframe.printSchema()

root
 |-- one : integer (nullable = true)
 |-- two : integer (nullable = true)
 |-- three: integer (nullable = true)
 |-- four: integer (nullable = true)
 |-- five : integer (nullable = true)
 |-- six : integer (nullable = true)



In [41]:
# Summary statistics (count, mean, stddev, min, max) for every column.
# A count below the number of rows indicates nulls in that column.
dataframe.describe().show()

+-------+------------------+------------------+------------------+-----------------+------------------+-----------------+
|summary|              one |              two |             three|             four|             five |             six |
+-------+------------------+------------------+------------------+-----------------+------------------+-----------------+
|  count|                 9|                 9|                 9|                9|                10|               10|
|   mean| 56.22222222222222|24.444444444444443|59.333333333333336|48.22222222222222|              60.6|            258.2|
| stddev|107.35429401958939| 10.69007847388305| 99.78727373768662|25.79620988526114|25.237538178937605|281.1831352616219|
|    min|                 1|                11|                 3|               22|                 6|               22|
|    max|               333|                44|               322|               88|                99|             1030|
+-------+---------------

In [43]:
# Project two columns by name; 'one ' includes the trailing space from the CSV header.
dataframe.select('one ', 'three').show()

+----+-----+
|one |three|
+----+-----+
|   1|   21|
|  22|   22|
|  22|    3|
| 333|  322|
|NULL|   11|
|  88|   22|
|   3|   54|
|   3| NULL|
|  32|   33|
|   2|   46|
+----+-----+



In [58]:
# Project the third through fifth columns (positions 2, 3 and 4) by slicing the column list.
dataframe.select(*dataframe.columns[2:5]).show()

+-----+----+-----+
|three|four|five |
+-----+----+-----+
|   21|  51|   52|
|   22|  22|   53|
|    3|  22|   54|
|  322|  33|   55|
|   11|  88|   56|
|   22|  36|    6|
|   54|NULL|   77|
| NULL|  88|   66|
|   33|  61|   88|
|   46|  33|   99|
+-----+----+-----+



In [53]:
from pyspark.sql.functions import col , when

In [61]:
# Add two derived columns, then preview the result.
#   new_six_col: 1 when "four" > 20, otherwise 0 (a null "four" also yields 0).
#   new_seven:   "five " multiplied by 1, i.e. the same values as "five ".
dataframe = (
    dataframe
    .withColumn("new_six_col", when(col("four") > 20, 1).otherwise(0))
    .withColumn("new_seven", col("five ") * 1)
)
dataframe.show()

+----+----+-----+----+-----+----+-----------+---------+
|one |two |three|four|five |six |new_six_col|new_seven|
+----+----+-----+----+-----+----+-----------+---------+
|   1|  11|   21|  51|   52| 101|          1|       52|
|  22|  22|   22|  22|   53| 102|          1|       53|
|  22|  33|    3|  22|   54|1030|          1|       54|
| 333|  44|  322|  33|   55|  22|          1|       55|
|NULL|  22|   11|  88|   56| 201|          1|       56|
|  88|NULL|   22|  36|    6| 202|          1|        6|
|   3|  22|   54|NULL|   77| 222|          0|       77|
|   3|  33| NULL|  88|   66| 203|          1|       66|
|  32|  22|   33|  61|   88| 244|          1|       88|
|   2|  11|   46|  33|   99| 255|          1|       99|
+----+----+-----+----+-----+----+-----------+---------+



In [62]:
# Fill the missing values in every column with that column's approximate median.
#
# All medians are computed with a single approxQuantile call (one pass over the
# data) instead of launching a separate Spark job per column. Nulls are ignored
# when the quantile is computed.
#
# NOTE: columns derived before this step are not recomputed. For example,
# new_six_col stays 0 for a row whose "four" was null, even after "four" is
# filled with a value above 20.
column_names = dataframe.columns
quantiles_per_column = dataframe.approxQuantile(column_names, [0.5], 0.001)

median_by_column = {
    col_name: quantiles[0]
    for col_name, quantiles in zip(column_names, quantiles_per_column)
    # A column with no non-null values yields an empty list; leave it untouched
    # rather than failing with an IndexError.
    if quantiles
}

# fillna with a dict replaces nulls per column; each value is cast to the
# column's own type, so integer columns stay integer.
if median_by_column:
    dataframe = dataframe.fillna(median_by_column)

In [64]:
dataframe.show() # the NULLs are gone: each was replaced with its column's median

+----+----+-----+----+-----+----+-----------+---------+
|one |two |three|four|five |six |new_six_col|new_seven|
+----+----+-----+----+-----+----+-----------+---------+
|   1|  11|   21|  51|   52| 101|          1|       52|
|  22|  22|   22|  22|   53| 102|          1|       53|
|  22|  33|    3|  22|   54|1030|          1|       54|
| 333|  44|  322|  33|   55|  22|          1|       55|
|  22|  22|   11|  88|   56| 201|          1|       56|
|  88|  22|   22|  36|    6| 202|          1|        6|
|   3|  22|   54|  36|   77| 222|          0|       77|
|   3|  33|   22|  88|   66| 203|          1|       66|
|  32|  22|   33|  61|   88| 244|          1|       88|
|   2|  11|   46|  33|   99| 255|          1|       99|
+----+----+-----+----+-----+----+-----------+---------+



In [68]:
# Apply two filter conditions: "three" must exceed 20 and new_six_col must be 1.
# show() only prints the table and returns None, so the filtered DataFrame is
# assigned first and displayed in a separate statement; chaining .show() onto
# the assignment would leave filtered_dataframe bound to None.
filtered_dataframe = dataframe.filter((col("three") > 20) & (col("new_six_col") == 1))
filtered_dataframe.show()

# After the filter, the output is:

+----+----+-----+----+-----+----+-----------+---------+
|one |two |three|four|five |six |new_six_col|new_seven|
+----+----+-----+----+-----+----+-----------+---------+
|   1|  11|   21|  51|   52| 101|          1|       52|
|  22|  22|   22|  22|   53| 102|          1|       53|
| 333|  44|  322|  33|   55|  22|          1|       55|
|  88|  22|   22|  36|    6| 202|          1|        6|
|   3|  33|   22|  88|   66| 203|          1|       66|
|  32|  22|   33|  61|   88| 244|          1|       88|
|   2|  11|   46|  33|   99| 255|          1|       99|
+----+----+-----+----+-----+----+-----------+---------+

