In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, when, mode
from pyspark.sql.types import IntegerType, LongType, FloatType, DoubleType, StringType, ArrayType
from pyspark.sql.functions import explode
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructField

In [3]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Write_Parquet")\
    .config("spark.driver.memory", "15g")\
    .config("spark.executor.memory", "15g") \
    .getOrCreate()

# Load CSV files
hypercharge_location = spark.read.csv("C:/Users/david/PycharmProjects/Big Data Project\dataset/full/hypercarge_locations.csv", header=True, inferSchema=True)
cdr = spark.read.csv("C:/Users/david\PycharmProjects/Big Data Project/dataset/full/cdr.csv", header=True, inferSchema=True)
pdr = spark.read.csv("C:/Users/david/PycharmProjects/Big Data Project/dataset/full/pdr_locations.csv", header=True, inferSchema=True)

# Load Parquet file
hypercharge_sessions = spark.read.parquet("C:/Users/david/PycharmProjects/Big Data Project/dataset/full/hypercarge_sessions.parquet")

In [4]:
cdr.show(5)

+------+--------------------+---------+------------+--------------------+-------------+--------------------+-----------+-------------------+----------+-------------------+--------------------------------+---------------------------------+-------------+--------------------------------------+------------------+-------------------+--------------------+-------+------------------+--------------------+------------+-------------+-------+-----------+
|CDR ID|             EVSE ID|Operatore|Potenza (kW)|        Station Nome|Station Città|   Station Indirizzo|Data inizio|         Ora inizio| Data fine|           Ora fine|Ricavi totali (€) (IVA esclusa) |Ricavi Energia (€) (IVA esclusa) |Energia (kWh)|Ricavi Penalty Time (€) (IVA esclusa) |Tempo Totale (min)|Inizio penalty time|Contachilometri (Km)|Auth ID|         Auth ID.1|  Partner session ID| Auth method|Tipo di presa|EMI3 ID|Type Status|
+------+--------------------+---------+------------+--------------------+-------------+-------------------

In [6]:
pdr.show(5)

+------------+--------------------+--------------------+-----------------------------+----------------+--------------------+----------+--------------------+--------------------+--------------------+---------------------+-------------------+--------------------+------------------+---------------+-----------------------+-----------------------+------------------+-----------------+-----------------+-----------------------+---------------+--------------------+----------------+---------------------+------------------+-------------------+-------------------------+--------------------------+------------------+------------------+----------------------------+------------------------+------------------+---------------------------+--------------------+-------------------------+--------------------------+-----------------------------+------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------

In [67]:
# Display basic information about the dataframes
print("Basic Information")
hypercharge_sessions.printSchema()
cdr.printSchema()
pdr.printSchema()
hypercharge_location.printSchema()

Basic Information
root
 |-- serialNumber: string (nullable = true)
 |-- gpsLat: double (nullable = true)
 |-- gpsLong: double (nullable = true)
 |-- locationStreet: string (nullable = true)
 |-- locationZipCode: string (nullable = true)
 |-- locationTown: string (nullable = true)
 |-- locationProvince: string (nullable = true)
 |-- locationCountry: string (nullable = true)
 |-- locationUpdateNote: string (nullable = true)
 |-- endClientName: integer (nullable = true)
 |-- distributorName: string (nullable = true)
 |-- corporationName: string (nullable = true)
 |-- operatorName: integer (nullable = true)
 |-- lendeeName: integer (nullable = true)
 |-- evId: string (nullable = true)
 |-- type: string (nullable = true)
 |-- physicalPosition: long (nullable = true)
 |-- cableLength: long (nullable = true)
 |-- producer: string (nullable = true)
 |-- chargingSessionGraphData: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- currentEv: double (nullable =

In [34]:
# Create Spark session
spark = SparkSession.builder.appName("Explode Sessions Example").getOrCreate()

# Define the schema for the 'sessions' column
session_schema = StructType([
    StructField("count", IntegerType(), True),
    StructField("energy", DoubleType(), True),
    StructField("date", StringType(), True),
    StructField("cost", StringType(), True),
    StructField("countInHouse", IntegerType(), True),
    StructField("countInPublic", IntegerType(), True),
    StructField("energyInHouse", DoubleType(), True),
    StructField("energyInPublic", DoubleType(), True),
    StructField("costInHouse", DoubleType(), True),
    StructField("costInPublic", DoubleType(), True)
])

# Parse the sessions column if it is a JSON string
pdr_parsed = pdr.withColumn("sessions_new", from_json(pdr["sessions"], ArrayType(session_schema)))

# Explode and select fields
pdr_exploded = pdr_parsed.select("station_uid", explode(pdr_parsed["sessions_new"]).alias("session"))

pdr_sessions = pdr_exploded.select(
    "station_uid",
    "session.count",
    "session.energy",
    "session.date",
    "session.cost",
    "session.countInHouse",
    "session.countInPublic",
    "session.energyInHouse",
    "session.energyInPublic",
    "session.costInHouse",
    "session.costInPublic"
)

# Show final result
pdr_sessions.show(truncate=False)

+------------------------------------+-----+------------------+-------+------+------------+-------------+-------------+--------------+-----------+------------+
|station_uid                         |count|energy            |date   |cost  |countInHouse|countInPublic|energyInHouse|energyInPublic|costInHouse|costInPublic|
+------------------------------------+-----+------------------+-------+------+------------+-------------+-------------+--------------+-----------+------------+
|fe01af77-ad39-4ad9-80bf-322d7c02a4a9|19   |323.54699999999997|2024-01|177.93|0           |0            |0.0          |0.0           |0.0        |0.0         |
|fe01af77-ad39-4ad9-80bf-322d7c02a4a9|14   |325.219           |2024-02|178.87|0           |0            |0.0          |0.0           |0.0        |0.0         |
|fe01af77-ad39-4ad9-80bf-322d7c02a4a9|14   |287.817           |2024-03|158.31|0           |0            |0.0          |0.0           |0.0        |0.0         |
|fe01af77-ad39-4ad9-80bf-322d7c02a4a9|25

In [35]:
# Define the schema for the 'plugs' column
plugs_schema = StructType([
    StructField("max_electric_power", IntegerType(), True),
    StructField("max_voltage", IntegerType(), True),
    StructField("power_type_id", IntegerType(), True),
    StructField("plug_type_id", IntegerType(), True),
    StructField("plug_format_id", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("format", StringType(), True)
])

# Parse the sessions column if it is a JSON string
pdr_parsed = pdr.withColumn("plugs_new", from_json(pdr["plugs"], ArrayType(plugs_schema)))

# Explode the 'plugs' column while keeping the 'station_uid'
df_exploded_plugs = pdr_parsed.select("station_uid", explode(pdr_parsed["plugs_new"]).alias("plug"))

# Select the individual fields from 'plug' and keep 'station_uid'
pdr_plugs = df_exploded_plugs.select(
    "station_uid",  # Keep the station identifier
    "plug.max_electric_power",
    "plug.max_voltage",
    "plug.power_type_id",
    "plug.plug_type_id",
    "plug.plug_format_id",
    "plug.type",
    "plug.format"
)

# Show the final result
pdr_plugs.show(truncate=False)

+------------------------------------+------------------+-----------+-------------+------------+--------------+-------+------+
|station_uid                         |max_electric_power|max_voltage|power_type_id|plug_type_id|plug_format_id|type   |format|
+------------------------------------+------------------+-----------+-------------+------------+--------------+-------+------+
|fe01af77-ad39-4ad9-80bf-322d7c02a4a9|62500             |500        |3            |21          |2             |COMBO  |CABLE |
|fe01af77-ad39-4ad9-80bf-322d7c02a4a9|62500             |500        |3            |1           |2             |CHADEMO|CABLE |
|fe01af77-ad39-4ad9-80bf-322d7c02a4a9|22000             |230        |2            |20          |1             |TYPE 2 |SOCKET|
|addf1a35-1966-494a-8eea-8d9448eb8013|300000            |600        |3            |21          |2             |COMBO  |CABLE |
|addf1a35-1966-494a-8eea-8d9448eb8013|300000            |600        |3            |21          |2             |

In [36]:
hypercharge_sessions = hypercharge_sessions.select(
    "gpsLat", "gpsLong", "locationStreet", "locationZipCode", "locationTown", "locationProvince",
    "locationCountry", "endClientName", "distributorName", "corporationName", "operatorName", "type", "physicalPosition", 
    col("`session.averagePower`").alias("session_averagePower"), 
    col("`session.car`").alias("session_car"), 
    col("`session.distributorName`").alias("session_distributorName"), 
    col("`session.end`").alias("session_end"), 
    col("`session.meterStart`").alias("session_meterStart"), 
    col("`session.meterStop`").alias("session_meterStop"), 
    col("`session.physicalPosition`").alias("session_physicalPosition"), 
    col("`session.peakPower`").alias("session_peakPower"), 
    col("`session.start`").alias("session_start"), 
    col("`session.socStart`").alias("session_socStart"), 
    col("`session.socStop`").alias("session_socStop"), 
    col("`session.chargingSessionId`").alias("session_chargingSessionId"), 
    col("`session.type`").alias("session_type"), 
    col("`session.position`").alias("session_position"), 
    col("`session.transactionIdNew`").alias("session_transactionIdNew"), 
    col("`carChargeParameter.car`").alias("carChargeParameter_car"), 
    col("`carChargeParameter.excludeFromStatistics`").alias("carChargeParameter_excludeFromStatistics"), 
    col("`carChargeParameter.batteryCapacity`").alias("carChargeParameter_batteryCapacity")
)

In [37]:
location_variables = [
    "chargerId", "numberStacks", "chassis", "isPublic", "chargePointIdentity", 
    "customerIccid", "locationTown", "locationZipCode", "locationProvince", 
    "locationCountry", "isRemoteLocation", "outletList", "status_position", 
    "status_status", "endClientId", "surroundingChargers"
]
hypercharge_location = hypercharge_location.select(*location_variables)

pdr_variables = [
    "station_uid", "station_address", "station_type_name", "station_brand", 
    "station_model", "connector_evse_id", 
    "station_owner_company_is_shopping_center",
    "station_owner_company_pay_description", "station_owner_company_pois", 
    "station_owner_company_experiences", "station_owner_company_sustainability_profile", 
    "station_owner_company_distances", "station_owner_company_show_advanced_services", 
    "station_owner_company_show_roaming", "station_owner_company_show_map", 
    "station_owner_company_keyfob_fee", "station_owner_company_owner_cost_per_kwh", 
    "plugs", "sessions", "totEnergy", "totEnergyLocal", "totEnergyNotLocal", 
    "totSessions", "totSessionsNotLocal", "totSessionsLocal", "totCost", 
    "totHouseSessions", "totPublicSessions", "totPublicCost", "totHouseCost", 
    "totHouseEnergy", "totPublicEnergy"
]
pdr = pdr.select(*pdr_variables)

cdr_variables = [
    "CDR ID", "EVSE ID", "Operatore", "Potenza (kW)", "Station Nome", "Station Città", 
    "Station Indirizzo", "Data inizio", "Ora inizio", "Data fine", "Ora fine", 
    "Ricavi totali (€) (IVA esclusa) ", "Ricavi Energia (€) (IVA esclusa) ", 
    "Energia (kWh)", "Ricavi Penalty Time (€) (IVA esclusa) ", "Tempo Totale (min)", 
    "Inizio penalty time", "Contachilometri (Km)"
]
cdr = cdr.select(*cdr_variables)

In [38]:
hypercharge_sessions = hypercharge_sessions.drop("carChargeParameter")

# Lists of interesting variables to reduce data frame size
session_variables = [
    "gpsLat", "gpsLong", "locationStreet", "locationZipCode", "locationTown", 
    "locationProvince", "locationCountry", "distributorName", "corporationName", 
    "type", "physicalPosition", "session_averagePower", "session_car", 
    "session_distributorName", "session_end", "session_meterStart", 
    "session_meterStop", "session_physicalPosition", "session_peakPower", 
    "session_start", "session_socStart", "session_socStop", 
    "session_chargingSessionId", "session_type", "session_position", 
    "session_transactionIdNew", "carChargeParameter_car", 
    "carChargeParameter_batteryCapacity"
]
hypercharge_sessions = hypercharge_sessions.select(*session_variables)

In [39]:
print("Sessions Stats")
numeric_columns = [col for col, dtype in hypercharge_sessions.dtypes if dtype in ['int', 'double', 'float']]
pandas_sessions = hypercharge_sessions.select(numeric_columns).toPandas()
display(pandas_sessions.describe())

Sessions Stats


Unnamed: 0,gpsLat,gpsLong,session_averagePower,session_car,session_distributorName,session_physicalPosition,session_peakPower,session_type,session_transactionIdNew,carChargeParameter_batteryCapacity
count,12978.0,12978.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3706.0
mean,43.106707,8.948087,,,,,,,,18149.044522
std,6.186204,10.427072,,,,,,,,28934.976897
min,2.0,-50.0,,,,,,,,0.0
25%,42.53072,9.237866,,,,,,,,0.0
50%,44.4525,11.193636,,,,,,,,0.0
75%,45.0749,12.61855,,,,,,,,42000.0
max,60.0,16.777855,,,,,,,,135000.0


In [40]:
print("Location Stats")
pandas_location = hypercharge_location.toPandas()
display(pandas_location.describe())

print("CDR Stats")
pandas_cdr = cdr.toPandas()
display(pandas_cdr.describe())

print("PDR Stats")
pandas_pdr = pdr.toPandas()
display(pandas_pdr.describe())

Location Stats


Unnamed: 0,chargerId,numberStacks,locationZipCode,status_position
count,374.0,374.0,153.0,374.0
mean,43247.780749,3.823529,35724.941176,0.0
std,33380.514195,0.42177,24216.932744,0.0
min,6590.0,1.0,10.0,0.0
25%,13418.25,4.0,20045.0,0.0
50%,19747.5,4.0,31032.0,0.0
75%,83748.75,4.0,50019.0,0.0
max,83842.0,4.0,83100.0,0.0


CDR Stats


Unnamed: 0,CDR ID,Potenza (kW),Ora inizio,Ora fine,Ricavi totali (€) (IVA esclusa),Ricavi Energia (€) (IVA esclusa),Energia (kWh),Ricavi Penalty Time (€) (IVA esclusa),Tempo Totale (min),Inizio penalty time
count,300000.0,300000.0,300000,300000,300000.0,300000.0,300000.0,300000.0,300000.0,300000.0
mean,1326766.0,282.026348,2024-10-08 14:08:57.396426752,2024-10-08 14:26:52.545593088,20.01075,20.01071,29.690876,0.0,26.8268,0.0
min,813476.0,22.0,2024-10-08 00:00:01,2024-10-08 00:00:01,0.0,0.0,0.001,0.0,0.0,0.0
25%,1104226.0,300.0,2024-10-08 10:49:36,2024-10-08 11:11:00,11.1,11.1,16.652,0.0,15.0,0.0
50%,1284196.0,300.0,2024-10-08 14:17:07,2024-10-08 14:40:50,18.81,18.81,27.904,0.0,24.0,0.0
75%,1571378.0,300.0,2024-10-08 17:49:02,2024-10-08 18:09:26,27.77,27.77,40.97625,0.0,34.0,0.0
max,1766361.0,400.0,2024-10-08 23:59:59,2024-10-08 23:59:59,247.96,247.96,364.652,0.0,5915.0,0.0
std,252664.7,70.270005,,,12.102538,12.102568,17.662696,0.0,24.014121,0.0


PDR Stats


Unnamed: 0,station_postal_code,station_commissioning_date,station_owner_company_pay_description,station_owner_company_owner_cost_per_kwh,totEnergy,totEnergyLocal,totEnergyNotLocal,totSessions,totSessionsNotLocal,totSessionsLocal,totCost,totHouseSessions,totPublicSessions,totPublicCost,totHouseCost,totHouseEnergy,totPublicEnergy
count,830.0,285,830.0,830.0,830.0,830.0,830.0,830.0,830.0,830.0,830.0,830.0,830.0,830.0,830.0,830.0,830.0
mean,36763.586747,2024-03-12 23:14:44.210526208,0.0,0.0,14006.847572,0.000308,13711.930864,465.036145,454.827711,0.00241,9467.685855,0.0,465.036145,9467.685855,0.0,0.0,14006.847572
min,10.0,2023-09-11 02:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,20026.0,2023-11-22 01:00:00,0.0,0.0,1134.687,0.0,1100.365,79.0,77.0,0.0,584.035,0.0,79.0,584.035,0.0,0.0,1134.687
50%,33010.0,2024-02-26 01:00:00,0.0,0.0,7879.4,0.0,7646.5245,283.5,276.5,0.0,5360.65,0.0,283.5,5360.65,0.0,0.0,7879.4
75%,52041.0,2024-07-04 02:00:00,0.0,0.0,23273.082,0.0,22836.1835,729.75,715.75,0.0,15833.2225,0.0,729.75,15833.2225,0.0,0.0,23273.082
max,83100.0,2024-08-09 02:00:00,0.0,0.0,95445.22,0.147,92886.066,3062.0,2981.0,1.0,64974.67,0.0,3062.0,64974.67,0.0,0.0,95445.22
std,24449.650843,,0.0,0.0,15775.476469,0.006348,15449.816311,490.645257,480.294118,0.049058,10802.687982,0.0,490.645257,10802.687982,0.0,0.0,15775.476469


PDR Stats


Unnamed: 0,count,energy,countInHouse,countInPublic,energyInHouse,energyInPublic,costInHouse,costInPublic
count,6203.0,6203.0,6203.0,6203.0,6203.0,6203.0,6203.0,6203.0
mean,62.133161,1872.117038,0.0,0.0,0.0,0.0,0.0,0.0
std,68.945924,2258.893963,0.0,0.0,0.0,0.0,0.0,0.0
min,1.0,0.147,0.0,0.0,0.0,0.0,0.0,0.0
25%,9.0,126.67,0.0,0.0,0.0,0.0,0.0,0.0
50%,35.0,936.289,0.0,0.0,0.0,0.0,0.0,0.0
75%,96.0,2998.6995,0.0,0.0,0.0,0.0,0.0,0.0
max,588.0,16980.799,0.0,0.0,0.0,0.0,0.0,0.0


In [41]:
print("PDR Sessions Stats")
pandas_pdr_sessions = pdr_sessions.toPandas()
display(pandas_pdr_sessions.describe())

print("PDR Plugs Stats")
pandas_pdr_plugs = pdr_plugs.toPandas()
display(pandas_pdr_plugs.describe())

PDR Sessions Stats


Unnamed: 0,count,energy,countInHouse,countInPublic,energyInHouse,energyInPublic,costInHouse,costInPublic
count,6203.0,6203.0,6203.0,6203.0,6203.0,6203.0,6203.0,6203.0
mean,62.133161,1872.117038,0.0,0.0,0.0,0.0,0.0,0.0
std,68.945924,2258.893963,0.0,0.0,0.0,0.0,0.0,0.0
min,1.0,0.147,0.0,0.0,0.0,0.0,0.0,0.0
25%,9.0,126.67,0.0,0.0,0.0,0.0,0.0,0.0
50%,35.0,936.289,0.0,0.0,0.0,0.0,0.0,0.0
75%,96.0,2998.6995,0.0,0.0,0.0,0.0,0.0,0.0
max,588.0,16980.799,0.0,0.0,0.0,0.0,0.0,0.0


PDR Plugs Stats


Unnamed: 0,max_electric_power,max_voltage,power_type_id,plug_type_id,plug_format_id
count,830.0,830.0,830.0,830.0,830.0
mean,214602.409639,540.722892,2.86747,18.60241,1.868675
std,128304.376472,148.444148,0.339271,6.303425,0.337959
min,22000.0,150.0,2.0,1.0,1.0
25%,62500.0,500.0,3.0,21.0,2.0
50%,300000.0,600.0,3.0,21.0,2.0
75%,300000.0,600.0,3.0,21.0,2.0
max,400000.0,1000.0,3.0,21.0,2.0


In [14]:
# Function to calculate percentage of null values
def null_percentage(df):
    null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0]
    total_count = df.count()
    return {k: v/total_count*100 for k, v in null_counts.asDict().items()}

In [43]:
session_nulls = null_percentage(hypercharge_sessions)
cdr_nulls = null_percentage(cdr)
pdr_nulls = null_percentage(pdr)
location_nulls = null_percentage(hypercharge_location)
pdr_session_nulls = null_percentage(pdr_sessions)
pdr_plugs_nulls = null_percentage(pdr_plugs)

In [16]:
def display_null_percentages(null_counts_dict):
  for col, percentage in null_counts_dict.items():
    print(f"{col:30s}{percentage:.2f}%")

In [44]:
print("Percentage of null values in Location file:")
display_null_percentages(location_nulls)
print("\n")

print("Percentage of null values in CDR file:")
display_null_percentages(cdr_nulls)
print("\n")

print("Percentage of null values in PDR file:")
display_null_percentages(pdr_nulls)
print("\n")

print("Percentage of null values in Session file:")
display_null_percentages(session_nulls)
print("\n")

print("Percentage of null values in PDR Sessions file:")
display_null_percentages(pdr_session_nulls)
print("\n")

print("Percentage of null values in PDR Plugs file:")
display_null_percentages(pdr_plugs_nulls)

Percentage of null values in Location file:
chargerId                     0.00%
numberStacks                  0.00%
chassis                       0.00%
isPublic                      0.00%
chargePointIdentity           30.21%
customerIccid                 89.84%
locationTown                  59.09%
locationZipCode               59.09%
locationProvince              59.09%
locationCountry               39.30%
isRemoteLocation              0.00%
outletList                    0.00%
status_position               0.00%
status_status                 0.00%
endClientId                   99.47%
surroundingChargers           30.75%


Percentage of null values in CDR file:
CDR ID                        0.00%
EVSE ID                       0.00%
Operatore                     0.00%
Potenza (kW)                  0.00%
Station Nome                  0.00%
Station Città                 0.00%
Station Indirizzo             0.00%
Data inizio                   0.00%
Ora inizio                    0.00%
Data fi

In [45]:
# Check for duplicate rows
print("Duplicate rows in Session:", hypercharge_sessions.count() - hypercharge_sessions.dropDuplicates().count())
print("Duplicate rows in CDR:", cdr.count() - cdr.dropDuplicates().count())
print("Duplicate rows in PDR:", pdr.count() - pdr.dropDuplicates().count())
print("Duplicate rows in Locations:", hypercharge_location.count() - hypercharge_location.dropDuplicates().count())
print("Duplicate rows in PDR Sessions:", pdr_sessions.count() - pdr_sessions.dropDuplicates().count())
print("Duplicate rows in PDR Plugs:", pdr_plugs.count() - pdr_plugs.dropDuplicates().count())

Duplicate rows in Session: 0
Duplicate rows in CDR: 0
Duplicate rows in PDR: 2
Duplicate rows in Locations: 0
Duplicate rows in PDR: 0
Duplicate rows in PDR: 273


In [50]:
def detect_outliers(df):
    outlier_conditions = []
    total_rows = df.count()
    
    for col_name in df.schema.names:
        col_type = df.schema[col_name].dataType
        if isinstance(col_type, (IntegerType, LongType, FloatType, DoubleType)):
            quantiles = df.approxQuantile(col_name, [0.25, 0.75], 0.05)
            IQR = quantiles[1] - quantiles[0]
            lower_bound = quantiles[0] - 1.5 * IQR
            upper_bound = quantiles[1] + 1.5 * IQR
            
            outlier_condition = (col(col_name) < lower_bound) | (col(col_name) > upper_bound)
            outlier_count = df.filter(outlier_condition).count()
            
            print(f"Outliers in {col_name}: {outlier_count} ({outlier_count/total_rows:.2%})")
            print("Outlier condition:", outlier_condition)
            print("\n")
            
            outlier_conditions.append(outlier_condition)
    
    # Combine all outlier conditions
    all_outlier_conditions = outlier_conditions[0]
    for condition in outlier_conditions[1:]:
        all_outlier_conditions = all_outlier_conditions | condition
    
    return all_outlier_conditions

In [46]:
# Data cleaning
def clean_data(df):
    # Drop columns with 100% nulls
    columns_to_drop = [
        "station_street_number",
        "station_owner_company_stars",
        "station_owner_company_pois",
        "station_owner_company_experiences",
        "station_owner_company_sustainability_profile",
        "station_owner_company_distances",
        "station_owner_company_keyfob_fee",
        'station_commissioning_date'
    ]
    
    df = df.drop(*columns_to_drop)
    
    # For columns with 9.52% nulls (station_brand and station_model),
    # we'll use mode imputation as a simple method for categorical variables
    for column in ['station_brand', 'station_model']:
        mode_value = df.select(mode(column)).first()[0]
        df = df.withColumn(column, when(col(column).isNull(), mode_value).otherwise(col(column)))
    
    return df

pdr_clean = clean_data(pdr)
cdr_clean = cdr.drop("Auth ID")

In [22]:
# Check the results
cdr_nulls = null_percentage(cdr_clean)
pdr_nulls = null_percentage(pdr_clean)
display_null_percentages(cdr_nulls)
print("\n")
display_null_percentages(pdr_nulls)

CDR ID                        0.00%
EVSE ID                       0.00%
Operatore                     0.00%
Potenza (kW)                  0.00%
Station Nome                  0.00%
Station Città                 0.00%
Station Indirizzo             0.00%
Data inizio                   0.00%
Ora inizio                    0.00%
Data fine                     0.00%
Ora fine                      0.00%
Ricavi totali (€) (IVA esclusa) 0.00%
Ricavi Energia (€) (IVA esclusa) 0.00%
Energia (kWh)                 0.00%
Ricavi Penalty Time (€) (IVA esclusa) 0.00%
Tempo Totale (min)            0.00%
Inizio penalty time           0.00%
Contachilometri (Km)          0.00%
Auth ID                       0.00%
Auth method                   0.00%
Type Status                   0.00%


station_uid                   0.00%
station_address               0.00%
station_postal_code           0.00%
station_city                  0.00%
station_country_id            0.00%
station_type_name             0.00%
station_brand

In [47]:
# Drop duplicates
pdr_clean = pdr_clean.dropDuplicates()
pdr_plugs_clean = pdr_plugs.dropDuplicates()

In [48]:
# Check the results
print("Duplicate rows in PDR:", pdr_clean.count() - pdr_clean.dropDuplicates().count())
print("Duplicate rows in PDR Plugs:", pdr_plugs_clean.count() - pdr_plugs_clean.dropDuplicates().count())

Duplicate rows in PDR: 0
Duplicate rows in PDR Plugs: 0


In [51]:
print("CDR outliers")
cdr_outlier_conditions = detect_outliers(cdr_clean)
print("\n")

print("PDR outliers")
pdr_outlier_conditions = detect_outliers(pdr_clean)
# investigate: totEnergy, totEnergyNotLocal

print("Plugs outliers")
pdr_plugs_outlier_conditions = detect_outliers(pdr_plugs_clean)
print("\n")
#Drop max voltage outliers

print("Session outliers")
pdr_sessions_outlier_conditions = detect_outliers(pdr_sessions)
print("\n")
# Drop energy outliers

CDR outliers
Outliers in CDR ID: 0 (0.00%)
Outlier condition: Column<'((CDR ID < 368067.0) OR (CDR ID > 2258259.0))'>


Outliers in Potenza (kW): 29008 (9.67%)
Outlier condition: Column<'((Potenza (kW) < 300.0) OR (Potenza (kW) > 300.0))'>


Outliers in Ricavi totali (€) (IVA esclusa) : 1173 (0.39%)
Outlier condition: Column<'((Ricavi totali (€) (IVA esclusa)  < -17.004999999999995) OR (Ricavi totali (€) (IVA esclusa)  > 53.595))'>


Outliers in Ricavi Energia (€) (IVA esclusa) : 1173 (0.39%)
Outlier condition: Column<'((Ricavi Energia (€) (IVA esclusa)  < -17.004999999999995) OR (Ricavi Energia (€) (IVA esclusa)  > 53.595))'>


Outliers in Energia (kWh): 1199 (0.40%)
Outlier condition: Column<'((Energia (kWh) < -24.174499999999995) OR (Energia (kWh) > 78.5735))'>


Outliers in Ricavi Penalty Time (€) (IVA esclusa) : 0 (0.00%)
Outlier condition: Column<'((Ricavi Penalty Time (€) (IVA esclusa)  < 0.0) OR (Ricavi Penalty Time (€) (IVA esclusa)  > 0.0))'>


Outliers in Tempo Totale (min):

In [52]:
def drop_outliers(df: DataFrame, drop_col: str) -> DataFrame:
    # Calculate the first and third quartiles
    quantiles = df.approxQuantile(drop_col, [0.25, 0.75], 0.05)
    Q1 = quantiles[0]
    Q3 = quantiles[1]
    IQR = Q3 - Q1

    # Define the lower and upper bounds
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # Filter out outliers
    filtered_df = df.filter((col(drop_col) >= lower_bound) & (col(drop_col) <= upper_bound))

    # Show how many rows were dropped
    print(f"Dropped outliers from column {drop_col}: {df.count() - filtered_df.count()} rows dropped.")
    
    return filtered_df

In [53]:
# Drop outliers from the 'max_voltage' column
pdr_plugs_clean = drop_outliers(pdr_plugs_clean, drop_col='max_voltage')

Dropped outliers from column max_voltage: 133 rows dropped.


In [54]:
# Drop outliers and columns appearing to be empty (only containing 0)
cdr_clean = cdr_clean.drop("Inizio penalty time", "Ricavi Penalty Time (€) (IVA esclusa) ")
pdr_clean = pdr_clean.drop("totSessionsLocal", "totEnergyLocal", "totHouseEnergy", "totHouseCost", "totHouseSessions", "station_owner_company_owner_cost_per_kwh", "station_owner_company_pay_description")
pdr_sessions_clean = pdr_sessions.drop("costInPublic", "costInHouse", "energyInPublic", "energyInHouse", "countInPublic", "countInHouse")

In [None]:
# Convert 'Tempo Totale (min)' from string to integer
cdr_clean = cdr_clean.withColumn("Tempo Totale (min)", cdr["Tempo Totale (min)"].cast("integer"))
cdr_clean = cdr_clean.withColumn("Contachilometri (Km)", cdr["Contachilometri (Km)"].cast("integer"))
# Cast 'station_postal_code' from integer to string
pdr_clean = pdr_clean.withColumn("station_postal_code", col("station_postal_code").cast("string"))

In [None]:
cdr_clean.write.csv("cdr_clean.csv", header=True)
pdr_clean.write.csv("pdr_clean.csv", header=True)
pdr_plugs_clean.write.csv("pdr_plugs_clean.csv", header=True)
pdr_sessions_clean.write.csv("pdr_sessions_clean.csv", header=True)

In [18]:
spark.stop()