In [10]:
from pyspark.sql import SparkSession


class ChargePointsETLJob:
    input_path = r'C:\Projects\Finances\data-mnp\asses\electric-chargepoints-2017.csv'
    output_path = r'C:\Projects\Finances\data-mnp\asses\asa.parquet'

    def __init__(self):
        self.spark_session = (SparkSession.builder
                                          .master("local[*]")
                                          .appName("ElectricChargePointsETLJob")
                                          .getOrCreate())

    def extract(self):
        df = self.spark_session.read.csv( self.input_path, header=True, inferSchema=True)
        return df

    def transform(self, df):
        from pyspark.sql.functions import avg, max , round
        result_df = df.groupBy("CPID").agg(  
                    avg("pluginduration").alias("avg_plugin_duration"), 
                    max("pluginduration").alias("max_plugin_duration")) \
                    .withColumnRenamed("CPID", "chargepoint_id")
        # Round the columns
        result_df = result_df.withColumn("avg_plugin_duration", round("avg_plugin_duration", 2)) \
                             .withColumn("max_plugin_duration", round("max_plugin_duration", 2))
        return result_df

    def load(self, df):
        df.write.parquet(self.output_path)



In [3]:
etl_job = ChargePointsETLJob()
df = etl_job.extract()

etl_job.save_parquet(transformed_df)

In [12]:
df.show()

+-------------+-------+----------+-------------------+----------+-------------------+------+------------------+
|ChargingEvent|   CPID| StartDate|          StartTime|   EndDate|            EndTime|Energy|    PluginDuration|
+-------------+-------+----------+-------------------+----------+-------------------+------+------------------+
|     16673806|AN11719|2017-12-31|2024-06-08 14:46:00|2017-12-31|2024-06-08 18:00:00|   2.4|3.2333333333333334|
|     16670986|AN01706|2017-12-31|2024-06-08 11:25:00|2017-12-31|2024-06-08 13:14:00|   6.1|1.8166666666666667|
|      3174961|AN18584|2017-12-31|2024-06-08 11:26:11|2018-01-01|2024-06-08 12:54:11|  24.0|25.466666666666665|
|     16674334|AN00812|2017-12-31|2024-06-08 15:18:00|2018-01-01|2024-06-08 14:06:00|   6.7|              22.8|
|      3176831|AN24139|2017-12-31|2024-06-08 18:25:18|2018-01-01|2024-06-08 13:09:18|   6.1|18.733333333333334|
|     16673920|AN03984|2017-12-31|2024-06-08 14:54:00|2017-12-31|2024-06-08 19:19:00|   5.6| 4.416666666

In [5]:
transformed_df = etl_job.transform(df)

In [7]:
transformed_df.show()

+--------------+-------------------+-------------------+
|chargepoint_id|avg_plugin_duration|max_plugin_duration|
+--------------+-------------------+-------------------+
|       AN03946|               7.88|              13.35|
|       AN00218|               1.56|               3.46|
|       AN08663|              15.95|              41.71|
|       AN05089|               9.78|              12.85|
|       AN08083|               1.46|               2.73|
|       AN00603|              28.72|              62.92|
|       AN04630|              22.94|              34.32|
|       AN16172|              11.24|              14.93|
|       AN06965|               8.99|               25.4|
|       AN08377|              17.82|              23.95|
|       AN10884|               16.9|               24.1|
|       AN11526|              11.58|              17.87|
|       AN09764|               5.92|              14.58|
|       AN06739|                4.9|              11.79|
|       AN12429|               

In [11]:
etl_job.load(transformed_df)

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/C:/Projects/Finances/data-mnp/asses already exists. Set mode as "overwrite" to overwrite the existing path.