## Q1.Security Data Analysis

# (1)

In [25]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.ui.port', 4040).appName("Project_Security").getOrCreate()

http_path = "Q1_data/*/http.log.gz"
dns_path = "Q1_data/*/dns.log.gz"

#读取成为dataframe
df_http = spark.read.json(http_path)
df_dns = spark.read.json(dns_path)

#通过cast转换格式至Timestamp
df_http = df_http.withColumn("ts", df_http["ts"].cast("Timestamp"))

#创建临时视图
df_http.createOrReplaceTempView("http_log")
df_dns.createOrReplaceTempView("dns_log")

                                                                                

# (2)

In [26]:
#Spark SQL API
#过滤、分组和排序
##这里count(*)就是计算每个分组的行数，也就是uri数
##添加了一个筛选无内容uri的做法，防止空uri混入影响数据判断
operated_sql_http = spark.sql("""
    SELECT uri, COUNT(*) AS access_count
    FROM http_log
    WHERE status_code = 200 AND method = 'GET' AND uri != '/'
    GROUP BY uri
    ORDER BY access_count DESC
""")

operated_sql_http.show(5)



+--------------------+------------+
|                 uri|access_count|
+--------------------+------------+
|/admin/config.php...|         556|
|  /main.php?logout=1|         194|
|/top.php?stuff=15...|         191|
|            /top.php|         179|
|/main.php?stuff=1...|         172|
+--------------------+------------+
only showing top 5 rows



                                                                                

In [15]:
#Spark Dataframe
#过滤、分组和排序
from pyspark.sql.functions import col, count
filtered_http = df_http.filter((col("status_code") == 200) & (col("method") == "GET")& (col("uri") != "/"))
grouped_http = filtered_http.groupBy("uri").agg(count("*").alias("access_count"))
operated_df_http = grouped_http.orderBy(col("access_count").desc())
operated_df_http.show(5)



+--------------------+------------+
|                 uri|access_count|
+--------------------+------------+
|/admin/config.php...|         556|
|  /main.php?logout=1|         194|
|/top.php?stuff=15...|         191|
|            /top.php|         179|
|/main.php?stuff=1...|         172|
+--------------------+------------+
only showing top 5 rows



                                                                                

# (3)

In [17]:
##join
joined_df = spark.sql("""
    Select *
    From http_log
    Join dns_log
    On http_log.uid = dns_log.uid
""")
joined_df.show(5)

                                                                                

+----+---------+---------+---------+---------+------+--------------+----------+---------------+------+-------+--------+----------------+--------------+----------+---------------+-----------------+-----------+----------+----+-----------+---+---+---+----------+--------+-------+---+---+---+---+----+---+-------+---------+---------+---------+---------+-----+------+-----------+-----+----------+-----+-----+----------+--------+---+--------+---+---+
|host|id.orig_h|id.orig_p|id.resp_h|id.resp_p|method|orig_filenames|orig_fuids|orig_mime_types|origin|proxied|referrer|request_body_len|resp_filenames|resp_fuids|resp_mime_types|response_body_len|status_code|status_msg|tags|trans_depth| ts|uid|uri|user_agent|username|version| AA| RA| RD| TC|TTLs|  Z|answers|id.orig_h|id.orig_p|id.resp_h|id.resp_p|proto|qclass|qclass_name|qtype|qtype_name|query|rcode|rcode_name|rejected|rtt|trans_id| ts|uid|
+----+---------+---------+---------+---------+------+--------------+----------+---------------+------+-------+

In [19]:
##calculate
calculate_http = spark.sql("""
    SELECT uri, ROUND((COUNT(proto = 'tcp')/COUNT(*))*100,2) AS tcp_percentage
    FROM http_log
    WHERE status_code = 200 AND method = 'GET' AND uri != '/'
    GROUP BY uri
    ORDER BY access_count DESC
""")
calculate_http.show(5)
##http_log的数据中不包含proto列，但http_log和dns_log合成后的结果为空，故无法实验

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `proto` cannot be resolved. Did you mean one of the following? [`host`, `ts`, `uid`, `uri`, `method`].; line 2 pos 29;
'Sort ['access_count DESC NULLS LAST], true
+- 'Aggregate [uri#267], [uri#267, 'ROUND((('COUNT(('proto = tcp)) / count(1)) * 100), 2) AS tcp_percentage#1540]
   +- Filter (((status_code#261L = cast(200 as bigint)) AND (method#249 = GET)) AND NOT (uri#267 = /))
      +- SubqueryAlias http_log
         +- View (`http_log`, [host#244,id.orig_h#245,id.orig_p#246L,id.resp_h#247,id.resp_p#248L,method#249,orig_filenames#250,orig_fuids#251,orig_mime_types#252,origin#253,proxied#254,referrer#255,request_body_len#256L,resp_filenames#257,resp_fuids#258,resp_mime_types#259,response_body_len#260L,status_code#261L,status_msg#262,tags#263,trans_depth#264L,ts#354,uid#266,uri#267,user_agent#268,username#269,version#270])
            +- Project [host#244, id.orig_h#245, id.orig_p#246L, id.resp_h#247, id.resp_p#248L, method#249, orig_filenames#250, orig_fuids#251, orig_mime_types#252, origin#253, proxied#254, referrer#255, request_body_len#256L, resp_filenames#257, resp_fuids#258, resp_mime_types#259, response_body_len#260L, status_code#261L, status_msg#262, tags#263, trans_depth#264L, cast(ts#265 as timestamp) AS ts#354, uid#266, uri#267, ... 3 more fields]
               +- Relation [host#244,id.orig_h#245,id.orig_p#246L,id.resp_h#247,id.resp_p#248L,method#249,orig_filenames#250,orig_fuids#251,orig_mime_types#252,origin#253,proxied#254,referrer#255,request_body_len#256L,resp_filenames#257,resp_fuids#258,resp_mime_types#259,response_body_len#260L,status_code#261L,status_msg#262,tags#263,trans_depth#264L,ts#265,uid#266,uri#267,... 3 more fields] json


## Q2.Survival Analysis

# (1)

# (2)

# (3)

# (4)