In [1]:
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, StringType, IntegerType
import pandas as pd

In [2]:
appName = "Pyspark Example - JSON file to Spark Data Frame"
master = "local"

In [3]:
# create Spark Session

spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

In [4]:
# create a schema for the dataframe

schema = StructType([StructField('Category', StringType(), True),
                     StructField('Count', IntegerType(), True),
                     StructField('Description', StringType(), True),])

In [5]:
# create DataFrame

json_file = 'example.json'

df_json_file = spark.read.json(json_file, schema, multiLine=True)

print(df_json_file.schema)

df_json_file.show()

StructType(List(StructField(Category,StringType,true),StructField(Count,IntegerType,true),StructField(Description,StringType,true)))
+----------+-----+------------------+
|  Category|Count|       Description|
+----------+-----+------------------+
|Category A|  100|This is category A|
|Category B|  120|This is category B|
|Category C|  150|This is category C|
+----------+-----+------------------+



In [6]:
df = spark.read.option("multiline", "true").json("data.json")

#df.printSchema()

In [7]:
print(df.schema)

df.show()

StructType(List(StructField(_id,StructType(List(StructField($oid,StringType,true))),true),StructField(date,StructType(List(StructField($date,LongType,true))),true),StructField(dtHour,StringType,true),StructField(humidity,StringType,true),StructField(roomId,StringType,true),StructField(tempC,StringType,true),StructField(tempF,StringType,true)))
+--------------------+---------------+-------------------+---------------+------+--------------+--------------+
|                 _id|           date|             dtHour|       humidity|roomId|         tempC|         tempF|
+--------------------+---------------+-------------------+---------------+------+--------------+--------------+
|{60b79cf2390a4678...|{1622646002682}|02-06-2021 16:00:02|Humidity: 78.8%|     1|Temp C: 21.2 C|Temp F: 70.2 F|
|{60b79cf2dc7e036f...|{1622646002379}|02-06-2021 16:00:02|  Humidity: 57%|     2|Temp C: 23.0 C|Temp F: 73.4 F|
|{60b79cf8390a4678...|{1622646008088}|02-06-2021 16:00:08|             78|     3|          15.

In [8]:
import pyspark.sql.functions as f

df_date = df.withColumn('dateHour', f.to_timestamp('dtHour', format='yyyy-MM-dd HH:mm:ss'))

df_date.show()

#print(df_date.schema)

df_date.printSchema()


+--------------------+---------------+-------------------+---------------+------+--------------+--------------+--------+
|                 _id|           date|             dtHour|       humidity|roomId|         tempC|         tempF|dateHour|
+--------------------+---------------+-------------------+---------------+------+--------------+--------------+--------+
|{60b79cf2390a4678...|{1622646002682}|02-06-2021 16:00:02|Humidity: 78.8%|     1|Temp C: 21.2 C|Temp F: 70.2 F|    null|
|{60b79cf2dc7e036f...|{1622646002379}|02-06-2021 16:00:02|  Humidity: 57%|     2|Temp C: 23.0 C|Temp F: 73.4 F|    null|
|{60b79cf8390a4678...|{1622646008088}|02-06-2021 16:00:08|             78|     3|          15.3|          59.5|    null|
|{60b79f48390a4678...|{1622646600595}|02-06-2021 16:10:00|Humidity: 79.3%|     1|Temp C: 21.4 C|Temp F: 70.5 F|    null|
|{60b79f48dc7e036f...|{1622646600698}|02-06-2021 16:10:00|  Humidity: 57%|     2|Temp C: 23.0 C|Temp F: 73.4 F|    null|
|{60b79f4a390a4678...|{162264660

In [52]:
## Humidity Cell --1

import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace


df_humidity = df.withColumn("humidity", f.regexp_replace(f.col("humidity"),"[Humidity: %]",""))

#df_humidity.show()

df_humidity_room1 = df_humidity.select(df_humidity['dtHour'],df_humidity['roomId'],df_humidity['humidity'])

df_humidity_room1.show()

df_humidity_room1.printSchema()

+-------------------+------+--------+
|             dtHour|roomId|humidity|
+-------------------+------+--------+
|02-06-2021 16:00:02|     1|    78.8|
|02-06-2021 16:00:02|     2|      57|
|02-06-2021 16:00:08|     3|      78|
|02-06-2021 16:10:00|     1|    79.3|
|02-06-2021 16:10:00|     2|      57|
|02-06-2021 16:10:02|     3|      78|
|02-06-2021 16:20:00|     1|    75.2|
|02-06-2021 16:20:00|     2|      57|
|02-06-2021 16:20:02|     3|      78|
|02-06-2021 16:30:00|     1|    75.7|
|02-06-2021 16:30:00|     2|      57|
|02-06-2021 16:30:02|     3|      78|
|02-06-2021 16:40:00|     1|    75.5|
|02-06-2021 16:40:00|     2|      57|
|02-06-2021 16:40:03|     3|      78|
|02-06-2021 16:50:00|     1|    75.7|
|02-06-2021 16:50:00|     2|      57|
|02-06-2021 16:50:02|     3|      78|
|02-06-2021 17:00:00|     1|    75.5|
|02-06-2021 17:00:00|     2|      56|
+-------------------+------+--------+
only showing top 20 rows

root
 |-- dtHour: string (nullable = true)
 |-- roomId: string

In [76]:
### ROOM1 ROOM2 ROOM3 Humidity----Cell-02

#df_humidity_room1 = df_humidity.select(df_humidity['dtHour'],df_humidity['roomId'],df_humidity['humidity'])
#df_humidity_room2 = df_humidity.select(df_humidity['dtHour'],df_humidity['roomId'],df_humidity['humidity'])
df_humidity_room3 = df_humidity.select(df_humidity['dtHour'],df_humidity['roomId'],df_humidity['humidity'])

#df_room1_humi = df_humidity_room1.where(df_humidity_room1['roomId'] == 1).collect()
#df_room2_humi = df_humidity_room1.where(df_humidity_room2['roomId'] == 2).collect()
df_room3_humi = df_humidity_room3.where(df_humidity_room2['roomId'] == 3).collect()

#df_humidity_room1.show()

print(df_room3_humi)

[Row(dtHour='02-06-2021 16:00:08', roomId='3', humidity='78'), Row(dtHour='02-06-2021 16:10:02', roomId='3', humidity='78'), Row(dtHour='02-06-2021 16:20:02', roomId='3', humidity='78'), Row(dtHour='02-06-2021 16:30:02', roomId='3', humidity='78'), Row(dtHour='02-06-2021 16:40:03', roomId='3', humidity='78'), Row(dtHour='02-06-2021 16:50:02', roomId='3', humidity='78'), Row(dtHour='02-06-2021 17:00:04', roomId='3', humidity='78'), Row(dtHour='02-06-2021 17:10:02', roomId='3', humidity='73'), Row(dtHour='02-06-2021 17:20:03', roomId='3', humidity='73'), Row(dtHour='02-06-2021 17:30:02', roomId='3', humidity='73'), Row(dtHour='02-06-2021 17:40:02', roomId='3', humidity='54'), Row(dtHour='02-06-2021 17:50:02', roomId='3', humidity='54'), Row(dtHour='02-06-2021 18:00:03', roomId='3', humidity='54'), Row(dtHour='02-06-2021 18:10:02', roomId='3', humidity='61'), Row(dtHour='02-06-2021 18:20:02', roomId='3', humidity='61'), Row(dtHour='02-06-2021 18:30:02', roomId='3', humidity='61'), Row(dtH

In [77]:
### ROOM1 ROOM2 ROOM3 Humidity ----Cell-03

#sensor_humidity1 = pd.DataFrame(df_room1_humi, columns=['Timestamp','roomId','humidity'])#.set_index('Timestamp')
#sensor_humidity2 = pd.DataFrame(df_room2_humi, columns=['Timestamp','roomId','humidity'])#.set_index('Timestamp')
sensor_humidity3 = pd.DataFrame(df_room3_humi, columns=['Timestamp','roomId','humidity'])#.set_index('Timestamp')

print(sensor_humidity3)


                Timestamp roomId humidity
0     02-06-2021 16:00:08      3       78
1     02-06-2021 16:10:02      3       78
2     02-06-2021 16:20:02      3       78
3     02-06-2021 16:30:02      3       78
4     02-06-2021 16:40:03      3       78
...                   ...    ...      ...
2735  23-06-2021 18:50:04      3       30
2736  23-06-2021 19:00:03      3       31
2737  23-06-2021 19:10:04      3       31
2738  23-06-2021 19:20:04      3       31
2739  23-06-2021 19:30:03      3       31

[2740 rows x 3 columns]


In [80]:
### ROOM1 ROOM2 ROOM3 Humidity----Cell-04

### SAVE FILE ###

### Finally Done: Timestamp column into dtype:datetime and humidity column into dtype:float ###


#sensor_humidity1['Timestamp'] = pd.to_datetime(sensor_humidity1["Timestamp"])
#sensor_humidity2['Timestamp'] = pd.to_datetime(sensor_humidity2["Timestamp"])
sensor_humidity3['Timestamp'] = pd.to_datetime(sensor_humidity3["Timestamp"])

#sensor_humidity1['humidity'] = pd.to_numeric(sensor_humidity1['humidity'], errors='ignore')
#sensor_humidity2['humidity'] = pd.to_numeric(sensor_humidity2['humidity'], errors='ignore')
sensor_humidity3['humidity'] = pd.to_numeric(sensor_humidity3['humidity'], errors='ignore')

#df_humidity1 = sensor_humidity1[['Timestamp','humidity']]
#df_humidity2 = sensor_humidity2[['Timestamp','humidity']]
df_humidity3 = sensor_humidity3[['Timestamp','humidity']]


#df_humidity1.to_csv('humidity_room1.csv', index=False)  ## Save file in csv without index
#df_humidity2.to_csv('humidity_room2.csv', index=False)  ## Save file in csv without index
#df_humidity3.to_csv('humidity_room3.csv', index=False)  ## Save file in csv without index

#df_humidity1.to_csv('humidity_room1_index.csv', index=False)  ## Save file in csv with index
#df_humidity2.to_csv('humidity_room2_index.csv', index=False)  ## Save file in csv with index
#df_humidity3.to_csv('humidity_room3_index.csv', index=False)  ## Save file in csv with index



#print(df_humidity1['Timestamp'])
#print(df_humidity1['humidity'])
#print(df_humidity3)


In [41]:
## TempC Cell --1

import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace


df_tempC = df.withColumn("tempC", f.regexp_replace(f.col("tempC"),"[Temp C:]",""))

#df_tempC.show()

df_tempC_room1 = df_tempC.select(df_tempC['dtHour'],df_tempC['roomId'],df_tempC['tempC'])

df_tempC_room1.show(200)

df_tempC_room1.printSchema()

+-------------------+------+-----+
|             dtHour|roomId|tempC|
+-------------------+------+-----+
|02-06-2021 16:00:02|     1| 21.2|
|02-06-2021 16:00:02|     2| 23.0|
|02-06-2021 16:00:08|     3| 15.3|
|02-06-2021 16:10:00|     1| 21.4|
|02-06-2021 16:10:00|     2| 23.0|
|02-06-2021 16:10:02|     3| 15.3|
|02-06-2021 16:20:00|     1| 21.2|
|02-06-2021 16:20:00|     2| 24.0|
|02-06-2021 16:20:02|     3| 15.3|
|02-06-2021 16:30:00|     1| 21.1|
|02-06-2021 16:30:00|     2| 24.0|
|02-06-2021 16:30:02|     3| 15.3|
|02-06-2021 16:40:00|     1| 21.2|
|02-06-2021 16:40:00|     2| 24.0|
|02-06-2021 16:40:03|     3| 15.3|
|02-06-2021 16:50:00|     1| 21.2|
|02-06-2021 16:50:00|     2| 24.0|
|02-06-2021 16:50:02|     3| 15.3|
|02-06-2021 17:00:00|     1| 21.1|
|02-06-2021 17:00:00|     2| 24.0|
|02-06-2021 17:00:04|     3| 15.3|
|02-06-2021 17:10:00|     1| 21.1|
|02-06-2021 17:10:00|     2| 24.0|
|02-06-2021 17:10:02|     3| 14.6|
|02-06-2021 17:20:00|     1| 21.2|
|02-06-2021 17:20:00

In [11]:
### ROOM3_Temp ----Cell-01

### Using SQL -- we need to register it as a temporary view

df_tempC_room1.createOrReplaceTempView('tempC3')

sql_df3 = spark.sql('select * From tempC3 where roomId == 3')

sql_df3.show()

type(sql_df3)

sql_df3.printSchema()

+-------------------+------+-----+
|             dtHour|roomId|tempC|
+-------------------+------+-----+
|02-06-2021 16:00:08|     3| 15.3|
|02-06-2021 16:10:02|     3| 15.3|
|02-06-2021 16:20:02|     3| 15.3|
|02-06-2021 16:30:02|     3| 15.3|
|02-06-2021 16:40:03|     3| 15.3|
|02-06-2021 16:50:02|     3| 15.3|
|02-06-2021 17:00:04|     3| 15.3|
|02-06-2021 17:10:02|     3| 14.6|
|02-06-2021 17:20:03|     3| 14.6|
|02-06-2021 17:30:02|     3| 14.6|
|02-06-2021 17:40:02|     3| 18.1|
|02-06-2021 17:50:02|     3| 18.1|
|02-06-2021 18:00:03|     3| 18.1|
|02-06-2021 18:10:02|     3| 15.3|
|02-06-2021 18:20:02|     3| 15.3|
|02-06-2021 18:30:02|     3| 15.3|
|02-06-2021 18:40:02|     3| 15.3|
|02-06-2021 18:50:02|     3| 15.3|
|02-06-2021 19:00:03|     3| 14.8|
|02-06-2021 19:10:02|     3| 14.8|
+-------------------+------+-----+
only showing top 20 rows

root
 |-- dtHour: string (nullable = true)
 |-- roomId: string (nullable = true)
 |-- tempC: string (nullable = true)



In [13]:
### ROOM3_Temp ----Cell-02
# convert spark df to pandas df

pandasDF3 = sql_df3.toPandas()


print(pandasDF3)
type(pandasDF3)


                   dtHour roomId tempC
0     02-06-2021 16:00:08      3  15.3
1     02-06-2021 16:10:02      3  15.3
2     02-06-2021 16:20:02      3  15.3
3     02-06-2021 16:30:02      3  15.3
4     02-06-2021 16:40:03      3  15.3
...                   ...    ...   ...
2735  23-06-2021 18:50:04      3  19.3
2736  23-06-2021 19:00:03      3  18.9
2737  23-06-2021 19:10:04      3  18.9
2738  23-06-2021 19:20:04      3  18.9
2739  23-06-2021 19:30:03      3  18.9

[2740 rows x 3 columns]


pandas.core.frame.DataFrame

In [15]:
### ROOM3_Temp ----Cell-03

df_tempC_room3 = df_tempC.select(df_tempC['dtHour'],df_tempC['roomId'],df_tempC['tempC'])

df_room3 = df_tempC_room3.where(df_tempC_room3['roomId'] == 3).collect()

#df_tempC_room3.show()

print(df_room3)



[Row(dtHour='02-06-2021 16:00:08', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 16:10:02', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 16:20:02', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 16:30:02', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 16:40:03', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 16:50:02', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 17:00:04', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 17:10:02', roomId='3', tempC='14.6'), Row(dtHour='02-06-2021 17:20:03', roomId='3', tempC='14.6'), Row(dtHour='02-06-2021 17:30:02', roomId='3', tempC='14.6'), Row(dtHour='02-06-2021 17:40:02', roomId='3', tempC='18.1'), Row(dtHour='02-06-2021 17:50:02', roomId='3', tempC='18.1'), Row(dtHour='02-06-2021 18:00:03', roomId='3', tempC='18.1'), Row(dtHour='02-06-2021 18:10:02', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 18:20:02', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 18:30:02', roomId='3', tempC='15.3'), Row(dtHour='02-06-2021 

In [17]:
### ROOM3_Temp ----Cell-04

sensor_temp3 = pd.DataFrame(df_room3, columns=['Timestamp','roomId','temp'])#.set_index('Timestamp')

print(sensor_temp3)


                Timestamp roomId  temp
0     02-06-2021 16:00:08      3  15.3
1     02-06-2021 16:10:02      3  15.3
2     02-06-2021 16:20:02      3  15.3
3     02-06-2021 16:30:02      3  15.3
4     02-06-2021 16:40:03      3  15.3
...                   ...    ...   ...
2735  23-06-2021 18:50:04      3  19.3
2736  23-06-2021 19:00:03      3  18.9
2737  23-06-2021 19:10:04      3  18.9
2738  23-06-2021 19:20:04      3  18.9
2739  23-06-2021 19:30:03      3  18.9

[2740 rows x 3 columns]


In [20]:
### ROOM3_Temp ----Cell-05

### SAVE FILE ###

### Finally Done: Timestamp column into dtype:datetime and temp column into dtype:float ###

sensor_temp3['Timestamp'] = pd.to_datetime(sensor_temp3["Timestamp"])

sensor_temp3['temp'] = pd.to_numeric(sensor_temp3['temp'], errors='ignore')

df_temp3 = sensor_temp3[['Timestamp','temp']]


#df_temp3.to_csv('temp_room3.csv', index=False)  ## Save file in csv without index

#df_temp3.to_csv('temp_room3_index.csv', index=False)  ## Save file in csv with index



#print(df_temp['Timestamp'])
#print(df_temp['temp'])
#print(df_temp)


In [23]:
### ROOM2_Temp ----Cell-01
### Using SQL -- we need to register it as a temporary view

df_tempC_room1.createOrReplaceTempView('tempC2')

sql_df2 = spark.sql('select * From tempC2 where roomId == 2')

sql_df2.show()

type(sql_df2)

sql_df2.printSchema()

+-------------------+------+-----+
|             dtHour|roomId|tempC|
+-------------------+------+-----+
|02-06-2021 16:00:02|     2| 23.0|
|02-06-2021 16:10:00|     2| 23.0|
|02-06-2021 16:20:00|     2| 24.0|
|02-06-2021 16:30:00|     2| 24.0|
|02-06-2021 16:40:00|     2| 24.0|
|02-06-2021 16:50:00|     2| 24.0|
|02-06-2021 17:00:00|     2| 24.0|
|02-06-2021 17:10:00|     2| 24.0|
|02-06-2021 17:20:00|     2| 24.0|
|02-06-2021 17:30:00|     2| 24.0|
|02-06-2021 17:40:00|     2| 24.0|
|02-06-2021 17:50:00|     2| 24.0|
|02-06-2021 18:00:00|     2| 23.0|
|02-06-2021 18:10:00|     2| 23.0|
|02-06-2021 18:20:00|     2| 23.0|
|02-06-2021 18:30:00|     2| 23.0|
|02-06-2021 18:40:00|     2| 23.0|
|02-06-2021 18:50:00|     2| 23.0|
|02-06-2021 19:00:00|     2| 23.0|
|02-06-2021 19:10:00|     2| 23.0|
+-------------------+------+-----+
only showing top 20 rows

root
 |-- dtHour: string (nullable = true)
 |-- roomId: string (nullable = true)
 |-- tempC: string (nullable = true)



In [24]:
### ROOM2_Temp ----Cell-02
# convert spark df to pandas df

pandasDF2 = sql_df2.toPandas()


print(pandasDF2)
type(pandasDF2)

                   dtHour roomId tempC
0     02-06-2021 16:00:02      2  23.0
1     02-06-2021 16:10:00      2  23.0
2     02-06-2021 16:20:00      2  24.0
3     02-06-2021 16:30:00      2  24.0
4     02-06-2021 16:40:00      2  24.0
...                   ...    ...   ...
2311  24-06-2021 08:40:00      2  20.0
2312  24-06-2021 08:50:00      2  20.0
2313  24-06-2021 09:00:00      2  20.0
2314  24-06-2021 09:10:00      2  20.0
2315  24-06-2021 09:20:00      2  20.0

[2316 rows x 3 columns]


pandas.core.frame.DataFrame

In [26]:
### ROOM2_Temp ----Cell-03

df_tempC_room2 = df_tempC.select(df_tempC['dtHour'],df_tempC['roomId'],df_tempC['tempC'])

df_room2 = df_tempC_room2.where(df_tempC_room2['roomId'] == 2).collect()

#df_tempC_room2.show()

print(df_room2)

[Row(dtHour='02-06-2021 16:00:02', roomId='2', tempC='23.0'), Row(dtHour='02-06-2021 16:10:00', roomId='2', tempC='23.0'), Row(dtHour='02-06-2021 16:20:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 16:30:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 16:40:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 16:50:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 17:00:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 17:10:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 17:20:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 17:30:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 17:40:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 17:50:00', roomId='2', tempC='24.0'), Row(dtHour='02-06-2021 18:00:00', roomId='2', tempC='23.0'), Row(dtHour='02-06-2021 18:10:00', roomId='2', tempC='23.0'), Row(dtHour='02-06-2021 18:20:00', roomId='2', tempC='23.0'), Row(dtHour='02-06-2021 18:30:00', roomId='2', tempC='23.0'), Row(dtHour='02-06-2021 

In [27]:
### ROOM2_Temp ----Cell-04


sensor_temp2 = pd.DataFrame(df_room2, columns=['Timestamp','roomId','temp'])#.set_index('Timestamp')

print(sensor_temp2)

                Timestamp roomId  temp
0     02-06-2021 16:00:02      2  23.0
1     02-06-2021 16:10:00      2  23.0
2     02-06-2021 16:20:00      2  24.0
3     02-06-2021 16:30:00      2  24.0
4     02-06-2021 16:40:00      2  24.0
...                   ...    ...   ...
2311  24-06-2021 08:40:00      2  20.0
2312  24-06-2021 08:50:00      2  20.0
2313  24-06-2021 09:00:00      2  20.0
2314  24-06-2021 09:10:00      2  20.0
2315  24-06-2021 09:20:00      2  20.0

[2316 rows x 3 columns]


In [30]:
### ROOM2_Temp ----Cell-05
### SAVE FILE ###
### Finally Done: Timestamp column into dtype:datetime and temp column into dtype:float ###

sensor_temp2['Timestamp'] = pd.to_datetime(sensor_temp2["Timestamp"])

sensor_temp2['temp'] = pd.to_numeric(sensor_temp2['temp'], errors='ignore')

df_temp2 = sensor_temp2[['Timestamp','temp']]


#df_temp2.to_csv('temp_room2.csv', index=False)  ## Save file in csv without index

#df_temp2.to_csv('temp_room2_index.csv', index=False)  ## Save file in csv with index



#print(df_temp['Timestamp'])
#print(df_temp['temp'])
#print(df_temp)

In [16]:
### ROOM1_Temp ----Cell-01
### Using SQL -- we need to register it as a temporary view

df_tempC_room1.createOrReplaceTempView('tempC')

sql_df = spark.sql('select * From tempC where roomId == 1')

sql_df.show()

type(sql_df)

sql_df.printSchema()

+-------------------+------+-----+
|             dtHour|roomId|tempC|
+-------------------+------+-----+
|02-06-2021 16:00:02|     1| 21.2|
|02-06-2021 16:10:00|     1| 21.4|
|02-06-2021 16:20:00|     1| 21.2|
|02-06-2021 16:30:00|     1| 21.1|
|02-06-2021 16:40:00|     1| 21.2|
|02-06-2021 16:50:00|     1| 21.2|
|02-06-2021 17:00:00|     1| 21.1|
|02-06-2021 17:10:00|     1| 21.1|
|02-06-2021 17:20:00|     1| 21.2|
|02-06-2021 17:30:00|     1| 21.2|
|02-06-2021 17:40:00|     1| 21.2|
|02-06-2021 17:50:00|     1| 21.2|
|02-06-2021 18:00:00|     1| 21.2|
|02-06-2021 18:10:00|     1| 21.2|
|02-06-2021 18:20:00|     1| 21.1|
|02-06-2021 18:30:00|     1| 21.0|
|02-06-2021 18:40:00|     1| 21.0|
|02-06-2021 18:50:00|     1| 20.9|
|02-06-2021 19:00:00|     1| 20.8|
|02-06-2021 19:10:00|     1| 20.8|
+-------------------+------+-----+
only showing top 20 rows

root
 |-- dtHour: string (nullable = true)
 |-- roomId: string (nullable = true)
 |-- tempC: string (nullable = true)



In [17]:
### ROOM1_Temp ----Cell-02
# convert spark df to pandas df

pandasDF = sql_df.toPandas()


print(pandasDF)
type(pandasDF)

                   dtHour roomId tempC
0     02-06-2021 16:00:02      1  21.2
1     02-06-2021 16:10:00      1  21.4
2     02-06-2021 16:20:00      1  21.2
3     02-06-2021 16:30:00      1  21.1
4     02-06-2021 16:40:00      1  21.2
...                   ...    ...   ...
2767  23-06-2021 19:00:00      1  22.5
2768  23-06-2021 19:10:00      1  22.3
2769  23-06-2021 19:20:00      1  22.2
2770  23-06-2021 19:30:00      1  22.1
2771  24-06-2021 09:20:02      1  21.9

[2772 rows x 3 columns]


pandas.core.frame.DataFrame

In [18]:
### ROOM1_Temp ----Cell-03

df_tempC_room1 = df_tempC.select(df_tempC['dtHour'],df_tempC['roomId'],df_tempC['tempC'])

df_room1 = df_tempC_room1.where(df_tempC_room1['roomId'] == 1).collect()

#df_tempC_room1.show()

print(df_room1)

[Row(dtHour='02-06-2021 16:00:02', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 16:10:00', roomId='1', tempC='21.4'), Row(dtHour='02-06-2021 16:20:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 16:30:00', roomId='1', tempC='21.1'), Row(dtHour='02-06-2021 16:40:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 16:50:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 17:00:00', roomId='1', tempC='21.1'), Row(dtHour='02-06-2021 17:10:00', roomId='1', tempC='21.1'), Row(dtHour='02-06-2021 17:20:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 17:30:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 17:40:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 17:50:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 18:00:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 18:10:00', roomId='1', tempC='21.2'), Row(dtHour='02-06-2021 18:20:00', roomId='1', tempC='21.1'), Row(dtHour='02-06-2021 18:30:00', roomId='1', tempC='21.0'), Row(dtHour='02-06-2021 

In [19]:
type(df_room1)

list

In [20]:
### ROOM1_Temp ----Cell-04

sensor_temp = pd.DataFrame(df_room1, columns=['Timestamp','roomId','temp'])#.set_index('Timestamp')

print(sensor_temp)

                Timestamp roomId  temp
0     02-06-2021 16:00:02      1  21.2
1     02-06-2021 16:10:00      1  21.4
2     02-06-2021 16:20:00      1  21.2
3     02-06-2021 16:30:00      1  21.1
4     02-06-2021 16:40:00      1  21.2
...                   ...    ...   ...
2767  23-06-2021 19:00:00      1  22.5
2768  23-06-2021 19:10:00      1  22.3
2769  23-06-2021 19:20:00      1  22.2
2770  23-06-2021 19:30:00      1  22.1
2771  24-06-2021 09:20:02      1  21.9

[2772 rows x 3 columns]


In [21]:
### ROOM1_Temp ----Cell-05
### SAVE FILE ###
### Finally Done: Timestamp column into dtype:datetime and temp column into dtype:float ###


sensor_temp['Timestamp'] = pd.to_datetime(sensor_temp["Timestamp"])

sensor_temp['temp'] = pd.to_numeric(sensor_temp['temp'], errors='ignore')

df_temp = sensor_temp[['Timestamp','temp']]


#df_temp.to_csv('temp_room1.csv', index=False)  ## Save file in csv without index

#df_temp.to_csv('temp_room1_index.csv', index=False)  ## Save file in csv with index



#print(df_temp['Timestamp'])
#print(df_temp['temp'])
#print(df_temp)

In [17]:

df_temp = df_temp.set_index(df_temp['Timestamp'])

df_temp.index = pd.to_datetime(df_temp.index)

df_temp['temp'] = pd.to_numeric(df_temp['temp'])

#df_temp['Timestamp'] = pd.to_datetime(df_temp['Timestamp'])

print(df_temp['temp'])
print(df_temp['Timestamp'])

Timestamp
2021-02-06 16:00:02    21.2
2021-02-06 16:10:00    21.4
2021-02-06 16:20:00    21.2
2021-02-06 16:30:00    21.1
2021-02-06 16:40:00    21.2
                       ... 
2021-06-23 19:00:00    22.5
2021-06-23 19:10:00    22.3
2021-06-23 19:20:00    22.2
2021-06-23 19:30:00    22.1
2021-06-24 09:20:02    21.9
Name: temp, Length: 2772, dtype: float64
Timestamp
2021-02-06 16:00:02   2021-02-06 16:00:02
2021-02-06 16:10:00   2021-02-06 16:10:00
2021-02-06 16:20:00   2021-02-06 16:20:00
2021-02-06 16:30:00   2021-02-06 16:30:00
2021-02-06 16:40:00   2021-02-06 16:40:00
                              ...        
2021-06-23 19:00:00   2021-06-23 19:00:00
2021-06-23 19:10:00   2021-06-23 19:10:00
2021-06-23 19:20:00   2021-06-23 19:20:00
2021-06-23 19:30:00   2021-06-23 19:30:00
2021-06-24 09:20:02   2021-06-24 09:20:02
Name: Timestamp, Length: 2772, dtype: datetime64[ns]


In [24]:
import numpy

agg_hr = df_temp.groupby(pd.Grouper(freq='H')).aggregate(numpy.mean)

print(agg_hr)

                          temp
Timestamp                     
2021-02-06 16:00:00  21.216667
2021-02-06 17:00:00  21.166667
2021-02-06 18:00:00  21.066667
2021-02-06 19:00:00  20.666667
2021-02-06 20:00:00  20.333333
...                        ...
2021-12-06 19:00:00  20.233333
2021-12-06 20:00:00  20.183333
2021-12-06 21:00:00  20.116667
2021-12-06 22:00:00  20.100000
2021-12-06 23:00:00  20.050000

[7280 rows x 1 columns]


In [217]:
import matplotlib.pyplot as plt

#daily = data.resample('D')['impact'].count()
#monthly = daily.resample('M').mean()
#ax = monthly.plot(kind='bar')
#plt.show()



daily = df_temp.resample('D')['temp'].mean()
monthly = daily.resample('M').mean()
#ax = monthly.plot(kind='bar')
#plt.show()

print(monthly)

Timestamp
2021-02-28    20.497917
2021-03-31    19.546528
2021-04-30    19.763194
2021-05-31    18.997917
2021-06-30    21.020925
2021-07-31    20.192361
2021-08-31    20.225694
2021-09-30    20.485417
2021-10-31    19.900000
2021-11-30    20.426389
2021-12-31    20.101389
Freq: M, Name: temp, dtype: float64


In [201]:
dftempc = df_temp['temp']

print(dftempc)

Timestamp
2021-02-06 16:00:02    21.2
2021-02-06 16:10:00    21.4
2021-02-06 16:20:00    21.2
2021-02-06 16:30:00    21.1
2021-02-06 16:40:00    21.2
                       ... 
2021-06-23 19:00:00    22.5
2021-06-23 19:10:00    22.3
2021-06-23 19:20:00    22.2
2021-06-23 19:30:00    22.1
2021-06-24 09:20:02    21.9
Name: temp, Length: 2772, dtype: float64


In [179]:
#df['date'] = pd.to_datetime(df['date'])
#weekly_summary = df.story_point.resample('W', on='date').sum()

##If need new column:

#weekly_summary['weekly'] = df.story_point.resample('W', on='date').transform('sum')

##Or create DatetimeIndex:

df_temp['Timestamp'] = pd.to_datetime(df_temp['Timestamp'])
df_temp = df_temp.set_index('Timestamp')
#weekly_summary = df_temp.temp.resample('H').sum()

##If need new column:

#weekly_summary['weekly'] = df.story_point.resample('W').transform('sum')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  # Remove the CWD from sys.path while we load stuff.


In [180]:
#daily = df.set_index('timestamp').groupby(pd.TimeGrouper(freq='D', axis=1), axis=1)['impact'].count()
#df.groupby(pd.Grouper(freq='M')).sum()
#df.groupby([pd.Grouper(freq='M'), 'a']).sum()
#df.resample('M').sum()


df_temp['Timestamp'] = pd.to_datetime(df_temp['Timestamp'])
df_temp = df_temp.set_index('Timestamp')
daily = df_temp.groupby([pd.Grouper(freq='D'),'temp']).sum()

#print(daily)

#df['date'] = pd.to_datetime(df['date'])
#weekly_summary = df.story_point.resample('W', on='date').sum()

#daily = data.resample('D')['impact'].count()
#monthly = daily.resample('M').mean()
#ax = monthly.plot(kind='bar')
#plt.show()


daily = df_temp.resample('D')['temp'].count()


KeyError: 'Timestamp'

In [181]:
#daily = data.resample('D')['impact'].count()
#monthly = daily.resample('M').mean()
#ax = monthly.plot(kind='bar')
#plt.show()



monthly = sensor_temp.resample('M')['temp'].mean()



TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex, but got an instance of 'RangeIndex'

In [182]:
sensor_temp.resample('H', on='Timestamp').temp.sum()

Timestamp
2021-02-06 16:00:00    21.221.421.221.121.221.2
2021-02-06 17:00:00    21.121.121.221.221.221.2
2021-02-06 18:00:00    21.221.221.121.021.020.9
2021-02-06 19:00:00    20.820.820.720.620.620.5
2021-02-06 20:00:00    20.520.420.320.320.320.2
                                 ...           
2021-12-06 19:00:00    20.320.320.220.220.220.2
2021-12-06 20:00:00    20.220.220.220.220.220.1
2021-12-06 21:00:00    20.120.220.120.120.120.1
2021-12-06 22:00:00    20.120.120.120.120.120.1
2021-12-06 23:00:00    20.120.120.120.020.020.0
Freq: H, Name: temp, Length: 7280, dtype: object

In [183]:
#room1_df = pd(df_room1,columns=['Timestamp','Id','Temp'])

#room1_df = pd(df_room1).transpose()
#room1_df.columns = ['Timestamp','Id','Temp']
#print (room1_df)

#room1_df = pd.DataFrame(df_room1 = df_room1).transpose().copy()


In [184]:
# convert list into pandas dataframe



In [16]:
sql_df.show()

+-------------------+------+-----+
|             dtHour|roomId|tempC|
+-------------------+------+-----+
|02-06-2021 16:00:02|     1| 21.2|
|02-06-2021 16:10:00|     1| 21.4|
|02-06-2021 16:20:00|     1| 21.2|
|02-06-2021 16:30:00|     1| 21.1|
|02-06-2021 16:40:00|     1| 21.2|
|02-06-2021 16:50:00|     1| 21.2|
|02-06-2021 17:00:00|     1| 21.1|
|02-06-2021 17:10:00|     1| 21.1|
|02-06-2021 17:20:00|     1| 21.2|
|02-06-2021 17:30:00|     1| 21.2|
|02-06-2021 17:40:00|     1| 21.2|
|02-06-2021 17:50:00|     1| 21.2|
|02-06-2021 18:00:00|     1| 21.2|
|02-06-2021 18:10:00|     1| 21.2|
|02-06-2021 18:20:00|     1| 21.1|
|02-06-2021 18:30:00|     1| 21.0|
|02-06-2021 18:40:00|     1| 21.0|
|02-06-2021 18:50:00|     1| 20.9|
|02-06-2021 19:00:00|     1| 20.8|
|02-06-2021 19:10:00|     1| 20.8|
+-------------------+------+-----+
only showing top 20 rows



In [17]:
#Converted tempC columns in float

from pyspark.sql.functions import col


for tempC in sql_df.columns:
    df = sql_df.withColumn(tempC, col(tempC).cast('float'))

print(df)
df.show()
df.printSchema()

DataFrame[dtHour: string, roomId: string, tempC: float]
+-------------------+------+-----+
|             dtHour|roomId|tempC|
+-------------------+------+-----+
|02-06-2021 16:00:02|     1| 21.2|
|02-06-2021 16:10:00|     1| 21.4|
|02-06-2021 16:20:00|     1| 21.2|
|02-06-2021 16:30:00|     1| 21.1|
|02-06-2021 16:40:00|     1| 21.2|
|02-06-2021 16:50:00|     1| 21.2|
|02-06-2021 17:00:00|     1| 21.1|
|02-06-2021 17:10:00|     1| 21.1|
|02-06-2021 17:20:00|     1| 21.2|
|02-06-2021 17:30:00|     1| 21.2|
|02-06-2021 17:40:00|     1| 21.2|
|02-06-2021 17:50:00|     1| 21.2|
|02-06-2021 18:00:00|     1| 21.2|
|02-06-2021 18:10:00|     1| 21.2|
|02-06-2021 18:20:00|     1| 21.1|
|02-06-2021 18:30:00|     1| 21.0|
|02-06-2021 18:40:00|     1| 21.0|
|02-06-2021 18:50:00|     1| 20.9|
|02-06-2021 19:00:00|     1| 20.8|
|02-06-2021 19:10:00|     1| 20.8|
+-------------------+------+-----+
only showing top 20 rows

root
 |-- dtHour: string (nullable = true)
 |-- roomId: string (nullable = tru

In [18]:
sql_df.printSchema()

root
 |-- dtHour: string (nullable = true)
 |-- roomId: string (nullable = true)
 |-- tempC: string (nullable = true)



In [19]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col
from pyspark.sql import functions as f 

#df2 = df.withColumn('dtHour', f.col('dtHour').cast("timestamp"))

#df2 = df.withColumn('dtHour',to_timestamp(f.col('dtHour').cast('timestamp'),'yyyyMMdd HH:mm:ss'))

df2 = df.withColumn('dateHour', f.to_timestamp('dtHour', format='yyyy-MM-dd HH:mm:ss'))

print(df2)
df2.show()
df2.printSchema()

DataFrame[dtHour: string, roomId: string, tempC: float, dateHour: timestamp]
+-------------------+------+-----+--------+
|             dtHour|roomId|tempC|dateHour|
+-------------------+------+-----+--------+
|02-06-2021 16:00:02|     1| 21.2|    null|
|02-06-2021 16:10:00|     1| 21.4|    null|
|02-06-2021 16:20:00|     1| 21.2|    null|
|02-06-2021 16:30:00|     1| 21.1|    null|
|02-06-2021 16:40:00|     1| 21.2|    null|
|02-06-2021 16:50:00|     1| 21.2|    null|
|02-06-2021 17:00:00|     1| 21.1|    null|
|02-06-2021 17:10:00|     1| 21.1|    null|
|02-06-2021 17:20:00|     1| 21.2|    null|
|02-06-2021 17:30:00|     1| 21.2|    null|
|02-06-2021 17:40:00|     1| 21.2|    null|
|02-06-2021 17:50:00|     1| 21.2|    null|
|02-06-2021 18:00:00|     1| 21.2|    null|
|02-06-2021 18:10:00|     1| 21.2|    null|
|02-06-2021 18:20:00|     1| 21.1|    null|
|02-06-2021 18:30:00|     1| 21.0|    null|
|02-06-2021 18:40:00|     1| 21.0|    null|
|02-06-2021 18:50:00|     1| 20.9|    null|

In [20]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import TimestampType


for dtHour in df.columns:
    df1 = df.withColumn("dtHour",df['dtHour'].cast(TimestampType()))

print(df1)
df1.show()
df1.printSchema()

DataFrame[dtHour: timestamp, roomId: string, tempC: float]
+------+------+-----+
|dtHour|roomId|tempC|
+------+------+-----+
|  null|     1| 21.2|
|  null|     1| 21.4|
|  null|     1| 21.2|
|  null|     1| 21.1|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.1|
|  null|     1| 21.1|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.1|
|  null|     1| 21.0|
|  null|     1| 21.0|
|  null|     1| 20.9|
|  null|     1| 20.8|
|  null|     1| 20.8|
+------+------+-----+
only showing top 20 rows

root
 |-- dtHour: timestamp (nullable = true)
 |-- roomId: string (nullable = true)
 |-- tempC: float (nullable = true)



In [21]:

for dtHour in sql_df.columns:
    #df = sql_df.withColumn('dtHour', f.to_timestamp(f.col("dtHour"),"yyyyMMdd HH:mm:ss"))
    df = sql_df.withColumn('dtHour', f.to_date("dtHour","yyyyMMdd HH:mm:ss"))
    
df.show()
df.printSchema()

+------+------+-----+
|dtHour|roomId|tempC|
+------+------+-----+
|  null|     1| 21.2|
|  null|     1| 21.4|
|  null|     1| 21.2|
|  null|     1| 21.1|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.1|
|  null|     1| 21.1|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.2|
|  null|     1| 21.1|
|  null|     1| 21.0|
|  null|     1| 21.0|
|  null|     1| 20.9|
|  null|     1| 20.8|
|  null|     1| 20.8|
+------+------+-----+
only showing top 20 rows

root
 |-- dtHour: date (nullable = true)
 |-- roomId: string (nullable = true)
 |-- tempC: string (nullable = true)



In [25]:
pandasDF = sql_df.toPandas() 

print(pandasDF)

                   dtHour roomId tempC
0     02-06-2021 16:00:02      1  21.2
1     02-06-2021 16:10:00      1  21.4
2     02-06-2021 16:20:00      1  21.2
3     02-06-2021 16:30:00      1  21.1
4     02-06-2021 16:40:00      1  21.2
...                   ...    ...   ...
2767  23-06-2021 19:00:00      1  22.5
2768  23-06-2021 19:10:00      1  22.3
2769  23-06-2021 19:20:00      1  22.2
2770  23-06-2021 19:30:00      1  22.1
2771  24-06-2021 09:20:02      1  21.9

[2772 rows x 3 columns]


In [29]:
pandasDF.iloc[1:]



Unnamed: 0,dtHour,roomId,tempC
1,02-06-2021 16:10:00,1,21.4
2,02-06-2021 16:20:00,1,21.2
3,02-06-2021 16:30:00,1,21.1
4,02-06-2021 16:40:00,1,21.2
5,02-06-2021 16:50:00,1,21.2
...,...,...,...
2767,23-06-2021 19:00:00,1,22.5
2768,23-06-2021 19:10:00,1,22.3
2769,23-06-2021 19:20:00,1,22.2
2770,23-06-2021 19:30:00,1,22.1


In [117]:
#df2 = pd.to_datetime(df['col1'])

for date in df_hour:
    df_hour = pd.to_datetime(pd_hour)
print(pd_hour)

KeyboardInterrupt: 

In [112]:
print(pandasDF)

                   dtHour roomId               tempC
0     02-06-2021 16:00:02      1 2021-02-06 16:00:02
1     02-06-2021 16:10:00      1 2021-02-06 16:10:00
2     02-06-2021 16:20:00      1 2021-02-06 16:20:00
3     02-06-2021 16:30:00      1 2021-02-06 16:30:00
4     02-06-2021 16:40:00      1 2021-02-06 16:40:00
...                   ...    ...                 ...
2767  23-06-2021 19:00:00      1 2021-06-23 19:00:00
2768  23-06-2021 19:10:00      1 2021-06-23 19:10:00
2769  23-06-2021 19:20:00      1 2021-06-23 19:20:00
2770  23-06-2021 19:30:00      1 2021-06-23 19:30:00
2771  24-06-2021 09:20:02      1 2021-06-24 09:20:02

[2772 rows x 3 columns]
