In [2]:
import socket
import threading
import time

def start_socket_server(file_path, server_host="localhost", server_port=9999, stream_delay=1):
    """Serve ``file_path`` line by line to every client that connects.

    Binds a TCP server on ``(server_host, server_port)`` and, for each accepted
    connection, starts a daemon thread that sends the file one line at a time
    (UTF-8 encoded), sleeping ``stream_delay`` seconds after each line, and then
    closes the connection. This function loops forever; run it in a background
    thread.

    Args:
        file_path: Path of the text file to stream. It is read as UTF-8, which
            is the encoding used on the wire.
        server_host: Interface to bind to.
        server_port: TCP port to listen on.
        stream_delay: Seconds to wait after sending each line.
    """

    def stream_file_to_client(client_socket):
        # `with` closes the connection even when the client disconnects mid-stream
        # (the original leaked the socket because close() was skipped on error).
        with client_socket:
            try:
                with open(file_path, "r", encoding="utf-8") as file:
                    for line in file:
                        client_socket.sendall(line.encode("utf-8"))
                        time.sleep(stream_delay)
            except ConnectionError:
                # Consumer went away (broken pipe / reset); nothing left to send.
                pass

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow rebinding the port immediately after a previous run released it.
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    with server_socket:
        server_socket.bind((server_host, server_port))
        server_socket.listen(1)
        print(f"Socket server running on {server_host}:{server_port}")
        while True:
            client_socket, _client_address = server_socket.accept()
            # Daemon threads so lingering client streams never block interpreter exit.
            threading.Thread(
                target=stream_file_to_client, args=(client_socket,), daemon=True
            ).start()
# Start the socket server in a background daemon thread: the cell returns
# immediately, and the never-ending accept loop does not keep the Python
# process alive at shutdown.
# NOTE(review): /content/ looks like a Google Colab path — adjust when running elsewhere.
SENTENCES_PATH = "/content/sentences.txt"
threading.Thread(target=start_socket_server, args=(SENTENCES_PATH,), daemon=True).start()


Socket server running on localhost:9999


In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

class KmerAnalyzer:
    """Split text lines into overlapping, lower-cased character k-mers."""

    def generate_kmers(self, line, k=3):
        """Return every length-``k`` substring of ``line``, each lower-cased.

        Leading/trailing whitespace is stripped first. Interior characters,
        including spaces, are kept, so k-mers may span word boundaries.

        Args:
            line: Input text.
            k: K-mer length; must be a positive integer.

        Returns:
            List of k-mers in order of appearance. It is empty when the
            stripped line is shorter than ``k``.

        Raises:
            ValueError: If ``k`` is less than 1.
        """
        if k < 1:
            # k <= 0 would otherwise yield empty strings or bogus overlapping slices.
            raise ValueError(f"k must be a positive integer, got {k!r}")
        line = line.strip()
        # When len(line) < k the range is empty, so no explicit length check is needed.
        return [line[i:i + k].lower() for i in range(len(line) - k + 1)]


def main(host="localhost", port=9999, batch_interval=10, k=3):
    """Count character k-mers arriving on a text socket, one batch at a time.

    Connects a Spark Streaming job to ``host:port``. Every ``batch_interval``
    seconds it prints (via ``pprint``) the count of each k-mer seen in that
    batch. Counts are per batch (``reduceByKey`` on a DStream), not cumulative.

    The call blocks until the streaming context terminates. On exit, including
    an error or a notebook interrupt, the streaming context is stopped. The
    SparkContext is left running, so the cell can be re-run.

    Args:
        host: Host of the text socket server.
        port: Port of the text socket server.
        batch_interval: Micro-batch length in seconds.
        k: K-mer length passed to ``KmerAnalyzer.generate_kmers``.
    """
    from pyspark import SparkConf

    # local[2]: the socket receiver occupies one thread, leaving one for processing.
    conf = SparkConf().setMaster("local[2]").setAppName("KmerCounter")
    # getOrCreate reuses a live SparkContext instead of failing with
    # "Cannot run multiple SparkContexts at once" when the cell is re-run.
    spark_context = SparkContext.getOrCreate(conf)
    streaming_context = StreamingContext(spark_context, batch_interval)
    kmer_analyzer = KmerAnalyzer()

    socket_stream = streaming_context.socketTextStream(host, port)
    kmer_count_stream = (
        socket_stream
        .flatMap(lambda line: kmer_analyzer.generate_kmers(line, k))
        .map(lambda kmer: (kmer, 1))
        .reduceByKey(lambda count1, count2: count1 + count2)
    )
    kmer_count_stream.pprint()

    streaming_context.start()
    try:
        streaming_context.awaitTermination()
    finally:
        # A new StreamingContext can only be created on this SparkContext once
        # the current one is stopped; keep the SparkContext itself for reuse.
        streaming_context.stop(stopSparkContext=False)


# Runs the streaming job; this cell blocks inside awaitTermination() until the
# streaming context is stopped. It reads from the socket server started in the
# previous cell (localhost:9999).
main()



-------------------------------------------
Time: 2024-12-01 21:30:50
-------------------------------------------
('yws', 1)
('wsd', 1)
('vto', 1)
('otk', 1)
('tkr', 1)
('krq', 1)
('yod', 1)
('uhj', 1)
('hjb', 1)
('bwf', 1)
...

-------------------------------------------
Time: 2024-12-01 21:31:00
-------------------------------------------
('tga', 1)
('uki', 1)
('kij', 1)
('ija', 1)
('aha', 1)
('hac', 1)
('acy', 1)
('cyk', 1)
('yki', 1)
('kik', 1)
...

-------------------------------------------
Time: 2024-12-01 21:31:10
-------------------------------------------
('nsd', 1)
('duo', 1)
('wui', 1)
('iza', 1)
('rjn', 1)
('jni', 1)
('fzl', 1)
('zll', 1)
('lll', 1)
('llq', 1)
...

-------------------------------------------
Time: 2024-12-01 21:31:20
-------------------------------------------
('wey', 1)
('eyj', 1)
('pxr', 1)
('rxm', 1)
('gxc', 1)
('xca', 1)
('fsv', 1)
('svd', 1)
('xli', 1)
('liy', 1)
...

-------------------------------------------
Time: 2024-12-01 21:31:30
--------------