In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, TimestampType, IntegerType
from pyspark.sql.functions import from_json, col

import os

# Directory holding the Kafka connector jars on the EC2 instance.
# Set the SPARK_JARS_PATH environment variable to override it instead of editing this cell.
spark_jars_path = os.environ.get(
    "SPARK_JARS_PATH", "/home/ec2-user/stream-processing-template/jars"
)

# Jars loaded via `spark.jars` so that `readStream.format("kafka")` is available.
# NOTE(review): the spark-streaming-kafka assembly jar presumably provides the
# kafka-clients classes that spark-sql-kafka needs at runtime — confirm before removing it.
kafka_jars = [
    "commons-pool2-2.11.1.jar",
    "spark-sql-kafka-0-10_2.12-3.4.0.jar",
    "spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar",
]

spark = (
    SparkSession.builder.appName("retail_pyspark_consumer")
    .config("spark.jars", ",".join(f"{spark_jars_path}/{jar}" for jar in kafka_jars))
    .getOrCreate()
)


23/12/12 09:07:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Expected structure of the JSON document carried in each Kafka message value.
# All four fields are declared nullable.
schema = StructType(
    [
        StructField("store_location", StringType(), True),
        StructField("time_of_purchase", TimestampType(), True),
        StructField("product_ID", StringType(), True),
        StructField("transaction_amount", IntegerType(), True),
    ]
)

# Kafka connection settings: broker address and the topic to consume.
kafka_bootstrap_servers = "b-1.monstercluster1.6xql65.c3.kafka.eu-west-2.amazonaws.com:9092"
kafka_topic = "retail_transactions"

# Streaming DataFrame over the Kafka topic. This only defines the source;
# records are read once a query is started with writeStream...start().
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .load()
)

In [None]:
# Decode each Kafka message value to a string, parse it as JSON with `schema`,
# and flatten the resulting struct into top-level columns.
transactions = (
    df.selectExpr("CAST(value AS STRING)")
    .withColumn("data", from_json(col("value"), schema))
    .select("data.*")
)

# Print each parsed transaction to the console as it arrives.
query = (
    transactions.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Wait a bounded time only. An argument-less awaitTermination() never returns in a
# notebook, so the query.stop() cell below could not run without a kernel interrupt
# (and Restart & Run All would hang here). The query keeps running after the timeout.
query.awaitTermination(timeout=60);

In [None]:
# Stop the raw console query started in the previous cell so it stops consuming from Kafka.
query.stop()

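For your use case, you only need to define the Kafka source and the parsed `transactions` DataFrame once. Defining them does not read anything yet: a streaming DataFrame is a lazy plan, and data is only pulled from Kafka when a query is started with `writeStream...start()`. Every started query reads the topic independently. Each of your transformations (aggregations, filtering, etc.) is applied to this same `transactions` DataFrame.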

However, when you want to view the results of these transformations, you have to initiate separate streaming queries for each operation. The key is to define these transformations as separate queries from the initial stream read.

Here’s how you can structure this:

1. **Define the Parsed Stream**: Run this block once to define the `transactions` streaming DataFrame on top of the Kafka source. This only builds the plan; no data is read until a query is started.

   ```python
   transactions = (df.selectExpr("CAST(value AS STRING)")
                   .withColumn("data", from_json(col("value"), schema))
                   .select("data.*"))
   ```

2. **Apply Transformations and View Results**: For each transformation, you'll create a new streaming query. Here's how you would modify the provided examples to view their output:

   **Example - Total Transaction Amounts by Store Location**:

   ```python
   from pyspark.sql.functions import sum as _sum

   total_amounts_by_location = transactions.groupBy("store_location")\
                                           .agg(_sum("transaction_amount").alias("total_amount"))

   query_total_amounts = total_amounts_by_location.writeStream \
       .outputMode("complete") \
       .format("console") \
       .start()

   query_total_amounts.awaitTermination()
   ```

   You would replicate this pattern for each of your other transformations, creating separate queries and viewing their results independently.

3. **Managing Multiple Streams**: If you start multiple queries like this, each will output to the console independently. To view only the output of a specific transformation, you should stop other active queries using `query.stop()` before starting a new one.

4. **Note on `.awaitTermination()`**: Called without arguments, `query.awaitTermination()` blocks the current cell until the query stops. In a notebook the cell therefore never finishes, and you cannot run the `query.stop()` cell until you interrupt the kernel. Interrupting only returns control to you; the query keeps running until `query.stop()` is called. For interactive experiments, either omit the call or use `query.awaitTermination(timeout=60)`, which returns after 60 seconds while the query continues in the background. An unbounded `.awaitTermination()` is appropriate for long-running streams in standalone scripts.

By structuring your Jupyter Notebook this way, you can effectively manage multiple transformations on your Kafka stream and view each transformation's output independently.

1. Transformations:
Aggregations:

Task 1.1: Total transaction amounts by store location.

In [None]:
from pyspark.sql.functions import sum as _sum

# Running total of transaction_amount per store_location over everything the query has consumed.
total_amounts_by_location = (
    transactions.groupBy("store_location")
    .agg(_sum("transaction_amount").alias("total_amount"))
)

# "complete" mode re-emits the whole aggregate table on every micro-batch.
query_total_amounts = (
    total_amounts_by_location.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_total_amounts.awaitTermination(timeout=60);


In [None]:
# Stop the per-store totals query once its console output has been reviewed;
# until stop() is called it keeps consuming from Kafka.

query_total_amounts.stop()

Task 1.2: Number of transactions by store location.

In [None]:
# Running number of transactions per store_location.
num_transactions_by_location = (
    transactions.groupBy("store_location")
    .count()
    .withColumnRenamed("count", "num_transactions")
)

query_num_transactions = (
    num_transactions_by_location.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_num_transactions.awaitTermination(timeout=60);

In [None]:
# Stop the per-store transaction-count query started in the previous cell.
query_num_transactions.stop()

Task 1.3: Average Transaction Amount by Store Location

In [None]:
from pyspark.sql.functions import avg

# Running mean of transaction_amount per store_location.
avg_transaction_amount_by_location = (
    transactions.groupBy("store_location")
    .agg(avg("transaction_amount").alias("avg_transaction_amount"))
)

query_avg_transaction_amount = (
    avg_transaction_amount_by_location.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_avg_transaction_amount.awaitTermination(timeout=60);

In [None]:
# Stop the per-store average-amount query started in the previous cell.
query_avg_transaction_amount.stop()

2. Time Series Analysis
2.1 Transactions per Hour/Day

In [None]:
# dayofmonth/hour stay in this import because other cells in the notebook may rely on them.
from pyspark.sql.functions import date_trunc, dayofmonth, hour

# Count transactions per hourly time bucket. date_trunc keeps the date, so 09:00 on
# different days stays in separate rows; hour() would merge them into one hour-of-day.
transactions_per_hour = (
    transactions.withColumn("hour", date_trunc("hour", "time_of_purchase"))
    .groupBy("hour")
    .count()
    .withColumnRenamed("count", "transactions_per_hour")
)

query_transactions_per_hour = (
    transactions_per_hour.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_transactions_per_hour.awaitTermination(timeout=60);

In [None]:
# Stop the hourly transaction-count query started in the previous cell.
query_transactions_per_hour.stop()

In [None]:
from pyspark.sql.functions import to_date

# Count transactions per calendar date. to_date keeps year and month, whereas
# dayofmonth() would merge e.g. 12 Nov and 12 Dec into the same "12" row.
transactions_per_day = (
    transactions.withColumn("day", to_date("time_of_purchase"))
    .groupBy("day")
    .count()
    .withColumnRenamed("count", "transactions_per_day")
)

query_transactions_per_day = (
    transactions_per_day.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_transactions_per_day.awaitTermination(timeout=60);

In [None]:
# Stop the daily transaction-count query started in the previous cell.
query_transactions_per_day.stop()

2.2 Total or Average Transaction Amounts Over Time

In [None]:
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import to_date

# Running total of transaction_amount per calendar date. to_date (not dayofmonth)
# keeps the same day number in different months in separate rows.
total_amounts_over_time = (
    transactions.groupBy(to_date("time_of_purchase").alias("day"))
    .agg(_sum("transaction_amount").alias("total_amount"))
)

query_total_amounts_over_time = (
    total_amounts_over_time.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_total_amounts_over_time.awaitTermination(timeout=60);

In [None]:
# Stop the daily total-amount query started in the previous cell.
query_total_amounts_over_time.stop()

In [None]:
# Imported here so the cell does not depend on another cell having been run first.
from pyspark.sql.functions import avg, to_date

# Running mean of transaction_amount per calendar date. to_date (not dayofmonth)
# keeps the same day number in different months in separate rows.
avg_amounts_over_time = (
    transactions.groupBy(to_date("time_of_purchase").alias("day"))
    .agg(avg("transaction_amount").alias("avg_amount"))
)

query_avg_amounts_over_time = (
    avg_amounts_over_time.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_avg_amounts_over_time.awaitTermination(timeout=60);

In [None]:
# Stop the daily average-amount query started in the previous cell.
query_avg_amounts_over_time.stop()

3. Product Analysis
3.1 Top N Products by Number of Transactions

In [None]:
# How many products the console sink prints per batch (20 is the console sink default).
top_n = 20

# Transactions per product, most frequent first. Sorting a streaming aggregate is
# allowed in "complete" mode; the console sink then shows only the first top_n rows.
top_n_products_by_transactions = (
    transactions.groupBy("product_ID")
    .count()
    .withColumnRenamed("count", "num_transactions")
    .orderBy(col("num_transactions").desc())
)

query_top_n_products_transactions = (
    top_n_products_by_transactions.writeStream
    .outputMode("complete")
    .format("console")
    .option("numRows", top_n)
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_top_n_products_transactions.awaitTermination(timeout=60);

In [None]:
# Stop the top-products-by-count query started in the previous cell.
query_top_n_products_transactions.stop()

3.2 Top N Products by Total Transaction Amount

In [None]:
from pyspark.sql.functions import sum as _sum

# How many products the console sink prints per batch (20 is the console sink default).
top_n = 20

# Total transaction_amount per product, largest first; the console sink shows the first top_n rows.
top_n_products_by_total_amount = (
    transactions.groupBy("product_ID")
    .agg(_sum("transaction_amount").alias("total_transaction_amount"))
    .orderBy(col("total_transaction_amount").desc())
)

query_top_n_products_total_amount = (
    top_n_products_by_total_amount.writeStream
    .outputMode("complete")
    .format("console")
    .option("numRows", top_n)
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_top_n_products_total_amount.awaitTermination(timeout=60);

In [None]:
# Stop the top-products-by-amount query started in the previous cell.
query_top_n_products_total_amount.stop()

4. Anomaly Detection
Identify Transactions with Amounts Significantly Different from the Average

--

The error you're encountering, "AnalysisException: Queries with streaming sources must be executed with writeStream.start(); kafka", indicates that there is an issue with how you're attempting to process the streaming data from Kafka. This error typically occurs when you're trying to use an action (like `.show()`, `.collect()`, etc.) that is meant for batch processing on a streaming DataFrame.

In your case, the issue seems to be arising from these lines:

```python
std_dev = transactions.agg(stddev("transaction_amount").alias("stddev_amount")).collect()[0]["stddev_amount"]
avg_amount = transactions.agg(avg("transaction_amount").alias("avg_amount")).collect()[0]["avg_amount"]
```

In a streaming context you cannot call actions such as `collect()`, `show()` or `count()` on a streaming DataFrame. It represents an unbounded dataset, so there is no final result for such an action to return. Aggregation functions such as `avg` and `stddev` *are* supported on streaming DataFrames. However, their results are only available through a streaming query started with `writeStream...start()`, not through batch actions.

To work around this, define the aggregation as part of a streaming query and start it with `writeStream`. Computing a running average or standard deviation over the stream is supported as a stateful aggregation. The harder part is comparing each incoming transaction against those running statistics. The usual approaches are windowed aggregations, as shown below, or per-micro-batch logic in `foreachBatch`.

Here's an alternative approach using windowed aggregations for streaming data:

```python
from pyspark.sql.functions import window, stddev, avg

# Windowed aggregation for streaming data
windowed_transactions = transactions \
    .withColumn("window", window("time_of_purchase", "1 hour")) \
    .groupBy("window") \
    .agg(
        stddev("transaction_amount").alias("stddev_amount"),
        avg("transaction_amount").alias("avg_amount")
    )

# You can then start the stream to write the results to the console
query_windowed_transactions = windowed_transactions.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query_windowed_transactions.awaitTermination()
```

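This code snippet uses a tumbling window of 1 hour to calculate the standard deviation and average of transaction amounts, and each window is aggregated independently. In a streaming context, especially with windowed aggregations, results are calculated over the data in each window, not over the entire dataset. Note that this step only produces the per-window statistics. Flagging individual transactions that deviate strongly from them (for example, by more than a few standard deviations) needs an additional step, such as per-micro-batch logic in `foreachBatch`.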

In [None]:
from pyspark.sql.functions import window, stddev, avg

# Standard deviation and mean of transaction_amount per 1-hour tumbling window on
# time_of_purchase: the baseline statistics for anomaly detection.
# TODO: this does not yet flag individual anomalous transactions; that needs an extra
# step comparing rows against these statistics (e.g. per-micro-batch logic in foreachBatch).
windowed_transactions = (
    transactions.withColumn("window", window("time_of_purchase", "1 hour"))
    .groupBy("window")
    .agg(
        stddev("transaction_amount").alias("stddev_amount"),
        avg("transaction_amount").alias("avg_amount"),
    )
)

# Print the full table of per-window statistics on every micro-batch.
query_anomalous_transactions = (
    windowed_transactions.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_anomalous_transactions.awaitTermination(timeout=60);

In [34]:
# Stop the windowed-statistics query started in the previous cell.
query_anomalous_transactions.stop()

                                                                                

-------------------------------------------
Batch: 26
-------------------------------------------
+--------------------+------------------+-----------------+
|              window|     stddev_amount|       avg_amount|
+--------------------+------------------+-----------------+
|{2023-12-12 09:00...|359.34876686639353|452.1512605042017|
+--------------------+------------------+-----------------+





5. Filtering
Keep Only Transactions Within an Amount Range (strictly between 50 and 1000)

In [None]:
# Exclusive bounds: only transactions with min_amount < transaction_amount < max_amount pass.
min_amount = 50
max_amount = 1000

filtered_transactions = transactions.filter(
    (col("transaction_amount") > min_amount) & (col("transaction_amount") < max_amount)
)

# A stateless filter emits each matching row once, so "append" is the natural mode
# (for queries without aggregation, "update" prints the same rows).
query_filtered_transactions = (
    filtered_transactions.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Return after 60 s instead of blocking forever, so the stop cell below can be run;
# the query keeps running in the background until it is stopped.
query_filtered_transactions.awaitTermination(timeout=60);

In [8]:
# Stop the amount-filter query started in the previous cell.
query_filtered_transactions.stop()