In [1]:
import getpass
import os
import time

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
# Kafka connection settings. Both can be overridden via environment variables so the
# notebook does not need editing when the broker address changes.
# NOTE(review): the port (50459) looks like one assigned per local Confluent run — confirm.
BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'confluent-local-broker-1:50459')
TOPIC = os.environ.get('KAFKA_TOPIC', 'server_logs')

In [3]:
# Reuse the active Spark session if there is one, otherwise create it under this app name.
spark = (
    SparkSession.builder
    .appName('AnalyticsConsumer')
    .getOrCreate()
)

In [4]:
def test(df, sleep_time=5, output_mode='append'):
    import time
    query = df.writeStream.outputMode(output_mode).format('console').options(truncate=False).start()
    time.sleep(sleep_time)
    query.stop()

In [5]:
# Streaming source: subscribe to TOPIC on the configured Kafka brokers.
# NOTE(review): no 'startingOffsets' option is set, so Spark's default applies
# (for streaming reads this is documented as 'latest', i.e. messages already in the
# topic are skipped) — confirm that matches "write all of the data".
kafka_stream_df = (
    spark.readStream.format('kafka')
    .options(**{
        'kafka.bootstrap.servers': BOOTSTRAP_SERVERS,
        'subscribe': TOPIC,
    })
    .load()
)

In [6]:
# Smoke test: print the raw Kafka records to the console for a few seconds.
test(kafka_stream_df, sleep_time=5, output_mode='append')

## Write to SQL:
#### 1. Write all of the data to a SQL database table named `server_logs` with the following columns:
1. ip_address (text)
2. user_name (text)
3. user_id (integer)
4. timestamp (timestamp without time zone)
5. http_method (text)
6. path (text)
7. status_code (integer)

#### Sample data
`104.192.183.73 carl_mejia 9693 [2024-09-17T01:06:18.735420+00:00] "GET /video/r335Q7vg9" 200`

In [7]:
# Parse each raw Kafka message into the server_logs schema. A payload looks like:
#   104.192.183.73 carl_mejia 9693 [2024-09-17T01:06:18.735420+00:00] "GET /video/r335Q7vg9" 200
# Removing every '[', ']' and '"' leaves seven space-separated fields, which are split
# apart and cast to the column types listed in the task description.
# NOTE(review): cast('timestamp') interprets the '+00:00' offset, so the value later written
# to a "timestamp without time zone" column may depend on the Spark/JVM time zone —
# confirm it is UTC if wall-clock UTC values are expected.
df = (
    kafka_stream_df
    .select(kafka_stream_df.value.cast('string'))
    .select(F.regexp_replace('value', r'\[|\]|\"', '').alias('data'))
    .select(F.split('data', ' ').alias('data'))
    .select(*[
        F.col('data').getItem(position).cast(spark_type).alias(column_name)
        for position, (column_name, spark_type) in enumerate([
            ('ip_address', 'string'),
            ('user_name', 'string'),
            ('user_id', 'integer'),
            ('timestamp', 'timestamp'),
            ('http_method', 'string'),
            ('path', 'string'),
            ('status_code', 'integer'),
        ])
    ])
)

test(df)

In [8]:
# Postgres connection settings. Non-secret values may be overridden via environment
# variables; the password is never stored in the notebook.
DB_HOST = os.environ.get('DB_HOST', 'aws-0-us-west-1.pooler.supabase.com')
DB_NAME = os.environ.get('DB_NAME', 'postgres')
DB_PORT = os.environ.get('DB_PORT', '6543')
DB_USERNAME = os.environ.get('DB_USERNAME', 'postgres.mecvsqfdytymqtblrlgo')
# SECURITY: a plaintext password was previously committed in this cell — treat it as
# compromised and rotate it. Read it from DB_PASSWORD, or prompt for it interactively.
DB_PASSWORD = os.environ.get('DB_PASSWORD') or getpass.getpass('Postgres password: ')
# prepareThreshold=0 disables server-side prepared statements in the PostgreSQL JDBC driver.
# NOTE(review): presumably required because port 6543 is a transaction-mode pooler — confirm.
DB_URL = f'jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}?prepareThreshold=0'
DB_PROPERTIES = {'user': DB_USERNAME, 'password': DB_PASSWORD, 'driver': 'org.postgresql.Driver'}

In [79]:
# Superseded: an earlier write_to_postgres was a foreachBatch callback hard-coded to
# append into 'server_logs'. The parameterized write_to_postgres(df, table_name, ...)
# helper defined in the next cell builds that callback itself.

In [20]:
def write_to_postgres(df, table_name: str, write_mode: str = 'append', output_mode: str = 'append',
                      checkpoint_location: 'str | None' = None):
    """Start a streaming query that writes every micro-batch of ``df`` to a Postgres table.

    Each micro-batch is written with ``DataFrame.write.jdbc`` using the notebook-level
    ``DB_URL`` and ``DB_PROPERTIES``.

    Args:
        df: streaming DataFrame to sink.
        table_name: target table name.
        write_mode: batch save mode used for each micro-batch, e.g. 'append', or
            'overwrite' when ``output_mode='complete'`` re-emits the whole result.
        output_mode: streaming output mode ('append', 'complete', ...).
        checkpoint_location: optional checkpoint directory for the query. When omitted,
            no checkpoint option is set. NOTE(review): Spark then falls back to a temporary
            checkpoint, so progress is presumably not kept across stop/restart — confirm.

    Returns:
        The started streaming query; call ``.stop()`` on it when done.
    """
    def _write_batch(batch_df, epoch_id):
        # epoch_id is not used: every micro-batch is written as-is.
        batch_df.write.jdbc(url=DB_URL, table=table_name, mode=write_mode, properties=DB_PROPERTIES)

    writer = df.writeStream.foreachBatch(_write_batch).outputMode(output_mode)
    if checkpoint_location is not None:
        writer = writer.option('checkpointLocation', checkpoint_location)
    return writer.start()

In [21]:
# Stream the parsed log rows into the server_logs table (append mode).
query = write_to_postgres(df, table_name='server_logs')

In [22]:
# Stop the streaming write into server_logs.
query.stop()

## Aggregate errors
#### 2. Filter the data to include only server logs that have an error code (404 or 500)

In [12]:
# Keep only the error responses the task asks about (404 and 500). Filtering on
# `!= 200` would also let through every other non-200 status code.
errors_df = df.filter(F.col('status_code').isin(404, 500)).select('path', 'status_code')
test(errors_df)

#### Aggregate by path and report the number of errors per path, with 4 output columns:
1. path: the path
2. 404_errors: how many 404 errors that path received
3. 500_errors: how many 500 errors that path received
4. total_errors: total errors the given path received

In [13]:
# Turn each error row into 0/1 indicator columns so the aggregation can sum them.
# F.col resolves status_code on errors_df itself instead of reaching back into `df`.
errors_df = (
    errors_df
    .withColumn('404_errors', F.when(F.col('status_code') == 404, 1).otherwise(0))
    .withColumn('500_errors', F.when(F.col('status_code') == 500, 1).otherwise(0))
    .withColumn('total_errors', F.when(F.col('status_code').isin(404, 500), 1).otherwise(0))
)

In [14]:
# status_code is no longer needed once the indicator columns exist.
errors_df = errors_df.drop('status_code')

In [15]:
# One row per path: sum each 0/1 indicator back into a count with the same column name.
errors_df = errors_df.groupBy('path').agg(*[
    F.sum(indicator).alias(indicator)
    for indicator in ('404_errors', '500_errors', 'total_errors')
])

In [16]:
# Aggregated stream: preview it in 'complete' output mode for 10 seconds.
test(errors_df, output_mode='complete', sleep_time=10)

In [23]:
# The aggregate is produced in 'complete' output mode, so each batch is written with
# 'overwrite' rather than appended.
query = write_to_postgres(
    errors_df,
    table_name='errors_by_path',
    write_mode='overwrite',
    output_mode='complete',
)

In [24]:
# Stop the streaming write into errors_by_path.
query.stop()

In [19]:
# Per-IP request counts in tumbling windows of `window_duration`; an IP whose count in a
# single window exceeds `dos_threshold` is flagged as a possible DoS source.
window_duration = '1 minute'
dos_threshold = 100  # requests per window above which an IP is flagged
window_df = (
    df.groupBy(F.window('timestamp', window_duration), 'ip_address')
    .agg(F.count('ip_address').alias('count'))
    .select(
        F.col('window.start').alias('window_start'),
        F.col('window.end').alias('window_end'),
        'ip_address',
        'count',
    )
    .withColumn('dos_attack', F.col('count') > dos_threshold)
    # Most recent windows first. Sorting a streaming frame is only supported after an
    # aggregation in 'complete' output mode, which is how it is run below.
    .orderBy(F.desc('window_start'))
)
# NOTE(review): no watermark is set, so Spark keeps state for every window seen;
# acceptable for a short preview, but unbounded for a long-running query.
test(window_df, output_mode='complete', sleep_time=30)