In [1]:
import yfinance as yf
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, StringType


def get_stock_data(stock_symbols_str, start_date, end_date):
  # Initialize Spark Session
  spark = SparkSession.builder.appName("StockETL").getOrCreate()

  stock_symbols = [symbol.strip() for symbol in stock_symbols_str.split(',')]
  all_stock_data = []

  for symbol in stock_symbols:
    try:
            # Descargar datos de yfinance
            stock = yf.Ticker(symbol)
            data = stock.history(start=start_date, end=end_date)

            # Seleccionar solo la fecha y el precio de cierre
            data = data[['Close']]

            # Agregar el campo 'stock'
            data['stock'] = symbol

            # Reiniciar el índice para que la fecha sea una columna
            data = data.reset_index()

            # Convert the 'Date' column to datetime objects without timezone
            data['Date'] = pd.to_datetime(data['Date']).dt.date
            data['Close'] = pd.to_numeric(data['Close']).round(2)

            # Renombrar columnas para que coincidan con el esquema de PySpark
            data = data.rename(columns={'Date': 'Fecha', 'Close': 'Precio', 'stock': 'Stock'})

            all_stock_data.append(data)

    except Exception as e:
            print(f"Error processing stock symbol {symbol}: {e}")

    if not all_stock_data:
        print("No data fetched for any stock symbol.")
        return None

    combined_df_pandas = pd.concat(all_stock_data, ignore_index=True)

    schema = StructType([
        StructField("Fecha", DateType(), True),
        StructField("Precio", DoubleType(), True),
        StructField("Stock", StringType(), True)
    ])

    spark_df = spark.createDataFrame(combined_df_pandas, schema=schema)

    return spark_df

In [3]:
# Ejemplo de uso (puedes usar parámetros de formulario en Colab)
stock_symbols_input = 'KO,NVDA,AAPL'
start_date_input = '2025-01-01'
end_date_input = '2025-01-31'

stock_df = get_stock_data(stock_symbols_input, start_date_input, end_date_input)

# Mostrar el DataFrame de PySpark (solo las primeras filas)
if stock_df:
  stock_df.show()

+----------+------+-----+
|     Fecha|Precio|Stock|
+----------+------+-----+
|2025-01-02| 60.07|   KO|
|2025-01-03| 59.98|   KO|
|2025-01-06| 59.07|   KO|
|2025-01-07|  59.1|   KO|
|2025-01-08| 59.94|   KO|
|2025-01-10| 59.32|   KO|
|2025-01-13| 59.88|   KO|
|2025-01-14| 60.26|   KO|
|2025-01-15|  60.0|   KO|
|2025-01-16| 60.47|   KO|
|2025-01-17| 60.91|   KO|
|2025-01-21| 60.63|   KO|
|2025-01-22| 60.01|   KO|
|2025-01-23| 59.77|   KO|
|2025-01-24| 60.15|   KO|
|2025-01-27| 62.04|   KO|
|2025-01-28| 60.57|   KO|
|2025-01-29| 61.03|   KO|
|2025-01-30| 62.21|   KO|
+----------+------+-----+



In [4]:
stock_df.write.parquet("stock_data.parquet")

# CARGA DE DATOS EN BASE DE DATOS

In [5]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m34.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, StringType


# Create a new SparkSession
spark_new = SparkSession.builder \
    .appName("Load RandomUser to PostgreSQL") \
    .config("spark.executor.memory", "2g") \
    .config("spark.jars", "/content/postgresql-42.2.5.jar") \
    .getOrCreate()

In [2]:
parquet_file_path = "stock_data.parquet"

In [3]:
parquet_df = spark_new.read.parquet(parquet_file_path)
print(f"DataFrame loaded from Parquet file at '{parquet_file_path}' successfully.")
parquet_df.show()

DataFrame loaded from Parquet file at 'stock_data.parquet' successfully.
+----------+------+-----+
|     Fecha|Precio|Stock|
+----------+------+-----+
|2025-01-16| 60.47|   KO|
|2025-01-17| 60.91|   KO|
|2025-01-21| 60.63|   KO|
|2025-01-22| 60.01|   KO|
|2025-01-23| 59.77|   KO|
|2025-01-24| 60.15|   KO|
|2025-01-27| 62.04|   KO|
|2025-01-28| 60.57|   KO|
|2025-01-29| 61.03|   KO|
|2025-01-30| 62.21|   KO|
|2025-01-02| 60.07|   KO|
|2025-01-03| 59.98|   KO|
|2025-01-06| 59.07|   KO|
|2025-01-07|  59.1|   KO|
|2025-01-08| 59.94|   KO|
|2025-01-10| 59.32|   KO|
|2025-01-13| 59.88|   KO|
|2025-01-14| 60.26|   KO|
|2025-01-15|  60.0|   KO|
+----------+------+-----+



In [6]:
from google.colab import userdata

BD_PWS = userdata.get('BD_PWS')
print("Secret 'BD_PWS' loaded successfully.")

Secret 'BD_PWS' loaded successfully.


In [7]:
try:
    # Database connection properties
    jdbc_url = "jdbc:postgresql://dpg-d5lr8psmrvns73encg2g-a.virginia-postgres.render.com/db_g6"


    # Table name where you want to save the data
    table_name = "stock_data" # Using a different table name to avoid overwriting

    # Write the DataFrame to the database
    parquet_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", table_name) \
    .option("user", "codigo") \
    .option("password", BD_PWS) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()
    print(f"DataFrame saved to table '{table_name}' successfully.")

except Exception as e:
    print(f"Error processing Parquet file or saving to database: {e}")

DataFrame saved to table 'stock_data' successfully.
