In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, split, col, when, unix_timestamp, lit

In [None]:
# Configuration for Cassandra and Kafka.
# Each setting can be overridden through an environment variable so that
# credentials and hostnames do not have to be hardcoded in the notebook;
# the fallback value is used when the variable is unset.
cassandra_host = os.environ.get("CASSANDRA_HOST", "cassandra")
cassandra_user = os.environ.get("CASSANDRA_USER", "cassandra")
cassandra_pwd  = os.environ.get("CASSANDRA_PASSWORD", "cassandra")
cassandra_port = int(os.environ.get("CASSANDRA_PORT", "9042"))
key_space = os.environ.get("CASSANDRA_KEYSPACE", "loganalysis")
table_name = os.environ.get("CASSANDRA_TABLE", "nasalog")
kafka_server = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
kafka_topic = os.environ.get("KAFKA_TOPIC", "nasa_logs")

In [None]:
# Maven coordinates (Scala 2.12 builds) passed via spark.jars.packages:
# the Kafka source and the Cassandra connector plus its driver.
spark_packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0",
    "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0",
    "com.datastax.spark:spark-cassandra-connector-driver_2.12:3.0.0",
])

# Build (or reuse) the SparkSession with the connector packages and the
# Cassandra connection/authentication settings from the config cell.
spark = (
    SparkSession.builder
    .appName("Log Analyst")
    .config("spark.jars.packages", spark_packages)
    .config("spark.cassandra.connection.port", cassandra_port)
    .config("spark.cassandra.connection.host", cassandra_host)
    .config("spark.cassandra.auth.username", cassandra_user)
    .config("spark.cassandra.auth.password", cassandra_pwd)
    .getOrCreate()
)

In [None]:
# Regular expression for one Common Log Format line. Capture groups:
#   1 = host, 2 = timestamp (text inside [...]), 3 = HTTP method, 4 = URL,
#   5 = three-digit status code, 6 = bytes (optional)
log_pattern = r'(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d{3}) (\S+)?'

# Text after the last '.' of the URL's final path segment, e.g. "gif" for
# "/images/logo.gif"; regexp_extract yields "" when that segment has no '.'.
extension_pattern = r'\.([^./]+)$'

# Read streaming data from Kafka and parse each log line into columns.
# The chain is parenthesized so comments can sit between the calls: a comment
# after a trailing backslash continuation is a syntax error.
log_data = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING) as value")
    # Extract fields from the log line; regexp_extract yields "" for lines
    # that do not match log_pattern.
    .withColumn("host", regexp_extract("value", log_pattern, 1))
    .withColumn("time", regexp_extract("value", log_pattern, 2))
    .withColumn("method", regexp_extract("value", log_pattern, 3))
    .withColumn("url", regexp_extract("value", log_pattern, 4))
    .withColumn("response", regexp_extract("value", log_pattern, 5))
    .withColumn("bytes", regexp_extract("value", log_pattern, 6))
    # Add processing timestamp (current Unix time in seconds)
    .withColumn("time_added", unix_timestamp())
    # Extract file extension from URL. regexp_extract is used because Spark's
    # Column.getItem(-1) does not index from the end of an array (it yields
    # null), which made every extension "None".
    .withColumn("extension", regexp_extract(col("url"), extension_pattern, 1))
    .withColumn("extension",
                when(col("extension").isNull() | (col("extension") == ""), lit("None"))
                .otherwise(col("extension")))
    # Replace missing host with "unknown"
    .withColumn("host",
                when(col("host").isNull() | (col("host") == ""), lit("unknown"))
                .otherwise(col("host")))
    .drop("value")
)

In [None]:
# Destination directory for the per-batch CSV copy of the parsed logs.
hdfs_output_path = "hdfs://52.202.83.122:8020/output/nasa_logs/"


# Function to process each micro-batch
def process_row(df, epoch_id):
    """Write one streaming micro-batch to Cassandra and to HDFS as CSV.

    Used as the ``foreachBatch`` callback, so despite its name it receives a
    whole micro-batch DataFrame rather than a single row.

    Args:
        df: the micro-batch as a static (non-streaming) DataFrame.
        epoch_id: the batch identifier supplied by Spark; not used here.
    """
    # The batch is written to two sinks; cache it so the upstream work (Kafka
    # read and parsing) runs once instead of once per write, and both sinks
    # receive the same computed rows.
    df.persist()
    try:
        # Write batch to Cassandra
        df.write \
          .format("org.apache.spark.sql.cassandra") \
          .mode("append") \
          .options(table=table_name, keyspace=key_space) \
          .save()
        # Write batch to HDFS as CSV; coalesce(1) writes a single part file per batch.
        df.coalesce(1).write.csv(hdfs_output_path, mode="append", header=True)
    finally:
        # Release the cached batch even if one of the writes fails.
        df.unpersist()

In [None]:
# Start the streaming query, then block until it terminates.
# start() and awaitTermination() are kept separate so `query` holds the
# StreamingQuery handle (e.g. for query.stop() / query.status); chaining
# awaitTermination() onto start() would bind `query` to its return value, None.
query = (
    log_data.writeStream
    .outputMode("append")
    .option("checkpointLocation", "checkpoint/data")
    .foreachBatch(process_row)
    .start()
)
query.awaitTermination()