In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.getOrCreate()


2022-10-18 16:51:55,392 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark.sparkContext.setLogLevel("ERROR")

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

### Create a data frame will all files across all folders

In [4]:
#Creating schema
myschema=StructType([StructField("DTime",StringType(),True),
                     StructField("Electricity_Facility_kW_Hourly",FloatType(),True),
                     StructField("Fans_Electricity_kW_Hourly",FloatType(),True),
                     StructField("Cooling_Electricity_kW_Hourly",FloatType(),True),
                     StructField("Heating_Electricity_kW_Hourly",FloatType(),True),
                     StructField("InteriorLights_Electricity_kW_Hourly",FloatType(),True),
                     StructField("InteriorEquipments_Electricity_kW_Hourly",FloatType(),True),
                     StructField("Gas_Facility_kW_Hourly",FloatType(),True),
                     StructField("Heating_Gas_kW_Hourly",FloatType(),True),
                     StructField("InteriorEquipments_Gas_kW_Hourly",FloatType(),True),
                     StructField("WaterHeater_WaterSystems_Gas_kW_Hourly",FloatType(),True)
                    ])        

## Loading_data from all folders

In [5]:
df=spark.read.format("csv").option("recursiveFileLookup","true").option("header","True").schema(myschema).load("hdfs://127.0.0.1:9000/user/labuser/bdpl/electricity_data")

#### Finding the DataFrame shape

In [6]:
col=len(df.columns)
rows=df.count()
print(f'Dataframe shape:',(rows,col))



Dataframe shape: (1962240, 11)


                                                                                

In [7]:
df.printSchema()

root
 |-- DTime: string (nullable = true)
 |-- Electricity_Facility_kW_Hourly: float (nullable = true)
 |-- Fans_Electricity_kW_Hourly: float (nullable = true)
 |-- Cooling_Electricity_kW_Hourly: float (nullable = true)
 |-- Heating_Electricity_kW_Hourly: float (nullable = true)
 |-- InteriorLights_Electricity_kW_Hourly: float (nullable = true)
 |-- InteriorEquipments_Electricity_kW_Hourly: float (nullable = true)
 |-- Gas_Facility_kW_Hourly: float (nullable = true)
 |-- Heating_Gas_kW_Hourly: float (nullable = true)
 |-- InteriorEquipments_Gas_kW_Hourly: float (nullable = true)
 |-- WaterHeater_WaterSystems_Gas_kW_Hourly: float (nullable = true)



### Add a column that contains the filename to this data frame.

In [11]:
newDF=df.withColumn('filename',input_file_name())
newDF.show

counting=newDF.select(countDistinct('filename'))
counting.show()



+------------------------+
|count(DISTINCT filename)|
+------------------------+
|                     224|
+------------------------+



                                                                                

### Find and remove any duplicate rows available in the dataset

In [12]:
newDF.dropDuplicates()

DataFrame[DTime: string, Electricity_Facility_kW_Hourly: float, Fans_Electricity_kW_Hourly: float, Cooling_Electricity_kW_Hourly: float, Heating_Electricity_kW_Hourly: float, InteriorLights_Electricity_kW_Hourly: float, InteriorEquipments_Electricity_kW_Hourly: float, Gas_Facility_kW_Hourly: float, Heating_Gas_kW_Hourly: float, InteriorEquipments_Gas_kW_Hourly: float, WaterHeater_WaterSystems_Gas_kW_Hourly: float, filename: string]

In [13]:
newDF.count()



1962240

### Get column-wise null records for the data frame

In [19]:
from pyspark.sql.functions import col,isnan,when,count
df2=newDF.select([count(when(col(c).contains('None')|\
                             col(c).contains('NULL')|\
                             (col(c)=='')|\
                             col(c).isNull()|\
                             (col(c)==0)|\
                             isnan(c),c
                            )).alias(c) 
                  for c in newDF.columns])
df2.show()



+-----+------------------------------+--------------------------+-----------------------------+-----------------------------+------------------------------------+----------------------------------------+----------------------+---------------------+--------------------------------+--------------------------------------+--------+
|DTime|Electricity_Facility_kW_Hourly|Fans_Electricity_kW_Hourly|Cooling_Electricity_kW_Hourly|Heating_Electricity_kW_Hourly|InteriorLights_Electricity_kW_Hourly|InteriorEquipments_Electricity_kW_Hourly|Gas_Facility_kW_Hourly|Heating_Gas_kW_Hourly|InteriorEquipments_Gas_kW_Hourly|WaterHeater_WaterSystems_Gas_kW_Hourly|filename|
+-----+------------------------------+--------------------------+-----------------------------+-----------------------------+------------------------------------+----------------------------------------+----------------------+---------------------+--------------------------------+--------------------------------------+--------+
|    0|   

                                                                                

### Partition of the resultant data frame by month

In [20]:
#making month column
df_month=newDF.withColumn("Month",substring('DTime',1,3))

In [21]:
#from pyspark.sql.functions import spark_partition_id
df_month.select(spark_partition_id().alias("partition_id")).groupBy("partition_id").count().show()



+------------+------+
|partition_id| count|
+------------+------+
|           1|219000|
|           6|227760|
|           3|227760|
|           5|227760|
|           4|227760|
|           8|148920|
|           7|236520|
|           2|227760|
|           0|219000|
+------------+------+



In [22]:
df_month.repartition(10).select(spark_partition_id().alias("partition_id")).groupBy("partition_id").count().show()

                                                                                

+------------+------+
|partition_id| count|
+------------+------+
|           1|196224|
|           6|196224|
|           3|196224|
|           5|196224|
|           9|196224|
|           4|196224|
|           8|196224|
|           7|196224|
|           2|196224|
|           0|196224|
+------------+------+



### Get the number of rows from each partition

In [23]:
df_month.write.partitionBy('month').format('csv').mode('overwrite').option("header",True).save("hdfs://127.0.0.1:9000/user/labuser/bdpl/electricity_data/monthly_consumption_partitions/")

                                                                                

In [24]:
path='hdfs://127.0.0.1:9000/user/labuser/bdpl/electricity_data/monthly_consumption_partitions/'
fs=spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
list_status=fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))
partitioned_files=[file.getPath().getName() for file in list_status]
partitioned_files

['_SUCCESS',
 'month= 01',
 'month= 02',
 'month= 03',
 'month= 04',
 'month= 05',
 'month= 06',
 'month= 07',
 'month= 08',
 'month= 09',
 'month= 10',
 'month= 11',
 'month= 12']

In [26]:
for file in partitioned_files[1:]:
    path='hdfs://127.0.0.1:9000/user/labuser/bdpl/electricity_data/monthly_consumption_partitions/'+file
    df=spark.read.csv(path,header=True)
    print(file,'num_rows:'+str(df.count()))

month= 01 num_rows:166656
month= 02 num_rows:150528
month= 03 num_rows:166656
month= 04 num_rows:161280
month= 05 num_rows:166656
month= 06 num_rows:161280
month= 07 num_rows:166656
month= 08 num_rows:166656
month= 09 num_rows:161280
month= 10 num_rows:166656
month= 11 num_rows:161280
month= 12 num_rows:166656
