In [1]:
!pip install pyspark



In [2]:
import pyspark
# Show the installed PySpark version as the cell's output.
pyspark.__version__

'4.0.1'

In [3]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by the cells that follow,
# then display it as the cell's output.
session_builder = SparkSession.builder.appName("Dataframe")
spark = session_builder.getOrCreate()
spark

In [4]:
import pandas as pd

# Load the Excel workbook with pandas and write it back out as CSV
# (without the pandas index) so the Spark CSV reader below can load it.
pdf = pd.read_excel(io="testing.xlsx")
pdf.to_csv(path_or_buf="pyspark.csv", index=False)

In [5]:
df_pyspark =  spark.read.csv("pyspark.csv", header=True, inferSchema=True) # inferSchema=True makes Spark infer each column's data type; otherwise every column defaults to string
df_pyspark.show()

+--------------+---+----------+
|          Name|Age|Experience|
+--------------+---+----------+
|Syed Najiullah| 21|        10|
|Hassan Gillani| 26|         8|
|   Sheraz Khan| 22|         4|
+--------------+---+----------+



In [6]:
# List the column names of the loaded DataFrame.
df_pyspark.columns

['Name', 'Age', 'Experience']

In [7]:
# Fetch the first 3 rows as a list of Row objects.
df_pyspark.head(3)

[Row(Name='Syed Najiullah', Age=21, Experience=10),
 Row(Name='Hassan Gillani', Age=26, Experience=8),
 Row(Name='Sheraz Khan', Age=22, Experience=4)]

Selecting columns

In [8]:
# Project a single column; select() returns a DataFrame, so it can be shown.
df_pyspark.select("Name").show()

+--------------+
|          Name|
+--------------+
|Syed Najiullah|
|Hassan Gillani|
|   Sheraz Khan|
+--------------+



In [9]:
# Project several columns by passing each column name as its own argument.
df_pyspark.select("Name", "Age").show()

+--------------+---+
|          Name|Age|
+--------------+---+
|Syed Najiullah| 21|
|Hassan Gillani| 26|
|   Sheraz Khan| 22|
+--------------+---+



In [10]:
# Indexing by name yields a Column object rather than a DataFrame,
# so the cell output is the Column's repr, not a table of values.
df_pyspark["Name"]

Column<'Name'>

In [11]:
# Column names paired with their type names.
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [12]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df_pyspark.describe().show()

+-------+--------------+------------------+-----------------+
|summary|          Name|               Age|       Experience|
+-------+--------------+------------------+-----------------+
|  count|             3|                 3|                3|
|   mean|          NULL|              23.0|7.333333333333333|
| stddev|          NULL|2.6457513110645907|3.055050463303893|
|    min|Hassan Gillani|                21|                4|
|    max|Syed Najiullah|                26|               10|
+-------+--------------+------------------+-----------------+



Creating new column

In [13]:
# Add a derived column holding Experience + 2. withColumn() returns a new
# DataFrame, so the result is bound back to df_pyspark.
df_pyspark = df_pyspark.withColumn("Experience After 2 years", df_pyspark["Experience"]+2)

In [14]:
# Display the DataFrame, now including the derived column.
df_pyspark.show()

+--------------+---+----------+------------------------+
|          Name|Age|Experience|Experience After 2 years|
+--------------+---+----------+------------------------+
|Syed Najiullah| 21|        10|                      12|
|Hassan Gillani| 26|         8|                      10|
|   Sheraz Khan| 22|         4|                       6|
+--------------+---+----------+------------------------+



Dropping a column

In [15]:
# Remove the derived column again; drop() returns a new DataFrame,
# which is bound back to df_pyspark before being displayed.
df_pyspark = df_pyspark.drop("Experience After 2 years")
df_pyspark.show()

+--------------+---+----------+
|          Name|Age|Experience|
+--------------+---+----------+
|Syed Najiullah| 21|        10|
|Hassan Gillani| 26|         8|
|   Sheraz Khan| 22|         4|
+--------------+---+----------+



rename column

In [16]:
# Rename the "Name" column to "New Name" and display the result.
df_pyspark = df_pyspark.withColumnRenamed("Name", "New Name")
df_pyspark.show()

+--------------+---+----------+
|      New Name|Age|Experience|
+--------------+---+----------+
|Syed Najiullah| 21|        10|
|Hassan Gillani| 26|         8|
|   Sheraz Khan| 22|         4|
+--------------+---+----------+



In [17]:
from pyspark.sql import SparkSession

# NOTE(review): a session was already created earlier in this notebook with
# appName("Dataframe"); getOrCreate() presumably returns that running session,
# so the new app name may not take effect — confirm.
spark = SparkSession.builder.appName("Practise").getOrCreate()
spark

In [18]:
import pandas as pd

# Load the second Excel workbook with pandas and write it back out as CSV
# (without the pandas index) so the Spark CSV reader below can load it.
pdf = pd.read_excel(io="testing1.xlsx")
pdf.to_csv(path_or_buf="pyspark1.csv", index=False)

In [19]:
# Load the second dataset with a header row and inferred column types.
# The displayed rows contain nulls, which the following cells work with.
df_pyspark =  spark.read.csv("pyspark1.csv", header=True, inferSchema=True)
df_pyspark.show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    krish|31.0|      10.0|30000.0|
|sudhansha|30.0|       8.0|25000.0|
|    sunny|29.0|       4.0|20000.0|
|     paul|24.0|       3.0|20000.0|
|   harsha|21.0|       2.0|15000.0|
|  shubham|23.0|       1.0|18000.0|
|   mahesh|NULL|      NULL|40000.0|
|     NULL|34.0|      NULL|38000.0|
|     NULL|36.0|      10.0|   NULL|
+---------+----+----------+-------+



In [20]:
# Show the data without the name column. The column is spelled exactly as in
# the schema ("name") so the call does not depend on Spark's case-insensitive
# column resolution. df_pyspark itself is not modified.
df_pyspark.drop("name").show()

+----+----------+-------+
| age|experience| salary|
+----+----------+-------+
|31.0|      10.0|30000.0|
|30.0|       8.0|25000.0|
|29.0|       4.0|20000.0|
|24.0|       3.0|20000.0|
|21.0|       2.0|15000.0|
|23.0|       1.0|18000.0|
|NULL|      NULL|40000.0|
|34.0|      NULL|38000.0|
|36.0|      10.0|   NULL|
+----+----------+-------+



In [21]:
# With no arguments, na.drop() removes every row that contains at least one null.
df_pyspark.na.drop().show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    krish|31.0|      10.0|30000.0|
|sudhansha|30.0|       8.0|25000.0|
|    sunny|29.0|       4.0|20000.0|
|     paul|24.0|       3.0|20000.0|
|   harsha|21.0|       2.0|15000.0|
|  shubham|23.0|       1.0|18000.0|
+---------+----+----------+-------+



In [22]:
df_pyspark.na.drop(how="any").show() # drop a row if any of its values is null

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    krish|31.0|      10.0|30000.0|
|sudhansha|30.0|       8.0|25000.0|
|    sunny|29.0|       4.0|20000.0|
|     paul|24.0|       3.0|20000.0|
|   harsha|21.0|       2.0|15000.0|
|  shubham|23.0|       1.0|18000.0|
+---------+----+----------+-------+



In [23]:
df_pyspark.na.drop(how="all").show() # drop a row only if all of its values are null

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    krish|31.0|      10.0|30000.0|
|sudhansha|30.0|       8.0|25000.0|
|    sunny|29.0|       4.0|20000.0|
|     paul|24.0|       3.0|20000.0|
|   harsha|21.0|       2.0|15000.0|
|  shubham|23.0|       1.0|18000.0|
|   mahesh|NULL|      NULL|40000.0|
|     NULL|34.0|      NULL|38000.0|
|     NULL|36.0|      10.0|   NULL|
+---------+----+----------+-------+



In [24]:
# thresh=2 keeps only rows that have at least 2 non-null values. When thresh is
# given it overrides `how`, so `how` is omitted to avoid implying it has an effect.
df_pyspark.na.drop(thresh=2).show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    krish|31.0|      10.0|30000.0|
|sudhansha|30.0|       8.0|25000.0|
|    sunny|29.0|       4.0|20000.0|
|     paul|24.0|       3.0|20000.0|
|   harsha|21.0|       2.0|15000.0|
|  shubham|23.0|       1.0|18000.0|
|   mahesh|NULL|      NULL|40000.0|
|     NULL|34.0|      NULL|38000.0|
|     NULL|36.0|      10.0|   NULL|
+---------+----+----------+-------+



In [25]:
# Drop only the rows whose age is null; nulls in other columns are kept.
# The column is spelled exactly as in the schema ("age") so the call does not
# depend on Spark's case-insensitive column resolution.
df_pyspark.na.drop(how="any", subset=["age"]).show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    krish|31.0|      10.0|30000.0|
|sudhansha|30.0|       8.0|25000.0|
|    sunny|29.0|       4.0|20000.0|
|     paul|24.0|       3.0|20000.0|
|   harsha|21.0|       2.0|15000.0|
|  shubham|23.0|       1.0|18000.0|
|     NULL|34.0|      NULL|38000.0|
|     NULL|36.0|      10.0|   NULL|
+---------+----+----------+-------+



In [26]:
# Replace nulls with 0 in the experience and salary columns only;
# the result is a new DataFrame that is displayed but not kept as df_pyspark.
zero_filled = df_pyspark.na.fill(value=0, subset=["experience", "salary"])
zero_filled.show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    krish|31.0|      10.0|30000.0|
|sudhansha|30.0|       8.0|25000.0|
|    sunny|29.0|       4.0|20000.0|
|     paul|24.0|       3.0|20000.0|
|   harsha|21.0|       2.0|15000.0|
|  shubham|23.0|       1.0|18000.0|
|   mahesh|NULL|       0.0|40000.0|
|     NULL|34.0|       0.0|38000.0|
|     NULL|36.0|      10.0|    0.0|
+---------+----+----------+-------+



In [27]:
# df_pyspark still contains its nulls: the na.drop()/na.fill() calls above
# returned new DataFrames and were never assigned back.
df_pyspark.show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    krish|31.0|      10.0|30000.0|
|sudhansha|30.0|       8.0|25000.0|
|    sunny|29.0|       4.0|20000.0|
|     paul|24.0|       3.0|20000.0|
|   harsha|21.0|       2.0|15000.0|
|  shubham|23.0|       1.0|18000.0|
|   mahesh|NULL|      NULL|40000.0|
|     NULL|34.0|      NULL|38000.0|
|     NULL|36.0|      10.0|   NULL|
+---------+----+----------+-------+



Imputer

In [28]:
from pyspark.ml.feature import Imputer

# Numeric columns whose missing values get replaced; each one gets a
# companion "<column>_imputed" output column.
numeric_cols = ["age", "experience", "salary"]
imputed_cols = [f"{col_name}_imputed" for col_name in numeric_cols]

imputer = Imputer(inputCols=numeric_cols, outputCols=imputed_cols)
imputer = imputer.setStrategy("mean")  # or median

# Fit on the data to learn the fill value per column, then apply it.
imputer_model = imputer.fit(df_pyspark)
imputer_model.transform(df_pyspark).show()

+---------+----+----------+-------+-----------+------------------+--------------+
|     name| age|experience| salary|age_imputed|experience_imputed|salary_imputed|
+---------+----+----------+-------+-----------+------------------+--------------+
|    krish|31.0|      10.0|30000.0|       31.0|              10.0|       30000.0|
|sudhansha|30.0|       8.0|25000.0|       30.0|               8.0|       25000.0|
|    sunny|29.0|       4.0|20000.0|       29.0|               4.0|       20000.0|
|     paul|24.0|       3.0|20000.0|       24.0|               3.0|       20000.0|
|   harsha|21.0|       2.0|15000.0|       21.0|               2.0|       15000.0|
|  shubham|23.0|       1.0|18000.0|       23.0|               1.0|       18000.0|
|   mahesh|NULL|      NULL|40000.0|       28.5| 5.428571428571429|       40000.0|
|     NULL|34.0|      NULL|38000.0|       34.0| 5.428571428571429|       38000.0|
|     NULL|36.0|      10.0|   NULL|       36.0|              10.0|       25750.0|
+---------+----+----------+-------+-----------+------------------+--------------+

Filter operations

In [30]:
from pyspark.sql import SparkSession

# NOTE(review): getOrCreate() presumably returns the session that is already
# running from earlier cells, making this cell redundant after the first — confirm.
spark = SparkSession.builder.appName("Practise").getOrCreate()
spark

In [34]:
# Load the dataset used by the filter examples, with a header row and inferred types.
df_pyspark =  spark.read.csv("test1.csv", header=True, inferSchema=True)
df_pyspark.show()

+---------+---+----------+------+
|     name|age|experience|salary|
+---------+---+----------+------+
|    krish| 31|        10| 30000|
|sudhansha| 30|         8| 25000|
|    sunny| 29|         4| 20000|
|     paul| 24|         3| 20000|
|   harsha| 21|         1| 15000|
|  shubham| 23|         2| 18000|
+---------+---+----------+------+



In [35]:
# Salary of people less than or equal to 20000, using a SQL-style condition string

df_pyspark.filter("salary<=20000").show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|  sunny| 29|         4| 20000|
|   paul| 24|         3| 20000|
| harsha| 21|         1| 15000|
|shubham| 23|         2| 18000|
+-------+---+----------+------+



In [36]:
# Chain filter and select: only the name and age of people earning at most 20000.
df_pyspark.filter("salary<=20000").select(["name", "age"]).show()

+-------+---+
|   name|age|
+-------+---+
|  sunny| 29|
|   paul| 24|
| harsha| 21|
|shubham| 23|
+-------+---+



In [39]:
# Keep rows whose salary lies in the closed range [15000, 20000].
# Column conditions are combined with & and each needs its own parentheses.
salary = df_pyspark["salary"]
in_band = (salary <= 20000) & (salary >= 15000)
df_pyspark.filter(in_band).show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|  sunny| 29|         4| 20000|
|   paul| 24|         3| 20000|
| harsha| 21|         1| 15000|
|shubham| 23|         2| 18000|
+-------+---+----------+------+



In [40]:
# Combine two column conditions with | (logical OR).
# NOTE(review): every non-null salary is either <= 20000 or >= 15000, so this
# condition is always true and all rows are returned — confirm the intended bounds.
df_pyspark.filter((df_pyspark["salary"]<=20000) | (df_pyspark["salary"]>=15000)).show()

+---------+---+----------+------+
|     name|age|experience|salary|
+---------+---+----------+------+
|    krish| 31|        10| 30000|
|sudhansha| 30|         8| 25000|
|    sunny| 29|         4| 20000|
|     paul| 24|         3| 20000|
|   harsha| 21|         1| 15000|
|  shubham| 23|         2| 18000|
+---------+---+----------+------+



In [43]:
df_pyspark.filter(~(df_pyspark["salary"]<=20000)).show() # ~ negates the condition, i.e. salary > 20,000

+---------+---+----------+------+
|     name|age|experience|salary|
+---------+---+----------+------+
|    krish| 31|        10| 30000|
|sudhansha| 30|         8| 25000|
+---------+---+----------+------+



Group by and aggregate functions

In [46]:
from pyspark.sql import SparkSession

# NOTE(review): getOrCreate() presumably returns the session that is already
# running from earlier cells, so appName("Agg") may not take effect — confirm.
spark = SparkSession.builder.appName("Agg").getOrCreate()
spark

In [48]:
# Load the name / departments / salary dataset used by the group-by examples.
df_pyspark = spark.read.csv("test2.csv", header=True, inferSchema=True)
df_pyspark.show()

+---------+------------+------+
|     name| departments|salary|
+---------+------------+------+
|    krish|data science| 10000|
|    krish|         iot|  5000|
|   mahesh|    big data|  4000|
|    krish|    big data|  4000|
|   mahesh|data science|  3000|
|sudhanshu|data science| 20000|
|sudhanshu|         iot| 10000|
|sudhanshu|    big data|  5000|
|    sunny|data science| 10000|
|    sunny|    big data|  2000|
+---------+------------+------+



In [49]:
# Print each column's name, type and nullability.
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [51]:
# Group by name and sum per group; salary is the only numeric column here,
# so the result has a single sum(salary) column.

df_pyspark.groupBy("name").sum().show()

+---------+-----------+
|     name|sum(salary)|
+---------+-----------+
|    sunny|      12000|
|    krish|      19000|
|sudhanshu|      35000|
|   mahesh|       7000|
+---------+-----------+



In [52]:
# Total salary per department.
df_pyspark.groupBy("departments").sum().show()

+------------+-----------+
| departments|sum(salary)|
+------------+-----------+
|    big data|      15000|
|data science|      43000|
|         iot|      15000|
+------------+-----------+



In [53]:
# Average salary per department.
df_pyspark.groupBy("departments").mean().show()

+------------+-----------+
| departments|avg(salary)|
+------------+-----------+
|    big data|     3750.0|
|data science|    10750.0|
|         iot|     7500.0|
+------------+-----------+



In [54]:
# Number of rows per department.
df_pyspark.groupBy("departments").count().show()

+------------+-----+
| departments|count|
+------------+-----+
|    big data|    4|
|data science|    4|
|         iot|    2|
+------------+-----+



In [57]:
df_pyspark.agg({"salary": "sum"}).show() # total salary of the company (aggregate over the whole DataFrame, no grouping)

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+



ML

In [58]:
from pyspark.sql import SparkSession
# NOTE(review): getOrCreate() presumably returns the session that is already
# running from earlier cells, so appName("missing") may not take effect — confirm.
spark = SparkSession.builder.appName("missing").getOrCreate()
spark

In [59]:
# Load the dataset used for the regression example, with a header row and inferred types.
training = spark.read.csv("test3.csv",header=True, inferSchema=True)
training.show()

+---------+---+----------+------+
|     name|age|experience|salary|
+---------+---+----------+------+
|    krish| 31|        10| 30000|
|sudhanshu| 30|         8| 25000|
|    sunny| 29|         4| 20000|
|     paul| 24|         3| 20000|
|   harsha| 21|         1| 15000|
|  shubham| 23|         2| 18000|
+---------+---+----------+------+



In [60]:
# Print each column's name, type and nullability.
training.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [61]:
# List the column names available as features and label.
training.columns

['name', 'age', 'experience', 'salary']

In [62]:
from pyspark.ml.feature import VectorAssembler

# Combine the numeric predictor columns into one vector column,
# named "Independent Features", for the regression below.
feature_columns = ["age", "experience"]
featureassembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="Independent Features",
)

In [63]:
# Apply the assembler: the result keeps the original columns and adds
# the "Independent Features" vector column.
output = featureassembler.transform(training)
output.show()

+---------+---+----------+------+--------------------+
|     name|age|experience|salary|Independent Features|
+---------+---+----------+------+--------------------+
|    krish| 31|        10| 30000|         [31.0,10.0]|
|sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    sunny| 29|         4| 20000|          [29.0,4.0]|
|     paul| 24|         3| 20000|          [24.0,3.0]|
|   harsha| 21|         1| 15000|          [21.0,1.0]|
|  shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [64]:
# Keep only the feature vector and the label column (salary) for model training.
finalized_data = output.select("Independent Features", "salary")
finalized_data.show()

+--------------------+------+
|Independent Features|salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [65]:
from pyspark.ml.regression import LinearRegression

# Fix the seed so the train/test partition, and therefore the fitted model and
# its metrics, is repeatable for the same input data. The outputs recorded
# further down came from an unseeded run and will differ.
# NOTE: with only six rows a given seed can leave the test split very small or
# empty; choose a different seed if that happens.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)

# Configure the estimator: features come from the assembled vector column,
# the label is salary.
regressor = LinearRegression(featuresCol="Independent Features", labelCol="salary")

# fit() returns the trained model; the name `regressor` is rebound from the
# estimator to that fitted model, which the following cells use.
regressor = regressor.fit(train_data)

In [67]:
# Score the fitted model on the held-out rows; despite its name, `pred` holds
# the evaluation summary object, not just the predictions.
pred = regressor.evaluate(test_data)

# The summary's predictions DataFrame shows the prediction next to the actual salary.
pred.predictions.show()

+--------------------+------+------------------+
|Independent Features|salary|        prediction|
+--------------------+------+------------------+
|          [29.0,4.0]| 20000|  30500.0000000063|
|         [31.0,10.0]| 30000|23833.333333330767|
+--------------------+------+------------------+



In [68]:
# Mean squared error and mean absolute error on the test split, shown as a tuple.
pred.meanSquaredError, pred.meanAbsoluteError

(74138888.88897087, 8333.333333337767)