In [2]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

In [3]:
def initialize_spark(master="local[2]", app_name="NYSEStockStreamingApp", batch_interval=1):
    """Create the SparkSession and StreamingContext used by the streaming job.

    Args:
        master: Spark master URL passed to the session builder. For local
            runs with a socket receiver use at least ``local[2]``: per the
            Spark Streaming guide the receiver occupies one thread, so a
            second one is needed to process the received batches.
        app_name: Application name passed to the session builder.
        batch_interval: Batch interval of the StreamingContext, in seconds.

    Returns:
        tuple: ``(spark, ssc)`` - the SparkSession and a StreamingContext
        built on that session's SparkContext.
    """
    # getOrCreate() hands back an already-running session if there is one,
    # which is what makes re-running this cell in the same kernel safe.
    spark = (
        SparkSession.builder
        .master(master)
        .appName(app_name)
        .getOrCreate()
    )

    # The streaming context shares the session's SparkContext.
    ssc = StreamingContext(spark.sparkContext, batch_interval)

    return spark, ssc

In [4]:
def load_previous_max_prices(spark, path="previous_max_price.csv"):
    """Load the previous max price per stock symbol as a cached pair RDD.

    The CSV is read without a header row; the first column is used as the
    key and the second column is converted to ``float`` as the value.

    Args:
        spark: SparkSession used to read the file.
        path: Location of the CSV file. Defaults to ``previous_max_price.csv``
            resolved by Spark relative to the working directory.

    Returns:
        A cached RDD of ``(first_column, float(second_column))`` pairs,
        suitable for joining against keyed stream data.
    """
    previous_max_prices_df = spark.read.csv(path, header=False, inferSchema=True)

    # Cache the pair RDD: it is joined against every micro-batch, so it
    # should not be re-read from disk each time.
    previous_max_prices = previous_max_prices_df.rdd.map(
        lambda row: (row[0], float(row[1]))
    ).cache()

    return previous_max_prices

In [5]:
def _parse_stock_line(line):
    """Parse one ``symbol,timestamp,price`` line into zero or one keyed records.

    Returns ``[(symbol, (timestamp, price))]`` for a well-formed line and an
    empty list for a line with fewer than three comma-separated fields or a
    price that is not a valid float, so it can be used with ``flatMap``.
    """
    fields = line.split(",")
    if len(fields) < 3:
        return []
    try:
        price = float(fields[2])
    except ValueError:
        return []
    return [(fields[0], (fields[1], price))]


def process_stock_data(lines, previous_max_prices):
    """Keep the streamed quotes whose price reaches or exceeds the previous max.

    Args:
        lines: DStream of text lines in ``symbol,timestamp,price`` form.
        previous_max_prices: Pair RDD of ``(symbol, previous_max_price)``.

    Returns:
        DStream of ``(symbol, ((timestamp, price), previous_max_price))`` for
        every quote whose price is greater than or equal to the previous max
        price of the same symbol. Symbols without a previous max price are
        dropped by the (inner) join.
    """
    # Malformed or blank lines are skipped rather than raising inside the
    # worker, where a single bad record would fail the whole streaming job.
    stock_data = lines.flatMap(_parse_stock_line)

    # Join each micro-batch RDD with the static previous-max-price RDD.
    joined_data = stock_data.transform(lambda rdd: rdd.join(previous_max_prices))

    # x is (symbol, ((timestamp, price), previous_max_price)).
    filtered_data = joined_data.filter(lambda x: x[1][0][1] >= x[1][1])

    return filtered_data

In [6]:
def calculate_percentage_change(data):
    """Format a joined quote record with its percentage change from the previous max.

    Args:
        data: A record shaped ``(stock_symbol, ((timestamp, current_price),
            previous_max_price))``.

    Returns:
        str: ``"<symbol>: Current Price: <p>, Previous Max Price: <m>,
        Percentage Change: <x.xx>%"``. When the previous max price is 0 the
        relative change is undefined, so the change is reported as ``N/A``
        instead of raising ``ZeroDivisionError``.
    """
    # The timestamp is part of the record shape but is not used in the output.
    stock_symbol, ((_timestamp, current_price), previous_max_price) = data

    if previous_max_price == 0:
        change_text = "N/A"
    else:
        percentage_change = ((current_price - previous_max_price) / previous_max_price) * 100
        change_text = f"{percentage_change:.2f}%"

    return f"{stock_symbol}: Current Price: {current_price}, Previous Max Price: {previous_max_price}, Percentage Change: {change_text}"

In [None]:
def main(host="localhost", port=9999):
    """Run the streaming job: read quotes from a socket and print new highs.

    Reads text lines from ``host:port``, keeps the quotes whose price reaches
    or exceeds the previous max price for the symbol, formats each with its
    percentage change, and prints every batch. Blocks until the streaming
    context terminates.

    Args:
        host: Hostname of the socket text source.
        port: Port of the socket text source.
    """
    # Initialize SparkSession and StreamingContext
    spark, ssc = initialize_spark()

    try:
        # Load previous max prices
        previous_max_prices = load_previous_max_prices(spark)

        # Create a DStream by connecting to the socket source
        lines = ssc.socketTextStream(host, port)

        # Process streaming stock data
        filtered_data = process_stock_data(lines, previous_max_prices)

        # Apply the percentage change calculation and format the output
        formatted_data = filtered_data.map(calculate_percentage_change)

        # Print the formatted data
        formatted_data.pprint()

        # Start the streaming context
        ssc.start()

        # Wait for the termination of the streaming context
        ssc.awaitTermination()
    finally:
        # Always stop the streaming context (and its SparkContext), including
        # when the cell is interrupted or setup fails; otherwise the context
        # stays active in the kernel and the job cannot be started again.
        ssc.stop(stopSparkContext=True, stopGraceFully=False)

# Start the streaming job when this code is executed as the top-level program.
# Note that main() blocks in ssc.awaitTermination() until the stream terminates.
if __name__ == "__main__":
    main()



-------------------------------------------
Time: 2024-03-04 20:57:34
-------------------------------------------

-------------------------------------------
Time: 2024-03-04 20:57:35
-------------------------------------------

-------------------------------------------
Time: 2024-03-04 20:57:36
-------------------------------------------
JAS: Current Price: 50.43, Previous Max Price: 38.06, Percentage Change: 32.50%

-------------------------------------------
Time: 2024-03-04 20:57:37
-------------------------------------------

-------------------------------------------
Time: 2024-03-04 20:57:38
-------------------------------------------

