# 🚀 Introduction to PySpark
PySpark is the Python API for Apache Spark — a distributed data processing engine ideal for large-scale data handling.

It's built for **parallel computation**, **cluster execution**, and **fault tolerance**, making it the go-to tool for **Big Data** workloads.

We'll walk through common tasks on small toy and synthetic datasets, highlighting PySpark's syntax and advantages.

In [1]:
# ✅ Install and Set Up PySpark (Only needed on Colab or local setup)
!pip install pyspark --quiet

In [2]:
# ✅ Import PySpark and Start a Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # note: shadows Python built-ins such as sum, min, max, round, abs
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("PySpark Essentials") \
    .getOrCreate()

In [3]:
import pyspark
print(pyspark.__version__)

3.5.1


🧱 DataFrame Creation

In [4]:
# ▶️ Create DataFrame from a list of dicts (Spark infers the schema from the keys and values)
data = [{"col1": 1, "col2": "x"}, {"col1": 2, "col2": "y"}, {"col1": 3, "col2": "z"}]
df = spark.createDataFrame(data)
df.show()

+----+----+
|col1|col2|
+----+----+
|   1|   x|
|   2|   y|
|   3|   z|
+----+----+



Creating Some Synthetic Data

In [5]:
from sklearn.datasets import make_regression
import pandas as pd
# Create synthetic finance-flavoured data: make_regression generates 10 anonymous numeric
# features and a target; a few features are relabelled below as 'stock_price_yesterday',
# 'interest_rate' and 'inflation_rate', and the target becomes 'target_stock_price'.
X, y = make_regression(n_samples=300, n_features=10, random_state=42, n_informative=8, noise=10)

# Convert to pandas DataFrame
column_names = [f'feature_{i+1}' for i in range(10)]
df_finance = pd.DataFrame(X, columns=column_names)
df_finance['target_stock_price'] = y

# Add some columns that might be more finance-specific (even if synthetic)
df_finance['stock_price_yesterday'] = df_finance['feature_1'] * 100 + 500 # Example calculation
df_finance['interest_rate'] = df_finance['feature_2'] * 0.1 + 2.0 # Example calculation
df_finance['inflation_rate'] = df_finance['feature_3'] * 0.05 + 1.0 # Example calculation

# Keep the three relabelled columns, six remaining features and the target (10 columns in total)
df_finance = df_finance[['stock_price_yesterday', 'interest_rate', 'inflation_rate',
                         'feature_4', 'feature_5', 'feature_6', 'feature_7', 'feature_8',
                         'feature_9', 'target_stock_price']]

# Save to CSV
csv_file_path = 'finance_data.csv'
df_finance.to_csv(csv_file_path, index=False)
# Save to Excel (pandas needs the openpyxl package for .xlsx files)
excel_file_path = 'finance_data.xlsx'
df_finance.to_excel(excel_file_path, index=False)
# Save to a SQLite database
import sqlite3
conn = sqlite3.connect('finance_data.db')
# Save the pandas DataFrame to a SQLite table
df_finance.to_sql('finance_data', conn, if_exists='replace', index=False)
conn.close()
print("DataFrame 'df_finance' successfully saved to 'finance_data.db' as table 'finance_data'.")

print(f"Synthetic finance data saved to {csv_file_path} and {excel_file_path}")
print(df_finance.head())

DataFrame 'df_finance' successfully saved to 'finance_data.db' as table 'finance_data'.
Synthetic finance data saved to finance_data.csv and finance_data.xlsx
   stock_price_yesterday  interest_rate  inflation_rate  feature_4  feature_5  \
0             526.032184       2.177531        0.987498  -0.670620   1.000582   
1             403.165555       2.139200        0.988768   0.940771  -0.989628   
2             518.142662       1.935023        0.935194  -0.351921  -0.487203   
3             585.243333       2.021646        1.009323   0.633919   2.143944   
4             456.451363       1.987962        1.009525  -0.090533  -0.535328   

   feature_6  feature_7  feature_8  feature_9  target_stock_price  
0  -1.193637   1.392465  -0.646227   0.919154          214.313595  
1   0.918317  -0.982487   0.179894  -1.570501            2.242062  
2  -1.203201  -0.769996   0.874517  -1.042044         -177.174358  
3   0.045572  -2.025143  -0.730367  -0.651600            0.874379  
4  -2.172670   0.331980   1.107081 

📥 Reading Data

In [6]:
# 📁 Read CSV (relative path; on Colab this resolves to /content/finance_data.csv)
df_csv = spark.read.csv("finance_data.csv", header=True, inferSchema=True)
df_csv.show()

+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|stock_price_yesterday|     interest_rate|    inflation_rate|           feature_4|           feature_5|           feature_6|           feature_7|           feature_8|           feature_9| target_stock_price|
+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|    526.0321838771221| 2.177531089260951|0.9874976742700523| -0.6706202090766449|  1.0005823180429783|   -1.19363683175648|  1.3924653000643288|  -0.646227309843803|  0.9191541736100015|   214.313595338902|
|    403.1655545136365|  2.13920022863446|0.9887683425001488|  0.9407711879882159| -0.9896281365703375|   0.918316606002313| -0.9824873935383096| 0.17989415115347845|  

In [7]:
# 📁 Read Excel via pandas (PySpark has no built-in Excel reader; pandas needs openpyxl for .xlsx).
# The whole file is loaded on the driver first, so this suits small files only.
import pandas as pd
pdf = pd.read_excel("finance_data.xlsx")
df_excel = spark.createDataFrame(pdf)
df_excel.show()

+---------------------+-----------------+------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-------------------+------------------+
|stock_price_yesterday|    interest_rate|    inflation_rate|           feature_4|          feature_5|           feature_6|          feature_7|           feature_8|          feature_9|target_stock_price|
+---------------------+-----------------+------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-------------------+------------------+
|    526.0321838771221|2.177531089260951|0.9874976742700523| -0.6706202090766449|  1.000582318042978|   -1.19363683175648|  1.392465300064329|  -0.646227309843803| 0.9191541736100015|  214.313595338902|
|    403.1655545136365| 2.13920022863446|0.9887683425001488|  0.9407711879882159|-0.9896281365703375|   0.918316606002313|-0.9824873935383096|  0.1798941511534785| -1.570500603623177| 2.24

In [10]:
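# 🛢️ Read from a database via JDBC (commented out: needs the SQLite JDBC driver jar,
#     e.g. org.xerial:sqlite-jdbc, on Spark's classpath)
# jdbc_url = "jdbc:sqlite:finance_data.db"
# df_db = spark.read.format("jdbc") \
#     .option("url", jdbc_url) \
#     .option("driver", "org.sqlite.JDBC") \
#     .option("dbtable", "finance_data") \
#     .load()
# df_db.show()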

💾 Writing Data

In [16]:
# 💾 Write to CSV
df_csv_output_path = "df_csv_output"
df_csv.write.csv(df_csv_output_path, header=True, mode="overwrite")

# Read the saved CSV file back into a new DataFrame
df_csv_read = spark.read.csv(df_csv_output_path, header=True, inferSchema=True)

# Show the content of the read DataFrame
df_csv_read.show()

+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|stock_price_yesterday|     interest_rate|    inflation_rate|           feature_4|           feature_5|           feature_6|           feature_7|           feature_8|           feature_9| target_stock_price|
+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|    526.0321838771221| 2.177531089260951|0.9874976742700523| -0.6706202090766449|  1.0005823180429783|   -1.19363683175648|  1.3924653000643288|  -0.646227309843803|  0.9191541736100015|   214.313595338902|
|    403.1655545136365|  2.13920022863446|0.9887683425001488|  0.9407711879882159| -0.9896281365703375|   0.918316606002313| -0.9824873935383096| 0.17989415115347845|  

In [17]:
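# 💾 Write the Excel-sourced DataFrame to CSV (Parquet is usually the faster choice; see the columnar formats section)
df_excel_csv_output_path = "df_excel_output"
df_excel.write.csv(df_excel_csv_output_path, header=True, mode="overwrite")

# Read the saved CSV files back into a new DataFrame.
# Spark does not guarantee row order across a write/read round trip, so rows may come back reordered.
df_excel_read = spark.read.csv(df_excel_csv_output_path, header=True, inferSchema=True)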

# Show the content of the read DataFrame
df_excel_read.show()

+---------------------+-----------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|stock_price_yesterday|    interest_rate|    inflation_rate|          feature_4|           feature_5|           feature_6|           feature_7|           feature_8|           feature_9|target_stock_price|
+---------------------+-----------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|    611.9574911434577|1.769807883526441| 1.015562507727177|-0.2490360395563783|   1.644967713501284|  -1.515191062198552|  0.5765569630557664|  -0.275051697151644|   1.366874267444525|-76.92523676388738|
|     632.893331953541| 1.97559195263258|0.9343890403284297|  1.431366781831183|  0.3338599665685347|-0.03930728762763119|   1.081766727672829|  -1.016683128576497| -0.134496784401

In [18]:
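# 💾 Write to a database via JDBC (commented out: needs a JDBC driver on the classpath and jdbc_url defined as in the read example)
# df.write.format("jdbc") \
#     .option("url", jdbc_url) \
#     .option("dbtable", "new_table") \
#     .mode("overwrite") \
#     .save()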

🔍 Inspect DataFrame

In [23]:
df_csv.show(5)  # View first 5 rows

+---------------------+------------------+------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+
|stock_price_yesterday|     interest_rate|    inflation_rate|           feature_4|           feature_5|           feature_6|          feature_7|          feature_8|          feature_9|target_stock_price|
+---------------------+------------------+------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+
|    526.0321838771221| 2.177531089260951|0.9874976742700523| -0.6706202090766449|  1.0005823180429783|   -1.19363683175648| 1.3924653000643288| -0.646227309843803| 0.9191541736100015|  214.313595338902|
|    403.1655545136365|  2.13920022863446|0.9887683425001488|  0.9407711879882159| -0.9896281365703375|   0.918316606002313|-0.9824873935383096|0.17989415115347845| -1.570500603623177|

In [22]:
df_csv.tail(5)  # Last 5 rows as a list of Row objects, collected to the driver (Spark 3.0+)

[Row(stock_price_yesterday=480.3650150915022, interest_rate=1.9650742295681571, inflation_rate=1.0119394579513257, feature_4=0.43004164719106963, feature_5=0.38193545223155334, feature_6=-0.3216350512173826, feature_7=1.0302834540318422, feature_8=-0.3486521344078592, feature_9=2.076747983560841, target_stock_price=156.38251322706833),
 Row(stock_price_yesterday=480.8972237399831, interest_rate=1.9837207093927707, inflation_rate=0.9502825312695744, feature_4=-0.5132135664715852, feature_5=0.7408243454201259, feature_6=0.04091917539572113, feature_7=-0.22859991534001858, feature_8=-0.5441140021141926, feature_9=-1.0021874557814727, target_stock_price=-133.7361078215726),
 Row(stock_price_yesterday=323.09240951108814, interest_rate=1.902645380029553, inflation_rate=0.974551818714625, feature_4=2.7596600389110777, feature_5=1.0602099074988798, feature_6=0.47635776935621926, feature_7=0.39241596717876703, feature_8=0.8071225966373103, feature_9=0.5054701591666269, target_stock_price=165.85

In [24]:
df_csv.printSchema()

root
 |-- stock_price_yesterday: double (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- inflation_rate: double (nullable = true)
 |-- feature_4: double (nullable = true)
 |-- feature_5: double (nullable = true)
 |-- feature_6: double (nullable = true)
 |-- feature_7: double (nullable = true)
 |-- feature_8: double (nullable = true)
 |-- feature_9: double (nullable = true)
 |-- target_stock_price: double (nullable = true)



In [25]:
df_csv.describe().show()

+-------+---------------------+-------------------+------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|summary|stock_price_yesterday|      interest_rate|    inflation_rate|          feature_4|          feature_5|           feature_6|           feature_7|           feature_8|           feature_9|target_stock_price|
+-------+---------------------+-------------------+------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|  count|                  300|                300|               300|                300|                300|                 300|                 300|                 300|                 300|               300|
|   mean|    514.2722184920926|  2.006596927559644|1.0000730583050632|0.03075357135602464| 0.0665743941570391|0.001545293078945...|-0.0602928518

In [27]:
df_csv.columns

['stock_price_yesterday',
 'interest_rate',
 'inflation_rate',
 'feature_4',
 'feature_5',
 'feature_6',
 'feature_7',
 'feature_8',
 'feature_9',
 'target_stock_price']

In [28]:
df_csv.count()

300

🧹 Handling Nulls

In [29]:
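# Count nulls per column: when() yields a value only where the column is null, and count() ignores nulls
df_csv.select([count(when(col(c).isNull(), c)).alias(c) for c in df_csv.columns]).show()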

+---------------------+-------------+--------------+---------+---------+---------+---------+---------+---------+------------------+
|stock_price_yesterday|interest_rate|inflation_rate|feature_4|feature_5|feature_6|feature_7|feature_8|feature_9|target_stock_price|
+---------------------+-------------+--------------+---------+---------+---------+---------+---------+---------+------------------+
|                    0|            0|             0|        0|        0|        0|        0|        0|        0|                 0|
+---------------------+-------------+--------------+---------+---------+---------+---------+---------+---------+------------------+



In [32]:
df_csv.dropna().show()

+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|stock_price_yesterday|     interest_rate|    inflation_rate|           feature_4|           feature_5|           feature_6|           feature_7|           feature_8|           feature_9| target_stock_price|
+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|    526.0321838771221| 2.177531089260951|0.9874976742700523| -0.6706202090766449|  1.0005823180429783|   -1.19363683175648|  1.3924653000643288|  -0.646227309843803|  0.9191541736100015|   214.313595338902|
|    403.1655545136365|  2.13920022863446|0.9887683425001488|  0.9407711879882159| -0.9896281365703375|   0.918316606002313| -0.9824873935383096| 0.17989415115347845|  

In [33]:
df_csv.fillna(0).show()

+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|stock_price_yesterday|     interest_rate|    inflation_rate|           feature_4|           feature_5|           feature_6|           feature_7|           feature_8|           feature_9| target_stock_price|
+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|    526.0321838771221| 2.177531089260951|0.9874976742700523| -0.6706202090766449|  1.0005823180429783|   -1.19363683175648|  1.3924653000643288|  -0.646227309843803|  0.9191541736100015|   214.313595338902|
|    403.1655545136365|  2.13920022863446|0.9887683425001488|  0.9407711879882159| -0.9896281365703375|   0.918316606002313| -0.9824873935383096| 0.17989415115347845|  

🧠 Selecting and Filtering

In [34]:
df.select("col1").show()

+----+
|col1|
+----+
|   1|
|   2|
|   3|
+----+



In [35]:
df.select(["col1", "col2"]).show()

+----+----+
|col1|col2|
+----+----+
|   1|   x|
|   2|   y|
|   3|   z|
+----+----+



In [36]:
df.filter(df.col1 > 1).show()

+----+----+
|col1|col2|
+----+----+
|   2|   y|
|   3|   z|
+----+----+



In [37]:
df.filter(df.col2.contains("y")).show()

+----+----+
|col1|col2|
+----+----+
|   2|   y|
+----+----+



In [38]:
df.limit(2).show()

+----+----+
|col1|col2|
+----+----+
|   1|   x|
|   2|   y|
+----+----+



🧼 Data Cleaning / Transformations

In [39]:
# Add a date column by converting a string literal with to_date (default pattern yyyy-MM-dd)
df_dates = df.withColumn("date", to_date(lit("2024-06-01")))

In [40]:
df_dates.show()

+----+----+----------+
|col1|col2|      date|
+----+----+----------+
|   1|   x|2024-06-01|
|   2|   y|2024-06-01|
|   3|   z|2024-06-01|
+----+----+----------+



In [41]:
# Cast data types
df.withColumn("col1_int", col("col1").cast("int")).show()

+----+----+--------+
|col1|col2|col1_int|
+----+----+--------+
|   1|   x|       1|
|   2|   y|       2|
|   3|   z|       3|
+----+----+--------+



In [42]:
# String ops
df.withColumn("lower_col2", lower(col("col2"))).show()

+----+----+----------+
|col1|col2|lower_col2|
+----+----+----------+
|   1|   x|         x|
|   2|   y|         y|
|   3|   z|         z|
+----+----+----------+



In [43]:
df.withColumn("trimmed_col2", trim(col("col2"))).show()

+----+----+------------+
|col1|col2|trimmed_col2|
+----+----+------------+
|   1|   x|           x|
|   2|   y|           y|
|   3|   z|           z|
+----+----+------------+



In [44]:
df.withColumn("replaced_col2", regexp_replace("col2", "x", "xx")).show()

+----+----+-------------+
|col1|col2|replaced_col2|
+----+----+-------------+
|   1|   x|           xx|
|   2|   y|            y|
|   3|   z|            z|
+----+----+-------------+



🆎 Rename & New Columns

In [45]:
# Rename columns
df_renamed = df.withColumnRenamed("col1", "new_col1")

In [46]:
df_renamed.show()

+--------+----+
|new_col1|col2|
+--------+----+
|       1|   x|
|       2|   y|
|       3|   z|
+--------+----+



In [47]:
# Create new column
df.withColumn("col1_double", col("col1") * 2).show()

+----+----+-----------+
|col1|col2|col1_double|
+----+----+-----------+
|   1|   x|          2|
|   2|   y|          4|
|   3|   z|          6|
+----+----+-----------+



In [48]:
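# Apply a Python UDF (runs row by row in Python workers; a built-in expression
# such as col("col1") + 10 is faster because Catalyst can optimize it)
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(IntegerType())
def add_ten(x):
    # UDFs receive None for null inputs, so guard against it
    return x + 10 if x is not None else None

df.withColumn("plus_10", add_ten("col1")).show()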

+----+----+-------+
|col1|col2|plus_10|
+----+----+-------+
|   1|   x|     11|
|   2|   y|     12|
|   3|   z|     13|
+----+----+-------+



🧮 GroupBy and Aggregation

In [49]:
df_group = spark.createDataFrame(
    [("a", 10), ("a", 20), ("b", 30), ("b", 40)],
    ["col", "value"]
)
df_group.groupBy("col").agg(mean("value").alias("mean_val")).show()

+---+--------+
|col|mean_val|
+---+--------+
|  a|    15.0|
|  b|    35.0|
+---+--------+



🔗 Joins and Unions

In [51]:
# Inner Join
df1 = spark.createDataFrame([(1, "x")], ["id", "val"])
df2 = spark.createDataFrame([(1, 100)], ["id", "score"])
df1.join(df2, on="id", how="inner").show()

+---+---+-----+
| id|val|score|
+---+---+-----+
|  1|  x|  100|
+---+---+-----+



In [52]:
# Concatenate
df1.union(df1).show()

+---+---+
| id|val|
+---+---+
|  1|  x|
|  1|  x|
+---+---+



📊 Pivot and Unpivot

In [53]:
# Pivot
df_pivot = spark.createDataFrame([
    ("NY", "HR", 1000),
    ("NY", "Tech", 1500),
    ("LA", "HR", 1100),
    ("LA", "Tech", 1600)
], ["city", "department", "salary"])

df_pivot.groupBy("city").pivot("department").agg(avg("salary")).show()

+----+------+------+
|city|    HR|  Tech|
+----+------+------+
|  LA|1100.0|1600.0|
|  NY|1000.0|1500.0|
+----+------+------+



In [54]:
# Unpivot (Melt)
from pyspark.sql.functions import expr

df_unpivot = df_pivot.select("city", "department", "salary")
df_unpivot.selectExpr("city", "stack(2, 'HR', salary, 'Tech', salary) as (dept, val)").show()

+----+----+----+
|city|dept| val|
+----+----+----+
|  NY|  HR|1000|
|  NY|Tech|1000|
|  NY|  HR|1500|
|  NY|Tech|1500|
|  LA|  HR|1100|
|  LA|Tech|1100|
|  LA|  HR|1600|
|  LA|Tech|1600|
+----+----+----+



⏱️ Datetime and Time Series

In [55]:
# Date extraction
df_dates.withColumn("year", year("date")).show()

+----+----+----------+----+
|col1|col2|      date|year|
+----+----+----------+----+
|   1|   x|2024-06-01|2024|
|   2|   y|2024-06-01|2024|
|   3|   z|2024-06-01|2024|
+----+----+----------+----+



In [56]:
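# Window functions (rolling-like behavior)
from pyspark.sql.window import Window

# Rolling frame: the current row plus the two preceding rows.
# All dates are identical here, so col1 breaks the tie to keep the row order deterministic.
# With no partitionBy, Spark moves every row into a single partition (fine for a demo, slow at scale).
w = Window.orderBy("date", "col1").rowsBetween(-2, 0)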
df_time = df_dates.withColumn("rolling_avg", avg("col1").over(w))
df_time.show()

+----+----+----------+-----------+
|col1|col2|      date|rolling_avg|
+----+----+----------+-----------+
|   1|   x|2024-06-01|        1.0|
|   2|   y|2024-06-01|        1.5|
|   3|   z|2024-06-01|        2.0|
+----+----+----------+-----------+



🔢 Frequency and Count

In [57]:
df_group.groupBy("col").count().show()

+---+-----+
|col|count|
+---+-----+
|  a|    2|
|  b|    2|
+---+-----+



In [58]:
df_group.groupBy("col").count().orderBy(col("count").desc(), "col").show()  # frequency table sorted by count, like pandas value_counts()

+---+-----+
|col|count|
+---+-----+
|  a|    2|
|  b|    2|
+---+-----+



# 🧠 PySpark Superpowers
- ⚙️ Distributed computation across clusters (via SparkContext)
- 🔄 Lazy evaluation with DAG optimizations
- 🚀 Catalyst Optimizer for query optimization
- 🔥 Tungsten engine for physical execution
- ☁️ Connectors for HDFS, Hive, S3 and Delta Lake (S3 and Delta Lake need extra packages)
- 🧩 Integration with MLlib and Structured Streaming (GraphX has no Python API; GraphFrames offers one)
- 🗃️ Optimized support for columnar file formats (Parquet/ORC)

💡 Use PySpark when:
- Your data is too large to process comfortably in memory on one machine (roughly tens of GBs to TBs)
- You need to scale computations across machines
- You are building data engineering or batch ETL pipelines


⚙️ 1. Distributed Computation via SparkContext

✅ Real-world use case: Use this to parallelize large data transformations (e.g., log processing, preprocessing in pipelines).

In [59]:
# 💡 SparkContext gives access to the cluster and parallel computation features
sc = spark.sparkContext

# 🔁 Parallelize a list and perform a distributed map
rdd = sc.parallelize([1, 2, 3, 4, 5])
squared = rdd.map(lambda x: x ** 2).collect()
print("Squared values using distributed RDD:", squared)

Squared values using distributed RDD: [1, 4, 9, 16, 25]


🔄 2. Lazy Evaluation & DAG Optimization

⚠️ PySpark doesn't execute anything until an action such as .show(), .collect(), .count(), or a write (e.g. .write.parquet(...)) is called. Deferring execution lets Spark optimize the whole pipeline at once.

In [60]:
# 💤 All transformations are lazy and build a Directed Acyclic Graph (DAG)
df_lazy = df.filter(col("col1") > 1).select("col2")

# 🔍 This triggers the execution
df_lazy.show()

+----+
|col2|
+----+
|   y|
|   z|
+----+



🚀 3. Catalyst Optimizer for Query Optimization

🔥 The Catalyst Optimizer rewrites SQL/DataFrame queries before execution, applying rules such as predicate pushdown, column pruning, and constant folding.

In [61]:
# 👀 explain() prints the physical plan; explain(True) also prints the parsed, analyzed and optimized logical plans
df_group.groupBy("col").agg(avg("value")).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[col#1895], functions=[avg(value#1896L)])
   +- Exchange hashpartitioning(col#1895, 200), ENSURE_REQUIREMENTS, [plan_id=1195]
      +- HashAggregate(keys=[col#1895], functions=[partial_avg(value#1896L)])
         +- Scan ExistingRDD[col#1895,value#1896L]




🔥 4. Tungsten Engine for Physical Execution

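⚙️ Look for whole-stage code generation in the physical plan: operators prefixed with `*` (for example `*(1) HashAggregate` in simple mode, or with a `[codegen id : n]` line in formatted mode) run as generated Java code. With adaptive query execution (AQE) enabled, the default since Spark 3.2, the plan printed before any action is the initial plan and usually lacks these markers, as in the output below.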

In [62]:
# Tungsten is Spark's memory-management and execution layer.
# It provides compact binary row storage, cache-aware computation, and whole-stage code generation
# (Java source generated at runtime and compiled to bytecode).

# It is not called directly from code; it is used behind the scenes whenever actions run.

# To inspect the physical plan Tungsten executes:
df_group.groupBy("col").agg(avg("value")).explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan ExistingRDD (1)


(1) Scan ExistingRDD
Output [2]: [col#1895, value#1896L]
Arguments: [col#1895, value#1896L], MapPartitionsRDD[164] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) HashAggregate
Input [2]: [col#1895, value#1896L]
Keys [1]: [col#1895]
Functions [1]: [partial_avg(value#1896L)]
Aggregate Attributes [2]: [sum#2176, count#2177L]
Results [3]: [col#1895, sum#2178, count#2179L]

(3) Exchange
Input [3]: [col#1895, sum#2178, count#2179L]
Arguments: hashpartitioning(col#1895, 200), ENSURE_REQUIREMENTS, [plan_id=1208]

(4) HashAggregate
Input [3]: [col#1895, sum#2178, count#2179L]
Keys [1]: [col#1895]
Functions [1]: [avg(value#1896L)]
Aggregate Attributes [1]: [avg(value#1896L)#2172]
Results [2]: [col#1895, avg(value#1896L)#2172 AS avg(value)#2173]

(5) AdaptiveSparkPlan
Output [2]: [col#1895, avg(value

☁️ 5. Support for S3, HDFS, Hive, Delta Lake

⚡ Enables cloud-native data engineering, ETL pipelines, and massive-scale reads/writes.

In [63]:
# 🔄 Reading from S3 (requires the hadoop-aws package plus configured AWS credentials)
# df_s3 = spark.read.csv("s3a://my-bucket/path/data.csv")

# 🔄 Hive table (requires SparkSession.builder.enableHiveSupport() and a reachable Hive metastore)
# spark.sql("SELECT * FROM hive_db.my_table").show()

# 🔁 Delta Lake (requires the delta-spark package and its session configuration)
# df_delta = spark.read.format("delta").load("s3a://my-bucket/delta-table")

# ✅ The same read/write API works with any Hadoop-compatible filesystem URI
#    (s3a://, hdfs://, abfs://, gs://) once the matching connector is on the classpath

🧩 6. Integration with MLlib - Machine Learning

📈 Run distributed machine learning pipelines for huge datasets using pyspark.ml.

In [64]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Sample DataFrame for regression
df_ml = spark.createDataFrame([
    (1, 2.0), (2, 3.5), (3, 5.0), (4, 7.5), (5, 9.0)
], ["feature", "label"])

assembler = VectorAssembler(inputCols=["feature"], outputCol="features")
df_features = assembler.transform(df_ml)

# Fit Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(df_features)
model.summary.r2, model.coefficients

(0.9908256880733946, DenseVector([1.8]))

🌐 7. Structured Streaming - Real-time Data Processing

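🧠 PySpark supports event-time processing, windowed aggregations, and end-to-end exactly-once guarantees (given replayable sources, idempotent sinks, and checkpointing) through readStream / writeStream.

In [65]:
# Treat a directory as a stream: each new CSV file added to streaming_data/ is processed as it arrives.
# This runs locally as well as on a cluster; awaitTermination() blocks until the query is stopped.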

# streaming_df = spark.readStream \
#     .schema(df.schema) \
#     .option("maxFilesPerTrigger", 1) \
#     .csv("streaming_data/")

# query = streaming_df.writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()

# query.awaitTermination()

🗃️ 8. Columnar File Formats - Parquet & ORC

🧩 These formats are splittable, compressed, and fast for column-wise operations, ideal for analytics & queries.

In [66]:
# 💾 Write and read Parquet (Spark's default data source format); row order after reading is not guaranteed
df.write.parquet("data.parquet", mode="overwrite")
df_parquet = spark.read.parquet("data.parquet")
df_parquet.show()

# 💾 ORC (optimized row columnar)
df.write.orc("data.orc", mode="overwrite")
df_orc = spark.read.orc("data.orc")
df_orc.show()

+----+----+
|col1|col2|
+----+----+
|   2|   y|
|   3|   z|
|   1|   x|
+----+----+

+----+----+
|col1|col2|
+----+----+
|   2|   y|
|   3|   z|
|   1|   x|
+----+----+

