In [1]:
#installing pyspark
!pip install pyspark



In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('practise').getOrCreate() # create a spark session

In [4]:
spark

In [5]:
#read csv file in pyspark dataframe
#df = spark.read.option('header', True).csv('test3.csv', inferSchema=True) 
df = spark.read.csv('test3.csv', header=True, inferSchema=True)

In [6]:
type(df)

pyspark.sql.dataframe.DataFrame

In [7]:
df.head(4)

[Row(Name='a', Age=4),
 Row(Name='b', Age=3),
 Row(Name='c', Age=2),
 Row(Name='d', Age=1)]

In [8]:
#check schema to see datatypes of columns
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



In [9]:
df.show()

+----+---+
|Name|Age|
+----+---+
|   a|  4|
|   b|  3|
|   c|  2|
|   d|  1|
+----+---+



In [10]:
# to get all column names 
df.columns

['Name', 'Age']

In [11]:
type(df.select('Name'))

pyspark.sql.dataframe.DataFrame

In [12]:
df.select(['Name', 'Age']).show()

+----+---+
|Name|Age|
+----+---+
|   a|  4|
|   b|  3|
|   c|  2|
|   d|  1|
+----+---+



In [13]:
df.describe().show()

+-------+----+------------------+
|summary|Name|               Age|
+-------+----+------------------+
|  count|   4|                 4|
|   mean|null|               2.5|
| stddev|null|1.2909944487358056|
|    min|   a|                 1|
|    max|   d|                 4|
+-------+----+------------------+



In [14]:
#Adding columns to dataframe
df = df.withColumn('Age After 2 years', df['Age']+2)

In [15]:
df.show()

+----+---+-----------------+
|Name|Age|Age After 2 years|
+----+---+-----------------+
|   a|  4|                6|
|   b|  3|                5|
|   c|  2|                4|
|   d|  1|                3|
+----+---+-----------------+



In [16]:
# Drop column 
df.drop('Age After 2 years')

DataFrame[Name: string, Age: int]

In [17]:
df

DataFrame[Name: string, Age: int, Age After 2 years: int]

In [18]:
#Rename Column 
df.withColumnRenamed('Name', 'New Name').show()

+--------+---+-----------------+
|New Name|Age|Age After 2 years|
+--------+---+-----------------+
|       a|  4|                6|
|       b|  3|                5|
|       c|  2|                4|
|       d|  1|                3|
+--------+---+-----------------+



In [19]:
spark.stop()

In [20]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('practice').getOrCreate()

In [21]:
df = spark.read.csv('test1.csv', header=True, inferSchema=True)

In [22]:
df.show()

+----+----+----------+------+
|Name| Age|Experience|Salary|
+----+----+----------+------+
|   a|  31|        10| 30000|
|   b|  32|         8| 60000|
|   c|  21|         3| 90000|
|   d|  23|         5| 20000|
|   e|  21|         7|100000|
|   f|  23|         3| 70000|
|   g|null|      null|300000|
|null|  34|        10| 48000|
|null|  36|      null|  null|
+----+----+----------+------+



In [23]:
#drop raw with null values 
df.na.drop().show()

+----+---+----------+------+
|Name|Age|Experience|Salary|
+----+---+----------+------+
|   a| 31|        10| 30000|
|   b| 32|         8| 60000|
|   c| 21|         3| 90000|
|   d| 23|         5| 20000|
|   e| 21|         7|100000|
|   f| 23|         3| 70000|
+----+---+----------+------+



In [24]:
#drop raw based on 'all' or 'any' null value 
df.na.drop(how='any', thresh=3).show()

+----+---+----------+------+
|Name|Age|Experience|Salary|
+----+---+----------+------+
|   a| 31|        10| 30000|
|   b| 32|         8| 60000|
|   c| 21|         3| 90000|
|   d| 23|         5| 20000|
|   e| 21|         7|100000|
|   f| 23|         3| 70000|
|null| 34|        10| 48000|
+----+---+----------+------+



In [25]:
df.na.drop(how='any', subset=['Experience']).show()

+----+---+----------+------+
|Name|Age|Experience|Salary|
+----+---+----------+------+
|   a| 31|        10| 30000|
|   b| 32|         8| 60000|
|   c| 21|         3| 90000|
|   d| 23|         5| 20000|
|   e| 21|         7|100000|
|   f| 23|         3| 70000|
|null| 34|        10| 48000|
+----+---+----------+------+



In [26]:
#fill null values with specific value
df.na.fill(value='Missing value', subset=['Experience']).show()

+----+----+----------+------+
|Name| Age|Experience|Salary|
+----+----+----------+------+
|   a|  31|        10| 30000|
|   b|  32|         8| 60000|
|   c|  21|         3| 90000|
|   d|  23|         5| 20000|
|   e|  21|         7|100000|
|   f|  23|         3| 70000|
|   g|null|      null|300000|
|null|  34|        10| 48000|
|null|  36|      null|  null|
+----+----+----------+------+



In [27]:
from pyspark.ml.feature import Imputer 
imputer = Imputer(
            inputCols  = ['Age', 'Experience', 'Salary'],
            outputCols = ["{}_imputed".format(c) for c in ['Age', 'Experience', 'Salary']]
            ).setStrategy('mean')

In [28]:
imputer.fit(df).transform(df).show()

+----+----+----------+------+-----------+------------------+--------------+
|Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+----+----+----------+------+-----------+------------------+--------------+
|   a|  31|        10| 30000|         31|                10|         30000|
|   b|  32|         8| 60000|         32|                 8|         60000|
|   c|  21|         3| 90000|         21|                 3|         90000|
|   d|  23|         5| 20000|         23|                 5|         20000|
|   e|  21|         7|100000|         21|                 7|        100000|
|   f|  23|         3| 70000|         23|                 3|         70000|
|   g|null|      null|300000|         27|                 6|        300000|
|null|  34|        10| 48000|         34|                10|         48000|
|null|  36|      null|  null|         36|                 6|         89750|
+----+----+----------+------+-----------+------------------+--------------+



In [29]:
df.show()

+----+----+----------+------+
|Name| Age|Experience|Salary|
+----+----+----------+------+
|   a|  31|        10| 30000|
|   b|  32|         8| 60000|
|   c|  21|         3| 90000|
|   d|  23|         5| 20000|
|   e|  21|         7|100000|
|   f|  23|         3| 70000|
|   g|null|      null|300000|
|null|  34|        10| 48000|
|null|  36|      null|  null|
+----+----+----------+------+



In [35]:
df1 = df.na.drop('any')

In [36]:
df1.show()

+----+---+----------+------+
|Name|Age|Experience|Salary|
+----+---+----------+------+
|   a| 31|        10| 30000|
|   b| 32|         8| 60000|
|   c| 21|         3| 90000|
|   d| 23|         5| 20000|
|   e| 21|         7|100000|
|   f| 23|         3| 70000|
+----+---+----------+------+



In [38]:
#Filter operation
df1.filter('Salary < 30000').show()

+----+---+----------+------+
|Name|Age|Experience|Salary|
+----+---+----------+------+
|   d| 23|         5| 20000|
+----+---+----------+------+



In [41]:
df1.filter('Salary <= 30000').select(['Name', 'Age']).show()

+----+---+
|Name|Age|
+----+---+
|   a| 31|
|   d| 23|
+----+---+



In [54]:
#Filter operation with multiple conditions
df1.filter((df1['Salary'] <=30000) & (df1['Age'] > 30)).show()

+----+---+----------+------+
|Name|Age|Experience|Salary|
+----+---+----------+------+
|   a| 31|        10| 30000|
+----+---+----------+------+



In [56]:
#Not operation ~
df1.filter(`('Salary <= 30000')).show()

SyntaxError: invalid syntax (<ipython-input-56-c983d30f4189>, line 2)

In [57]:
spark.stop()

In [60]:
from pyspark.sql import SparkSession

In [61]:
sp = SparkSession.builder.appName('Agg').getOrCreate()

In [62]:
sp

In [75]:
df = sp.read.csv('test4.csv', header=True, inferSchema=True)
df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
|Heena|        DS| 10000|
|Heena|       IOT|  5000|
|advik|        BD|  4000|
|Heena|        BD|  4000|
|advik|        DS|  3000|
| ashu|        DS|200000|
| ashu|       IOT|100000|
| ashu|        BD|  5000|
|    S|        DS| 10000|
|    S|        BD|  2000|
+-----+----------+------+



In [76]:
#Groupby operation 
df.groupBy('Name').sum().show()

+-----+-----------+
| Name|sum(Salary)|
+-----+-----------+
| ashu|     305000|
|advik|       7000|
|Heena|      19000|
|    S|      12000|
+-----+-----------+



In [77]:
df.groupBy('Department').count().show()

+----------+-----+
|Department|count|
+----------+-----+
|       IOT|    2|
|        BD|    4|
|        DS|    4|
+----------+-----+



In [None]:
from pyspark.ml.features import VectorAssembler
featureassembler = VectorAssembler(inputCols=['Age', 'Experience'], outputCols=['Salary'])