'''
@Author: Naziya

@Date: 2021-09-06

@Last Modified by: Naziya

@Last Modified : 2021-09-06

@Title : The program creates a DataFrame from a CPU log data CSV file using PySpark and performs operations on it.
        It then stores the DataFrames in MySQL and loads data back from a MySQL table.
'''

In [None]:
from pyspark.sql import *
import mysql.connector
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as func

# Build (or reuse) a local Spark session for this notebook.
appName = "PySpark MySQL Example - via mysql.connector"
master = "local"
spark = (
    SparkSession.builder
    .appName(appName)
    .master(master)
    .getOrCreate()
)


In [None]:
# Load the merged CPU log from HDFS (first row holds the column names)
# and keep only the columns the analysis below needs.
log_csv_path = "hdfs://localhost:9000/spark_sql/merged_CpuLogData.csv"
df = spark.read.csv(log_csv_path, header=True)
df2 = df.select(["user_name", "DateTime", "keyboard", "mouse"])

In [None]:
# Peek at the first four rows of the projected frame.
df2.take(4)

In [None]:
# 1) Number of log records per user, exposed to SQL as "tempdata".
dfc = df2.groupBy("user_name").count()
dfc.createOrReplaceTempView("tempdata")
record_count = spark.table("tempdata")
record_count.show()


Users with highest number of average hours

In [None]:
# Register the raw CSV frame as "view1" (also queried by the idle-hours cell).
df.createOrReplaceTempView("view1")
# Rows with any keyboard or mouse activity, counted per user.
active_rows = spark.sql("select user_name from view1 where keyboard != 0 or mouse != 0")
df1 = active_rows.groupBy("user_name").count()
df1.show(truncate=False)

In [None]:
# Expose the per-user active-record counts to SQL as "hour_view".
# createOrReplaceTempView returns None, so there is no DataFrame to bind here
# (the previous `df3 = ...` was always None).
df1.createOrReplaceTempView("hour_view")

In [None]:
# avg_secs = (count - 1) * 5 * 60 / 6.
# NOTE(review): presumably records are sampled every 5 minutes (so *5*60 gives
# seconds) and /6 averages over six days -- confirm against the log source.
hour_query = "select user_name,count,((((count-1)*5)*60)/6) as avg_secs from hour_view"
df4 = spark.sql(hour_query)

In [None]:
# Show up to 20 rows without truncating cell contents.
df4.show(n=20, truncate=False)

In [None]:
from pyspark.sql.functions import *
# Users ranked by average active time, highest first, with avg_secs rendered
# as "H:M" text in the average_hours column.
highest_avg_hour = (
    df4.withColumn(
        "average_hours",
        concat(
            # Total whole hours; no "% 86400", so durations of a day or more
            # are not wrapped back into 0-23.
            floor(col("avg_secs") / 3600), lit(":"),
            floor(col("avg_secs") % 3600 / 60),
        ),
    )
    # Order on the numeric seconds before dropping them: sorting the "H:M"
    # string would be lexicographic ("10:0" sorts before "9:30").
    .sort(desc("avg_secs"))
    .drop("avg_secs")
)

In [None]:
# Display up to 20 rows, full-width.
highest_avg_hour.show(n=20, truncate=False)

Users with lowest number of average hours

In [None]:
from pyspark.sql.functions import *
# Users ranked by average active time, lowest first, with avg_secs rendered
# as "H:M" text in the average_hours column.
lowest_avg_hour = (
    df4.withColumn(
        "average_hours",
        concat(
            # Total whole hours; no "% 86400", so durations of a day or more
            # are not wrapped back into 0-23.
            floor(col("avg_secs") / 3600), lit(":"),
            floor(col("avg_secs") % 3600 / 60),
        ),
    )
    # Sort on the numeric seconds, not the "H:M" string, whose lexicographic
    # order is wrong ("10:0" sorts before "9:30").
    .sort(asc("avg_secs"))
    .drop("avg_secs")
)

In [None]:
# Display up to 20 rows (long values truncated, Spark's default).
lowest_avg_hour.show(n=20, truncate=True)


Users with highest number of idle hours


In [None]:
# Idle rows: neither keyboard nor mouse registered activity; counted per user.
idle_rows = spark.sql("select user_name from view1 where keyboard == 0 and mouse == 0")
df5 = idle_rows.groupBy("user_name").count()
df5.show()

In [None]:
# Make the per-user idle counts queryable from SQL.
df5.createOrReplaceTempView(name="idle_hour_view")

In [None]:
# Same formula as avg_secs for hour_view, so despite its name the
# "average_min" column holds seconds (it is later divided by 3600 and 60).
idle_query = "select user_name,count,((((count-1)*5)*60)/6) as average_min from idle_hour_view"
df6 = spark.sql(idle_query)

In [None]:
# Show up to 20 rows without truncating cell contents.
df6.show(n=20, truncate=False)

In [None]:
from pyspark.sql.functions import *
# Users ranked by idle time, highest first. "average_min" actually holds
# seconds; it is rendered as "H:M" text in the idle_hours column.
idle_hour = (
    df6.withColumn(
        "idle_hours",
        concat(
            # Total whole hours; no "% 86400", so durations of a day or more
            # are not wrapped back into 0-23.
            floor(col("average_min") / 3600), lit(":"),
            floor(col("average_min") % 3600 / 60),
        ),
    )
    # Sort on the numeric value, not the "H:M" string, whose lexicographic
    # order is wrong ("9:30" would outrank "10:0" descending).
    .sort(desc("average_min"))
    .drop("average_min")
)

In [None]:
# Display up to 20 rows, full-width.
idle_hour.show(n=20, truncate=False)

In [None]:
import os
from LoggerFormat import logger
from dotenv import load_dotenv

load_dotenv('.env')

# Database connection settings, looked up by name in the environment.
# NOTE(review): load_dotenv does not override variables that are already set
# by default; on Windows USERNAME is normally preset to the OS login, so
# confirm `user` really picks up the DB user from .env.
database, user, password, host = (
    os.getenv(var_name) for var_name in ("DB_NAME", "USERNAME", "PASSWORD", "HOST")
)


In [None]:
# Spark JDBC URL for the configured MySQL database. Uses the same HOST that
# the mysql.connector read-back connects to, falling back to localhost (the
# previously hard-coded value) when HOST is unset.
if not database:
    raise ValueError("DB_NAME is not set; cannot build the MySQL JDBC URL")
jdbc_url = f"jdbc:mysql://{host or 'localhost'}:3306/{database}"
try:
    # Write the per-user record counts, replacing any existing table.
    (
        dfc.write.format("jdbc")
        .option("url", jdbc_url)
        .mode("overwrite")
        .option("dbtable", "user_record_count")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("user", user)
        .option("password", password)
        .save()
    )
    print("Successfully wrote to mysql")
except Exception as e:
    logger.error(e)



SAVING SPARK DATAFRAME INTO MYSQL DATABASE

In [None]:
# Persist the highest-average-hours ranking, replacing any existing table.
try:
    (
        highest_avg_hour.write
        .format("jdbc")
        .mode("overwrite")
        .options(
            url=jdbc_url,
            dbtable="highest_number_average_hours",
            user=user,
            password=password,
        )
        .save()
    )
    print("Successfully wrote to mysql")
except Exception as e:
    logger.error(e)

In [None]:
# Persist the lowest-average-hours ranking, replacing any existing table.
try:
    (
        lowest_avg_hour.write
        .format("jdbc")
        .mode("overwrite")
        .options(
            url=jdbc_url,
            dbtable="lowest_number_average_hours",
            user=user,
            password=password,
        )
        .save()
    )
    print("Successfully wrote to mysql")
except Exception as e:
    logger.error(e)

In [None]:
# Persist the idle-hours ranking, replacing any existing table.
try:
    (
        idle_hour.write
        .format("jdbc")
        .mode("overwrite")
        .options(
            url=jdbc_url,
            dbtable="highest_number_of_idle_hours",
            user=user,
            password=password,
        )
        .save()
    )
    print("Successfully wrote to mysql")
except Exception as e:
    logger.error(e)

In [None]:
# Read the idle-hours table back from MySQL with mysql.connector, via pandas,
# into a Spark DataFrame bound to `df`, then display it.
try:
    conn = mysql.connector.connect(
        host=host,
        user=user,
        password=password,
        database=database,
    )
    query = "SELECT user_name,count FROM highest_number_of_idle_hours"
    try:
        # Create a pandas dataframe from the query result.
        pdf = pd.read_sql(query, con=conn)
    finally:
        # Always release the connection, even if the query fails.
        conn.close()

    # Convert Pandas dataframe to spark DataFrame
    df = spark.createDataFrame(pdf)
except Exception as e:
    logger.error(e)
else:
    # Only show on success; on failure `df` would still be the CSV frame
    # loaded at the top of the notebook, which would be misleading.
    df.show()
