In [1]:
import boto3
import pandas as pd
from botocore.exceptions import ClientError
import os
from dotenv import load_dotenv
load_dotenv()

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
import findspark
findspark.init()

import pyodbc

In [2]:
# creating spark session
spark = SparkSession.builder\
                    .master("local")\
                    .appName("cpu_log_app")\
                    .getOrCreate()

In [3]:
# connecting to aws s3
s3 = boto3.client(service_name = 's3',
                    region_name = 'ap-southeast-2',
                    aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID'),
                    aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
                    )

In [4]:
# creating sql server connection object
con = pyodbc.connect("Driver={Sql Server};Server=SUSHANTSEVEN;Database=Python_sql;Trusted_connection=yes")
cursor = con.cursor()

In [6]:
# collecting the data from s3 bucket
bucket_name = 'cpu-bucket'
data_ls = list()
response = s3.list_objects_v2(Bucket=bucket_name)
if 'Contents' in response:
    for obj in response['Contents']:
        data_ls.append(obj['Key'])
else:
    print("Bucket is empty.")

print(data_ls)

['CpuLogData2019-09-16.csv', 'CpuLogData2019-09-17.csv', 'CpuLogData2019-09-18.csv', 'CpuLogData2019-09-19.csv', 'CpuLogData2019-09-20.csv']


In [8]:
# converting pandas df to spark df``
pd_df_list = list()
spark_df_list = list()
for i in range(len(data_ls)):
    response = s3.get_object(Bucket=bucket_name, Key=data_ls[i])
    pd_df_list.append(pd.read_csv(response['Body']))
    spark_df_list.append(spark.createDataFrame(pd_df_list[i]))

In [9]:
# union operation on all the data frames
union_log_df = spark_df_list[0].union(spark_df_list[1]).union(spark_df_list[2]).union(spark_df_list[3]).union(spark_df_list[4])
union_log_df.show()

+-------------------+---------+----------------+-------------+-----------+----------------+----------------------------------------+---------------------------------+-------------------------------+-----------------------+-----------------------+------------------------+-------------------+------------------+------------------+--------------------+----------------------+---------------------+--------------------+--------------------+----------------------+-----------------+----------------+----------------+---------------+----------------+---------------+----------------+----------------------------+--------------------------+----------------------------+--------------------+------------------------+----------------------+-------------------------+--------------------------------------+------------------------------------+---------------------------------------------------+---------------------------------------------------+--------------+--------------------+--------+------+----------

In [10]:
union_log_df.printSchema()


root
 |-- DateTime: string (nullable = true)
 |-- Cpu Count: long (nullable = true)
 |-- Cpu Working Time: double (nullable = true)
 |-- Cpu idle Time: double (nullable = true)
 |-- cpu_percent: double (nullable = true)
 |-- Usage Cpu Count : long (nullable = true)
 |-- number of software interrupts since boot: long (nullable = true)
 |-- number of system calls since boot: long (nullable = true)
 |-- number of interrupts since boot: long (nullable = true)
 |-- cpu avg load over 1 min: double (nullable = true)
 |-- cpu avg load over 5 min: double (nullable = true)
 |-- cpu avg load over 15 min: double (nullable = true)
 |-- system_total_memory: long (nullable = true)
 |-- system_used_memory: long (nullable = true)
 |-- system_free_memory: long (nullable = true)
 |-- system_active_memory: long (nullable = true)
 |-- system_inactive_memory: long (nullable = true)
 |-- system_buffers_memory: long (nullable = true)
 |-- system_cached_memory: long (nullable = true)
 |-- system_shared_memory:

In [11]:
union_log_df.createOrReplaceTempView("union_log_tab")
spark.sql("select * from union_log_tab").show()

+-------------------+---------+----------------+-------------+-----------+----------------+----------------------------------------+---------------------------------+-------------------------------+-----------------------+-----------------------+------------------------+-------------------+------------------+------------------+--------------------+----------------------+---------------------+--------------------+--------------------+----------------------+-----------------+----------------+----------------+---------------+----------------+---------------+----------------+----------------------------+--------------------------+----------------------------+--------------------+------------------------+----------------------+-------------------------+--------------------------------------+------------------------------------+---------------------------------------------------+---------------------------------------------------+--------------+--------------------+--------+------+----------

Finding users with highest numbers of idle hours

In [12]:
ideal_hrs_df = spark.sql("select DateTime,user_name,keyboard,mouse from union_log_tab where keyboard=0.0 and mouse = 0.0")
ideal_hrs_df.createOrReplaceTempView("ideal_hrs_tab")
highest_ideal_hrs = spark.sql("select user_name, count(*) as ideal_count from ideal_hrs_tab group by user_name order by ideal_count DESC")
highest_ideal_hrs.show()

+--------------------+-----------+
|           user_name|ideal_count|
+--------------------+-----------+
|rahilstar11@gmail...|        129|
|  iamnzm@outlook.com|        127|
|salinabodale73@gm...|        118|
|markfernandes66@g...|        112|
|sharlawar77@gmail...|        109|
|deepshukla292@gma...|         78|
|bhagyashrichalke2...|         62|
|damodharn21@gmail...|         45|
+--------------------+-----------+



Uploading result to sql server

In [13]:
# create table
cursor.execute('create table highest_ideal_hrs (user_name nvarchar(60), ideal_hrs_count int)')
con.commit()

In [14]:
# iterating over the data frame and inserting data into table
for row in highest_ideal_hrs.collect():
    user_name = row['user_name']
    ideal_hrs_count = row['ideal_count']
    try:    
        cursor.execute("INSERT INTO highest_ideal_hrs (user_name, ideal_hrs_count) VALUES (? , ?);", (user_name, ideal_hrs_count))
    except Exception as e:
        print(e)
    else:
        con.commit()
        print('Data inserted')

Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted


Finding users with lowest number of average hours

In [15]:
avg_hrs_df = spark.sql("select DateTime,user_name,keyboard,mouse from union_log_tab")
avg_hrs_df.createOrReplaceTempView("avg_hrs_tab")
spark.sql("select * from avg_hrs_tab").show()

+-------------------+--------------------+--------+------+
|           DateTime|           user_name|keyboard| mouse|
+-------------------+--------------------+--------+------+
|2019-09-16 12:55:03|rahilstar11@gmail...|     0.0|   0.0|
|2019-09-16 12:55:02|salinabodale73@gm...|  2919.5| 888.0|
|2019-09-16 12:55:01|bhagyashrichalke2...|   144.0|2886.0|
|2019-09-16 13:00:01|bhagyashrichalke2...|    21.0|  44.0|
|2019-09-16 13:00:01|  iamnzm@outlook.com|    41.0|8251.0|
|2019-09-16 13:00:01|deepshukla292@gma...|   249.5|4266.0|
|2019-09-16 13:00:02|salinabodale73@gm...|   135.5| 692.0|
|2019-09-16 13:00:04|sharlawar77@gmail...|   303.0| 243.0|
|2019-09-16 13:00:03|rahilstar11@gmail...|    22.5| 170.0|
|2019-09-16 13:05:01|salinabodale73@gm...|   179.5| 108.0|
|2019-09-16 13:05:02|bhagyashrichalke2...|   387.0| 676.0|
|2019-09-16 13:05:01|deepshukla292@gma...|   165.0|5701.0|
|2019-09-16 13:05:02|  iamnzm@outlook.com|    52.0|7971.0|
|2019-09-16 13:05:04|sharlawar77@gmail...|    17.0|  72.

In [16]:
lowest_avg_hrs = spark.sql("select user_name, avg(keyboard+mouse) as average_hours from avg_hrs_tab group by user_name order by average_hours")
lowest_avg_hrs.show()

+--------------------+------------------+
|           user_name|     average_hours|
+--------------------+------------------+
|rahilstar11@gmail...| 267.0662251655629|
|sharlawar77@gmail...| 534.6867219917012|
|  iamnzm@outlook.com| 807.9402697495183|
|salinabodale73@gm...| 834.0409663865546|
|markfernandes66@g...| 888.3780193236715|
|bhagyashrichalke2...|1730.0115979381444|
|deepshukla292@gma...| 2397.088888888889|
|damodharn21@gmail...| 2587.509433962264|
+--------------------+------------------+



Uploading result to sql server

In [17]:
# create table
cursor.execute('create table lowest_avg_hrs (user_name nvarchar(60), average_hrs float)')
con.commit()


In [18]:
# iterating over the data frame and inserting data into table
for row in lowest_avg_hrs.collect():
    user_name = row['user_name']
    avg_hrs = row['average_hours']
    try:    
        cursor.execute("INSERT INTO lowest_avg_hrs (user_name, average_hrs) VALUES (? , ?);", (user_name, avg_hrs))
    except Exception as e:
        print(e)
    else:
        con.commit()
        print('Data inserted')

Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted


Finding users with highest number of average hours

In [19]:
highest_avg_hrs = spark.sql("select user_name, avg(keyboard+mouse) as average_hours from avg_hrs_tab group by user_name order by average_hours desc")
highest_avg_hrs.show()

+--------------------+------------------+
|           user_name|     average_hours|
+--------------------+------------------+
|damodharn21@gmail...| 2587.509433962264|
|deepshukla292@gma...| 2397.088888888889|
|bhagyashrichalke2...|1730.0115979381444|
|markfernandes66@g...| 888.3780193236715|
|salinabodale73@gm...| 834.0409663865546|
|  iamnzm@outlook.com| 807.9402697495183|
|sharlawar77@gmail...| 534.6867219917012|
|rahilstar11@gmail...| 267.0662251655629|
+--------------------+------------------+



Uploading result to sql server

In [21]:
# create table
cursor.execute('create table highest_avg_hrs (user_name nvarchar(60), average_hrs float)')
con.commit()


ProgrammingError: ('42S01', "[42S01] [Microsoft][ODBC SQL Server Driver][SQL Server]There is already an object named 'highest_avg_hrs' in the database. (2714) (SQLExecDirectW)")

In [22]:
# iterating over the data frame and inserting data into table
for row in highest_avg_hrs.collect():
    user_name = row['user_name']
    avg_hrs = row['average_hours']
    try:    
        cursor.execute("INSERT INTO highest_avg_hrs (user_name, average_hrs) VALUES (? , ?);", (user_name, avg_hrs))
    except Exception as e:
        print(e)
    else:
        con.commit()
        print('Data inserted')

Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted


Finding users with highest numbers of times late comings

In [23]:
highest_late_comers = spark.sql("""select user_name, late_comers, dense_rank() over (ORDER BY late_comers desc) as rank 
          from (select user_name, count(*) as late_comers from avg_hrs_tab where DateTime > "2019-09-17 08:31:00" 
          group by user_name) as counts order by late_comers desc""")
highest_late_comers.show()

+--------------------+-----------+----+
|           user_name|late_comers|rank|
+--------------------+-----------+----+
|  iamnzm@outlook.com|        443|   1|
|markfernandes66@g...|        414|   2|
|sharlawar77@gmail...|        404|   3|
|salinabodale73@gm...|        385|   4|
|rahilstar11@gmail...|        381|   5|
|deepshukla292@gma...|        372|   6|
|bhagyashrichalke2...|        309|   7|
|damodharn21@gmail...|        159|   8|
+--------------------+-----------+----+



In [24]:
# create table
cursor.execute('create table highest_late_comers (user_name nvarchar(60), no_of_times int, rank int)')
con.commit()


In [25]:
# iterating over the data frame and inserting data into table
for row in highest_late_comers.collect():
    user_name = row['user_name']
    times = row['late_comers']
    rank = row['rank']
    try:    
        cursor.execute("INSERT INTO highest_late_comers (user_name, no_of_times, rank) VALUES (? , ?, ?);", (user_name, times, rank))
    except Exception as e:
        print(e)
    else:
        con.commit()
        print('Data inserted')

Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
Data inserted
