# Web logs data set analysis


In [1]:
# import the  pyspark library
from pyspark.sql.session import SparkSession

In [2]:
from datetime import datetime

In [3]:
# storing staring time of the spark session creating time and assign to a variable
start_time = datetime.now()

In [4]:
# creating spark session and assign to a spark variable
spark = SparkSession\
    .builder\
    .appName("log")\
    .master("local[1]")\
    .getOrCreate()

time_elapsed = datetime.now() - start_time
print("Run time of the create spark session (HH:MM:SS) {}".format(time_elapsed))

Run time of the create spark session (HH:MM:SS) 0:00:19.984225


In [5]:
# converting log into json and calculating time
start_time = datetime.now()
base_df = spark.read.json('fsm-20210817.logs')
time_elapsed = datetime.now() - start_time
print("Run time of the json data read  and creating dataframe ("
      "HH:MM:SS) {}".format(time_elapsed))

Run time of the json data read  and creating dataframe (HH:MM:SS) 0:02:05.179064


In [6]:
# showing first 3 rows from dataframe and calculating time
start_time = datetime.now()
base_df.show(3)
time_elapsed = datetime.now() - start_time
print("Run time of the base dataframe (HH:MM:SS) {}".format(time_elapsed))

+--------------------+--------+--------------------+----------+---------+--------------------+-------+--------------------+------------+---------------+----------+-----------+-----------+--------------------+--------------------+--------------------+-----------+-----------+-------+--------+-------+------+------+--------------------+----------+----+----+
|          @timestamp|@version|               agent|bytes_sent|client_ip|               cloud|    ecs|                host|http_referer|http_user_agent|httpmethod|httpversion|      input|          kubernetes|                 log|             message|remote_addr|remote_user|request|response|service|status|stream|                tags|time_local|   x|   y|
+--------------------+--------+--------------------+----------+---------+--------------------+-------+--------------------+------------+---------------+----------+-----------+-----------+--------------------+--------------------+--------------------+-----------+-----------+-------+------

In [7]:
# selecting required columns from the base dataframe and calculating time
start_time = datetime.now()
required_columns = base_df.select(
    base_df['@timestamp'], base_df['kubernetes']['pod']['name'], base_df['status'])
time_elapsed = datetime.now() - start_time
print("Run time of select columns dataframe (HH:MM:SS) {}".format(time_elapsed))

Run time of select columns dataframe (HH:MM:SS) 0:00:00.221048


In [8]:
# showing first 50 rows from dataframe and calculating time
start_time = datetime.now()
data_frame = required_columns
data_frame.show(50)
time_elapsed = datetime.now() - start_time
print("Run time of the required type  dataframe (HH:MM:SS) {}".format(time_elapsed))

+--------------------+--------------------+------+
|          @timestamp| kubernetes.pod.name|status|
+--------------------+--------------------+------+
|2021-08-17T00:00:...|cert-manager-5695...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|
|2021-08-17T00:00:...|fsm-backe

In [9]:
# import the library 
from pyspark.sql.functions import *

In [10]:
# counting status , creating new column in the  dataframe and calculating time
start_time = datetime.now()
data_frame = data_frame.withColumn('application_log', when(
    data_frame.status.isNull(), 1).otherwise(0))
data_frame = data_frame.withColumn('success', when(
    (data_frame.status >= 200) & (data_frame.status <= 299), 1).otherwise(0))
data_frame = data_frame.withColumn('error', when(
    (data_frame.status >= 300), 1).otherwise(0))
data_frame.show(50)
time_elapsed = datetime.now() - start_time
print("Run time of the status count dataframe (HH:MM:SS) {}".format(time_elapsed))

+--------------------+--------------------+------+---------------+-------+-----+
|          @timestamp| kubernetes.pod.name|status|application_log|success|error|
+--------------------+--------------------+------+---------------+-------+-----+
|2021-08-17T00:00:...|cert-manager-5695...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-backend-cron-...|  null|              1|      0|    0|
|2021-08-17T00:00:...|fsm-ba

In [11]:
# getting hours from the timestamp column and calculating time
start_time = datetime.now()
data_frame = data_frame.withColumn("hour", hour(col("@timestamp")))
data_frame.show(truncate=False)
time_elapsed = datetime.now() - start_time
print("Run time of the find hour dataframe (HH:MM:SS) {}".format(time_elapsed))

+------------------------+--------------------------------------+------+---------------+-------+-----+----+
|@timestamp              |kubernetes.pod.name                   |status|application_log|success|error|hour|
+------------------------+--------------------------------------+------+---------------+-------+-----+----+
|2021-08-17T00:00:00.144Z|cert-manager-5695c78d49-q9s9j         |null  |1              |0      |0    |5   |
|2021-08-17T00:00:00.001Z|fsm-backend-cron-prod-6bd6459455-p9p49|null  |1              |0      |0    |5   |
|2021-08-17T00:00:00.002Z|fsm-backend-cron-prod-6bd6459455-p9p49|null  |1              |0      |0    |5   |
|2021-08-17T00:00:00.003Z|fsm-backend-cron-prod-6bd6459455-p9p49|null  |1              |0      |0    |5   |
|2021-08-17T00:00:00.004Z|fsm-backend-cron-prod-6bd6459455-p9p49|null  |1              |0      |0    |5   |
|2021-08-17T00:00:00.813Z|fsm-backend-cron-prod-6bd6459455-p9p49|null  |1              |0      |0    |5   |
|2021-08-17T00:00:01.026Z|fs

In [12]:
# grouping server , hours columns and calculating the success and error count of server
start_time = datetime.now()
group_data_frame = data_frame.groupBy("`kubernetes.pod.name`", "hour")\
    .agg(sum("application_log").alias('application'), sum("success")
         .alias('success'), sum("error").alias('error')).sort('hour')
group_data_frame.show(50)
time_elapsed = datetime.now() - start_time
print("Run time of the group and sum of the status ("
      "HH:MM:SS) {}".format(time_elapsed))

+--------------------+----+-----------+-------+-----+
| kubernetes.pod.name|hour|application|success|error|
+--------------------+----+-----------+-------+-----+
|fsm-teachers-fron...|   0|        230|      0|    0|
|learnbuddy-fronte...|   0|          4|      0|    0|
|runner-gitlab-run...|   0|          2|      0|    0|
|fsm-frontend-prod...|   0|        278|      0|    0|
|fsm-frontend-prod...|   0|        276|      0|    0|
|nginx-ingress-con...|   0|        831|      0|    0|
|fsm-backend-prod-...|   0|       4725|      0|    0|
|learnbuddy-backen...|   0|        592|    100|    0|
|fsm-backend-cron-...|   0|      54027|      0|    0|
|cert-manager-5695...|   0|       1360|      0|    0|
|fsm-backend-prod-...|   0|       5317|      0|    0|
|nginx-ingress-con...|   0|        774|      0|    0|
|fsm-backend-prod-...|   0|       5827|      0|    0|
|fsm-teachers-fron...|   1|        254|      0|    0|
|cert-manager-cain...|   1|         75|      0|    0|
|runner-gitlab-run...|   1| 

In [13]:
# changing the kubernetes.pod.name column name to server
start_time = datetime.now()
df = group_data_frame.withColumnRenamed("kubernetes.pod.name", "server")
df.show(100)
time_elapsed = datetime.now() - start_time
print("Run time of the change server name  ("
      "HH:MM:SS) {}".format(time_elapsed))

+--------------------+----+-----------+-------+-----+
|              server|hour|application|success|error|
+--------------------+----+-----------+-------+-----+
|fsm-backend-prod-...|   0|       4725|      0|    0|
|fsm-frontend-prod...|   0|        276|      0|    0|
|cert-manager-5695...|   0|       1360|      0|    0|
|fsm-backend-cron-...|   0|      54027|      0|    0|
|learnbuddy-fronte...|   0|          4|      0|    0|
|runner-gitlab-run...|   0|          2|      0|    0|
|learnbuddy-backen...|   0|        592|    100|    0|
|fsm-frontend-prod...|   0|        278|      0|    0|
|fsm-teachers-fron...|   0|        230|      0|    0|
|nginx-ingress-con...|   0|        774|      0|    0|
|fsm-backend-prod-...|   0|       5317|      0|    0|
|fsm-backend-prod-...|   0|       5827|      0|    0|
|nginx-ingress-con...|   0|        831|      0|    0|
|fsm-frontend-prod...|   1|        244|      0|    0|
|fsm-backend-prod-...|   1|       1330|      0|    0|
|fsm-teachers-fron...|   1| 

In [14]:
# counting total rows and calculating time
start_time = datetime.now()
print("the total rows are : ")
print(df.count())
time_elapsed = datetime.now() - start_time
print("Run time of the count num  of rows in dataframe ("
      "HH:MM:SS) {}".format(time_elapsed))


the total rows are : 
403
Run time of the count num  of rows in dataframe (HH:MM:SS) 0:02:26.733161


In [15]:
# # to save json file
# df.write.json("zipcodes.json")
# df.write().json("data/out_employees/");

# writing PySpark data frame into .csv file

In [16]:
# Write DataFrame data to CSV file
# df.toPandas().to_csv("server_log.csv")

# Converting PySpark DataFrame to Pandas DataFrame

In [17]:
# creating pandas data frame
start_time = datetime.now()
pandas_data_frame = df.toPandas()
time_elapsed = datetime.now() - start_time
print("Run time of the creating pandas data frame ("
      "HH:MM:SS) {}".format(time_elapsed))

Run time of the creating pandas data frame (HH:MM:SS) 0:02:29.340510


# writing pandas data frame into mysql data base

In [18]:
# import the sqlalchemy library
from sqlalchemy import create_engine

In [19]:
# create sqlalchemy engine
start_time = datetime.now()
engine = create_engine("mysql+pymysql://{user}:{pw}@localhost/{db}"
                       .format(user="root",
                               pw="root",
                               db="mydatabase"))
time_elapsed = datetime.now() - start_time
print("Run time of the creating mysql engine ("
      "HH:MM:SS) {}".format(time_elapsed))

Run time of the creating mysql engine (HH:MM:SS) 0:00:01.255787


In [20]:
# Insert whole DataFrame into MySQL
start_time = datetime.now()
pandas_data_frame.to_sql('serverlogs', con=engine, if_exists='append', chunksize=1000, index=False)
time_elapsed = datetime.now() - start_time
print("Run time of the creating mysql engine ("
      "HH:MM:SS) {}".format(time_elapsed))

Run time of the creating mysql engine (HH:MM:SS) 0:00:03.038925


In [21]:
# stopping the spark session
spark.stop()