### Spark Streaming Analysis
* What is the processing paradigm used in Spark Streaming?
  * Spark Streaming uses a micro-batch model. The incoming stream is represented as a DStream, a sequence of RDDs with one RDD per fixed batch interval, and each batch is processed by the regular Spark engine. Repeating this ingest, process, and output cycle every interval gives near real-time results.
* What parameters define a Spark Streaming context?
  * The Spark cluster master URL and the application name (set on the underlying SparkContext), plus the batch interval passed to StreamingContext.

* What types of transformations does Spark Streaming support?
  * Stateless transformations
    * These operate on each batch independently, e.g. map, flatMap, filter, and reduceByKey (which aggregates within a single batch).

  * Stateful transformations
    * These maintain state across batches, enabling windowed aggregations (e.g. reduceByKeyAndWindow), running totals (updateStateByKey), sessionization, etc.
* How is Spark Streaming commonly used?
  * Event Triggers: responding to specific events or conditions in real-time data streams.
  * Data Augmentation: enriching streaming data with additional information from external sources or reference data.
  * Streaming ETL: filtering, aggregating, joining, and transforming data streams in real time before loading them into a target system or storage.
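The ideas above can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's implementation: records are grouped into fixed-interval micro-batches, a stateless count looks at one batch at a time, and a stateful count carries totals forward from batch to batch, which is roughly what updateStateByKey maintains. The helper names (`micro_batches`, `stateless_counts`, `running_totals`) and the sample events are hypothetical.

```python
from collections import Counter, defaultdict

def micro_batches(events, interval):
    """Group (timestamp_seconds, record) pairs into consecutive batches of `interval` seconds.

    Assumes timestamps start at 0. Batch i holds records with i*interval <= ts < (i+1)*interval;
    intervals with no records still yield an empty batch, like the empty batches pprint() shows.
    """
    grouped = defaultdict(list)
    for ts, record in events:
        grouped[int(ts // interval)].append(record)
    last = max(grouped) if grouped else -1
    return [grouped.get(i, []) for i in range(last + 1)]

def stateless_counts(batches):
    """Count records per batch; nothing carries over between batches."""
    return [dict(Counter(batch)) for batch in batches]

def running_totals(batches):
    """Carry a running count across batches (a stateful transformation)."""
    state = Counter()
    totals = []
    for batch in batches:
        state.update(batch)          # fold this batch into the carried-over state
        totals.append(dict(state))   # snapshot of the state after this batch
    return totals

events = [(0.2, "a"), (0.7, "b"), (1.1, "a"), (2.9, "a")]  # made-up sample events
batches = micro_batches(events, interval=1)
print(batches)                    # [['a', 'b'], ['a'], ['a']]
print(stateless_counts(batches))  # [{'a': 1, 'b': 1}, {'a': 1}, {'a': 1}]
print(running_totals(batches))    # [{'a': 1, 'b': 1}, {'a': 2, 'b': 1}, {'a': 3, 'b': 1}]
```

The stateless counts reset every batch, while the running totals keep growing; in Spark the carried state lives in the streaming context (and requires checkpointing) rather than in a local variable.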

### Sample presentation

https://drive.google.com/drive/u/0/folders/1KiUWkVAuKl-FpZNQUqwDsnUNdcuH2HLU

# Part I: Spark Streaming

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os

# Install Java if not already installed
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Define the URL and file name for Hadoop
hadoop_url = "https://downloads.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz"
hadoop_file = "hadoop-3.4.0.tar.gz"

# Check if the Hadoop tar.gz file already exists
if not os.path.exists(hadoop_file):
    # Download the Hadoop tar.gz file if it doesn't exist
    !wget -q $hadoop_url
else:
    print("Hadoop tar.gz file already exists. Skipping download.")

# Extract the Hadoop tar.gz file
!tar -xzf $hadoop_file


In [3]:
# Optional: path to the Hadoop installation extracted above (not used below)
#hadoop_path = '/content/hadoop-3.4.0'
# Point Spark at the Java 8 JDK installed above
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

In [4]:
!pip install -q findspark
!pip3 install pyspark==3.5.1
import findspark
findspark.init()

Collecting pyspark==3.5.1
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     317.0/317.0 MB 2.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=f5f3bd33043ffb9f04d31e9d0696072aa2737db06d2cb8cac351c15b8b20fd25
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# Step 1: Check if the port is in use

In [5]:
import os

def is_port_in_use(port):
    # Ask lsof for the PID(s) holding a TCP socket in LISTEN state on this port.
    # -t prints bare PIDs (no header line); -sTCP:LISTEN skips client connections
    # to the port, such as the Spark receiver's connection to 9999.
    result = os.popen(f"lsof -t -iTCP:{port} -sTCP:LISTEN").read().split()
    if result:
        # Return the PID of the first listening process
        return True, int(result[0])
    return False, None




In [10]:

# Check if port 9999 is in use
port_in_use, pid = is_port_in_use(9999)
if port_in_use:
    print(f"Port 9999 is in use by process {pid}")
else:
    print("Port 9999 is not in use")


Port 9999 is not in use
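The check above depends on the `lsof` command; if `lsof` is not installed, `os.popen` simply returns an empty string and every port looks free. A stdlib-only alternative, sketched below as a hypothetical helper `can_bind`, asks the question the server actually cares about: can a new listening socket be bound to the port right now? It cannot report the PID, so it complements rather than replaces `is_port_in_use`.

```python
import socket

def can_bind(port, host="localhost"):
    """Return True if a new TCP socket could bind to (host, port) at this moment."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # SO_REUSEADDR so that old connections lingering in TIME_WAIT are not
        # reported as "in use"; an active listener still makes bind() fail.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind((host, port))
        except OSError:
            return False
        return True

print("Port 9999 is free" if can_bind(9999) else "Port 9999 is in use")
```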


# Step 2: Free the port if it is in use (skip this step if it is not)

In [None]:
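# If the port is in use, kill the process holding it
import os
import signal

def kill_process_using_port(port):
    port_in_use, pid = is_port_in_use(port)
    if not (port_in_use and pid):
        print(f"No process is using port {port}")
    elif pid == os.getpid():
        # StoppableSocketServer runs as a thread inside this notebook kernel, so
        # SIGKILL here would kill the kernel itself; stop the server instead.
        print(f"Port {port} is held by this kernel; call server.stop() instead")
    else:
        os.kill(pid, signal.SIGKILL)
        print(f"Process {pid} using port {port} has been killed")

# Kill the process using port 9999 if it is in use
kill_process_using_port(9999)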


# Step 3: Create a socket server with start/stop methods

In [6]:
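import socket
import threading
import time

class StoppableSocketServer:
    """Minimal TCP server that sends "Value:<i>" lines to each client that connects."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.server_socket = None
        self.thread = None
        self.is_running = False

    def start(self):
        if self.is_running:
            print("Server is already running")
            return

        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allow re-binding the port right after stop() (avoids "Address already in use")
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(1)
        self.is_running = True
        # Daemon thread, so a server that is never stopped does not keep Python alive at exit
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        print(f"Socket server started on port {self.port}")

    def _run(self):
        while self.is_running:
            try:
                client_socket, addr = self.server_socket.accept()
            except OSError:
                break  # the listening socket was shut down by stop()
            print(f"Connection from: {addr}")
            try:
                for i in range(10):  # send some example data, one line per second
                    if not self.is_running:
                        break
                    message = f"Value:{i}\n"  # socketTextStream splits records on newlines
                    client_socket.sendall(message.encode())
                    time.sleep(1)
            except OSError:
                pass  # the client (e.g. the Spark receiver) disconnected
            finally:
                client_socket.close()

    def stop(self):
        if not self.is_running:
            print("Server is not running")
            return
        self.is_running = False
        try:
            # On Linux, shutdown() wakes a thread blocked in accept(); close() alone may not
            self.server_socket.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass
        self.server_socket.close()
        self.thread.join()
        print(f"Socket server on port {self.port} stopped")

# Create an instance of the socket server
server = StoppableSocketServer("localhost", 9999)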


# Step 4: Start server

In [11]:
# Start the server
server.start()

Socket server started on port 9999
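Spark's socketTextStream acts as a plain TCP client: it decodes the connection as UTF-8 text and emits one record per line, which is why the server terminates every message with a newline. The self-contained sketch below reproduces that line protocol with the standard library only, so the server side can be checked without starting Spark. The helpers `serve_lines` and `read_lines` are hypothetical, and the server binds to port 0 (an OS-chosen free port) so it cannot clash with port 9999.

```python
import socket
import threading

def serve_lines(server_sock, lines):
    """Accept one client and send each item as a newline-terminated UTF-8 line."""
    conn, _ = server_sock.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))

def read_lines(host, port):
    """Connect as a client and read newline-delimited records until the server closes."""
    with socket.create_connection((host, port)) as sock:
        with sock.makefile("r", encoding="utf-8") as stream:
            return [line.rstrip("\n") for line in stream]

server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server_sock.listen(1)
port = server_sock.getsockname()[1]

sender = threading.Thread(target=serve_lines,
                          args=(server_sock, [f"Value:{i}" for i in range(3)]))
sender.start()
received = read_lines("127.0.0.1", port)
sender.join()
server_sock.close()
print(received)  # ['Value:0', 'Value:1', 'Value:2']
```

A client that reads until end-of-stream, as here, only returns once the server closes the connection; Spark's receiver instead processes lines as they arrive and reconnects after the server closes, which is why the stream output continues after the first ten values.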


# Step 5: Stream the socket data with Spark

In [12]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Function to stop active contexts
def stop_active_contexts():
    global ssc
    if 'ssc' in globals() and ssc is not None:
        ssc.stop(stopSparkContext=False, stopGraceFully=True)
        ssc = None
    if 'spark' in globals():
        spark.stop()

# Stop any active Spark and Streaming contexts
stop_active_contexts()

# Initialize Spark session
spark = SparkSession.builder.master("local[*]").appName("SocketStreaming").getOrCreate()

# Initialize StreamingContext with a batch interval of 1 second
ssc = StreamingContext(spark.sparkContext, 1)

# Define the socket stream
lines = ssc.socketTextStream("localhost", 9999)

# Count how many times each "Value:<n>" token appears in each batch (stateless, per batch)
counts = lines.flatMap(lambda line: line.split(" ")) \
              .filter(lambda word: word.startswith("Value:")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)

# Print the results to the console
counts.pprint()

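# Start the streaming computation and wait up to one minute.
# awaitTerminationOrTimeout() only waits: it returns False on timeout, and the
# context keeps running (and printing batches) until ssc.stop() is called.
ssc.start()
ssc.awaitTerminationOrTimeout(60)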


-------------------------------------------
Time: 2024-05-31 14:27:12
-------------------------------------------
('Value:9', 1)

-------------------------------------------
Time: 2024-05-31 14:27:13
-------------------------------------------

-------------------------------------------
Time: 2024-05-31 14:27:14
-------------------------------------------

-------------------------------------------
Time: 2024-05-31 14:27:15
-------------------------------------------

Connection from: ('127.0.0.1', 56094)
-------------------------------------------
Time: 2024-05-31 14:27:18
-------------------------------------------
('Value:0', 1)

-------------------------------------------
Time: 2024-05-31 14:27:19
-------------------------------------------
('Value:1', 1)

-------------------------------------------
Time: 2024-05-31 14:27:20
-------------------------------------------
('Value:2', 1)

-------------------------------------------
Time: 2024-05-31 14:27:21
-------------------------------------------
('Value:3', 1)

-------------------------------------------
Time: 2024-05-31 14:27:22
-------------------------------------------
('Value:4', 1)

-------------------------------------------
Time: 2024-05-31 14:27:23
-------------------------------------------
('Value:5', 1)

-------------------------------------------
Time: 2024-05-31 14:27:24
-------------------------------------------
('Value:6', 1)

-------------------------------------------
Time: 20

False
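Because each 1-second batch above contains a single line, every count printed is 1. The plain-Python sketch below mirrors the DStream pipeline on one batch, and adds a sliding-window count to show what a stateful windowed aggregation (in the spirit of reduceByKeyAndWindow) would report over the same kind of stream. It is an illustration under simplifying assumptions, not Spark code: window and slide are measured in batches, whereas Spark takes durations that must be multiples of the batch interval. The helper names are hypothetical.

```python
from collections import Counter

def count_values(batch_lines):
    """One batch through flatMap(split) -> filter("Value:") -> map((w, 1)) -> reduceByKey(+)."""
    words = (word for line in batch_lines for word in line.split(" "))
    return dict(Counter(word for word in words if word.startswith("Value:")))

def windowed_counts(batches, window, slide=1):
    """Counts over the last `window` batches, emitted every `slide` batches."""
    results = []
    for end in range(slide, len(batches) + 1, slide):
        merged = Counter()
        for batch in batches[max(0, end - window):end]:
            merged.update(count_values(batch))  # add this batch's counts to the window total
        results.append(dict(merged))
    return results

# One line per 1-second batch, shaped like the server's output
batches = [[f"Value:{i}"] for i in range(5)]
print(count_values(["Value:1 Value:1 other"]))  # {'Value:1': 2}
for counts in windowed_counts(batches, window=3):
    print(counts)
```

With a 3-batch window, each output covers up to three consecutive values, and the earliest value drops out as the window slides forward.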

# Step 6: Shut down the server when you are done

In [13]:
# Stop the server
server.stop()

-------------------------------------------
Time: 2024-05-31 14:28:23
-------------------------------------------
('Value:5', 1)

Socket server on port 9999 stopped
