In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Spark-SQL').getOrCreate()

In [28]:
df = spark.read.csv("hdfs://localhost:9000//Cpu_Log/*.csv", header=True, inferSchema=True)
df.printSchema()
print("Number of rows in the files are: ",df.count())

root
 |-- DateTime: string (nullable = true)
 |-- Cpu Count: integer (nullable = true)
 |-- Cpu Working Time: double (nullable = true)
 |-- Cpu idle Time: double (nullable = true)
 |-- cpu_percent: double (nullable = true)
 |-- Usage Cpu Count : integer (nullable = true)
 |-- number of software interrupts since boot: integer (nullable = true)
 |-- number of system calls since boot: integer (nullable = true)
 |-- number of interrupts since boot: integer (nullable = true)
 |-- cpu avg load over 1 min: double (nullable = true)
 |-- cpu avg load over 5 min: double (nullable = true)
 |-- cpu avg load over 15 min: double (nullable = true)
 |-- system_total_memory: long (nullable = true)
 |-- system_used_memory: long (nullable = true)
 |-- system_free_memory: long (nullable = true)
 |-- system_active_memory: long (nullable = true)
 |-- system_inactive_memory: long (nullable = true)
 |-- system_buffers_memory: integer (nullable = true)
 |-- system_cached_memory: long (nullable = true)
 |-- sys

In [9]:
df.select('user_name').groupBy('user_name').count().show()

+--------------------+-----+
|           user_name|count|
+--------------------+-----+
|salinabodale73@gm...|  569|
|sharlawar77@gmail...|  580|
|rahilstar11@gmail...|  551|
|deepshukla292@gma...|  565|
|  iamnzm@outlook.com|  614|
|markfernandes66@g...|  508|
|damodharn21@gmail...|  253|
|bhagyashrichalke2...|  482|
+--------------------+-----+



In [13]:
#create a temporary view to perform the cpu log queries.

df.createOrReplaceTempView("cpu_log")

In [18]:
df2= spark.sql('select user_name, keyboard, mouse from cpu_log').show(3)

+------------------+--------+-----+
|         user_name|keyboard|mouse|
+------------------+--------+-----+
|iamnzm@outlook.com|     1.0| 32.0|
|iamnzm@outlook.com|     0.0|  0.0|
|iamnzm@outlook.com|     0.0|  0.0|
+------------------+--------+-----+
only showing top 3 rows



In [26]:
df2=spark.sql('select user_name, count(user_name) from cpu_log where keyboard != 0 or mouse !=0 group by user_name').show()

+--------------------+----------------+
|           user_name|count(user_name)|
+--------------------+----------------+
|salinabodale73@gm...|             440|
|sharlawar77@gmail...|             457|
|rahilstar11@gmail...|             399|
|deepshukla292@gma...|             475|
|  iamnzm@outlook.com|             459|
|markfernandes66@g...|             389|
|damodharn21@gmail...|             191|
|bhagyashrichalke2...|             361|
+--------------------+----------------+



In [35]:
#finding the users with the total average working hours:
df2 = spark.sql('select user_name, ((count(user_name)*5*60)/6) as avg_work_sec from cpu_log where keyboard != 0 or mouse !=0 group by user_name')
df2.show()


+--------------------+------------+
|           user_name|avg_work_sec|
+--------------------+------------+
|salinabodale73@gm...|     22000.0|
|sharlawar77@gmail...|     22850.0|
|rahilstar11@gmail...|     19950.0|
|deepshukla292@gma...|     23750.0|
|  iamnzm@outlook.com|     22950.0|
|markfernandes66@g...|     19450.0|
|damodharn21@gmail...|      9550.0|
|bhagyashrichalke2...|     18050.0|
+--------------------+------------+



In [40]:
#to convert seconds to hours and minutes format for easy readibility
from pyspark.sql.functions import *

df3 = df2.withColumn("Minutes", round((col("avg_work_sec")/60),2))\
.withColumn("Hours", floor((col("Minutes")/60)))\
.withColumn("hourmin", floor(col("Minutes")-(col("Hours").cast("int") * 60)))\
.withColumn("Days", floor((col("Hours")/24)))\
.withColumn("Days2", col("Days")*24)\
.withColumn("avg_work_hrs", when((col("Hours")==0) &(col("Days")==0), concat(col("hourmin"),lit("min"))).when((col("Hours")!=0)&(col("Days")==0), concat(col("Hours"),lit(":"),col("hourmin"),lit(""))).when(col("Days")!=0, concat(col("Days"),lit("d "),(col("Hours")-col("Days2")),lit("hr "),col("hourmin"),lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2",'wrk_hrs')\
.orderBy(col('avg_work_hrs').desc())
df3.show()

+--------------------+------------+------------+
|           user_name|avg_work_sec|avg_work_hrs|
+--------------------+------------+------------+
|salinabodale73@gm...|     22000.0|         6:6|
|deepshukla292@gma...|     23750.0|        6:35|
|  iamnzm@outlook.com|     22950.0|        6:22|
|sharlawar77@gmail...|     22850.0|        6:20|
|rahilstar11@gmail...|     19950.0|        5:32|
|markfernandes66@g...|     19450.0|        5:24|
|bhagyashrichalke2...|     18050.0|         5:0|
|damodharn21@gmail...|      9550.0|        2:39|
+--------------------+------------+------------+



In [53]:
df3.createOrReplaceTempView('avg_hour')
#printing max worked hr employee
df4 = spark.sql('select user_name, avg_work_hrs from avg_hour order by avg_work_hrs desc limit(1)').show()

+--------------------+------------+
|           user_name|avg_work_hrs|
+--------------------+------------+
|salinabodale73@gm...|         6:6|
+--------------------+------------+



In [54]:
#printing min worked hr employee
df4 = spark.sql('select user_name, avg_work_hrs from avg_hour order by avg_work_hrs asc limit(1)').show()

+--------------------+------------+
|           user_name|avg_work_hrs|
+--------------------+------------+
|damodharn21@gmail...|        2:39|
+--------------------+------------+



In [None]:
#finding users with idle hours

In [58]:
df5 = spark.sql('select user_name, count(user_name) from cpu_log where keyboard = 0 and mouse =0 group by user_name').show() 

+--------------------+----------------+
|           user_name|count(user_name)|
+--------------------+----------------+
|salinabodale73@gm...|             129|
|sharlawar77@gmail...|             123|
|rahilstar11@gmail...|             152|
|deepshukla292@gma...|              90|
|  iamnzm@outlook.com|             155|
|markfernandes66@g...|             119|
|damodharn21@gmail...|              62|
|bhagyashrichalke2...|             121|
+--------------------+----------------+



In [59]:
df5 = spark.sql('select user_name, ((count(user_name)*5*60)/6) as avg_idle_sec from cpu_log where keyboard = 0 and mouse =0 group by user_name')
df5.show()

+--------------------+------------+
|           user_name|avg_idle_sec|
+--------------------+------------+
|salinabodale73@gm...|      6450.0|
|sharlawar77@gmail...|      6150.0|
|rahilstar11@gmail...|      7600.0|
|deepshukla292@gma...|      4500.0|
|  iamnzm@outlook.com|      7750.0|
|markfernandes66@g...|      5950.0|
|damodharn21@gmail...|      3100.0|
|bhagyashrichalke2...|      6050.0|
+--------------------+------------+



In [60]:
df6 = df5.withColumn("Minutes", round((col("avg_idle_sec")/60),2))\
.withColumn("Hours", floor((col("Minutes")/60)))\
.withColumn("hourmin", floor(col("Minutes")-(col("Hours").cast("int") * 60)))\
.withColumn("Days", floor((col("Hours")/24)))\
.withColumn("Days2", col("Days")*24)\
.withColumn("avg_idle_hrs", when((col("Hours")==0) &(col("Days")==0), concat(col("hourmin"),lit("min"))).when((col("Hours")!=0)&(col("Days")==0), concat(col("Hours"),lit(":"),col("hourmin"),lit(""))).when(col("Days")!=0, concat(col("Days"),lit("d "),(col("Hours")-col("Days2")),lit("hr "),col("hourmin"),lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2",'wrk_hrs')\
.orderBy(col('avg_idle_hrs').desc())
df6.show()

+--------------------+------------+------------+
|           user_name|avg_idle_sec|avg_idle_hrs|
+--------------------+------------+------------+
|damodharn21@gmail...|      3100.0|       51min|
|  iamnzm@outlook.com|      7750.0|         2:9|
|rahilstar11@gmail...|      7600.0|         2:6|
|salinabodale73@gm...|      6450.0|        1:47|
|sharlawar77@gmail...|      6150.0|        1:42|
|bhagyashrichalke2...|      6050.0|        1:40|
|markfernandes66@g...|      5950.0|        1:39|
|deepshukla292@gma...|      4500.0|        1:15|
+--------------------+------------+------------+



In [62]:
df6.createOrReplaceTempView('idle_hr')
#printing highest avg idle hour
df7 = spark.sql('select user_name, avg_idle_hrs from idle_hr order by avg_idle_sec desc limit(1)').show()

+------------------+------------+
|         user_name|avg_idle_hrs|
+------------------+------------+
|iamnzm@outlook.com|         2:9|
+------------------+------------+



In [63]:
#printing lowest avg idle hour
df7 = spark.sql('select user_name, avg_idle_hrs from idle_hr order by avg_idle_sec asc limit(1)').show()

+--------------------+------------+
|           user_name|avg_idle_hrs|
+--------------------+------------+
|damodharn21@gmail...|       51min|
+--------------------+------------+

