
In [44]:
#EDA & Data Manipulation
#Creating a DataFrame from a list
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("EDA Methods").getOrCreate()

data = [("Alice",25),("Bob",30), ("Charlie", None ), ("John",30)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()

+-------+----+
|   Name| Age|
+-------+----+
|  Alice|  25|
|    Bob|  30|
|Charlie|NULL|
|   John|  30|
+-------+----+



In [45]:
#read CSV file
from google.colab import drive
drive.mount('/content/gdrive')

df_customers = spark.read.csv('/content/gdrive/My Drive/Colab Notebooks/dataset/customers.csv', header=True, inferSchema=True)
df_customers.show()


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
+-----------+-------+-----------+------------------+
|customer_id|   name|signup_date|             email|
+-----------+-------+-----------+------------------+
|          1|  Alice| 2021-01-01| user1@example.com|
|          2|    Bob| 2021-04-01| user2@example.com|
|          3|Charlie| 2021-06-30| user3@example.com|
|          4|  David| 2021-09-28|              NULL|
|          5|    Eva| 2021-12-27| user5@example.com|
|          6|  Frank| 2022-03-27| user6@example.com|
|          7|  Grace| 2022-06-25| user7@example.com|
|          8|  Helen| 2022-09-23| user8@example.com|
|          9|    Ian| 2022-12-22| user9@example.com|
|         10|   Jane| 2023-03-22|user10@example.com|
+-----------+-------+-----------+------------------+



In [46]:
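#Read a Parquet file into a DataFrame
#(read.parquet expects Parquet data; the path below ends in .csv, so point it at a Parquet file or use spark.read.csv)
# df_orders = spark.read.parquet('/content/gdrive/My Drive/Colab Notebooks/dataset/orders.csv')

#Read a JSON file into a DataFrame
#df = spark.read.json("path")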

In [47]:
#Basic EDA Methods
df.show(5)

+-------+----+
|   Name| Age|
+-------+----+
|  Alice|  25|
|    Bob|  30|
|Charlie|NULL|
|   John|  30|
+-------+----+



In [48]:
df.columns

['Name', 'Age']

In [49]:
df.count()

4

In [50]:
#Displays the schema
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [51]:
#Computes summary statistics (count, mean, stddev, min, max); string columns get only count, min and max
df.describe().show()

+-------+-----+------------------+
|summary| Name|               Age|
+-------+-----+------------------+
|  count|    4|                 3|
|   mean| NULL|28.333333333333332|
| stddev| NULL| 2.886751345948129|
|    min|Alice|                25|
|    max| John|                30|
+-------+-----+------------------+
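The `mean` and `stddev` rows above are computed over the three non-null ages only, and `stddev` is the sample standard deviation (dividing by n - 1), not the population one. A plain-Python cross-check with the standard `statistics` module, using the ages from `data`:

```python
import statistics

# Ages from the example DataFrame; None stands in for Spark's NULL
ages = [25, 30, None, 30]

# describe() ignores nulls, so drop them before computing anything
non_null = [a for a in ages if a is not None]

count = len(non_null)                  # 3, the "count" row for Age
mean = statistics.mean(non_null)       # about 28.33, the "mean" row
stddev = statistics.stdev(non_null)    # sample standard deviation (divides by n - 1)
pstddev = statistics.pstdev(non_null)  # population version (divides by n), for contrast

print(count, mean, stddev, pstddev)
```

`stdev` gives about 2.8868, matching the table; `pstdev` would give about 2.3570.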



In [52]:
#Counts missing (null) values per column
#(sum is imported under an alias so it does not shadow Python's built-in sum)
from pyspark.sql.functions import col, sum as sql_sum
df.select(*(sql_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+----+---+
|Name|Age|
+----+---+
|   0|  1|
+----+---+



In [53]:
#Filtering and Selecting Data
df.show(5)

#Filter rows where Age is greater than 25
df.filter(df.Age > 25).show()

+-------+----+
|   Name| Age|
+-------+----+
|  Alice|  25|
|    Bob|  30|
|Charlie|NULL|
|   John|  30|
+-------+----+

+----+---+
|Name|Age|
+----+---+
| Bob| 30|
|John| 30|
+----+---+



In [54]:
#Filter using multiple conditions
df.filter((df.Age>=25) & (df.Name.startswith("A"))).show()

df.filter((df.Age>=25) & (df.Name=="Bob")).show()

+-----+---+
| Name|Age|
+-----+---+
|Alice| 25|
+-----+---+

+----+---+
|Name|Age|
+----+---+
| Bob| 30|
+----+---+



In [55]:
#Select Specific Columns
df.select("Name").show()

+-------+
|   Name|
+-------+
|  Alice|
|    Bob|
|Charlie|
|   John|
+-------+



In [56]:
#Add a new column
from pyspark.sql.functions import lit
df = df.withColumn("Country", lit("USA"))
df.show()

+-------+----+-------+
|   Name| Age|Country|
+-------+----+-------+
|  Alice|  25|    USA|
|    Bob|  30|    USA|
|Charlie|NULL|    USA|
|   John|  30|    USA|
+-------+----+-------+



In [57]:
#rename a column
df = df.withColumnRenamed("Country", "Nationality")
df.show()

+-------+----+-----------+
|   Name| Age|Nationality|
+-------+----+-----------+
|  Alice|  25|        USA|
|    Bob|  30|        USA|
|Charlie|NULL|        USA|
|   John|  30|        USA|
+-------+----+-----------+



In [58]:
#Drop a column
#df = df.drop("Nationality")
#df.show()

In [59]:
#Aggregations and Grouping
#Counts the number of rows per distinct Age value (NULL forms its own group)
df.groupBy("Age").count().show()

+----+-----+
| Age|count|
+----+-----+
|  25|    1|
|  30|    2|
|NULL|    1|
+----+-----+



In [60]:
#Finds the average age per nationality (avg ignores NULL ages)
df.groupBy("Nationality").agg({"Age": "avg"}).show()

+-----------+------------------+
|Nationality|          avg(Age)|
+-----------+------------------+
|        USA|28.333333333333332|
+-----------+------------------+



In [61]:
#Computes multiple aggregations
#(min and max are aliased so they do not shadow Python's built-ins)
from pyspark.sql.functions import min as sql_min, max as sql_max, avg
df.groupBy("Nationality").agg(avg("Age"), sql_min("Age"), sql_max("Age")).show()

+-----------+------------------+--------+--------+
|Nationality|          avg(Age)|min(Age)|max(Age)|
+-----------+------------------+--------+--------+
|        USA|28.333333333333332|      25|      30|
+-----------+------------------+--------+--------+



In [62]:
#Sorting and Ranking
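#Sorts the DataFrame by Age in ascending order; NULLs come first by default
#(use df.Age.asc_nulls_last() to put them last). Tied ages such as Bob and John may appear in either order.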
df.show()

df.orderBy(df.Age.asc()).show()

+-------+----+-----------+
|   Name| Age|Nationality|
+-------+----+-----------+
|  Alice|  25|        USA|
|    Bob|  30|        USA|
|Charlie|NULL|        USA|
|   John|  30|        USA|
+-------+----+-----------+

+-------+----+-----------+
|   Name| Age|Nationality|
+-------+----+-----------+
|Charlie|NULL|        USA|
|  Alice|  25|        USA|
|   John|  30|        USA|
|    Bob|  30|        USA|
+-------+----+-----------+



In [63]:
#Sorts the DataFrame by Age in descending order; NULLs come last by default
df.orderBy(df.Age.desc()).show()

+-------+----+-----------+
|   Name| Age|Nationality|
+-------+----+-----------+
|   John|  30|        USA|
|    Bob|  30|        USA|
|  Alice|  25|        USA|
|Charlie|NULL|        USA|
+-------+----+-----------+



In [64]:
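#Adds a row-number column (ranking)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

#A window with orderBy but no partitionBy moves all rows into a single partition;
#fine for this small example, but add partitionBy(...) on large data
windowSpec = Window.orderBy("Age")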
df.withColumn("row_number", row_number().over(windowSpec)).show()

+-------+----+-----------+----------+
|   Name| Age|Nationality|row_number|
+-------+----+-----------+----------+
|Charlie|NULL|        USA|         1|
|  Alice|  25|        USA|         2|
|    Bob|  30|        USA|         3|
|   John|  30|        USA|         4|
+-------+----+-----------+----------+
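`row_number()` always assigns distinct numbers, so the tied ages (Bob and John, both 30) get 3 and 4 in an order Spark does not guarantee. `rank()` and `dense_rank()` from `pyspark.sql.functions` give ties the same value instead. The plain-Python sketch below models the three functions over an ascending, NULLs-first ordering; it is a simplified illustration of their semantics, not Spark's implementation, and it uses the six ages the DataFrame has later in this notebook.

```python
# Ages as they appear later in this notebook; None stands in for NULL
ages = [25, 30, None, 30, 40, 40]

# Ascending order with NULLs first, as with Window.orderBy("Age")
ordered = sorted(ages, key=lambda a: (a is not None, a if a is not None else 0))


def ranking_columns(values):
    """Return (row_number, rank, dense_rank) lists for an already-sorted list."""
    row_numbers, ranks, dense_ranks = [], [], []
    current_rank = current_dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(values, start=1):
        if value != previous:
            current_rank = position  # rank skips ahead after a group of ties
            current_dense += 1       # dense_rank only ever advances by one
            previous = value
        row_numbers.append(position)  # row_number is just the position
        ranks.append(current_rank)
        dense_ranks.append(current_dense)
    return row_numbers, ranks, dense_ranks


rn_col, rank_col, dense_col = ranking_columns(ordered)
for value, rn, rk, dr in zip(ordered, rn_col, rank_col, dense_col):
    print(value, rn, rk, dr)
```

The result names avoid `row_number`, so pasting this into the notebook does not overwrite the imported PySpark function.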



In [65]:
#Handling Missing Data
#Drops rows with any null values
df.na.drop().show()

+-----+---+-----------+
| Name|Age|Nationality|
+-----+---+-----------+
|Alice| 25|        USA|
|  Bob| 30|        USA|
| John| 30|        USA|
+-----+---+-----------+



In [66]:
#Fills null values with a default value.
df.na.fill({"Age": 30}).show()

+-------+---+-----------+
|   Name|Age|Nationality|
+-------+---+-----------+
|  Alice| 25|        USA|
|    Bob| 30|        USA|
|Charlie| 30|        USA|
|   John| 30|        USA|
+-------+---+-----------+



In [67]:
#Replaces specific values.
df = df.replace("USA", "United States")

In [68]:
df.show()

+-------+----+-------------+
|   Name| Age|  Nationality|
+-------+----+-------------+
|  Alice|  25|United States|
|    Bob|  30|United States|
|Charlie|NULL|United States|
|   John|  30|United States|
+-------+----+-------------+



Feature Engineering

In [69]:
#Conditional Logic
#Assigns categories based on age.
from pyspark.sql.functions import when

#Note: a NULL Age makes every comparison NULL, so that row falls through to otherwise("Senior")
df.withColumn(
    "Age_Category",
    when(df.Age < 18, "Child").when(df.Age <= 25, "Middle-Aged").otherwise("Senior"),
).show()

+-------+----+-------------+------------+
|   Name| Age|  Nationality|Age_Category|
+-------+----+-------------+------------+
|  Alice|  25|United States| Middle-Aged|
|    Bob|  30|United States|      Senior|
|Charlie|NULL|United States|      Senior|
|   John|  30|United States|      Senior|
+-------+----+-------------+------------+
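Charlie is labelled "Senior" only because his Age is NULL: in Spark, a comparison with NULL yields NULL, which `when()` treats as not true, so the row falls through to `otherwise()`. The plain-Python model below mimics that three-valued logic and shows one possible fix that checks for NULL first. The "Unknown" label is a hypothetical choice; in PySpark the equivalent would be a leading `when(df.Age.isNull(), "Unknown")` in the chain.

```python
def less_than(value, threshold):
    """SQL-style comparison: NULL (None) compared with anything gives NULL."""
    if value is None:
        return None
    return value < threshold


def less_or_equal(value, threshold):
    if value is None:
        return None
    return value <= threshold


def age_category(age):
    # Mirrors when(Age < 18, ...).when(Age <= 25, ...).otherwise("Senior");
    # a branch fires only when its condition is exactly True, not None
    if less_than(age, 18) is True:
        return "Child"
    if less_or_equal(age, 25) is True:
        return "Middle-Aged"
    return "Senior"


def age_category_null_safe(age):
    # Hypothetical fix: test for NULL before any comparison
    if age is None:
        return "Unknown"
    return age_category(age)


people = {"Alice": 25, "Bob": 30, "Charlie": None, "John": 30}
for name, age in people.items():
    print(name, age_category(age), age_category_null_safe(age))
```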



In [75]:
#String Functions
#Concatenates the Name column with a fixed surname literal
from pyspark.sql.functions import concat, lit

df.withColumn("Full_Name", concat(df.Name, lit(" Tendulkar"))).show()

+-------+----+-------------+-----------------+
|   Name| Age|  Nationality|        Full_Name|
+-------+----+-------------+-----------------+
|  Alice|  25|United States|  Alice Tendulkar|
|    Bob|  30|United States|    Bob Tendulkar|
|Charlie|NULL|United States|Charlie Tendulkar|
|   John|  30|United States|   John Tendulkar|
+-------+----+-------------+-----------------+



In [76]:
#Converts text to lowercase
from pyspark.sql.functions import lower

df.withColumn("Name", lower(df.Name)).show()

+-------+----+-------------+
|   Name| Age|  Nationality|
+-------+----+-------------+
|  alice|  25|United States|
|    bob|  30|United States|
|charlie|NULL|United States|
|   john|  30|United States|
+-------+----+-------------+



In [77]:
#Removes leading and trailing whitespace from strings
from pyspark.sql.functions import trim

df.withColumn("Name", trim(df.Name)).show()

+-------+----+-------------+
|   Name| Age|  Nationality|
+-------+----+-------------+
|  Alice|  25|United States|
|    Bob|  30|United States|
|Charlie|NULL|United States|
|   John|  30|United States|
+-------+----+-------------+



In [83]:
#MATHEMATICAL functions
#Rounds a number
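#(imported as sql_round so Python's built-in round is not shadowed)
from pyspark.sql.functions import round as sql_round

#Age is an integer column, so rounding to 0 decimal places leaves its values unchanged
df.withColumn("Age_Rounded", sql_round(df.Age, 0)).show()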

+-------+----+-------------+-----------+
|   Name| Age|  Nationality|Age_Rounded|
+-------+----+-------------+-----------+
|  Alice|  25|United States|         25|
|    Bob|  30|United States|         30|
|Charlie|NULL|United States|       NULL|
|   John|  30|United States|         30|
+-------+----+-------------+-----------+



In [86]:
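#Append another row to the source list and rebuild the DataFrame
#(the output shows "Victoria" twice, so this cell was evidently run twice; every re-run appends again)
data.append(("Victoria", 40))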
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()

+--------+----+
|    Name| Age|
+--------+----+
|   Alice|  25|
|     Bob|  30|
| Charlie|NULL|
|    John|  30|
|Victoria|  40|
|Victoria|  40|
+--------+----+



In [87]:
#Re-add a constant Nationality column (the rebuilt DataFrame only has Name and Age)
df = df.withColumn("Nationality", lit("Australia"))
df.show()

+--------+----+-----------+
|    Name| Age|Nationality|
+--------+----+-----------+
|   Alice|  25|  Australia|
|     Bob|  30|  Australia|
| Charlie|NULL|  Australia|
|    John|  30|  Australia|
|Victoria|  40|  Australia|
|Victoria|  40|  Australia|
+--------+----+-----------+



In [88]:
#Computes the square root
from pyspark.sql.functions import sqrt

df.withColumn("Age_sqrt", sqrt(df.Age)).show()

+--------+----+-----------+-----------------+
|    Name| Age|Nationality|         Age_sqrt|
+--------+----+-----------+-----------------+
|   Alice|  25|  Australia|              5.0|
|     Bob|  30|  Australia|5.477225575051661|
| Charlie|NULL|  Australia|             NULL|
|    John|  30|  Australia|5.477225575051661|
|Victoria|  40|  Australia|6.324555320336759|
|Victoria|  40|  Australia|6.324555320336759|
+--------+----+-----------+-----------------+



In [89]:
#Computes the natural logarithm (log with a single argument uses base e)
from pyspark.sql.functions import log

df.withColumn("Age_log", log(df.Age)).show()

+--------+----+-----------+------------------+
|    Name| Age|Nationality|           Age_log|
+--------+----+-----------+------------------+
|   Alice|  25|  Australia|3.2188758248682006|
|     Bob|  30|  Australia|3.4011973816621555|
| Charlie|NULL|  Australia|              NULL|
|    John|  30|  Australia|3.4011973816621555|
|Victoria|  40|  Australia|3.6888794541139363|
|Victoria|  40|  Australia|3.6888794541139363|
+--------+----+-----------+------------------+



In [93]:
#Date & Time Functions
#Gets the current date and stores it as a new column
from pyspark.sql.functions import current_date

df = df.withColumn("Current_Date", current_date())
df.show()

+--------+----+-----------+------------+
|    Name| Age|Nationality|Current_Date|
+--------+----+-----------+------------+
|   Alice|  25|  Australia|  2025-08-21|
|     Bob|  30|  Australia|  2025-08-21|
| Charlie|NULL|  Australia|  2025-08-21|
|    John|  30|  Australia|  2025-08-21|
|Victoria|  40|  Australia|  2025-08-21|
|Victoria|  40|  Australia|  2025-08-21|
+--------+----+-----------+------------+



In [94]:
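#Gets the current timestamp
#(show() truncates values longer than 20 characters; use show(truncate=False) to see them in full)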
from pyspark.sql.functions import current_timestamp

df.withColumn("Current_Timestamp", current_timestamp()).show()

+--------+----+-----------+------------+--------------------+
|    Name| Age|Nationality|Current_Date|   Current_Timestamp|
+--------+----+-----------+------------+--------------------+
|   Alice|  25|  Australia|  2025-08-21|2025-08-21 16:42:...|
|     Bob|  30|  Australia|  2025-08-21|2025-08-21 16:42:...|
| Charlie|NULL|  Australia|  2025-08-21|2025-08-21 16:42:...|
|    John|  30|  Australia|  2025-08-21|2025-08-21 16:42:...|
|Victoria|  40|  Australia|  2025-08-21|2025-08-21 16:42:...|
|Victoria|  40|  Australia|  2025-08-21|2025-08-21 16:42:...|
+--------+----+-----------+------------+--------------------+



In [95]:
#Extracts year from a date column
from pyspark.sql.functions import year

df.withColumn("Year", year(df.Current_Date)).show()

+--------+----+-----------+------------+----+
|    Name| Age|Nationality|Current_Date|Year|
+--------+----+-----------+------------+----+
|   Alice|  25|  Australia|  2025-08-21|2025|
|     Bob|  30|  Australia|  2025-08-21|2025|
| Charlie|NULL|  Australia|  2025-08-21|2025|
|    John|  30|  Australia|  2025-08-21|2025|
|Victoria|  40|  Australia|  2025-08-21|2025|
|Victoria|  40|  Australia|  2025-08-21|2025|
+--------+----+-----------+------------+----+



In [97]:
#Vector Assembler
from pyspark.ml.feature import VectorAssembler

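#handleInvalid="keep" turns a NULL input into NaN instead of raising an error (the default, "error")
assembler = VectorAssembler(inputCols=["Age"], outputCol="features", handleInvalid="keep")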
df_assembler = assembler.transform(df)
df_assembler.show()

+--------+----+-----------+------------+--------+
|    Name| Age|Nationality|Current_Date|features|
+--------+----+-----------+------------+--------+
|   Alice|  25|  Australia|  2025-08-21|  [25.0]|
|     Bob|  30|  Australia|  2025-08-21|  [30.0]|
| Charlie|NULL|  Australia|  2025-08-21|   [NaN]|
|    John|  30|  Australia|  2025-08-21|  [30.0]|
|Victoria|  40|  Australia|  2025-08-21|  [40.0]|
|Victoria|  40|  Australia|  2025-08-21|  [40.0]|
+--------+----+-----------+------------+--------+



**Vector Assembler:**

`VectorAssembler` is a transformer that combines a given list of columns (numeric, boolean, or vector types) into a single vector column. Spark ML estimators such as `LinearRegression` read their inputs from one features column, so this is usually a preparatory step before fitting a model.
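As a rough mental model, the plain-Python function below does per row what `VectorAssembler` does: it concatenates the input columns into one list of floats. The `handle_invalid` modes loosely mirror the `handleInvalid` values "error", "skip", and "keep" ("keep" turns a NULL into NaN, which is why Charlie's vector above is `[NaN]`). The function name and the dict-per-row layout are illustrative assumptions, not Spark's implementation.

```python
import math


def assemble(rows, input_cols, handle_invalid="error"):
    """Combine input_cols of each row (a dict) into one list of floats.

    handle_invalid loosely mirrors VectorAssembler's handleInvalid options:
      "error" -> raise on a NULL/NaN value
      "skip"  -> drop the row
      "keep"  -> emit NaN in place of the invalid value
    """
    assembled = []
    for row in rows:
        features = []
        skip_row = False
        for column in input_cols:
            value = row.get(column)
            invalid = value is None or (isinstance(value, float) and math.isnan(value))
            if invalid:
                if handle_invalid == "error":
                    raise ValueError(f"invalid value in column {column!r}")
                if handle_invalid == "skip":
                    skip_row = True
                    break
                features.append(math.nan)  # "keep"
            elif isinstance(value, (list, tuple)):
                features.extend(float(v) for v in value)  # vector inputs are flattened
            else:
                features.append(float(value))  # numbers and booleans become floats
        if not skip_row:
            assembled.append({**row, "features": features})
    return assembled


rows = [{"Name": "Alice", "Age": 25}, {"Name": "Charlie", "Age": None}]
print(assemble(rows, ["Age"], handle_invalid="keep"))  # Charlie gets [nan]
print(assemble(rows, ["Age"], handle_invalid="skip"))  # Charlie's row is dropped
```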

In [99]:
#Linear Regression
from pyspark.ml.regression import LinearRegression

# Drop rows with nulls in the 'Age' column (the label column)
df_cleaned = df_assembler.na.drop(subset=["Age"])

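#Note: Age is both the only feature and the label, so this only demonstrates the API;
#the model can simply learn Age = 1.0 * Age
lr = LinearRegression(featuresCol="features", labelCol="Age")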
lr_model = lr.fit(df_cleaned)

In [100]:
print("Intercept:", lr_model.intercept)
print("Coefficients:", lr_model.coefficients)

Intercept: 1.6653345369377333e-13
Coefficients: [0.9999999999999949]
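The intercept of roughly 0 and coefficient of roughly 1 are expected: `Age` is both the only feature and the label, so the best fit is the identity line. A closed-form ordinary least squares computation in plain Python over the five non-null ages in `df_cleaned` reproduces this; Spark's `LinearRegression` uses its own solver, which is why its values carry tiny floating-point residue such as `1.67e-13`.

```python
import math


def simple_ols(xs, ys):
    """Closed-form least squares fit of y = intercept + slope * x."""
    n = len(xs)
    mean_x = math.fsum(xs) / n
    mean_y = math.fsum(ys) / n
    # slope = covariance(x, y) / variance(x); the shared 1/n factor cancels
    cov_xy = math.fsum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = math.fsum((x - mean_x) ** 2 for x in xs)
    slope = cov_xy / var_x
    intercept = mean_y - slope * mean_x
    return intercept, slope


# The five non-null ages that remain in df_cleaned
ages = [25, 30, 30, 40, 40]
intercept, slope = simple_ols(ages, ages)  # feature and label are the same column
print("Intercept:", intercept, "Slope:", slope)
```

The sketch uses `math.fsum` rather than `sum`, because this notebook imports PySpark's `sum` under the same name.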


In [103]:
#Pipeline for ML Modeling
from pyspark.ml import Pipeline

# Drop rows with nulls in the 'Age' column before fitting the pipeline
df_cleaned_for_pipeline = df.na.drop(subset=["Age"])

pipeline = Pipeline(stages=[assembler, lr])
pipeline_model = pipeline.fit(df_cleaned_for_pipeline)
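`Pipeline.fit()` walks its stages in order: a Transformer stage (here `assembler`) only transforms the data, while an Estimator stage (here `lr`) is fitted, and the fitted model takes its place in the resulting `PipelineModel`. The plain-Python classes below are a hypothetical toy version of that control flow, with a mean predictor standing in for linear regression; they are not the `pyspark.ml` classes.

```python
import math


class Transformer:
    """Something that maps rows to new rows."""
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    """Something that learns from rows and returns a fitted Transformer."""
    def fit(self, rows):
        raise NotImplementedError


class AddFeatures(Transformer):
    """Hypothetical stand-in for VectorAssembler: puts Age into a features list."""
    def transform(self, rows):
        return [{**r, "features": [float(r["Age"])]} for r in rows]


class MeanModel(Transformer):
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**r, "prediction": self.mean} for r in rows]


class MeanEstimator(Estimator):
    """Hypothetical estimator that simply learns the mean of the Age label."""
    def fit(self, rows):
        return MeanModel(math.fsum(r["Age"] for r in rows) / len(rows))


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages  # every stage is a Transformer after fitting

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fitted and replaced by their model; Transformers are kept as-is
            model = stage.fit(rows) if isinstance(stage, Estimator) else stage
            fitted.append(model)
            if i < len(self.stages) - 1:
                rows = model.transform(rows)  # this stage's output feeds the next stage
        return ToyPipelineModel(fitted)


rows = [{"Age": 25}, {"Age": 30}, {"Age": 35}]
toy_model = ToyPipeline([AddFeatures(), MeanEstimator()]).fit(rows)
print(toy_model.transform(rows))
```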

In [104]:
#Cache and Persist
#Marks the DataFrame for caching; nothing is stored until an action (e.g. count()) runs
df.cache()

DataFrame[Name: string, Age: bigint, Nationality: string, Current_Date: date]

In [105]:
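#Persists DataFrame to memory/disk
#Note: df was already cached by cache() above, whose default level already uses memory and disk;
#persist() on an already-cached DataFrame does not change its level (call unpersist() first to change it)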
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[Name: string, Age: bigint, Nationality: string, Current_Date: date]

In [106]:
#Unpersists the DataFrame
df.unpersist()

DataFrame[Name: string, Age: bigint, Nationality: string, Current_Date: date]

In [107]:
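#Repartitioning and Coalescing
#Increase the number of partitions (full shuffle). repartition() returns a new DataFrame
#and leaves df unchanged, so keep the result
df_repartitioned = df.repartition(10)
df_repartitioned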

DataFrame[Name: string, Age: bigint, Nationality: string, Current_Date: date]

In [109]:
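#Reduce the number of partitions without a full shuffle; coalesce() also returns a new DataFrame
df_coalesced = df.coalesce(2)
df_coalesced.show()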

+--------+----+-----------+------------+
|    Name| Age|Nationality|Current_Date|
+--------+----+-----------+------------+
|   Alice|  25|  Australia|  2025-08-21|
|     Bob|  30|  Australia|  2025-08-21|
| Charlie|NULL|  Australia|  2025-08-21|
|    John|  30|  Australia|  2025-08-21|
|Victoria|  40|  Australia|  2025-08-21|
|Victoria|  40|  Australia|  2025-08-21|
+--------+----+-----------+------------+
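The difference between the two calls can be pictured with partitions as Python lists. In the simplified model below (an assumption: real Spark also weighs data locality and other details when placing rows), `toy_coalesce` only merges existing partitions and never increases their number, while `toy_repartition` moves every row through a round-robin shuffle. Both function names are hypothetical.

```python
def toy_coalesce(partitions, n):
    """Merge existing partitions into n groups without redistributing rows (no shuffle).

    Like DataFrame.coalesce, asking for more partitions than exist changes nothing.
    """
    if n >= len(partitions):
        return [list(p) for p in partitions]
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        groups[i * n // len(partitions)].extend(part)  # neighbouring partitions are merged
    return groups


def toy_repartition(partitions, n):
    """Send every row to one of n new partitions round-robin (a full shuffle)."""
    groups = [[] for _ in range(n)]
    all_rows = [row for part in partitions for row in part]
    for i, row in enumerate(all_rows):
        groups[i % n].append(row)
    return groups


parts = [[1, 2], [3], [4, 5], [6]]
print(toy_coalesce(parts, 2))     # [[1, 2, 3], [4, 5, 6]]
print(toy_coalesce(parts, 10))    # unchanged: still 4 partitions
print(toy_repartition(parts, 3))  # [[1, 4], [2, 5], [3, 6]]
```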



In [110]:
#Explaining Query Execution
#explain(True) prints the parsed, analyzed and optimized logical plans plus the physical plan
df.explain(True)

== Parsed Logical Plan ==
'Project [Name#1588, Age#1589L, Nationality#1601, current_date(None) AS Current_Date#1728]
+- Project [Name#1588, Age#1589L, Australia AS Nationality#1601]
   +- LogicalRDD [Name#1588, Age#1589L], false

== Analyzed Logical Plan ==
Name: string, Age: bigint, Nationality: string, Current_Date: date
Project [Name#1588, Age#1589L, Nationality#1601, current_date(Some(Etc/UTC)) AS Current_Date#1728]
+- Project [Name#1588, Age#1589L, Australia AS Nationality#1601]
   +- LogicalRDD [Name#1588, Age#1589L], false

== Optimized Logical Plan ==
Project [Name#1588, Age#1589L, Australia AS Nationality#1601, 2025-08-21 AS Current_Date#1728]
+- LogicalRDD [Name#1588, Age#1589L], false

== Physical Plan ==
*(1) Project [Name#1588, Age#1589L, Australia AS Nationality#1601, 2025-08-21 AS Current_Date#1728]
+- *(1) Scan ExistingRDD[Name#1588,Age#1589L]



In [111]:
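#Streaming Methods
#Reading a stream from a socket source (intended for testing only)
from pyspark.sql import SparkSession

#getOrCreate() reuses the session created at the top of the notebook rather than starting a new one
spark = SparkSession.builder.appName("Streaming").getOrCreate()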

df1 = spark.readStream.format("socket").option("host", "localhost").option("port", "9999").load()

In [113]:
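#Writing Streaming Data to Console
#Needs a text server listening on localhost:9999 before this cell runs (for example, run nc -lk 9999
#in a terminal); without one the query fails with "Connection refused", as in the output below.
#awaitTermination() blocks the cell until the query stops.
df1.writeStream.outputMode("append").format("console").start().awaitTermination()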

StreamingQueryException: [STREAM_FAILED] Query [id = 297d0740-270a-451d-9104-ad147c36d75b, runId = 56b9d4ab-9cad-49ac-96de-1bd1dd414204] terminated with exception: Connection refused (Connection refused)

In [114]:
#RDD Methods
#Creating an RDD
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

#RDD Transformations
#Doubles each element in RDD
rdd.map(lambda x: x * 2).collect()

[2, 4, 6, 8, 10]

In [115]:
#Filters elements greater than 2.
rdd.filter(lambda x: x > 2).collect()

[3, 4, 5]

**When to Use RDDs:**

RDDs (Resilient Distributed Datasets) were the primary API in earlier versions of Spark. They are immutable, fault-tolerant, distributed collections of objects that can be processed in parallel. DataFrames (and, in Scala and Java, Datasets) are now usually preferred because Spark's Catalyst optimizer can rewrite their query plans and because they are easier to use, but RDDs remain useful for:

*   **Low-level transformations and control:** When you need to perform transformations that are not easily expressed with DataFrames/Datasets or require fine-grained control over partitioning.
*   **Working with unstructured data:** RDDs are suitable for processing data that doesn't fit into a structured schema.
*   **Maintaining backward compatibility:** If you have existing Spark applications built with RDDs.
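The split between transformations (`map`, `filter`) and actions (`count`, `max`, `collect`) matters because transformations are lazy: each one only records how to derive a new, immutable dataset from its parent, and that recorded lineage is what lets Spark recompute lost partitions. The `ToyRDD` class below is a hypothetical, single-machine sketch of that idea in plain Python, not Spark's RDD implementation.

```python
class ToyRDD:
    """Immutable dataset that records its lineage instead of computing eagerly."""

    def __init__(self, source, parent=None, op=None, description="source"):
        self._source = source          # only used by the root node
        self._parent = parent          # the dataset this one was derived from
        self._op = op                  # function applied to the parent's elements
        self.description = description

    # Transformations: return a new ToyRDD and compute nothing yet
    def map(self, fn):
        return ToyRDD(None, self, lambda items: [fn(x) for x in items], "map")

    def filter(self, pred):
        return ToyRDD(None, self, lambda items: [x for x in items if pred(x)], "filter")

    # Walking the lineage back to the root is how the data is (re)computed
    def _compute(self):
        if self._parent is None:
            return list(self._source)
        return self._op(self._parent._compute())

    def lineage(self):
        node, steps = self, []
        while node is not None:
            steps.append(node.description)
            node = node._parent
        return list(reversed(steps))

    # Actions: trigger the computation
    def collect(self):
        return self._compute()

    def count(self):
        return len(self._compute())


toy = ToyRDD([1, 2, 3, 4, 5])
result = toy.map(lambda x: x * 2).filter(lambda x: x > 4)  # nothing computed yet
print(result.lineage())  # ['source', 'map', 'filter']
print(result.collect())  # [6, 8, 10]
```

The variable is named `toy` rather than `rdd` so that pasting this into the notebook does not replace the real RDD used in the cells that follow.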

In [116]:
#RDD Actions
#Count elements.
rdd.count()

5

In [117]:
#Finds the max value
rdd.max()

5

In [118]:
#Collects all elements to the driver (only safe for small RDDs)
rdd.collect()

[1, 2, 3, 4, 5]