In [39]:
from datetime import datetime
from datetime import timezone

import pytz
import requests

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql.functions import *
from pyspark.sql.types import *

from helpers import *

In [42]:
def get_completeness_data(df: DataFrame, total_rows: float):
    def get_n_null_values_per_column(column):
        return df.filter(col(column).isNull()).count()

    return (("completeness", column, 1 - (get_n_null_values_per_column(column) / total_rows)) for column in df.columns)


In [18]:
columns = [
    "timestamp",
    "zip_code",
    "eess_id",
    "ccaa_id",
    "municipality_id",
    "province_id",
    "sale_type",
    "label",
    "address",
    "municipality",
    "province",
    "locality",
    "latitude",
    "longitude",
    # "biodiesel_price",
    # "bioethanol_price",
    # "compressed_natural_gas_price",
    # "liquefied_natural_gas_price",
    # "liquefied_petroleum_gases_price",
    # "diesel_a_price",
    # "diesel_b_price",
    # "diesel_premium_price",
    "gasoline_95_e10_price",
    "gasoline_95_e5_price",
    "gasoline_95_e5_premium_price",
    # "gasoline_98_e10_price",
    # "gasoline_98_e5_price",
    # "hydrogen_price",
]

In [None]:
schema = StructType([
    StructField('timestamp', StringType(), True), 
    StructField('zip_code', LongType(), True), 
    StructField('eess_id', LongType(), True), 
    StructField('ccaa_id', LongType(), True), 
    StructField('municipality_id', LongType(), True), 
    StructField('province_id', LongType(), True), 
    StructField('sale_type', StringType(), True),
    StructField('label', StringType(), True), 
    StructField('address', StringType(), True), 
    StructField('municipality', StringType(), True), 
    StructField('province', StringType(), True), 
    StructField('locality', StringType(), True),

    # Price Columns
    StructField('latitude', DoubleType(), True), 
    StructField('longitude', DoubleType(), True), 
    StructField('biodiesel_price', DoubleType(), True), 
    StructField('bioethanol_price', DoubleType(), True), 
    StructField('compressed_natural_gas_price', DoubleType(), True), 
    StructField('liquefied_natural_gas_price', DoubleType(), True), 
    StructField('liquefied_petroleum_gases_price', DoubleType(), True), 
    StructField('diesel_a_price', DoubleType(), True), 
    StructField('diesel_b_price', DoubleType(), True), 
    StructField('diesel_premium_price', DoubleType(), True), 
    StructField('gasoline_95_e10_price', DoubleType(), True), 
    StructField('gasoline_95_e5_price', DoubleType(), True), 
    StructField('gasoline_95_e5_premium_price', DoubleType(), True), 
    StructField('gasoline_98_e10_price', DoubleType(), True), 
    StructField('gasoline_98_e5_price', DoubleType(), True), 
    StructField('hydrogen_price', DoubleType(), True),
])

In [14]:
spark = get_spark_session()

In [15]:
pandas_df = read_csv_as_pd_df(BUCKET)

Reading spain_fuel_prices_2024-10-11T11:50:49.486761.csv
Dropping Date column from spain_fuel_prices_2024-10-11T11:50:49.486761.csv
Reading spain_fuel_prices_2024-10-12T10:03:27.045933.csv
Dropping Date column from spain_fuel_prices_2024-10-12T10:03:27.045933.csv
Reading spain_fuel_prices_2024-10-13T10:05:07.csv
Reading spain_fuel_prices_2024-10-14T10:02:20.csv
Reading spain_fuel_prices_2024-10-15T10:02:20.csv
Reading spain_fuel_prices_2024-10-16T10:02:21.csv
Reading spain_fuel_prices_2024-10-17T10:02:20.csv
Reading spain_fuel_prices_2024-10-18T10:02:22.csv
Reading spain_fuel_prices_2024-10-19T10:02:21.csv
Reading spain_fuel_prices_2024-10-20T10:02:22.csv
Reading spain_fuel_prices_2024-10-21T10:02:22.csv
Reading spain_fuel_prices_2024-10-22T10:02:19.csv
Reading spain_fuel_prices_2024-10-23T10:02:21.csv
Reading spain_fuel_prices_2024-10-24T10:02:22.csv
Reading spain_fuel_prices_2024-10-25T10:02:17.csv
Reading spain_fuel_prices_2024-10-26T10:02:21.csv
Reading spain_fuel_prices_2024-10-27

In [16]:
df = spark.createDataFrame(pandas_df)

In [33]:
df.count()

                                                                                

477270

In [35]:
df = df.withColumn("date", to_date("timestamp"))

In [38]:
df.groupBy("date").count().withColumn("count", format_number("count", 0)).orderBy("date").show()

[Stage 37:>                                                         (0 + 8) / 8]

+----------+------+
|      date| count|
+----------+------+
|2024-10-11|11,992|
|2024-10-12|11,990|
|2024-10-13|11,979|
|2024-10-14|12,001|
|2024-10-15|12,007|
|2024-10-16|12,010|
|2024-10-17|12,013|
|2024-10-18|11,987|
|2024-10-19|11,979|
|2024-10-20|11,963|
|2024-10-21|11,972|
|2024-10-22|11,978|
|2024-10-23|11,944|
|2024-10-24|11,931|
|2024-10-25|11,935|
|2024-10-26|11,933|
|2024-10-27|11,918|
|2024-10-28|11,921|
|2024-10-29|11,916|
|2024-10-30|11,894|
+----------+------+
only showing top 20 rows



                                                                                

In [23]:
df.groupBy(df.columns).count().filter(col("count") > 2).count()

                                                                                

0

In [45]:
list(get_completeness_data(df, df.count()))

                                                                                

[('completeness', 'timestamp', 1.0),
 ('completeness', 'zip_code', 1.0),
 ('completeness', 'eess_id', 1.0),
 ('completeness', 'ccaa_id', 1.0),
 ('completeness', 'municipality_id', 1.0),
 ('completeness', 'province_id', 1.0),
 ('completeness', 'sale_type', 1.0),
 ('completeness', 'label', 1.0),
 ('completeness', 'address', 1.0),
 ('completeness', 'municipality', 1.0),
 ('completeness', 'province', 1.0),
 ('completeness', 'locality', 1.0),
 ('completeness', 'latitude', 1.0),
 ('completeness', 'longitude', 1.0),
 ('completeness', 'biodiesel_price', 1.0),
 ('completeness', 'bioethanol_price', 1.0),
 ('completeness', 'compressed_natural_gas_price', 1.0),
 ('completeness', 'liquefied_natural_gas_price', 1.0),
 ('completeness', 'liquefied_petroleum_gases_price', 1.0),
 ('completeness', 'diesel_a_price', 1.0),
 ('completeness', 'diesel_b_price', 1.0),
 ('completeness', 'diesel_premium_price', 1.0),
 ('completeness', 'gasoline_95_e10_price', 1.0),
 ('completeness', 'gasoline_95_e5_price', 1.0),