In [1]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import avg, max
from pyspark.sql.functions import avg, max, round as spark_round
from pyspark.sql import SparkSession


class ChargePointsETLJob:
    input_path = 'C:\\Users\\Anhnh\\OneDrive\\Documents\\MayoCodility\\electric-chargepoints-2017.csv'
    output_path = 'C:\\Users\\Anhnh\\OneDrive\\Documents\\MayoCodility\\chargepoints-2017-analysis'

    def __init__(self):
        self.spark_session = (SparkSession.builder
                                          .master("local[*]")
                                          .appName("ElectricChargePointsETLJob")
                                          .getOrCreate())

    def extract(self, input_path=None):
        if input_path is None:
            input_path = self.input_path
        return self.spark_session.read.option("header", True).csv(input_path)

    def transform(self, df):
        return df.groupBy("CPID") \
                .agg(
                     spark_round(max("PluginDuration"), 2).alias("max_duration"),
                    spark_round(avg("PluginDuration"), 2).alias("avg_duration")
                ) \
                .withColumnRenamed("CPID", "chargepoint_id")

    def load(self, df, output_path=None):
        if output_path is None:
            output_path = self.output_path
        print("[INFO] Previewing transformed data...")
        df.show(truncate=False)
        print("[INFO] Done.")

    def run(self):
        print("[STEP] Reading file...")
        df = self.extract()

        print("[STEP] Transforming data...")
        transformed_df = self.transform(df)

        print("[STEP] Showing transformed data:")
        transformed_df.show()

In [2]:
import os
print(os.path.exists(ChargePointsETLJob.input_path))

True


In [3]:
def run(self):
    print("[STEP] Reading file...")
    df = self.spark_session.read.option("header", True).csv(self.input_path)
    
    print("[STEP] File read, showing schema:")
    df.printSchema()

    print("[STEP] Previewing data:")
    df.show(5)

    print("[STEP] Transforming:")
    transformed_df = self.transform(df)
    
    print("[STEP] Showing transformed data:")
    transformed_df.show()

In [4]:
import os
print("[DEBUG] JAVA_HOME =", os.environ.get("JAVA_HOME"))

[DEBUG] JAVA_HOME = C:\Program Files\Java\jdk-17.0.15_6


In [5]:
from pyspark.sql import SparkSession

print("[TEST] Starting Spark...")
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("TestSpark") \
    .getOrCreate()

print("[TEST] Spark started.")
spark.range(5).show()

[TEST] Starting Spark...
[TEST] Spark started.
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [6]:
if __name__ == "__main__":
    job = ChargePointsETLJob()
    job.run()

[STEP] Reading file...
[STEP] Transforming data...
[STEP] Showing transformed data:
+--------------+------------+------------+
|chargepoint_id|max_duration|avg_duration|
+--------------+------------+------------+
|       AN00019|      480.37|      257.35|
|       AN00056|        1.67|        0.84|
|       AN00073|       22.42|       35.91|
|       AN00080|       21.25|       16.88|
|       AN00093|       25.49|       12.73|
|       AN00098|        3.63|       11.77|
|       AN00116|       45.03|       28.64|
|       AN00119|       40.84|       21.01|
|       AN00158|       20.17|       16.92|
|       AN00159|        9.18|       15.94|
|       AN00170|       27.08|       20.25|
|       AN00210|       14.37|        9.43|
|       AN00218|        3.46|        1.56|
|       AN00230|        3.31|        9.09|
|       AN00231|        8.92|        7.39|
|       AN00233|        2.55|        9.96|
|       AN00249|         7.7|       16.27|
|       AN00254|        19.2|       16.35|
|       AN002