# SPARK Structured Streaming

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=45d3aed627ade9cde12208b2b3ef306f344fff0afdb089faff5e8ea06c458b9e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


## Sockets

In [None]:
!mkdir ./stream_dir

In [None]:
import time
import socket

# Address of this machine; shared by run_server, run_client and the Spark socket reader.
IP = socket.gethostbyname(socket.getfqdn(socket.gethostname()))
# TCP port the demo server binds to and the clients connect to.
PORT = 9999
# Global message counter; run_server increments it once per message sent.
N = 0

def run_server(steps=50):
    """Serve product messages over TCP and mirror them into a CSV file.

    Binds to the module-level ``IP``/``PORT``, waits for a single client and
    then, every two seconds, sends one newline-terminated ``Product <k>`` line
    (``k = N % 3``) while writing the matching ``rowId,Product`` row to a CSV
    file inside ``./stream_dir``. ``N`` is the module-level counter and keeps
    growing across calls, so row ids stay unique between runs.

    Args:
        steps: Number of messages to send during this call.

    Raises:
        OSError: If the CSV file cannot be opened or the address cannot be bound.
    """
    global N  # IP and PORT are only read; N is reassigned below

    # One CSV per run, named after the counter value at start, written into the
    # directory watched by the file stream.
    # NOTE(review): Spark's file source expects files to appear atomically; a
    # file that is still being written may be picked up only partially --
    # confirm this is acceptable for the demo.
    csv_path = "./stream_dir/tmp_%d.csv" % N

    with open(csv_path, "w") as fout:
        fout.write("rowId,Product\n")  # header
        fout.flush()

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            # Allow re-running the cell right after a previous server exited.
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((IP, PORT))
            server.listen(0)
            print(f"Listening on {IP}:{PORT}")

            # Blocks until a client connects.
            client_socket, client_address = server.accept()
            print(f"Accepted connection from {client_address[0]}:{client_address[1]}")

            with client_socket:
                for _ in range(steps):
                    time.sleep(2)  # wait two seconds to send next data

                    N += 1
                    # The trailing newline delimits the record for line-oriented readers.
                    message = "Product %d\n" % (N % 3)
                    try:
                        client_socket.sendall(message.encode("utf-8"))
                    except (BrokenPipeError, ConnectionResetError):
                        # The client went away; stop instead of crashing the thread.
                        print("Client disconnected before all messages were sent")
                        break

                    fout.write("%d,Product%d\n" % (N, N % 3))
                    fout.flush()  # make the row visible to readers immediately

            print("Connection to client closed")

# In Colab the socket server has to run on its own thread, because
# run_server blocks while waiting for a client to connect.
import threading

server_thread = threading.Thread(target=run_server)
server_thread.start()

Listening on 172.28.0.12:9999


### Testing a socket client (read-only)

In [None]:
def run_client(steps=10):
    """Connect to the socket server and print the data it sends.

    Opens a TCP connection to the module-level ``IP``/``PORT`` and prints up
    to ``steps`` received chunks, decoded as UTF-8. Stops early when the
    server closes the connection.

    Args:
        steps: Maximum number of ``recv`` calls to perform.

    Raises:
        OSError: If the connection cannot be established (for example
            ``ConnectionRefusedError`` when no server is listening).
    """
    # The context manager closes the socket even if connect/recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        # establish connection with server
        client.connect((IP, PORT))

        for _ in range(steps):
            response = client.recv(1024)
            if not response:
                # An empty read means the server closed the connection.
                break
            print(response.decode("utf-8"))

import threading

# Read five messages on a background thread so the cell returns immediately.
client_thread = threading.Thread(target=run_client, kwargs=dict(steps=5))
client_thread.start()

Accepted connection from 172.28.0.12:40988


## PySPARK Streaming with Sockets

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *   #windowing & ordering functions
from pyspark.sql.types import *       #building schemas

# Build (or reuse) a local Spark session named "Streaming" with its UI on port 4050.
# Parentheses replace the backslash continuations: the original `builder.\`
# followed by `.master` produced a double dot, which is a syntax error.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Streaming")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

Exception in thread Thread-10 (run_server):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-6-2d2efbf6a74f>", line 38, in run_server
BrokenPipeError: [Errno 32] Broken pipe


In [None]:
# Streaming DataFrame fed by the socket server at IP:PORT; the timestamp
# column requested here is used below under the alias "time".
rawdata = (
    spark.readStream
    .format("socket")
    .option("host", IP)
    .option("port", PORT)
    .option("includeTimeStamp", True)
    .load()
)

print("is it streaming?", rawdata.isStreaming)

product_col = rawdata["value"].alias("product")
time_col = rawdata["timestamp"].alias("time")

# Count of each product per 5-second window, most recent window first.
query1 = (
    rawdata.select(product_col, time_col)
    .groupBy(window("time", "5 seconds"), "product")
    .count()
    .sort(desc("window"))
)

# Plain projection of the raw stream (no aggregation).
query2 = rawdata.select(product_col, time_col)

# Both queries write to in-memory tables ("tmp0", "tmp1") that can be queried with spark.sql.
raw_writer = rawdata.writeStream.format("memory").queryName("tmp0")
result1 = raw_writer.start(truncate=False)  # use console in your local machine

counts_writer = query1.writeStream.outputMode("complete").format("memory").queryName("tmp1")
result2 = counts_writer.start(truncate=False)

# To wait for a query or stop it, call e.g.
#   result1.awaitTermination(timeout=20)
#   result1.stop()

is it streaming? True
None
Accepted connection from 172.28.0.12:55244


In [None]:
# Show the most recent progress report of each running query.
for streaming_query in (result1, result2):
    print(streaming_query.lastProgress)

In [None]:
# List the streaming queries that are currently active on this session.
spark.streams.active

[]

In [None]:
# Preview the first 5 rows of the in-memory table "tmp0".
spark.sql(""" SELECT  * from tmp0 """).show(5)

+-----+---------+
|value|timestamp|
+-----+---------+
+-----+---------+



In [None]:
# Latest progress report of the raw socket query ("tmp0").
# The original referenced `result`, a name that is never defined.
result1.lastProgress

{'id': '922c6843-0921-4e8f-a62b-fe3ac12690a3',
 'runId': '84c5dfc7-99f3-42e9-a1bb-f5caa7d2a189',
 'name': 'tmp0',
 'timestamp': '2023-10-24T17:37:32.000Z',
 'batchId': 1,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 0, 'triggerExecution': 0},
 'stateOperators': [],
 'sources': [{'description': 'TextSocketV2[host: 172.28.0.12, port: 9986]',
   'startOffset': -1,
   'endOffset': -1,
   'latestOffset': -1,
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'MemorySink', 'numOutputRows': 0}}

## CONTINUOUS DATAFRAMES

In [None]:
# Two nullable string columns describing the CSV rows.
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in ("rowID", "Product")]
)

In [None]:
# Streaming reader over the CSV files placed in ./stream_dir.
# The schema can also be expressed as a string: "rowID STRING, Product STRING".
# Parentheses are used for the chain because a comment after a backslash
# continuation (as in the original) is a syntax error.
dfs = (
    spark.readStream
    .schema(schema)
    .option("header", True)
    .format("csv")
    .load("./stream_dir")
)

dfs.isStreaming

False

In [None]:
# Start a socket client that reads up to 10 messages. The CSV rows are written
# by run_server while it sends, so a run_server thread must already be
# listening on IP:PORT; otherwise connect() fails with "Connection refused".

threading.Thread(target=run_client, kwargs={'steps': 10}).start()

Exception in thread Thread-17 (run_client):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-7-0bfd06c51eee>", line 6, in run_client
ConnectionRefusedError: [Errno 111] Connection refused


In [None]:
results_df = dfs.select("*")

# Write every new row as JSON files under "results".
# - "append" is used because the file sink does not support "complete",
#   which is also only valid for aggregated queries.
# - start() is kept separate from awaitTermination() so that `query` holds the
#   StreamingQuery handle rather than the return value of awaitTermination().
query = (
    results_df.writeStream
    .format("json")
    .queryName("selectTable")
    .option("checkpointLocation", "checkpoint")
    .option("path", "results")
    .outputMode("append")
    .start()
)

# Bounded wait so the notebook can continue; returns whether the query terminated.
query.awaitTermination(timeout=20)

In [None]:
# Latest progress report of `query`; this requires `query` to be the StreamingQuery handle.
query.lastProgress

{'id': '88e43b29-4559-49d5-a5cd-6387a6bca421',
 'runId': 'cbb71411-0807-483a-aa72-c818648ef82b',
 'name': None,
 'timestamp': '2023-10-24T18:23:32.413Z',
 'batchId': 1,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 2, 'triggerExecution': 2},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[file:/content/tmp_dir]',
   'startOffset': {'logOffset': 0},
   'endOffset': {'logOffset': 0},
   'latestOffset': None,
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'org.apache.spark.sql.execution.streaming.ConsoleTable$@71a474f2',
  'numOutputRows': 0}}