In [None]:
"""
Fetch publicly available gas prices from German gas stations
based on MTS-K data hosted by tankerkoenig.de.
"""

import json
from typing import Any, Dict, Optional
from urllib.parse import quote

import pandas as pd
import requests
import sqlalchemy
from sqlalchemy.engine import Connection

class CONSTANTS:
    """Connection settings read from ``./parameters.jsonc`` at class-definition time.

    The values are exposed as plain class attributes. ``__slots__ = ()`` keeps
    instances from carrying attributes of their own.

    NOTE(review): the file has a ``.jsonc`` extension but is parsed with
    ``json.load``, which rejects comments -- confirm the file is strict JSON.
    """

    __slots__ = ()

    with open("./parameters.jsonc", mode="r", encoding="utf8") as fopen:
        PARAMETERS = json.load(fopen)
    # The handle is already closed when the ``with`` block exits; drop the name
    # so a dead file object does not linger as a class attribute.
    del fopen

    # A missing key raises KeyError right here, i.e. as soon as the class body runs.
    BASE_URL = PARAMETERS["BASE_URL"]
    HOST = PARAMETERS["HOST"]
    MYSQL_PORT = PARAMETERS["MYSQL_PORT"]
    MYSQL_USER = PARAMETERS["MYSQL_USER"]
    MYSQL_PASSWORD = PARAMETERS["MYSQL_PASSWORD"]


def get_mariadb_connection(
    host: str,
    user: str,
    password: str,
    port: Optional[int] = None,
) -> Connection:
    """Open a SQLAlchemy connection to the ``fhem`` database of a MariaDB instance.

    User name and password are percent-encoded before they are placed in the
    connection URL, so characters such as ``@``, ``:`` or ``/`` in a credential
    no longer corrupt the URL. Pass the credentials raw, not pre-encoded.

    Args:
        host (str): Docker host running MariaDB
        user (str): MariaDB user (raw, unencoded)
        password (str): MariaDB password (raw, unencoded)
        port (Optional[int]): TCP port of the server. When ``None`` (default)
            the URL carries no port part, exactly as before.

    Returns:
        conn: open SQLAlchemy connection; the caller is responsible for
            closing it.
    """
    # Only append ":port" when one was requested, so existing callers get the
    # same URL they always did.
    netloc = host if port is None else f"{host}:{port}"

    # safe="" also encodes "/", which quote() would otherwise leave untouched.
    # str() keeps the old f-string tolerance for non-string values.
    credentials = f"{quote(str(user), safe='')}:{quote(str(password), safe='')}"

    engine = sqlalchemy.create_engine(
        f"mysql+pymysql://{credentials}@{netloc}/fhem?charset=utf8mb4",
    )
    return engine.connect()

const = CONSTANTS()

conn = get_mariadb_connection(
    host=const.HOST, user=const.MYSQL_USER, password=const.MYSQL_PASSWORD
)

try:
    # Pull the complete table into memory, keyed by its ID column.
    # coerce_float=False asks pandas not to convert non-numeric objects
    # (e.g. Decimal values) to float.
    df = pd.read_sql(
        "SELECT * FROM fhem.f_t_gas_price",
        index_col="ID",
        con=conn,
        coerce_float=False,
    )
finally:
    # The frame is fully materialised at this point, so release the database
    # connection and its engine's pool instead of holding them open for the
    # lifetime of the kernel.
    conn.close()
    conn.engine.dispose()



In [None]:
# Show only a short preview; the complete frame remains available as `df`.
df.head()

In [None]:
# Write the loaded frame to a local Parquet file.
df.to_parquet(path="./test.parquet")

In [None]:
import os

# Pin SPARK_LOCAL_IP in the process environment before the Spark session is
# created further down.
os.environ.update({"SPARK_LOCAL_IP": "127.0.1.1"})


In [None]:
import pyspark
from delta import configure_spark_with_delta_pip

# Local Spark session with the Delta Lake SQL extension and catalog registered.
builder = (
    pyspark.sql.SparkSession.builder.master("local[*]")
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# NOTE(review): `df` was loaded with "ID" as its index, and createDataFrame
# does not appear to carry the pandas index over -- confirm that dropping ID
# from the Delta table is intended (df.reset_index() would keep it).
sdf = spark.createDataFrame(df)

# The default save mode raises once the target path exists, which breaks every
# re-run of the notebook. "overwrite" replaces the previous snapshot instead.
sdf.write.format("delta").mode("overwrite").save("./delta_gas_prices")

In [None]:
# Read through the Delta reader so the table's transaction log determines which
# data files belong to the current version. Without an explicit format, load()
# falls back to spark.sql.sources.default (parquet unless reconfigured).
spark.read.format("delta").load("./delta_gas_prices/").count()