In [2]:
# Install PySpark if it is not already available in your environment:
# %pip install pyspark

In [3]:
# Import pyspark after installation
import pyspark as ps

The entry point into all functionality in Spark is the 'SparkSession' class. 

In [4]:
# Creating Spark session 
from pyspark.sql import SparkSession

# Build the session (or fetch the one already running) under the app name 'Practise1'.
spark = (
    SparkSession.builder
    .appName('Practise1')
    .getOrCreate()
)

In [5]:
# Evaluate the session object as the cell's last expression so the notebook
# displays it, confirming that the session was created.
spark

In [9]:
# Load the CSV through the Spark session. No options are given, so the header
# row is read as ordinary data and the columns get generated names.
df_spark = spark.read.csv(path='testset1.csv')

In [7]:
# Preview the loaded DataFrame.
# show() only prints the rows and returns None, so its result must NOT be
# assigned back to df_spark; doing so would replace the DataFrame with None
# and break every later df_spark.<method>() call.
df_spark.show()

+------+---+-----------+
|   _c0|_c1|        _c2|
+------+---+-----------+
|  Name|Age|Experiences|
|Haroon| 25|          2|
|  khan| 26|          3|
|  Awan| 27|          4|
+------+---+-----------+



In [10]:
# Fetch the first row. Without the header option this is the CSV's header line.
df_spark.first()

Row(_c0='Name', _c1='Age', _c2='Experiences')

In [11]:
# Use the first row as the header instead of the auto-generated column names
# (_c0, _c1, ...). No schema inference is requested, so every column is read
# as a string.
df_spark = spark.read.csv('testset1.csv', header=True)

In [12]:
# Only run the line below to preview the loaded dataset, then load it again without the 'show' method:
# show() returns None, so assigning its result would leave df_spark as None instead of a DataFrame.
#df_spark = spark.read.option('header','true').csv('testset1.csv').show()

In [13]:
# Check the type of the loaded object; it should be a Spark DataFrame
# (not None, which is what you get if a .show() result was assigned).
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [14]:
# Print the schema in a tree format: one line per column with its data type
# and nullability.
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experiences: string (nullable = true)



In [15]:
# Project one or more columns by name and print the result.
df_spark.select('Name', 'Experiences').show()

+------+-----------+
|  Name|Experiences|
+------+-----------+
|Haroon|          2|
|  khan|          3|
|  Awan|          4|
+------+-----------+



In [16]:
# describe() returns a new summary DataFrame; evaluating it without show()
# displays only its column names and types, not the statistics themselves.
df_spark.describe()

DataFrame[summary: string, Name: string, Age: string, Experiences: string]

In [17]:
# Call show() on the summary to actually print the statistics
# (count, mean, stddev, min, max) for each column.
df_spark.describe().show()

+-------+----+----+-----------+
|summary|Name| Age|Experiences|
+-------+----+----+-----------+
|  count|   3|   3|          3|
|   mean|null|26.0|        3.0|
| stddev|null| 1.0|        1.0|
|    min|Awan|  25|          2|
|    max|khan|  27|          4|
+-------+----+----+-----------+



In [18]:
# Add a derived column. withColumn returns a new DataFrame rather than
# modifying the existing one, so the result is assigned back to df_spark.
# NOTE(review): the column is named '... after 3 yrs' but the expression adds 2;
# confirm which of the two is intended.
df_spark = df_spark.withColumn(
    'Experience after 3 yrs',
    df_spark.Experiences + 2,
)

In [19]:
# Print the DataFrame to verify the newly added column.
df_spark.show()

+------+---+-----------+----------------------+
|  Name|Age|Experiences|Experience after 3 yrs|
+------+---+-----------+----------------------+
|Haroon| 25|          2|                   4.0|
|  khan| 26|          3|                   5.0|
|  Awan| 27|          4|                   6.0|
+------+---+-----------+----------------------+



In [20]:
# Drop the derived column again. drop() returns a new DataFrame, so the result
# is assigned back to df_spark.
df_spark = df_spark.drop('Experience after 3 yrs')

In [21]:
# Rename the 'Age' column to 'Ages'; the renamed frame replaces df_spark.
df_spark = df_spark.withColumnRenamed(existing='Age', new='Ages')

In [22]:
# Print the DataFrame to verify the drop and the rename.
df_spark.show()

+------+----+-----------+
|  Name|Ages|Experiences|
+------+----+-----------+
|Haroon|  25|          2|
|  khan|  26|          3|
|  Awan|  27|          4|
+------+----+-----------+



In [23]:
# Read a second CSV, using the first row as the header and letting Spark infer
# each column's type instead of defaulting everything to string.
df2 = spark.read.options(header=True, inferSchema=True).csv('testset2.csv')

In [24]:
# Print the inferred schema to check which column types were detected.
df2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [25]:
# Print the raw rows, including the null entries handled in the next cells.
df2.show()

+------+----+----------+
|  Name| Age|Experience|
+------+----+----------+
|Haroon|  23|      null|
|  khan|  22|         3|
|   ali|  25|         2|
| ahmad|null|         3|
| noman|  27|         2|
|  null|  28|         2|
| bilal|null|         1|
+------+----+----------+



In [26]:
# Drop every row that has a null in at least one column, then print the result.
# The transformed frame is only displayed; df2 itself is not reassigned.
# NOTE(review): the row whose Name prints as 'null' survives this drop, so that
# value is presumably the literal string 'null' rather than a missing value;
# confirm against testset2.csv.
df2.dropna(how='any').show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
| khan| 22|         3|
|  ali| 25|         2|
|noman| 27|         2|
| null| 28|         2|
+-----+---+----------+



In [27]:
# Restrict the null check to the Age column via `subset`: only rows whose Age
# is null are dropped, while nulls in other columns are kept.
df2.dropna(how="any", subset=['Age']).show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
|Haroon| 23|      null|
|  khan| 22|         3|
|   ali| 25|         2|
| noman| 27|         2|
|  null| 28|         2|
+------+---+----------+



In [28]:
# Fill missing values with a placeholder.
# A string fill value is applied to string columns only, so the nulls in the
# integer columns Age and Experience are left untouched by this call.
df2.fillna('Missing value').show()

+------+----+----------+
|  Name| Age|Experience|
+------+----+----------+
|Haroon|  23|      null|
|  khan|  22|         3|
|   ali|  25|         2|
| ahmad|null|         3|
| noman|  27|         2|
|  null|  28|         2|
| bilal|null|         1|
+------+----+----------+



In [29]:
from pyspark.ml.feature import Imputer

# Configure a mean-strategy imputer for the two numeric columns. Each input
# column gets a parallel output column named '<column>_imputed'.
imputer = Imputer(
    strategy="mean",
    inputCols=['Age', 'Experience'],
    outputCols=["{}_imputed".format(name) for name in ('Age', 'Experience')],
)

In [30]:
# Fit the imputer on df2 and print df2 with the extra *_imputed columns.
# The transformed frame is only displayed, not assigned, so df2 is unchanged.
(
    imputer
    .fit(df2)
    .transform(df2)
    .show()
)

+------+----+----------+-----------+------------------+
|  Name| Age|Experience|Age_imputed|Experience_imputed|
+------+----+----------+-----------+------------------+
|Haroon|  23|      null|         23|                 2|
|  khan|  22|         3|         22|                 3|
|   ali|  25|         2|         25|                 2|
| ahmad|null|         3|         25|                 3|
| noman|  27|         2|         27|                 2|
|  null|  28|         2|         28|                 2|
| bilal|null|         1|         25|                 1|
+------+----+----------+-----------+------------------+



In [31]:
# Print df2 again: it still has only its original columns, because the imputed
# result above was displayed but never assigned.
df2.show()

+------+----+----------+
|  Name| Age|Experience|
+------+----+----------+
|Haroon|  23|      null|
|  khan|  22|         3|
|   ali|  25|         2|
| ahmad|null|         3|
| noman|  27|         2|
|  null|  28|         2|
| bilal|null|         1|
+------+----+----------+



#### Filter Operations

In [36]:
# Obtain a session handle for the filter examples.
# NOTE(review): getOrCreate() returns the already-running session when one
# exists, so spark1 presumably refers to the same session as `spark` and the
# 'Practise2' app name may not take effect; confirm.
spark1 = (
    SparkSession.builder
    .appName('Practise2')
    .getOrCreate()
)

In [46]:
# Load the salary dataset with a header row and inferred column types.
df3 = spark1.read.options(header=True, inferSchema=True).csv('testset3.csv')

In [47]:
# Print the full dataset that the filter examples below operate on.
df3.show()

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
|Haroon| 22| 44000|
|  khan| 12| 20000|
|  awan| 35| 21000|
|   ali| 68| 42000|
| bilal| 65| 12000|
|asghar| 24| 50000|
| noman| 50| 74000|
| ahmad| 47| 14000|
+------+---+------+



In [49]:
# Rows whose salary is less than or equal to 20k, with the condition written
# as a SQL expression string.
df3.where("Salary<=20000").show()

+-----+---+------+
| Name|Age|Salary|
+-----+---+------+
| khan| 12| 20000|
|bilal| 65| 12000|
|ahmad| 47| 14000|
+-----+---+------+



In [54]:
# Show only the Name and Age columns for rows whose salary is at least 20k.
# NOTE(review): '>=' also matches a salary of exactly 20000; use '>' if
# "strictly more than 20k" is what is intended.
df3.where(df3.Salary >= 20000).select('Name', 'Age').show()

+------+---+
|  Name|Age|
+------+---+
|Haroon| 22|
|  khan| 12|
|  awan| 35|
|   ali| 68|
|asghar| 24|
| noman| 50|
+------+---+



In [55]:
# Negate a column condition with the '~' (NOT) operator: keeps the rows whose
# salary is NOT less than or equal to 20k.
df3.where(~(df3.Salary <= 20000)).show()

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
|Haroon| 22| 44000|
|  awan| 35| 21000|
|   ali| 68| 42000|
|asghar| 24| 50000|
| noman| 50| 74000|
+------+---+------+

