In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=6d0a9f93871559188cdb489a8bfbb5ff54395de46efeab2b5d61c71c5eafe89c
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
import pyspark
import pandas as pd

First, we create a Spark session and give the application a name. In this case, the application name is `Practice`.

In [3]:
from pyspark.sql import SparkSession

# Build a session for the application named 'Practice';
# getOrCreate() hands back an existing session when there is one.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [4]:
# Evaluate the session object as the cell's last expression so the notebook renders it.
spark

In [5]:
# Read the CSV with the reader's default options: the header row is treated as
# ordinary data, so the columns receive generated names (_c0, _c1, ...).
# show() prints only the first 20 rows by default; this file is small enough
# that every row appears.
(
    spark.read
    .csv('/content/drive/MyDrive/Datasets/Pyspark/test1.csv')
    .show()
)

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [6]:
# Setting the 'header' option to 'true' makes the reader use the first row
# of the file as the column names instead of as data.
df_spark = (
    spark.read
    .option('header', 'true')
    .csv('/content/drive/MyDrive/Datasets/Pyspark/test1.csv')
)

In [7]:
# Print the DataFrame as a text table; the first row of the file now appears as the column names.
df_spark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [8]:
# Confirm that the reader returned a Spark DataFrame (not a pandas one).
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [9]:
# Print the schema: column names, data types and nullability.
# This is comparable to df.info() in pandas.
# df_spark was read without schema inference, so every column is typed as string.
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [10]:
# With inferSchema=True the reader derives each column's data type from the
# values in the file instead of typing every column as string.
# "nullable = true" in the printed schema means the column may contain nulls.
(
    spark.read
    .option('header', 'true')
    .csv('/content/drive/MyDrive/Datasets/Pyspark/test1.csv', inferSchema=True)
    .printSchema()
)

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



### Writing above steps in a clean format

In [11]:
# Load the CSV in a single call: first row as header, column types inferred.
df_pyspark = spark.read.csv(
    '/content/drive/MyDrive/Datasets/Pyspark/test1.csv',
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [12]:
# List the column names of the DataFrame (returned as a plain Python list of strings).
df_pyspark.columns

['Name', 'age', 'Experience', 'Salary']

In [13]:
# Select only particular columns; select() returns a new DataFrame with just those columns.
# Tip: .head(3) on the selection returns the first 3 rows as a list of Row objects, but a
# notebook only displays it when it is the cell's last expression, so it is not called here.
df_pyspark.select(['Name', 'Salary']).show()

+---------+------+
|     Name|Salary|
+---------+------+
|    Krish| 30000|
|Sudhanshu| 25000|
|    Sunny| 20000|
|     Paul| 20000|
|   Harsha| 15000|
|  Shubham| 18000|
+---------+------+



In [14]:
# Get only the column data types, as a list of (column name, type name) pairs.
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [15]:
# Summary statistics, similar to describe() in pandas: count, mean, stddev, min and max per column.
# describe() returns a DataFrame, so show() is needed to print it.
df_pyspark.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



## Adding, dropping and renaming columns

In [16]:
# Add a derived column. withColumn() returns a new DataFrame, so the result
# is assigned back to the same name.
df_pyspark = df_pyspark.withColumn(
    'Experience after 2yrs',
    df_pyspark['Experience'] + 2,
)
df_pyspark.show()

+---------+---+----------+------+---------------------+
|     Name|age|Experience|Salary|Experience after 2yrs|
+---------+---+----------+------+---------------------+
|    Krish| 31|        10| 30000|                   12|
|Sudhanshu| 30|         8| 25000|                   10|
|    Sunny| 29|         4| 20000|                    6|
|     Paul| 24|         3| 20000|                    5|
|   Harsha| 21|         1| 15000|                    3|
|  Shubham| 23|         2| 18000|                    4|
+---------+---+----------+------+---------------------+



In [17]:
# Drop a column from the DataFrame; the result is assigned back to the same name
# and shown to confirm the column is gone.
df_pyspark = df_pyspark.drop('Experience after 2yrs')
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [18]:
# Rename the 'Name' column to 'First name'.
# Later cells operate on this renamed DataFrame.
df_pyspark = df_pyspark.withColumnRenamed('Name', 'First name')
df_pyspark.show()

+----------+---+----------+------+
|First name|age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
| Sudhanshu| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha| 21|         1| 15000|
|   Shubham| 23|         2| 18000|
+----------+---+----------+------+



## Dropping null values

In [21]:
# na.drop() takes three parameters: how, thresh and subset.
# how='any' drops a row if at least one of its values is null;
# how='all' drops a row only if every one of its values is null.
# Alternative: df_pyspark.na.drop(how='all').show()
# The output keeps all six rows, i.e. this dataset has no rows containing nulls.
df_pyspark.na.drop(how='any').show()


+----------+---+----------+------+
|First name|age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
| Sudhanshu| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha| 21|         1| 15000|
|   Shubham| 23|         2| 18000|
+----------+---+----------+------+



In [23]:
# Drop rows using a threshold: thresh=2 keeps a row only if it has at least
# 2 non-null values; rows with fewer are dropped.
# When thresh is given it overrides `how`, so `how` is omitted here to avoid
# suggesting that both conditions apply.
df_pyspark.na.drop(thresh=2).show()


+----------+---+----------+------+
|First name|age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
| Sudhanshu| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha| 21|         1| 15000|
|   Shubham| 23|         2| 18000|
+----------+---+----------+------+



In [24]:
# Drop rows based on a subset of columns: a row is dropped if any of the
# columns listed in `subset` is null; the other columns are not checked.
df_pyspark.na.drop(how='any', subset=['age', 'Experience']).show()

+----------+---+----------+------+
|First name|age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
| Sudhanshu| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha| 21|         1| 15000|
|   Shubham| 23|         2| 18000|
+----------+---+----------+------+



## Fill missing values

In [25]:
# Fill null values in the Experience column with the text 'Missing value'.
# na.fill() with a string value silently ignores columns that are not of string
# type, and Experience was inferred as an integer column, so it is cast to
# string first; otherwise the fill would be a no-op.
(
    df_pyspark
    .withColumn('Experience', df_pyspark['Experience'].cast('string'))
    .na.fill('Missing value', subset=['Experience'])
    .show()
)

+----------+---+----------+------+
|First name|age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
| Sudhanshu| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha| 21|         1| 15000|
|   Shubham| 23|         2| 18000|
+----------+---+----------+------+



In [26]:
# Use the Imputer estimator to fill null values with each column's mean.
from pyspark.ml.feature import Imputer

# Single list of columns to impute, so the input and output column lists
# are always derived from the same source and cannot drift apart.
impute_cols = ['age', 'Experience', 'Salary']

imputer = Imputer(
    inputCols=impute_cols,
    # Each imputed column is written next to the original as "<name>_imputed".
    outputCols=["{}_imputed".format(col_name) for col_name in impute_cols],
).setStrategy("mean")

In [27]:
# Fit the imputer on the DataFrame, then apply the fitted model to the same
# DataFrame; the result has one extra "<column>_imputed" column per input column.
(
    imputer
    .fit(df_pyspark)
    .transform(df_pyspark)
    .show()
)

+----------+---+----------+------+-----------+------------------+--------------+
|First name|age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+----------+---+----------+------+-----------+------------------+--------------+
|     Krish| 31|        10| 30000|         31|                10|         30000|
| Sudhanshu| 30|         8| 25000|         30|                 8|         25000|
|     Sunny| 29|         4| 20000|         29|                 4|         20000|
|      Paul| 24|         3| 20000|         24|                 3|         20000|
|    Harsha| 21|         1| 15000|         21|                 1|         15000|
|   Shubham| 23|         2| 18000|         23|                 2|         18000|
+----------+---+----------+------+-----------+------------------+--------------+

