***Real time Data Analytics with Apache Spark***





Plan

Install Spark in Colab (Colab doesn't have it by default, but Java + setup is quick).

Start SparkSession in local mode.

Simulate streaming data by writing small CSV batches to a folder.

Read the data with Spark Structured Streaming.

Do real-time analytics (e.g., word count or transaction aggregation).

Show output in the notebook.

In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col
import os
import shutil
import time

In [2]:
# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

# Start Spark Session
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder \
    .appName("RealTimeDataAnalytics") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("✅ Spark started successfully!")

# Create streaming input folder
input_dir = "/content/stream_data"
os.makedirs(input_dir, exist_ok=True)

# Create schema and read streaming data
from pyspark.sql.types import StructType, StringType

schema = StructType().add("value", StringType())

stream_df = spark.readStream \
    .schema(schema) \
    .csv(input_dir)

# Tokenize words
words_df = stream_df.select(explode(split(stream_df.value, " ")).alias("word"))

# Count word frequency
word_count = words_df.groupBy("word").count()

# Start query to print results
query = word_count.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Simulate streaming by writing files
import time
sample_data = [
    "Apache Spark is awesome",
    "Spark streaming with Python",
    "Real time analytics is powerful",
    "Data is the new oil"
]

for line in sample_data:
    with open(f"{input_dir}/file_{int(time.time())}.csv", "w") as f:
        f.write(line)
    time.sleep(2)  # Simulate delay

query.awaitTermination(10)  # Run stream for 10 seconds
query.stop()

print("✅ Streaming simulation finished!")


✅ Spark started successfully!
✅ Streaming simulation finished!


In [4]:
# Create Spark Session
spark = SparkSession.builder \
    .appName("RealTimeDataAnalytics") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Create folder for streaming input
input_dir = "/tmp/stream_data"
if os.path.exists(input_dir):
    shutil.rmtree(input_dir)
os.makedirs(input_dir)

# Read streaming data from CSV files
stream_df = spark.readStream \
    .format("csv") \
    .option("sep", ",") \
    .schema("value STRING") \
    .load(input_dir)

# Split lines into words
words = stream_df.select(
    explode(split(col("value"), " ")).alias("word")
)

# Count words
word_count = words.groupBy("word").count()

# Write results to an in-memory table instead of console
query = word_count.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("word_counts") \
    .start()

# Simulate streaming by writing files
sample_data = [
    "Apache Spark is awesome",
    "Spark streaming with Python",
    "Real time analytics is powerful",
    "Data is the new oil"
]

for line in sample_data:
    with open(f"{input_dir}/file_{int(time.time())}.csv", "w") as f:
        f.write(line)
    time.sleep(2)  # Simulate delay between incoming data

# Wait for Spark to process
time.sleep(5)

# Show the results from the memory table
spark.sql("SELECT * FROM word_counts ORDER BY count DESC").show()

# Stop the query
query.stop()

spark.stop()


+----+-----+
|word|count|
+----+-----+
+----+-----+

