In [1]:
from pyspark.sql import SparkSession

import pyspark.sql.functions as func


In [2]:
# 스파크 세션 생성 (Windows에서만 필요한 설정 부분입니다!)
spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()


In [5]:
# logs 디렉토리를 모니터링하여 새로운 로그 데이터를 읽어와서 accessLines 변수에 저장한다.
accessLines = spark.read.text("logs")
accessLines.collect()[:10]

[Row(value='66.249.75.159 - - [29/Nov/2015:03:50:05 +0000] "GET /robots.txt HTTP/1.1" 200 55 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"'),
 Row(value='66.249.75.168 - - [29/Nov/2015:03:50:06 +0000] "GET /blog/ HTTP/1.1" 200 8083 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"'),
 Row(value='185.71.216.232 - - [29/Nov/2015:03:53:15 +0000] "POST /wp-login.php HTTP/1.1" 200 1691 "http://nohatenews.com/wp-login.php" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.0"'),
 Row(value='54.165.199.171 - - [29/Nov/2015:04:32:27 +0000] "GET /sitemap_index.xml HTTP/1.0" 200 592 "-" "W3 Total Cache/0.9.4.1"'),
 Row(value='54.165.199.171 - - [29/Nov/2015:04:32:27 +0000] "GET /post-sitemap.xml HTTP/1.0" 200 2502 "-" "W3 Total Cache/0.9.4.1"'),
 Row(value='54.165.199.171 - - [29/Nov/2015:04:32:27 +0000] "GET /page-sitemap.xml HTTP/1.0" 200 11462 "-" "W3 Total Cache/0.9.4.1"'),
 Row(value='54.165.199.171 - - [29/N

In [8]:


# 정규 표현식을 사용하여 공통 로그 형식을 DataFrame으로 파싱합니다.
contentSizeExp = r'\s(\d+)$'  # content size를 추출하기 위한 정규 표현식
statusExp = r'\s(\d{3})\s'  # 상태 코드를 추출하기 위한 정규 표현식
generalExp = r'\"(\S+)\s(\S+)\s*(\S*)\"'  # 메소드, 엔드포인트, 프로토콜을 추출하기 위한 정규 표현식
timeExp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'  # 타임스탬프를 추출하기 위한 정규 표현식
hostExp = r'(^\S+\.[\S+\.]+\S+)\s'  # 호스트를 추출하기 위한 정규 표현식

logsDF = accessLines.select(func.regexp_extract('value', hostExp, 1).alias('host'),  # 호스트 추출
                         func.regexp_extract('value', timeExp, 1).alias('timestamp'),  # 타임스탬프 추출
                         func.regexp_extract('value', generalExp, 1).alias('method'),  # 메소드 추출
                         func.regexp_extract('value', generalExp, 2).alias('endpoint'),  # 엔드포인트 추출
                         func.regexp_extract('value', generalExp, 3).alias('protocol'),  # 프로토콜 추출
                         func.regexp_extract('value', statusExp, 1).cast('integer').alias('status'),  # 상태 코드 추출
                         func.regexp_extract('value', contentSizeExp, 1).cast('integer').alias('content_size'))  # content size 추출
print(logsDF)
logsDF.collect()[:5]

DataFrame[host: string, timestamp: string, method: string, endpoint: string, protocol: string, status: int, content_size: int]


[Row(host='66.249.75.159', timestamp='', method='GET', endpoint='/robots.txt', protocol='HTTP/1.1', status=200, content_size=None),
 Row(host='66.249.75.168', timestamp='', method='GET', endpoint='/blog/', protocol='HTTP/1.1', status=200, content_size=None),
 Row(host='185.71.216.232', timestamp='', method='POST', endpoint='/wp-login.php', protocol='HTTP/1.1', status=200, content_size=None),
 Row(host='54.165.199.171', timestamp='', method='GET', endpoint='/sitemap_index.xml', protocol='HTTP/1.0', status=200, content_size=None),
 Row(host='54.165.199.171', timestamp='', method='GET', endpoint='/post-sitemap.xml', protocol='HTTP/1.0', status=200, content_size=None)]

In [9]:
logsDF2 = logsDF.withColumn("eventTime", func.current_timestamp())  # 현재 타임스탬프를 eventTime 컬럼으로 추가
logsDF2.collect()[:5]

[Row(host='66.249.75.159', timestamp='', method='GET', endpoint='/robots.txt', protocol='HTTP/1.1', status=200, content_size=None, eventTime=datetime.datetime(2023, 12, 3, 17, 9, 14, 456305)),
 Row(host='66.249.75.168', timestamp='', method='GET', endpoint='/blog/', protocol='HTTP/1.1', status=200, content_size=None, eventTime=datetime.datetime(2023, 12, 3, 17, 9, 14, 456305)),
 Row(host='185.71.216.232', timestamp='', method='POST', endpoint='/wp-login.php', protocol='HTTP/1.1', status=200, content_size=None, eventTime=datetime.datetime(2023, 12, 3, 17, 9, 14, 456305)),
 Row(host='54.165.199.171', timestamp='', method='GET', endpoint='/sitemap_index.xml', protocol='HTTP/1.0', status=200, content_size=None, eventTime=datetime.datetime(2023, 12, 3, 17, 9, 14, 456305)),
 Row(host='54.165.199.171', timestamp='', method='GET', endpoint='/post-sitemap.xml', protocol='HTTP/1.0', status=200, content_size=None, eventTime=datetime.datetime(2023, 12, 3, 17, 9, 14, 456305))]

In [11]:

# Keep a running count of endpoints

# 엔드포인트별로 실행 횟수를 계산합니다.
# logsDF2 데이터프레임을 window 함수를 사용하여 
# eventTime을 30초 간격으로 구분하고, window 간격을 10초
# endpoint를 그룹화하여 count() 함수를 적용하여 endpointCounts 데이터프레임을 생성한다.
endpointCounts = logsDF2.groupBy(func.window(func.col("eventTime"), \
      "30 seconds", "10 seconds"), func.col("endpoint")).count()  
endpointCounts.collect()[:5]


[Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 10, 40), end=datetime.datetime(2023, 12, 3, 17, 11, 10)), endpoint='/?p=13636', count=1),
 Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 10, 30), end=datetime.datetime(2023, 12, 3, 17, 11)), endpoint='/technology/', count=68),
 Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 10, 30), end=datetime.datetime(2023, 12, 3, 17, 11)), endpoint='/feeds/tampa-bay-times-top-news/', count=1),
 Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 10, 40), end=datetime.datetime(2023, 12, 3, 17, 11, 10)), endpoint='/robots.txt', count=123),
 Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 10, 30), end=datetime.datetime(2023, 12, 3, 17, 11)), endpoint='/minneapolis-sports/', count=58)]

In [13]:

sortedEndpointCounts = endpointCounts.orderBy(func.col("count").desc())  # 실행 횟수를 기준으로 내림차순으로 정렬
sortedEndpointCounts.collect()[:5]

[Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 15, 40), end=datetime.datetime(2023, 12, 3, 17, 16, 10)), endpoint='/xmlrpc.php', count=68494),
 Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 16), end=datetime.datetime(2023, 12, 3, 17, 16, 30)), endpoint='/xmlrpc.php', count=68494),
 Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 15, 50), end=datetime.datetime(2023, 12, 3, 17, 16, 20)), endpoint='/xmlrpc.php', count=68494),
 Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 16), end=datetime.datetime(2023, 12, 3, 17, 16, 30)), endpoint='/wp-login.php', count=1923),
 Row(window=Row(start=datetime.datetime(2023, 12, 3, 17, 15, 50), end=datetime.datetime(2023, 12, 3, 17, 16, 20)), endpoint='/wp-login.php', count=1923)]

In [None]:

# Display the stream to the console

# 스트림을 콘솔에 출력합니다.
query = sortedEndpointCounts.writeStream.outputMode("complete").format("console") \
      .queryName("counts").start()

# Wait until we terminate the scripts
query.awaitTermination()  # 스크립트가 종료될 때까지 대기합니다.

# Stop the session
spark.stop()  # SparkSession을 종료합니다.
