In [1]:
from typing import List
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pandas as pd
import pyspark
import time

In [2]:
# Create (or reuse) a SparkSession running against the "local" master
# for the SQL exercises in this notebook.
ss = (
    SparkSession.builder
    .master("local")
    .appName("log sql ex")
    .getOrCreate()
)

In [3]:
# Schema definition: one (column name, Spark type) pair per CSV column,
# listed in file order.
# NOTE: every field is declared non-nullable here, yet the schema printed
# further down in this notebook reports nullable = true for all columns.
fields = StructType([
    StructField(column, dtype, False)
    for column, dtype in (
        ("ip", StringType()),
        ("timestamp", StringType()),
        ("method", StringType()),
        ("endpoint", StringType()),
        ("status_code", StringType()),
        ("latency", IntegerType()),  # unit: milliseconds
    )
])

# Load the log file using the explicit schema above.
log_data = ss.read.csv("data/log.csv", schema=fields)

In [4]:
# Register the DataFrame as a temporary view so the cells below can
# query it by name through Spark SQL.
table_name = "log_data"
log_data.createOrReplaceTempView(name=table_name)

In [5]:
# Inspect the data: print the first rows of the registered view.
(
    ss.sql(f"SELECT * FROM {table_name}")
    .show()
)

+---------------+-------------------+------+-----------+-----------+-------+
|             ip|          timestamp|method|   endpoint|status_code|latency|
+---------------+-------------------+------+-----------+-----------+-------+
| 130.31.184.234|2023-02-26 04:15:21| PATCH|     /users|        400|     61|
|  212.228.86.35|2023-02-26 04:15:21| PATCH| /customers|        400|     66|
|  28.252.170.12|2023-02-26 04:15:21|   GET|    /events|        401|     73|
|   180.97.92.48|2023-02-26 04:15:22| PATCH|   /parsers|        503|     17|
|   73.218.61.17|2023-02-26 04:15:22|   GET|     /lists|        201|     91|
|   24.15.193.50|2023-02-26 04:15:23|   PUT|      /auth|        400|     24|
|  31.181.214.70|2023-02-26 04:15:23| PATCH|      /auth|        404|     17|
| 192.175.62.126|2023-02-26 04:15:23|   GET| /playbooks|        201|     45|
|  50.78.180.190|2023-02-26 04:15:23|DELETE|    /events|        403|     65|
|151.183.175.203|2023-02-26 04:15:23|DELETE| /playbooks|        400|     99|

In [6]:
# Inspect the schema: print column names, types, and nullability of the view.
(
    ss.sql(f"SELECT * FROM {table_name}")
    .printSchema()
)

root
 |-- ip: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- status_code: string (nullable = true)
 |-- latency: integer (nullable = true)



In [7]:
# a) Column transformations
# a-1) The latency column is in milliseconds; add a new latency_seconds
#      column that expresses the same value in seconds.
query = f"""
    SELECT *, 
    latency / 1000 AS latency_seconds
    FROM {table_name}
"""

(
    ss.sql(query)
    .show()
)

+---------------+-------------------+------+-----------+-----------+-------+---------------+
|             ip|          timestamp|method|   endpoint|status_code|latency|latency_seconds|
+---------------+-------------------+------+-----------+-----------+-------+---------------+
| 130.31.184.234|2023-02-26 04:15:21| PATCH|     /users|        400|     61|          0.061|
|  212.228.86.35|2023-02-26 04:15:21| PATCH| /customers|        400|     66|          0.066|
|  28.252.170.12|2023-02-26 04:15:21|   GET|    /events|        401|     73|          0.073|
|   180.97.92.48|2023-02-26 04:15:22| PATCH|   /parsers|        503|     17|          0.017|
|   73.218.61.17|2023-02-26 04:15:22|   GET|     /lists|        201|     91|          0.091|
|   24.15.193.50|2023-02-26 04:15:23|   PUT|      /auth|        400|     24|          0.024|
|  31.181.214.70|2023-02-26 04:15:23| PATCH|      /auth|        404|     17|          0.017|
| 192.175.62.126|2023-02-26 04:15:23|   GET| /playbooks|        201|  

In [21]:
# a-2) The timestamp column was read as StringType; select it converted
#      to TimestampType, keeping the other columns as they are.
(
    ss.sql(f"""
    SELECT ip, TIMESTAMP(timestamp) AS timestamp, method, endpoint, status_code, latency
    FROM {table_name}
    """)
    .show()
)

+---------------+-------------------+------+-----------+-----------+-------+
|             ip|          timestamp|method|   endpoint|status_code|latency|
+---------------+-------------------+------+-----------+-----------+-------+
| 130.31.184.234|2023-02-26 04:15:21| PATCH|     /users|        400|     61|
|  212.228.86.35|2023-02-26 04:15:21| PATCH| /customers|        400|     66|
|  28.252.170.12|2023-02-26 04:15:21|   GET|    /events|        401|     73|
|   180.97.92.48|2023-02-26 04:15:22| PATCH|   /parsers|        503|     17|
|   73.218.61.17|2023-02-26 04:15:22|   GET|     /lists|        201|     91|
|   24.15.193.50|2023-02-26 04:15:23|   PUT|      /auth|        400|     24|
|  31.181.214.70|2023-02-26 04:15:23| PATCH|      /auth|        404|     17|
| 192.175.62.126|2023-02-26 04:15:23|   GET| /playbooks|        201|     45|
|  50.78.180.190|2023-02-26 04:15:23|DELETE|    /events|        403|     65|
|151.183.175.203|2023-02-26 04:15:23|DELETE| /playbooks|        400|     99|

In [23]:
# b) Filtering
# b-1) Keep only the rows with status_code = 400 and endpoint = "/users".
#      status_code is a string column in the schema, hence the quoted '400'.
(
    ss.sql(f"""
SELECT * FROM {table_name}
WHERE status_code = '400' AND endpoint = '/users'
""")
    .show()
)

+---------------+-------------------+------+--------+-----------+-------+
|             ip|          timestamp|method|endpoint|status_code|latency|
+---------------+-------------------+------+--------+-----------+-------+
| 130.31.184.234|2023-02-26 04:15:21| PATCH|  /users|        400|     61|
|  28.15.191.142|2023-02-26 04:15:23|DELETE|  /users|        400|     64|
|  230.10.23.194|2023-02-26 04:15:27|  POST|  /users|        400|     81|
|146.176.215.164|2023-02-26 04:15:31|  POST|  /users|        400|      7|
|  47.11.220.144|2023-02-26 04:15:35|   GET|  /users|        400|     36|
|  162.75.159.67|2023-02-26 04:15:43|DELETE|  /users|        400|     31|
|   133.26.14.69|2023-02-26 04:15:46|  POST|  /users|        400|     97|
|   8.71.145.153|2023-02-26 04:15:52| PATCH|  /users|        400|     50|
| 212.58.227.134|2023-02-26 04:15:50|DELETE|  /users|        400|     98|
|118.235.209.255|2023-02-26 04:15:59|   GET|  /users|        400|     37|
|  146.35.62.119|2023-02-26 04:16:06| 

In [24]:
# c) Group by
# c-1) Maximum, minimum, and mean latency for each (method, endpoint) pair.
(
    ss.sql(f"""
SELECT 
    method, 
    endpoint, 
    MAX(latency) AS max_latency,
    MIN(latency) AS min_latency,
    AVG(latency) AS mean_latency
FROM  {table_name}
GROUP BY method, endpoint
""")
    .show()
)

+------+-----------+-----------+-----------+------------------+
|method|   endpoint|max_latency|min_latency|      mean_latency|
+------+-----------+-----------+-----------+------------------+
|   GET|    /events|         85|          1|41.354838709677416|
|DELETE| /playbooks|         99|          6| 51.10909090909091|
|DELETE| /customers|         98|          5| 53.05769230769231|
|DELETE|     /lists|         99|          4| 47.73529411764706|
| PATCH|   /parsers|         99|          1|50.578947368421055|
| PATCH|     /users|        100|          1| 50.82692307692308|
|   GET| /customers|        100|          1| 49.02564102564103|
|   PUT| /playbooks|         99|          1| 43.06818181818182|
|  POST| /customers|         98|          2| 51.78688524590164|
|   GET|     /users|         97|          1| 49.04347826086956|
|   PUT|     /users|         98|          1| 51.22857142857143|
|  POST|    /events|        100|          3|48.791666666666664|
|   PUT|     /lists|        100|        

In [26]:
# c-2) Per minute: the de-duplicated list of IPs and how many distinct IPs
#      were seen.
#      - collect_set(ip) builds the list with duplicates removed.
#      - count(DISTINCT ip) counts each IP once per group; a plain count(ip)
#        would count every request row, including repeated IPs.
#      NOTE: the grouping key is (hour, minute) only, so rows from different
#      dates that share the same hour and minute would land in the same
#      group. The sample output shown in this notebook covers a single date.
ss.sql(f"""
SELECT
    hour(timestamp) AS hour,
    minute(timestamp) AS minute,
    collect_set(ip) AS ip_list,
    count(DISTINCT ip) AS ip_count
FROM {table_name}
GROUP BY hour, minute
ORDER BY hour, minute
""").show()

+----+------+--------------------+--------+
|hour|minute|             ip_list|ip_count|
+----+------+--------------------+--------+
|   4|    15|[206.229.65.248, ...|     848|
|   4|    16|[111.237.158.56, ...|    1408|
|   4|    17|[38.212.159.99, 8...|      24|
+----+------+--------------------+--------+

