In [1]:
from pyspark.sql.functions import hour,minute,second,col,avg,when
from pyspark import SparkContext
import pyspark.sql.functions as sql_functions
from pyspark.sql import SQLContext
import datetime

In [2]:
sc = SparkContext()
sqlContext = SQLContext(sc)

In [3]:
df = sqlContext.read.csv("hdfs://localhost:54310/spark/usr_log_data.csv",header=True,inferSchema = True)
df.show()

+--------------------+-------------------+-------------------+-------------------+-------------------+
|           user_name|          idle_time|      working_hours|         start_time|           end_time|
+--------------------+-------------------+-------------------+-------------------+-------------------+
|  sahil24c@gmail.com|2019-10-24 05:05:00|2019-10-24 05:50:00|2019-10-24 08:30:02|2019-10-24 19:25:02|
|magadum.iranna@gm...|2019-10-24 02:15:00|2019-10-24 08:39:59|2019-10-24 08:30:02|2019-10-24 19:25:01|
|  yathink3@gmail.com|2019-10-24 01:30:00|2019-10-24 09:24:59|2019-10-24 08:30:02|2019-10-24 19:25:01|
|  shelkeva@gmail.com|2019-10-24 00:30:00|2019-10-24 09:10:01|2019-10-24 08:45:01|2019-10-24 18:25:02|
|puruissimple@gmai...|2019-10-24 03:15:00|2019-10-24 07:19:59|2019-10-24 08:50:02|2019-10-24 19:25:01|
|sangita.awaghad19...|2019-10-24 01:55:00|2019-10-24 08:40:00|2019-10-24 08:50:01|2019-10-24 19:25:01|
|vaishusawant143@g...|2019-10-24 00:35:00|2019-10-24 09:55:00|2019-10-24 

In [4]:
df1 = df.withColumn('hours', hour(df['start_time']))
x = df1.select('user_name','hours')
x.show()

+--------------------+-----+
|           user_name|hours|
+--------------------+-----+
|  sahil24c@gmail.com|    8|
|magadum.iranna@gm...|    8|
|  yathink3@gmail.com|    8|
|  shelkeva@gmail.com|    8|
|puruissimple@gmai...|    8|
|sangita.awaghad19...|    8|
|vaishusawant143@g...|    8|
|     you@example.com|    8|
|samadhanmahajan73...|    9|
|vishnu23kumar@gma...|    9|
|ashutoshrit64@gma...|    9|
|akshaybavalekar10...|    9|
|khairnarswapna99@...|    9|
|kukadeshilpaa7m95...|    9|
|sarikabarge111@gm...|    9|
|narsimharaj.kasu0...|    9|
|antonyalexcm@gmai...|    9|
|jitupatil937@gmai...|    9|
|akshaypatwari24@g...|    9|
|aheteshams007@gma...|    9|
+--------------------+-----+
only showing top 20 rows



In [5]:
x = x.withColumn('h_s', x['hours'] * 3600)
x.show(5)

+--------------------+-----+-----+
|           user_name|hours|  h_s|
+--------------------+-----+-----+
|  sahil24c@gmail.com|    8|28800|
|magadum.iranna@gm...|    8|28800|
|  yathink3@gmail.com|    8|28800|
|  shelkeva@gmail.com|    8|28800|
|puruissimple@gmai...|    8|28800|
+--------------------+-----+-----+
only showing top 5 rows



In [6]:
df2 = df.withColumn('min', minute(df['start_time']))
y = df2.select('user_name', 'min')
y = y.withColumn('m_s', y['min'] * 60)
y.show(5)

+--------------------+---+----+
|           user_name|min| m_s|
+--------------------+---+----+
|  sahil24c@gmail.com| 30|1800|
|magadum.iranna@gm...| 30|1800|
|  yathink3@gmail.com| 30|1800|
|  shelkeva@gmail.com| 45|2700|
|puruissimple@gmai...| 50|3000|
+--------------------+---+----+
only showing top 5 rows



In [7]:
df3 = df.withColumn('sec', second(df['start_time']))
z = df3.select('user_name', 'sec')
z.show(5)

+--------------------+---+
|           user_name|sec|
+--------------------+---+
|  sahil24c@gmail.com|  2|
|magadum.iranna@gm...|  2|
|  yathink3@gmail.com|  2|
|  shelkeva@gmail.com|  1|
|puruissimple@gmai...|  2|
+--------------------+---+
only showing top 5 rows



In [8]:
df4 = x.join(y, on = ['user_name'], how = 'inner')
df4.show(5)

+--------------------+-----+-----+---+----+
|           user_name|hours|  h_s|min| m_s|
+--------------------+-----+-----+---+----+
|  sahil24c@gmail.com|    8|28800| 30|1800|
|magadum.iranna@gm...|    8|28800| 30|1800|
|  yathink3@gmail.com|    8|28800| 30|1800|
|  shelkeva@gmail.com|    8|28800| 45|2700|
|puruissimple@gmai...|    8|28800| 50|3000|
+--------------------+-----+-----+---+----+
only showing top 5 rows



In [9]:
df5 = df4.join(z, on = ['user_name'], how = 'inner') 
df5.show(5)

+--------------------+-----+-----+---+----+---+
|           user_name|hours|  h_s|min| m_s|sec|
+--------------------+-----+-----+---+----+---+
|  sahil24c@gmail.com|    8|28800| 30|1800|  2|
|magadum.iranna@gm...|    8|28800| 30|1800|  2|
|  yathink3@gmail.com|    8|28800| 30|1800|  2|
|  shelkeva@gmail.com|    8|28800| 45|2700|  1|
|puruissimple@gmai...|    8|28800| 50|3000|  2|
+--------------------+-----+-----+---+----+---+
only showing top 5 rows



In [10]:
df6 = df5.withColumn('start_hours', (df5['h_s'] + df5['m_s'] + df5['sec'])/3600)
df6.show(5)

+--------------------+-----+-----+---+----+---+-----------------+
|           user_name|hours|  h_s|min| m_s|sec|      start_hours|
+--------------------+-----+-----+---+----+---+-----------------+
|  sahil24c@gmail.com|    8|28800| 30|1800|  2|8.500555555555556|
|magadum.iranna@gm...|    8|28800| 30|1800|  2|8.500555555555556|
|  yathink3@gmail.com|    8|28800| 30|1800|  2|8.500555555555556|
|  shelkeva@gmail.com|    8|28800| 45|2700|  1|8.750277777777777|
|puruissimple@gmai...|    8|28800| 50|3000|  2| 8.83388888888889|
+--------------------+-----+-----+---+----+---+-----------------+
only showing top 5 rows



In [11]:
df11 = df.withColumn('hours', hour(df['end_time']))
x1 = df11.select('user_name','hours')
x1.show(5)

+--------------------+-----+
|           user_name|hours|
+--------------------+-----+
|  sahil24c@gmail.com|   19|
|magadum.iranna@gm...|   19|
|  yathink3@gmail.com|   19|
|  shelkeva@gmail.com|   18|
|puruissimple@gmai...|   19|
+--------------------+-----+
only showing top 5 rows



In [12]:
x1 = x1.withColumn('h_s', x1['hours'] * 3600)
x1.show(5)

+--------------------+-----+-----+
|           user_name|hours|  h_s|
+--------------------+-----+-----+
|  sahil24c@gmail.com|   19|68400|
|magadum.iranna@gm...|   19|68400|
|  yathink3@gmail.com|   19|68400|
|  shelkeva@gmail.com|   18|64800|
|puruissimple@gmai...|   19|68400|
+--------------------+-----+-----+
only showing top 5 rows



In [13]:
df22 = df.withColumn('min', minute(df['end_time']))
y1 = df22.select('user_name', 'min')
y1 = y1.withColumn('m_s', y1['min'] * 60)
y1.show(5)

+--------------------+---+----+
|           user_name|min| m_s|
+--------------------+---+----+
|  sahil24c@gmail.com| 25|1500|
|magadum.iranna@gm...| 25|1500|
|  yathink3@gmail.com| 25|1500|
|  shelkeva@gmail.com| 25|1500|
|puruissimple@gmai...| 25|1500|
+--------------------+---+----+
only showing top 5 rows



In [14]:
df33 = df.withColumn('sec', second(df['end_time']))
z1 = df33.select('user_name', 'sec')
z1.show(5)

+--------------------+---+
|           user_name|sec|
+--------------------+---+
|  sahil24c@gmail.com|  2|
|magadum.iranna@gm...|  1|
|  yathink3@gmail.com|  1|
|  shelkeva@gmail.com|  2|
|puruissimple@gmai...|  1|
+--------------------+---+
only showing top 5 rows



In [15]:
df44 = x1.join(y1, on = ['user_name'], how = 'inner')
df44.show(5)

+--------------------+-----+-----+---+----+
|           user_name|hours|  h_s|min| m_s|
+--------------------+-----+-----+---+----+
|  sahil24c@gmail.com|   19|68400| 25|1500|
|magadum.iranna@gm...|   19|68400| 25|1500|
|  yathink3@gmail.com|   19|68400| 25|1500|
|  shelkeva@gmail.com|   18|64800| 25|1500|
|puruissimple@gmai...|   19|68400| 25|1500|
+--------------------+-----+-----+---+----+
only showing top 5 rows



In [16]:
df55 = df44.join(z1, on = ['user_name'], how = 'inner')
df55.show(5)

+--------------------+-----+-----+---+----+---+
|           user_name|hours|  h_s|min| m_s|sec|
+--------------------+-----+-----+---+----+---+
|  sahil24c@gmail.com|   19|68400| 25|1500|  2|
|magadum.iranna@gm...|   19|68400| 25|1500|  1|
|  yathink3@gmail.com|   19|68400| 25|1500|  1|
|  shelkeva@gmail.com|   18|64800| 25|1500|  2|
|puruissimple@gmai...|   19|68400| 25|1500|  1|
+--------------------+-----+-----+---+----+---+
only showing top 5 rows



In [17]:
df66 = df55.drop('hours', 'min')
df66.show(5)

+--------------------+-----+----+---+
|           user_name|  h_s| m_s|sec|
+--------------------+-----+----+---+
|  sahil24c@gmail.com|68400|1500|  2|
|magadum.iranna@gm...|68400|1500|  1|
|  yathink3@gmail.com|68400|1500|  1|
|  shelkeva@gmail.com|64800|1500|  2|
|puruissimple@gmai...|68400|1500|  1|
+--------------------+-----+----+---+
only showing top 5 rows



In [18]:
df77 = df66.withColumn('end_hours', (df66['h_s'] + df66['m_s'] + df66['sec'])/3600)
df77.show(5)

+--------------------+-----+----+---+------------------+
|           user_name|  h_s| m_s|sec|         end_hours|
+--------------------+-----+----+---+------------------+
|  sahil24c@gmail.com|68400|1500|  2| 19.41722222222222|
|magadum.iranna@gm...|68400|1500|  1|19.416944444444443|
|  yathink3@gmail.com|68400|1500|  1|19.416944444444443|
|  shelkeva@gmail.com|64800|1500|  2| 18.41722222222222|
|puruissimple@gmai...|68400|1500|  1|19.416944444444443|
+--------------------+-----+----+---+------------------+
only showing top 5 rows



In [19]:
df8 = df6.select("user_name","start_hours")
df8.show(5)

+--------------------+-----------------+
|           user_name|      start_hours|
+--------------------+-----------------+
|  sahil24c@gmail.com|8.500555555555556|
|magadum.iranna@gm...|8.500555555555556|
|  yathink3@gmail.com|8.500555555555556|
|  shelkeva@gmail.com|8.750277777777777|
|puruissimple@gmai...| 8.83388888888889|
+--------------------+-----------------+
only showing top 5 rows



In [20]:
df9 = df8.join(df77, on = ['user_name'], how = 'inner')
df9.show(5)

+--------------------+------------------+-----+----+---+------------------+
|           user_name|       start_hours|  h_s| m_s|sec|         end_hours|
+--------------------+------------------+-----+----+---+------------------+
|ashutoshrit64@gma...| 9.000277777777777|68400|1500|  2| 19.41722222222222|
|giridhardandikwar...|10.667222222222222|68400|1500|  2| 19.41722222222222|
|       nikitapawar17|11.250555555555556|68400|1500|  2| 19.41722222222222|
|salinabodale73@gm...|10.250277777777777|68400|1500|  1|19.416944444444443|
|mail2anik.officia...| 8.500277777777777|36000|   0|  1|10.000277777777777|
+--------------------+------------------+-----+----+---+------------------+
only showing top 5 rows



In [21]:
df10 = df9.select("user_name","start_hours","end_hours")
df10.show(5)

+--------------------+------------------+------------------+
|           user_name|       start_hours|         end_hours|
+--------------------+------------------+------------------+
|ashutoshrit64@gma...| 9.000277777777777| 19.41722222222222|
|giridhardandikwar...|10.667222222222222| 19.41722222222222|
|       nikitapawar17|11.250555555555556| 19.41722222222222|
|salinabodale73@gm...|10.250277777777777|19.416944444444443|
|mail2anik.officia...| 8.500277777777777|10.000277777777777|
+--------------------+------------------+------------------+
only showing top 5 rows



In [22]:
df10 = df9.select("user_name","start_hours","end_hours")
df10.show(5)

+--------------------+------------------+------------------+
|           user_name|       start_hours|         end_hours|
+--------------------+------------------+------------------+
|ashutoshrit64@gma...| 9.000277777777777| 19.41722222222222|
|giridhardandikwar...|10.667222222222222| 19.41722222222222|
|       nikitapawar17|11.250555555555556| 19.41722222222222|
|salinabodale73@gm...|10.250277777777777|19.416944444444443|
|mail2anik.officia...| 8.500277777777777|10.000277777777777|
+--------------------+------------------+------------------+
only showing top 5 rows



In [25]:
df11 = df10.withColumn('Time_spent', (df10['end_hours'] - df10["start_hours"]))
df11.show(5)

+--------------------+------------------+------------------+------------------+
|           user_name|       start_hours|         end_hours|        Time_spent|
+--------------------+------------------+------------------+------------------+
|ashutoshrit64@gma...| 9.000277777777777| 19.41722222222222|10.416944444444445|
|giridhardandikwar...|10.667222222222222| 19.41722222222222|              8.75|
|       nikitapawar17|11.250555555555556| 19.41722222222222| 8.166666666666666|
|salinabodale73@gm...|10.250277777777777|19.416944444444443| 9.166666666666666|
|mail2anik.officia...| 8.500277777777777|10.000277777777777|               1.5|
+--------------------+------------------+------------------+------------------+
only showing top 5 rows



In [27]:
df12 = df11.select('user_name','Time_spent')
df12.show(5)

+--------------------+------------------+
|           user_name|        Time_spent|
+--------------------+------------------+
|ashutoshrit64@gma...|10.416944444444445|
|giridhardandikwar...|              8.75|
|       nikitapawar17| 8.166666666666666|
|salinabodale73@gm...| 9.166666666666666|
|mail2anik.officia...|               1.5|
+--------------------+------------------+
only showing top 5 rows



In [28]:
average = df11.select(avg('Time_spent'))
average.show()

+-----------------+
|  avg(Time_spent)|
+-----------------+
|9.168699494949495|
+-----------------+



In [29]:
# import pymysql

In [31]:
# db_connection = pymysql.connect(host="localhost", user="admin", passwd="Admin@12", database="spark")
# db_cursor = db_connection.cursor()

In [40]:
# query = 'insert into avg_time_spent values("%s")';(average)

DataFrame[avg(Time_spent): double]

In [41]:
# db_cursor.execute(query)
# db_connection.commit()

InternalError: (1366, "Incorrect decimal value: '%s' for column 'avg_time' at row 1")