In [2]:
!pip install pyspark



In [3]:
import  pyspark

In [4]:
import pandas as pd
pd.read_csv('data.csv')

Unnamed: 0,Name,age,Experience
0,Bala,25,10
1,Sunny,25,8
2,Kartick,34,4


### Creating the Spark Session

Inorder to run pyspark need to install SparkSession

In [5]:
from pyspark.sql import SparkSession

In [6]:
#Create and starting SparkSession
spark = SparkSession.builder.appName('practise.com').getOrCreate()

In [7]:
spark

### READING THE DATA

#### Reading data Type1

In [48]:
# to read data and set our first row as a header
#if we don't give inferSchema = True, by default it return all value as string
df_pyspark = spark.read.csv('data.csv', header=True, inferSchema=True)

In [22]:
df_pyspark

DataFrame[Name: string, age: int, Experience: int]

In [23]:
df_pyspark.show()

+-------+---+----------+
|   Name|age|Experience|
+-------+---+----------+
|   Bala| 25|        10|
|  Sunny| 25|         8|
|Kartick| 34|         4|
+-------+---+----------+



#### Readig data Type2

In [16]:
# to read data and set our first row as a header
#if we don't give inferSchema = True, by default it return all value as string
df_pyspark = spark.read.option('header','true').csv('data.csv',inferSchema =True)

In [17]:
df_pyspark.show()

+-------+---+----------+
|   Name|age|Experience|
+-------+---+----------+
|   Bala| 25|        10|
|  Sunny| 25|         8|
|Kartick| 34|         4|
+-------+---+----------+



#### CHECKING THE DATATYPES

In [20]:
#like df.info() by printing the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [32]:
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int')]

In [18]:
#checking the df
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

#### VIEWING

In [26]:
#viewing the column names
df_pyspark.columns

['Name', 'age', 'Experience']

In [27]:
df_pyspark.head(3)

[Row(Name='Bala', age=25, Experience=10),
 Row(Name='Sunny', age=25, Experience=8),
 Row(Name='Kartick', age=34, Experience=4)]

#### SELECTING A COLUMN

In [28]:
# selecting only one column
df_pyspark.select('Name').show()

+-------+
|   Name|
+-------+
|   Bala|
|  Sunny|
|Kartick|
+-------+



In [30]:
#returning as pyspark dataframe
name = df_pyspark.select('Name')
type(name)

pyspark.sql.dataframe.DataFrame

In [31]:
# selecting multiple columns
df_pyspark.select(['Name','Experience']).show()

+-------+----------+
|   Name|Experience|
+-------+----------+
|   Bala|        10|
|  Sunny|         8|
|Kartick|         4|
+-------+----------+



#### DESCRIBE

In [34]:
# describe like in pandas
df_pyspark.describe().show()

+-------+-----+-----------------+-----------------+
|summary| Name|              age|       Experience|
+-------+-----+-----------------+-----------------+
|  count|    3|                3|                3|
|   mean| null|             28.0|7.333333333333333|
| stddev| null|5.196152422706632|3.055050463303893|
|    min| Bala|               25|                4|
|    max|Sunny|               34|               10|
+-------+-----+-----------------+-----------------+



#### ADDING, REMOVING  & RENAMING COLUMNS

In [37]:
#adding new columns
df_pyspark = df_pyspark.withColumn('Experience After 2 Years',df_pyspark['Experience']+2)
df_pyspark.show()

+-------+---+----------+------------------------+
|   Name|age|Experience|Experience After 2 Years|
+-------+---+----------+------------------------+
|   Bala| 25|        10|                      12|
|  Sunny| 25|         8|                      10|
|Kartick| 34|         4|                       6|
+-------+---+----------+------------------------+



In [38]:
#drop the column
df_pyspark = df_pyspark.drop("Experience After 2 Years")
df_pyspark.show()

+-------+---+----------+
|   Name|age|Experience|
+-------+---+----------+
|   Bala| 25|        10|
|  Sunny| 25|         8|
|Kartick| 34|         4|
+-------+---+----------+



In [50]:
#renaming the column   
df_pyspark = df_pyspark.withColumnRenamed('age','Age')
df_pyspark.show()

+-------+---+----------+
|   Name|Age|Experience|
+-------+---+----------+
|   Bala| 25|        10|
|  Sunny| 25|         8|
|Kartick| 34|         4|
+-------+---+----------+



#### HANDLING MISSING & NULL VALUES

In [51]:
spark = SparkSession.builder.appName('practise').getOrCreate()

In [205]:
df_py = spark.read.csv('data1.csv',header=True)

In [206]:
df_py.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



### Dropping

In [207]:
##droping the nan values
df_py.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [208]:
# droping the nan values based on condition
#if how='any', it drops rows with even one null values
df_py.na.drop(how='any').show()


+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [209]:
# droping the nan values based on condition
#if how='all', it drops row with entire null values
df_py.na.drop(how='all').show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [210]:
# droping the nan values based on condition
#if how='all', it drops row with entire null 
#if thresh = no, it drops row with  "no - non null value"
df_py.na.drop(how='all',thresh=3).show() # doesn't drop rows with 3 non-null values

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



In [211]:
# droping the nan values only from specific column
df_py.na.drop(how='all',subset='Experience').show() # drop null values only from Experience column

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



### Filling

In [212]:
# Filling the missing values in all null value
df_py.na.fill('Missing values').show()

+--------------+--------------+--------------+--------------+
|          Name|           age|    Experience|        Salary|
+--------------+--------------+--------------+--------------+
|         Krish|            31|            10|         30000|
|     Sudhanshu|            30|             8|         25000|
|         Sunny|            29|             4|         20000|
|          Paul|            24|             3|         20000|
|        Harsha|            21|             1|         15000|
|       Shubham|            23|             2|         18000|
|        Mahesh|Missing values|Missing values|         40000|
|Missing values|            34|            10|         38000|
|Missing values|            36|Missing values|Missing values|
+--------------+--------------+--------------+--------------+



In [213]:
# Filling the missing values in specific col
df_py.na.fill('Missing value',['age','Salary']).show()

+---------+-------------+----------+-------------+
|     Name|          age|Experience|       Salary|
+---------+-------------+----------+-------------+
|    Krish|           31|        10|        30000|
|Sudhanshu|           30|         8|        25000|
|    Sunny|           29|         4|        20000|
|     Paul|           24|         3|        20000|
|   Harsha|           21|         1|        15000|
|  Shubham|           23|         2|        18000|
|   Mahesh|Missing value|      null|        40000|
|     null|           34|        10|        38000|
|     null|           36|      null|Missing value|
+---------+-------------+----------+-------------+



### IMPUTING

In [218]:
df_py1 = spark.read.csv('data1.csv',header=True,inferSchema=True)
df_py1.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [219]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'], 
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
    ).setStrategy("median")

In [221]:
# Add imputation cols to df
imputer.fit(df_py1).transform(df_py1).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         29|                 4|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 4|         20000|
+---------+----+----------+-

#### FILTER OPERATION

In [223]:
df_pyspark=spark.read.csv('data.csv',header=True,inferSchema=True)
df_pyspark.show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Bala| 25|        10| 10000|
|  Sunny| 25|         8| 50000|
|Kartick| 34|         4| 60000|
|    Abi| 45|        20| 70000|
|   Yama| 45|        15| 15000|
+-------+---+----------+------+



In [224]:
### Salary of the people less than or equal to 20000
df_pyspark.filter("Salary<=20000").show()

+----+---+----------+------+
|Name|age|Experience|Salary|
+----+---+----------+------+
|Bala| 25|        10| 10000|
|Yama| 45|        15| 15000|
+----+---+----------+------+



In [226]:
df_pyspark.filter("Salary<=60000").select(['Name','age']).show()

+-------+---+
|   Name|age|
+-------+---+
|   Bala| 25|
|  Sunny| 25|
|Kartick| 34|
|   Yama| 45|
+-------+---+



In [227]:
# can also write like
df_pyspark.filter(df_pyspark['Salary']<=20000).show()


+----+---+----------+------+
|Name|age|Experience|Salary|
+----+---+----------+------+
|Bala| 25|        10| 10000|
|Yama| 45|        15| 15000|
+----+---+----------+------+



In [230]:
# multiple condition
df_pyspark.filter((df_pyspark['Experience']<=15) | 
                  (df_pyspark['Experience']>=8)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Bala| 25|        10| 10000|
|  Sunny| 25|         8| 50000|
|Kartick| 34|         4| 60000|
|    Abi| 45|        20| 70000|
|   Yama| 45|        15| 15000|
+-------+---+----------+------+



In [231]:
df_pyspark.filter(~(df_pyspark['Salary']<=20000)).show()


+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 25|         8| 50000|
|Kartick| 34|         4| 60000|
|    Abi| 45|        20| 70000|
+-------+---+----------+------+



#### GROUPBY

In [233]:
spark=SparkSession.builder.appName('Agg').getOrCreate()


In [234]:
df_pyspark=spark.read.csv('test3.csv',header=True,inferSchema=True)
df_pyspark.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [235]:
## Groupby
### Grouped to find the maximum salary
df_pyspark.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [236]:

df_pyspark.groupBy('Name').avg().show()

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



In [237]:
df_pyspark.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [238]:
df_pyspark.groupBy('Departments').count().show()


+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [239]:
df_pyspark.agg({'Salary':'sum'}).show()


+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+

