
# **PySpark Installation**

In [63]:
!pip install pyspark



In [64]:
import pyspark

In [65]:
# mount Google Drive so the CSV files are accessible
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [66]:
import pandas as pd
df=pd.read_csv("/content/drive/MyDrive/Pyspark/counties.csv")

In [67]:
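# note: counties.csv has no header row, so read_csv used the first record (Adair, 7682) as the header;
# assigning column names here replaces that record rather than restoring it
df.columns=['Name','ID']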

In [68]:
df.head()

        Name     ID
0      Adams   4029
1  Allamakee  14330
2  Appanoose  12884
3    Audubon   6119
4     Benton  26076
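
The Spark output later in this notebook shows that the file's first record is Adair, 7682, yet the pandas preview above starts at Adams, because pandas treated that first record as a header. A minimal sketch of reading a header-less file without losing the first record, assuming a small in-memory sample (three rows copied from this dataset) in place of the Drive file; `sample_csv` and `df_sample` are names introduced here for illustration:

```python
import io

import pandas as pd

# Stand-in for counties.csv: three header-less rows copied from the dataset.
sample_csv = io.StringIO("Adair,7682\nAdams,4029\nAllamakee,14330\n")

# header=None tells pandas the file has no header row;
# names supplies the column labels instead.
df_sample = pd.read_csv(sample_csv, header=None, names=["Name", "ID"])
print(df_sample)
```

With these two arguments the first record stays in the data instead of becoming the column labels.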


# **Reading file in Spark**


**First, create a Spark session**

In [69]:
from pyspark.sql import SparkSession

In [70]:
spark=SparkSession.builder.appName('Ashik').getOrCreate()

In [71]:
spark

In [72]:
df_spark=spark.read.csv("/content/drive/MyDrive/Pyspark/counties.csv")

In [73]:
df_spark.show()

+-----------+------+
|        _c0|   _c1|
+-----------+------+
|      Adair|  7682|
|      Adams|  4029|
|  Allamakee| 14330|
|  Appanoose| 12884|
|    Audubon|  6119|
|     Benton| 26076|
| Black Hawk|131090|
|      Boone| 26306|
|     Bremer| 24276|
|   Buchanan| 20958|
|Buena Vista| 20260|
|     Butler| 14867|
|    Calhoun|  9670|
|    Carroll| 20816|
|       Cass| 13956|
|      Cedar| 18499|
|Cerro Gordo| 44151|
|   Cherokee| 12072|
|  Chickasaw| 12439|
|     Clarke|  9286|
+-----------+------+
only showing top 20 rows



# **Renaming the columns**

In [74]:
df_spark=df_spark.withColumnRenamed("_c0","Name")\
 .withColumnRenamed("_c1","ID")

In [75]:
df_spark.show(5)

+---------+-----+
|     Name|   ID|
+---------+-----+
|    Adair| 7682|
|    Adams| 4029|
|Allamakee|14330|
|Appanoose|12884|
|  Audubon| 6119|
+---------+-----+
only showing top 5 rows



In [76]:
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- ID: string (nullable = true)



The ID column holds integers, but the schema reports it as a string, because by default Spark reads every CSV column as a string. To handle this, pass "inferSchema=True" when reading the dataset so that Spark infers the column types from the data.

In [77]:
df_spark=spark.read.csv("/content/drive/MyDrive/Pyspark/counties.csv",inferSchema=True)

In [78]:
df_spark=df_spark.withColumnRenamed("_c0","Name")\
 .withColumnRenamed("_c1","ID")

In [79]:
# checking the datatype of columns
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- ID: integer (nullable = true)



Now the schema shows ID as an integer.

In [80]:
# return the first 3 rows
df_spark.head(3)

[Row(Name='Adair', ID=7682),
 Row(Name='Adams', ID=4029),
 Row(Name='Allamakee', ID=14330)]

In [81]:
# check the type of the df_spark object
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [82]:
# list all column names
df_spark.columns

['Name', 'ID']

In [83]:
# select a column by name
df_spark.select('Name').show(5)

+---------+
|     Name|
+---------+
|    Adair|
|    Adams|
|Allamakee|
|Appanoose|
|  Audubon|
+---------+
only showing top 5 rows



In [84]:
# check datatype of each column
df_spark.dtypes

[('Name', 'string'), ('ID', 'int')]

In [85]:
# summary statistics for the dataset
df_spark.describe().show()

+-------+------+------------------+
|summary|  Name|                ID|
+-------+------+------------------+
|  count|    99|                99|
|   mean|  null| 30771.23232323232|
| stddev|  null|52888.737874675055|
|    min| Adair|              4029|
|    max|Wright|            430640|
+-------+------+------------------+



In [86]:
# adding column to dataframe
df_spark=df_spark.withColumn('ID2',df_spark['ID']+2)

In [87]:
df_spark.show(5)

+---------+-----+-----+
|     Name|   ID|  ID2|
+---------+-----+-----+
|    Adair| 7682| 7684|
|    Adams| 4029| 4031|
|Allamakee|14330|14332|
|Appanoose|12884|12886|
|  Audubon| 6119| 6121|
+---------+-----+-----+
only showing top 5 rows



In [88]:
# dropping a column from the dataframe
df_spark=df_spark.drop('ID2')

In [89]:
df_spark.show(5)

+---------+-----+
|     Name|   ID|
+---------+-----+
|    Adair| 7682|
|    Adams| 4029|
|Allamakee|14330|
|Appanoose|12884|
|  Audubon| 6119|
+---------+-----+
only showing top 5 rows



# **Day 2**

In [90]:
df_spark=spark.read.csv("/content/drive/MyDrive/Pyspark/PYSPA.csv",header=True, inferSchema=True)

In [91]:
df_spark.show()

+----+----+----------+-----+
|Name| age|Experience|Salay|
+----+----+----------+-----+
|   A|  31|        10|30000|
|   B|  30|         8|25000|
|   C|  29|         4|20000|
|   D|  24|         3|15000|
|   E|  21|         1|18000|
|   F|  23|         2|40000|
|   G|null|      null|38000|
|null|  34|        10| null|
|null|  36|      null| null|
+----+----+----------+-----+



In [92]:
# drop rows containing null values: with how="any" a row is dropped if at least one of its values is null,
# while with how="all" a row is dropped only if all of its values are null
df_spark.na.drop(how="any").show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   A| 31|        10|30000|
|   B| 30|         8|25000|
|   C| 29|         4|20000|
|   D| 24|         3|15000|
|   E| 21|         1|18000|
|   F| 23|         2|40000|
+----+---+----------+-----+



In [93]:
df_spark.show()

+----+----+----------+-----+
|Name| age|Experience|Salay|
+----+----+----------+-----+
|   A|  31|        10|30000|
|   B|  30|         8|25000|
|   C|  29|         4|20000|
|   D|  24|         3|15000|
|   E|  21|         1|18000|
|   F|  23|         2|40000|
|   G|null|      null|38000|
|null|  34|        10| null|
|null|  36|      null| null|
+----+----+----------+-----+



In [94]:
# thresh: with thresh=2, a row is kept only if it has at least two non-null values; otherwise it is dropped.
# when thresh is given it overrides how, so how="any" has no effect in the next cell

In [95]:
df_spark.na.drop(how="any",thresh=2).show()

+----+----+----------+-----+
|Name| age|Experience|Salay|
+----+----+----------+-----+
|   A|  31|        10|30000|
|   B|  30|         8|25000|
|   C|  29|         4|20000|
|   D|  24|         3|15000|
|   E|  21|         1|18000|
|   F|  23|         2|40000|
|   G|null|      null|38000|
|null|  34|        10| null|
+----+----+----------+-----+



In [96]:
# subset: with subset=['age'], only the rows that have a null in the age column are dropped
df_spark.na.drop(how="any",subset=['age']).show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   A| 31|        10|30000|
|   B| 30|         8|25000|
|   C| 29|         4|20000|
|   D| 24|         3|15000|
|   E| 21|         1|18000|
|   F| 23|         2|40000|
|null| 34|        10| null|
|null| 36|      null| null|
+----+---+----------+-----+



**Filling missing values**

In [97]:
# fill takes two parameters: value and subset
df_spark.na.fill(0).show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   A| 31|        10|30000|
|   B| 30|         8|25000|
|   C| 29|         4|20000|
|   D| 24|         3|15000|
|   E| 21|         1|18000|
|   F| 23|         2|40000|
|   G|  0|         0|38000|
|null| 34|        10|    0|
|null| 36|         0|    0|
+----+---+----------+-----+



The Name column is not filled because the fill value is an integer and Name is a string column; fill only applies to columns whose type matches the value.

In [98]:
df_spark.na.fill('Ashik').show()

+-----+----+----------+-----+
| Name| age|Experience|Salay|
+-----+----+----------+-----+
|    A|  31|        10|30000|
|    B|  30|         8|25000|
|    C|  29|         4|20000|
|    D|  24|         3|15000|
|    E|  21|         1|18000|
|    F|  23|         2|40000|
|    G|null|      null|38000|
|Ashik|  34|        10| null|
|Ashik|  36|      null| null|
+-----+----+----------+-----+



In [99]:
# subset=['age','Experience']: fill nulls only in these columns
df_spark.na.fill(0,['age','Experience']).show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   A| 31|        10|30000|
|   B| 30|         8|25000|
|   C| 29|         4|20000|
|   D| 24|         3|15000|
|   E| 21|         1|18000|
|   F| 23|         2|40000|
|   G|  0|         0|38000|
|null| 34|        10| null|
|null| 36|         0| null|
+----+---+----------+-----+



In [101]:
# fill null values with the mean, median, or mode by using the Imputer class from pyspark.ml

from pyspark.ml.feature import Imputer
imputer=Imputer(
    inputCols=['age','Experience','Salay'],
    outputCols=["{}_imputed".format(c) for c in ['age','Experience','Salay']]).setStrategy("mean")



In [102]:
imputer.fit(df_spark).transform(df_spark).show()

+----+----+----------+-----+-----------+------------------+-------------+
|Name| age|Experience|Salay|age_imputed|Experience_imputed|Salay_imputed|
+----+----+----------+-----+-----------+------------------+-------------+
|   A|  31|        10|30000|         31|                10|        30000|
|   B|  30|         8|25000|         30|                 8|        25000|
|   C|  29|         4|20000|         29|                 4|        20000|
|   D|  24|         3|15000|         24|                 3|        15000|
|   E|  21|         1|18000|         21|                 1|        18000|
|   F|  23|         2|40000|         23|                 2|        40000|
|   G|null|      null|38000|         28|                 5|        38000|
|null|  34|        10| null|         34|                10|        26571|
|null|  36|      null| null|         36|                 5|        26571|
+----+----+----------+-----+-----------+------------------+-------------+



## **Filter Operations**

In [103]:
df_spark=df_spark.na.drop()

In [104]:
df_spark.show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   A| 31|        10|30000|
|   B| 30|         8|25000|
|   C| 29|         4|20000|
|   D| 24|         3|15000|
|   E| 21|         1|18000|
|   F| 23|         2|40000|
+----+---+----------+-----+



In [106]:
# show the rows where salary is greater than or equal to 20000
df_spark.filter("Salay>=20000").show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   A| 31|        10|30000|
|   B| 30|         8|25000|
|   C| 29|         4|20000|
|   F| 23|         2|40000|
+----+---+----------+-----+



In [107]:
df_spark.filter("Salay>=20000").select(['Name','age']).show()

+----+---+
|Name|age|
+----+---+
|   A| 31|
|   B| 30|
|   C| 29|
|   F| 23|
+----+---+



In [108]:
df_spark.filter((df_spark['Salay']>=20000) & (df_spark['age']>25)).show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   A| 31|        10|30000|
|   B| 30|         8|25000|
|   C| 29|         4|20000|
+----+---+----------+-----+



In [109]:
df_spark.filter((df_spark['Salay']>=20000) | (df_spark['age']>25)).show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   A| 31|        10|30000|
|   B| 30|         8|25000|
|   C| 29|         4|20000|
|   F| 23|         2|40000|
+----+---+----------+-----+



In [110]:
df_spark.filter(~(df_spark['Salay']>=20000)).show()

+----+---+----------+-----+
|Name|age|Experience|Salay|
+----+---+----------+-----+
|   D| 24|         3|15000|
|   E| 21|         1|18000|
+----+---+----------+-----+



## **Groupby**

In [111]:
df_spark=spark.read.csv("/content/drive/MyDrive/Pyspark/test.csv",header=True, inferSchema=True)

In [112]:
df_spark.show()

+----+-----------+------+
|Name|Departments|salary|
+----+-----------+------+
|   K|         DS| 10000|
|   K|        IOT|  5000|
|   M|         BD|  4000|
|   K|         BD|  4000|
|   M|         DS|  3000|
|   S|         DS| 20000|
|   S|        IOT| 10000|
|   S|         BD|  5000|
|  Su|         DS| 10000|
|  Su|         BD|  2000|
+----+-----------+------+

