In [20]:
import findspark
findspark.init()

import os
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.functions import *


spark = SparkSession.builder.master("local").appName('operations').getOrCreate()
sc=spark.sparkContext
from pyspark.sql import *
from pyspark.sql.functions import col, max as max_, min as min_, unix_timestamp


In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [3]:
file_data_df = spark.read.format("csv").option("header", "true").load("hdfs://localhost:9000/sparkData/CpuLogs/*.csv")



In [5]:
df = file_data_df.select("DateTime", "keyboard","mouse","user_name")
temp_table_name = "CpuLogs"
df.createOrReplaceTempView(temp_table_name)

In [6]:
user_counts = spark.sql("SELECT user_name, count(*) AS count FROM CpuLogs GROUP BY user_name;")

In [7]:
user_counts.show()



+--------------------+-----+
|           user_name|count|
+--------------------+-----+
|salinabodale73@gm...|  569|
|sharlawar77@gmail...|  580|
|rahilstar11@gmail...|  551|
|deepshukla292@gma...|  565|
|  iamnzm@outlook.com|  614|
|markfernandes66@g...|  508|
|damodharn21@gmail...|  253|
|bhagyashrichalke2...|  482|
+--------------------+-----+





In [None]:
type(user_counts)

Putting created output to hdfs

In [None]:
user_counts.repartition(1).write.save('hdfs://localhost:9000/sparkData/MySql_operations', format='csv', mode='append')

ceating schema for importing data from hdfs

In [8]:
schema = StructType([\
    StructField("user_name", StringType(), True),\
    StructField("count", IntegerType(), True)])

Importing data from hdfs to spark dataframe

In [9]:
count_df = spark.read.format("csv").option("header", "false").schema(schema).load("hdfs://localhost:9000/sparkData/MySql_operations/part-00000-ef648781-639e-4993-9528-42795999c2f8-c000.csv")

setting mysql data


In [25]:
database = 'sparkOperations'
user = 'root'
password = 'Ayurn@6299'

saving spark dataframe to MySql table

In [27]:
jdbc_url = f"jdbc:mysql://localhost:3306/"+ database

count_df.write.format('jdbc').option("url",jdbc_url) \
    .mode("overwrite") \
    .option("dbtable","total_user_count") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("user",user).option("password",password).save()
    




loading data from mysql table to spark dataframe

In [28]:
df_COUNT = spark.read.format("jdbc") \
    .option("url","jdbc:mysql://localhost:3306/sparkOperations") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("dbtable","total_user_count") \
    .option("user",user) \
    .option("password",password).load()

In [29]:
df_COUNT.show()



+--------------------+-----+
|           user_name|count|
+--------------------+-----+
|salinabodale73@gm...|  569|
|sharlawar77@gmail...|  580|
|rahilstar11@gmail...|  551|
|deepshukla292@gma...|  565|
|  iamnzm@outlook.com|  614|
|markfernandes66@g...|  508|
|damodharn21@gmail...|  253|
|bhagyashrichalke2...|  482|
+--------------------+-----+





Finding users with highest number of average hours

In [30]:
avg_hr_df= spark.sql("select user_name ,count('') as total from CpuLogs where keyboard !=0 or mouse!=0 group by user_name")
avg_hr_df.createOrReplaceTempView("avg_hr")
hrs_df=spark.sql("select user_name, ((((total-1)*5)*60)/6) as avg_hrs from avg_hr")
final_avg_hr=hrs_df.withColumn("Minutes", round((col("avg_hrs")/60),2))\
.withColumn("Hours", floor((col("Minutes")/60)))\
.withColumn("hourmin", floor(col("Minutes")-(col("Hours").cast("int") * 60)))\
.withColumn("Days", floor((col("Hours")/24)))\
.withColumn("Days2", col("Days")*24)\
.withColumn("Time", when((col("Hours")==0) &(col("Days")==0), concat(col("hourmin"),lit("min"))).when((col("Hours")!=0)&(col("Days")==0), concat(col("Hours"),lit("hr "),col("hourmin"),lit("min"))).when(col("Days")!=0, concat(col("Days"),lit("d "),(col("Hours")-col("Days2")),lit("hr "),col("hourmin"),lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2","avg_hrs")\
.sort(desc("Time"))

In [31]:
final_avg_hr.show()



+--------------------+---------+
|           user_name|     Time|
+--------------------+---------+
|deepshukla292@gma...|6hr 35min|
|salinabodale73@gm...| 6hr 2min|
|  iamnzm@outlook.com|6hr 21min|
|sharlawar77@gmail...|6hr 20min|
|rahilstar11@gmail...|5hr 31min|
|markfernandes66@g...|5hr 23min|
|bhagyashrichalke2...| 5hr 0min|
|damodharn21@gmail...|2hr 38min|
+--------------------+---------+





saving data to hdfs from spark dataframe

In [34]:
final_avg_hr.repartition(1).write.save('hdfs://localhost:9000/sparkData/MySql_operations', format='csv', mode='append')



ceating schema for importing data from hdfs

In [36]:
schema_high = StructType([\
    StructField("user_name", StringType(), True),\
    StructField("highest_avg_hr", StringType(), True)])

Importing data from hdfs to spark dataframe

In [37]:
highest_avg_df = spark.read.format("csv").option("header", "false").schema(schema_high).load("hdfs://localhost:9000/sparkData/MySql_operations/part-00000-813ce953-699a-4802-bcc2-b46ae771a04c-c000.csv")

saving spark dataframe to MySql table

In [38]:
highest_avg_df.write.format('jdbc').option("url",jdbc_url) \
    .mode("overwrite") \
    .option("dbtable","highest_avg_time") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("user",user).option("password",password).save()
    



loading data from mysql table to spark dataframe

In [39]:
df_highest_avg = spark.read.format("jdbc") \
    .option("url","jdbc:mysql://localhost:3306/sparkOperations") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("dbtable","highest_avg_time") \
    .option("user",user) \
    .option("password",password).load()

In [40]:
df_highest_avg.show()

+--------------------+--------------+
|           user_name|highest_avg_hr|
+--------------------+--------------+
|deepshukla292@gma...|     6hr 35min|
|salinabodale73@gm...|      6hr 2min|
|  iamnzm@outlook.com|     6hr 21min|
|sharlawar77@gmail...|     6hr 20min|
|rahilstar11@gmail...|     5hr 31min|
|markfernandes66@g...|     5hr 23min|
|bhagyashrichalke2...|      5hr 0min|
|damodharn21@gmail...|     2hr 38min|
+--------------------+--------------+



Finding users with lowest number of average hours

In [41]:
final_avg_hr_lowest=hrs_df.withColumn("Minutes", round((col("avg_hrs")/60),2))\
.withColumn("Hours", floor((col("Minutes")/60)))\
.withColumn("hourmin", floor(col("Minutes")-(col("Hours").cast("int") * 60)))\
.withColumn("Days", floor((col("Hours")/24)))\
.withColumn("Days2", col("Days")*24)\
.withColumn("Time", when((col("Hours")==0) &(col("Days")==0), concat(col("hourmin"),lit("min"))).when((col("Hours")!=0)&(col("Days")==0), concat(col("Hours"),lit("hr "),col("hourmin"),lit("min"))).when(col("Days")!=0, concat(col("Days"),lit("d "),(col("Hours")-col("Days2")),lit("hr "),col("hourmin"),lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2","avg_hrs")\
.sort(asc("Time"))

saving data to hdfs from spark dataframe

In [42]:
final_avg_hr_lowest.repartition(1).write.save('hdfs://localhost:9000/sparkData/MySql_operations', format='csv', mode='append')



ceating schema for importing data from hdfs

In [43]:
schema_low = StructType([\
    StructField("user_name", StringType(), True),\
    StructField("lowest_avg_hr", StringType(), True)])

Importing data from hdfs to spark dataframe

In [47]:
lowest_avg_df = spark.read.format("csv").option("header", "false").schema(schema_low).load("hdfs://localhost:9000/sparkData/MySql_operations/part-00000-fba753d3-2712-446c-ba08-d511a326bc1d-c000.csv")

saving spark dataframe to MySql table

In [48]:
lowest_avg_df.write.format('jdbc').option("url",jdbc_url) \
    .mode("overwrite") \
    .option("dbtable","lowest_avg_time") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("user",user).option("password",password).save()
    

loading data from mysql table to spark dataframe

In [51]:
df_lowest_avg = spark.read.format("jdbc") \
    .option("url","jdbc:mysql://localhost:3306/sparkOperations") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("dbtable","lowest_avg_time") \
    .option("user",user) \
    .option("password",password).load()

In [52]:
df_lowest_avg.show()

+--------------------+-------------+
|           user_name|lowest_avg_hr|
+--------------------+-------------+
|damodharn21@gmail...|    2hr 38min|
|bhagyashrichalke2...|     5hr 0min|
|markfernandes66@g...|    5hr 23min|
|rahilstar11@gmail...|    5hr 31min|
|sharlawar77@gmail...|    6hr 20min|
|  iamnzm@outlook.com|    6hr 21min|
|salinabodale73@gm...|     6hr 2min|
|deepshukla292@gma...|    6hr 35min|
+--------------------+-------------+



Finding users with highest numbers of idle hours

In [54]:
idle_count = spark.sql("select user_name, DateTime from CpuLogs where keyboard=0 and mouse=0").groupBy("user_name").count()
idle_count.createOrReplaceTempView("idle_time")
idle_df=spark.sql("select user_name, (((count-1)*5*60)/6) as idle_min from idle_time")
idle_hrs=idle_df.withColumn("Minutes", round((col("idle_min")/60),2))\
.withColumn("Hours", floor((col("Minutes")/60)))\
.withColumn("hourmin", floor(col("Minutes")-(col("Hours").cast("int") * 60)))\
.withColumn("Days", floor((col("Hours")/24)))\
.withColumn("Days2", col("Days")*24)\
.withColumn("Idle_Hrs", when((col("Hours")==0) &(col("Days")==0), concat(col("hourmin"),lit("min"))).when((col("Hours")!=0)&(col("Days")==0), concat(col("Hours"),lit("hr "),col("hourmin"),lit("min"))).when(col("Days")!=0, concat(col("Days"),lit("d "),(col("Hours")-col("Days2")),lit("hr "),col("hourmin"),lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2",'idle_min')\
.sort(asc("Idle_Hrs"))

Putting created output to hdfs

In [55]:
idle_hrs.repartition(1).write.save('hdfs://localhost:9000/sparkData/MySql_operations', format='csv', mode='append')



ceating schema for importing data from hdfs

In [56]:
schema_idle = StructType([\
    StructField("user_name", StringType(), True),\
    StructField("idle_hr", StringType(), True)])

Importing data from hdfs to spark dataframe

In [58]:
lowest_idle_hr = spark.read.format("csv").option("header", "false").schema(schema_idle).load("hdfs://localhost:9000/sparkData/MySql_operations/part-00000-4f11bd1a-ca64-4bbf-bf36-420565cfe753-c000.csv")

saving spark dataframe to MySql table

In [59]:
lowest_idle_hr.write.format('jdbc').option("url",jdbc_url) \
    .mode("overwrite") \
    .option("dbtable","lowest_idle_hr") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("user",user).option("password",password).save()
    



loading data from mysql table to spark dataframe

In [60]:
df_lowest_idle = spark.read.format("jdbc") \
    .option("url","jdbc:mysql://localhost:3306/sparkOperations") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("dbtable","lowest_idle_hr") \
    .option("user",user) \
    .option("password",password).load()

In [61]:
df_lowest_idle.show()

+--------------------+---------+
|           user_name|  idle_hr|
+--------------------+---------+
|deepshukla292@gma...|1hr 14min|
|markfernandes66@g...|1hr 38min|
|bhagyashrichalke2...|1hr 40min|
|sharlawar77@gmail...|1hr 41min|
|salinabodale73@gm...|1hr 50min|
|rahilstar11@gmail...| 2hr 5min|
|  iamnzm@outlook.com| 2hr 8min|
|damodharn21@gmail...|    50min|
+--------------------+---------+

