In [9]:
!nc -lk 9999

^C


In [1]:
import logging
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import json
import threading
import time
import json
import subprocess
import random

In [2]:
# Enable logging
logging.basicConfig(level=logging.INFO)

In [3]:
# Create a Spark session
spark = SparkSession.builder.appName("MolecularSimulations").getOrCreate()

# Access the SparkContext
sc = spark.sparkContext

# Set the log level to INFO
sc.setLogLevel("INFO")

# Define a class to represent molecular simulations data
class MolecularSimulationData:
    """
    Represents data from molecular simulations.

    Attributes:
        molecule_id (str): Identifier for the molecule.
        timestamp (str): Timestamp of the simulation data.
        properties (dict): Properties associated with the molecule.
    """

    def __init__(self, molecule_id, timestamp, properties):
        self.molecule_id = molecule_id
        self.timestamp = timestamp
        self.properties = properties

    def __repr__(self):
        return f"Molecule ID: {self.molecule_id}, Timestamp: {self.timestamp}, Properties: {self.properties}"

23/12/08 17:55:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/08 17:55:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/12/08 17:55:54 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/12/08 17:55:54 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/12/08 17:55:54 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/12/08 17:55:54 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.


In [4]:
def create_streaming_context():
    # Create a Spark session
    spark = SparkSession.builder.appName("MolecularSimulations").getOrCreate()

    # Create a local StreamingContext with two working threads and a batch interval of 1 second
    ssc = StreamingContext(spark.sparkContext, 1)



    # Define a function to send data to the socket
    def send_data_to_socket(molecule_id, temperature, pressure):
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        data = {
            "molecule_id": str(molecule_id),
            "timestamp": timestamp,
            "properties": {"temperature": temperature, "pressure": pressure}
        }
        print(data)
        data_str = json.dumps(data)
        subprocess.run(["echo", data_str, "|", "nc", "localhost", "9999"], shell=True)

    # Simulate a continuous data stream
    molecule_id = 1 * random.randint(3, 199)
    while True:
        temperature = 200 + (molecule_id % 5) * 20 * random.randint(3, 199) # Vary temperature for different molecule IDs
        pressure = 1 + molecule_id % 3  * random.randint(3, 199)# Vary pressure for different molecule IDs
        send_data_to_socket(molecule_id, temperature, pressure)
        time.sleep(1)  # Sleep for 1 second before sending the next data

    # Parse the input JSON data
    def parse_input(line):
        try:
            data = json.loads(line)
            molecule_id = data.get("molecule_id")
            timestamp = data.get("timestamp")
            properties = data.get("properties")
            if molecule_id and timestamp and properties:
                return MolecularSimulationData(molecule_id, timestamp, properties)
        except json.JSONDecodeError as e:
            print(f"Error parsing input JSON: {e}")
        return None

    # Process the data stream
    # Process the data stream
    def process_data(rdd):
        # Print the raw lines received from the socket
        raw_lines = rdd.collect()
        
        print(f"Raw Lines: {raw_lines}")

        # Parse the data
        parsed_data = rdd.map(parse_input).filter(lambda x: x is not None)

        # Filter out data with missing or incorrect values
        valid_data = parsed_data.filter(lambda x: x.properties.get("temperature") is not None and x.properties.get("pressure") is not None)

        # Calculate aggregated statistics for the entire dataset
        total_molecules = valid_data.count()
        max_temperature = valid_data.map(lambda x: x.properties["temperature"]).max()
        min_pressure = valid_data.map(lambda x: x.properties["pressure"]).min()

        # Print the aggregated statistics
        print(f"Total Number of Molecules: {total_molecules}")
        print(f"Maximum Temperature: {max_temperature}")
        print(f"Minimum Pressure: {min_pressure}")


    # Apply parsing and processing functions to the DStream
    parsed_data = lines.map(parse_input)
    parsed_data.foreachRDD(process_data)

    return ssc

In [5]:
# Create a StreamingContext and start it in a separate thread
ssc = create_streaming_context()

def start_streaming_context():
    ssc.start()
    ssc.awaitTermination()

INFO:py4j.java_gateway:Callback Server Starting
INFO:py4j.java_gateway:Socket listening on ('127.0.0.1', 58458)


{'molecule_id': '50', 'timestamp': '2023-12-08 17:55:55', 'properties': {'temperature': 200, 'pressure': 77}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:55:56', 'properties': {'temperature': 200, 'pressure': 267}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:55:57', 'properties': {'temperature': 200, 'pressure': 215}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:55:58', 'properties': {'temperature': 200, 'pressure': 335}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:55:59', 'properties': {'temperature': 200, 'pressure': 263}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:56:00', 'properties': {'temperature': 200, 'pressure': 271}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:56:01', 'properties': {'temperature': 200, 'pressure': 211}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:56:02', 'properties': {'temperature': 200, 'pressure': 65}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:56:03', 'properties': {'temperature': 200, 'pressure': 3

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:10', 'properties': {'temperature': 200, 'pressure': 261}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:11', 'properties': {'temperature': 200, 'pressure': 205}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:12', 'properties': {'temperature': 200, 'pressure': 291}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:13', 'properties': {'temperature': 200, 'pressure': 173}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:14', 'properties': {'temperature': 200, 'pressure': 301}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:15', 'properties': {'temperature': 200, 'pressure': 89}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:17', 'properties': {'temperature': 200, 'pressure': 159}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:18', 'properties': {'temperature': 200, 'pressure': 107}}

{'molecule_id': '50', 'timestamp': '2023-12-08 17:57:19', 'properties': {'temperature': 200, 'pressure': 

KeyboardInterrupt: 