In [1]:
import getpass
import os
from datetime import datetime
from typing import Optional

import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

from datapipeline_konfio.extractor.extractor_coingecko import ExtractorCoinGecko

# Extracción 

In [2]:
def extraction_process(
    spark: SparkSession,
    date_range: dict,
    api_demo_value: str,
    coint_to_search: str,
) -> DataFrame:
    """Pull raw price data for one coin from CoinGecko into a Spark DataFrame.

    Thin wrapper: builds an ``ExtractorCoinGecko`` with the four arguments
    (positionally, in this order) and returns the result of its
    ``extract_data_from_source()`` call unchanged.

    Args:
        spark: Spark session handed to the extractor.
        date_range: Forwarded as-is to the extractor (the ``__main__`` cell
            passes ``{"from_date": datetime, "to_date": datetime}``).
        api_demo_value: CoinGecko API key, forwarded as-is.
        coint_to_search: Name of the coin to look up, e.g. ``"Bitcoin"``.

    Returns:
        Whatever ``ExtractorCoinGecko.extract_data_from_source()`` returns.
    """
    return ExtractorCoinGecko(
        spark, date_range, api_demo_value, coint_to_search
    ).extract_data_from_source()

# Transformación 

In [3]:
def transform_process(df_spark: DataFrame, window_days: int = 5) -> DataFrame:
    """Deduplicate raw prices and add a trailing moving average of ``price_usd``.

    Args:
        df_spark: Extraction output; must contain the ``symbol``, ``date`` and
            ``price_usd`` columns used below.
        window_days: Size of the trailing window: the current row plus the
            ``window_days - 1`` preceding rows of the same ``symbol`` ordered
            by ``date``. Defaults to 5, which reproduces the original
            ``moving_avg_5d`` column.

    Returns:
        The deduplicated DataFrame with an added ``moving_avg_<window_days>d``
        column, ordered by ``date``. The first ``window_days - 1`` rows of each
        symbol are averaged over fewer rows, since no earlier rows exist.

    Raises:
        ValueError: If ``window_days`` is less than 1.
    """
    if window_days < 1:
        raise ValueError(f"window_days must be >= 1, got {window_days}")

    # No subset given, so only rows identical in every column are removed.
    deduplicated = df_spark.dropDuplicates()

    # Row-based frame: it covers `window_days` rows, which equals `window_days`
    # calendar days only when there is exactly one row per symbol per day.
    # NOTE(review): confirm the extractor returns daily rows; with missing days
    # or intraday rows, a rangeBetween over a day index would be needed instead.
    window_spec = (
        Window.partitionBy("symbol")
        .orderBy("date")
        .rowsBetween(-(window_days - 1), Window.currentRow)
    )

    with_moving_avg = deduplicated.withColumn(
        f"moving_avg_{window_days}d", F.avg("price_usd").over(window_spec)
    )

    # Global sort for readability of the returned frame; a JDBC target table
    # does not preserve this row order once written.
    return with_moving_avg.orderBy("date")

# Carga

In [4]:
def load_process(
    df_spark: DataFrame,
    url: Optional[str] = None,
    dbtable: str = "tbl_coint_price",
    user: Optional[str] = None,
    password: Optional[str] = None,
    mode: str = "overwrite",
) -> None:
    """Write ``df_spark`` to PostgreSQL through Spark's JDBC data source.

    Connection settings are resolved in this order: explicit argument, then
    environment variable (``KONFIO_DB_URL``, ``KONFIO_DB_USER``,
    ``KONFIO_DB_PASSWORD``), then the legacy value the pipeline originally
    hard-coded, so ``load_process(df)`` keeps working unchanged.

    Args:
        df_spark: DataFrame to persist.
        url: JDBC URL of the target database.
        dbtable: Target table name.
        user: Database user.
        password: Database password.
        mode: Spark save mode; defaults to ``"overwrite"`` as before.

    Returns:
        None. The only effect is the write to the database.
    """
    # TODO(security): remove the hard-coded fallbacks once the environment
    # supplies these values; credentials should not live in the notebook.
    jdbc_url = url if url is not None else os.environ.get(
        "KONFIO_DB_URL", "jdbc:postgresql://postgres:5432/db_konfio"
    )
    db_user = user if user is not None else os.environ.get("KONFIO_DB_USER", "konfio")
    db_password = (
        password if password is not None else os.environ.get("KONFIO_DB_PASSWORD", "1234")
    )

    (
        df_spark.write
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", dbtable)
        .option("user", db_user)
        .option("password", db_password)
        .option("driver", "org.postgresql.Driver")
        # With "overwrite" and the JDBC `truncate` option left at its default
        # (false), Spark drops and recreates the table, losing any indexes or
        # constraints defined on it.
        .mode(mode)
        .save()
    )

In [5]:
if __name__ == "__main__":

    DRIVER_PATH = "/home/jovyan/work/drivers/postgresql-42.2.18.jar"
    # The CoinGecko key is a secret: take it from the environment, or prompt
    # for it interactively, instead of committing it to the notebook.
    # NOTE(security): the key that used to be hard-coded here is exposed in the
    # notebook history and should be revoked/rotated.
    API_DEMO_VALUE = os.environ.get("COINGECKO_API_KEY") or getpass.getpass(
        "CoinGecko demo API key: "
    )
    COINT_TO_SEARCH = "Bitcoin"
    # NOTE(review): to_date is 2025-03-30 00:00, so March 31 (and possibly most
    # of March 30) is not covered if the goal is the whole month; confirm how
    # the extractor interprets to_date and whether this cut-off is deliberate.
    DATE_RANGE = {"from_date": datetime(2025, 3, 1), "to_date": datetime(2025, 3, 30)}

    spark = (
        SparkSession.builder
        .appName("Engine")
        .master("local[*]")
        # Put the PostgreSQL JDBC jar on the classpath so the JDBC write in
        # load_process can load org.postgresql.Driver.
        .config("spark.jars", DRIVER_PATH)
        .config("spark.driver.extraClassPath", DRIVER_PATH)
        .config("spark.executor.extraClassPath", DRIVER_PATH)
        .getOrCreate()
    )

    df_spark = extraction_process(spark, DATE_RANGE, API_DEMO_VALUE, COINT_TO_SEARCH)
    df_spark = transform_process(df_spark)
    load_process(df_spark)