We are going to work on an interesting topic: streaming data. The idea of this notebook is as follows:

- Take a streaming stock market input provided by a netcat server (a Python program is written for this).

- The data is read by the Spark stream reader, and the schema is detected.

- The data is then written to the local database first.

- Write the data to a file using foreach and foreachBatch.

- Join multiple streams together
(still need to think about the dataset that is sent in parallel from another netcat server).


* Complete the Python share_pusher.py and weather_pusher.py scripts

The server runs:

tail_logs.sh | nc -lvnp 9999

The receiver runs:

nc -vn 127.0.0.1 9999
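
As a rough Python stand-in for the netcat server above, the sketch below pushes newline-terminated lines over a TCP socket, which is the framing the Spark socket source reads. The names `serve_lines` and `read_all` are illustrative and are not the contents of share_pusher.py. It is meant to run as a standalone script: inside this notebook the name `socket` is later bound to the SparkSession, which would shadow the standard-library module.

```python
import socket
import threading
import time


def serve_lines(lines, host="127.0.0.1", port=0, delay=0.0):
    """Send each line to the first client that connects, then close.

    port=0 lets the OS pick a free port; pass 9999 to match the notebook.
    Returns the bound port and the background thread doing the sending.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def _run():
        conn, _ = server.accept()
        with conn:
            for line in lines:
                # One record per line, terminated by a newline
                conn.sendall((line + "\n").encode("utf-8"))
                time.sleep(delay)
        server.close()

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return bound_port, thread


def read_all(host, port):
    """Minimal receiver: read until the server closes, return the lines."""
    chunks = []
    with socket.create_connection((host, port)) as client:
        while True:
            data = client.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8").splitlines()
```

Calling `serve_lines(lines, port=9999, delay=1.0)` would play the role of the netcat server, and `read_all("127.0.0.1", 9999)` the role of the receiver.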

In [1]:
import pyspark
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [3]:
## Let's simply read the data from the socket

socket = SparkSession.builder.appName("socket-reader"). \
        config("spark.jars","/usr/share/java/postgresql-42.2.26.jar"). \
        master("local[*]").getOrCreate()

23/01/27 21:18:52 WARN Utils: Your hostname, codeStation resolves to a loopback address: 127.0.1.1; using 172.17.0.1 instead (on interface docker0)
23/01/27 21:18:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/01/27 21:18:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
socketReader = socket.readStream.format('socket'). \
                option("host","localhost"). \
                option("port","9999"). \
                load()

23/01/27 21:20:37 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [5]:
socketReader.printSchema()

root
 |-- value: string (nullable = true)



The query below is built step by step, checking the write stream output at each stage.

In [None]:
dev_count = socketReader.select(split('value',' ')[0].alias('addr'),
                               split(split('value',' ')[3],":")[0].alias('date'),
                               split('value',' ')[6].alias('dept')) 
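
To make the index arithmetic in the cell above easier to follow, here is a plain-Python sketch of the same splitting logic applied to a single line. The sample line is hypothetical (an access-log layout assumed from the console output further down), and `parse_log_line` is an illustrative helper, not part of the notebook.

```python
def parse_log_line(line):
    """Mirror the Spark expressions: split on spaces, then pick tokens 0, 3 and 6."""
    tokens = line.split(" ")
    return {
        "addr": tokens[0],                # client address
        "date": tokens[3].split(":")[0],  # "[27/Jan/2023:21:21:40" -> "[27/Jan/2023"
        "dept": tokens[6],                # request path
    }


# Hypothetical sample line, assumed to resemble what tail_logs.sh emits
sample = ('131.3.183.100 - - [27/Jan/2023:21:21:40 -0800] '
          '"GET /department/footwear/products HTTP/1.1" 200 1234')
parsed = parse_log_line(sample)
print(parsed)
```

The leading `[` stays in the date because the split on `:` only trims the time portion.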

In [None]:
dev_count.printSchema()

In [None]:
dev_count.writeStream.format("console").outputMode("append"). \
    trigger(processingTime='5 seconds').start()

In [None]:
filter_count = socketReader.filter(split('value',' ')[6].contains('department')) \
                        .select(split('value',' ')[0].alias('addr'),
                               split(split('value',' ')[3],":")[0].alias('date'),
                               split('value',' ')[6].alias('dept')) 

In [None]:
filter_count.printSchema()

In [None]:
filter_count.writeStream.format("console").outputMode('append'). \
            trigger(processingTime='5 seconds').start()

In [None]:
filter_count.writeStream.format("console").outputMode('append'). \
            trigger(processingTime='5 seconds'). \
            option("truncate","false").start()

In [6]:
exact_filter = socketReader.filter(split(split('value',' ')[6],'/')[1] == 'department') \
                        .select(split('value',' ')[0].alias('addr'),
                               split(split('value',' ')[3],":")[0].alias('date'),
                               split('value',' ')[6].alias('dept')) 
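
The difference between the `contains('department')` filter used earlier and the exact comparison on the first path segment can be sketched in plain Python. The two paths below are made-up examples, and the helper names are illustrative only.

```python
def contains_department(path):
    """Substring match, like .contains('department')."""
    return "department" in path


def first_segment_is_department(path):
    """Exact match on the first path segment, like split(path, '/')[1] == 'department'."""
    parts = path.split("/")
    # A path starting with '/' yields an empty string at index 0
    return len(parts) > 1 and parts[1] == "department"


paths = ["/department/golf/categories", "/product/department-bundle"]
for p in paths:
    print(p, contains_department(p), first_segment_is_department(p))
```

Only the exact comparison rejects a path that merely mentions "department" somewhere other than the first segment.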

In [7]:
exact_filter.writeStream.format("console").outputMode('append'). \
            trigger(processingTime='5 seconds'). \
            option("truncate","false").start()

23/01/27 21:21:36 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a7042b99-283f-4a53-a041-e13f2899b87c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/01/27 21:21:36 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f67ab3ab6d0>

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+----+----+
|addr|date|dept|
+----+----+----+
+----+----+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+------------+------------------------------------+
|addr         |date        |dept                                |
+-------------+------------+------------------------------------+
|131.3.183.100|[27/Jan/2023|/department/team%20sports/categories|
|47.199.148.5 |[27/Jan/2023|/department/footwear/categories     |
|81.179.176.92|[27/Jan/2023|/department/footwear/products       |
+-------------+------------+------------------------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------+------------+-------------------------------+
|addr         |date        |dept                           |
+-------------+------------+---------------------------

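What happens with foreach is interesting: the function is called once per row and receives a pyspark.sql.types.Row, whereas foreachBatch calls its function once per micro-batch with a DataFrame and a batch id.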

In [29]:
def write_to_postgres(row):
    print(type(row))
    print(row.asDict())
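
Besides a plain function, `foreach` also accepts an object with a `process(row)` method and optional `open(partition_id, epoch_id)` and `close(error)` methods. The sketch below is one possible shape for such an object; the class name, the `flush_size` parameter, and the print-based "sink" are assumptions for illustration, and a real writer would open a database connection in `open` and release it in `close`.

```python
class BufferedRowWriter:
    """Buffers rows and flushes them in groups; printing stands in for a real sink."""

    def __init__(self, flush_size=100):
        self.flush_size = flush_size
        self.buffer = []
        self.flushed = 0

    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch; returning True means "process the rows"
        self.buffer = []
        return True

    def process(self, row):
        # Accept a pyspark Row (has asDict) or any mapping
        record = row.asDict() if hasattr(row, "asDict") else dict(row)
        self.buffer.append(record)
        if len(self.buffer) >= self.flush_size:
            self._flush()

    def close(self, error):
        # Flush whatever is left, unless processing failed
        if error is None:
            self._flush()

    def _flush(self):
        for record in self.buffer:
            print(record)  # placeholder for the actual write
        self.flushed += len(self.buffer)
        self.buffer = []
```

It would be attached with `exact_filter.writeStream.foreach(BufferedRowWriter(flush_size=50)).start()`.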

In [36]:
# foreachBatch calls this with the micro-batch DataFrame and its batch id
def write_to_batch(batch_df, batch_id):
    print(batch_df)
    print(batch_id)

In [38]:
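# The first version connected to the database and then threw the error shown below.
# In foreachBatch the first argument is the whole micro-batch as a DataFrame, not a single row.
# Assumption: the failure came from the default save mode, which errors once the table
# already exists, so the write now appends explicitly.
def write_to_db(batch_df, batch_id):
    batch_df.write.format('jdbc') \
                .option("url", "jdbc:postgresql://localhost:5432/maintenance") \
                .option('dbtable','streaming_rice') \
                .option('user','postgres') \
                .option('password', '1234') \
                .option('driver','org.postgresql.Driver') \
                .mode('append') \
                .save()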
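
The JDBC settings in the cell above could be collected in one place and the password kept out of the notebook. The helper below is a hedged sketch: `jdbc_options` is a hypothetical name, and reading the credentials from the `PGUSER` and `PGPASSWORD` environment variables is an assumption about how the environment is set up.

```python
import os


def jdbc_options(table, database="maintenance", host="localhost", port=5432):
    """Build the option dict for a PostgreSQL JDBC write."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        # Assumption: credentials are provided through the environment
        "user": os.environ.get("PGUSER", "postgres"),
        "password": os.environ.get("PGPASSWORD", ""),
        "driver": "org.postgresql.Driver",
    }
```

Inside a foreachBatch function the dict can be passed in one call, for example `batch_df.write.format('jdbc').options(**jdbc_options('streaming_rice')).mode('append').save()`.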

In [30]:
exact_filter.writeStream.foreach(write_to_postgres).start()

23/01/27 21:46:43 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ef017c78-9aa7-4b51-918b-8dda532b9dbd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/01/27 21:46:43 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f676b662b00>

<class 'pyspark.sql.types.Row'>
{'addr': '189.32.98.223', 'date': '[27/Jan/2023', 'dept': '/department/outdoors/products'}
<class 'pyspark.sql.types.Row'>
{'addr': '23.200.115.206', 'date': '[27/Jan/2023', 'dept': '/department/fan%20shop/categories'}
<class 'pyspark.sql.types.Row'>
{'addr': '82.229.233.192', 'date': '[27/Jan/2023', 'dept': '/department/fitness/products'}
<class 'pyspark.sql.types.Row'>
{'addr': '128.33.114.221', 'date': '[27/Jan/2023', 'dept': '/department/golf/categories'}
<class 'pyspark.sql.types.Row'>
{'addr': '219.149.174.10', 'date': '[27/Jan/2023', 'dept': '/department/team%20sports/categories'}
<class 'pyspark.sql.types.Row'>
{'addr': '87.7.174.197', 'date': '[27/Jan/2023', 'dept': '/department/team%20sports/categories'}
<class 'pyspark.sql.types.Row'>
{'addr': '134.202.158.81', 'date': '[27/Jan/2023', 'dept': '/department/fitness/products'}
<class 'pyspark.sql.types.Row'>
{'addr': '211.230.17.206', 'date': '[27/Jan/2023', 'dept': '/department/team%20sports/pro

23/01/27 21:46:58 WARN TextSocketMicroBatchStream: Stream closed by localhost:9999


In [41]:
exact_filter.writeStream.outputMode('append'). \
        foreachBatch(write_to_db). \
        trigger(processingTime='10 seconds').start()

23/01/27 21:59:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ca0cbdc1-9d91-43d2-8018-b7366f21fb6b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/01/27 21:59:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f676a6ac700>

23/01/27 22:00:00 ERROR MicroBatchExecution: Query [id = 75c7336b-5423-4cab-966a-e296d0e7d4fd, runId = f2f69064-46b0-4ebf-b98f-c71c044294fc] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/home/solverbot/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/home/solverbot/.local/lib/python3.10/site-packages/pyspark/sql/utils.py", line 272, in call
    raise e
  File "/home/solverbot/.local/lib/python3.10/site-packages/pyspark/sql/utils.py", line 269, in call
    self.func(DataFrame(jdf, self.session), batch_id)
  File "/tmp/ipykernel_200706/1714206912.py", line 8, in write_to_db
    .save()
  File "/home/solverbot/.local/lib/python3.10/site-packages/pyspark/sql/readwriter.py", line 966, in save
    self._jwrite.save()
  File "/home/solverbot/.local/lib/python3.10/site-packages/py4j/java_g

In [None]:
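# A streaming DataFrame has no batch writer: exact_filter.write raises an AnalysisException, so use writeStream
exact_filter.isStreaming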

In [None]:
get_group = socketReader.filter(split(split('value',' ')[6],'/')[1] == 'department') \
                        .select(split('value',' ')[0].alias('addr'),
                               split(split('value',' ')[3],":")[0].alias('date'),
                               split(split('value',' ')[6],'/')[2].alias('dept')) \
                        .groupby('dept').agg(count(lit(1)).alias('dept_count'))

In [None]:
get_group.writeStream.format("console").outputMode("complete"). \
        trigger(processingTime='5 seconds'). \
        option("truncate","false").start()

In [None]:
get_group.writeStream.format("console").outputMode("update"). \
        trigger(processingTime='15 seconds'). \
        option("truncate","false").start()
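
The two cells above differ only in output mode. As a rough illustration of what each mode emits for a running count, the plain-Python sketch below simulates both over a list of micro-batches; the function name and the sample department values are made up, and it only models the emitted rows, not Spark's state handling.

```python
from collections import Counter


def simulate_output_modes(batches):
    """Return (complete, update): the rows each mode would emit per micro-batch."""
    totals = Counter()
    complete, update = [], []
    for batch in batches:
        seen = Counter(batch)
        totals.update(seen)
        # complete: the whole result table after every batch
        complete.append(dict(totals))
        # update: only the groups whose count changed in this batch
        update.append({dept: totals[dept] for dept in seen})
    return complete, update


complete, update = simulate_output_modes([["golf", "fitness"], ["golf"]])
print(complete)
print(update)
```

In the second batch, complete mode repeats the unchanged `fitness` count while update mode emits only `golf`.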

In [None]:
socketReader.createOrReplaceTempView("socketReader")

In [None]:
socket.sql("SHOW VIEWS").show()

In [None]:
socketReader.printSchema()

In [None]:
socket.sql("""SELECT * FROM socketReader""")

In [None]:
new_get_group = socket.sql("""
    SELECT SPLIT(SPLIT(value, ' ')[6], '/')[2] AS dept,
    COUNT(value) AS dept_count
    FROM socketReader
    WHERE SPLIT(SPLIT(value, ' ')[6], '/')[1] = 'department'
    GROUP BY SPLIT(SPLIT(value, ' ')[6], '/')[2]
""")

In [None]:
socketReader. \
    writeStream. \
    format('csv'). \
    option("checkpointLocation", '/run/media/solverbot/repoA/checkpoint'). \
    option('path', '/run/media/solverbot/repoA/data'). \
    trigger(processingTime='5 seconds'). \
    start()