In [None]:
# %pip install requests
# %pip install pandas
# %pip install urllib

# %pip install python-dotenv
# %pip install python-decouple

# %pip install psycopg2-binary
# %pip install sqlalchemy

# %pip install pyspark[all]

# libary included to solve not module pyspark found
%pip install findspark

In [113]:
# imports
import os
import requests
import urllib.parse

# datetime para manejo de fechas
from datetime import datetime

# DB connector and libraries
import psycopg2

import findspark
findspark.init()
# Pyspark utils
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import to_date, col, from_unixtime
from pyspark.sql.types import ( 
    FloatType, 
    LongType,
    IntegerType, 
    StructType, 
    BooleanType,
    StructField,
    DateType 
)

In [None]:
print(os.getcwd())

## Iniciar la sesión de spark

In [3]:
class Spark_Session:
    
    _driver_path: str = ""
    
    def __init__(self):
        
#         Define environment variables
        self._driver_path = os.getcwd() + "/driver_jdbc/postgresql-42.2.27.jre7.jar"

        os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {self._driver_path} --jars {self._driver_path} pyspark-shell'
        os.environ['SPARK_CLASSPATH'] = self._driver_path
        # Start spark session
        self._start_spark_session()
        
    def _start_spark_session(self):
        self._spark: SparkSession = SparkSession.builder \
            .master("local") \
            .appName("Conexion entre Pyspark y Redshift") \
            .config("spark.jars", self._driver_path) \
            .config("spark.executor.extraClassPath", self._driver_path) \
            .getOrCreate()
    
    def return_session(self):
        """
        Función que retorna la sessión de spark creada al instanciar la clase.
        """
        return self._spark
            
spark = Spark_Session()
spark_session = spark.return_session()


In [115]:
def url_constructor(url: str, **kargs: dict[any]) -> str:

    return f"{url}?{urllib.parse.urlencode(kargs)}"

## Obtenemos la información desde la url y creamos el df

In [123]:
# Vamos a consultar la información historica diaria que nos emite Binance sin limites inicio y fin

data = requests.get( \
                url_constructor("https://api.binance.com/api/v3/klines", symbol="BTCUSDT", interval="1d")
        ).json()

# Headers obtenidos desde la documentación de binance
# schema = StructType([ \
#     StructField("Open time",LongType(),True), \
#     StructField("Open",FloatType(),True), \
#     StructField("High",FloatType(),True), \
#     StructField("Low", FloatType(), True), \
#     StructField("Close", FloatType(), True), \
#     StructField("Volume", IntegerType(), True), \
#     StructField("Close time", LongType(), True), \
#     StructField("Quote asset volume", FloatType(), True), \
#     StructField("Number of trades", IntegerType(), True), \
#     StructField("Taker buy base asset volume", FloatType(), True), \
#     StructField("Taker buy quote asset volume", FloatType(), True), \
#     StructField("Ignore", BooleanType(), True) \
# ])

headers = ("Open_time",
           "Open",
           "High", 
           "Low", 
           "Close", 
           "Volume", 
           "Close_time", 
           "Quote asset volume", 
           "Trades_qty",
           "Taker buy base asset volume",
           "Taker buy quote asset volume", 
           "Ignore"
        )

# Se crea el dataframe utilizando la session de spark generada anteriormente
df = spark_session \
        .createDataFrame(data, [x for x in headers])
# df = spark_session \
#         .createDataFrame(data, schema)




In [124]:
# Parseamos los datos
df = df.withColumn("Open", df["Open"].cast(FloatType()))
df = df.withColumn("High", df["High"].cast(FloatType()))
df = df.withColumn("Low", df["Low"].cast(FloatType()))
df = df.withColumn("Close", df["Close"].cast(FloatType()))
df = df.withColumn("Volume", df["Volume"].cast(FloatType()))
# df = df.withColumn("Quote asset volume", df["Quote asset volume"].cast(FloatType()))
df = df.withColumn("Trades_qty", df["Trades_qty"].cast(IntegerType()))
# df = df.withColumn("Taker buy base asset volume", df["Taker buy base asset volume"].cast(FloatType()))
# df = df.withColumn("Taker buy quote asset volume", df["Taker buy quote asset volume"].cast(FloatType()))
# df = df.withColumn("Ignore", df["Ignore"].cast(BooleanType()))

# Eliminamos columnas que no vamos a necesitar
df = df.drop("Quote asset volume","Taker buy base asset volume","Taker buy quote asset volume","Ignore")

# Manejo de fechas
# # Los timestamps estan en milisegundos por lo que deben pasarse a segundos
# # Luego los parseamos con la función from_unixtime
df = df.withColumn("Open_time", 
                   from_unixtime((df["Open_time"]/1000)) \
                   .cast(DateType()) \
                   )
df = df.withColumn("Close_time", 
                   from_unixtime((df["Close_time"]/1000)) \
                   .cast(DateType()) \
                    )

# Creamos la columna de promedio entre los valores high and close
df = df.withColumn("Daily_Avg", (df.High + df.Low)/2)
df = df.withColumn("Open_Close_Gap", (df.Open + df.Close)/2)







In [125]:
# Imprimos metricas del df
df.describe().show()
type(df)

+-------+------------------+-----------------+-----------------+------------------+-----------------+----------------+-----------------+------------------+
|summary|              Open|             High|              Low|             Close|           Volume|      Trades_qty|        Daily_Avg|    Open_Close_Gap|
+-------+------------------+-----------------+-----------------+------------------+-----------------+----------------+-----------------+------------------+
|  count|               500|              500|              500|               500|              500|             500|              500|               500|
|   mean|26109.764966796876|26674.98291015625|25515.16741015625|26085.773681640625|163836.8904316406|     3801760.088| 26095.0751953125|26097.769298828123|
| stddev| 7991.312222187446|8185.175579883869|7776.166364369698| 7960.330834745574|135864.3127664517|3076266.40468566|7972.068081849734|7963.7993469477315|
|    min|          15781.29|          16315.0|          15476.0|

pyspark.sql.dataframe.DataFrame

In [126]:
# Filtramos aquellos valores donde la fechas sean nulas
df = df.filter(df['Open_time'].isNotNull() & df['Close_time'].isNotNull())

# Eliminamos duplicados con la fecha open_time
df.select(col('Open_time')).dropDuplicates()

DataFrame[Open_time: date]

In [129]:
df.printSchema()
df.describe().show()
df.show()


root
 |-- Open_time: date (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume: float (nullable = true)
 |-- Close_time: date (nullable = true)
 |-- Trades_qty: integer (nullable = true)
 |-- Daily_Avg: double (nullable = true)
 |-- Open_Close_Gap: double (nullable = true)

+-------+------------------+-----------------+-----------------+------------------+-----------------+----------------+-----------------+------------------+
|summary|              Open|             High|              Low|             Close|           Volume|      Trades_qty|        Daily_Avg|    Open_Close_Gap|
+-------+------------------+-----------------+-----------------+------------------+-----------------+----------------+-----------------+------------------+
|  count|               500|              500|              500|               500|              500|             500|              500|    

In [112]:
datetime.fromtimestamp(1644624000000/1000).strftime("%m/%d/%Y %H:%M:%S")


'02/12/2022 00:00:00'

## Creamos las funcionalidades de la db

In [130]:
class ETL():

    _table_name = "bitcoin_daily_prices"

    def __init__(self) -> None:
        # Configuramos redshift importando la data desde constantes de entorno
        self._HOST = os.environ['REDSHIFT_HOST']
        self._PORT = os.environ['REDSHIFT_PORT']
        self._DB = os.environ['REDSHIFT_DB']
        self._USER = os.environ["REDSHIFT_USER"]
        self._DB_NAME = os.environ["REDSHIFT_DB_NAME"]
        self._SCHEMA = os.environ["REDSHIFT_SCHEMA"]
        self._PASSWORD = os.environ["REDSHIFT_PASSWORD"]
        self._DRIVER = "io.github.spark_redshift_community.spark.redshift"
        self._URL = f"jdbc:redshift:// \
        {self._HOST}:{self._PORT}/{self._DB}?user= \
        {self._USER}&password={self._PASSWORD}"

        # Iniciamos la conexión
        self._create_engine()
        # Creamos la tabla en el warehouse
        self._create_table()

    def _create_table(self):
        cursor = self._conn.cursor()
        try:
            cursor.execute(f"""
            create table if not exists 
                {self._SCHEMA}.{self._table_name} (
                OPEN_TIME DATETIME distkey,
                OPEN DECIMAL(10,2) NOT NULL,
                HIGH DECIMAL(10,2) NOT NULL,
                LOW DECIMAL(10,2) NOT NULL,
                CLOSE DECIMAL(10,2) NOT NULL,
                VOLUME DECIMAL(10,2) NOT NULL,
                CLOSE_TIME DATETIME,
                TADES_QTY INT NOT NULL,
                Daily_Avg DECIMAL(10,2) NOT NULL,
                Open_Close_Gap DECIMAL(10,2) NOT NULL
                
            ) sortkey(OPEN_TIME);
            """)
            self._conn.commit()
            print("Ejecución correcta")
        except: raise Exception("Ha ocurrido un error")
            # No es el mejor manejo de errores pero para esto esta bien.
        finally:
            cursor.close()

    def _create_engine(self) -> None:
        # Creamos la conexión a la db
        try:
            self._conn = psycopg2.connect(
                host = self._HOST,
                port = self._PORT,
                dbname = self._DB_NAME,
                user = self._USER,
                password = self._PASSWORD
            )
        except: raise Exception("Ha ocurrido un error al tartar de conectarse a la db")

    
    def retrieve_all_data_from_db(self) -> DataFrame:
        """
        Función que retorna la info de la base de datos para el schema y tablas del ejercicio
        Nota: En caso de hacer consultas mas complejas esto debera refactorizarse pero por ahora va bien.

        Returns
        --------------------------------------------------------
        data : dataframe.DataFrame -> Spark Dataframe
        """
        query = f"select * from {self._SCHEMA+self._table_name}"
        data = spark.read \
            .format("jdbc") \
            .option("url", self._URL) \
            .option("dbtable", f"({query}) as data") \
            .option("user", self._USER) \
            .option("password", self._PASSWORD) \
            .option("driver", self._DRIVER) \
            .option("driver", "org.postgresql.Driver") \
            .load()
        
        return data

    def insert_data_into_db(self, dataframe: DataFrame) -> None:
        """ 
        Función para insertar la info desde el dataframe del ejercicio. 
        Nota: Es valida puntualmente para este caso de uso. Pensar en escalarlo.

        Params
        ------------------------------------------------------
        dataframe: dataframe.DataFrame -> Spark Dataframe
        """
        try:
            dataframe.write \
                .format("jdbc") \
                .option("url", self._URL) \
                .option("dbtable", self._SCHEMA+self._table_name) \
                .option("user", self._USER) \
                .option("password", self._PASSWORD) \
                .option("driver", self._DRIVER) \
                .mode("overwrite") \
                .save()
            print("Información insertada correctamente")
        except: raise Exception("Ha ocurrido un error al insertar la data en el db")

    def close_connection(self) -> None:
        self._conn.close()


## Instanciamos la clase y realizamos las operaciones de insertar y leer desde la db

In [None]:
etl = ETL()

try:
    # Insertamos la información en el dataframe
    # Debe verificarse que se imprima el mensaje de success y no suceda la excepción
    etl.insert_data_into_db(dataframe=df)
    # Asignamos a un nuevo dataframe la información obtenida desde la db
    db_df = etl.retrieve_all_data_from_db()
except:
    print("Ocurrio un error")
finally:
    # Debemos de tener en cuenta que una vez realizadas las operaciones deben cerrarse las conexiones.
    etl.close_connection()


In [None]:
db_df.printSchema()
db_df.show()
db_df.describe().show()