## Import PySpark and create a SparkSession


In [75]:
from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession

In [76]:
# Build (or reuse, if one is already active) the SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName("practice")
    .getOrCreate()
)

In [77]:
# Display the SparkSession object created above.
spark

## Basic commands

In [78]:
# Read with default options: no header row and no type inference, so the
# columns are auto-named _c0, _c1, ... and every column is a string.
df_spark = spark.read.csv("test1.csv")
df_spark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

In [79]:
# Because header was not set, the file's header line shows up as the first data row.
df_spark.show()

+------+----+----+------+
|   _c0| _c1| _c2|   _c3|
+------+----+----+------+
|  name| age| exp|salary|
|   sam|  31|  10| 20000|
|   ram|  30|   8| 20000|
| vikky|  29|   4| 15000|
| anand|  18|  17|  null|
|mahesh|null|null| 20000|
|  null|  20|   3| 20000|
+------+----+----+------+



In [80]:
# Read the dataset again, this time taking the column names from the file's first line.
spark.read.csv("test1.csv", header=True).show()

+------+----+----+------+
|  name| age| exp|salary|
+------+----+----+------+
|   sam|  31|  10| 20000|
|   ram|  30|   8| 20000|
| vikky|  29|   4| 15000|
| anand|  18|  17|  null|
|mahesh|null|null| 20000|
|  null|  20|   3| 20000|
+------+----+----+------+



In [81]:
# spark.read returns a pyspark DataFrame.
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [82]:
# With a header row but without inferSchema, every column is still read as a string.
spark.read.csv("test1.csv", header=True)

DataFrame[name: string, age: string, exp: string, salary: string]

In [83]:
# df_spark is still the header-less read, so the auto-generated _c* names appear
# and every column is typed as string.
df_spark.dtypes

[('_c0', 'string'), ('_c1', 'string'), ('_c2', 'string'), ('_c3', 'string')]

In [84]:
# Re-read with a header row and schema inference, then inspect the schema.
# inferSchema scans the data to detect column types (e.g. integer), at the
# cost of an extra pass over the file.
df_spark = spark.read.csv("test1.csv", header=True, inferSchema=True)
df_spark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- exp: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [85]:
# Column names as a plain Python list.
df_spark.columns

['name', 'age', 'exp', 'salary']

In [86]:
# With no argument, head() returns a single Row (the first one), not a list.
df_spark.head()

Row(name='sam', age=31, exp=10, salary=20000)

In [87]:
# With n given, head(n) returns a list of the first n Rows.
df_spark.head(4)

[Row(name='sam', age=31, exp=10, salary=20000),
 Row(name='ram', age=30, exp=8, salary=20000),
 Row(name='vikky', age=29, exp=4, salary=15000),
 Row(name='anand', age=18, exp=17, salary=None)]

In [88]:
# Select a single column by name and print it.
df_spark.select("name").show()

+------+
|  name|
+------+
|   sam|
|   ram|
| vikky|
| anand|
|mahesh|
|  null|
+------+



In [89]:
# select() returns a new DataFrame; without an action such as .show(), only its
# schema repr is displayed.
df_spark.select(["name","age"])

DataFrame[name: string, age: int]

In [90]:
# show() prints the rows itself and returns None, so type() reports NoneType.
type(df_spark.select("name").show())

+------+
|  name|
+------+
|   sam|
|   ram|
| vikky|
| anand|
|mahesh|
|  null|
+------+



NoneType

In [91]:
# select() accepts column names as separate arguments as well as a list.
df_spark.select("name", "age").show()

+------+----+
|  name| age|
+------+----+
|   sam|  31|
|   ram|  30|
| vikky|  29|
| anand|  18|
|mahesh|null|
|  null|  20|
+------+----+



In [92]:
# describe() reports count/mean/stddev/min/max for every column, including the
# string column "name": mean and stddev are null there because they are not
# defined for strings, while min/max compare the strings lexicographically.
# count excludes nulls, which is why each column reports 5 of the 6 rows.
df_spark.describe().show()

+-------+-----+-----------------+-----------------+----------------+
|summary| name|              age|              exp|          salary|
+-------+-----+-----------------+-----------------+----------------+
|  count|    5|                5|                5|               5|
|   mean| null|             25.6|              8.4|         19000.0|
| stddev| null|6.107372593840988|5.594640292279746|2236.06797749979|
|    min|anand|               18|                3|           15000|
|    max|vikky|               31|               17|           20000|
+-------+-----+-----------------+-----------------+----------------+



### Adding a column to a DataFrame

In [93]:
# Preview-only variant (the result is displayed but not stored):
# df_spark.withColumn("Exp after 2 yrs", df_spark["exp"] + 2).show()

In [94]:
# withColumn returns a NEW DataFrame; df_spark itself is left unchanged.
# Arithmetic on a null value yields null (see the "mahesh" row).
exp_plus_two = df_spark["exp"] + 2
dff_pyspark = df_spark.withColumn("Exp after 2 yrs", exp_plus_two)
dff_pyspark.show()

+------+----+----+------+---------------+
|  name| age| exp|salary|Exp after 2 yrs|
+------+----+----+------+---------------+
|   sam|  31|  10| 20000|             12|
|   ram|  30|   8| 20000|             10|
| vikky|  29|   4| 15000|              6|
| anand|  18|  17|  null|             19|
|mahesh|null|null| 20000|           null|
|  null|  20|   3| 20000|              5|
+------+----+----+------+---------------+



### Dropping columns

In [95]:
# Preview-only variant (the result is displayed but not stored). The column
# must be dropped from dff_pyspark, the DataFrame that actually has it:
# dff_pyspark.drop("Exp after 2 yrs").show()

In [96]:
# Drop the column added above. It lives on dff_pyspark (withColumn returned a
# new DataFrame), so drop it from there. Dropping it from df_spark would be a
# silent no-op, because df_spark never had that column.
# Column-name matching is case-insensitive by default
# (spark.sql.caseSensitive=false), so the capital "E" is not significant.
dff_pyspark = dff_pyspark.drop("Exp after 2 yrs")
dff_pyspark.show()

+------+----+----+------+
|  name| age| exp|salary|
+------+----+----+------+
|   sam|  31|  10| 20000|
|   ram|  30|   8| 20000|
| vikky|  29|   4| 15000|
| anand|  18|  17|  null|
|mahesh|null|null| 20000|
|  null|  20|   3| 20000|
+------+----+----+------+



### Renaming the column

In [97]:
# withColumnRenamed returns a new DataFrame; dff_pyspark itself keeps "name".
(
    dff_pyspark
    .withColumnRenamed("name", "new name")
    .show()
)

+--------+----+----+------+
|new name| age| exp|salary|
+--------+----+----+------+
|     sam|  31|  10| 20000|
|     ram|  30|   8| 20000|
|   vikky|  29|   4| 15000|
|   anand|  18|  17|  null|
|  mahesh|null|null| 20000|
|    null|  20|   3| 20000|
+--------+----+----+------+



## Dropping rows with null values

In [98]:
# dropna() with default arguments removes every row that contains at least one null.
dff_pyspark.dropna().show()

+-----+---+---+------+
| name|age|exp|salary|
+-----+---+---+------+
|  sam| 31| 10| 20000|
|  ram| 30|  8| 20000|
|vikky| 29|  4| 15000|
+-----+---+---+------+



In [99]:
# how="any": drop a row if it contains at least one null.
# how="all": drop a row only if every value in it is null.
# thresh=N: keep only rows that have at least N non-null values. When thresh is
# given it takes precedence and `how` is ignored, so here every row with at
# least 2 non-null values is kept (all six rows qualify).
dff_pyspark.na.drop(how="any", thresh=2).show()

+------+----+----+------+
|  name| age| exp|salary|
+------+----+----+------+
|   sam|  31|  10| 20000|
|   ram|  30|   8| 20000|
| vikky|  29|   4| 15000|
| anand|  18|  17|  null|
|mahesh|null|null| 20000|
|  null|  20|   3| 20000|
+------+----+----+------+



In [100]:
# subset limits the null check to the listed columns: only rows whose "name"
# is null are dropped, and nulls in other columns are ignored.
dff_pyspark.dropna(how="any", subset=["name"]).show()

+------+----+----+------+
|  name| age| exp|salary|
+------+----+----+------+
|   sam|  31|  10| 20000|
|   ram|  30|   8| 20000|
| vikky|  29|   4| 15000|
| anand|  18|  17|  null|
|mahesh|null|null| 20000|
+------+----+----+------+



## Filling (imputing) null values

In [101]:

# Numeric columns whose nulls should be filled. They are listed once so the
# input and output column lists cannot drift apart.
impute_cols = ['age', 'exp', 'salary']

# Replace nulls in each column with that column's median, writing the results
# to new "<col>_imputed" columns and leaving the originals untouched.
# Per the Spark Imputer docs, the median is computed approximately.
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{c}_imputed" for c in impute_cols],
).setStrategy("median")

In [102]:
# fit() learns the per-column medians; transform() appends the *_imputed columns.
imputer_model = imputer.fit(dff_pyspark)
imputer_model.transform(dff_pyspark).show()

+------+----+----+------+-----------+-----------+--------------+
|  name| age| exp|salary|age_imputed|exp_imputed|salary_imputed|
+------+----+----+------+-----------+-----------+--------------+
|   sam|  31|  10| 20000|         31|         10|         20000|
|   ram|  30|   8| 20000|         30|          8|         20000|
| vikky|  29|   4| 15000|         29|          4|         15000|
| anand|  18|  17|  null|         18|         17|         20000|
|mahesh|null|null| 20000|         29|          8|         20000|
|  null|  20|   3| 20000|         20|          3|         20000|
+------+----+----+------+-----------+-----------+--------------+



In [107]:
df2_spark = spark.read.csv(
    "test2.csv",
    header=True,       # first line holds the column names
    inferSchema=True,  # scan the data to detect column types
)
df2_spark.show()

+-----+---+---+------+
| name|age|exp|salary|
+-----+---+---+------+
|  sam| 31| 10| 20000|
|  ram| 30|  8| 20000|
|vikky| 29|  4| 15000|
|anand| 18| 17| 18000|
+-----+---+---+------+



## Filter Operation


In [109]:
# filter() also accepts a SQL expression string as the condition.
df2_spark.filter("salary>=20000").show()

+----+---+---+------+
|name|age|exp|salary|
+----+---+---+------+
| sam| 31| 10| 20000|
| ram| 30|  8| 20000|
+----+---+---+------+



In [111]:
# Chain the filter with a projection onto the name and age columns.
(
    df2_spark
    .filter("salary>=20000")
    .select("name", "age")
    .show()
)

+----+---+
|name|age|
+----+---+
| sam| 31|
| ram| 30|
+----+---+



In [114]:
# Column conditions combine with | (or), & (and) and ~ (not). When written
# inline, each comparison needs its own parentheses because Python's | and &
# bind tighter than >= and <=; naming the conditions sidesteps that.
is_high_salary = df2_spark["salary"] >= 20000
is_low_salary = df2_spark["salary"] <= 10000
df2_spark.filter(is_high_salary | is_low_salary).show()

+----+---+---+------+
|name|age|exp|salary|
+----+---+---+------+
| sam| 31| 10| 20000|
| ram| 30|  8| 20000|
+----+---+---+------+



In [116]:
# ~ negates a boolean Column (logical NOT): keep rows whose salary is NOT >= 20000.
salary_at_least_20k = df2_spark["salary"] >= 20000
df2_spark.filter(~salary_at_least_20k).show()

+-----+---+---+------+
| name|age|exp|salary|
+-----+---+---+------+
|vikky| 29|  4| 15000|
|anand| 18| 17| 18000|
+-----+---+---+------+



## GroupBy and Aggregation

In [121]:
df3_spark = spark.read.csv(
    "test3.csv",
    header=True,       # first line holds the column names
    inferSchema=True,  # scan the data to detect column types
)
df3_spark.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [124]:
## groupby
# Total salary per person. "name" matches the "Name" column because column
# lookup is case-insensitive by default. Naming "salary" explicitly keeps the
# result stable if more numeric columns are added (a bare sum() aggregates
# every numeric column).
df3_spark.groupBy("name").sum("salary").show()

+---------+-----------+
|     name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [125]:
# Total salary per department. Other aggregations work the same way:
# max, min, avg/mean, count, etc. The column is named explicitly so only
# salary is aggregated.
df3_spark.groupBy("departments").sum("salary").show()

+------------+-----------+
| departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [126]:
# Highest salary per department (explicit column, so only salary is aggregated).
df3_spark.groupBy("departments").max("salary").show()

+------------+-----------+
| departments|max(salary)|
+------------+-----------+
|         IOT|      10000|
|    Big Data|       5000|
|Data Science|      20000|
+------------+-----------+



In [127]:
# Highest salary per person (explicit column, so only salary is aggregated).
df3_spark.groupBy("name").max("salary").show()

+---------+-----------+
|     name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+

