PySpark is a powerful tool for large-scale data processing in Python. It is a Python API for Apache Spark, which is a distributed computing framework that allows you to process large volumes of data in parallel across a cluster of computers.

There are several reasons why you might choose to use PySpark for data processing tasks in Python:

Scalability: PySpark allows you to process large datasets that would be difficult or impossible to handle on a single machine. By distributing the processing across a cluster of machines, you can work with datasets far larger than one machine's memory and add machines as the data grows.

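Speed: Spark splits a job into tasks that run in parallel across the cluster and can keep intermediate results in memory, so large workloads often finish much faster than they would on a single machine. For small datasets, the overhead of distributing the work can outweigh the benefit.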

Ease of use: PySpark provides a simple and intuitive interface for working with distributed data. You can write your data processing code in Python, which is a popular and easy-to-learn language.

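Compatibility: PySpark interoperates with other Python libraries and tools, such as NumPy, pandas, and scikit-learn. For example, toPandas() converts a Spark DataFrame into a pandas DataFrame; because this collects all of the data onto the driver, it is best suited to small results. This makes it easier to fit PySpark into your existing data processing workflows.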

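The rest of this notebook walks through the basics: starting a Spark session, reading CSV files, inspecting schemas, adding, dropping, and renaming columns, handling missing values, filtering, grouping and aggregating, and fitting a linear regression model.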

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=2751c840e1c97d06b7d0c83e45a703a4ffa03199453b5d4c5d9403d222b6e5af
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
import pyspark

In [None]:
# To work with Spark, we first need to start a Spark session

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark= SparkSession.builder.appName("Practice").getOrCreate()

In [None]:
spark

In [None]:
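# Without the header option, the first row is treated as data and the columns are named _c0, _c1, ...
df_spark = spark.read.csv("/content/test1.csv")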

In [None]:
df_spark.show()

+----+---+
| _c0|_c1|
+----+---+
|Name|age|
|mani| 29|
|arsh| 23|
| kai| 27|
+----+---+



In [None]:
# With header set to "true", the first row supplies the column names
df_pyspark = spark.read.option("header", "true").csv("/content/test1.csv")

In [None]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [None]:
df_pyspark.head(3)

[Row(Name='mani', age='29'),
 Row(Name='arsh', age='23'),
 Row(Name='kai', age='27')]

In [None]:
import pandas as pd
pd.read_csv("/content/test1.csv")

   Name  age
0  mani   29
1  arsh   23
2   kai   27


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark= SparkSession.builder.appName("Practice").getOrCreate()

In [None]:
spark

In [None]:
df= spark.read.csv("/content/test1.csv")

In [None]:
df.show()

+----+---+
| _c0|_c1|
+----+---+
|Name|age|
|mani| 29|
|arsh| 23|
| kai| 27|
+----+---+



In [None]:
df=spark.read.option("header","true").csv("/content/test1.csv")

In [None]:
df.show()

+----+---+
|Name|age|
+----+---+
|mani| 29|
|arsh| 23|
| kai| 27|
+----+---+



In [None]:
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
df.head(3)

[Row(Name='mani', age='29'),
 Row(Name='arsh', age='23'),
 Row(Name='kai', age='27')]

In [None]:
df.printSchema()  # similar to DataFrame.info() in pandas


root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)



In [None]:

# Next topics to cover in PySpark:

# PySpark DataFrames
# Reading the dataset
# Checking the data types of the columns
# Selecting columns and indexing
# The describe() method, similar to pandas
# Adding columns
# Dropping columns

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark= SparkSession.builder.appName("Dataframe").getOrCreate()

In [None]:
spark

In [None]:
df=spark.read.csv("/content/test3.csv")

In [None]:
df= spark.read.option("header","true").csv("/content/test3.csv")

In [None]:
df.show()

+------+---+----------+
|  Name|age|experience|
+------+---+----------+
| Krish| 31|        10|
|sudhan| 30|         8|
| sunny| 29|         4|
+------+---+----------+



In [None]:
df.head(3)

[Row(Name='Krish', age='31', experience='10'),
 Row(Name='sudhan', age='30', experience='8'),
 Row(Name='sunny', age='29', experience='4')]

In [None]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience: string (nullable = true)



In [None]:
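# A more convenient way to read a CSV file: header=True takes the column names from the first row, and inferSchema=True infers the column types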

df= spark.read.csv("/content/test3.csv",header=True,inferSchema=True)

In [None]:
df.show()

+------+---+----------+
|  Name|age|experience|
+------+---+----------+
| Krish| 31|        10|
|sudhan| 30|         8|
| sunny| 29|         4|
+------+---+----------+



In [None]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [None]:
from pyspark.sql import SparkSession

In [None]:
spark= SparkSession.builder.appName("manikandan").getOrCreate()

In [None]:
df=spark.read.csv("/content/test3.csv",header=True,inferSchema=True)

In [None]:
df.show()


+------+---+----------+
|  Name|age|experience|
+------+---+----------+
| Krish| 31|        10|
|sudhan| 30|         8|
| sunny| 29|         4|
+------+---+----------+



In [None]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [None]:
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
df.columns

['Name', 'age', 'experience']

In [None]:
df.head(3)

[Row(Name='Krish', age=31, experience=10),
 Row(Name='sudhan', age=30, experience=8),
 Row(Name='sunny', age=29, experience=4)]

In [None]:
df.select("Name").show()

+------+
|  Name|
+------+
| Krish|
|sudhan|
| sunny|
+------+



In [None]:
df.dtypes

[('Name', 'string'), ('age', 'int'), ('experience', 'int')]

In [None]:
df.describe().show()

+-------+-----+----+-----------------+
|summary| Name| age|       experience|
+-------+-----+----+-----------------+
|  count|    3|   3|                3|
|   mean| null|30.0|7.333333333333333|
| stddev| null| 1.0|3.055050463303893|
|    min|Krish|  29|                4|
|    max|sunny|  31|               10|
+-------+-----+----+-----------------+
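
The stddev row reported by describe() is the sample standard deviation (dividing by n - 1), and min/max on the string column Name are the smallest and largest values in string order. As a quick cross-check outside Spark, here is a plain-Python sketch that recomputes the numeric rows from the values shown in the table; it is only an illustration, not how Spark computes them internally.

```python
import statistics

# Values copied from the age and experience columns shown above.
age = [31, 30, 29]
experience = [10, 8, 4]

for name, values in [("age", age), ("experience", experience)]:
    mean = statistics.mean(values)
    stddev = statistics.stdev(values)  # sample standard deviation (divides by n - 1)
    print(name, len(values), mean, stddev, min(values), max(values))
```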



In [None]:
# Add a new column
df = df.withColumn("experience after 2 years", df["experience"] + 2)

In [None]:
df.show()

+------+---+----------+------------------------+
|  Name|age|experience|experience after 2 years|
+------+---+----------+------------------------+
| Krish| 31|        10|                      12|
|sudhan| 30|         8|                      10|
| sunny| 29|         4|                       6|
+------+---+----------+------------------------+



In [None]:
# drop the columns
df= df.drop("experience after 2 years")

In [None]:
df.show()

+------+---+----------+
|  Name|age|experience|
+------+---+----------+
| Krish| 31|        10|
|sudhan| 30|         8|
| sunny| 29|         4|
+------+---+----------+



In [None]:
# rename the columns
df= df.withColumnRenamed("Name","New name")

In [None]:
df.show()

+--------+---+----------+
|New name|age|experience|
+--------+---+----------+
|   Krish| 31|        10|
|  sudhan| 30|         8|
|   sunny| 29|         4|
+--------+---+----------+



In [None]:
# PySpark: handling missing values

In [None]:
# Dropping columns
# Dropping rows
# Parameters of the drop functions (how, thresh, subset)
# Handling missing values with the mean, median, or mode

In [2]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("practise").getOrCreate()

In [28]:
df= spark.read.csv("/content/test4.csv",header=True,inferSchema=True)

In [29]:
df.show()

+----+----+----+------+
|Name| age| exp|salary|
+----+----+----+------+
|   a|  31|  10| 30000|
|   b|  30|   8| 25000|
|   c|  29|   4| 20000|
|   d|  24|   3| 20000|
|   e|  24|   1| 15000|
|   f|  23|   2| 18000|
|   g|null|null| 40000|
|null|  34|  10|380000|
|null|  36|null|  null|
+----+----+----+------+



In [None]:
# Drop the Name column; column names are matched case-insensitively by default, so "NAME" works
df.drop("NAME").show()

+----+----+------+
| age| exp|salary|
+----+----+------+
|  31|  10| 30000|
|  30|   8| 25000|
|  29|   4| 20000|
|  24|   3| 20000|
|  24|   1| 15000|
|  23|   2| 18000|
|null|null| 40000|
|  34|  10|380000|
|  36|null|  null|
+----+----+------+



In [None]:
df.na.drop().show()

+----+---+---+------+
|Name|age|exp|salary|
+----+---+---+------+
|   a| 31| 10| 30000|
|   b| 30|  8| 25000|
|   c| 29|  4| 20000|
|   d| 24|  3| 20000|
|   e| 24|  1| 15000|
|   f| 23|  2| 18000|
+----+---+---+------+



In [None]:
df.show()

+----+----+----+------+
|Name| age| exp|salary|
+----+----+----+------+
|   a|  31|  10| 30000|
|   b|  30|   8| 25000|
|   c|  29|   4| 20000|
|   d|  24|   3| 20000|
|   e|  24|   1| 15000|
|   f|  23|   2| 18000|
|   g|null|null| 40000|
|null|  34|  10|380000|
|null|  36|null|  null|
+----+----+----+------+



In [None]:
df.na.drop(how="all").show()  # how="all" drops a row only if every value in it is null

+----+----+----+------+
|Name| age| exp|salary|
+----+----+----+------+
|   a|  31|  10| 30000|
|   b|  30|   8| 25000|
|   c|  29|   4| 20000|
|   d|  24|   3| 20000|
|   e|  24|   1| 15000|
|   f|  23|   2| 18000|
|   g|null|null| 40000|
|null|  34|  10|380000|
|null|  36|null|  null|
+----+----+----+------+



In [None]:
# thresh=2 keeps rows with at least 2 non-null values; when thresh is given, it overrides how
df.na.drop(how="any", thresh=2).show()

+----+----+----+------+
|Name| age| exp|salary|
+----+----+----+------+
|   a|  31|  10| 30000|
|   b|  30|   8| 25000|
|   c|  29|   4| 20000|
|   d|  24|   3| 20000|
|   e|  24|   1| 15000|
|   f|  23|   2| 18000|
|   g|null|null| 40000|
|null|  34|  10|380000|
+----+----+----+------+
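
To make the how and thresh rules concrete, the sketch below re-applies them in plain Python to the nine rows shown above, with None standing in for null. The helper keep_row is hypothetical and only mirrors the documented behaviour of na.drop; it is not PySpark code.

```python
# Rows copied from the table above; None stands in for null.
rows = [
    ("a", 31, 10, 30000),
    ("b", 30, 8, 25000),
    ("c", 29, 4, 20000),
    ("d", 24, 3, 20000),
    ("e", 24, 1, 15000),
    ("f", 23, 2, 18000),
    ("g", None, None, 40000),
    (None, 34, 10, 380000),
    (None, 36, None, None),
]


def keep_row(row, how="any", thresh=None):
    """Hypothetical helper mirroring the documented na.drop rule for one row."""
    non_null = sum(value is not None for value in row)
    if thresh is not None:
        # thresh takes precedence over how: keep rows with at least thresh non-null values.
        return non_null >= thresh
    if how == "any":
        return non_null == len(row)  # dropped if any value is null
    return non_null > 0  # how="all": dropped only if every value is null


kept_any = [r for r in rows if keep_row(r)]
kept_all = [r for r in rows if keep_row(r, how="all")]
kept_thresh2 = [r for r in rows if keep_row(r, thresh=2)]
print(len(kept_any), len(kept_all), len(kept_thresh2))
```

The three counts come out as 6, 9, and 8, matching the number of rows in the corresponding outputs above.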



In [None]:
# Drop rows where Name is null; as in SQL, column names are matched case-insensitively by default
df.na.drop(subset=["name"]).show()

+----+----+----+------+
|Name| age| exp|salary|
+----+----+----+------+
|   a|  31|  10| 30000|
|   b|  30|   8| 25000|
|   c|  29|   4| 20000|
|   d|  24|   3| 20000|
|   e|  24|   1| 15000|
|   f|  23|   2| 18000|
|   g|null|null| 40000|
+----+----+----+------+



In [None]:
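# A string fill value only applies to string columns; exp and salary are integers, so their nulls stay unchanged below
df.na.fill("missing values", subset=["exp", "salary"]).show()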

+----+----+----+------+
|Name| age| exp|salary|
+----+----+----+------+
|   a|  31|  10| 30000|
|   b|  30|   8| 25000|
|   c|  29|   4| 20000|
|   d|  24|   3| 20000|
|   e|  24|   1| 15000|
|   f|  23|   2| 18000|
|   g|null|null| 40000|
|null|  34|  10|380000|
|null|  36|null|  null|
+----+----+----+------+



In [32]:
df=df.na.drop()

In [33]:
df.show()

+----+---+---+------+
|Name|age|exp|salary|
+----+---+---+------+
|   a| 31| 10| 30000|
|   b| 30|  8| 25000|
|   c| 29|  4| 20000|
|   d| 24|  3| 20000|
|   e| 24|  1| 15000|
|   f| 23|  2| 18000|
+----+---+---+------+



In [34]:
df.filter("salary<=20000").show()

+----+---+---+------+
|Name|age|exp|salary|
+----+---+---+------+
|   c| 29|  4| 20000|
|   d| 24|  3| 20000|
|   e| 24|  1| 15000|
|   f| 23|  2| 18000|
+----+---+---+------+



In [36]:
df.filter((df["salary"] <= 20000)&
          (df["exp"]<=3)).show()

+----+---+---+------+
|Name|age|exp|salary|
+----+---+---+------+
|   d| 24|  3| 20000|
|   e| 24|  1| 15000|
|   f| 23|  2| 18000|
+----+---+---+------+



In [37]:
from pyspark.sql import SparkSession

In [38]:
spark= SparkSession.builder.appName("Agg").getOrCreate()

In [41]:
df= spark.read.csv("/content/test5.csv",header=True,inferSchema=True)

In [42]:
df.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
+---------+------------+------+



In [43]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [44]:
df.groupBy("departments").sum().show()

+------------+-----------+
| departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      13000|
|Data Science|      43000|
+------------+-----------+



In [45]:
df.groupBy("name").sum().show()

+---------+-----------+
|     name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      10000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [46]:
df.agg({"salary":"sum"}).show()

+-----------+
|sum(salary)|
+-----------+
|      71000|
+-----------+
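
As a sanity check on the grouped sums, the same aggregation can be reproduced in plain Python from the nine rows shown earlier. This sketch is only an illustration of what groupBy(...).sum() and agg({"salary": "sum"}) compute; it does not use Spark.

```python
from collections import defaultdict

# (Name, Departments, salary) rows copied from the table shown earlier.
rows = [
    ("Krish", "Data Science", 10000),
    ("Krish", "IOT", 5000),
    ("Mahesh", "Big Data", 4000),
    ("Krish", "Big Data", 4000),
    ("Mahesh", "Data Science", 3000),
    ("Sudhanshu", "Data Science", 20000),
    ("Sudhanshu", "IOT", 10000),
    ("Sudhanshu", "Big Data", 5000),
    ("Sunny", "Data Science", 10000),
]

# Accumulate one running total per group key.
by_department = defaultdict(int)
by_name = defaultdict(int)
for name, department, salary in rows:
    by_department[department] += salary
    by_name[name] += salary

total = sum(salary for _, _, salary in rows)
print(dict(by_department), dict(by_name), total)
```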



In [47]:
# Next, a machine learning example: linear regression

In [48]:
from pyspark.sql import SparkSession

In [49]:
spark= SparkSession.builder.appName("missing").getOrCreate()

In [50]:
df= spark.read.csv("/content/test22.csv",header=True,inferSchema=True)

In [51]:
df.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [52]:
df.columns

['Name', 'age', 'Experience', 'Salary']

In [53]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [55]:
df.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



In [56]:
# As usual for linear regression, we split the data into train and test sets and then predict. PySpark ML additionally expects the independent variables combined into a single vector column, which VectorAssembler creates

In [59]:
from pyspark.ml.feature import VectorAssembler
featureassembler= VectorAssembler(inputCols=["age","Experience"],outputCol="indpendent_features")

In [60]:
output= featureassembler.transform(df)

In [61]:
output.show()

+---------+---+----------+------+-------------------+
|     Name|age|Experience|Salary|indpendent_features|
+---------+---+----------+------+-------------------+
|    Krish| 31|        10| 30000|        [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|         [30.0,8.0]|
|    Sunny| 29|         4| 20000|         [29.0,4.0]|
|     Paul| 24|         3| 20000|         [24.0,3.0]|
|   Harsha| 21|         1| 15000|         [21.0,1.0]|
|  Shubham| 23|         2| 18000|         [23.0,2.0]|
+---------+---+----------+------+-------------------+



In [64]:
final_data= output.select("indpendent_features","Salary")

In [65]:
final_data.show()

+-------------------+------+
|indpendent_features|Salary|
+-------------------+------+
|        [31.0,10.0]| 30000|
|         [30.0,8.0]| 25000|
|         [29.0,4.0]| 20000|
|         [24.0,3.0]| 20000|
|         [21.0,1.0]| 15000|
|         [23.0,2.0]| 18000|
+-------------------+------+



In [66]:
from pyspark.ml.regression import LinearRegression

In [67]:
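# The split is random and differs between runs unless a seed is passed; with only 6 rows the test set can be very small
train_data, test_data = final_data.randomSplit([0.75, 0.25])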

In [68]:
regressor=LinearRegression(featuresCol="indpendent_features",labelCol="Salary")

In [70]:
# fit() returns a fitted LinearRegressionModel; the name regressor is rebound to that model
regressor = regressor.fit(train_data)

In [73]:
regressor.coefficients

DenseVector([-90.5483, 1608.7819])

In [74]:
regressor.intercept

16079.136690647425

In [75]:
pred_results=regressor.evaluate(test_data)

In [77]:
pred_results.predictions.show()

+-------------------+------+-----------------+
|indpendent_features|Salary|       prediction|
+-------------------+------+-----------------+
|         [23.0,2.0]| 18000|17214.09079632846|
+-------------------+------+-----------------+



In [79]:
pred_results.meanAbsoluteError

785.909203671541
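
The prediction and the error can be reproduced by hand from the coefficients and intercept printed above: the prediction is the intercept plus the dot product of the coefficients with the feature vector. The sketch below does this in plain Python; because the coefficients are copied as displayed (rounded to four decimals), the result agrees with Spark's output only to about two decimal places.

```python
# Values copied from the outputs above (coefficients are rounded as displayed).
coefficients = [-90.5483, 1608.7819]
intercept = 16079.136690647425
features = [23.0, 2.0]  # the single test row: age=23, Experience=2
actual_salary = 18000

# Linear model: intercept + sum of coefficient * feature.
prediction = intercept + sum(c * x for c, x in zip(coefficients, features))
absolute_error = abs(actual_salary - prediction)
print(prediction, absolute_error)
```

Since the test set here contains a single row, the mean absolute error is simply that row's absolute error, so it says little about how well the model generalizes.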