In [4]:
from pyspark.sql import SparkSession  
import pyspark.sql.functions as func
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, BooleanType, DoubleType
from dash import Dash, html, dash_table, dcc, callback, Output, Input
import pandas as pd
import plotly.express as px

# Obtain the notebook's Spark session (an existing one is reused if present),
# configured with the "client" deploy mode and a fixed application name.
session_builder = SparkSession.builder
session_builder = session_builder.config("spark.submit.deployMode", "client")
session_builder = session_builder.appName("SparkSQLExampleApp")
spark = session_builder.getOrCreate()

In [5]:
# Path of the log extract to analyse.
datafile = 'log20170112min.csv'

# Explicit schema for the CSV, one (column name, Spark type, nullable) entry per
# column, listed in file order.
_log_columns = [
    ("ip", StringType(), False),
    ("date", StringType(), False),
    ("time", StringType(), False),
    ("zone", DoubleType(), True),
    ("cik", DoubleType(), False),
    ("accession", StringType(), False),
    ("extention", StringType(), True),
    ("code", DoubleType(), False),
    ("size", DoubleType(), False),
    ("idx", DoubleType(), False),
    ("norefer", DoubleType(), True),
    ("noagent", DoubleType(), True),
    ("find", DoubleType(), True),
    ("crawler", DoubleType(), True),
    ("browser", StringType(), True),
]
schema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in _log_columns]
)

In [6]:
# Read the CSV with the explicit schema; the first line of the file is a header row.
df = spark.read.csv(datafile, schema=schema, header=True)

In [7]:
# Combine the separate date and time strings into a single timestamp column.
timestamp_text = func.concat(df.date, func.lit(" "), df.time)
df = df.withColumn("timestamp", timestamp_text.cast("Timestamp"))

# Remove the columns that are not used by the analysis.
df = df.drop("date", "time", "zone", "norefer", "browser")

# Keep rows with code > 199 and noagent <= 1, then discard any row containing a null.
df = df.where((df.code > 199.0) & (df.noagent <= 1.0)).dropna()

In [8]:
# Narrow the double-typed CSV columns to their working types. Each cast only
# replaces its own column, so the column order of `df` is unchanged.
column_casts = {
    "cik": "Integer",
    "code": "Integer",
    "size": "Integer",
    "idx": "Boolean",
    "noagent": "Boolean",
    "crawler": "Boolean",
    "find": "Integer",
}
for column_name, target_type in column_casts.items():
    df = df.withColumn(column_name, func.col(column_name).cast(target_type))

# Identifier of the requested document: "<cik>/<accession>/<extention>".
df = df.withColumn(
    "webpage",
    func.concat(
        func.col("cik"), func.lit("/"),
        func.col("accession"), func.lit("/"),
        func.col("extention"),
    ),
)

# Split the document name on literal dots. The pattern is a raw string because
# "\." in a plain literal is an invalid escape sequence (a warning on current
# Python versions). The `func.` prefix avoids relying on the star-imported
# `split`/`size`, which shadow Python builtins.
name_parts = func.split(func.col("extention"), r"\.")
df_with_parts = df.withColumn("sc", name_parts).withColumn("al", func.size(name_parts))

# Keep only names made of exactly two dot-separated parts and use the last part
# as the file type; the helper columns are dropped again.
df = (
    df_with_parts
    .where(func.col("al") == 2)
    .withColumn("filetype", func.element_at(func.col("sc"), -1))
    .drop("sc", "al")
)

valid_filetypes = ["htm","txt","xml","xsd","css","pdf","html","zip","xls","xlsx","fil","xm","FIL","XML","png","asp","json","xsl","PDF","khtml"]

# The cleaned frame feeds every aggregation in the rest of the notebook; caching
# it avoids re-reading and re-parsing the CSV for each action.
df = df.filter(func.col("filetype").isin(valid_filetypes)).cache()
df

DataFrame[ip: string, cik: int, accession: string, extention: string, code: int, size: int, idx: boolean, noagent: boolean, find: int, crawler: boolean, timestamp: timestamp, webpage: string, filetype: string]

In [9]:
# Preview the first 10 cleaned rows without truncating long cell values.
df.show(n=10, truncate=False)

                                                                                

+---------------+-------+--------------------+-----------------------+----+-------+-----+-------+----+-------+-------------------+---------------------------------------------------+--------+
|ip             |cik    |accession           |extention              |code|size   |idx  |noagent|find|crawler|timestamp          |webpage                                            |filetype|
+---------------+-------+--------------------+-----------------------+----+-------+-----+-------+----+-------+-------------------+---------------------------------------------------+--------+
|107.22.225.dea |1403095|0001209191-17-003373|-index.htm             |200 |7483   |true |false  |10  |false  |2017-01-12 00:00:00|1403095/0001209191-17-003373/-index.htm            |htm     |
|116.236.230.ccg|1025771|0001477932-17-000158|R1.htm                 |404 |0      |false|false  |10  |true   |2017-01-12 00:00:00|1025771/0001477932-17-000158/R1.htm                |htm     |
|117.89.50.ihd  |926042 |0000926042-17-0

In [10]:
# Total number of log records left after cleaning (also shown on the dashboard).
rec_cnt = df.count()
rec_cnt

                                                                                

763163

In [11]:
# File types ranked by number of requests, most requested first.
filetype_counts = df.groupBy("filetype").count()
df_mrft = filetype_counts.orderBy(func.desc("count"))
df_mrft.show(10)

# The ten most requested types go to pandas for plotting.
dfp_mrft = df_mrft.limit(10).toPandas()

                                                                                

+--------+------+
|filetype| count|
+--------+------+
|     htm|346517|
|     txt|292305|
|     xml|100127|
|     xsd|  8066|
|     pdf|  6800|
|    html|  5799|
|     zip|  1028|
|     xls|   986|
|    xlsx|   954|
|     fil|   259|
+--------+------+
only showing top 10 rows



                                                                                

In [12]:
# Request count per 5-minute window, busiest window first.
# `sttime` is the start of each window; the count column is named `requests`.
df_btod = (
    df.groupBy(func.window(func.col("timestamp"), "5 minute"))
    .count()
    .select(
        func.col("window.start").alias("sttime"),
        func.col("count").alias("requests"),
    )
    .orderBy(func.desc("requests"))
)

# Three busiest windows, as pandas.
dfp_btod = df_btod.limit(3).toPandas()

                                                                                

In [13]:
# Summed request size per 5-minute window, largest total first.
# `sttime` is the start of each window.
df_btods = (
    df.groupBy(func.window(func.col("timestamp"), "5 minute"))
    .agg(func.sum("size").alias("size"))
    .select(func.col("window.start").alias("sttime"), "size")
    .orderBy(func.desc("size"))
)

# Three windows with the largest summed size, as pandas.
dfp_btods = df_btods.limit(3).toPandas()

                                                                                

In [14]:
# Webpages ranked by number of requests; the Spark frame keeps the top ten.
df_maw = (
    df.groupBy("webpage")
    .count()
    .orderBy(func.desc("count"))
    .limit(10)
)

# Only the top three are needed in pandas.
dfp_maw = df_maw.limit(3).toPandas()

                                                                                

In [15]:
# Largest documents accessed.
# Rank distinct webpages by the largest size recorded for them. Ranking the raw
# request rows instead would let one big document that was requested several
# times fill several places in the ranking.
df_lda = (
    df.groupBy("webpage")
    .agg(func.max("size").alias("size"))
    .orderBy(func.desc("size"))
    .limit(10)
)

# Three largest documents (columns: webpage, size), as pandas.
dfp_lda = df_lda.limit(3).toPandas()

                                                                                

In [16]:
# Number of requests per response code.
df_hrcd = df.groupBy(func.col("code")).count()

# One row per code, as pandas.
dfp_hrcd = df_hrcd.toPandas()

                                                                                

In [17]:
# Number of requests per (5-minute window, response code) pair.
# `sttime` is the start of each window.
df_hrcdt = (
    df.groupBy(func.window(func.col("timestamp"), "5 minute"), func.col("code"))
    .count()
    .select(func.col("window.start").alias("sttime"), "code", "count")
)
dfp_hrcdt = df_hrcdt.toPandas()

                                                                                

In [18]:
# Requests per 5-minute window in chronological order (re-sorts df_btod by window start).
df_rh = df_btod.orderBy("sttime")
dfp_rh = df_rh.toPandas()

                                                                                

In [19]:
# Summed request size per 5-minute window in chronological order
# (re-sorts df_btods by window start).
df_rsh = df_btods.orderBy("sttime")
dfp_rsh = df_rsh.toPandas()

                                                                                

In [20]:
# Requests flagged idx == true, counted per 5-minute window, in chronological order.
df_irh = (
    df.filter(func.col("idx"))
    .groupBy(func.window(func.col("timestamp"), "5 minute"))
    .count()
    .select(func.col("window.start").alias("sttime"), "count")
    .orderBy("sttime")
)
dfp_irh = df_irh.toPandas()

                                                                                

In [21]:
# Requests flagged crawler == true, counted per 5-minute window, in chronological order.
df_crh = (
    df.filter(func.col("crawler"))
    .groupBy(func.window(func.col("timestamp"), "5 minute"))
    .count()
    .select(func.col("window.start").alias("sttime"), "count")
    .orderBy("sttime")
)
dfp_crh = df_crh.toPandas()

                                                                                

In [22]:
# Summed request size for crawler vs. non-crawler traffic (one row per flag value).
df_crsc = df.groupBy("crawler").agg(func.sum("size").alias("size"))
dfp_crsc = df_crsc.toPandas()

                                                                                

In [35]:
# Requests by source: number of requests per `find` code, ordered by code.
df_rs = (
    df.groupBy("find")
    .agg(func.count("find").alias("count"))
    .orderBy("find")
)

# Human-readable label for each `find` code.
source_map = {1:'get Company',2:'get Current',3:'Find Companies',4:'Search EDGAR',5:'EDGARFS Client',6:'Current',7:'Archive',8:'Viewer',9:'Index',10:'Other'}

dfp_rs = df_rs.toPandas()

# Translate codes to labels. A code that is missing from source_map would turn
# into NaN under a plain .map(); give it an explicit "Unknown (<code>)" label
# instead so it stays identifiable in the chart.
source_names = dfp_rs["find"].map(source_map)
dfp_rs["find"] = source_names.fillna("Unknown (" + dfp_rs["find"].astype(str) + ")")

                                                                                

In [32]:
# Sources by time: number of requests per (5-minute window, `find` code),
# sorted by window start, then code, then count.
df_st = (
    df.groupBy(func.window(func.col("timestamp"), "5 minute"), func.col("find"))
    .count()
    .select(func.col("window.start").alias("sttime"), "find", "count")
    .orderBy("sttime", "find", "count")
)
dfp_st = df_st.toPandas()

# Translate codes to labels using source_map (defined in the "Requests by source"
# cell). A code missing from the map would become NaN under a plain .map(); label
# it "Unknown (<code>)" instead so its series stays identifiable in the chart.
source_names = dfp_st["find"].map(source_map)
dfp_st["find"] = source_names.fillna("Unknown (" + dfp_st["find"].astype(str) + ")")

                                                                                

In [25]:
# IP addresses ranked by number of requests; the Spark frame keeps the top ten.
df_rip = (
    df.groupBy("ip")
    .count()
    .orderBy(func.desc("count"))
    .limit(10)
)

# Only the top three are needed in pandas.
dfp_rip = df_rip.limit(3).toPandas()

                                                                                

In [26]:
# IP addresses ranked by summed request size, largest first.
df_lrip = (
    df.groupBy("ip")
    .agg(func.sum("size").alias("size"))
    .orderBy(func.desc("size"))
)

# Top three, as pandas.
dfp_lrip = df_lrip.limit(3).toPandas()

                                                                                

In [27]:
# CIKs ranked by number of requests, most requested first.
df_mpcik = (
    df.groupBy("cik")
    .count()
    .orderBy(func.desc("count"))
)

# Top three, as pandas.
dfp_mpcik = df_mpcik.limit(3).toPandas()

                                                                                

In [28]:
# CIKs ranked by summed request size, largest first.
df_lcik = (
    df.groupBy("cik")
    .agg(func.sum("size").alias("size"))
    .orderBy(func.desc("size"))
)

# Top three, as pandas.
dfp_lcik = df_lcik.limit(3).toPandas()

                                                                                

In [29]:
def _top_labels(frame, key_col, value_col, rows=3):
    """Format the first `rows` rows of `frame` as "<key> - <value>" strings.

    Args:
        frame: pandas DataFrame that is already sorted best-first.
        key_col: column holding the ranked item (IP, CIK, webpage, window start).
        value_col: column holding the figure it is ranked by.
        rows: number of entries to return.

    Returns:
        A list of exactly `rows` strings. Rows are read by position, and the
        list is padded with empty strings when `frame` has fewer than `rows`
        rows, so a short result cannot raise a KeyError or produce columns of
        unequal length.
    """
    labels = [
        str(key) + " - " + str(value)
        for key, value in zip(frame[key_col].iloc[:rows], frame[value_col].iloc[:rows])
    ]
    return labels + [""] * (rows - len(labels))


# Summary table for the dashboard: four attributes with three ranked rows each.
# The column with the empty name holds the rank (1-3) within each attribute.
dl = {
    "Attribute": ["IP Address", "", "", "Central Index Key (CIK)", "", "", "Webpage", "", "", "Time of day", "", ""],
    "": [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3],
    "Most Requests": (
        _top_labels(dfp_rip, "ip", "count")
        + _top_labels(dfp_mpcik, "cik", "count")
        + _top_labels(dfp_maw, "webpage", "count")
        + _top_labels(dfp_btod, "sttime", "requests")
    ),
    "Largest Requests": (
        _top_labels(dfp_lrip, "ip", "size")
        + _top_labels(dfp_lcik, "cik", "size")
        + _top_labels(dfp_lda, "webpage", "size")
        + _top_labels(dfp_btods, "sttime", "size")
    ),
}
dfp_daily = pd.DataFrame(dl)


In [33]:
# Build the Dash application.
app = Dash()

# Page skeleton: a tab bar with a single "Dashboard" tab, followed by an empty
# container ('tab-content') that the callback fills in.
app.layout = html.Div(
    children=[
        dcc.Tabs(
            id="tabs",
            value="daily",
            children=[dcc.Tab(label="Dashboard", value="daily")],
        ),
        html.Div(id="tab-content"),
    ]
)

# Styling shared by every chart panel: 48%-wide inline blocks with a thin black border.
_PANEL_STYLE = {'width': '48%', 'display': 'inline-block', 'border': '1px solid black', 'margin': '2px'}


def _chart_panel(figure):
    """Wrap a Plotly figure in a Div styled with the shared panel style."""
    return html.Div([dcc.Graph(figure=figure)], style=_PANEL_STYLE)


@app.callback(Output('tab-content', 'children'),
              Input('tabs', 'value'))
def render_content(tab):
    """Build the dashboard body shown inside the 'tab-content' container.

    Args:
        tab: value of the selected tab. It is not used: the layout defines a
            single tab, so the same content is always returned.

    Returns:
        An html.Div with the headings, one panel per chart and the summary table,
        built from the pandas frames computed earlier in the notebook.
    """
    # Response codes are integers, which Plotly Express maps onto a continuous
    # colour scale. Casting them to strings gives one discrete colour (and one
    # legend entry) per code in the stacked bar chart.
    codes_over_time = dfp_hrcdt.assign(code=dfp_hrcdt["code"].astype(str))

    # Charts in display order. The response-code titles say "HTTP" because the
    # `code` column holds HTTP status codes (e.g. 200, 404), not HTML.
    figures = [
        px.pie(dfp_mrft, names='filetype', values='count', title='File type requested'),
        px.pie(dfp_hrcd, names='code', values='count', title='HTTP Response code'),
        px.pie(dfp_crsc, names='crawler', values='size', title='Crawler traffic share'),
        px.pie(dfp_rs, names='find', values='count', title='Request sources'),
        px.bar(dfp_rh, x='sttime', y='requests', title='Requests by time of day'),
        px.bar(dfp_rsh, x='sttime', y='size', title='Request size by time of day'),
        px.bar(dfp_irh, x='sttime', y='count', title='Index page requests by time of day'),
        px.bar(dfp_crh, x='sttime', y='count', title='Crawler requests by time of day'),
        px.line(dfp_st, x='sttime', y='count', color='find', title='Request source by time of day'),
        px.bar(codes_over_time, x='sttime', y='count', color='code', title='HTTP Response codes by time of day'),
    ]

    return html.Div([
        html.H2('Server log analyses'),
        html.H3('Number of records: ' + str(rec_cnt)),
        *[_chart_panel(figure) for figure in figures],
        html.Div([
            dash_table.DataTable(
                data=dfp_daily.to_dict('records'), page_size=14
            )
        ]),
    ])


#app.run(jupyter_mode="external", port=8050)

In [36]:
# Start the Dash server on port 8050 using Dash's "external" Jupyter mode.
dash_port = 8050
app.run(port=dash_port, jupyter_mode="external")

Dash app running on http://127.0.0.1:8050/
